Coverage for mindsdb / integrations / handlers / notion_handler / notion_handler.py: 0%

151 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import time 

2import pandas as pd 

3from collections import defaultdict 

4 

5from notion_client import Client 

6 

7from mindsdb.utilities import log 

8from mindsdb.integrations.libs.api_handler import APIHandler 

9from mindsdb.integrations.libs.response import ( 

10 HandlerStatusResponse as StatusResponse, 

11 HandlerResponse, 

12 RESPONSE_TYPE, 

13) 

14from .notion_table import ( 

15 NotionBlocksTable, 

16 NotionCommentsTable, 

17 NotionDatabaseTable, 

18 NotionPagesTable, 

19) 

20 

logger = log.getLogger(__name__)


class NotionHandler(APIHandler):
    """API handler exposing a Notion workspace to MindsDB.

    Registers the ``database``, ``pages``, ``blocks`` and ``comments`` tables
    and forwards native queries of the form ``service.method(key=value, ...)``
    to the matching ``notion_client.Client`` endpoint.
    """

    name = "notion"

    # Resources callable as ``<service>.<method>``.
    _SERVICES = ("databases", "pages", "blocks", "comments")
    # Resources callable as ``<service>.<child>.<method>`` (e.g. blocks.children.list).
    _NESTED_SERVICES = ("blocks",)
    # Response ``object`` types that arrive as a single record rather than a list.
    _SINGLE_OBJECT_TYPES = ("page", "block", "database", "comment")
    # page_size requested from list endpoints; 100 is the largest page size
    # the Notion API accepts.
    _MAX_PAGE_SIZE = 100
    # Wall-clock budget, in seconds, for fetching all pages of one call.
    _TIME_LIMIT_SECONDS = 60

    def __init__(self, name: str, **kwargs):
        """Create the handler and register its tables.

        Args:
            name (str): the handler name
            **kwargs: ``connection_data`` (dict) is read from here; it is
                expected to contain the Notion integration token under
                ``api_token``.
        """
        super().__init__(name)

        self.connection_args = kwargs.get("connection_data") or {}
        # connection_args key holding the Notion integration token
        self.key = "api_token"

        self.api = None
        self.is_connected = False

        notion_database_data = NotionDatabaseTable(self)
        notion_pages_data = NotionPagesTable(self)
        notion_comments_data = NotionCommentsTable(self)
        notion_blocks_data = NotionBlocksTable(self)

        self._register_table("database", notion_database_data)
        self._register_table("pages", notion_pages_data)
        self._register_table("blocks", notion_blocks_data)
        self._register_table("comments", notion_comments_data)

    def connect(self, args=None, **kwargs):
        """Build a Notion client from the configured API token.

        Args:
            args: unused; kept for interface compatibility.

        Returns:
            A ``notion_client.Client`` authenticated with ``api_token``.

        Raises:
            ValueError: if ``connection_data`` has no (or an empty) ``api_token``.
        """
        api_token = self.connection_args.get(self.key)
        if not api_token:
            raise ValueError(f"Missing required connection parameter: {self.key}")
        notion = Client(auth=api_token)
        self.is_connected = True
        return notion

    def check_connection(self) -> StatusResponse:
        """Verify that the configured token can actually query Notion.

        Constructing a ``Client`` performs no request, so a minimal
        ``search(page_size=1)`` call is issued to surface an invalid or
        revoked token here instead of at query time.

        Returns:
            StatusResponse with ``success`` set, and ``error_message`` on failure.
        """
        response = StatusResponse(False)

        try:
            client = self.connect()
            client.search(page_size=1)
            response.success = True
        except Exception as e:
            response.error_message = (
                f"Error connecting to Notion api: {e}. Check api_token"
            )
            logger.error(response.error_message)
            response.success = False

        if response.success is False:
            self.is_connected = False

        return response

    def native_query(self, query: str = None) -> HandlerResponse:
        """Run a native call written as ``service.method(key=value, ...)``.

        Example: ``databases.query(database_id=abc123)``. Values are passed to
        the Notion client as strings.

        Raises:
            ValueError: if the query text is malformed or names an unsupported
                endpoint.
        """
        method_name, params = self._parse_native_query(query)
        df = self.call_notion_api(method_name, params)
        return HandlerResponse(RESPONSE_TYPE.TABLE, data_frame=df)

    @classmethod
    def _parse_native_query(cls, query):
        """Split ``service.method(k=v, ...)`` into ``(method_name, params)``.

        Arguments are separated by commas and split on the first ``=``, so a
        value may contain ``=`` (but not a comma). Whitespace around keys and
        values and a matching pair of surrounding quotes are removed.

        Raises:
            ValueError: if there is no ``(`` or an argument lacks ``key=``.
        """
        text = (query or "").strip()
        method_name, paren, arg_text = text.partition("(")
        if not paren:
            raise ValueError(
                f"Invalid native query {query!r}: expected 'service.method(key=value, ...)'"
            )
        if arg_text.endswith(")"):
            arg_text = arg_text[:-1]

        params = {}
        for pair in arg_text.split(","):
            if not pair.strip():
                continue
            key, eq, value = pair.partition("=")
            if not eq or not key.strip():
                raise ValueError(
                    f"Invalid argument {pair.strip()!r} in native query: expected key=value"
                )
            params[key.strip()] = cls._unquote(value)
        return method_name.strip(), params

    @staticmethod
    def _unquote(text):
        """Strip whitespace and one matching pair of surrounding quotes."""
        text = text.strip()
        if len(text) >= 2 and text[0] == text[-1] and text[0] in ("'", '"'):
            return text[1:-1]
        return text

    def _apply_filters(self, data, filters):
        """Return the rows of ``data`` that satisfy *every* filter.

        Args:
            data: list of dict rows.
            filters: iterable of ``(op, key, value)`` triples. Supported ops are
                ``=``/``==``, ``!=``/``<>``, ``in`` and ``not in``; an ``int``
                value is compared as ``str``.

        Raises:
            NotImplementedError: when an unsupported operator is evaluated.
        """
        if not filters:
            return data
        return [row for row in data if self._row_matches(row, filters)]

    @staticmethod
    def _row_matches(row, filters):
        """Return True only when ``row`` passes all ``(op, key, value)`` filters."""
        for op, key, value in filters:
            actual = row.get(key)
            if isinstance(value, int):
                value = str(value)

            if op in ("!=", "<>"):
                ok = value != actual
            elif op in ("==", "="):
                ok = value == actual
            elif op in ("in", "not in"):
                options = value if isinstance(value, list) else [value]
                ok = (actual in options) == (op == "in")
            else:
                raise NotImplementedError(f"Unknown filter: {op}")

            if not ok:
                # one failing filter is enough to reject the row
                return False
        return True

    def call_notion_api(
        self,
        method_name: str = None,
        params: dict = None,
        filters: list = None,
        limit: int = None,
    ):
        """Call a Notion client endpoint and return its records as a DataFrame.

        Follows Notion cursor pagination (``has_more`` / ``next_cursor``) until
        all pages are read, ``limit`` rows are collected, or the time budget
        runs out.

        Args:
            method_name: dotted endpoint path, ``<service>.<method>`` with
                service in databases/pages/blocks/comments, or
                ``blocks.<child>.<method>`` (e.g. ``blocks.children.list``).
            params: keyword arguments for the endpoint; the caller's dict is
                not modified.
            filters: optional ``(op, key, value)`` triples applied client-side.
            limit: optional maximum number of (post-filter) rows to return.

        Returns:
            pandas.DataFrame with one row per returned Notion object.

        Raises:
            ValueError: if ``method_name`` is not a supported endpoint.
            RuntimeError: if paging exceeds the time budget.
        """
        method = self._resolve_method(method_name)
        # work on a copy so pagination cursors don't leak into the caller's dict
        params = dict(params or {})

        if limit is not None and limit <= 0:
            return pd.DataFrame()

        if filters:
            # rows are filtered client-side, so fetch as many as possible per request
            params.setdefault("page_size", self._MAX_PAGE_SIZE)
        elif limit is not None:
            params.setdefault("page_size", min(limit, self._MAX_PAGE_SIZE))

        deadline = time.time() + self._TIME_LIMIT_SECONDS
        data = []
        while True:
            if time.time() > deadline:
                raise RuntimeError("Handler request timeout error")

            logger.debug(f">>>notion in: {method_name}({params})")
            resp = method(**params)

            chunk = self._extract_records(resp)
            if filters:
                chunk = self._apply_filters(chunk, filters)
            data.extend(chunk)

            if limit is not None and len(data) >= limit:
                del data[limit:]
                break

            # Notion list responses carry has_more / next_cursor; the cursor is
            # sent back as start_cursor to fetch the following page.
            if not isinstance(resp, dict) or not resp.get("has_more"):
                break
            next_cursor = resp.get("next_cursor")
            if not next_cursor or next_cursor == params.get("start_cursor"):
                # no usable cursor (or the same one repeated): stop instead of looping
                break
            params["start_cursor"] = next_cursor

        return pd.DataFrame(data)

    def _resolve_method(self, method_name):
        """Return the bound ``notion_client`` method named by ``method_name``.

        Validates the dotted path, (re)creates ``self.api`` via ``connect`` and
        walks the attribute chain, e.g. ``blocks.children.list`` becomes
        ``self.api.blocks.children.list``.

        Raises:
            ValueError: if the path is not an allowed, callable endpoint.
        """
        parts = [part.strip() for part in (method_name or "").split(".")]
        allowed = (len(parts) == 2 and parts[0] in self._SERVICES) or (
            len(parts) == 3 and parts[0] in self._NESTED_SERVICES
        )
        # refuse empty and private/dunder segments so native queries can only
        # reach public endpoint methods
        if not allowed or any(not part or part.startswith("_") for part in parts):
            raise ValueError(f"Unsupported Notion API method: {method_name!r}")

        self.api = self.connect()
        target = self.api
        for part in parts:
            target = getattr(target, part, None)
            if target is None:
                raise ValueError(f"Unsupported Notion API method: {method_name!r}")
        if not callable(target):
            raise ValueError(f"Unsupported Notion API method: {method_name!r}")
        return target

    @classmethod
    def _extract_records(cls, resp):
        """Return the Notion objects carried by one API response as a new list.

        List endpoints return ``{"results": [...]}``; retrieve endpoints return
        a single object whose ``object`` field names its type.
        """
        if not isinstance(resp, dict):
            return []
        results = resp.get("results")
        if isinstance(results, list):
            return list(results)
        if resp.get("object") in cls._SINGLE_OBJECT_TYPES:
            return [resp]
        return []