Coverage for mindsdb / integrations / handlers / openbb_handler / openbb_tables.py: 0%

195 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from mindsdb.api.executor.utilities.sql import query_df 

2from mindsdb.integrations.libs.api_handler import APITable 

3from mindsdb_sql_parser import ast 

4from mindsdb.integrations.utilities.date_utils import parse_local_date 

5from mindsdb.integrations.utilities.sql_utils import extract_comparison_conditions, project_dataframe, filter_dataframe 

6from mindsdb.integrations.utilities.sql_utils import sort_dataframe 

7from mindsdb.utilities import log 

8 

9from typing import Dict, List, Union 

10from pydantic import ValidationError 

11 

12import pandas as pd 

13 

# Module-level logger used by the OpenBB table classes in this file to report errors and diagnostic info.
logger = log.getLogger(__name__)

15 

16 

class OpenBBtable(APITable):
    """Generic OpenBB Platform table driven by a ``cmd`` WHERE condition.

    The ``cmd`` condition names an OpenBB command (validated against
    ``self.handler.obb.coverage.commands``). Every other equality condition is
    passed to that command as a keyword argument, e.g.
    ``WHERE cmd = 'obb.equity.price.historical' AND symbol = 'AAPL'``.
    """

    def _get_params_from_conditions(self, conditions: List) -> Dict:
        """Convert SQL WHERE conditions into OpenBB command parameters.

        The OpenBB Platform API only takes plain keyword arguments, so only
        equality conditions are supported: each ``column = value`` becomes
        ``params[column] = value``.

        Args:
            conditions (List): ``(op, arg1, arg2)`` triples as returned by
                ``extract_comparison_conditions``.

        Returns:
            Dict: Mapping of parameter name to value.

        Raises:
            NotImplementedError: If any condition uses an operator other than ``=``.
        """
        params: dict = {}
        for op, arg1, arg2 in conditions:
            if op != "=":
                raise NotImplementedError(f"Only '=' conditions are supported, got '{op}' for '{arg1}'")
            params[arg1] = arg2

        return params

    def _process_cols_names(self, cols: list) -> list:
        """Flatten column labels into plain names.

        Tuple labels (e.g. from a MultiIndex) with several parts are joined with
        ``_``. Trailing underscores are stripped so that ``('date', '')`` becomes
        ``date`` rather than ``date_``. Single-element tuples are unwrapped, and
        non-tuple labels are kept unchanged.

        Args:
            cols (list): Column labels, e.g. ``DataFrame.columns``.

        Returns:
            list: The flattened column names, in the same order.
        """
        new_cols = []
        for element in cols:
            if isinstance(element, tuple):
                if len(element) > 1:
                    new_cols.append("_".join(map(str, element)).rstrip("_"))
                else:
                    new_cols.append(element[0])
            else:
                new_cols.append(element)
        return new_cols

    def select(self, query: ast.Select) -> pd.DataFrame:
        """Select data from the OpenBB Platform and return it as a pandas DataFrame.

        Args:
            query (ast.Select): Given SQL SELECT query.

        Returns:
            pd.DataFrame: The command's output. A datetime index is turned into a
            column, and column names are flattened.

        Raises:
            NotImplementedError: If the WHERE clause contains a non-``=`` condition.
            Exception: If ``cmd`` is missing or unsupported, or if calling OpenBB fails.
                The original error is chained as ``__cause__``.
        """
        conditions = extract_comparison_conditions(query.where)
        params = self._get_params_from_conditions(conditions)

        try:
            if "cmd" not in params:
                logger.error("At least cmd needs to be added!")
                raise Exception("At least cmd needs to be added!")

            # Get the OpenBB command to get the data from
            cmd = params.pop("cmd")

            # Ensure that the cmd provided is a valid OpenBB command
            available_cmds = [f"obb{command}" for command in self.handler.obb.coverage.commands.keys()]
            if cmd not in available_cmds:
                message = f"The command provided is not supported by OpenBB! Choose one of the following: {', '.join(available_cmds)}"
                logger.error(message)
                raise Exception(message)

            # Resolve the (already validated) dotted command path attribute by attribute
            # on the handler, then call it with real keyword arguments. Previously the call
            # was built as a source string and eval()-ed, which broke on unquoted string or
            # date values and allowed WHERE values to be executed as Python code.
            openbb_function = self.handler
            for attribute in cmd.split("."):
                openbb_function = getattr(openbb_function, attribute)

            # Execute the OpenBB command and return the OBBject
            openbb_object = openbb_function(**params)

            # Transform the OBBject into a pandas DataFrame
            data = openbb_object.to_df()

            # Check if index is a datetime, if it is we want that as a column
            if isinstance(data.index, pd.DatetimeIndex):
                data.reset_index(inplace=True)

            # Process column names
            data.columns = self._process_cols_names(data.columns)

        except Exception as e:
            logger.error(f"Error accessing data from OpenBB: {e}!")
            raise Exception(f"Error accessing data from OpenBB: {e}!") from e

        return data

110 

111 

def create_table_class(
    params_metadata,
    response_metadata,
    obb_function,
    func_docs="",
    provider=None
):
    """Create an ``APITable`` subclass that exposes one OpenBB Platform function.

    Args:
        params_metadata: Parameter metadata with a ``'fields'`` mapping of
            parameter name -> field object exposing ``is_required()``,
            ``annotation`` and ``description`` (presumably pydantic ``FieldInfo``;
            TODO confirm).
        response_metadata: Response metadata whose ``'fields'`` keys are the
            table's columns. Each field's ``annotation`` is compared to
            ``'datetime'`` to enable date-range push-down.
        obb_function: Callable invoked with the collected keyword arguments.
            Its return value must provide ``to_df()``.
        func_docs: Documentation reference appended to error messages.
        provider: Optional provider name, passed as ``provider=`` on every call.

    Returns:
        type: A new ``APITable`` subclass bound to ``obb_function``.
    """
    fields = params_metadata['fields']
    mandatory_fields = [key for key in fields.keys() if fields[key].is_required() is True]
    response_columns = list(response_metadata['fields'].keys())

    def _format_annotation(annotation) -> str:
        """Return a readable name for a parameter's type annotation."""
        if getattr(annotation, '__origin__', None) is Union:
            return f"Union[{', '.join(_format_annotation(arg) for arg in annotation.__args__)}]"
        # Fall back to str() for annotations without a __name__ (e.g. PEP 604 unions or
        # some typing aliases), so building an error message can never raise itself.
        return getattr(annotation, '__name__', None) or str(annotation)

    def _build_docstring() -> str:
        """Describe every parameter of ``obb_function`` for use in error messages."""
        text = "Docstring:"
        for param in params_metadata['fields']:
            field = params_metadata['fields'][param]
            optional = '' if field.is_required() else ' (optional)'
            text += f"\n * {param}{optional}: {_format_annotation(field.annotation)}\n{field.description}"
        text += f"\n\nFor more information check {func_docs}"
        return text

    class AnyTable(APITable):
        def _get_params_from_conditions(self, conditions: List) -> Dict:
            """Collect the equality conditions of a WHERE clause as parameters.

            Conditions using any other operator are ignored here. ``select``
            handles them separately (date ranges and result filtering).

            Args:
                conditions (List): ``(op, arg1, arg2)`` triples as returned by
                    ``extract_comparison_conditions``.

            Returns:
                Dict: Mapping of column/parameter name to the compared value.
            """
            params: dict = {}
            for op, arg1, arg2 in conditions:
                if op == "=":
                    params[arg1] = arg2

            return params

        def select(self, query: ast.Select) -> pd.DataFrame:
            """Fetch data from ``obb_function`` and apply the rest of the SQL query.

            WHERE conditions are handled three ways:

            - Date comparisons on response columns that have a matching
              ``start_<column>`` parameter become ``start_``/``end_`` arguments.
            - Equality conditions on known parameters (or on any column when
              ``strict_filter`` is not set) become call arguments.
            - Every condition is re-applied as a DataFrame filter on the result.

            After filtering, ORDER BY, the column projection and LIMIT are applied,
            in that order. Targets the local helpers cannot evaluate are delegated
            to ``query_df``.

            Args:
                query (ast.Select): Given SQL SELECT query.

            Returns:
                pd.DataFrame: The query result.

            Raises:
                NotImplementedError: If the WHERE clause uses OR, or a mandatory
                    parameter is missing.
                Exception: Wrapping any error raised while calling OpenBB or
                    post-processing, with the parameter documentation appended.
            """
            conditions = extract_comparison_conditions(query.where)
            arg_params = self._get_params_from_conditions(conditions=conditions)

            params = {}
            if provider is not None:
                params['provider'] = provider

            filters = []
            mandatory_args_set = {key: False for key in mandatory_fields}
            columns_to_add = {}
            strict_filter = arg_params.get('strict_filter', False)

            for op, arg1, arg2 in conditions:
                if op == 'or':
                    raise NotImplementedError('OR is not supported')

                if arg1 in mandatory_fields:
                    mandatory_args_set[arg1] = True

                if ('start_' + arg1 in params_metadata['fields'] and arg1 in response_columns and arg2 is not None):
                    # NOTE(review): compares the annotation to the *string* 'datetime'; if the
                    # metadata holds real types this never matches. Confirm against the caller.
                    if response_metadata['fields'][arg1].annotation == 'datetime':
                        parsed_date = parse_local_date(arg2)
                        interval = arg_params.get('interval', '1d')

                        if op == '>':
                            params['start_' + arg1] = parsed_date.strftime('%Y-%m-%d')
                        elif op == '<':
                            params['end_' + arg1] = parsed_date.strftime('%Y-%m-%d')
                        elif op == '>=':
                            # Pad by one interval so the boundary row is fetched; filters narrow it.
                            params['start_' + arg1] = (parsed_date - pd.Timedelta(interval)).strftime('%Y-%m-%d')
                        elif op == '<=':
                            params['end_' + arg1] = (parsed_date + pd.Timedelta(interval)).strftime('%Y-%m-%d')
                        elif op == '=':
                            # Pad on both sides, consistent with '>=' and '<='. The filters
                            # re-applied to the result below narrow the window back down.
                            params['start_' + arg1] = (parsed_date - pd.Timedelta(interval)).strftime('%Y-%m-%d')
                            params['end_' + arg1] = (parsed_date + pd.Timedelta(interval)).strftime('%Y-%m-%d')

                elif arg1 in params_metadata['fields'] or not strict_filter:
                    if op == '=':
                        params[arg1] = arg2
                        columns_to_add[arg1] = arg2

                filters.append([op, arg1, arg2])

            if not all(mandatory_args_set.values()):
                missing_args = ", ".join([k for k, v in mandatory_args_set.items() if v is False])
                raise NotImplementedError(
                    f"You must specify the following arguments in the WHERE statement: {missing_args}\n\n{_build_docstring()}"
                )

            try:
                # Handle limit keyword correctly since it can't be parsed as a WHERE arg (i.e. WHERE limit = 50).
                # NOTE(review): the provider applies this before the local filters below,
                # so heavily filtered queries may still return fewer rows than requested.
                if query.limit is not None and 'limit' in params_metadata['fields']:
                    params['limit'] = query.limit.value
                obbject = obb_function(**params)

                # Extract data in dataframe format
                result = obbject.to_df()
                if result is None:
                    raise Exception(f"For more information check {func_docs}.")

                # Check if index is a datetime, if it is we want that as a column
                if isinstance(result.index, pd.DatetimeIndex):
                    result.reset_index(inplace=True)

                # Re-attach equality parameters as columns so filtering/sorting on them works
                for key in columns_to_add:
                    result[key] = params[key]

                # filter targets
                result = filter_dataframe(result, filters)
                if result is None:
                    raise Exception(f"For more information check {func_docs}.")

                if query.order_by:
                    result = sort_dataframe(result, query.order_by)

                # Copy: get_columns() is backed by a list shared by every instance of this
                # class, and extending it in place would leak result columns into later queries.
                columns = list(self.get_columns())
                columns += [col for col in result.columns if col not in columns]

                for full_target in query.targets:
                    if isinstance(full_target, ast.Star):
                        continue
                    if isinstance(full_target, ast.Identifier):
                        target = full_target.parts[-1].lower()
                    elif isinstance(full_target, ast.Function):
                        if not full_target.args or not isinstance(full_target.args[0], ast.Identifier):
                            # e.g. count(*) or a zero-argument function: let DuckDB resolve it.
                            return query_df(result, query)
                        target = full_target.args[0].parts[-1].lower()
                    else:
                        # Could be a window function or other operation we can't handle. Defer to DuckDB.
                        # query_df runs the full query, LIMIT included, on the un-truncated rows.
                        return query_df(result, query)
                    if target not in columns:
                        raise ValueError(f"Unknown column '{target}' in 'field list'")

                # project targets
                try:
                    result = project_dataframe(result, query.targets, columns)
                except NotImplementedError:
                    # Target contains a function that we need DuckDB to resolve.
                    return query_df(result, query)

                # LIMIT goes last: after WHERE filtering and ORDER BY, as in SQL.
                if query.limit is not None:
                    result = result.head(query.limit.value)
                return result

            except (AttributeError, ValidationError) as e:
                logger.info(f'Encountered error while executing OpenBB select: {str(e)}')
                raise Exception(f"{str(e)}\n\n{_build_docstring()}.") from e

            except Exception as e:
                logger.info(f'Encountered error while executing OpenBB select: {str(e)}')

                # TODO: This one doesn't work because it's taken care of from MindsDB side
                if "Table not found" in str(e):
                    raise Exception(f"{str(e)}\n\nCheck if the method exists here: {func_docs}.\n\n - If it doesn't you may need to look for the parent module to check whether there's a typo in the naming.\n- If it does you may need to install a new extension to the OpenBB Platform, and you can see what is available at https://my.openbb.co/app/platform/extensions.") from e

                if "Missing credential" in str(e):
                    raise Exception(f"{str(e)}\n\nGo to https://my.openbb.co/app/platform/api-keys to set this API key, for free.") from e

                # Catch all other errors
                raise Exception(f"{str(e)}\n\n{_build_docstring()}.") from e

        def get_columns(self):
            """Return the response columns of ``obb_function`` (a fresh copy each call)."""
            return list(response_columns)

    return AnyTable