Coverage for mindsdb / integrations / handlers / notion_handler / notion_table.py: 0%
188 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import json
2import pandas as pd
4from mindsdb_sql_parser import ast
6from mindsdb.integrations.libs.api_handler import APITable
7from mindsdb.integrations.utilities.sql_utils import extract_comparison_conditions
8from mindsdb.utilities import log
9from mindsdb.integrations.libs.response import HandlerResponse as Response
12logger = log.getLogger(__name__)
15class NotionDatabaseTable(APITable):
16 def select(self, query: ast.Select) -> Response:
17 conditions = extract_comparison_conditions(query.where)
19 params = {}
20 filters = []
21 for op, arg1, arg2 in conditions:
22 if op == "or":
23 raise NotImplementedError("OR is not supported")
25 if arg1 == "database_id":
26 if op == "=":
27 params[arg1] = arg2
28 else:
29 NotImplementedError(f"Unknown op: {op}")
31 else:
32 filters.append([op, arg1, arg2])
34 # fetch a particular database with the given id
35 # additionally filter the results
36 result = self.handler.call_notion_api(
37 method_name="databases.query", params=params, filters=filters
38 )
40 # filter targets
41 columns = []
42 for target in query.targets:
43 if isinstance(target, ast.Star):
44 columns = []
45 break
46 elif isinstance(target, ast.Identifier):
47 columns.append(target.parts[-1])
48 else:
49 raise NotImplementedError
51 if len(columns) == 0:
52 columns = self.get_columns()
54 # columns to lower case
55 columns = [name.lower() for name in columns]
57 if len(result) == 0:
58 result = pd.DataFrame([], columns=columns)
59 else:
60 # add absent columns
61 for col in set(columns) & set(result.columns) ^ set(columns):
62 result[col] = None
64 # filter by columns
65 result = result[columns]
66 return result
68 def get_columns(self):
69 return [
70 "id",
71 "created_time",
72 "last_edited_time",
73 "created_by",
74 "last_edited_by",
75 "cover",
76 "icon",
77 "parent",
78 "archived",
79 "properties",
80 "url",
81 "public_url",
82 ]
84 def insert(self, query: ast.Insert):
85 columns = [col.name for col in query.columns]
87 insert_params = ("api_token",)
88 for p in insert_params:
89 if p not in self.handler.connection_args:
90 raise Exception(
91 f"To insert data into Notion, you need to provide the following parameters when connecting it to MindsDB: {insert_params}"
92 ) # noqa
94 for row in query.values:
95 params = dict(zip(columns, row))
97 # parent and properties as required params for creating a database
98 params["parent"] = json.loads(params["parent"])
99 params["properties"] = json.loads(params["properties"])
100 params["title"] = json.loads(params.get("title", "{}"))
102 self.handler.call_notion_api("databases.create", params)
105class NotionPagesTable(APITable):
106 def select(self, query: ast.Select) -> Response:
107 conditions = extract_comparison_conditions(query.where)
109 params = {}
110 filters = []
111 for op, arg1, arg2 in conditions:
112 if op == "or":
113 raise NotImplementedError("OR is not supported")
115 if arg1 == "page_id":
116 if op == "=":
117 params[arg1] = arg2
118 else:
119 raise NotImplementedError
121 else:
122 filters.append([op, arg1, arg2])
124 if "query" not in params:
125 # search not works without query, use 'mindsdb'
126 params["query"] = "mindsdb"
128 # fetch a particular page with the given id
129 result = self.handler.call_notion_api(
130 method_name="pages.retrieve", params=params, filters=filters
131 )
133 # filter targets
134 columns = []
135 for target in query.targets:
136 if isinstance(target, ast.Star):
137 columns = []
138 break
139 elif isinstance(target, ast.Identifier):
140 columns.append(target.parts[-1])
141 else:
142 raise NotImplementedError
144 if len(columns) == 0:
145 columns = self.get_columns()
147 # columns to lower case
148 columns = [name.lower() for name in columns]
150 if len(result) == 0:
151 result = pd.DataFrame([], columns=columns)
152 else:
153 # add absent columns
154 for col in set(columns) & set(result.columns) ^ set(columns):
155 result[col] = None
157 # filter by columns
158 result = result[columns]
159 return result
161 def get_columns(self):
162 return [
163 "id",
164 "object",
165 "created_time",
166 "last_edited_time",
167 "created_by",
168 "last_edited_by",
169 "cover",
170 "icon",
171 "parent",
172 "archived",
173 "properties",
174 "url",
175 "public_url",
176 ]
178 def insert(self, query: ast.Insert):
179 columns = [col.name for col in query.columns]
181 insert_params = ("api_token",)
182 for p in insert_params:
183 if p not in self.handler.connection_args:
184 raise Exception(
185 f"To insert data into Notion, you need to provide the following parameters when connecting it to MindsDB: {insert_params}"
186 ) # noqa
188 for row in query.values:
189 params = dict(zip(columns, row))
191 # title and database_id as required params for creating the page
192 # optionally provide the text to populate the page
193 title = params["title"]
194 text = params.get("text", "")
196 messages = []
198 # the last message
199 if text.strip() != "":
200 messages.append(text.strip())
202 len_messages = len(messages)
203 for i, text in enumerate(messages):
204 if i < len_messages - 1:
205 text += "..."
206 else:
207 text += " "
209 params["parent"] = {"database_id": params["database_id"]}
210 params["properties"] = {
211 "Name": {
212 "title": [
213 {
214 "text": {
215 "content": title,
216 },
217 },
218 ],
219 },
220 }
221 params["children"] = [
222 {
223 "object": "block",
224 "type": "paragraph",
225 "paragraph": {
226 "rich_text": [
227 {
228 "type": "text",
229 "text": {
230 "content": text,
231 },
232 }
233 ]
234 },
235 }
236 ]
238 self.handler.call_notion_api("pages.create", params)
241class NotionBlocksTable(APITable):
242 def select(self, query: ast.Select) -> Response:
243 conditions = extract_comparison_conditions(query.where)
245 params = {}
246 filters = []
247 for op, arg1, arg2 in conditions:
248 if op == "or":
249 raise NotImplementedError("OR is not supported")
251 if arg1 == "block_id":
252 if op == "=":
253 params[arg1] = arg2
254 else:
255 NotImplementedError(f"Unknown op: {op}")
257 else:
258 filters.append([op, arg1, arg2])
260 # fetch a particular block with the given id
261 result = self.handler.call_notion_api(
262 method_name="blocks.retrieve", params=params, filters=filters
263 )
265 # filter targets
266 columns = []
267 for target in query.targets:
268 if isinstance(target, ast.Star):
269 columns = []
270 break
271 elif isinstance(target, ast.Identifier):
272 columns.append(target.parts[-1])
273 else:
274 raise NotImplementedError
276 if len(columns) == 0:
277 columns = self.get_columns()
279 # columns to lower case
280 columns = [name.lower() for name in columns]
282 if len(result) == 0:
283 result = pd.DataFrame([], columns=columns)
284 else:
285 # add absent columns
286 for col in set(columns) & set(result.columns) ^ set(columns):
287 result[col] = None
289 # filter by columns
290 result = result[columns]
291 return result
293 def get_columns(self):
294 # most of the columns will remain NULL as a block can be of a single type
295 return [
296 "object",
297 "id",
298 "parent",
299 "has_children",
300 "created_time",
301 "last_edited_time",
302 "created_by",
303 "last_edited_by",
304 "archived",
305 "type",
306 "bookmark",
307 "breadcrumb",
308 "bulleted_list_item",
309 "callout",
310 "child_database",
311 "child_page",
312 "column",
313 "column_list",
314 "divider",
315 "embed",
316 "equation",
317 "file",
318 "heading_1",
319 "heading_2",
320 "heading_3",
321 "image",
322 "link_preview",
323 "link_to_page",
324 "numbered_list_item",
325 "paragraph",
326 "pdf",
327 "quote",
328 "synced_block",
329 "table",
330 "table_of_contents",
331 "table_row",
332 "template",
333 "to_do",
334 "toggle",
335 "unsupported",
336 "video",
337 ]
339 def insert(self, query: ast.Insert):
340 columns = [col.name for col in query.columns]
342 insert_params = ("api_token",)
343 for p in insert_params:
344 if p not in self.handler.connection_args:
345 raise Exception(
346 f"To insert data into Notion, you need to provide the following parameters when connecting it to MindsDB: {insert_params}"
347 ) # noqa
349 for row in query.values:
350 params = dict(zip(columns, row))
352 # block_id and children as required params for appending to a block
353 params["block_id"] = params["block_id"]
354 params["children"] = json.loads(params["children"])
355 params["after"] = params.get("after", "")
357 self.handler.call_notion_api("blocks.children.append", params)
360class NotionCommentsTable(APITable):
361 def select(self, query: ast.Select) -> Response:
362 conditions = extract_comparison_conditions(query.where)
364 params = {}
365 filters = []
366 for op, arg1, arg2 in conditions:
367 if op == "or":
368 raise NotImplementedError("OR is not supported")
370 if arg1 == "block_id":
371 if op == "=":
372 params[arg1] = arg2
373 else:
374 NotImplementedError(f"Unknown op: {op}")
376 else:
377 filters.append([op, arg1, arg2])
379 # list all the unresolved comments for a given block id
380 result = self.handler.call_notion_api(
381 method_name="comments.list", params=params, filters=filters
382 )
384 # filter targets
385 columns = []
386 for target in query.targets:
387 if isinstance(target, ast.Star):
388 columns = []
389 break
390 elif isinstance(target, ast.Identifier):
391 columns.append(target.parts[-1])
392 else:
393 raise NotImplementedError
395 if len(columns) == 0:
396 columns = self.get_columns()
398 # columns to lower case
399 columns = [name.lower() for name in columns]
401 if len(result) == 0:
402 result = pd.DataFrame([], columns=columns)
403 else:
404 # add absent columns
405 for col in set(columns) & set(result.columns) ^ set(columns):
406 result[col] = None
408 # filter by columns
409 result = result[columns]
410 return result
412 def get_columns(self):
413 return [
414 "id",
415 "object",
416 "parent",
417 "discussion_id",
418 "created_time",
419 "last_edited_time",
420 "created_by",
421 "rich_text",
422 ]