Coverage for mindsdb / integrations / handlers / openbb_handler / openbb_tables.py: 0%
195 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1from mindsdb.api.executor.utilities.sql import query_df
2from mindsdb.integrations.libs.api_handler import APITable
3from mindsdb_sql_parser import ast
4from mindsdb.integrations.utilities.date_utils import parse_local_date
5from mindsdb.integrations.utilities.sql_utils import extract_comparison_conditions, project_dataframe, filter_dataframe
6from mindsdb.integrations.utilities.sql_utils import sort_dataframe
7from mindsdb.utilities import log
9from typing import Dict, List, Union
10from pydantic import ValidationError
12import pandas as pd
14logger = log.getLogger(__name__)
class OpenBBtable(APITable):
    """Generic table that runs the OpenBB Platform command named in the WHERE clause.

    The command is supplied through a ``cmd`` equality condition; every other
    equality condition is forwarded to that command as an argument.
    """

    def _get_params_from_conditions(self, conditions: List) -> Dict:
        """Turn SQL WHERE conditions into OpenBB command parameters.

        Args:
            conditions (List): ``(op, arg1, arg2)`` triples, as returned by
                ``extract_comparison_conditions``.

        Returns:
            Dict: Maps each condition's left operand to its right operand.

        Raises:
            NotImplementedError: If a condition uses an operator other than ``=``.
        """
        params: dict = {}
        # Every condition is interpreted as ``key = value``; nothing else is
        # accepted by this table.
        for op, arg1, arg2 in conditions:
            if op != "=":
                raise NotImplementedError(
                    f"Only '=' conditions are supported, got '{op}' for '{arg1}'"
                )
            params[arg1] = arg2

        return params

    def _process_cols_names(self, cols: list) -> list:
        """Flatten column labels so that tuple labels become plain values.

        A tuple with more than one element is joined with ``_`` and any trailing
        underscores are stripped, so ``('date', '')`` becomes ``'date'`` rather
        than ``'date_'``. A one-element tuple is replaced by its only element.
        Non-tuple labels are returned unchanged.

        Args:
            cols (list): Column labels to flatten.

        Returns:
            list: The flattened labels, in the same order.
        """
        new_cols = []
        for element in cols:
            if not isinstance(element, tuple):
                new_cols.append(element)
            elif len(element) > 1:
                new_cols.append("_".join(map(str, element)).rstrip("_"))
            else:
                new_cols.append(element[0])
        return new_cols

    def select(self, query: ast.Select) -> pd.DataFrame:
        """Run the requested OpenBB command and return its data as a DataFrame.

        Args:
            query (ast.Select): Given SQL SELECT query. Its WHERE clause must
                contain ``cmd = <command>``; remaining equality conditions are
                passed to the command as ``name=value`` arguments.

        Returns:
            pd.DataFrame: Result of the command's ``to_df()``, with a datetime
            index moved into a column and tuple column labels flattened.

        Raises:
            NotImplementedError: If the WHERE clause has a non-equality condition.
            Exception: If ``cmd`` is missing or unknown, or the command fails.
        """
        conditions = extract_comparison_conditions(query.where)
        params = self._get_params_from_conditions(conditions)

        try:
            # ``params`` is always a dict, so test for the key itself; otherwise
            # a missing cmd would surface as an unhelpful KeyError.
            if "cmd" not in params:
                logger.error("At least cmd needs to be added!")
                raise Exception("At least cmd needs to be added!")

            # Get the OpenBB command to get the data from
            cmd = params.pop("cmd")

            # Ensure that the cmd provided is a valid OpenBB command
            available_cmds = [f"obb{name}" for name in self.handler.obb.coverage.commands.keys()]
            if cmd not in available_cmds:
                message = (
                    "The command provided is not supported by OpenBB! "
                    f"Choose one of the following: {', '.join(available_cmds)}"
                )
                logger.error(message)
                raise Exception(message)

            # Values are interpolated verbatim, so a string argument must carry
            # its own quotes inside the SQL value.
            args = ",".join(f"{arg}={val}" for arg, val in params.items())

            # Recreate the OpenBB command with the arguments
            openbb_cmd = f"self.handler.{cmd}({args})"

            # SECURITY(review): ``cmd`` is checked against the known commands
            # above, but argument names and values come straight from the
            # query's WHERE clause and are evaluated as Python source here.
            # Only expose this table to trusted users, or replace eval with an
            # attribute lookup plus keyword arguments (which changes how callers
            # must quote string values).
            openbb_object = eval(openbb_cmd)

            # Transform the OBBject into a pandas DataFrame
            data = openbb_object.to_df()

            # Check if index is a datetime, if it is we want that as a column
            if isinstance(data.index, pd.DatetimeIndex):
                data.reset_index(inplace=True)

            # Process column names
            data.columns = self._process_cols_names(data.columns)

        except Exception as e:
            logger.error(f"Error accessing data from OpenBB: {e}!")
            raise Exception(f"Error accessing data from OpenBB: {e}!") from e

        return data
def create_table_class(
    params_metadata,
    response_metadata,
    obb_function,
    func_docs="",
    provider=None
):
    """Creates a table class for the given OpenBB Platform function.

    Args:
        params_metadata: Mapping whose ``'fields'`` entry maps each parameter
            name of ``obb_function`` to a field object exposing
            ``is_required()``, ``annotation`` and ``description``.
        response_metadata: Mapping whose ``'fields'`` entry maps each response
            column name to a field object exposing ``annotation``.
        obb_function: Callable invoked with the collected parameters; its
            return value must provide ``to_df()``.
        func_docs (str): Documentation reference appended to error messages.
        provider: When not None, passed to ``obb_function`` as ``provider``.

    Returns:
        The ``AnyTable`` class bound to the arguments above.
    """
    mandatory_fields = [
        name for name, field in params_metadata['fields'].items() if field.is_required() is True
    ]
    response_columns = list(response_metadata['fields'].keys())

    def _annotation_name(annotation) -> str:
        """Return a printable name for an annotation, even if it lacks ``__name__``."""
        return getattr(annotation, '__name__', str(annotation))

    def _describe_params() -> str:
        """Build the parameter listing appended to user-facing error messages."""
        text = "Docstring:"
        for param, field in params_metadata['fields'].items():
            if getattr(field.annotation, '__origin__', None) is Union:
                members = ', '.join(_annotation_name(arg) for arg in field.annotation.__args__)
                annotation = f"Union[{members}]"
            else:
                annotation = _annotation_name(field.annotation)
            text += f"\n * {param}{'' if field.is_required() else ' (optional)'}: {annotation}\n{field.description}"

        text += f"\n\nFor more information check {func_docs}"
        return text

    class AnyTable(APITable):
        def _get_params_from_conditions(self, conditions: List) -> Dict:
            """Collect the equality conditions of a WHERE clause into a dict.

            Conditions using any operator other than ``=`` are ignored here.

            Args:
                conditions (List): List of individual SQL WHERE conditions,
                    each an ``(op, arg1, arg2)`` triple.

            Returns:
                Dict: Maps ``arg1`` to ``arg2`` for every ``=`` condition.
            """
            params: dict = {}
            for op, arg1, arg2 in conditions:
                if op == "=":
                    params[arg1] = arg2

            return params

        def select(self, query: ast.Select) -> pd.DataFrame:
            """Selects data from the OpenBB Platform and returns it as a pandas DataFrame.

            Args:
                query (ast.Select): Given SQL SELECT query

            Returns:
                pd.DataFrame: The function's data after ordering, limiting,
                filtering and projecting according to ``query``.

            Raises:
                NotImplementedError: If the WHERE clause contains OR, or a
                    mandatory argument is missing.
                Exception: If the OpenBB call or the post-processing fails; the
                    message includes the parameter listing.
            """
            conditions = extract_comparison_conditions(query.where)
            arg_params = self._get_params_from_conditions(conditions=conditions)

            params = {}
            if provider is not None:
                params['provider'] = provider

            filters = []
            mandatory_args_set = {key: False for key in mandatory_fields}
            columns_to_add = {}
            strict_filter = arg_params.get('strict_filter', False)

            for op, arg1, arg2 in conditions:
                if op == 'or':
                    raise NotImplementedError('OR is not supported')

                if arg1 in mandatory_fields:
                    mandatory_args_set[arg1] = True

                if ('start_' + arg1 in params_metadata['fields'] and arg1 in response_columns and arg2 is not None):
                    # NOTE(review): this compares the annotation with the string
                    # 'datetime' - confirm the metadata stores annotations as strings.
                    if response_metadata['fields'][arg1].annotation == 'datetime':
                        date = parse_local_date(arg2)
                        interval = arg_params.get('interval', '1d')

                        if op == '>':
                            params['start_' + arg1] = date.strftime('%Y-%m-%d')
                        elif op == '<':
                            params['end_' + arg1] = date.strftime('%Y-%m-%d')
                        elif op == '>=':
                            date = date - pd.Timedelta(interval)
                            params['start_' + arg1] = date.strftime('%Y-%m-%d')
                        elif op == '<=':
                            date = date + pd.Timedelta(interval)
                            params['end_' + arg1] = date.strftime('%Y-%m-%d')
                        elif op == '=':
                            # Start one interval before the date; adding the
                            # interval back makes the end the date itself.
                            date = date - pd.Timedelta(interval)
                            params['start_' + arg1] = date.strftime('%Y-%m-%d')
                            date = date + pd.Timedelta(interval)
                            params['end_' + arg1] = date.strftime('%Y-%m-%d')

                elif arg1 in params_metadata['fields'] or not strict_filter:
                    if op == '=':
                        params[arg1] = arg2
                        columns_to_add[arg1] = arg2

                # Every condition is also applied to the returned dataframe.
                filters.append([op, arg1, arg2])

            if not all(mandatory_args_set.values()):
                missing_args = ", ".join([k for k, v in mandatory_args_set.items() if v is False])
                text = f"You must specify the following arguments in the WHERE statement: {missing_args}\n"
                text += "\n" + _describe_params()
                raise NotImplementedError(text)

            try:
                # Handle limit keyword correctly since it can't be parsed as a WHERE arg (i.e. WHERE limit = 50)
                if query.limit is not None and 'limit' in params_metadata['fields']:
                    params['limit'] = query.limit.value

                obbject = obb_function(**params)

                # Extract data in dataframe format
                result = obbject.to_df()

                if result is None:
                    raise Exception(f"For more information check {func_docs}.")

                # Check if index is a datetime, if it is we want that as a column
                if isinstance(result.index, pd.DatetimeIndex):
                    result.reset_index(inplace=True)

                if query.order_by:
                    result = sort_dataframe(result, query.order_by)

                if query.limit is not None:
                    result = result.head(query.limit.value)

                if result is None:
                    raise Exception(f"For more information check {func_docs}.")

                # Expose the equality arguments sent to OpenBB as columns so the
                # same conditions can be applied by filter_dataframe below.
                for key in columns_to_add:
                    result[key] = params[key]

                # filter targets
                result = filter_dataframe(result, filters)

                if result is None:
                    raise Exception(f"For more information check {func_docs}.")

                # Copy before extending: get_columns() returns the shared
                # response_columns list, and ``+=`` would grow it on every query.
                columns = list(self.get_columns())
                columns += [col for col in result.columns if col not in columns]

                for full_target in query.targets:
                    if isinstance(full_target, ast.Star):
                        continue
                    if isinstance(full_target, ast.Identifier):
                        target = full_target.parts[-1].lower()
                    elif (
                        isinstance(full_target, ast.Function)
                        and full_target.args
                        and isinstance(full_target.args[0], ast.Identifier)
                    ):
                        target = full_target.args[0].parts[-1].lower()
                    else:
                        # Could be a window function, a function over a non-column
                        # argument, or other operation we can't handle. Defer to DuckDB.
                        return query_df(result, query)
                    if target not in columns:
                        raise ValueError(f"Unknown column '{target}' in 'field list'")

                # project targets
                try:
                    result = project_dataframe(result, query.targets, columns)
                except NotImplementedError:
                    # Target contains a function that we need DuckDB to resolve.
                    return query_df(result, query)

                return result

            except (AttributeError, ValidationError) as e:
                logger.info(f'Encountered error while executing OpenBB select: {str(e)}')
                raise Exception(f"{str(e)}\n\n{_describe_params()}.") from e

            except Exception as e:
                logger.info(f'Encountered error while executing OpenBB select: {str(e)}')

                # TODO: This one doesn't work because it's taken care of from MindsDB side
                if "Table not found" in str(e):
                    raise Exception(f"{str(e)}\n\nCheck if the method exists here: {func_docs}.\n\n - If it doesn't you may need to look for the parent module to check whether there's a typo in the naming.\n- If it does you may need to install a new extension to the OpenBB Platform, and you can see what is available at https://my.openbb.co/app/platform/extensions.") from e

                if "Missing credential" in str(e):
                    raise Exception(f"{str(e)}\n\nGo to https://my.openbb.co/app/platform/api-keys to set this API key, for free.") from e

                # Catch all other errors
                raise Exception(f"{str(e)}\n\n{_describe_params()}.") from e

        def get_columns(self):
            """Return the response column names declared in ``response_metadata``."""
            return response_columns

    return AnyTable