Coverage for mindsdb / integrations / libs / api_handler.py: 35%
262 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1from typing import Any, List, Optional
2import ast as py_ast
4import pandas as pd
5from mindsdb_sql_parser.ast import ASTNode, Select, Insert, Update, Delete, Star, BinaryOperation
6from mindsdb_sql_parser.ast.select.identifier import Identifier
8from mindsdb.integrations.utilities.sql_utils import (
9 extract_comparison_conditions,
10 filter_dataframe,
11 FilterCondition,
12 FilterOperator,
13 SortColumn,
14)
15from mindsdb.integrations.libs.base import BaseHandler
16from mindsdb.integrations.libs.api_handler_exceptions import TableAlreadyExists, TableNotFound
18from mindsdb.integrations.libs.response import HandlerResponse as Response, RESPONSE_TYPE
19from mindsdb.utilities import log
# Module-level logger; used further down to record metadata-retrieval failures.
logger = log.getLogger("mindsdb")
class FuncParser:
    """Parse a Python-style call expression such as ``func(a=1, b=[2, "x"])``.

    The string is parsed with the standard ``ast`` module (it is never
    evaluated) and only keyword arguments are collected; positional arguments
    are ignored.
    """

    # Bare identifiers accepted as SQL/JSON-style spellings of Python literals.
    _SPECIAL_NAMES = {"true": True, "false": False, "null": None}

    def from_string(self, query_string):
        """Split a call expression into its function name and keyword arguments.

        Args:
            query_string (str): text of the form ``name(key=value, ...)``.

        Returns:
            tuple: ``(function_name, params)`` where ``params`` maps every keyword
                to its converted Python value.

        Raises:
            RuntimeError: the expression is not a call of a plain function name.
            NotImplementedError: an argument value uses an unsupported construct.
        """
        body = py_ast.parse(query_string.strip(), mode="eval").body

        # Only ``name(...)`` is supported. A call through an attribute or another
        # expression has no ``func.id`` and used to fail with AttributeError.
        if not isinstance(body, py_ast.Call) or not isinstance(body.func, py_ast.Name):
            raise RuntimeError(f"Api function not found {query_string}")

        params = {keyword.arg: self.process(keyword.value) for keyword in body.keywords}
        return body.func.id, params

    def process(self, node):
        """Convert a literal AST node into the Python value it denotes.

        Supported nodes: lists, dicts with constant keys, constants, the bare
        names ``true`` / ``false`` / ``null`` and unary minus.

        Args:
            node (ast.AST): node taken from the parsed call expression.

        Returns:
            The corresponding Python value.

        Raises:
            NotImplementedError: the node (or a dict key) is not supported.
        """
        if isinstance(node, py_ast.List):
            return [self.process(element) for element in node.elts]

        if isinstance(node, py_ast.Dict):
            # Keys are validated before any value is converted.
            keys = []
            for key_node in node.keys:
                if not isinstance(key_node, py_ast.Constant):
                    raise NotImplementedError(f"Unknown dict key {key_node}")
                keys.append(key_node.value)

            values = [self.process(value_node) for value_node in node.values]
            return dict(zip(keys, values))

        # Any other bare name falls through to the "Unknown node" error below.
        if isinstance(node, py_ast.Name) and node.id in self._SPECIAL_NAMES:
            return self._SPECIAL_NAMES[node.id]

        # Since Python 3.8 every literal is an ``ast.Constant``. The legacy
        # ``ast.Str`` / ``ast.Num`` aliases are deliberately not referenced:
        # they emit DeprecationWarning from 3.12 and no longer exist in 3.14,
        # where touching them broke negative numbers and the error path.
        if isinstance(node, py_ast.Constant):
            return node.value

        if isinstance(node, py_ast.UnaryOp) and isinstance(node.op, py_ast.USub):
            return -self.process(node.operand)

        raise NotImplementedError(f"Unknown node {node}")
class APITable:
    """Abstract table exposed by an API handler.

    Every operation raises ``NotImplementedError`` here; concrete tables
    override the ones they support.
    """

    def __init__(self, handler):
        # Keep a reference to the owning handler for use by subclasses.
        self.handler = handler

    def select(self, query: Select) -> pd.DataFrame:
        """Run a SELECT given as an AST.

        Args:
            query (ASTNode): parsed sql query, normally an ast.Select

        Returns:
            pd.DataFrame
        """
        raise NotImplementedError()

    def insert(self, query: Insert) -> None:
        """Run an INSERT given as an AST.

        Args:
            query (ASTNode): parsed sql query, normally an ast.Insert

        Returns:
            None
        """
        raise NotImplementedError()

    def update(self, query: ASTNode) -> None:
        """Run an UPDATE given as an AST.

        Args:
            query (ASTNode): parsed sql query, normally an ast.Update

        Returns:
            None
        """
        raise NotImplementedError()

    def delete(self, query: ASTNode) -> None:
        """Run a DELETE given as an AST.

        Args:
            query (ASTNode): parsed sql query, normally an ast.Delete

        Returns:
            None
        """
        raise NotImplementedError()

    def get_columns(self) -> list:
        """Return the column names of the API resource behind this table.

        Returns:
            List
        """
        raise NotImplementedError()
class APIResource(APITable):
    """Table that turns AST queries into ``list`` / ``add`` / ``modify`` / ``remove`` calls.

    Subclasses implement those four primitives; this class extracts the
    conditions, limit, ordering and target columns from the AST.
    """

    def __init__(self, *args, table_name=None, **kwargs):
        # table_name is stored before delegating the remaining arguments to APITable.
        self.table_name = table_name
        super().__init__(*args, **kwargs)

    def select(self, query: Select) -> pd.DataFrame:
        """Answer a SELECT by calling ``list`` and post-filtering its result.

        Args:
            query (ASTNode): parsed sql query, normally an ast.Select

        Returns:
            pd.DataFrame
        """
        api_conditions, raw_conditions = self._extract_conditions(query.where, strict=False)

        limit = query.limit.value if query.limit else None

        sort = None
        if query.order_by:
            # Only plain identifiers can be sorted on; other expressions are skipped.
            # The second SortColumn argument is False only for DESC.
            sort = [
                SortColumn(order.field.parts[-1], order.direction.upper() != "DESC")
                for order in query.order_by
                if isinstance(order.field, Identifier)
            ]

        targets = [target.parts[-1] for target in query.targets if isinstance(target, Identifier)]

        # table_name is forwarded only when this resource was given one.
        extra = {} if self.table_name is None else {"table_name": self.table_name}
        result = self.list(conditions=api_conditions, limit=limit, sort=sort, targets=targets, **extra)

        # Conditions whose `applied` flag is still falsy after list() are applied locally.
        pending = [[cond.op.value, cond.column, cond.value] for cond in api_conditions if not cond.applied]
        result = filter_dataframe(result, pending, raw_conditions=raw_conditions)

        # Enforce the limit again on whatever remains after local filtering.
        over_limit = limit is not None and len(result) > limit
        return result[: int(limit)] if over_limit else result

    def list(
        self,
        conditions: List[FilterCondition] = None,
        limit: int = None,
        sort: List[SortColumn] = None,
        targets: List[str] = None,
        **kwargs,
    ):
        """
        Fetch items matching the given conditions, limit, ordering and targets.

        Args:
            conditions (List[FilterCondition]): Optional. FilterCondition instances used to filter the items.
            limit (int): Optional. Maximum number of items to return.
            sort (List[SortColumn]): Optional. Sorting criteria.
            targets (List[str]): Optional. Names of the requested fields.

        Raises:
            NotImplementedError: This is an abstract method and should be implemented in a subclass.
        """
        raise NotImplementedError()

    def insert(self, query: Insert) -> None:
        """Answer an INSERT by passing one dict per inserted row to ``add``.

        Args:
            query (ASTNode): parsed sql query, normally an ast.Insert

        Returns:
            None
        """
        column_names = [column.name for column in query.columns]
        rows = [dict(zip(column_names, row_values)) for row_values in query.values]

        # table_name is forwarded only when this resource was given one.
        extra = {} if self.table_name is None else {"table_name": self.table_name}
        self.add(rows, **extra)

    def add(self, row: List[dict], **kwargs) -> None:
        """
        Add items to the data collection.

        Args:
            row (List[dict]): One dictionary per item to add, keyed by column name.

        Raises:
            NotImplementedError: This is an abstract method and should be implemented in a subclass.
        """
        raise NotImplementedError()

    def update(self, query: Update) -> None:
        """Answer an UPDATE by calling ``modify`` with the conditions and new values.

        Args:
            query (ASTNode): parsed sql query, normally an ast.Update

        Returns:
            None
        """
        conditions, _raw = self._extract_conditions(query.where)

        new_values = {}
        for column, node in query.update_columns.items():
            new_values[column] = node.value

        self.modify(conditions, new_values)

    def modify(self, conditions: List[FilterCondition], values: dict):
        """
        Change the items selected by the conditions.

        Args:
            conditions (List[FilterCondition]): FilterCondition instances selecting the items to change.
            values (dict): New values keyed by column name.

        Raises:
            NotImplementedError: This is an abstract method and should be implemented in a subclass.
        """
        raise NotImplementedError()

    def delete(self, query: Delete) -> None:
        """Answer a DELETE by calling ``remove`` with the extracted conditions.

        Args:
            query (ASTNode): parsed sql query, normally an ast.Delete

        Returns:
            None
        """
        conditions, _raw = self._extract_conditions(query.where)
        self.remove(conditions)

    def remove(self, conditions: List[FilterCondition]):
        """
        Delete the items selected by the conditions.

        Args:
            conditions (List[FilterCondition]): FilterCondition instances selecting the items to delete.

        Raises:
            NotImplementedError: This is an abstract method and should be implemented in a subclass.
        """
        raise NotImplementedError()

    def _extract_conditions(self, where: ASTNode, strict=True):
        """Split a WHERE clause into FilterCondition objects and raw BinaryOperation nodes.

        Returns:
            tuple: ``(api_conditions, raw_conditions)``
        """
        api_conditions = []
        raw_conditions = []
        for entry in extract_comparison_conditions(where, strict=strict):
            if isinstance(entry, BinaryOperation):
                # Returned untouched by the extractor: keep it as a raw condition.
                raw_conditions.append(entry)
                continue
            # Otherwise the entry is indexed as [operator, column, value].
            operator = FilterOperator(entry[0].upper())
            api_conditions.append(FilterCondition(entry[1], operator, entry[2]))

        return api_conditions, raw_conditions
class MetaAPIResource(APIResource):
    """APIResource extended with metadata hooks.

    Every hook returns ``None`` here; subclasses override the ones they can answer.
    """

    # TODO: Add a meta_table_info() method in case metadata cannot be retrieved as expected below?

    def meta_get_tables(self, table_name: str, **kwargs) -> dict:
        """
        Return table-level metadata for this API resource.

        Args:
            table_name (str): Name under which the handler registered this resource; passed in because the resource does not choose its own name.
            kwargs: Extra keyword arguments for the specific implementation.

        Returns:
            Dict: A dictionary with the following fields:
                - TABLE_NAME (str): Name of the table.
                - TABLE_TYPE (str): Type of the table, e.g. 'BASE TABLE', 'VIEW', etc. (optional).
                - TABLE_SCHEMA (str): Schema of the table (optional).
                - TABLE_DESCRIPTION (str): Description of the table (optional).
                - ROW_COUNT (int): Estimated number of rows in the table (optional).
        """
        return None

    def meta_get_columns(self, table_name: str, **kwargs) -> List[dict]:
        """
        Return column metadata for this API resource.

        Args:
            table_name (str): Name under which the handler registered this resource; passed in because the resource does not choose its own name.
            kwargs: Extra keyword arguments for the specific implementation.

        Returns:
            List[dict]: One dictionary per column with the following fields:
                - TABLE_NAME (str): Name of the table.
                - COLUMN_NAME (str): Name of the column.
                - DATA_TYPE (str): Data type of the column, e.g. 'VARCHAR', 'INT', etc.
                - COLUMN_DESCRIPTION (str): Description of the column (optional).
                - IS_NULLABLE (bool): Whether the column can contain NULL values (optional).
                - COLUMN_DEFAULT (str): Default value of the column (optional).
        """
        return None

    def meta_get_column_statistics(self, table_name: str, **kwargs) -> List[dict]:
        """
        Return column statistics for this API resource.

        Args:
            table_name (str): Name under which the handler registered this resource; passed in because the resource does not choose its own name.
            kwargs: Extra keyword arguments for the specific implementation.

        Returns:
            List[dict]: One dictionary per column with the following fields:
                - TABLE_NAME (str): Name of the table.
                - COLUMN_NAME (str): Name of the column.
                - MOST_COMMON_VALUES (List[str]): Most common values in the column (optional).
                - MOST_COMMON_FREQUENCIES (List[str]): Frequencies of the most common values in the column (optional).
                - NULL_PERCENTAGE: Percentage of NULL values in the column (optional).
                - MINIMUM_VALUE (str): Minimum value in the column (optional).
                - MAXIMUM_VALUE (str): Maximum value in the column (optional).
                - DISTINCT_VALUES_COUNT (int): Count of distinct values in the column (optional).
        """
        return None

    def meta_get_primary_keys(self, table_name: str, **kwargs) -> List[dict]:
        """
        Return primary key metadata for this API resource.

        Args:
            table_name (str): Name under which the handler registered this resource; passed in because the resource does not choose its own name.
            kwargs: Extra keyword arguments for the specific implementation.

        Returns:
            List[dict]: One dictionary per key column with the following fields:
                - TABLE_NAME (str): Name of the table.
                - COLUMN_NAME (str): Name of the column that is part of the primary key.
                - ORDINAL_POSITION (int): Position of the column in the primary key (optional).
                - CONSTRAINT_NAME (str): Name of the primary key constraint (optional).
        """
        return None

    def meta_get_foreign_keys(self, table_name: str, all_tables: List[str], **kwargs) -> List[dict]:
        """
        Return foreign key metadata for this API resource.

        Args:
            table_name (str): Name under which the handler registered this resource; passed in because the resource does not choose its own name.
            all_tables (List[str]): Names of all tables to consider when identifying relationships between tables.
            kwargs: Extra keyword arguments for the specific implementation.

        Returns:
            List[dict]: One dictionary per relationship with the following fields:
                - PARENT_TABLE_NAME (str): Name of the parent table.
                - PARENT_COLUMN_NAME (str): Name of the parent column that is part of the foreign key.
                - CHILD_TABLE_NAME (str): Name of the child table.
                - CHILD_COLUMN_NAME (str): Name of the child column that is part of the foreign key.
                - CONSTRAINT_NAME (str): Name of the foreign key constraint (optional).
        """
        return None
class APIHandler(BaseHandler):
    """
    Base class for handlers associated to the applications APIs (e.g. twitter, slack, discord etc.)

    Tables are registered by lower-cased name and queries are dispatched to
    the table named in the query.
    """

    def __init__(self, name: str):
        """Create the handler with an empty table registry.

        Args:
            name (str): the handler name
        """
        super().__init__(name)
        # Maps lower-cased table name -> registered table object.
        self._tables = {}

    def _register_table(self, table_name: str, table_class: Any):
        """
        Register the data resource. For e.g if you are using Twitter API it registers the `tweets` resource from `/api/v2/tweets`.

        Args:
            table_name (str): name of the table; stored lower-cased
            table_class (Any): object that will serve queries for this table

        Raises:
            TableAlreadyExists: a table with the same (case-insensitive) name is already registered
        """
        key = table_name.lower()
        if key in self._tables:
            raise TableAlreadyExists(f"Table with name {table_name} already exists for this handler")
        self._tables[key] = table_class

    def _get_table(self, name: Identifier):
        """
        Return the table registered under the last part of the identifier.

        Args:
            name (Identifier): the table name

        Raises:
            TableNotFound: no table is registered under that name
        """
        name = name.parts[-1].lower()
        if name in self._tables:
            return self._tables[name]
        raise TableNotFound(f"Table not found: {name}")

    def query(self, query: ASTNode):
        """Dispatch a Select / Update / Insert / Delete to the table it addresses.

        Args:
            query (ASTNode): parsed sql query

        Returns:
            Response: OK when the table returned None, TABLE when it returned a DataFrame.

        Raises:
            NotImplementedError: the query type or the table's return type is not supported.
        """
        if isinstance(query, Select):
            table = self._get_table(query.from_table)
            # A table may subclass APIResource and override select() without
            # overriding list(). getattr guards against a `list` attribute that
            # is not a bound method and therefore has no __func__.
            list_method = getattr(table, "list", None)
            if not list_method or getattr(list_method, "__func__", None) is APIResource.list:
                # for back compatibility, targets wasn't passed in previous version
                query.targets = [Star()]
            result = table.select(query)
        elif isinstance(query, Update):
            result = self._get_table(query.table).update(query)
        elif isinstance(query, Insert):
            result = self._get_table(query.table).insert(query)
        elif isinstance(query, Delete):
            result = self._get_table(query.table).delete(query)
        else:
            raise NotImplementedError(f"Unsupported query type: {type(query).__name__}")

        if result is None:
            return Response(RESPONSE_TYPE.OK)
        if isinstance(result, pd.DataFrame):
            return Response(RESPONSE_TYPE.TABLE, result)
        raise NotImplementedError(f"Unsupported result type: {type(result).__name__}")

    def get_columns(self, table_name: str) -> Response:
        """
        Returns a list of entity columns

        Args:
            table_name (str): the table name

        Returns:
            RESPONSE_TYPE.TABLE
        """
        result = self._get_table(Identifier(table_name)).get_columns()

        df = pd.DataFrame(result, columns=["Field"])
        # Every column is reported with the type "str".
        df["Type"] = "str"

        return Response(RESPONSE_TYPE.TABLE, df)

    def get_tables(self) -> Response:
        """
        Return list of entities

        Returns:
            RESPONSE_TYPE.TABLE
        """
        df = pd.DataFrame(list(self._tables), columns=["table_name"])
        df["table_type"] = "BASE TABLE"

        return Response(RESPONSE_TYPE.TABLE, df)
class MetaAPIHandler(APIHandler):
    """
    Base class for handlers associated to the applications APIs (e.g. twitter, slack, discord etc.)

    This class is used when the handler is also needed to store information in the data catalog.
    """

    def meta_get_handler_info(self, **kwargs) -> str:
        """
        Retrieves information about the design and implementation of the API handler.
        This should include, but not be limited to, the following:
        - The type of SQL queries and operations that the handler supports.
        - etc.

        Args:
            kwargs: Additional keyword arguments that may be used in generating the handler information.

        Returns:
            str: A string containing information about the API handler's design and implementation.
        """
        pass

    def _collect_table_metadata(
        self,
        method_name: str,
        error_label: str,
        default_columns: List[str],
        table_names: Optional[List[str]],
        call_kwargs: dict,
        single_record: bool = False,
    ) -> Response:
        """Call one ``meta_*`` hook on every selected table and stack the results.

        Args:
            method_name (str): name of the hook to call on each registered table.
            error_label (str): wording used in the log message when a hook fails.
            default_columns (List[str]): columns of the empty frame returned when no table produced metadata.
            table_names (Optional[List[str]]): restrict to these tables; None means all registered tables.
            call_kwargs (dict): keyword arguments forwarded to the hook.
            single_record (bool): True when the hook returns one dict rather than a list of dicts.

        Returns:
            Response: TABLE response with the concatenated metadata.
        """
        frames = []
        for table_name, table_class in self._tables.items():
            if table_names is not None and table_name not in table_names:
                continue
            try:
                if not hasattr(table_class, method_name):
                    continue
                metadata = getattr(table_class, method_name)(table_name, **call_kwargs)
                # A hook that is not implemented returns None. Wrapping that in a
                # list would produce a junk row with a column named 0.
                if metadata is None:
                    continue
                frame = pd.DataFrame([metadata] if single_record else metadata)
                if len(frame.columns) > 0:
                    frames.append(frame)
            except Exception:
                # Best effort: a failing table is logged and skipped.
                logger.exception(f"Error retrieving {error_label} for table {table_name}:")

        # Concatenate once, and never against an empty seed frame.
        if frames:
            df = pd.concat(frames, ignore_index=True)
        else:
            df = pd.DataFrame(columns=default_columns)

        return Response(RESPONSE_TYPE.TABLE, df)

    def meta_get_tables(self, table_names: Optional[List[str]] = None, **kwargs) -> Response:
        """
        Retrieves metadata for the specified tables (or all tables if no list is provided).

        Args:
            table_names (List): A list of table names for which to retrieve metadata.
            kwargs: Additional keyword arguments that may be used by the specific API resource implementation.

        Returns:
            Response: A response object containing the table metadata.
        """
        return self._collect_table_metadata(
            "meta_get_tables",
            "metadata",
            [
                "TABLE_NAME",
                "TABLE_TYPE",
                "TABLE_SCHEMA",
                "TABLE_DESCRIPTION",
                "ROW_COUNT",
            ],
            table_names,
            kwargs,
            single_record=True,
        )

    def meta_get_columns(self, table_names: Optional[List[str]] = None, **kwargs) -> Response:
        """
        Retrieves column metadata for the specified tables (or all tables if no list is provided).

        Args:
            table_names (List): A list of table names for which to retrieve column metadata.

        Returns:
            Response: A response object containing the column metadata.
        """
        return self._collect_table_metadata(
            "meta_get_columns",
            "column metadata",
            [
                "TABLE_NAME",
                "COLUMN_NAME",
                "DATA_TYPE",
                "COLUMN_DESCRIPTION",
                "IS_NULLABLE",
                "COLUMN_DEFAULT",
            ],
            table_names,
            kwargs,
        )

    def meta_get_column_statistics(self, table_names: Optional[List[str]] = None, **kwargs) -> Response:
        """
        Retrieves column statistics for the specified tables (or all tables if no list is provided).

        Args:
            table_names (List): A list of table names for which to retrieve column statistics.

        Returns:
            Response: A response object containing the column statistics.
        """
        return self._collect_table_metadata(
            "meta_get_column_statistics",
            "column statistics",
            [
                "TABLE_NAME",
                "COLUMN_NAME",
                "MOST_COMMON_VALUES",
                "MOST_COMMON_FREQUENCIES",
                "NULL_PERCENTAGE",
                "MINIMUM_VALUE",
                "MAXIMUM_VALUE",
                "DISTINCT_VALUES_COUNT",
            ],
            table_names,
            kwargs,
        )

    def meta_get_primary_keys(self, table_names: Optional[List[str]] = None, **kwargs) -> Response:
        """
        Retrieves primary key metadata for the specified tables (or all tables if no list is provided).

        Args:
            table_names (List): A list of table names for which to retrieve primary key metadata.

        Returns:
            Response: A response object containing the primary key metadata.
        """
        return self._collect_table_metadata(
            "meta_get_primary_keys",
            "primary keys",
            [
                "TABLE_NAME",
                "COLUMN_NAME",
                "ORDINAL_POSITION",
                "CONSTRAINT_NAME",
            ],
            table_names,
            kwargs,
        )

    def meta_get_foreign_keys(self, table_names: Optional[List[str]] = None, **kwargs) -> Response:
        """
        Retrieves foreign key metadata for the specified tables (or all tables if no list is provided).

        Args:
            table_names (List): A list of table names for which to retrieve foreign key metadata.

        Returns:
            Response: A response object containing the foreign key metadata.
        """
        # Each table is told which tables to consider: the requested subset
        # when one was given, otherwise every registered table.
        all_tables = table_names if table_names else list(self._tables.keys())
        return self._collect_table_metadata(
            "meta_get_foreign_keys",
            "foreign keys",
            [
                "PARENT_TABLE_NAME",
                "PARENT_COLUMN_NAME",
                "CHILD_TABLE_NAME",
                "CHILD_COLUMN_NAME",
                "CONSTRAINT_NAME",
            ],
            table_names,
            {**kwargs, "all_tables": all_tables},
        )
class APIChatHandler(APIHandler):
    """APIHandler variant for chat applications; both hooks must be provided by subclasses."""

    def get_chat_config(self):
        """Return the configuration needed to connect to the chatbot.

        Returns:
            Dict
        """
        raise NotImplementedError()

    def get_my_user_name(self) -> list:
        """Return the handler's own user name(s).

        NOTE(review): the previous text was copied from get_chat_config and
        described a Dict; the annotation says list. Presumably these are the
        names the connected account is known by in the chat - confirm against
        an implementation.

        Returns:
            list
        """
        raise NotImplementedError()