Coverage for mindsdb / integrations / handlers / notion_handler / notion_table.py: 0%

188 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import json 

2import pandas as pd 

3 

4from mindsdb_sql_parser import ast 

5 

6from mindsdb.integrations.libs.api_handler import APITable 

7from mindsdb.integrations.utilities.sql_utils import extract_comparison_conditions 

8from mindsdb.utilities import log 

9from mindsdb.integrations.libs.response import HandlerResponse as Response 

10 

11 

12logger = log.getLogger(__name__) 

13 

14 

15class NotionDatabaseTable(APITable): 

16 def select(self, query: ast.Select) -> Response: 

17 conditions = extract_comparison_conditions(query.where) 

18 

19 params = {} 

20 filters = [] 

21 for op, arg1, arg2 in conditions: 

22 if op == "or": 

23 raise NotImplementedError("OR is not supported") 

24 

25 if arg1 == "database_id": 

26 if op == "=": 

27 params[arg1] = arg2 

28 else: 

29 NotImplementedError(f"Unknown op: {op}") 

30 

31 else: 

32 filters.append([op, arg1, arg2]) 

33 

34 # fetch a particular database with the given id 

35 # additionally filter the results 

36 result = self.handler.call_notion_api( 

37 method_name="databases.query", params=params, filters=filters 

38 ) 

39 

40 # filter targets 

41 columns = [] 

42 for target in query.targets: 

43 if isinstance(target, ast.Star): 

44 columns = [] 

45 break 

46 elif isinstance(target, ast.Identifier): 

47 columns.append(target.parts[-1]) 

48 else: 

49 raise NotImplementedError 

50 

51 if len(columns) == 0: 

52 columns = self.get_columns() 

53 

54 # columns to lower case 

55 columns = [name.lower() for name in columns] 

56 

57 if len(result) == 0: 

58 result = pd.DataFrame([], columns=columns) 

59 else: 

60 # add absent columns 

61 for col in set(columns) & set(result.columns) ^ set(columns): 

62 result[col] = None 

63 

64 # filter by columns 

65 result = result[columns] 

66 return result 

67 

68 def get_columns(self): 

69 return [ 

70 "id", 

71 "created_time", 

72 "last_edited_time", 

73 "created_by", 

74 "last_edited_by", 

75 "cover", 

76 "icon", 

77 "parent", 

78 "archived", 

79 "properties", 

80 "url", 

81 "public_url", 

82 ] 

83 

84 def insert(self, query: ast.Insert): 

85 columns = [col.name for col in query.columns] 

86 

87 insert_params = ("api_token",) 

88 for p in insert_params: 

89 if p not in self.handler.connection_args: 

90 raise Exception( 

91 f"To insert data into Notion, you need to provide the following parameters when connecting it to MindsDB: {insert_params}" 

92 ) # noqa 

93 

94 for row in query.values: 

95 params = dict(zip(columns, row)) 

96 

97 # parent and properties as required params for creating a database 

98 params["parent"] = json.loads(params["parent"]) 

99 params["properties"] = json.loads(params["properties"]) 

100 params["title"] = json.loads(params.get("title", "{}")) 

101 

102 self.handler.call_notion_api("databases.create", params) 

103 

104 

105class NotionPagesTable(APITable): 

106 def select(self, query: ast.Select) -> Response: 

107 conditions = extract_comparison_conditions(query.where) 

108 

109 params = {} 

110 filters = [] 

111 for op, arg1, arg2 in conditions: 

112 if op == "or": 

113 raise NotImplementedError("OR is not supported") 

114 

115 if arg1 == "page_id": 

116 if op == "=": 

117 params[arg1] = arg2 

118 else: 

119 raise NotImplementedError 

120 

121 else: 

122 filters.append([op, arg1, arg2]) 

123 

124 if "query" not in params: 

125 # search not works without query, use 'mindsdb' 

126 params["query"] = "mindsdb" 

127 

128 # fetch a particular page with the given id 

129 result = self.handler.call_notion_api( 

130 method_name="pages.retrieve", params=params, filters=filters 

131 ) 

132 

133 # filter targets 

134 columns = [] 

135 for target in query.targets: 

136 if isinstance(target, ast.Star): 

137 columns = [] 

138 break 

139 elif isinstance(target, ast.Identifier): 

140 columns.append(target.parts[-1]) 

141 else: 

142 raise NotImplementedError 

143 

144 if len(columns) == 0: 

145 columns = self.get_columns() 

146 

147 # columns to lower case 

148 columns = [name.lower() for name in columns] 

149 

150 if len(result) == 0: 

151 result = pd.DataFrame([], columns=columns) 

152 else: 

153 # add absent columns 

154 for col in set(columns) & set(result.columns) ^ set(columns): 

155 result[col] = None 

156 

157 # filter by columns 

158 result = result[columns] 

159 return result 

160 

161 def get_columns(self): 

162 return [ 

163 "id", 

164 "object", 

165 "created_time", 

166 "last_edited_time", 

167 "created_by", 

168 "last_edited_by", 

169 "cover", 

170 "icon", 

171 "parent", 

172 "archived", 

173 "properties", 

174 "url", 

175 "public_url", 

176 ] 

177 

178 def insert(self, query: ast.Insert): 

179 columns = [col.name for col in query.columns] 

180 

181 insert_params = ("api_token",) 

182 for p in insert_params: 

183 if p not in self.handler.connection_args: 

184 raise Exception( 

185 f"To insert data into Notion, you need to provide the following parameters when connecting it to MindsDB: {insert_params}" 

186 ) # noqa 

187 

188 for row in query.values: 

189 params = dict(zip(columns, row)) 

190 

191 # title and database_id as required params for creating the page 

192 # optionally provide the text to populate the page 

193 title = params["title"] 

194 text = params.get("text", "") 

195 

196 messages = [] 

197 

198 # the last message 

199 if text.strip() != "": 

200 messages.append(text.strip()) 

201 

202 len_messages = len(messages) 

203 for i, text in enumerate(messages): 

204 if i < len_messages - 1: 

205 text += "..." 

206 else: 

207 text += " " 

208 

209 params["parent"] = {"database_id": params["database_id"]} 

210 params["properties"] = { 

211 "Name": { 

212 "title": [ 

213 { 

214 "text": { 

215 "content": title, 

216 }, 

217 }, 

218 ], 

219 }, 

220 } 

221 params["children"] = [ 

222 { 

223 "object": "block", 

224 "type": "paragraph", 

225 "paragraph": { 

226 "rich_text": [ 

227 { 

228 "type": "text", 

229 "text": { 

230 "content": text, 

231 }, 

232 } 

233 ] 

234 }, 

235 } 

236 ] 

237 

238 self.handler.call_notion_api("pages.create", params) 

239 

240 

241class NotionBlocksTable(APITable): 

242 def select(self, query: ast.Select) -> Response: 

243 conditions = extract_comparison_conditions(query.where) 

244 

245 params = {} 

246 filters = [] 

247 for op, arg1, arg2 in conditions: 

248 if op == "or": 

249 raise NotImplementedError("OR is not supported") 

250 

251 if arg1 == "block_id": 

252 if op == "=": 

253 params[arg1] = arg2 

254 else: 

255 NotImplementedError(f"Unknown op: {op}") 

256 

257 else: 

258 filters.append([op, arg1, arg2]) 

259 

260 # fetch a particular block with the given id 

261 result = self.handler.call_notion_api( 

262 method_name="blocks.retrieve", params=params, filters=filters 

263 ) 

264 

265 # filter targets 

266 columns = [] 

267 for target in query.targets: 

268 if isinstance(target, ast.Star): 

269 columns = [] 

270 break 

271 elif isinstance(target, ast.Identifier): 

272 columns.append(target.parts[-1]) 

273 else: 

274 raise NotImplementedError 

275 

276 if len(columns) == 0: 

277 columns = self.get_columns() 

278 

279 # columns to lower case 

280 columns = [name.lower() for name in columns] 

281 

282 if len(result) == 0: 

283 result = pd.DataFrame([], columns=columns) 

284 else: 

285 # add absent columns 

286 for col in set(columns) & set(result.columns) ^ set(columns): 

287 result[col] = None 

288 

289 # filter by columns 

290 result = result[columns] 

291 return result 

292 

293 def get_columns(self): 

294 # most of the columns will remain NULL as a block can be of a single type 

295 return [ 

296 "object", 

297 "id", 

298 "parent", 

299 "has_children", 

300 "created_time", 

301 "last_edited_time", 

302 "created_by", 

303 "last_edited_by", 

304 "archived", 

305 "type", 

306 "bookmark", 

307 "breadcrumb", 

308 "bulleted_list_item", 

309 "callout", 

310 "child_database", 

311 "child_page", 

312 "column", 

313 "column_list", 

314 "divider", 

315 "embed", 

316 "equation", 

317 "file", 

318 "heading_1", 

319 "heading_2", 

320 "heading_3", 

321 "image", 

322 "link_preview", 

323 "link_to_page", 

324 "numbered_list_item", 

325 "paragraph", 

326 "pdf", 

327 "quote", 

328 "synced_block", 

329 "table", 

330 "table_of_contents", 

331 "table_row", 

332 "template", 

333 "to_do", 

334 "toggle", 

335 "unsupported", 

336 "video", 

337 ] 

338 

339 def insert(self, query: ast.Insert): 

340 columns = [col.name for col in query.columns] 

341 

342 insert_params = ("api_token",) 

343 for p in insert_params: 

344 if p not in self.handler.connection_args: 

345 raise Exception( 

346 f"To insert data into Notion, you need to provide the following parameters when connecting it to MindsDB: {insert_params}" 

347 ) # noqa 

348 

349 for row in query.values: 

350 params = dict(zip(columns, row)) 

351 

352 # block_id and children as required params for appending to a block 

353 params["block_id"] = params["block_id"] 

354 params["children"] = json.loads(params["children"]) 

355 params["after"] = params.get("after", "") 

356 

357 self.handler.call_notion_api("blocks.children.append", params) 

358 

359 

360class NotionCommentsTable(APITable): 

361 def select(self, query: ast.Select) -> Response: 

362 conditions = extract_comparison_conditions(query.where) 

363 

364 params = {} 

365 filters = [] 

366 for op, arg1, arg2 in conditions: 

367 if op == "or": 

368 raise NotImplementedError("OR is not supported") 

369 

370 if arg1 == "block_id": 

371 if op == "=": 

372 params[arg1] = arg2 

373 else: 

374 NotImplementedError(f"Unknown op: {op}") 

375 

376 else: 

377 filters.append([op, arg1, arg2]) 

378 

379 # list all the unresolved comments for a given block id 

380 result = self.handler.call_notion_api( 

381 method_name="comments.list", params=params, filters=filters 

382 ) 

383 

384 # filter targets 

385 columns = [] 

386 for target in query.targets: 

387 if isinstance(target, ast.Star): 

388 columns = [] 

389 break 

390 elif isinstance(target, ast.Identifier): 

391 columns.append(target.parts[-1]) 

392 else: 

393 raise NotImplementedError 

394 

395 if len(columns) == 0: 

396 columns = self.get_columns() 

397 

398 # columns to lower case 

399 columns = [name.lower() for name in columns] 

400 

401 if len(result) == 0: 

402 result = pd.DataFrame([], columns=columns) 

403 else: 

404 # add absent columns 

405 for col in set(columns) & set(result.columns) ^ set(columns): 

406 result[col] = None 

407 

408 # filter by columns 

409 result = result[columns] 

410 return result 

411 

412 def get_columns(self): 

413 return [ 

414 "id", 

415 "object", 

416 "parent", 

417 "discussion_id", 

418 "created_time", 

419 "last_edited_time", 

420 "created_by", 

421 "rich_text", 

422 ]