Coverage for mindsdb / integrations / handlers / notion_handler / notion_handler.py: 0%
151 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import time
2import pandas as pd
3from collections import defaultdict
5from notion_client import Client
7from mindsdb.utilities import log
8from mindsdb.integrations.libs.api_handler import APIHandler
9from mindsdb.integrations.libs.response import (
10 HandlerStatusResponse as StatusResponse,
11 HandlerResponse,
12 RESPONSE_TYPE,
13)
14from .notion_table import (
15 NotionBlocksTable,
16 NotionCommentsTable,
17 NotionDatabaseTable,
18 NotionPagesTable,
19)
logger = log.getLogger(__name__)


class NotionHandler(APIHandler):
    """MindsDB API handler exposing Notion databases, pages, blocks and comments as tables."""

    name = "notion"

    # Top-level notion_client services that ``call_notion_api`` may dispatch to.
    # Three-part names (e.g. ``blocks.children.list``) are only allowed for ``blocks``.
    _services = ("databases", "pages", "blocks", "comments")
    # Largest ``page_size`` requested when results are filtered client-side.
    _max_page_size = 100
    # Seconds allowed for paging through one ``call_notion_api`` request.
    _request_timeout = 60

    def __init__(self, name: str, **kwargs):
        """Create the handler and register its tables.

        Args:
            name (str): the handler name
            **kwargs: ``connection_data`` (dict) holds the connection parameters;
                its ``api_token`` entry is used by ``connect``.
        """
        super().__init__(name)

        # ``connection_data`` may be absent or explicitly None.
        self.connection_args = kwargs.get("connection_data") or {}
        self.key = "api_token"

        self.api = None
        self.is_connected = False

        notion_database_data = NotionDatabaseTable(self)
        notion_pages_data = NotionPagesTable(self)
        notion_comments_data = NotionCommentsTable(self)
        notion_blocks_data = NotionBlocksTable(self)

        self._register_table("database", notion_database_data)
        self._register_table("pages", notion_pages_data)
        self._register_table("blocks", notion_blocks_data)
        self._register_table("comments", notion_comments_data)

    def connect(self, args=None, **kwargs):
        """Return a notion_client ``Client`` authenticated with ``api_token``.

        The client is created once and reused while ``is_connected`` is True.

        Raises:
            ValueError: if no ``api_token`` was supplied in the connection data.
        """
        if self.is_connected and self.api is not None:
            return self.api

        api_token = self.connection_args.get(self.key)
        if not api_token:
            raise ValueError(f"'{self.key}' is required to connect to Notion")

        self.api = Client(auth=api_token)
        self.is_connected = True
        return self.api

    def check_connection(self) -> StatusResponse:
        """Verify that the configured ``api_token`` is accepted by Notion.

        Returns:
            StatusResponse with ``success`` set, and ``error_message`` on failure.
        """
        response = StatusResponse(False)

        try:
            client = self.connect()
            # Building a Client performs no request, so make a lightweight
            # authenticated call to actually validate the token.
            client.users.me()
            response.success = True
        except Exception as e:
            response.error_message = (
                f"Error connecting to Notion api: {e}. Check api_token"
            )
            logger.error(response.error_message)
            response.success = False

        if not response.success:
            self.is_connected = False

        return response

    def native_query(self, query: str = None) -> HandlerResponse:
        """Run a native query of the form ``service.method(key=value, ...)``.

        Example: ``databases.query(database_id=abc123)``.

        Returns:
            HandlerResponse of type TABLE wrapping the records returned by Notion.
        """
        method_name, params = self._parse_native_query(query)
        df = self.call_notion_api(method_name, params)
        return HandlerResponse(RESPONSE_TYPE.TABLE, data_frame=df)

    @staticmethod
    def _parse_native_query(query: str):
        """Split ``service.method(k1=v1, k2=v2)`` into ``(method_name, params)``.

        Whitespace around keys and values is ignored, and a value wrapped in
        matching single or double quotes is unquoted. Values stay strings.
        Commas inside values are not supported.

        Raises:
            ValueError: if ``query`` is empty or an argument is not ``key=value``.
        """
        if not query or not query.strip():
            raise ValueError(
                "Native query is empty; expected 'service.method(key=value, ...)'"
            )

        method_name, _, args = query.strip().partition("(")
        args = args.strip()
        if args.endswith(")"):
            args = args[:-1]

        params = {}
        for item in args.split(","):
            item = item.strip()
            if not item:
                continue
            key, sep, value = item.partition("=")
            if not sep:
                raise ValueError(
                    f"Invalid native query argument {item!r}; expected key=value"
                )
            value = value.strip()
            if len(value) >= 2 and value[0] == value[-1] and value[0] in "'\"":
                value = value[1:-1]
            params[key.strip()] = value
        return method_name.strip(), params

    @staticmethod
    def _filter_matches(op, value, row_value) -> bool:
        """Return True if ``row_value`` satisfies the single filter ``(op, value)``.

        Raises:
            NotImplementedError: for an unsupported operator.
        """
        # Non-bool ints are compared as strings, as the original handler did.
        if isinstance(value, int) and not isinstance(value, bool):
            value = str(value)

        if op in ("!=", "<>"):
            return value != row_value
        if op in ("==", "="):
            return value == row_value
        if op in ("in", "not in"):
            values = value if isinstance(value, list) else [value]
            return (row_value in values) == (op == "in")
        raise NotImplementedError(f"Unknown filter: {op}")

    def _apply_filters(self, data, filters):
        """Keep only the rows (dicts) that satisfy every ``(op, key, value)`` filter."""
        if not filters:
            return data

        return [
            row
            for row in data
            if all(
                self._filter_matches(op, value, row.get(key))
                for op, key, value in filters
            )
        ]

    @classmethod
    def _resolve_method(cls, api, method_name):
        """Map a dotted name such as ``blocks.children.list`` to a client method.

        Raises:
            ValueError: if the name is not a supported, public client endpoint.
        """
        parts = method_name.split(".") if method_name else []
        supported = (len(parts) == 2 and parts[0] in cls._services) or (
            len(parts) == 3 and parts[0] == "blocks"
        )
        # Rejecting empty/private names keeps user-supplied native queries from
        # reaching client internals through getattr.
        if not supported or any(not p or p.startswith("_") for p in parts):
            raise ValueError(f"Unsupported Notion API method: {method_name!r}")

        target = api
        for part in parts:
            target = getattr(target, part, None)
            if target is None:
                raise ValueError(f"Unsupported Notion API method: {method_name!r}")
        if not callable(target):
            raise ValueError(f"Unsupported Notion API method: {method_name!r}")
        return target

    @staticmethod
    def _extract_records(resp) -> list:
        """Return the records contained in one Notion API response."""
        if not isinstance(resp, dict):
            return []
        results = resp.get("results")
        if isinstance(results, list):
            # List endpoints (databases.query, blocks.children.list, comments.list, ...).
            return list(results)
        if resp.get("object") in ("page", "block", "database"):
            # Retrieve endpoints return a single object.
            return [resp]
        return []

    def call_notion_api(
        self,
        method_name: str = None,
        params: dict = None,
        filters: list = None,
        limit: int = None,
    ):
        """Call a notion_client endpoint and return its records as a DataFrame.

        Follows Notion cursor pagination until all pages are read, ``limit``
        rows are collected, or the request timeout elapses.

        Args:
            method_name: dotted endpoint path, e.g. ``databases.query``,
                ``pages.retrieve`` or ``blocks.children.list``.
            params: keyword arguments forwarded to the endpoint; not mutated.
            filters: optional ``(op, key, value)`` triples applied client-side.
            limit: optional maximum number of rows to return.

        Returns:
            pandas.DataFrame with one row per returned record.

        Raises:
            ValueError: if ``method_name`` is not a supported endpoint.
            RuntimeError: if paging takes longer than ``_request_timeout`` seconds.
        """
        method = self._resolve_method(self.connect(), method_name)

        # Copy so the page size / cursor never leak into the caller's dict.
        params = dict(params or {})
        if filters:
            # Filtering is client-side, so request the largest page Notion allows.
            params["page_size"] = self._max_page_size

        data = []
        if limit is not None and limit <= 0:
            return pd.DataFrame(data)

        deadline = time.time() + self._request_timeout
        while True:
            if time.time() > deadline:
                raise RuntimeError("Handler request timeout error")

            logger.debug(f">>>notion in: {method_name}({params})")
            resp = method(**params)

            chunk = self._extract_records(resp)
            if filters:
                chunk = self._apply_filters(chunk, filters)
            if limit is not None:
                chunk = chunk[: limit - len(data)]
            data.extend(chunk)

            if limit is not None and len(data) >= limit:
                break

            # Notion list endpoints report more pages via has_more/next_cursor;
            # the cursor is sent back as start_cursor on the next request.
            next_cursor = resp.get("next_cursor") if isinstance(resp, dict) else None
            if not (next_cursor and resp.get("has_more")):
                break
            params["start_cursor"] = next_cursor

        return pd.DataFrame(data)