Coverage for mindsdb / api / executor / command_executor.py: 19%

933 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import datetime 

2from pathlib import Path 

3from textwrap import dedent 

4from typing import Optional 

5from functools import reduce 

6 

7import pandas as pd 

8from mindsdb_sql_parser import parse_sql 

9from mindsdb_sql_parser.ast.mindsdb import AlterDatabase 

10from mindsdb_sql_parser.ast import ( 

11 Alter, 

12 ASTNode, 

13 BinaryOperation, 

14 CommitTransaction, 

15 Constant, 

16 CreateTable, 

17 Delete, 

18 Describe, 

19 DropDatabase, 

20 DropTables, 

21 DropView, 

22 Explain, 

23 Identifier, 

24 Insert, 

25 NativeQuery, 

26 Operation, 

27 RollbackTransaction, 

28 Select, 

29 Set, 

30 Show, 

31 Star, 

32 StartTransaction, 

33 Union, 

34 Update, 

35 Use, 

36 Tuple, 

37 Function, 

38 Variable, 

39 Intersect, 

40 Except, 

41 Parameter, 

42 NullConstant, 

43) 

44 

45# typed models 

46from mindsdb_sql_parser.ast.mindsdb import ( 

47 AlterView, 

48 CreateAgent, 

49 CreateAnomalyDetectionModel, 

50 CreateChatBot, 

51 CreateDatabase, 

52 CreateJob, 

53 CreateKnowledgeBase, 

54 AlterKnowledgeBase, 

55 CreateMLEngine, 

56 CreatePredictor, 

57 CreateSkill, 

58 CreateTrigger, 

59 CreateView, 

60 CreateKnowledgeBaseIndex, 

61 EvaluateKnowledgeBase, 

62 DropAgent, 

63 DropChatBot, 

64 DropDatasource, 

65 DropJob, 

66 DropKnowledgeBase, 

67 DropMLEngine, 

68 DropPredictor, 

69 DropSkill, 

70 DropTrigger, 

71 Evaluate, 

72 FinetunePredictor, 

73 RetrainPredictor, 

74 UpdateAgent, 

75 UpdateChatBot, 

76 UpdateSkill, 

77) 

78 

79import mindsdb.utilities.profiler as profiler 

80 

81from mindsdb.api.executor.sql_query.result_set import Column, ResultSet 

82from mindsdb.api.executor.sql_query import SQLQuery 

83from mindsdb.api.executor.data_types.answer import ExecuteAnswer 

84from mindsdb.api.mysql.mysql_proxy.libs.constants.mysql import ( 

85 CHARSET_NUMBERS, 

86 SERVER_VARIABLES, 

87 TYPES, 

88) 

89 

90from mindsdb.api.executor.exceptions import ( 

91 ExecutorException, 

92 BadDbError, 

93 NotSupportedYet, 

94 WrongArgumentError, 

95 TableNotExistError, 

96) 

97from mindsdb.api.executor.utilities.functions import download_file 

98from mindsdb.api.executor.utilities.sql import query_df 

99from mindsdb.integrations.libs.const import ( 

100 HANDLER_CONNECTION_ARG_TYPE, 

101 PREDICTOR_STATUS, 

102) 

103from mindsdb.integrations.utilities.query_traversal import query_traversal 

104from mindsdb.integrations.libs.response import HandlerStatusResponse 

105from mindsdb.interfaces.chatbot.chatbot_controller import ChatBotController 

106from mindsdb.interfaces.database.projects import ProjectController 

107from mindsdb.interfaces.jobs.jobs_controller import JobsController 

108from mindsdb.interfaces.model.functions import ( 

109 get_model_record, 

110 get_model_records, 

111 get_predictor_integration, 

112) 

113from mindsdb.interfaces.query_context.context_controller import query_context_controller 

114from mindsdb.interfaces.triggers.triggers_controller import TriggersController 

115from mindsdb.interfaces.variables.variables_controller import variables_controller 

116from mindsdb.utilities.context import context as ctx 

117from mindsdb.utilities.functions import mark_process, resolve_model_identifier, get_handler_install_message 

118from mindsdb.utilities.exception import EntityExistsError, EntityNotExistsError 

119from mindsdb.utilities import log 

120 

121logger = log.getLogger(__name__) 

122 

123 

124def _get_show_where( 

125 statement: ASTNode, 

126 from_name: Optional[str] = None, 

127 like_name: Optional[str] = None, 

128 initial: Optional[ASTNode] = None, 

129) -> ASTNode: 

130 """combine all possible show filters to single 'where' condition 

131 SHOW category [FROM name] [LIKE filter] [WHERE filter] 

132 

133 Args: 

134 statement (ASTNode): 'show' query statement 

135 from_name (str): name of column for 'from' filter 

136 like_name (str): name of column for 'like' filter, 

137 initial (ASTNode): initial 'where' filter 

138 Returns: 

139 ASTNode: 'where' statemnt 

140 """ 

141 where = [] 

142 if initial is not None: 

143 where.append(initial) 

144 if statement.from_table is not None and from_name is not None: 

145 where.append( 

146 BinaryOperation( 

147 "=", 

148 args=[Identifier(from_name), Constant(statement.from_table.parts[-1])], 

149 ) 

150 ) 

151 if statement.like is not None and like_name is not None: 

152 where.append(BinaryOperation("like", args=[Identifier(like_name), Constant(statement.like)])) 

153 if statement.where is not None: 

154 where.append(statement.where) 

155 

156 if len(where) > 0: 

157 return reduce(lambda prev, next: BinaryOperation("and", args=[prev, next]), where) 

158 return None 

159 

160 

161def match_one_part_name(identifier: Identifier, ensure_lower_case: bool = False) -> str: 

162 """Extract a single-part name from an Identifier object, optionally ensuring it is lowercase. 

163 

164 Args: 

165 identifier (Identifier): The identifier to extract the name from. Must contain exactly one part. 

166 ensure_lower_case (bool, optional): If True, raises ValueError if the name is not lowercase. Defaults to False. 

167 

168 Returns: 

169 str: The extracted name, converted to lowercase if not quoted. 

170 

171 Raises: 

172 ValueError: If the identifier does not contain exactly one part, or if ensure_lower_case is True and the name is not lowercase. 

173 """ 

174 match identifier.parts, identifier.is_quoted: 

175 case [name], [is_quoted]: 

176 ... 

177 case _: 

178 raise ValueError(f"Only single-part names are allowed: {identifier}") 

179 if not is_quoted: 179 ↛ 181line 179 didn't jump to line 181 because the condition on line 179 was always true

180 name = name.lower() 

181 if ensure_lower_case and not name.islower(): 181 ↛ 182line 181 didn't jump to line 182 because the condition on line 181 was never true

182 raise ValueError(f"The name must be in lowercase: {identifier}") 

183 return name 

184 

185 

186def match_two_part_name( 

187 identifier: Identifier, ensure_lower_case: bool = False, default_db_name: str | None = None 

188) -> tuple[str, str]: 

189 """Extract a (database, name) tuple from an Identifier object that may have one or two parts. 

190 

191 Args: 

192 identifier (Identifier): The identifier to extract names from. Must contain one or two parts. 

193 ensure_lower_case (bool, optional): If True, raises ValueError if the name part is not lowercase. Defaults to False. 

194 default_db_name (str | None, optional): The default database name to use if only one part is provided. Defaults to None. 

195 

196 Returns: 

197 tuple[str, str]: A tuple of (database_name, name), where database_name may be None if not provided and no default is given. 

198 

199 Raises: 

200 ValueError: If the identifier does not contain one or two parts, or if ensure_lower_case is True and the name is not lowercase. 

201 """ 

202 db_name = None 

203 

204 match identifier.parts, identifier.is_quoted: 

205 case [name], [is_quoted]: 

206 ... 

207 case [db_name, name], [db_is_quoted, is_quoted]: 

208 if not db_is_quoted: 

209 db_name = db_name.lower() 

210 case _: 

211 raise ValueError(f"Only single-part or two-part names are allowed: {identifier}") 

212 if not is_quoted: 

213 name = name.lower() 

214 if ensure_lower_case and not name.islower(): 

215 raise ValueError(f"The name must be in lowercase: {identifier}") 

216 if db_name is None: 

217 db_name = default_db_name 

218 return db_name, name 

219 

220 

221def apply_parameters(statement, params): 

222 def fill_parameters(node, **kwargs): 

223 if isinstance(node, Parameter): 

224 if node.value in params: 224 ↛ exitline 224 didn't return from function 'fill_parameters' because the condition on line 224 was always true

225 value = params[node.value] 

226 if value is None: 

227 return NullConstant() 

228 if isinstance(value, list): 

229 return Tuple([Constant(i) for i in value]) 

230 return Constant(value) 

231 

232 query_traversal(statement, fill_parameters) 

233 

234 

235class ExecuteCommands: 

236 def __init__(self, session, context=None): 

237 if context is None: 

238 context = {} 

239 

240 self.context = context 

241 self.session = session 

242 

243 self.charset_text_type = CHARSET_NUMBERS["utf8_general_ci"] 

244 self.datahub = session.datahub 

245 

246 @profiler.profile() 

247 def execute_command(self, statement: ASTNode, database_name: str = None) -> ExecuteAnswer: 

248 sql: str = statement.to_string() 

249 sql_lower: str = sql.lower() 

250 

251 if database_name is None: 251 ↛ 254line 251 didn't jump to line 254 because the condition on line 251 was always true

252 database_name = self.session.database 

253 

254 if ctx.params: 

255 apply_parameters(statement, ctx.params) 

256 

257 statement_type = type(statement) 

258 if statement_type is CreateDatabase: 

259 return self.answer_create_database(statement) 

260 elif statement_type is CreateMLEngine: 

261 return self.answer_create_ml_engine(statement) 

262 elif statement_type is DropMLEngine: 262 ↛ 263line 262 didn't jump to line 263 because the condition on line 262 was never true

263 return self.answer_drop_ml_engine(statement) 

264 elif statement_type is DropPredictor: 264 ↛ 265line 264 didn't jump to line 265 because the condition on line 264 was never true

265 return self.answer_drop_model(statement, database_name) 

266 elif statement_type is DropTables: 266 ↛ 267line 266 didn't jump to line 267 because the condition on line 266 was never true

267 return self.answer_drop_tables(statement, database_name) 

268 elif statement_type is DropDatasource or statement_type is DropDatabase: 268 ↛ 269line 268 didn't jump to line 269 because the condition on line 268 was never true

269 return self.answer_drop_database(statement) 

270 elif statement_type is AlterDatabase: 270 ↛ 271line 270 didn't jump to line 271 because the condition on line 270 was never true

271 return self.answer_alter_database(statement) 

272 elif statement_type is Describe: 272 ↛ 274line 272 didn't jump to line 274 because the condition on line 272 was never true

273 # NOTE in sql 'describe table' is same as 'show columns' 

274 obj_type = statement.type 

275 

276 if obj_type is None or obj_type.upper() in ("MODEL", "PREDICTOR"): 

277 return self.answer_describe_predictor(statement.value, database_name) 

278 else: 

279 return self.answer_describe_object(obj_type.upper(), statement.value, database_name) 

280 

281 elif statement_type is RetrainPredictor: 281 ↛ 282line 281 didn't jump to line 282 because the condition on line 281 was never true

282 return self.answer_retrain_predictor(statement, database_name) 

283 elif statement_type is FinetunePredictor: 283 ↛ 284line 283 didn't jump to line 284 because the condition on line 283 was never true

284 return self.answer_finetune_predictor(statement, database_name) 

285 elif statement_type is Show: 285 ↛ 286line 285 didn't jump to line 286 because the condition on line 285 was never true

286 sql_category = statement.category.lower() 

287 if hasattr(statement, "modes"): 

288 if isinstance(statement.modes, list) is False: 

289 statement.modes = [] 

290 statement.modes = [x.upper() for x in statement.modes] 

291 if sql_category == "ml_engines": 

292 new_statement = Select( 

293 targets=[Star()], 

294 from_table=Identifier(parts=["information_schema", "ml_engines"]), 

295 where=_get_show_where(statement, like_name="name"), 

296 ) 

297 

298 query = SQLQuery(new_statement, session=self.session, database=database_name) 

299 return self.answer_select(query) 

300 elif sql_category == "handlers": 

301 new_statement = Select( 

302 targets=[Star()], 

303 from_table=Identifier(parts=["information_schema", "handlers"]), 

304 where=_get_show_where(statement, like_name="name"), 

305 ) 

306 

307 query = SQLQuery(new_statement, session=self.session, database=database_name) 

308 return self.answer_select(query) 

309 elif sql_category == "plugins": 

310 if statement.where is not None or statement.like: 

311 raise ExecutorException("'SHOW PLUGINS' query should be used without filters") 

312 new_statement = Select( 

313 targets=[Star()], 

314 from_table=Identifier(parts=["information_schema", "PLUGINS"]), 

315 ) 

316 query = SQLQuery(new_statement, session=self.session, database=database_name) 

317 return self.answer_select(query) 

318 elif sql_category in ("databases", "schemas"): 

319 new_statement = Select( 

320 targets=[Identifier(parts=["NAME"], alias=Identifier("Database"))], 

321 from_table=Identifier(parts=["information_schema", "DATABASES"]), 

322 where=_get_show_where(statement, like_name="Database"), 

323 ) 

324 

325 if "FULL" in statement.modes: 

326 new_statement.targets.extend( 

327 [ 

328 Identifier(parts=["TYPE"], alias=Identifier("TYPE")), 

329 Identifier(parts=["ENGINE"], alias=Identifier("ENGINE")), 

330 ] 

331 ) 

332 

333 query = SQLQuery(new_statement, session=self.session, database=database_name) 

334 return self.answer_select(query) 

335 elif sql_category in ("tables", "full tables"): 

336 schema = database_name or "mindsdb" 

337 if statement.from_table is not None and statement.in_table is not None: 

338 raise ExecutorException( 

339 "You have an error in your SQL syntax: 'from' and 'in' cannot be used together" 

340 ) 

341 

342 if statement.from_table is not None: 

343 schema = statement.from_table.parts[-1] 

344 statement.from_table = None 

345 if statement.in_table is not None: 

346 schema = statement.in_table.parts[-1] 

347 statement.in_table = None 

348 

349 table_types = [Constant(t) for t in ["MODEL", "BASE TABLE", "SYSTEM VIEW", "VIEW"]] 

350 where = BinaryOperation( 

351 "and", 

352 args=[ 

353 BinaryOperation("=", args=[Identifier("table_schema"), Constant(schema)]), 

354 BinaryOperation("in", args=[Identifier("table_type"), Tuple(table_types)]), 

355 ], 

356 ) 

357 

358 new_statement = Select( 

359 targets=[ 

360 Identifier( 

361 parts=["table_name"], 

362 alias=Identifier(f"Tables_in_{schema}"), 

363 ) 

364 ], 

365 from_table=Identifier(parts=["information_schema", "TABLES"]), 

366 where=_get_show_where(statement, like_name=f"Tables_in_{schema}", initial=where), 

367 ) 

368 

369 if "FULL" in statement.modes: 

370 new_statement.targets.append(Identifier(parts=["TABLE_TYPE"], alias=Identifier("Table_type"))) 

371 

372 query = SQLQuery(new_statement, session=self.session, database=database_name) 

373 return self.answer_select(query) 

374 elif sql_category in ( 

375 "variables", 

376 "session variables", 

377 "session status", 

378 "global variables", 

379 ): 

380 new_statement = Select( 

381 targets=[ 

382 Identifier(parts=["Variable_name"]), 

383 Identifier(parts=["Value"]), 

384 ], 

385 from_table=Identifier(parts=["dataframe"]), 

386 where=_get_show_where(statement, like_name="Variable_name"), 

387 ) 

388 

389 data = {} 

390 is_session = "session" in sql_category 

391 for var_name, var_data in SERVER_VARIABLES.items(): 

392 var_name = var_name.replace("@@", "") 

393 if is_session and var_name.startswith("session.") is False: 

394 continue 

395 if var_name.startswith("session.") or var_name.startswith("GLOBAL."): 

396 name = var_name.replace("session.", "").replace("GLOBAL.", "") 

397 data[name] = var_data[0] 

398 elif var_name not in data: 

399 data[var_name] = var_data[0] 

400 

401 df = pd.DataFrame(data.items(), columns=["Variable_name", "Value"]) 

402 df2 = query_df(df, new_statement) 

403 

404 return ExecuteAnswer(data=ResultSet.from_df(df2, table_name="session_variables")) 

405 elif sql_category == "search_path": 

406 return ExecuteAnswer( 

407 data=ResultSet( 

408 columns=[Column(name="search_path", table_name="search_path", type="str")], 

409 values=[['"$user", public']], 

410 ) 

411 ) 

412 elif "show status like 'ssl_version'" in sql_lower: 

413 return ExecuteAnswer( 

414 data=ResultSet( 

415 columns=[ 

416 Column(name="Value", table_name="session_variables", type="str"), 

417 Column(name="Value", table_name="session_variables", type="str"), 

418 ], 

419 values=[["Ssl_version", "TLSv1.1"]], 

420 ) 

421 ) 

422 elif sql_category in ("function status", "procedure status"): 

423 # SHOW FUNCTION STATUS WHERE Db = 'MINDSDB'; 

424 # SHOW PROCEDURE STATUS WHERE Db = 'MINDSDB' 

425 # SHOW FUNCTION STATUS WHERE Db = 'MINDSDB' AND Name LIKE '%'; 

426 return self.answer_function_status() 

427 elif sql_category in ("index", "keys", "indexes"): 

428 # INDEX | INDEXES | KEYS are synonyms 

429 # https://dev.mysql.com/doc/refman/8.0/en/show-index.html 

430 new_statement = Select( 

431 targets=[ 

432 Identifier("TABLE_NAME", alias=Identifier("Table")), 

433 Identifier("NON_UNIQUE", alias=Identifier("Non_unique")), 

434 Identifier("INDEX_NAME", alias=Identifier("Key_name")), 

435 Identifier("SEQ_IN_INDEX", alias=Identifier("Seq_in_index")), 

436 Identifier("COLUMN_NAME", alias=Identifier("Column_name")), 

437 Identifier("COLLATION", alias=Identifier("Collation")), 

438 Identifier("CARDINALITY", alias=Identifier("Cardinality")), 

439 Identifier("SUB_PART", alias=Identifier("Sub_part")), 

440 Identifier("PACKED", alias=Identifier("Packed")), 

441 Identifier("NULLABLE", alias=Identifier("Null")), 

442 Identifier("INDEX_TYPE", alias=Identifier("Index_type")), 

443 Identifier("COMMENT", alias=Identifier("Comment")), 

444 Identifier("INDEX_COMMENT", alias=Identifier("Index_comment")), 

445 Identifier("IS_VISIBLE", alias=Identifier("Visible")), 

446 Identifier("EXPRESSION", alias=Identifier("Expression")), 

447 ], 

448 from_table=Identifier(parts=["information_schema", "STATISTICS"]), 

449 where=statement.where, 

450 ) 

451 query = SQLQuery(new_statement, session=self.session, database=database_name) 

452 return self.answer_select(query) 

453 # FIXME if have answer on that request, then DataGrip show warning '[S0022] Column 'Non_unique' not found.' 

454 elif "show create table" in sql_lower: 

455 # SHOW CREATE TABLE `MINDSDB`.`predictors` 

456 table = sql[sql.rfind(".") + 1 :].strip(" .;\n\t").replace("`", "") 

457 return self.answer_show_create_table(table) 

458 elif sql_category in ("character set", "charset"): 

459 new_statement = Select( 

460 targets=[ 

461 Identifier("CHARACTER_SET_NAME", alias=Identifier("Charset")), 

462 Identifier("DEFAULT_COLLATE_NAME", alias=Identifier("Description")), 

463 Identifier("DESCRIPTION", alias=Identifier("Default collation")), 

464 Identifier("MAXLEN", alias=Identifier("Maxlen")), 

465 ], 

466 from_table=Identifier(parts=["INFORMATION_SCHEMA", "CHARACTER_SETS"]), 

467 where=_get_show_where(statement, like_name="CHARACTER_SET_NAME"), 

468 ) 

469 query = SQLQuery(new_statement, session=self.session, database=database_name) 

470 return self.answer_select(query) 

471 elif sql_category == "warnings": 

472 return self.answer_show_warnings() 

473 elif sql_category == "engines": 

474 new_statement = Select( 

475 targets=[Star()], 

476 from_table=Identifier(parts=["information_schema", "ENGINES"]), 

477 ) 

478 query = SQLQuery(new_statement, session=self.session, database=database_name) 

479 return self.answer_select(query) 

480 elif sql_category == "collation": 

481 new_statement = Select( 

482 targets=[ 

483 Identifier("COLLATION_NAME", alias=Identifier("Collation")), 

484 Identifier("CHARACTER_SET_NAME", alias=Identifier("Charset")), 

485 Identifier("ID", alias=Identifier("Id")), 

486 Identifier("IS_DEFAULT", alias=Identifier("Default")), 

487 Identifier("IS_COMPILED", alias=Identifier("Compiled")), 

488 Identifier("SORTLEN", alias=Identifier("Sortlen")), 

489 Identifier("PAD_ATTRIBUTE", alias=Identifier("Pad_attribute")), 

490 ], 

491 from_table=Identifier(parts=["INFORMATION_SCHEMA", "COLLATIONS"]), 

492 where=_get_show_where(statement, like_name="Collation"), 

493 ) 

494 query = SQLQuery(new_statement, session=self.session, database=database_name) 

495 return self.answer_select(query) 

496 elif sql_category == "table status": 

497 # TODO improve it 

498 # SHOW TABLE STATUS LIKE 'table' 

499 table_name = None 

500 if statement.like is not None: 

501 table_name = statement.like 

502 # elif condition == 'from' and type(expression) == Identifier: 

503 # table_name = expression.parts[-1] 

504 if table_name is None: 

505 err_str = f"Can't determine table name in query: {sql}" 

506 logger.warning(err_str) 

507 raise TableNotExistError(err_str) 

508 return self.answer_show_table_status(table_name) 

509 elif sql_category == "columns": 

510 is_full = statement.modes is not None and "FULL" in statement.modes 

511 return self.answer_show_columns( 

512 statement.from_table, 

513 statement.where, 

514 statement.like, 

515 is_full=is_full, 

516 database_name=database_name, 

517 ) 

518 

519 elif sql_category in ( 

520 "agents", 

521 "jobs", 

522 "skills", 

523 "chatbots", 

524 "triggers", 

525 "views", 

526 "knowledge_bases", 

527 "knowledge bases", 

528 "predictors", 

529 "models", 

530 ): 

531 if sql_category == "knowledge bases": 

532 sql_category = "knowledge_bases" 

533 

534 if sql_category == "predictors": 

535 sql_category = "models" 

536 

537 db_name = database_name 

538 if statement.from_table is not None: 

539 db_name = statement.from_table.parts[-1] 

540 

541 where = BinaryOperation(op="=", args=[Identifier("project"), Constant(db_name)]) 

542 

543 select_statement = Select( 

544 targets=[Star()], 

545 from_table=Identifier(parts=["information_schema", sql_category]), 

546 where=_get_show_where(statement, like_name="name", initial=where), 

547 ) 

548 query = SQLQuery(select_statement, session=self.session) 

549 return self.answer_select(query) 

550 

551 elif sql_category == "projects": 

552 where = BinaryOperation(op="=", args=[Identifier("type"), Constant("project")]) 

553 select_statement = Select( 

554 targets=[Identifier(parts=["NAME"], alias=Identifier("project"))], 

555 from_table=Identifier(parts=["information_schema", "DATABASES"]), 

556 where=_get_show_where(statement, like_name="project", from_name="project", initial=where), 

557 ) 

558 

559 query = SQLQuery(select_statement, session=self.session) 

560 return self.answer_select(query) 

561 else: 

562 raise NotSupportedYet(f"Statement not implemented: {sql}") 

563 elif statement_type in ( 563 ↛ 568line 563 didn't jump to line 568 because the condition on line 563 was never true

564 StartTransaction, 

565 CommitTransaction, 

566 RollbackTransaction, 

567 ): 

568 return ExecuteAnswer() 

569 elif statement_type is Set: 569 ↛ 570line 569 didn't jump to line 570 because the condition on line 569 was never true

570 category = (statement.category or "").lower() 

571 if category == "": 

572 if isinstance(statement.name, Identifier): 

573 param = statement.name.parts[0].lower() 

574 

575 value = None 

576 if isinstance(statement.value, Constant): 

577 value = statement.value.value 

578 

579 if param == "profiling": 

580 self.session.profiling = value in (1, True) 

581 if self.session.profiling is True: 

582 profiler.enable() 

583 else: 

584 profiler.disable() 

585 elif param == "predictor_cache": 

586 self.session.predictor_cache = value in (1, True) 

587 elif param == "context": 

588 if value in (0, False, None): 

589 # drop context 

590 query_context_controller.drop_query_context(None) 

591 elif param == "show_secrets": 

592 self.session.show_secrets = value in (1, True) 

593 elif isinstance(statement.name, Variable): 

594 variables_controller.set_variable(statement.name.value, statement.value) 

595 return ExecuteAnswer() 

596 elif category == "autocommit": 

597 return ExecuteAnswer() 

598 elif category == "names": 

599 # set names utf8; 

600 charsets = { 

601 "utf8": CHARSET_NUMBERS["utf8_general_ci"], 

602 "utf8mb4": CHARSET_NUMBERS["utf8mb4_general_ci"], 

603 } 

604 self.charset = statement.value.value 

605 self.charset_text_type = charsets.get(self.charset) 

606 if self.charset_text_type is None: 

607 logger.warning( 

608 f"Unknown charset: {self.charset}. Setting up 'utf8_general_ci' as charset text type." 

609 ) 

610 self.charset_text_type = CHARSET_NUMBERS["utf8_general_ci"] 

611 return ExecuteAnswer( 

612 state_track=[ 

613 ["character_set_client", self.charset], 

614 ["character_set_connection", self.charset], 

615 ["character_set_results", self.charset], 

616 ], 

617 ) 

618 elif category == "active": 

619 return self.answer_update_model_version(statement.value, database_name) 

620 

621 else: 

622 logger.warning(f"SQL statement is not processable, return OK package: {sql}") 

623 return ExecuteAnswer() 

624 elif statement_type is Use: 624 ↛ 625line 624 didn't jump to line 625 because the condition on line 624 was never true

625 db_name = statement.value.parts[-1] 

626 self.change_default_db(db_name) 

627 return ExecuteAnswer() 

628 elif statement_type in ( 628 ↛ 632line 628 didn't jump to line 632 because the condition on line 628 was never true

629 CreatePredictor, 

630 CreateAnomalyDetectionModel, # we may want to specialize these in the future 

631 ): 

632 return self.answer_create_predictor(statement, database_name) 

633 elif statement_type is CreateView: 633 ↛ 634line 633 didn't jump to line 634 because the condition on line 633 was never true

634 return self.answer_create_or_alter_view(statement, database_name) 

635 elif statement_type is AlterView: 635 ↛ 636line 635 didn't jump to line 636 because the condition on line 635 was never true

636 return self.answer_create_or_alter_view(statement, database_name) 

637 elif statement_type is DropView: 637 ↛ 638line 637 didn't jump to line 638 because the condition on line 637 was never true

638 return self.answer_drop_view(statement, database_name) 

639 elif statement_type is Delete: 639 ↛ 640line 639 didn't jump to line 640 because the condition on line 639 was never true

640 query = SQLQuery(statement, session=self.session, database=database_name) 

641 return ExecuteAnswer(affected_rows=query.fetched_data.affected_rows) 

642 elif statement_type is Insert: 642 ↛ 643line 642 didn't jump to line 643 because the condition on line 642 was never true

643 query = SQLQuery(statement, session=self.session, database=database_name) 

644 if query.fetched_data.length() > 0: 

645 return self.answer_select(query) 

646 return ExecuteAnswer(affected_rows=query.fetched_data.affected_rows) 

647 elif statement_type is Update: 647 ↛ 648line 647 didn't jump to line 648 because the condition on line 647 was never true

648 query = SQLQuery(statement, session=self.session, database=database_name) 

649 return ExecuteAnswer(affected_rows=query.fetched_data.affected_rows) 

650 elif statement_type is Alter and ("disable keys" in sql_lower) or ("enable keys" in sql_lower): 650 ↛ 651line 650 didn't jump to line 651 because the condition on line 650 was never true

651 return ExecuteAnswer() 

652 elif statement_type is Select: 

653 ret = self.exec_service_function(statement, database_name) 

654 if ret is not None: 654 ↛ 655line 654 didn't jump to line 655 because the condition on line 654 was never true

655 return ret 

656 query = SQLQuery(statement, session=self.session, database=database_name) 

657 return self.answer_select(query) 

658 elif statement_type is Explain: 658 ↛ 659line 658 didn't jump to line 659 because the condition on line 658 was never true

659 return self.answer_show_columns(statement.target, database_name=database_name) 

660 elif statement_type is CreateTable: 660 ↛ 663line 660 didn't jump to line 663 because the condition on line 660 was always true

661 return self.answer_create_table(statement, database_name) 

662 # -- jobs -- 

663 elif statement_type is CreateJob: 

664 return self.answer_create_job(statement, database_name) 

665 elif statement_type is DropJob: 

666 return self.answer_drop_job(statement, database_name) 

667 # -- triggers -- 

668 elif statement_type is CreateTrigger: 

669 return self.answer_create_trigger(statement, database_name) 

670 elif statement_type is DropTrigger: 

671 return self.answer_drop_trigger(statement, database_name) 

672 # -- chatbots 

673 elif statement_type is CreateChatBot: 

674 return self.answer_create_chatbot(statement, database_name) 

675 elif statement_type is UpdateChatBot: 

676 return self.answer_update_chatbot(statement, database_name) 

677 elif statement_type is DropChatBot: 

678 return self.answer_drop_chatbot(statement, database_name) 

679 elif statement_type is CreateKnowledgeBase: 

680 return self.answer_create_kb(statement, database_name) 

681 elif statement_type is AlterKnowledgeBase: 

682 return self.answer_alter_kb(statement, database_name) 

683 elif statement_type is DropKnowledgeBase: 

684 return self.answer_drop_kb(statement, database_name) 

685 elif statement_type is CreateSkill: 

686 return self.answer_create_skill(statement, database_name) 

687 elif statement_type is DropSkill: 

688 return self.answer_drop_skill(statement, database_name) 

689 elif statement_type is UpdateSkill: 

690 return self.answer_update_skill(statement, database_name) 

691 elif statement_type is CreateAgent: 

692 return self.answer_create_agent(statement, database_name) 

693 elif statement_type is DropAgent: 

694 return self.answer_drop_agent(statement, database_name) 

695 elif statement_type is UpdateAgent: 

696 return self.answer_update_agent(statement, database_name) 

697 elif statement_type is Evaluate: 

698 statement.data = parse_sql(statement.query_str) 

699 return self.answer_evaluate_metric(statement, database_name) 

700 elif statement_type is CreateKnowledgeBaseIndex: 

701 return self.answer_create_kb_index(statement, database_name) 

702 elif statement_type is EvaluateKnowledgeBase: 

703 return self.answer_evaluate_kb(statement, database_name) 

704 elif statement_type in (Union, Intersect, Except): 

705 query = SQLQuery(statement, session=self.session, database=database_name) 

706 return self.answer_select(query) 

707 else: 

708 logger.warning(f"Unknown SQL statement: {sql}") 

709 raise NotSupportedYet(f"Unknown SQL statement: {sql}") 

710 

711 def exec_service_function(self, statement: Select, database_name: str) -> Optional[ExecuteAnswer]: 

712 """ 

713 If input query is a single line select without FROM 

714 and has function in targets that matches with one of the mindsdb service functions: 

715 - execute this function and return response 

716 Otherwise, return None to allow to continue execution query outside 

717 """ 

718 

719 if statement.from_table is not None or len(statement.targets) != 1: 719 ↛ 722line 719 didn't jump to line 722 because the condition on line 719 was always true

720 return 

721 

722 target = statement.targets[0] 

723 if not isinstance(target, Function): 

724 return 

725 

726 command = target.op.lower() 

727 args = [arg.value for arg in target.args if isinstance(arg, Constant)] 

728 if command == "query_resume": 

729 ret = SQLQuery(None, session=self.session, query_id=args[0]) 

730 return self.answer_select(ret) 

731 

732 elif command == "query_cancel": 

733 query_context_controller.cancel_query(*args) 

734 return ExecuteAnswer() 

735 

736 def answer_create_trigger(self, statement, database_name): 

737 triggers_controller = TriggersController() 

738 project_name, trigger_name = match_two_part_name(statement.name, default_db_name=database_name) 

739 

740 triggers_controller.add( 

741 trigger_name, 

742 project_name, 

743 statement.table, 

744 statement.query_str, 

745 statement.columns, 

746 ) 

747 return ExecuteAnswer() 

748 

749 def answer_drop_trigger(self, statement, database_name): 

750 triggers_controller = TriggersController() 

751 

752 project_name, trigger_name = match_two_part_name(statement.name, default_db_name=database_name) 

753 

754 triggers_controller.delete(trigger_name, project_name) 

755 

756 return ExecuteAnswer() 

757 

758 def answer_create_job(self, statement: CreateJob, database_name): 

759 jobs_controller = JobsController() 

760 project_name, job_name = match_two_part_name(statement.name, default_db_name=database_name) 

761 

762 try: 

763 jobs_controller.create(job_name, project_name, statement) 

764 except EntityExistsError: 

765 if getattr(statement, "if_not_exists", False) is False: 

766 raise 

767 

768 return ExecuteAnswer() 

769 

770 def answer_drop_job(self, statement, database_name): 

771 jobs_controller = JobsController() 

772 project_name, job_name = match_two_part_name(statement.name, default_db_name=database_name) 

773 

774 try: 

775 jobs_controller.delete(job_name, project_name) 

776 except EntityNotExistsError: 

777 if statement.if_exists is False: 

778 raise 

779 

780 return ExecuteAnswer() 

781 

782 def answer_create_chatbot(self, statement, database_name): 

783 chatbot_controller = ChatBotController() 

784 project_name, name = match_two_part_name(statement.name, default_db_name=database_name) 

785 

786 is_running = statement.params.pop("is_running", True) 

787 

788 database = self.session.integration_controller.get(statement.database.parts[-1]) 

789 if database is None: 

790 raise ExecutorException(f"Database not found: {statement.database}") 

791 

792 # Database ID cannot be null 

793 database_id = database["id"] if database is not None else -1 

794 

795 model_name = None 

796 if statement.model is not None: 

797 model_name = statement.model.parts[-1] 

798 

799 agent_name = None 

800 if statement.agent is not None: 

801 agent_name = statement.agent.parts[-1] 

802 chatbot_controller.add_chatbot( 

803 name, 

804 project_name=project_name, 

805 model_name=model_name, 

806 agent_name=agent_name, 

807 database_id=database_id, 

808 is_running=is_running, 

809 params=variables_controller.fill_parameters(statement.params), 

810 ) 

811 return ExecuteAnswer() 

812 

813 def answer_update_chatbot(self, statement, database_name): 

814 chatbot_controller = ChatBotController() 

815 

816 project_name, name = match_two_part_name(statement.name, default_db_name=database_name) 

817 

818 # From SET keyword parameters 

819 updated_name = statement.params.pop("name", None) 

820 model_name = statement.params.pop("model", None) 

821 agent_name = statement.params.pop("agent", None) 

822 database_name = statement.params.pop("database", None) 

823 is_running = statement.params.pop("is_running", None) 

824 

825 database_id = None 

826 if database_name is not None: 

827 database = self.session.integration_controller.get(database_name) 

828 if database is None: 

829 raise ExecutorException(f"Database with name {database_name} not found") 

830 database_id = database["id"] 

831 

832 updated_chatbot = chatbot_controller.update_chatbot( 

833 name, 

834 project_name=project_name, 

835 name=updated_name, 

836 model_name=model_name, 

837 agent_name=agent_name, 

838 database_id=database_id, 

839 is_running=is_running, 

840 params=variables_controller.fill_parameters(statement.params), 

841 ) 

842 if updated_chatbot is None: 

843 raise ExecutorException(f"Chatbot with name {name} not found") 

844 return ExecuteAnswer() 

845 

846 def answer_drop_chatbot(self, statement, database_name): 

847 chatbot_controller = ChatBotController() 

848 

849 project_name, name = match_two_part_name(statement.name, default_db_name=database_name) 

850 

851 chatbot_controller.delete_chatbot(name, project_name=project_name) 

852 return ExecuteAnswer() 

853 

854 def answer_evaluate_metric(self, statement, database_name): 

855 # heavy import, so we do it here on-demand 

856 try: 

857 from mindsdb_evaluator.accuracy.general import evaluate_accuracy 

858 except ImportError: 

859 logger.error("mindsdb-evaluator is not installed. Please install it with `pip install mindsdb-evaluator]`.") 

860 

861 try: 

862 sqlquery = SQLQuery(statement.data, session=self.session, database=database_name) 

863 except Exception as e: 

864 raise Exception(f'Nested query failed to execute with error: "{e}", please check and try again.') from e 

865 df = sqlquery.fetched_data.to_df() 

866 df.columns = [str(t.alias) if hasattr(t, "alias") else str(t.parts[-1]) for t in statement.data.targets] 

867 

868 for col in ["actual", "prediction"]: 

869 assert col in df.columns, f"`{col}` column was not provided, please try again." 

870 assert df[col].isna().sum() == 0, f"There are missing values in the `{col}` column, please try again." 

871 

872 metric_name = statement.name.parts[-1] 

873 target_series = df.pop("prediction") 

874 using_clause = statement.using if statement.using is not None else {} 

875 metric_value = evaluate_accuracy( 

876 df, 

877 target_series, 

878 metric_name, 

879 target="actual", 

880 ts_analysis=using_clause.get("ts_analysis", {}), # will be deprecated soon 

881 n_decimals=using_clause.get("n_decimals", 3), 

882 ) # 3 decimals by default 

883 return ExecuteAnswer( 

884 data=ResultSet( 

885 columns=[Column(name=metric_name, table_name="", type="str")], 

886 values=[[metric_value]], 

887 ) 

888 ) 

889 

890 def answer_describe_object(self, obj_type: str, obj_name: Identifier, database_name: str): 

891 project_objects = ( 

892 "AGENTS", 

893 "JOBS", 

894 "SKILLS", 

895 "CHATBOTS", 

896 "TRIGGERS", 

897 "VIEWS", 

898 "KNOWLEDGE_BASES", 

899 "PREDICTORS", 

900 "MODELS", 

901 ) 

902 

903 global_objects = ("DATABASES", "PROJECTS", "HANDLERS", "ML_ENGINES") 

904 

905 all_objects = project_objects + global_objects 

906 

907 # is not plural? 

908 if obj_type not in all_objects: 

909 if obj_type + "S" in all_objects: 

910 obj_type = obj_type + "S" 

911 elif obj_type + "ES" in all_objects: 

912 obj_type = obj_type + "ES" 

913 else: 

914 raise WrongArgumentError(f"Unknown describe type: {obj_type}") 

915 

916 parts = obj_name.parts 

917 if len(parts) > 2: 

918 raise WrongArgumentError( 

919 f"Invalid object name: {obj_name.to_string()}.\nOnly models support three-part namespaces." 

920 ) 

921 

922 name = parts[-1] 

923 where = BinaryOperation(op="=", args=[Identifier("name"), Constant(name)]) 

924 

925 if obj_type in project_objects: 

926 database_name = parts[0] if len(parts) > 1 else database_name 

927 where = BinaryOperation( 

928 op="and", args=[where, BinaryOperation(op="=", args=[Identifier("project"), Constant(database_name)])] 

929 ) 

930 

931 select_statement = Select( 

932 targets=[Star()], 

933 from_table=Identifier(parts=["information_schema", obj_type]), 

934 where=where, 

935 ) 

936 query = SQLQuery(select_statement, session=self.session) 

937 return self.answer_select(query) 

938 

939 def answer_describe_predictor(self, obj_name, database_name): 

940 value = obj_name.parts.copy() 

941 # project.model.version.?attrs 

942 parts = value[:3] 

943 attrs = value[3:] 

944 model_info = self._get_model_info(Identifier(parts=parts), except_absent=False, database_name=database_name) 

945 if model_info is None: 

946 # project.model.?attrs 

947 parts = value[:2] 

948 attrs = value[2:] 

949 model_info = self._get_model_info(Identifier(parts=parts), except_absent=False, database_name=database_name) 

950 if model_info is None: 

951 # model.?attrs 

952 parts = value[:1] 

953 attrs = value[1:] 

954 model_info = self._get_model_info( 

955 Identifier(parts=parts), except_absent=False, database_name=database_name 

956 ) 

957 

958 if model_info is None: 

959 raise ExecutorException(f"Model not found: {obj_name}") 

960 

961 if len(attrs) == 1: 

962 attrs = attrs[0] 

963 elif len(attrs) == 0: 

964 attrs = None 

965 

966 df = self.session.model_controller.describe_model( 

967 self.session, 

968 model_info["project_name"], 

969 model_info["model_record"].name, 

970 attribute=attrs, 

971 version=model_info["model_record"].version, 

972 ) 

973 

974 return ExecuteAnswer(data=ResultSet.from_df(df, table_name="")) 

975 

976 def answer_create_kb_index(self, statement, database_name): 

977 project_name, table_name = match_two_part_name(statement.name, default_db_name=database_name) 

978 self.session.kb_controller.create_index(table_name=table_name, project_name=project_name) 

979 return ExecuteAnswer() 

980 

981 def answer_evaluate_kb(self, statement: EvaluateKnowledgeBase, database_name): 

982 project_name, table_name = match_two_part_name(statement.name, default_db_name=database_name) 

983 scores = self.session.kb_controller.evaluate( 

984 table_name=table_name, project_name=project_name, params=statement.params 

985 ) 

986 return ExecuteAnswer(data=ResultSet.from_df(scores)) 

987 

988 def _get_model_info(self, identifier, except_absent=True, database_name=None): 

989 if len(identifier.parts) == 1: 

990 identifier.parts = [database_name, identifier.parts[0]] 

991 identifier.is_quoted = [False] + identifier.is_quoted 

992 

993 database_name, model_name, model_version = resolve_model_identifier(identifier) 

994 # at least two part in identifier 

995 identifier.parts[0] = database_name 

996 identifier.parts[1] = model_name 

997 

998 if database_name is None: 

999 database_name = database_name 

1000 

1001 if model_name is None: 

1002 if except_absent: 

1003 raise Exception(f"Model not found: {identifier.to_string()}") 

1004 else: 

1005 return 

1006 

1007 model_record = get_model_record( 

1008 name=model_name, 

1009 project_name=database_name, 

1010 except_absent=except_absent, 

1011 version=model_version, 

1012 active=True if model_version is None else None, 

1013 ) 

1014 if not model_record: 

1015 return None 

1016 return {"model_record": model_record, "project_name": database_name} 

1017 

1018 def _sync_predictor_check(self, phase_name): 

1019 """Checks if there is already a predictor retraining or fine-tuning 

1020 Do not allow to run retrain if there is another model in training process in less that 1h 

1021 """ 

1022 if ctx.company_id is None: 

1023 # bypass for tests 

1024 return 

1025 is_cloud = self.session.config.get("cloud", False) 

1026 if is_cloud and ctx.user_class == 0: 

1027 models = get_model_records(active=None) 

1028 shortest_training = None 

1029 for model in models: 

1030 if ( 

1031 model.status in (PREDICTOR_STATUS.GENERATING, PREDICTOR_STATUS.TRAINING) 

1032 and model.training_start_at is not None 

1033 and model.training_stop_at is None 

1034 ): 

1035 training_time = datetime.datetime.now() - model.training_start_at 

1036 if shortest_training is None or training_time < shortest_training: 

1037 shortest_training = training_time 

1038 

1039 if shortest_training is not None and shortest_training < datetime.timedelta(hours=1): 

1040 raise ExecutorException( 

1041 f"Can't start {phase_name} process while any other predictor is in status 'training' or 'generating'" 

1042 ) 

1043 

1044 def answer_retrain_predictor(self, statement, database_name): 

1045 model_record = self._get_model_info(statement.name, database_name=database_name)["model_record"] 

1046 

1047 if statement.query_str is None: 

1048 if model_record.data_integration_ref is not None: 

1049 if model_record.data_integration_ref["type"] == "integration": 

1050 integration = self.session.integration_controller.get_by_id(model_record.data_integration_ref["id"]) 

1051 if integration is None: 

1052 raise EntityNotExistsError("The database from which the model was trained no longer exists") 

1053 elif statement.integration_name is None: 

1054 # set to current project 

1055 statement.integration_name = Identifier(database_name) 

1056 

1057 ml_handler = None 

1058 if statement.using is not None: 

1059 # repack using with lower names 

1060 statement.using = {k.lower(): v for k, v in statement.using.items()} 

1061 

1062 if "engine" in statement.using: 

1063 ml_integration_name = statement.using.pop("engine") 

1064 ml_handler = self.session.integration_controller.get_ml_handler(ml_integration_name) 

1065 

1066 # use current ml handler 

1067 if ml_handler is None: 

1068 integration_record = get_predictor_integration(model_record) 

1069 if integration_record is None: 

1070 raise EntityNotExistsError("ML engine model was trained with does not esxists") 

1071 ml_handler = self.session.integration_controller.get_ml_handler(integration_record.name) 

1072 

1073 self._sync_predictor_check(phase_name="retrain") 

1074 df = self.session.model_controller.retrain_model(statement, ml_handler) 

1075 

1076 return ExecuteAnswer(data=ResultSet.from_df(df)) 

1077 

1078 @profiler.profile() 

1079 @mark_process("learn") 

1080 def answer_finetune_predictor(self, statement, database_name): 

1081 model_record = self._get_model_info(statement.name, database_name=database_name)["model_record"] 

1082 

1083 if statement.using is not None: 

1084 # repack using with lower names 

1085 statement.using = {k.lower(): v for k, v in statement.using.items()} 

1086 

1087 if statement.query_str is not None and statement.integration_name is None: 

1088 # set to current project 

1089 statement.integration_name = Identifier(database_name) 

1090 

1091 # use current ml handler 

1092 integration_record = get_predictor_integration(model_record) 

1093 if integration_record is None: 

1094 raise Exception("The ML engine that the model was trained with does not exist.") 

1095 ml_handler = self.session.integration_controller.get_ml_handler(integration_record.name) 

1096 

1097 self._sync_predictor_check(phase_name="finetune") 

1098 df = self.session.model_controller.finetune_model(statement, ml_handler) 

1099 

1100 return ExecuteAnswer(data=ResultSet.from_df(df)) 

1101 

1102 def _create_integration(self, name: str, engine: str, connection_args: dict): 

1103 # we have connection checkers not for any db. So do nothing if fail 

1104 # TODO return rich error message 

1105 

1106 if connection_args is None: 1106 ↛ 1107line 1106 didn't jump to line 1107 because the condition on line 1106 was never true

1107 connection_args = {} 

1108 status = HandlerStatusResponse(success=False) 

1109 

1110 storage = None 

1111 try: 

1112 handler_meta = self.session.integration_controller.get_handler_meta(engine) 

1113 if handler_meta is None: 1113 ↛ 1114line 1113 didn't jump to line 1114 because the condition on line 1113 was never true

1114 raise ExecutorException(f"There is no engine '{engine}'") 

1115 

1116 if handler_meta.get("import", {}).get("success") is not True: 1116 ↛ 1117line 1116 didn't jump to line 1117 because the condition on line 1116 was never true

1117 raise ExecutorException( 

1118 f"The '{engine}' handler isn't installed.\n" + get_handler_install_message(engine) 

1119 ) 

1120 

1121 accept_connection_args = handler_meta.get("connection_args") 

1122 if accept_connection_args is not None and connection_args is not None: 1122 ↛ 1123line 1122 didn't jump to line 1123 because the condition on line 1122 was never true

1123 for arg_name, arg_value in connection_args.items(): 

1124 if arg_name not in accept_connection_args: 

1125 continue 

1126 arg_meta = accept_connection_args[arg_name] 

1127 arg_type = arg_meta.get("type") 

1128 if arg_type == HANDLER_CONNECTION_ARG_TYPE.PATH: 

1129 # arg may be one of: 

1130 # str: '/home/file.pem' 

1131 # dict: {'path': '/home/file.pem'} 

1132 # dict: {'url': 'https://host.com/file'} 

1133 arg_value = connection_args[arg_name] 

1134 if isinstance(arg_value, (str, dict)) is False: 

1135 raise ExecutorException(f"Unknown type of arg: '{arg_value}'") 

1136 if isinstance(arg_value, str) or "path" in arg_value: 

1137 path = arg_value if isinstance(arg_value, str) else arg_value["path"] 

1138 if Path(path).is_file() is False: 

1139 raise ExecutorException(f"File not found at: '{path}'") 

1140 elif "url" in arg_value: 

1141 path = download_file(arg_value["url"]) 

1142 else: 

1143 raise ExecutorException(f"Argument '{arg_name}' must be path or url to the file") 

1144 connection_args[arg_name] = path 

1145 

1146 handler = self.session.integration_controller.create_tmp_handler( 

1147 name=name, engine=engine, connection_args=connection_args 

1148 ) 

1149 status = handler.check_connection() 

1150 if status.copy_storage: 1150 ↛ 1151line 1150 didn't jump to line 1151 because the condition on line 1150 was never true

1151 storage = handler.handler_storage.export_files() 

1152 except Exception as e: 

1153 status.error_message = str(e) 

1154 

1155 if status.success is False: 1155 ↛ 1156line 1155 didn't jump to line 1156 because the condition on line 1155 was never true

1156 raise ExecutorException(f"Can't connect to db: {status.error_message}") 

1157 

1158 integration = self.session.integration_controller.get(name) 

1159 if integration is not None: 1159 ↛ 1160line 1159 didn't jump to line 1160 because the condition on line 1159 was never true

1160 raise EntityExistsError("Database already exists", name) 

1161 try: 

1162 integration = ProjectController().get(name=name) 

1163 except EntityNotExistsError: 

1164 pass 

1165 if integration is not None: 1165 ↛ 1166line 1165 didn't jump to line 1166 because the condition on line 1165 was never true

1166 raise EntityExistsError("Project exists with this name", name) 

1167 

1168 self.session.integration_controller.add(name, engine, connection_args) 

1169 if storage: 1169 ↛ 1170line 1169 didn't jump to line 1170 because the condition on line 1169 was never true

1170 handler = self.session.integration_controller.get_data_handler(name, connect=False) 

1171 handler.handler_storage.import_files(storage) 

1172 

1173 def answer_create_ml_engine(self, statement: CreateMLEngine) -> ExecuteAnswer: 

1174 """Handles the `CREATE ML_ENGINE` command, which creates a new ML integration (engine) in the system. 

1175 

1176 Args: 

1177 statement (CreateMLEngine): The AST object representing the CREATE ML_ENGINE command. 

1178 

1179 Returns: 

1180 ExecuteAnswer: The result of the ML engine creation operation. 

1181 

1182 Raises: 

1183 ValueError: If the ml_engine name format is invalid. 

1184 """ 

1185 name = match_one_part_name(statement.name) 

1186 

1187 handler = statement.handler 

1188 params = statement.params 

1189 if_not_exists = getattr(statement, "if_not_exists", False) 

1190 

1191 integrations = self.session.integration_controller.get_all() 

1192 if name in integrations: 

1193 if not if_not_exists: 1193 ↛ 1196line 1193 didn't jump to line 1196 because the condition on line 1193 was always true

1194 raise EntityExistsError("Integration already exists", name) 

1195 else: 

1196 return ExecuteAnswer() 

1197 

1198 handler_module_meta = self.session.integration_controller.get_handler_meta(handler) 

1199 

1200 if handler_module_meta is None: 1200 ↛ 1201line 1200 didn't jump to line 1201 because the condition on line 1200 was never true

1201 raise ExecutorException(f"There is no engine '{handler}'") 

1202 

1203 params_out = {} 

1204 if params: 

1205 for key, value in variables_controller.fill_parameters(params).items(): 

1206 # convert ast types to string 

1207 if isinstance(value, (Constant, Identifier)): 1207 ↛ 1208line 1207 didn't jump to line 1208 because the condition on line 1207 was never true

1208 value = value.to_string() 

1209 params_out[key] = value 

1210 

1211 try: 

1212 self.session.integration_controller.add(name=name, engine=handler, connection_args=params_out) 

1213 except Exception as e: 

1214 msg = str(e) 

1215 if type(e) in (ImportError, ModuleNotFoundError): 

1216 msg = dedent( 

1217 f"""\ 

1218 The '{handler_module_meta["name"]}' handler cannot be used. Reason is: 

1219 {handler_module_meta["import"]["error_message"] or msg} 

1220 """ 

1221 ) 

1222 is_cloud = self.session.config.get("cloud", False) 

1223 if ( 

1224 is_cloud is False 

1225 # NOTE: BYOM may raise these errors if there is an error in the user's code, 

1226 # therefore error_message will be None 

1227 and handler_module_meta["name"] != "byom" 

1228 and "No module named" in handler_module_meta["import"]["error_message"] 

1229 ): 

1230 logger.info(get_handler_install_message(handler_module_meta["name"])) 

1231 ast_drop = DropMLEngine(name=Identifier(name)) 

1232 self.answer_drop_ml_engine(ast_drop) 

1233 logger.info(msg) 

1234 raise ExecutorException(msg) from e 

1235 

1236 return ExecuteAnswer() 

1237 

1238 def answer_drop_ml_engine(self, statement: DropMLEngine) -> ExecuteAnswer: 

1239 """Handles the `DROP ML_ENGINE` command, which removes an ML integration (engine) from the system. 

1240 

1241 Args: 

1242 statement (DropMLEngine): The AST object representing the DROP ML_ENGINE command. 

1243 

1244 Raises: 

1245 EntityNotExistsError: If the integration does not exist and IF EXISTS is not specified. 

1246 ValueError: If the integration name is provided in an invalid format. 

1247 

1248 Returns: 

1249 ExecuteAnswer: The result of the ML engine deletion operation. 

1250 """ 

1251 name = match_one_part_name(statement.name) 

1252 

1253 integrations = self.session.integration_controller.get_all() 

1254 if name not in integrations: 

1255 if not statement.if_exists: 

1256 raise EntityNotExistsError("Integration does not exists", name) 

1257 else: 

1258 return ExecuteAnswer() 

1259 self.session.integration_controller.delete(name) 

1260 return ExecuteAnswer() 

1261 

1262 def answer_create_database(self, statement: CreateDatabase) -> ExecuteAnswer: 

1263 """Create new integration or project 

1264 

1265 Args: 

1266 statement (CreateDatabase): data for creating database/project 

1267 

1268 Returns: 

1269 ExecuteAnswer: 'ok' answer 

1270 """ 

1271 database_name = match_one_part_name(statement.name) 

1272 

1273 engine = (statement.engine or "mindsdb").lower() 

1274 

1275 connection_args = variables_controller.fill_parameters(statement.parameters) 

1276 

1277 try: 

1278 if engine == "mindsdb": 1278 ↛ 1279line 1278 didn't jump to line 1279 because the condition on line 1278 was never true

1279 ProjectController().add(database_name) 

1280 else: 

1281 self._create_integration(database_name, engine, connection_args) 

1282 except EntityExistsError: 

1283 if statement.if_not_exists is False: 

1284 raise 

1285 

1286 return ExecuteAnswer() 

1287 

1288 def answer_drop_database(self, statement: DropDatabase | DropDatasource) -> ExecuteAnswer: 

1289 """Drop a database (project or integration) by name. 

1290 

1291 Args: 

1292 statement (DropDatabase | DropDatasource): The parsed DROP DATABASE or DROP DATASOURCE statement. 

1293 

1294 Raises: 

1295 Exception: If the database name format is invalid. 

1296 EntityNotExistsError: If the database does not exist and 'IF EXISTS' is not specified in the statement. 

1297 

1298 Returns: 

1299 ExecuteAnswer: The result of the drop database operation. 

1300 """ 

1301 db_name = match_one_part_name(statement.name) 

1302 

1303 try: 

1304 self.session.database_controller.delete(db_name, strict_case=statement.name.is_quoted[0]) 

1305 except EntityNotExistsError: 

1306 if statement.if_exists is not True: 

1307 raise 

1308 return ExecuteAnswer() 

1309 

1310 def answer_alter_database(self, statement: AlterDatabase) -> ExecuteAnswer: 

1311 db_name = match_one_part_name(statement.name) 

1312 self.session.database_controller.update( 

1313 db_name, data=statement.params, strict_case=statement.name.is_quoted[0], check_connection=True 

1314 ) 

1315 return ExecuteAnswer() 

1316 

1317 def answer_drop_tables(self, statement, database_name): 

1318 """answer on 'drop table [if exists] {name}' 

1319 Args: 

1320 statement: ast 

1321 """ 

1322 

1323 for table in statement.tables: 

1324 if len(table.parts) > 1: 

1325 db_name = table.parts[0] 

1326 table = Identifier(parts=table.parts[1:]) 

1327 else: 

1328 db_name = database_name 

1329 

1330 dn = self.session.datahub[db_name] 

1331 if dn is None: 

1332 raise ExecutorException(f"Cannot delete a table from database '{db_name}': the database does not exist") 

1333 

1334 if db_name is not None: 

1335 dn.drop_table(table, if_exists=statement.if_exists) 

1336 elif db_name in self.session.database_controller.get_dict(filter_type="project"): 

1337 # TODO do we need feature: delete object from project via drop table? 

1338 

1339 project = self.session.database_controller.get_project(db_name) 

1340 project_tables = {key: val for key, val in project.get_tables().items() if val.get("deletable") is True} 

1341 table_name = table.to_string() 

1342 

1343 if table_name in project_tables: 

1344 self.session.model_controller.delete_model(table_name, project_name=db_name) 

1345 elif statement.if_exists is False: 

1346 raise ExecutorException(f"Cannot delete a table from database '{db_name}': table does not exists") 

1347 else: 

1348 raise ExecutorException(f"Cannot delete a table from database '{db_name}'") 

1349 

1350 return ExecuteAnswer() 

1351 

1352 def answer_create_or_alter_view(self, statement: CreateView | AlterView, database_name: str) -> ExecuteAnswer: 

1353 """Process CREATE and ALTER VIEW commands 

1354 

1355 Args: 

1356 statement (CreateView | AlterView): data for creating or altering view 

1357 database_name (str): name of the current database 

1358 

1359 Returns: 

1360 ExecuteAnswer: answer for the command 

1361 """ 

1362 project_name, view_name = match_two_part_name(statement.name, default_db_name=database_name) 

1363 

1364 query_str = statement.query_str 

1365 

1366 if isinstance(statement.from_table, Identifier): 

1367 query = Select( 

1368 targets=[Star()], 

1369 from_table=NativeQuery(integration=statement.from_table, query=statement.query_str), 

1370 ) 

1371 query_str = query.to_string() 

1372 

1373 project = self.session.database_controller.get_project(project_name) 

1374 

1375 if isinstance(statement, CreateView): 

1376 try: 

1377 project.create_view(view_name, query=query_str, session=self.session) 

1378 except EntityExistsError: 

1379 if getattr(statement, "if_not_exists", False) is False: 

1380 raise 

1381 elif isinstance(statement, AlterView): 

1382 try: 

1383 project.update_view(view_name, query=query_str, strict_case=(not view_name.islower())) 

1384 except EntityNotExistsError: 

1385 raise ExecutorException(f"View {view_name} does not exist in {project_name}") 

1386 else: 

1387 raise ValueError(f"Unknown view DDL statement: {statement}") 

1388 

1389 return ExecuteAnswer() 

1390 

1391 def answer_drop_view(self, statement: DropView, database_name: str) -> ExecuteAnswer: 

1392 """Drop one or more views from the specified database/project. 

1393 

1394 Args: 

1395 statement (DropView): The parsed DROP VIEW statement containing view names and options. 

1396 database_name (str): The name of the database (project) from which to drop the views. 

1397 

1398 Raises: 

1399 EntityNotExistsError: If a view does not exist and 'IF EXISTS' is not specified in the statement. 

1400 ValueError: If the view name format is invalid. 

1401 

1402 Returns: 

1403 ExecuteAnswer: The result of the drop view operation. 

1404 """ 

1405 for name in statement.names: 

1406 match name.parts, name.is_quoted: 

1407 case [view_name], [view_name_quoted]: 

1408 db_name_quoted = False 

1409 case [database_name, view_name], [db_name_quoted, view_name_quoted]: 

1410 pass 

1411 case _: 

1412 raise ValueError(f"Invalid view name: {name}") 

1413 

1414 if not db_name_quoted: 

1415 database_name = database_name.lower() 

1416 if not view_name_quoted: 

1417 view_name = view_name.lower() 

1418 

1419 project = self.session.database_controller.get_project(database_name, db_name_quoted) 

1420 

1421 try: 

1422 project.drop_view(view_name, strict_case=True) 

1423 except EntityNotExistsError: 

1424 if statement.if_exists is not True: 

1425 raise 

1426 

1427 return ExecuteAnswer() 

1428 

1429 def answer_create_kb(self, statement: CreateKnowledgeBase, database_name: str): 

1430 if statement.model: 

1431 raise ExecutorException( 

1432 "Creating a knowledge base using pre-existing models is no longer supported.\n" 

1433 "Please pass the model parameters as a JSON object in the embedding_model field." 

1434 ) 

1435 

1436 project_name, kb_name = match_two_part_name(statement.name, default_db_name=database_name) 

1437 

1438 if statement.storage is not None: 

1439 if len(statement.storage.parts) != 2: 

1440 raise ExecutorException( 

1441 f"Invalid vectordatabase table name: {statement.storage}Need the form 'database_name.table_name'" 

1442 ) 

1443 

1444 if statement.from_query is not None: 

1445 # TODO: implement this 

1446 raise ExecutorException("Create a knowledge base from a select is not supported yet") 

1447 

1448 # create the knowledge base 

1449 _ = self.session.kb_controller.add( 

1450 name=kb_name, 

1451 project_name=project_name, 

1452 # embedding_model=statement.model, 

1453 storage=statement.storage, 

1454 params=variables_controller.fill_parameters(statement.params), 

1455 if_not_exists=statement.if_not_exists, 

1456 ) 

1457 

1458 return ExecuteAnswer() 

1459 

1460 def answer_alter_kb(self, statement: AlterKnowledgeBase, database_name: str): 

1461 project_name, kb_name = match_two_part_name( 

1462 statement.name, ensure_lower_case=True, default_db_name=database_name 

1463 ) 

1464 

1465 # update the knowledge base 

1466 self.session.kb_controller.update( 

1467 name=kb_name, 

1468 project_name=project_name, 

1469 params=variables_controller.fill_parameters(statement.params), 

1470 ) 

1471 

1472 return ExecuteAnswer() 

1473 

1474 def answer_drop_kb(self, statement: DropKnowledgeBase, database_name: str) -> ExecuteAnswer: 

1475 project_name, kb_name = match_two_part_name(statement.name, default_db_name=database_name) 

1476 

1477 # delete the knowledge base 

1478 self.session.kb_controller.delete( 

1479 name=kb_name, 

1480 project_name=project_name, 

1481 if_exists=statement.if_exists, 

1482 ) 

1483 

1484 return ExecuteAnswer() 

1485 

1486 def answer_create_skill(self, statement, database_name): 

1487 project_name, name = match_two_part_name(statement.name, default_db_name=database_name) 

1488 

1489 try: 

1490 _ = self.session.skills_controller.add_skill(name, project_name, statement.type, statement.params) 

1491 except ValueError as e: 

1492 # Project does not exist or skill already exists. 

1493 raise ExecutorException(str(e)) 

1494 

1495 return ExecuteAnswer() 

1496 

1497 def answer_drop_skill(self, statement, database_name): 

1498 project_name, name = match_two_part_name(statement.name, default_db_name=database_name) 

1499 

1500 try: 

1501 self.session.skills_controller.delete_skill(name, project_name, strict_case=True) 

1502 except ValueError as e: 

1503 # Project does not exist or skill does not exist. 

1504 raise ExecutorException(str(e)) 

1505 

1506 return ExecuteAnswer() 

1507 

1508 def answer_update_skill(self, statement, database_name): 

1509 project_name, name = match_two_part_name(statement.name, default_db_name=database_name) 

1510 

1511 type = statement.params.pop("type", None) 

1512 try: 

1513 _ = self.session.skills_controller.update_skill( 

1514 name, project_name=project_name, type=type, params=statement.params 

1515 ) 

1516 except ValueError as e: 

1517 # Project does not exist or skill does not exist. 

1518 raise ExecutorException(str(e)) 

1519 

1520 return ExecuteAnswer() 

1521 

1522 def answer_create_agent(self, statement, database_name): 

1523 project_name, name = match_two_part_name(statement.name, default_db_name=database_name) 

1524 

1525 skills = statement.params.pop("skills", []) 

1526 provider = statement.params.pop("provider", None) 

1527 try: 

1528 _ = self.session.agents_controller.add_agent( 

1529 name=name, 

1530 project_name=project_name, 

1531 model_name=statement.model, 

1532 skills=skills, 

1533 provider=provider, 

1534 params=variables_controller.fill_parameters(statement.params), 

1535 ) 

1536 except EntityExistsError as e: 

1537 if statement.if_not_exists is not True: 

1538 raise ExecutorException(str(e)) 

1539 except ValueError as e: 

1540 # Project does not exist or agent already exists. 

1541 raise ExecutorException(str(e)) 

1542 

1543 return ExecuteAnswer() 

1544 

1545 def answer_drop_agent(self, statement: DropAgent, database_name: str): 

1546 project_name, name = match_two_part_name(statement.name, default_db_name=database_name) 

1547 

1548 try: 

1549 self.session.agents_controller.delete_agent(name, project_name) 

1550 except ValueError as e: 

1551 # Project does not exist or agent does not exist. 

1552 raise ExecutorException(str(e)) 

1553 

1554 return ExecuteAnswer() 

1555 

1556 def answer_update_agent(self, statement: UpdateAgent, database_name: str): 

1557 project_name, name = match_two_part_name(statement.name, default_db_name=database_name) 

1558 

1559 model = statement.params.pop("model", None) 

1560 skills_to_add = statement.params.pop("skills_to_add", []) 

1561 skills_to_remove = statement.params.pop("skills_to_remove", []) 

1562 try: 

1563 _ = self.session.agents_controller.update_agent( 

1564 name, 

1565 project_name=project_name, 

1566 model_name=model, 

1567 skills_to_add=skills_to_add, 

1568 skills_to_remove=skills_to_remove, 

1569 params=variables_controller.fill_parameters(statement.params), 

1570 ) 

1571 except (EntityExistsError, EntityNotExistsError, ValueError) as e: 

1572 # Project does not exist or agent does not exist. 

1573 raise ExecutorException(str(e)) 

1574 

1575 return ExecuteAnswer() 

1576 

1577 @mark_process("learn") 

1578 def answer_create_predictor(self, statement: CreatePredictor, database_name: str): 

1579 integration_name, model_name = match_two_part_name(statement.name, default_db_name=database_name) 

1580 

1581 statement.name.parts = [integration_name, model_name] 

1582 statement.name.is_quoted = [False, False] 

1583 

1584 ml_integration_name = "lightwood" # default 

1585 if statement.using is not None: 

1586 # repack using with lower names 

1587 statement.using = {k.lower(): v for k, v in statement.using.items()} 

1588 

1589 ml_integration_name = statement.using.pop("engine", ml_integration_name) 

1590 

1591 if statement.query_str is not None and statement.integration_name is None: 

1592 # set to current project 

1593 statement.integration_name = Identifier(database_name) 

1594 

1595 try: 

1596 ml_handler = self.session.integration_controller.get_ml_handler(ml_integration_name) 

1597 except EntityNotExistsError: 

1598 # not exist, try to create it with same name as handler 

1599 self.answer_create_ml_engine( 

1600 CreateMLEngine(name=Identifier(ml_integration_name), handler=ml_integration_name) 

1601 ) 

1602 

1603 ml_handler = self.session.integration_controller.get_ml_handler(ml_integration_name) 

1604 

1605 if getattr(statement, "is_replace", False) is True: 

1606 # try to delete 

1607 try: 

1608 self.session.model_controller.delete_model(model_name, project_name=integration_name) 

1609 except EntityNotExistsError: 

1610 pass 

1611 

1612 try: 

1613 df = self.session.model_controller.create_model(statement, ml_handler) 

1614 return ExecuteAnswer(data=ResultSet.from_df(df)) 

1615 except EntityExistsError: 

1616 if getattr(statement, "if_not_exists", False) is True: 

1617 return ExecuteAnswer() 

1618 raise 

1619 

1620 def answer_show_columns( 

1621 self, 

1622 target: Identifier, 

1623 where: Optional[Operation] = None, 

1624 like: Optional[str] = None, 

1625 is_full=False, 

1626 database_name=None, 

1627 ): 

1628 if isinstance(target, Identifier) is False: 

1629 raise TableNotExistError("The table name is required for the query.") 

1630 

1631 if len(target.parts) > 1: 

1632 db = target.parts[0] 

1633 elif isinstance(database_name, str) and len(database_name) > 0: 

1634 db = database_name 

1635 else: 

1636 db = self.session.config.get("default_project") 

1637 table_name = target.parts[-1] 

1638 

1639 new_where = BinaryOperation( 

1640 "and", 

1641 args=[ 

1642 BinaryOperation("=", args=[Identifier("TABLE_SCHEMA"), Constant(db)]), 

1643 BinaryOperation("=", args=[Identifier("TABLE_NAME"), Constant(table_name)]), 

1644 ], 

1645 ) 

1646 if where is not None: 

1647 new_where = BinaryOperation("and", args=[new_where, where]) 

1648 if like is not None: 

1649 like = BinaryOperation("like", args=[Identifier("View"), Constant(like)]) 

1650 new_where = BinaryOperation("and", args=[new_where, like]) 

1651 

1652 targets = [ 

1653 Identifier("COLUMN_NAME", alias=Identifier("Field")), 

1654 Identifier("COLUMN_TYPE", alias=Identifier("Type")), 

1655 Identifier("IS_NULLABLE", alias=Identifier("Null")), 

1656 Identifier("COLUMN_KEY", alias=Identifier("Key")), 

1657 Identifier("COLUMN_DEFAULT", alias=Identifier("Default")), 

1658 Identifier("EXTRA", alias=Identifier("Extra")), 

1659 ] 

1660 if is_full: 

1661 targets.extend( 

1662 [ 

1663 Constant(None, alias=Identifier("Collation")), 

1664 Constant("select", alias=Identifier("Privileges")), 

1665 Constant(None, alias=Identifier("Comment")), 

1666 ] 

1667 ) 

1668 new_statement = Select( 

1669 targets=targets, 

1670 from_table=Identifier(parts=["information_schema", "COLUMNS"]), 

1671 where=new_where, 

1672 ) 

1673 

1674 query = SQLQuery(new_statement, session=self.session, database=database_name) 

1675 return self.answer_select(query) 

1676 

1677 def answer_show_create_table(self, table): 

1678 columns = [ 

1679 Column(table_name="", name="Table", type=TYPES.MYSQL_TYPE_VAR_STRING), 

1680 Column(table_name="", name="Create Table", type=TYPES.MYSQL_TYPE_VAR_STRING), 

1681 ] 

1682 return ExecuteAnswer( 

1683 data=ResultSet( 

1684 columns=columns, 

1685 values=[[table, f"create table {table} ()"]], 

1686 ) 

1687 ) 

1688 

1689 def answer_function_status(self): 

1690 columns = [ 

1691 Column( 

1692 name="Db", 

1693 alias="Db", 

1694 table_name="schemata", 

1695 table_alias="ROUTINES", 

1696 type="str", 

1697 database="mysql", 

1698 charset=self.charset_text_type, 

1699 ), 

1700 Column( 

1701 name="Db", 

1702 alias="Db", 

1703 table_name="routines", 

1704 table_alias="ROUTINES", 

1705 type="str", 

1706 database="mysql", 

1707 charset=self.charset_text_type, 

1708 ), 

1709 Column( 

1710 name="Type", 

1711 alias="Type", 

1712 table_name="routines", 

1713 table_alias="ROUTINES", 

1714 type="str", 

1715 database="mysql", 

1716 charset=CHARSET_NUMBERS["utf8_bin"], 

1717 ), 

1718 Column( 

1719 name="Definer", 

1720 alias="Definer", 

1721 table_name="routines", 

1722 table_alias="ROUTINES", 

1723 type="str", 

1724 database="mysql", 

1725 charset=CHARSET_NUMBERS["utf8_bin"], 

1726 ), 

1727 Column( 

1728 name="Modified", 

1729 alias="Modified", 

1730 table_name="routines", 

1731 table_alias="ROUTINES", 

1732 type=TYPES.MYSQL_TYPE_TIMESTAMP, 

1733 database="mysql", 

1734 charset=CHARSET_NUMBERS["binary"], 

1735 ), 

1736 Column( 

1737 name="Created", 

1738 alias="Created", 

1739 table_name="routines", 

1740 table_alias="ROUTINES", 

1741 type=TYPES.MYSQL_TYPE_TIMESTAMP, 

1742 database="mysql", 

1743 charset=CHARSET_NUMBERS["binary"], 

1744 ), 

1745 Column( 

1746 name="Security_type", 

1747 alias="Security_type", 

1748 table_name="routines", 

1749 table_alias="ROUTINES", 

1750 type=TYPES.MYSQL_TYPE_STRING, 

1751 database="mysql", 

1752 charset=CHARSET_NUMBERS["utf8_bin"], 

1753 ), 

1754 Column( 

1755 name="Comment", 

1756 alias="Comment", 

1757 table_name="routines", 

1758 table_alias="ROUTINES", 

1759 type=TYPES.MYSQL_TYPE_BLOB, 

1760 database="mysql", 

1761 charset=CHARSET_NUMBERS["utf8_bin"], 

1762 ), 

1763 Column( 

1764 name="character_set_client", 

1765 alias="character_set_client", 

1766 table_name="character_sets", 

1767 table_alias="ROUTINES", 

1768 type=TYPES.MYSQL_TYPE_VAR_STRING, 

1769 database="mysql", 

1770 charset=self.charset_text_type, 

1771 ), 

1772 Column( 

1773 name="collation_connection", 

1774 alias="collation_connection", 

1775 table_name="collations", 

1776 table_alias="ROUTINES", 

1777 type=TYPES.MYSQL_TYPE_VAR_STRING, 

1778 database="mysql", 

1779 charset=self.charset_text_type, 

1780 ), 

1781 Column( 

1782 name="Database Collation", 

1783 alias="Database Collation", 

1784 table_name="collations", 

1785 table_alias="ROUTINES", 

1786 type=TYPES.MYSQL_TYPE_VAR_STRING, 

1787 database="mysql", 

1788 charset=self.charset_text_type, 

1789 ), 

1790 ] 

1791 

1792 return ExecuteAnswer(data=ResultSet(columns=columns)) 

1793 

1794 def answer_show_table_status(self, table_name): 

1795 # NOTE at this moment parsed statement only like `SHOW TABLE STATUS LIKE 'table'`. 

1796 # NOTE some columns has {'database': 'mysql'}, other not. That correct. This is how real DB sends messages. 

1797 columns = [ 

1798 { 

1799 "database": "mysql", 

1800 "table_name": "tables", 

1801 "name": "Name", 

1802 "alias": "Name", 

1803 "type": TYPES.MYSQL_TYPE_VAR_STRING, 

1804 "charset": self.charset_text_type, 

1805 }, 

1806 { 

1807 "database": "", 

1808 "table_name": "tables", 

1809 "name": "Engine", 

1810 "alias": "Engine", 

1811 "type": TYPES.MYSQL_TYPE_VAR_STRING, 

1812 "charset": self.charset_text_type, 

1813 }, 

1814 { 

1815 "database": "", 

1816 "table_name": "tables", 

1817 "name": "Version", 

1818 "alias": "Version", 

1819 "type": TYPES.MYSQL_TYPE_LONGLONG, 

1820 "charset": CHARSET_NUMBERS["binary"], 

1821 }, 

1822 { 

1823 "database": "mysql", 

1824 "table_name": "tables", 

1825 "name": "Row_format", 

1826 "alias": "Row_format", 

1827 "type": TYPES.MYSQL_TYPE_VAR_STRING, 

1828 "charset": self.charset_text_type, 

1829 }, 

1830 { 

1831 "database": "", 

1832 "table_name": "tables", 

1833 "name": "Rows", 

1834 "alias": "Rows", 

1835 "type": TYPES.MYSQL_TYPE_LONGLONG, 

1836 "charset": CHARSET_NUMBERS["binary"], 

1837 }, 

1838 { 

1839 "database": "", 

1840 "table_name": "tables", 

1841 "name": "Avg_row_length", 

1842 "alias": "Avg_row_length", 

1843 "type": TYPES.MYSQL_TYPE_LONGLONG, 

1844 "charset": CHARSET_NUMBERS["binary"], 

1845 }, 

1846 { 

1847 "database": "", 

1848 "table_name": "tables", 

1849 "name": "Data_length", 

1850 "alias": "Data_length", 

1851 "type": TYPES.MYSQL_TYPE_LONGLONG, 

1852 "charset": CHARSET_NUMBERS["binary"], 

1853 }, 

1854 { 

1855 "database": "", 

1856 "table_name": "tables", 

1857 "name": "Max_data_length", 

1858 "alias": "Max_data_length", 

1859 "type": TYPES.MYSQL_TYPE_LONGLONG, 

1860 "charset": CHARSET_NUMBERS["binary"], 

1861 }, 

1862 { 

1863 "database": "", 

1864 "table_name": "tables", 

1865 "name": "Index_length", 

1866 "alias": "Index_length", 

1867 "type": TYPES.MYSQL_TYPE_LONGLONG, 

1868 "charset": CHARSET_NUMBERS["binary"], 

1869 }, 

1870 { 

1871 "database": "", 

1872 "table_name": "tables", 

1873 "name": "Data_free", 

1874 "alias": "Data_free", 

1875 "type": TYPES.MYSQL_TYPE_LONGLONG, 

1876 "charset": CHARSET_NUMBERS["binary"], 

1877 }, 

1878 { 

1879 "database": "", 

1880 "table_name": "tables", 

1881 "name": "Auto_increment", 

1882 "alias": "Auto_increment", 

1883 "type": TYPES.MYSQL_TYPE_LONGLONG, 

1884 "charset": CHARSET_NUMBERS["binary"], 

1885 }, 

1886 { 

1887 "database": "", 

1888 "table_name": "tables", 

1889 "name": "Create_time", 

1890 "alias": "Create_time", 

1891 "type": TYPES.MYSQL_TYPE_TIMESTAMP, 

1892 "charset": CHARSET_NUMBERS["binary"], 

1893 }, 

1894 { 

1895 "database": "", 

1896 "table_name": "tables", 

1897 "name": "Update_time", 

1898 "alias": "Update_time", 

1899 "type": TYPES.MYSQL_TYPE_TIMESTAMP, 

1900 "charset": CHARSET_NUMBERS["binary"], 

1901 }, 

1902 { 

1903 "database": "", 

1904 "table_name": "tables", 

1905 "name": "Check_time", 

1906 "alias": "Check_time", 

1907 "type": TYPES.MYSQL_TYPE_TIMESTAMP, 

1908 "charset": CHARSET_NUMBERS["binary"], 

1909 }, 

1910 { 

1911 "database": "mysql", 

1912 "table_name": "tables", 

1913 "name": "Collation", 

1914 "alias": "Collation", 

1915 "type": TYPES.MYSQL_TYPE_VAR_STRING, 

1916 "charset": self.charset_text_type, 

1917 }, 

1918 { 

1919 "database": "", 

1920 "table_name": "tables", 

1921 "name": "Checksum", 

1922 "alias": "Checksum", 

1923 "type": TYPES.MYSQL_TYPE_LONGLONG, 

1924 "charset": CHARSET_NUMBERS["binary"], 

1925 }, 

1926 { 

1927 "database": "", 

1928 "table_name": "tables", 

1929 "name": "Create_options", 

1930 "alias": "Create_options", 

1931 "type": TYPES.MYSQL_TYPE_VAR_STRING, 

1932 "charset": self.charset_text_type, 

1933 }, 

1934 { 

1935 "database": "", 

1936 "table_name": "tables", 

1937 "name": "Comment", 

1938 "alias": "Comment", 

1939 "type": TYPES.MYSQL_TYPE_BLOB, 

1940 "charset": self.charset_text_type, 

1941 }, 

1942 ] 

1943 columns = [Column(**d) for d in columns] 

1944 data = [ 

1945 [ 

1946 table_name, # Name 

1947 "InnoDB", # Engine 

1948 10, # Version 

1949 "Dynamic", # Row_format 

1950 1, # Rows 

1951 16384, # Avg_row_length 

1952 16384, # Data_length 

1953 0, # Max_data_length 

1954 0, # Index_length 

1955 0, # Data_free 

1956 None, # Auto_increment 

1957 datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), # Create_time 

1958 datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), # Update_time 

1959 None, # Check_time 

1960 "utf8mb4_0900_ai_ci", # Collation 

1961 None, # Checksum 

1962 "", # Create_options 

1963 "", # Comment 

1964 ] 

1965 ] 

1966 return ExecuteAnswer(data=ResultSet(columns=columns, values=data)) 

1967 

1968 def answer_show_warnings(self): 

1969 columns = [ 

1970 { 

1971 "database": "", 

1972 "table_name": "", 

1973 "name": "Level", 

1974 "alias": "Level", 

1975 "type": TYPES.MYSQL_TYPE_VAR_STRING, 

1976 "charset": self.charset_text_type, 

1977 }, 

1978 { 

1979 "database": "", 

1980 "table_name": "", 

1981 "name": "Code", 

1982 "alias": "Code", 

1983 "type": TYPES.MYSQL_TYPE_LONG, 

1984 "charset": CHARSET_NUMBERS["binary"], 

1985 }, 

1986 { 

1987 "database": "", 

1988 "table_name": "", 

1989 "name": "Message", 

1990 "alias": "Message", 

1991 "type": TYPES.MYSQL_TYPE_VAR_STRING, 

1992 "charset": self.charset_text_type, 

1993 }, 

1994 ] 

1995 columns = [Column(**d) for d in columns] 

1996 return ExecuteAnswer(data=ResultSet(columns=columns)) 

1997 

1998 def answer_create_table(self, statement, database_name): 

1999 SQLQuery(statement, session=self.session, execute=True, database=database_name) 

2000 return ExecuteAnswer() 

2001 

2002 def answer_select(self, query): 

2003 data = query.fetched_data 

2004 return ExecuteAnswer(data=data) 

2005 

2006 def answer_update_model_version(self, model_version, database_name): 

2007 if not isinstance(model_version, Identifier): 

2008 raise ExecutorException(f"Please define version: {model_version}") 

2009 

2010 model_parts = model_version.parts 

2011 version = model_parts[-1] 

2012 if version.isdigit(): 

2013 version = int(version) 

2014 else: 

2015 raise ExecutorException(f"Unknown version: {version}") 

2016 

2017 if len(model_parts) == 3: 

2018 project_name, model_name = model_parts[:2] 

2019 elif len(model_parts) == 2: 

2020 model_name = model_parts[0] 

2021 project_name = database_name 

2022 else: 

2023 raise ExecutorException(f"Unknown model: {model_version}") 

2024 

2025 self.session.model_controller.set_model_active_version(project_name, model_name, version) 

2026 return ExecuteAnswer() 

2027 

2028 def answer_drop_model(self, statement: DropPredictor, database_name: str) -> ExecuteAnswer: 

2029 """Handles the DROP MODEL (or DROP PREDICTOR) command, which removes a model 

2030 or a specific model version from a project. 

2031 

2032 Args: 

2033 statement (DropPredictor): The AST object representing the DROP MODEL or DROP PREDICTOR command. 

2034 database_name (str): The name of the current database/project. 

2035 

2036 Raises: 

2037 EntityNotExistsError: If the model or version does not exist and IF EXISTS is not specified. 

2038 ValueError: If the model name format is invalid. 

2039 

2040 Returns: 

2041 ExecuteAnswer: The result of the model deletion operation. 

2042 """ 

2043 project_name, model_name, version = resolve_model_identifier(statement.name) 

2044 if project_name is None: 

2045 project_name = database_name 

2046 

2047 if version is not None: 

2048 # delete version 

2049 try: 

2050 self.session.model_controller.delete_model_version(project_name, model_name, version) 

2051 except EntityNotExistsError as e: 

2052 if not statement.if_exists: 

2053 raise e 

2054 else: 

2055 # drop model 

2056 try: 

2057 project = self.session.database_controller.get_project(project_name, strict_case=True) 

2058 project.drop_model(model_name) 

2059 except Exception as e: 

2060 if not statement.if_exists: 

2061 raise e 

2062 

2063 return ExecuteAnswer() 

2064 

2065 def change_default_db(self, db_name): 

2066 # That fix for bug in mssql: it keeps connection for a long time, but after some time mssql can 

2067 # send packet with COM_INIT_DB=null. In this case keep old database name as default. 

2068 if db_name != "null": 

2069 if self.session.database_controller.exists(db_name): 

2070 self.session.database = db_name 

2071 else: 

2072 raise BadDbError(f"Database {db_name} does not exists")