Coverage for mindsdb / integrations / handlers / ckan_handler / ckan_handler.py: 0%

178 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import pandas as pd 

2from ckanapi import RemoteCKAN 

3 

4from mindsdb.integrations.libs.api_handler import APIHandler, APITable 

5from mindsdb.integrations.libs.response import ( 

6 HandlerResponse, 

7 HandlerStatusResponse, 

8 RESPONSE_TYPE, 

9) 

10from mindsdb_sql_parser import ast 

11from mindsdb.utilities import log 

12from mindsdb.integrations.utilities.sql_utils import extract_comparison_conditions 

13 

# Module-level logger for this handler, named after the module.
logger = log.getLogger(__name__)

15 

16 

class DatasetsTable(APITable):
    """Expose CKAN datasets (packages) as the ``datasets`` table.

    Every dataset returned by CKAN's ``package_search`` action is listed with
    its total resource count and the number of resources whose
    ``datastore_active`` flag is set (which may be 0).
    """

    def select(self, query: ast.Select) -> pd.DataFrame:
        """Answer ``SELECT ... FROM datasets``.

        Simple comparison conditions in WHERE are applied in memory to the
        fetched page of datasets. LIMIT is sent to CKAN as ``rows`` and
        defaults to 1000.
        """
        conditions = extract_comparison_conditions(query.where) if query.where else []
        limit = query.limit.value if query.limit else 1000
        # list() already builds the DataFrame; no need to wrap it again.
        return self.list(conditions, limit)

    def list(self, conditions=None, limit=1000):
        """Fetch up to ``limit`` datasets and return one row per dataset.

        :param conditions: ``[op, column, value]`` triples as produced by
            ``extract_comparison_conditions``; ``None`` means no filtering.
        :param limit: passed as ``rows`` to CKAN ``package_search``.
        :return: DataFrame with the columns of ``get_columns()``, also when
            CKAN returns no datasets.
        """
        self.handler.connect()
        package_list = self.handler.call_ckan_api("package_search", {"rows": limit})
        packages = package_list.get("results", [])

        rows = []
        for pkg in packages:
            # Tolerate a null "resources" field instead of failing on None.
            resources = pkg.get("resources") or []
            rows.append(
                {
                    "id": pkg.get("id"),
                    "name": pkg.get("name"),
                    "title": pkg.get("title"),
                    "num_resources": len(resources),
                    "num_datastore_active_resources": sum(
                        1 for r in resources if r.get("datastore_active")
                    ),
                }
            )

        # Fixing the columns keeps the schema intact for an empty result.
        df = pd.DataFrame(rows, columns=self.get_columns())
        return self._apply_conditions(df, conditions)

    def get_columns(self):
        """Return the column names of the datasets table."""
        return [
            "id",
            "name",
            "title",
            "num_resources",
            "num_datastore_active_resources",
        ]

    @staticmethod
    def _apply_conditions(df: pd.DataFrame, conditions) -> pd.DataFrame:
        """Filter ``df`` in memory by ``[op, column, value]`` conditions.

        Supported operators: =, !=, <>, >, <, >=, <=, in, not in. Conditions on
        columns the table does not have are skipped. Other operators are
        skipped with a warning, so a query is never rejected here.
        """
        # SQL comparison operator -> pandas Series comparison method.
        comparisons = {"=": "eq", "!=": "ne", "<>": "ne", ">": "gt", "<": "lt", ">=": "ge", "<=": "le"}
        for op, column, value in conditions or []:
            if column not in df.columns:
                continue
            series = df[column]
            if op in comparisons:
                mask = getattr(series, comparisons[op])(value)
            elif op in ("in", "not in"):
                values = value if isinstance(value, (list, tuple, set)) else [value]
                mask = series.isin(values)
                if op == "not in":
                    mask = ~mask
            else:
                logger.warning(f"Unsupported condition operator '{op}' on column '{column}' is ignored")
                continue
            df = df[mask]
        return df.reset_index(drop=True)

60 

61 

class ResourceIDsTable(APITable):
    """Expose DataStore-backed CKAN resources as the ``resources`` table.

    One row is produced per resource whose ``datastore_active`` flag is set,
    across the datasets returned by CKAN's ``package_search`` action. The
    ``id`` column is the value to use as ``resource_id`` for ``datastore``.
    """

    def select(self, query: ast.Select) -> pd.DataFrame:
        """Answer ``SELECT ... FROM resources``.

        Simple comparison conditions in WHERE are applied in memory. LIMIT caps
        both the datasets requested from CKAN and the rows returned, and
        defaults to 1000.
        """
        conditions = extract_comparison_conditions(query.where) if query.where else []
        limit = query.limit.value if query.limit else 1000
        # list() already builds the DataFrame; no need to wrap it again.
        return self.list(conditions, limit)

    def list(self, conditions=None, limit=1000):
        """Return at most ``limit`` datastore-active resources.

        :param conditions: ``[op, column, value]`` triples as produced by
            ``extract_comparison_conditions``; ``None`` means no filtering.
        :param limit: passed as ``rows`` to ``package_search`` and used as the
            maximum number of returned resources.
        :return: DataFrame with the columns of ``get_columns()``, also when no
            resource matches.
        """
        self.handler.connect()
        package_list = self.handler.call_ckan_api("package_search", {"rows": limit})
        packages = package_list.get("results", [])

        rows = [
            {
                "id": resource.get("id"),
                "package_id": package.get("id"),
                "name": resource.get("name"),
                "format": resource.get("format"),
                "url": resource.get("url"),
                "datastore_active": resource.get("datastore_active"),
            }
            for package in packages
            # Tolerate a null "resources" field instead of failing on None.
            for resource in (package.get("resources") or [])
            if resource.get("datastore_active")
        ]

        # Fixing the columns keeps the schema intact for an empty result.
        df = pd.DataFrame(rows, columns=self.get_columns())
        # Filter first, then cap, so WHERE does not consume the LIMIT budget.
        return self._apply_conditions(df, conditions).head(limit)

    def get_columns(self):
        """Return the column names of the resources table."""
        return [
            "id",
            "package_id",
            "name",
            "format",
            "url",
            "datastore_active",
        ]

    @staticmethod
    def _apply_conditions(df: pd.DataFrame, conditions) -> pd.DataFrame:
        """Filter ``df`` in memory by ``[op, column, value]`` conditions.

        Supported operators: =, !=, <>, >, <, >=, <=, in, not in. Conditions on
        columns the table does not have are skipped. Other operators are
        skipped with a warning, so a query is never rejected here.
        """
        # SQL comparison operator -> pandas Series comparison method.
        comparisons = {"=": "eq", "!=": "ne", "<>": "ne", ">": "gt", "<": "lt", ">=": "ge", "<=": "le"}
        for op, column, value in conditions or []:
            if column not in df.columns:
                continue
            series = df[column]
            if op in comparisons:
                mask = getattr(series, comparisons[op])(value)
            elif op in ("in", "not in"):
                values = value if isinstance(value, (list, tuple, set)) else [value]
                mask = series.isin(values)
                if op == "not in":
                    mask = ~mask
            else:
                logger.warning(f"Unsupported condition operator '{op}' on column '{column}' is ignored")
                continue
            df = df[mask]
        return df.reset_index(drop=True)

111 

112 

class DatastoreTable(APITable):
    """Query the rows of a single CKAN DataStore resource.

    The target resource is chosen with ``WHERE resource_id = '<id>'``. The rest
    of the SELECT is translated into SQL and sent to CKAN's
    ``datastore_search_sql`` action.
    """

    def select(self, query: ast.Select) -> pd.DataFrame:
        """Answer ``SELECT ... FROM datastore WHERE resource_id = ...``.

        Without a ``resource_id = ...`` condition, a one-row DataFrame with a
        usage message is returned instead of raising.
        """
        conditions = extract_comparison_conditions(query.where) if query.where else []
        resource_id = self.extract_resource_id(conditions)

        if not resource_id:
            message = "Please provide a resource_id in your query. Example: SELECT * FROM datastore WHERE resource_id = 'your_resource_id'"
            return pd.DataFrame({"message": [message]})
        return self.execute_resource_query(query, resource_id)

    def execute_resource_query(
        self, query: ast.Select, resource_id: str
    ) -> pd.DataFrame:
        """Run ``query`` against ``resource_id`` and return the visible columns.

        Columns whose names start with ``_`` (CKAN internal columns) are dropped.
        """
        sql_query = self.ast_to_sql(query, resource_id)
        result = self.handler.call_ckan_api("datastore_search_sql", {"sql": sql_query})

        records = result.get("records", [])
        if records:
            df = pd.DataFrame(records)
        else:
            # No records to infer columns from; use the "fields" list from the
            # response so an empty result still carries its column names.
            field_ids = [
                field.get("id")
                for field in result.get("fields", [])
                if isinstance(field, dict) and field.get("id")
            ]
            df = pd.DataFrame(columns=field_ids)

        # str() guards against non-string labels: the .str accessor raises on an
        # empty DataFrame's RangeIndex columns.
        visible = [col for col in df.columns if not str(col).startswith("_")]
        return df.loc[:, visible]

    def ast_to_sql(self, query: ast.Select, resource_id: str) -> str:
        """Translate ``query`` into SQL over the resource's DataStore table.

        The ``resource_id`` condition only selects the table and is removed from
        WHERE. ORDER BY, LIMIT and OFFSET are carried over.
        """
        sql_parts = [
            f"SELECT {self.render_columns(query.targets)}",
            f"FROM {self._quote_identifier(resource_id)}",
        ]

        where_conditions = self.extract_where_conditions(query.where) if query.where else []
        where_conditions = [
            cond for cond in where_conditions if not self._is_resource_id_condition(cond)
        ]
        if where_conditions:
            sql_parts.append(
                "WHERE " + " AND ".join(self.render_where(cond) for cond in where_conditions)
            )

        if query.order_by:
            sql_parts.append(
                "ORDER BY " + ", ".join(self._render_order_by(item) for item in query.order_by)
            )

        # int() keeps anything but a number out of the generated SQL.
        if query.limit:
            sql_parts.append(f"LIMIT {int(query.limit.value)}")
        if query.offset:
            sql_parts.append(f"OFFSET {int(query.offset.value)}")

        return " ".join(sql_parts)

    def render_columns(self, targets):
        """Render the SELECT list; a lone ``*`` (or no targets) becomes ``*``."""
        if not targets or (len(targets) == 1 and isinstance(targets[0], ast.Star)):
            return "*"
        return ", ".join(self.render_column(target) for target in targets)

    def render_column(self, target):
        """Render one SELECT target; identifiers are quoted by their last part."""
        if isinstance(target, ast.Identifier):
            return self._quote_identifier(target.parts[-1])
        # Other target types fall back to the parser's own rendering.
        return str(target)

    def extract_where_conditions(self, where):
        """Split a WHERE tree on top-level ANDs into a flat list of conditions."""
        if isinstance(where, ast.BinaryOperation) and where.op == "and":
            return self.extract_where_conditions(
                where.args[0]
            ) + self.extract_where_conditions(where.args[1])
        return [where]

    def render_where(self, where):
        """Render a WHERE expression node as SQL.

        LIKE is mapped to ILIKE (case-insensitive). Nested AND/OR are
        parenthesised. Constants and identifiers are escaped. Unknown nodes fall
        back to the parser's string rendering.
        """
        if isinstance(where, ast.BinaryOperation):
            left = self.render_where(where.args[0])
            right = self.render_where(where.args[1])
            op = where.op

            if op == "like":
                return f"{left} ILIKE {right}"
            if op in ("=", ">", "<", ">=", "<=", "<>", "!="):
                return f"{left} {op} {right}"
            if op in ("and", "or"):
                return f"({left} {op.upper()} {right})"
            if op in ("in", "not in", "is", "is not"):
                return f"{left} {op.upper()} {right}"

        elif isinstance(where, ast.Constant):
            return self._render_constant(where.value)

        elif isinstance(where, ast.Identifier):
            return self._quote_identifier(where.parts[-1])

        elif isinstance(where, ast.Tuple):
            return "(" + ", ".join(self.render_where(item) for item in where.items) + ")"

        return str(where)

    def extract_resource_id(self, conditions):
        """Return the value of the first ``resource_id = <value>`` condition, or None."""
        for condition in conditions:
            if isinstance(condition, list) and len(condition) == 3:
                op, col, val = condition
                if col == "resource_id" and op == "=":
                    return val
        return None

    def get_columns(self):
        """Return column ids from ``self.fields`` when something has set it.

        Columns depend on the queried resource, and ``fields`` is not assigned
        anywhere in this class, so an empty list is returned otherwise.
        """
        fields = getattr(self, "fields", None) or []
        return [field["id"] for field in fields]

    def _render_order_by(self, item):
        """Render one ORDER BY item as ``<expr> [ASC|DESC]``."""
        rendered = self.render_where(item.field)
        direction = str(getattr(item, "direction", "")).upper()
        if direction in ("ASC", "DESC"):
            rendered += f" {direction}"
        return rendered

    @staticmethod
    def _is_resource_id_condition(cond):
        """True for a binary condition whose left side is the resource_id column."""
        return (
            isinstance(cond, ast.BinaryOperation)
            and isinstance(cond.args[0], ast.Identifier)
            and cond.args[0].parts[-1] == "resource_id"
        )

    @staticmethod
    def _quote_identifier(name) -> str:
        """Double-quote an SQL identifier, doubling embedded double quotes."""
        return '"' + str(name).replace('"', '""') + '"'

    @staticmethod
    def _render_constant(value) -> str:
        """Render a literal: None -> NULL, strings single-quoted with '' escaping."""
        if value is None:
            return "NULL"
        if isinstance(value, str):
            return "'" + value.replace("'", "''") + "'"
        return str(value)

226 

227 

class CkanHandler(APIHandler):
    """MindsDB API handler for CKAN portals.

    Registers the ``datasets``, ``resources`` and ``datastore`` tables. Native
    queries of the form ``method_name:key1=value1,key2=value2`` are forwarded to
    the CKAN action API.
    """

    name = "ckan"

    def __init__(self, name=None, **kwargs):
        """Store connection settings and register the tables.

        :param name: handler instance name, passed to ``APIHandler``.
        :param kwargs: ``connection_data`` (dict) supplies ``url`` and optional
            ``api_key``.
        """
        super().__init__(name)
        self.connection = None
        self.is_connected = False
        # "or {}" also covers an explicit connection_data=None.
        self.connection_args = kwargs.get("connection_data") or {}

        self.datasets_table = DatasetsTable(self)
        self.resources_table = ResourceIDsTable(self)
        self.datastore_table = DatastoreTable(self)

        self._register_table("datasets", self.datasets_table)
        self._register_table("resources", self.resources_table)
        self._register_table("datastore", self.datastore_table)

    def connect(self):
        """Create (once) and return the ``RemoteCKAN`` client.

        :raises ValueError: if no ``url`` is configured.
        :raises ConnectionError: if the client cannot be constructed.
        """
        if self.is_connected:
            return self.connection

        url = self.connection_args.get("url")
        api_key = self.connection_args.get("api_key")
        if not url:
            raise ValueError("CKAN URL is required")

        try:
            self.connection = RemoteCKAN(url, apikey=api_key)
            self.is_connected = True
            logger.info(f"Successfully connected to CKAN at {url}")
        except Exception as e:
            logger.error(f"Error connecting to CKAN: {e}")
            raise ConnectionError(f"Failed to connect to CKAN: {e}") from e

        return self.connection

    def check_connection(self) -> HandlerStatusResponse:
        """Verify the configured CKAN instance is reachable."""
        try:
            self.connect()
            # connect() only builds the client object; issue one lightweight
            # request so a wrong or unreachable URL is actually reported.
            self.call_ckan_api("status_show", {})
            return HandlerStatusResponse(success=True)
        except Exception as e:
            logger.error(f"Error checking connection: {e}")
            return HandlerStatusResponse(success=False, error_message=str(e))

    def call_ckan_api(self, method_name: str, params: dict):
        """Call CKAN action ``method_name`` with ``params`` as keyword arguments.

        :raises RuntimeError: wrapping any error raised by the action call.
        """
        connection = self.connect()
        method = getattr(connection.action, method_name)

        try:
            return method(**params)
        except Exception as e:
            logger.error(f"Error calling CKAN API: {e}")
            raise RuntimeError(f"Failed to call CKAN API: {e}") from e

    def native_query(self, query: str) -> HandlerResponse:
        """Execute ``method_name:key=value,...`` and return the result as a table.

        A list result becomes one row per item, a dict becomes one row, and
        anything else becomes a single ``result`` cell. Any failure, including
        malformed query syntax, is returned as an ERROR response.
        """
        try:
            method, params = self.parse_native_query(query)
            result = self.call_ckan_api(method, params)
            if isinstance(result, list):
                df = pd.DataFrame(result)
            elif isinstance(result, dict):
                df = pd.DataFrame([result])
            else:
                df = pd.DataFrame([{"result": result}])
            return HandlerResponse(RESPONSE_TYPE.TABLE, df)
        except Exception as e:
            logger.error(f"Error executing native query: {e}")
            return HandlerResponse(RESPONSE_TYPE.ERROR, error_message=str(e))

    @staticmethod
    def parse_native_query(query: str):
        """Parse ``method_name:key1=value1,key2=value2`` into ``(method, params)``.

        Only the first ':' and the first '=' of each pair are separators, so
        values may contain them (e.g. ``fq=tags:economy``). Values may not
        contain ','. Empty pairs are ignored and all values stay strings.

        :raises ValueError: on a missing ':' or method name, or a pair without '='.
        """
        method, sep, raw_params = query.partition(":")
        method = method.strip()
        if not sep or not method:
            raise ValueError(
                "Invalid query format. Expected 'method_name:param1=value1,param2=value2'"
            )

        params = {}
        for pair in raw_params.split(","):
            if not pair.strip():
                continue
            key, eq, value = pair.partition("=")
            if not eq:
                raise ValueError(
                    "Invalid query format. Expected 'method_name:param1=value1,param2=value2'"
                )
            params[key.strip()] = value.strip()

        return method, params