Coverage for mindsdb / integrations / handlers / ckan_handler / ckan_handler.py: 0%
178 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import pandas as pd
2from ckanapi import RemoteCKAN
4from mindsdb.integrations.libs.api_handler import APIHandler, APITable
5from mindsdb.integrations.libs.response import (
6 HandlerResponse,
7 HandlerStatusResponse,
8 RESPONSE_TYPE,
9)
10from mindsdb_sql_parser import ast
11from mindsdb.utilities import log
12from mindsdb.integrations.utilities.sql_utils import extract_comparison_conditions
# Module-level logger for this handler module (used for connection and API errors).
logger = log.getLogger(__name__)
class DatasetsTable(APITable):
    """
    Datasets table contains information about CKAN datasets.

    Lists the datasets (CKAN "packages") returned by ``package_search`` together
    with their total number of resources and their number of datastore-active
    resources.
    """

    def select(self, query: ast.Select) -> pd.DataFrame:
        """Return dataset rows for a SELECT against this table.

        The query's LIMIT (default 1000) is forwarded to ``list``. WHERE
        conditions are extracted and passed along, but ``list`` does not
        currently apply them.
        """
        conditions = extract_comparison_conditions(query.where) if query.where else []
        limit = query.limit.value if query.limit else 1000
        return self.list(conditions, limit)

    def list(self, conditions=None, limit=1000):
        """Fetch up to ``limit`` datasets from CKAN.

        Args:
            conditions: comparison conditions from the query; currently ignored,
                so no filtering is applied here.
            limit: passed to CKAN ``package_search`` as ``rows``.

        Returns:
            A DataFrame with exactly the columns from ``get_columns`` — also when
            CKAN returns no datasets, so downstream code still sees the schema.
        """
        # call_ckan_api() connects lazily, so no explicit connect() is needed.
        package_list = self.handler.call_ckan_api("package_search", {"rows": limit})
        packages = package_list.get("results", [])

        data = []
        for pkg in packages:
            # ``or []`` also guards against a package whose "resources" is null.
            resources = pkg.get("resources") or []
            datastore_active_resources = [r for r in resources if r.get("datastore_active")]
            data.append(
                {
                    "id": pkg.get("id"),
                    "name": pkg.get("name"),
                    "title": pkg.get("title"),
                    "num_resources": len(resources),
                    "num_datastore_active_resources": len(datastore_active_resources),
                }
            )

        # Explicit columns keep the schema when ``data`` is empty.
        return pd.DataFrame(data, columns=self.get_columns())

    def get_columns(self):
        """Return the column names produced by this table."""
        return [
            "id",
            "name",
            "title",
            "num_resources",
            "num_datastore_active_resources",
        ]
class ResourceIDsTable(APITable):
    """
    ResourceIDs table contains information about CKAN resources.

    Lists the datastore-active resources of the datasets returned by
    ``package_search``; the ``id`` column is the value to use as
    ``resource_id`` when querying the datastore table.
    """

    def select(self, query: ast.Select) -> pd.DataFrame:
        """Return resource rows for a SELECT against this table.

        The query's LIMIT (default 1000) is forwarded to ``list``. WHERE
        conditions are extracted and passed along, but ``list`` does not
        currently apply them.
        """
        conditions = extract_comparison_conditions(query.where) if query.where else []
        limit = query.limit.value if query.limit else 1000
        return self.list(conditions, limit)

    def list(self, conditions=None, limit=1000):
        """Collect up to ``limit`` datastore-active resources.

        Args:
            conditions: comparison conditions from the query; currently ignored.
            limit: used both as ``rows`` for ``package_search`` and as the maximum
                number of resource rows returned.

        Returns:
            A DataFrame with exactly the columns from ``get_columns`` — also when
            no datastore-active resource is found.
        """
        columns = self.get_columns()
        # call_ckan_api() connects lazily, so no explicit connect() is needed.
        package_list = self.handler.call_ckan_api("package_search", {"rows": limit})
        packages = package_list.get("results", [])

        data = []
        for package in packages:
            # ``or []`` also guards against a package whose "resources" is null.
            for resource in package.get("resources") or []:
                # Only datastore-active resources can be queried via the datastore.
                if not resource.get("datastore_active"):
                    continue
                data.append(
                    {
                        "id": resource.get("id"),
                        "package_id": package.get("id"),
                        "name": resource.get("name"),
                        "format": resource.get("format"),
                        "url": resource.get("url"),
                        "datastore_active": resource.get("datastore_active"),
                    }
                )
                if len(data) >= limit:
                    return pd.DataFrame(data, columns=columns)

        # Explicit columns keep the schema when ``data`` is empty.
        return pd.DataFrame(data, columns=columns)

    def get_columns(self):
        """Return the column names produced by this table."""
        return [
            "id",
            "package_id",
            "name",
            "format",
            "url",
            "datastore_active",
        ]
class DatastoreTable(APITable):
    """
    Datastore table is used to query CKAN datastore resources.

    A query must filter on ``resource_id``. The remaining SELECT (columns, WHERE,
    LIMIT) is translated into SQL for CKAN's ``datastore_search_sql`` action, with
    the resource id used as the table name.
    """

    def select(self, query: ast.Select) -> pd.DataFrame:
        """Run ``query`` against the resource named by its ``resource_id`` filter.

        Without a ``resource_id = '...'`` condition, a one-row DataFrame holding a
        usage message is returned instead.
        """
        conditions = extract_comparison_conditions(query.where) if query.where else []
        resource_id = self.extract_resource_id(conditions)

        if resource_id:
            return self.execute_resource_query(query, resource_id)
        message = "Please provide a resource_id in your query. Example: SELECT * FROM datastore WHERE resource_id = 'your_resource_id'"
        return pd.DataFrame({"message": [message]})

    def execute_resource_query(
        self, query: ast.Select, resource_id: str
    ) -> pd.DataFrame:
        """Translate ``query`` to SQL, execute it and return the records.

        Columns whose name starts with ``_`` (CKAN-internal columns such as
        ``_id``) are dropped. The returned field metadata is kept on
        ``self.fields`` so ``get_columns`` can describe the last result.
        """
        sql_query = self.ast_to_sql(query, resource_id)
        result = self.handler.call_ckan_api("datastore_search_sql", {"sql": sql_query})

        self.fields = result.get("fields", [])
        df = pd.DataFrame(result.get("records", []))

        # A comprehension instead of ``df.columns.str``: for an empty result the
        # columns are a RangeIndex, on which the ``.str`` accessor raises.
        visible_columns = [col for col in df.columns if not str(col).startswith("_")]
        return df.loc[:, visible_columns]

    def ast_to_sql(self, query: ast.Select, resource_id: str) -> str:
        """Build the ``datastore_search_sql`` statement for ``query``.

        The ``resource_id`` condition only selects the CKAN table and is therefore
        removed from the WHERE clause; the other top-level AND-ed conditions are
        rendered with ``render_where``.
        """
        sql_parts = [
            f"SELECT {self.render_columns(query.targets)}",
            f"FROM {self._quote_identifier(resource_id)}",
        ]

        where_conditions = self.extract_where_conditions(query.where) if query.where else []
        where_conditions = [
            cond for cond in where_conditions if not self._is_resource_id_condition(cond)
        ]
        if where_conditions:
            rendered = " AND ".join(self.render_where(cond) for cond in where_conditions)
            sql_parts.append(f"WHERE {rendered}")

        if query.limit:
            sql_parts.append(f"LIMIT {query.limit.value}")

        return " ".join(sql_parts)

    def render_columns(self, targets):
        """Render the SELECT target list; no targets or a lone ``*`` become ``*``."""
        if not targets or (len(targets) == 1 and isinstance(targets[0], ast.Star)):
            return "*"
        return ", ".join(self.render_column(target) for target in targets)

    def render_column(self, target):
        """Render one SELECT target; identifiers are double-quoted (last part only)."""
        if isinstance(target, ast.Identifier):
            return self._quote_identifier(target.parts[-1])
        # Other target types fall back to the parser's own rendering.
        return str(target)

    def extract_where_conditions(self, where):
        """Flatten a tree of AND operations into a list of its operands."""
        if isinstance(where, ast.BinaryOperation) and where.op == "and":
            return self.extract_where_conditions(
                where.args[0]
            ) + self.extract_where_conditions(where.args[1])
        return [where]

    def render_where(self, where):
        """Render a WHERE expression node as SQL text for CKAN.

        ``like`` becomes case-insensitive ``ILIKE``. Nested ``and``/``or`` nodes
        are parenthesized so the result can be safely AND-joined by
        ``ast_to_sql``. Unsupported nodes and operators fall back to ``str()``.
        """
        if isinstance(where, ast.BinaryOperation):
            op = where.op.lower()
            left = self.render_where(where.args[0])
            right = self.render_where(where.args[1])

            if op == "like":
                return f"{left} ILIKE {right}"
            if op == "not like":
                return f"{left} NOT ILIKE {right}"
            if op in ("and", "or"):
                return f"({left} {op.upper()} {right})"
            if op in ("=", "!=", "<>", ">", "<", ">=", "<=", "is", "is not"):
                return f"{left} {op.upper()} {right}"
            return str(where)

        if isinstance(where, ast.Constant):
            return self._render_constant(where.value)

        if isinstance(where, ast.Identifier):
            return self._quote_identifier(where.parts[-1])

        return str(where)

    def extract_resource_id(self, conditions):
        """Return the value of the first ``resource_id = <value>`` condition, else None.

        ``conditions`` are the ``[op, column, value]`` lists produced by
        ``extract_comparison_conditions``.
        """
        for condition in conditions:
            if isinstance(condition, list) and len(condition) == 3:
                op, col, val = condition
                if col == "resource_id" and op == "=":
                    return val
        return None

    def get_columns(self):
        """Return the visible field ids of the last datastore query (empty before any)."""
        fields = getattr(self, "fields", None) or []
        return [
            field["id"] for field in fields if not str(field["id"]).startswith("_")
        ]

    @staticmethod
    def _is_resource_id_condition(cond):
        """True for a binary condition whose left operand is the ``resource_id`` column."""
        return (
            isinstance(cond, ast.BinaryOperation)
            and isinstance(cond.args[0], ast.Identifier)
            and cond.args[0].parts[-1] == "resource_id"
        )

    @staticmethod
    def _quote_identifier(name):
        """Double-quote an identifier, doubling embedded double quotes."""
        return '"' + str(name).replace('"', '""') + '"'

    @staticmethod
    def _render_constant(value):
        """Render a Python constant as an SQL literal (strings get quote-escaping)."""
        if value is None:
            return "NULL"
        if isinstance(value, bool):
            return "TRUE" if value else "FALSE"
        if isinstance(value, str):
            # Double embedded single quotes so values like O'Brien stay valid SQL.
            return "'" + value.replace("'", "''") + "'"
        return str(value)
class CkanHandler(APIHandler):
    """
    MindsDB API handler for a CKAN portal.

    Registers the ``datasets``, ``resources`` and ``datastore`` tables and
    supports native queries of the form
    ``method_name:param1=value1,param2=value2``, which are forwarded to the
    CKAN action API.
    """

    name = "ckan"

    def __init__(self, name=None, **kwargs):
        super().__init__(name)
        self.connection = None
        self.is_connected = False
        # connect() reads "url" (required) and "api_key" (optional) from here;
        # ``or {}`` also covers an explicit connection_data=None.
        self.connection_args = kwargs.get("connection_data") or {}

        self.datasets_table = DatasetsTable(self)
        self.resources_table = ResourceIDsTable(self)
        self.datastore_table = DatastoreTable(self)

        self._register_table("datasets", self.datasets_table)
        self._register_table("resources", self.resources_table)
        self._register_table("datastore", self.datastore_table)

    def connect(self):
        """Create the RemoteCKAN client on first use and return it.

        Raises:
            ValueError: if no ``url`` is present in the connection data.
            ConnectionError: if the client cannot be constructed.
        """
        if self.is_connected:
            return self.connection

        url = self.connection_args.get("url")
        api_key = self.connection_args.get("api_key")
        if not url:
            raise ValueError("CKAN URL is required")

        try:
            self.connection = RemoteCKAN(url, apikey=api_key)
        except Exception as e:
            logger.error(f"Error connecting to CKAN: {e}")
            raise ConnectionError(f"Failed to connect to CKAN: {e}") from e

        self.is_connected = True
        logger.info(f"Successfully connected to CKAN at {url}")
        return self.connection

    def check_connection(self) -> HandlerStatusResponse:
        """Check that the configured CKAN site can actually be reached.

        Constructing the RemoteCKAN client is not expected to contact the server,
        so a lightweight ``status_show`` action is called to surface a wrong URL
        or an unreachable site here rather than on the first real query.
        """
        try:
            self.connect()
            self.call_ckan_api("status_show", {})
            return HandlerStatusResponse(success=True)
        except Exception as e:
            logger.error(f"Error checking connection: {e}")
            return HandlerStatusResponse(success=False, error_message=str(e))

    def call_ckan_api(self, method_name: str, params: dict):
        """Call the CKAN action ``method_name`` with ``params`` as keyword arguments.

        Raises:
            RuntimeError: wrapping (and chained to) any error raised by the call.
        """
        connection = self.connect()
        method = getattr(connection.action, method_name)

        try:
            return method(**params)
        except Exception as e:
            logger.error(f"Error calling CKAN API: {e}")
            raise RuntimeError(f"Failed to call CKAN API: {e}") from e

    def native_query(self, query: str) -> HandlerResponse:
        """Execute a native ``method_name:key=value,...`` query.

        A list result becomes one row per item, a dict result a single row, and
        any other value a single ``result`` cell. Both malformed queries and API
        failures are returned as an ERROR response.
        """
        try:
            method, params = self.parse_native_query(query)
            result = self.call_ckan_api(method, params)
            if isinstance(result, list):
                df = pd.DataFrame(result)
            elif isinstance(result, dict):
                df = pd.DataFrame([result])
            else:
                df = pd.DataFrame([{"result": result}])
            return HandlerResponse(RESPONSE_TYPE.TABLE, df)
        except Exception as e:
            logger.error(f"Error executing native query: {e}")
            return HandlerResponse(RESPONSE_TYPE.ERROR, error_message=str(e))

    @staticmethod
    def parse_native_query(query: str):
        """Split ``method_name:key1=value1,key2=value2`` into ``(method, params)``.

        Only the first ``:`` separates the method from its parameters, and only
        the first ``=`` of each pair separates key from value. Values may
        therefore contain ``:`` and ``=`` (e.g. Solr filters like
        ``fq=tags:economy``), but not ``,``, which separates pairs. Empty pairs
        (e.g. from a trailing comma) are ignored. All values are returned as
        stripped strings.

        Raises:
            ValueError: if the ``:`` separator or the method name is missing, or
                a parameter is not of the form ``key=value``.
        """
        method, separator, raw_params = query.partition(":")
        method = method.strip()
        if not separator or not method:
            raise ValueError(
                "Invalid query format. Expected 'method_name:param1=value1,param2=value2'"
            )

        params = {}
        for pair in raw_params.split(","):
            if not pair.strip():
                continue
            key, equals, value = pair.partition("=")
            key = key.strip()
            if not equals or not key:
                raise ValueError(
                    f"Invalid parameter '{pair.strip()}'. Expected 'key=value'"
                )
            params[key] = value.strip()

        return method, params