Coverage for mindsdb / api / executor / command_executor.py: 19%
933 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import datetime
2from pathlib import Path
3from textwrap import dedent
4from typing import Optional
5from functools import reduce
7import pandas as pd
8from mindsdb_sql_parser import parse_sql
9from mindsdb_sql_parser.ast.mindsdb import AlterDatabase
10from mindsdb_sql_parser.ast import (
11 Alter,
12 ASTNode,
13 BinaryOperation,
14 CommitTransaction,
15 Constant,
16 CreateTable,
17 Delete,
18 Describe,
19 DropDatabase,
20 DropTables,
21 DropView,
22 Explain,
23 Identifier,
24 Insert,
25 NativeQuery,
26 Operation,
27 RollbackTransaction,
28 Select,
29 Set,
30 Show,
31 Star,
32 StartTransaction,
33 Union,
34 Update,
35 Use,
36 Tuple,
37 Function,
38 Variable,
39 Intersect,
40 Except,
41 Parameter,
42 NullConstant,
43)
45# typed models
46from mindsdb_sql_parser.ast.mindsdb import (
47 AlterView,
48 CreateAgent,
49 CreateAnomalyDetectionModel,
50 CreateChatBot,
51 CreateDatabase,
52 CreateJob,
53 CreateKnowledgeBase,
54 AlterKnowledgeBase,
55 CreateMLEngine,
56 CreatePredictor,
57 CreateSkill,
58 CreateTrigger,
59 CreateView,
60 CreateKnowledgeBaseIndex,
61 EvaluateKnowledgeBase,
62 DropAgent,
63 DropChatBot,
64 DropDatasource,
65 DropJob,
66 DropKnowledgeBase,
67 DropMLEngine,
68 DropPredictor,
69 DropSkill,
70 DropTrigger,
71 Evaluate,
72 FinetunePredictor,
73 RetrainPredictor,
74 UpdateAgent,
75 UpdateChatBot,
76 UpdateSkill,
77)
79import mindsdb.utilities.profiler as profiler
81from mindsdb.api.executor.sql_query.result_set import Column, ResultSet
82from mindsdb.api.executor.sql_query import SQLQuery
83from mindsdb.api.executor.data_types.answer import ExecuteAnswer
84from mindsdb.api.mysql.mysql_proxy.libs.constants.mysql import (
85 CHARSET_NUMBERS,
86 SERVER_VARIABLES,
87 TYPES,
88)
90from mindsdb.api.executor.exceptions import (
91 ExecutorException,
92 BadDbError,
93 NotSupportedYet,
94 WrongArgumentError,
95 TableNotExistError,
96)
97from mindsdb.api.executor.utilities.functions import download_file
98from mindsdb.api.executor.utilities.sql import query_df
99from mindsdb.integrations.libs.const import (
100 HANDLER_CONNECTION_ARG_TYPE,
101 PREDICTOR_STATUS,
102)
103from mindsdb.integrations.utilities.query_traversal import query_traversal
104from mindsdb.integrations.libs.response import HandlerStatusResponse
105from mindsdb.interfaces.chatbot.chatbot_controller import ChatBotController
106from mindsdb.interfaces.database.projects import ProjectController
107from mindsdb.interfaces.jobs.jobs_controller import JobsController
108from mindsdb.interfaces.model.functions import (
109 get_model_record,
110 get_model_records,
111 get_predictor_integration,
112)
113from mindsdb.interfaces.query_context.context_controller import query_context_controller
114from mindsdb.interfaces.triggers.triggers_controller import TriggersController
115from mindsdb.interfaces.variables.variables_controller import variables_controller
116from mindsdb.utilities.context import context as ctx
117from mindsdb.utilities.functions import mark_process, resolve_model_identifier, get_handler_install_message
118from mindsdb.utilities.exception import EntityExistsError, EntityNotExistsError
119from mindsdb.utilities import log
121logger = log.getLogger(__name__)
def _get_show_where(
    statement: ASTNode,
    from_name: Optional[str] = None,
    like_name: Optional[str] = None,
    initial: Optional[ASTNode] = None,
) -> ASTNode:
    """Combine every filter of a SHOW statement into one 'where' condition.

    Supports the full form: SHOW category [FROM name] [LIKE filter] [WHERE filter]

    Args:
        statement (ASTNode): parsed 'show' statement
        from_name (str): column name to compare against the FROM clause
        like_name (str): column name to compare against the LIKE clause
        initial (ASTNode): extra 'where' condition to prepend

    Returns:
        ASTNode: combined 'where' condition, or None if no filters are present
    """
    conditions = []
    if initial is not None:
        conditions.append(initial)
    if from_name is not None and statement.from_table is not None:
        conditions.append(
            BinaryOperation(
                "=",
                args=[Identifier(from_name), Constant(statement.from_table.parts[-1])],
            )
        )
    if like_name is not None and statement.like is not None:
        conditions.append(BinaryOperation("like", args=[Identifier(like_name), Constant(statement.like)]))
    if statement.where is not None:
        conditions.append(statement.where)

    if not conditions:
        return None
    # left-fold the filters into a chain of AND nodes
    combined = conditions[0]
    for condition in conditions[1:]:
        combined = BinaryOperation("and", args=[combined, condition])
    return combined
161def match_one_part_name(identifier: Identifier, ensure_lower_case: bool = False) -> str:
162 """Extract a single-part name from an Identifier object, optionally ensuring it is lowercase.
164 Args:
165 identifier (Identifier): The identifier to extract the name from. Must contain exactly one part.
166 ensure_lower_case (bool, optional): If True, raises ValueError if the name is not lowercase. Defaults to False.
168 Returns:
169 str: The extracted name, converted to lowercase if not quoted.
171 Raises:
172 ValueError: If the identifier does not contain exactly one part, or if ensure_lower_case is True and the name is not lowercase.
173 """
174 match identifier.parts, identifier.is_quoted:
175 case [name], [is_quoted]:
176 ...
177 case _:
178 raise ValueError(f"Only single-part names are allowed: {identifier}")
179 if not is_quoted: 179 ↛ 181line 179 didn't jump to line 181 because the condition on line 179 was always true
180 name = name.lower()
181 if ensure_lower_case and not name.islower(): 181 ↛ 182line 181 didn't jump to line 182 because the condition on line 181 was never true
182 raise ValueError(f"The name must be in lowercase: {identifier}")
183 return name
def match_two_part_name(
    identifier: Identifier, ensure_lower_case: bool = False, default_db_name: str | None = None
) -> tuple[str, str]:
    """Extract a (database, name) tuple from an Identifier object that may have one or two parts.

    Args:
        identifier (Identifier): The identifier to extract names from. Must contain one or two parts.
        ensure_lower_case (bool, optional): If True, raises ValueError if the name part is not lowercase. Defaults to False.
        default_db_name (str | None, optional): The default database name to use if only one part is provided. Defaults to None.

    Returns:
        tuple[str, str]: A tuple of (database_name, name), where database_name may be None if not provided and no default is given.

    Raises:
        ValueError: If the identifier does not contain one or two parts, or if ensure_lower_case is True and the name is not lowercase.
    """
    db_name = None

    # Structural match over (parts, is_quoted) in lockstep: the one-part case binds
    # (name, is_quoted); the two-part case additionally rebinds db_name and binds
    # db_is_quoted. Any other shape is rejected.
    match identifier.parts, identifier.is_quoted:
        case [name], [is_quoted]:
            ...
        case [db_name, name], [db_is_quoted, is_quoted]:
            # unquoted database names are case-insensitive -> normalize to lowercase
            if not db_is_quoted:
                db_name = db_name.lower()
        case _:
            raise ValueError(f"Only single-part or two-part names are allowed: {identifier}")
    # unquoted object names are likewise normalized to lowercase
    if not is_quoted:
        name = name.lower()
    if ensure_lower_case and not name.islower():
        raise ValueError(f"The name must be in lowercase: {identifier}")
    # fall back to the caller-supplied default when no db part was given
    if db_name is None:
        db_name = default_db_name
    return db_name, name
def apply_parameters(statement, params):
    """Replace Parameter placeholders inside *statement* with values from *params*, in place."""

    def _substitute(node, **kwargs):
        # only Parameter nodes whose key is present in params are replaced;
        # returning None leaves the node untouched
        if not isinstance(node, Parameter):
            return
        if node.value not in params:
            return
        value = params[node.value]
        if value is None:
            return NullConstant()
        if isinstance(value, list):
            return Tuple([Constant(item) for item in value])
        return Constant(value)

    query_traversal(statement, _substitute)
235class ExecuteCommands:
236 def __init__(self, session, context=None):
237 if context is None:
238 context = {}
240 self.context = context
241 self.session = session
243 self.charset_text_type = CHARSET_NUMBERS["utf8_general_ci"]
244 self.datahub = session.datahub
246 @profiler.profile()
247 def execute_command(self, statement: ASTNode, database_name: str = None) -> ExecuteAnswer:
248 sql: str = statement.to_string()
249 sql_lower: str = sql.lower()
251 if database_name is None: 251 ↛ 254line 251 didn't jump to line 254 because the condition on line 251 was always true
252 database_name = self.session.database
254 if ctx.params:
255 apply_parameters(statement, ctx.params)
257 statement_type = type(statement)
258 if statement_type is CreateDatabase:
259 return self.answer_create_database(statement)
260 elif statement_type is CreateMLEngine:
261 return self.answer_create_ml_engine(statement)
262 elif statement_type is DropMLEngine: 262 ↛ 263line 262 didn't jump to line 263 because the condition on line 262 was never true
263 return self.answer_drop_ml_engine(statement)
264 elif statement_type is DropPredictor: 264 ↛ 265line 264 didn't jump to line 265 because the condition on line 264 was never true
265 return self.answer_drop_model(statement, database_name)
266 elif statement_type is DropTables: 266 ↛ 267line 266 didn't jump to line 267 because the condition on line 266 was never true
267 return self.answer_drop_tables(statement, database_name)
268 elif statement_type is DropDatasource or statement_type is DropDatabase: 268 ↛ 269line 268 didn't jump to line 269 because the condition on line 268 was never true
269 return self.answer_drop_database(statement)
270 elif statement_type is AlterDatabase: 270 ↛ 271line 270 didn't jump to line 271 because the condition on line 270 was never true
271 return self.answer_alter_database(statement)
272 elif statement_type is Describe: 272 ↛ 274line 272 didn't jump to line 274 because the condition on line 272 was never true
273 # NOTE in sql 'describe table' is same as 'show columns'
274 obj_type = statement.type
276 if obj_type is None or obj_type.upper() in ("MODEL", "PREDICTOR"):
277 return self.answer_describe_predictor(statement.value, database_name)
278 else:
279 return self.answer_describe_object(obj_type.upper(), statement.value, database_name)
281 elif statement_type is RetrainPredictor: 281 ↛ 282line 281 didn't jump to line 282 because the condition on line 281 was never true
282 return self.answer_retrain_predictor(statement, database_name)
283 elif statement_type is FinetunePredictor: 283 ↛ 284line 283 didn't jump to line 284 because the condition on line 283 was never true
284 return self.answer_finetune_predictor(statement, database_name)
285 elif statement_type is Show: 285 ↛ 286line 285 didn't jump to line 286 because the condition on line 285 was never true
286 sql_category = statement.category.lower()
287 if hasattr(statement, "modes"):
288 if isinstance(statement.modes, list) is False:
289 statement.modes = []
290 statement.modes = [x.upper() for x in statement.modes]
291 if sql_category == "ml_engines":
292 new_statement = Select(
293 targets=[Star()],
294 from_table=Identifier(parts=["information_schema", "ml_engines"]),
295 where=_get_show_where(statement, like_name="name"),
296 )
298 query = SQLQuery(new_statement, session=self.session, database=database_name)
299 return self.answer_select(query)
300 elif sql_category == "handlers":
301 new_statement = Select(
302 targets=[Star()],
303 from_table=Identifier(parts=["information_schema", "handlers"]),
304 where=_get_show_where(statement, like_name="name"),
305 )
307 query = SQLQuery(new_statement, session=self.session, database=database_name)
308 return self.answer_select(query)
309 elif sql_category == "plugins":
310 if statement.where is not None or statement.like:
311 raise ExecutorException("'SHOW PLUGINS' query should be used without filters")
312 new_statement = Select(
313 targets=[Star()],
314 from_table=Identifier(parts=["information_schema", "PLUGINS"]),
315 )
316 query = SQLQuery(new_statement, session=self.session, database=database_name)
317 return self.answer_select(query)
318 elif sql_category in ("databases", "schemas"):
319 new_statement = Select(
320 targets=[Identifier(parts=["NAME"], alias=Identifier("Database"))],
321 from_table=Identifier(parts=["information_schema", "DATABASES"]),
322 where=_get_show_where(statement, like_name="Database"),
323 )
325 if "FULL" in statement.modes:
326 new_statement.targets.extend(
327 [
328 Identifier(parts=["TYPE"], alias=Identifier("TYPE")),
329 Identifier(parts=["ENGINE"], alias=Identifier("ENGINE")),
330 ]
331 )
333 query = SQLQuery(new_statement, session=self.session, database=database_name)
334 return self.answer_select(query)
335 elif sql_category in ("tables", "full tables"):
336 schema = database_name or "mindsdb"
337 if statement.from_table is not None and statement.in_table is not None:
338 raise ExecutorException(
339 "You have an error in your SQL syntax: 'from' and 'in' cannot be used together"
340 )
342 if statement.from_table is not None:
343 schema = statement.from_table.parts[-1]
344 statement.from_table = None
345 if statement.in_table is not None:
346 schema = statement.in_table.parts[-1]
347 statement.in_table = None
349 table_types = [Constant(t) for t in ["MODEL", "BASE TABLE", "SYSTEM VIEW", "VIEW"]]
350 where = BinaryOperation(
351 "and",
352 args=[
353 BinaryOperation("=", args=[Identifier("table_schema"), Constant(schema)]),
354 BinaryOperation("in", args=[Identifier("table_type"), Tuple(table_types)]),
355 ],
356 )
358 new_statement = Select(
359 targets=[
360 Identifier(
361 parts=["table_name"],
362 alias=Identifier(f"Tables_in_{schema}"),
363 )
364 ],
365 from_table=Identifier(parts=["information_schema", "TABLES"]),
366 where=_get_show_where(statement, like_name=f"Tables_in_{schema}", initial=where),
367 )
369 if "FULL" in statement.modes:
370 new_statement.targets.append(Identifier(parts=["TABLE_TYPE"], alias=Identifier("Table_type")))
372 query = SQLQuery(new_statement, session=self.session, database=database_name)
373 return self.answer_select(query)
374 elif sql_category in (
375 "variables",
376 "session variables",
377 "session status",
378 "global variables",
379 ):
380 new_statement = Select(
381 targets=[
382 Identifier(parts=["Variable_name"]),
383 Identifier(parts=["Value"]),
384 ],
385 from_table=Identifier(parts=["dataframe"]),
386 where=_get_show_where(statement, like_name="Variable_name"),
387 )
389 data = {}
390 is_session = "session" in sql_category
391 for var_name, var_data in SERVER_VARIABLES.items():
392 var_name = var_name.replace("@@", "")
393 if is_session and var_name.startswith("session.") is False:
394 continue
395 if var_name.startswith("session.") or var_name.startswith("GLOBAL."):
396 name = var_name.replace("session.", "").replace("GLOBAL.", "")
397 data[name] = var_data[0]
398 elif var_name not in data:
399 data[var_name] = var_data[0]
401 df = pd.DataFrame(data.items(), columns=["Variable_name", "Value"])
402 df2 = query_df(df, new_statement)
404 return ExecuteAnswer(data=ResultSet.from_df(df2, table_name="session_variables"))
405 elif sql_category == "search_path":
406 return ExecuteAnswer(
407 data=ResultSet(
408 columns=[Column(name="search_path", table_name="search_path", type="str")],
409 values=[['"$user", public']],
410 )
411 )
412 elif "show status like 'ssl_version'" in sql_lower:
413 return ExecuteAnswer(
414 data=ResultSet(
415 columns=[
416 Column(name="Value", table_name="session_variables", type="str"),
417 Column(name="Value", table_name="session_variables", type="str"),
418 ],
419 values=[["Ssl_version", "TLSv1.1"]],
420 )
421 )
422 elif sql_category in ("function status", "procedure status"):
423 # SHOW FUNCTION STATUS WHERE Db = 'MINDSDB';
424 # SHOW PROCEDURE STATUS WHERE Db = 'MINDSDB'
425 # SHOW FUNCTION STATUS WHERE Db = 'MINDSDB' AND Name LIKE '%';
426 return self.answer_function_status()
427 elif sql_category in ("index", "keys", "indexes"):
428 # INDEX | INDEXES | KEYS are synonyms
429 # https://dev.mysql.com/doc/refman/8.0/en/show-index.html
430 new_statement = Select(
431 targets=[
432 Identifier("TABLE_NAME", alias=Identifier("Table")),
433 Identifier("NON_UNIQUE", alias=Identifier("Non_unique")),
434 Identifier("INDEX_NAME", alias=Identifier("Key_name")),
435 Identifier("SEQ_IN_INDEX", alias=Identifier("Seq_in_index")),
436 Identifier("COLUMN_NAME", alias=Identifier("Column_name")),
437 Identifier("COLLATION", alias=Identifier("Collation")),
438 Identifier("CARDINALITY", alias=Identifier("Cardinality")),
439 Identifier("SUB_PART", alias=Identifier("Sub_part")),
440 Identifier("PACKED", alias=Identifier("Packed")),
441 Identifier("NULLABLE", alias=Identifier("Null")),
442 Identifier("INDEX_TYPE", alias=Identifier("Index_type")),
443 Identifier("COMMENT", alias=Identifier("Comment")),
444 Identifier("INDEX_COMMENT", alias=Identifier("Index_comment")),
445 Identifier("IS_VISIBLE", alias=Identifier("Visible")),
446 Identifier("EXPRESSION", alias=Identifier("Expression")),
447 ],
448 from_table=Identifier(parts=["information_schema", "STATISTICS"]),
449 where=statement.where,
450 )
451 query = SQLQuery(new_statement, session=self.session, database=database_name)
452 return self.answer_select(query)
453 # FIXME if have answer on that request, then DataGrip show warning '[S0022] Column 'Non_unique' not found.'
454 elif "show create table" in sql_lower:
455 # SHOW CREATE TABLE `MINDSDB`.`predictors`
456 table = sql[sql.rfind(".") + 1 :].strip(" .;\n\t").replace("`", "")
457 return self.answer_show_create_table(table)
458 elif sql_category in ("character set", "charset"):
459 new_statement = Select(
460 targets=[
461 Identifier("CHARACTER_SET_NAME", alias=Identifier("Charset")),
462 Identifier("DEFAULT_COLLATE_NAME", alias=Identifier("Description")),
463 Identifier("DESCRIPTION", alias=Identifier("Default collation")),
464 Identifier("MAXLEN", alias=Identifier("Maxlen")),
465 ],
466 from_table=Identifier(parts=["INFORMATION_SCHEMA", "CHARACTER_SETS"]),
467 where=_get_show_where(statement, like_name="CHARACTER_SET_NAME"),
468 )
469 query = SQLQuery(new_statement, session=self.session, database=database_name)
470 return self.answer_select(query)
471 elif sql_category == "warnings":
472 return self.answer_show_warnings()
473 elif sql_category == "engines":
474 new_statement = Select(
475 targets=[Star()],
476 from_table=Identifier(parts=["information_schema", "ENGINES"]),
477 )
478 query = SQLQuery(new_statement, session=self.session, database=database_name)
479 return self.answer_select(query)
480 elif sql_category == "collation":
481 new_statement = Select(
482 targets=[
483 Identifier("COLLATION_NAME", alias=Identifier("Collation")),
484 Identifier("CHARACTER_SET_NAME", alias=Identifier("Charset")),
485 Identifier("ID", alias=Identifier("Id")),
486 Identifier("IS_DEFAULT", alias=Identifier("Default")),
487 Identifier("IS_COMPILED", alias=Identifier("Compiled")),
488 Identifier("SORTLEN", alias=Identifier("Sortlen")),
489 Identifier("PAD_ATTRIBUTE", alias=Identifier("Pad_attribute")),
490 ],
491 from_table=Identifier(parts=["INFORMATION_SCHEMA", "COLLATIONS"]),
492 where=_get_show_where(statement, like_name="Collation"),
493 )
494 query = SQLQuery(new_statement, session=self.session, database=database_name)
495 return self.answer_select(query)
496 elif sql_category == "table status":
497 # TODO improve it
498 # SHOW TABLE STATUS LIKE 'table'
499 table_name = None
500 if statement.like is not None:
501 table_name = statement.like
502 # elif condition == 'from' and type(expression) == Identifier:
503 # table_name = expression.parts[-1]
504 if table_name is None:
505 err_str = f"Can't determine table name in query: {sql}"
506 logger.warning(err_str)
507 raise TableNotExistError(err_str)
508 return self.answer_show_table_status(table_name)
509 elif sql_category == "columns":
510 is_full = statement.modes is not None and "FULL" in statement.modes
511 return self.answer_show_columns(
512 statement.from_table,
513 statement.where,
514 statement.like,
515 is_full=is_full,
516 database_name=database_name,
517 )
519 elif sql_category in (
520 "agents",
521 "jobs",
522 "skills",
523 "chatbots",
524 "triggers",
525 "views",
526 "knowledge_bases",
527 "knowledge bases",
528 "predictors",
529 "models",
530 ):
531 if sql_category == "knowledge bases":
532 sql_category = "knowledge_bases"
534 if sql_category == "predictors":
535 sql_category = "models"
537 db_name = database_name
538 if statement.from_table is not None:
539 db_name = statement.from_table.parts[-1]
541 where = BinaryOperation(op="=", args=[Identifier("project"), Constant(db_name)])
543 select_statement = Select(
544 targets=[Star()],
545 from_table=Identifier(parts=["information_schema", sql_category]),
546 where=_get_show_where(statement, like_name="name", initial=where),
547 )
548 query = SQLQuery(select_statement, session=self.session)
549 return self.answer_select(query)
551 elif sql_category == "projects":
552 where = BinaryOperation(op="=", args=[Identifier("type"), Constant("project")])
553 select_statement = Select(
554 targets=[Identifier(parts=["NAME"], alias=Identifier("project"))],
555 from_table=Identifier(parts=["information_schema", "DATABASES"]),
556 where=_get_show_where(statement, like_name="project", from_name="project", initial=where),
557 )
559 query = SQLQuery(select_statement, session=self.session)
560 return self.answer_select(query)
561 else:
562 raise NotSupportedYet(f"Statement not implemented: {sql}")
563 elif statement_type in ( 563 ↛ 568line 563 didn't jump to line 568 because the condition on line 563 was never true
564 StartTransaction,
565 CommitTransaction,
566 RollbackTransaction,
567 ):
568 return ExecuteAnswer()
569 elif statement_type is Set: 569 ↛ 570line 569 didn't jump to line 570 because the condition on line 569 was never true
570 category = (statement.category or "").lower()
571 if category == "":
572 if isinstance(statement.name, Identifier):
573 param = statement.name.parts[0].lower()
575 value = None
576 if isinstance(statement.value, Constant):
577 value = statement.value.value
579 if param == "profiling":
580 self.session.profiling = value in (1, True)
581 if self.session.profiling is True:
582 profiler.enable()
583 else:
584 profiler.disable()
585 elif param == "predictor_cache":
586 self.session.predictor_cache = value in (1, True)
587 elif param == "context":
588 if value in (0, False, None):
589 # drop context
590 query_context_controller.drop_query_context(None)
591 elif param == "show_secrets":
592 self.session.show_secrets = value in (1, True)
593 elif isinstance(statement.name, Variable):
594 variables_controller.set_variable(statement.name.value, statement.value)
595 return ExecuteAnswer()
596 elif category == "autocommit":
597 return ExecuteAnswer()
598 elif category == "names":
599 # set names utf8;
600 charsets = {
601 "utf8": CHARSET_NUMBERS["utf8_general_ci"],
602 "utf8mb4": CHARSET_NUMBERS["utf8mb4_general_ci"],
603 }
604 self.charset = statement.value.value
605 self.charset_text_type = charsets.get(self.charset)
606 if self.charset_text_type is None:
607 logger.warning(
608 f"Unknown charset: {self.charset}. Setting up 'utf8_general_ci' as charset text type."
609 )
610 self.charset_text_type = CHARSET_NUMBERS["utf8_general_ci"]
611 return ExecuteAnswer(
612 state_track=[
613 ["character_set_client", self.charset],
614 ["character_set_connection", self.charset],
615 ["character_set_results", self.charset],
616 ],
617 )
618 elif category == "active":
619 return self.answer_update_model_version(statement.value, database_name)
621 else:
622 logger.warning(f"SQL statement is not processable, return OK package: {sql}")
623 return ExecuteAnswer()
624 elif statement_type is Use: 624 ↛ 625line 624 didn't jump to line 625 because the condition on line 624 was never true
625 db_name = statement.value.parts[-1]
626 self.change_default_db(db_name)
627 return ExecuteAnswer()
628 elif statement_type in ( 628 ↛ 632line 628 didn't jump to line 632 because the condition on line 628 was never true
629 CreatePredictor,
630 CreateAnomalyDetectionModel, # we may want to specialize these in the future
631 ):
632 return self.answer_create_predictor(statement, database_name)
633 elif statement_type is CreateView: 633 ↛ 634line 633 didn't jump to line 634 because the condition on line 633 was never true
634 return self.answer_create_or_alter_view(statement, database_name)
635 elif statement_type is AlterView: 635 ↛ 636line 635 didn't jump to line 636 because the condition on line 635 was never true
636 return self.answer_create_or_alter_view(statement, database_name)
637 elif statement_type is DropView: 637 ↛ 638line 637 didn't jump to line 638 because the condition on line 637 was never true
638 return self.answer_drop_view(statement, database_name)
639 elif statement_type is Delete: 639 ↛ 640line 639 didn't jump to line 640 because the condition on line 639 was never true
640 query = SQLQuery(statement, session=self.session, database=database_name)
641 return ExecuteAnswer(affected_rows=query.fetched_data.affected_rows)
642 elif statement_type is Insert: 642 ↛ 643line 642 didn't jump to line 643 because the condition on line 642 was never true
643 query = SQLQuery(statement, session=self.session, database=database_name)
644 if query.fetched_data.length() > 0:
645 return self.answer_select(query)
646 return ExecuteAnswer(affected_rows=query.fetched_data.affected_rows)
647 elif statement_type is Update: 647 ↛ 648line 647 didn't jump to line 648 because the condition on line 647 was never true
648 query = SQLQuery(statement, session=self.session, database=database_name)
649 return ExecuteAnswer(affected_rows=query.fetched_data.affected_rows)
650 elif statement_type is Alter and ("disable keys" in sql_lower) or ("enable keys" in sql_lower): 650 ↛ 651line 650 didn't jump to line 651 because the condition on line 650 was never true
651 return ExecuteAnswer()
652 elif statement_type is Select:
653 ret = self.exec_service_function(statement, database_name)
654 if ret is not None: 654 ↛ 655line 654 didn't jump to line 655 because the condition on line 654 was never true
655 return ret
656 query = SQLQuery(statement, session=self.session, database=database_name)
657 return self.answer_select(query)
658 elif statement_type is Explain: 658 ↛ 659line 658 didn't jump to line 659 because the condition on line 658 was never true
659 return self.answer_show_columns(statement.target, database_name=database_name)
660 elif statement_type is CreateTable: 660 ↛ 663line 660 didn't jump to line 663 because the condition on line 660 was always true
661 return self.answer_create_table(statement, database_name)
662 # -- jobs --
663 elif statement_type is CreateJob:
664 return self.answer_create_job(statement, database_name)
665 elif statement_type is DropJob:
666 return self.answer_drop_job(statement, database_name)
667 # -- triggers --
668 elif statement_type is CreateTrigger:
669 return self.answer_create_trigger(statement, database_name)
670 elif statement_type is DropTrigger:
671 return self.answer_drop_trigger(statement, database_name)
672 # -- chatbots
673 elif statement_type is CreateChatBot:
674 return self.answer_create_chatbot(statement, database_name)
675 elif statement_type is UpdateChatBot:
676 return self.answer_update_chatbot(statement, database_name)
677 elif statement_type is DropChatBot:
678 return self.answer_drop_chatbot(statement, database_name)
679 elif statement_type is CreateKnowledgeBase:
680 return self.answer_create_kb(statement, database_name)
681 elif statement_type is AlterKnowledgeBase:
682 return self.answer_alter_kb(statement, database_name)
683 elif statement_type is DropKnowledgeBase:
684 return self.answer_drop_kb(statement, database_name)
685 elif statement_type is CreateSkill:
686 return self.answer_create_skill(statement, database_name)
687 elif statement_type is DropSkill:
688 return self.answer_drop_skill(statement, database_name)
689 elif statement_type is UpdateSkill:
690 return self.answer_update_skill(statement, database_name)
691 elif statement_type is CreateAgent:
692 return self.answer_create_agent(statement, database_name)
693 elif statement_type is DropAgent:
694 return self.answer_drop_agent(statement, database_name)
695 elif statement_type is UpdateAgent:
696 return self.answer_update_agent(statement, database_name)
697 elif statement_type is Evaluate:
698 statement.data = parse_sql(statement.query_str)
699 return self.answer_evaluate_metric(statement, database_name)
700 elif statement_type is CreateKnowledgeBaseIndex:
701 return self.answer_create_kb_index(statement, database_name)
702 elif statement_type is EvaluateKnowledgeBase:
703 return self.answer_evaluate_kb(statement, database_name)
704 elif statement_type in (Union, Intersect, Except):
705 query = SQLQuery(statement, session=self.session, database=database_name)
706 return self.answer_select(query)
707 else:
708 logger.warning(f"Unknown SQL statement: {sql}")
709 raise NotSupportedYet(f"Unknown SQL statement: {sql}")
711 def exec_service_function(self, statement: Select, database_name: str) -> Optional[ExecuteAnswer]:
712 """
713 If input query is a single line select without FROM
714 and has function in targets that matches with one of the mindsdb service functions:
715 - execute this function and return response
716 Otherwise, return None to allow to continue execution query outside
717 """
719 if statement.from_table is not None or len(statement.targets) != 1: 719 ↛ 722line 719 didn't jump to line 722 because the condition on line 719 was always true
720 return
722 target = statement.targets[0]
723 if not isinstance(target, Function):
724 return
726 command = target.op.lower()
727 args = [arg.value for arg in target.args if isinstance(arg, Constant)]
728 if command == "query_resume":
729 ret = SQLQuery(None, session=self.session, query_id=args[0])
730 return self.answer_select(ret)
732 elif command == "query_cancel":
733 query_context_controller.cancel_query(*args)
734 return ExecuteAnswer()
736 def answer_create_trigger(self, statement, database_name):
737 triggers_controller = TriggersController()
738 project_name, trigger_name = match_two_part_name(statement.name, default_db_name=database_name)
740 triggers_controller.add(
741 trigger_name,
742 project_name,
743 statement.table,
744 statement.query_str,
745 statement.columns,
746 )
747 return ExecuteAnswer()
749 def answer_drop_trigger(self, statement, database_name):
750 triggers_controller = TriggersController()
752 project_name, trigger_name = match_two_part_name(statement.name, default_db_name=database_name)
754 triggers_controller.delete(trigger_name, project_name)
756 return ExecuteAnswer()
758 def answer_create_job(self, statement: CreateJob, database_name):
759 jobs_controller = JobsController()
760 project_name, job_name = match_two_part_name(statement.name, default_db_name=database_name)
762 try:
763 jobs_controller.create(job_name, project_name, statement)
764 except EntityExistsError:
765 if getattr(statement, "if_not_exists", False) is False:
766 raise
768 return ExecuteAnswer()
770 def answer_drop_job(self, statement, database_name):
771 jobs_controller = JobsController()
772 project_name, job_name = match_two_part_name(statement.name, default_db_name=database_name)
774 try:
775 jobs_controller.delete(job_name, project_name)
776 except EntityNotExistsError:
777 if statement.if_exists is False:
778 raise
780 return ExecuteAnswer()
def answer_create_chatbot(self, statement, database_name):
    """Handle `CREATE CHATBOT`.

    Args:
        statement: CreateChatBot ast; `database` identifies the messaging
            integration, `model`/`agent` the backing model or agent.
        database_name (str): current default project.

    Returns:
        ExecuteAnswer: empty 'ok' answer.

    Raises:
        ExecutorException: if the messaging database does not exist.
    """
    chatbot_controller = ChatBotController()
    project_name, name = match_two_part_name(statement.name, default_db_name=database_name)

    is_running = statement.params.pop("is_running", True)

    database = self.session.integration_controller.get(statement.database.parts[-1])
    if database is None:
        raise ExecutorException(f"Database not found: {statement.database}")
    # The raise above guarantees `database` is not None, so the former
    # `database["id"] if database is not None else -1` fallback was dead code.
    database_id = database["id"]

    model_name = None
    if statement.model is not None:
        model_name = statement.model.parts[-1]

    agent_name = None
    if statement.agent is not None:
        agent_name = statement.agent.parts[-1]
    chatbot_controller.add_chatbot(
        name,
        project_name=project_name,
        model_name=model_name,
        agent_name=agent_name,
        database_id=database_id,
        is_running=is_running,
        params=variables_controller.fill_parameters(statement.params),
    )
    return ExecuteAnswer()
def answer_update_chatbot(self, statement, database_name):
    """Handle `UPDATE CHATBOT`: apply SET-clause changes to an existing chatbot."""
    project_name, name = match_two_part_name(statement.name, default_db_name=database_name)

    # SET-clause keys that map to dedicated arguments; the rest stay in params.
    updated_name = statement.params.pop("name", None)
    model_name = statement.params.pop("model", None)
    agent_name = statement.params.pop("agent", None)
    new_db_name = statement.params.pop("database", None)
    is_running = statement.params.pop("is_running", None)

    database_id = None
    if new_db_name is not None:
        integration = self.session.integration_controller.get(new_db_name)
        if integration is None:
            raise ExecutorException(f"Database with name {new_db_name} not found")
        database_id = integration["id"]

    chatbot = ChatBotController().update_chatbot(
        name,
        project_name=project_name,
        name=updated_name,
        model_name=model_name,
        agent_name=agent_name,
        database_id=database_id,
        is_running=is_running,
        params=variables_controller.fill_parameters(statement.params),
    )
    if chatbot is None:
        raise ExecutorException(f"Chatbot with name {name} not found")
    return ExecuteAnswer()
def answer_drop_chatbot(self, statement, database_name):
    """Handle `DROP CHATBOT`."""
    project, bot_name = match_two_part_name(statement.name, default_db_name=database_name)
    ChatBotController().delete_chatbot(bot_name, project_name=project)
    return ExecuteAnswer()
def answer_evaluate_metric(self, statement, database_name):
    """Handle `EVALUATE <metric> FROM (<query>)`.

    Runs the nested query — which must yield `actual` and `prediction`
    columns — and scores them with the requested metric.

    Returns:
        ExecuteAnswer: a single-row result containing the metric value.

    Raises:
        ImportError: if mindsdb-evaluator is not installed.
        Exception: if the nested query fails to execute.
    """
    # heavy import, so we do it here on-demand
    try:
        from mindsdb_evaluator.accuracy.general import evaluate_accuracy
    except ImportError:
        # BUGFIX: previously only logged, so execution continued and hit a
        # NameError on `evaluate_accuracy` below. Also removed the stray
        # ']' from the install hint.
        logger.error("mindsdb-evaluator is not installed. Please install it with `pip install mindsdb-evaluator`.")
        raise

    try:
        sqlquery = SQLQuery(statement.data, session=self.session, database=database_name)
    except Exception as e:
        raise Exception(f'Nested query failed to execute with error: "{e}", please check and try again.') from e
    df = sqlquery.fetched_data.to_df()
    # Rename columns after the nested query's targets (alias wins when present).
    df.columns = [str(t.alias) if hasattr(t, "alias") else str(t.parts[-1]) for t in statement.data.targets]

    for col in ["actual", "prediction"]:
        assert col in df.columns, f"`{col}` column was not provided, please try again."
        assert df[col].isna().sum() == 0, f"There are missing values in the `{col}` column, please try again."

    metric_name = statement.name.parts[-1]
    target_series = df.pop("prediction")
    using_clause = statement.using if statement.using is not None else {}
    metric_value = evaluate_accuracy(
        df,
        target_series,
        metric_name,
        target="actual",
        ts_analysis=using_clause.get("ts_analysis", {}),  # will be deprecated soon
        n_decimals=using_clause.get("n_decimals", 3),  # 3 decimals by default
    )
    return ExecuteAnswer(
        data=ResultSet(
            columns=[Column(name=metric_name, table_name="", type="str")],
            values=[[metric_value]],
        )
    )
def answer_describe_object(self, obj_type: str, obj_name: Identifier, database_name: str):
    """Run `DESCRIBE <type> <name>` as a select against information_schema."""
    project_objects = (
        "AGENTS",
        "JOBS",
        "SKILLS",
        "CHATBOTS",
        "TRIGGERS",
        "VIEWS",
        "KNOWLEDGE_BASES",
        "PREDICTORS",
        "MODELS",
    )
    global_objects = ("DATABASES", "PROJECTS", "HANDLERS", "ML_ENGINES")
    all_objects = project_objects + global_objects

    # Accept singular forms by trying plural suffixes in order.
    for suffix in ("", "S", "ES"):
        if obj_type + suffix in all_objects:
            obj_type = obj_type + suffix
            break
    else:
        raise WrongArgumentError(f"Unknown describe type: {obj_type}")

    parts = obj_name.parts
    if len(parts) > 2:
        raise WrongArgumentError(
            f"Invalid object name: {obj_name.to_string()}.\nOnly models support three-part namespaces."
        )

    where = BinaryOperation(op="=", args=[Identifier("name"), Constant(parts[-1])])
    if obj_type in project_objects:
        # Project-scoped objects also filter by project name.
        database_name = parts[0] if len(parts) > 1 else database_name
        project_filter = BinaryOperation(op="=", args=[Identifier("project"), Constant(database_name)])
        where = BinaryOperation(op="and", args=[where, project_filter])

    select_statement = Select(
        targets=[Star()],
        from_table=Identifier(parts=["information_schema", obj_type]),
        where=where,
    )
    return self.answer_select(SQLQuery(select_statement, session=self.session))
def answer_describe_predictor(self, obj_name, database_name):
    """Handle `DESCRIBE <model>`.

    The identifier may be project.model.version[.attrs], project.model[.attrs]
    or model[.attrs]; the longest prefix that resolves to a model wins, and
    the remainder is treated as describe attributes.
    """
    all_parts = obj_name.parts.copy()

    model_info = None
    attrs = []
    for prefix_len in (3, 2, 1):
        attrs = all_parts[prefix_len:]
        model_info = self._get_model_info(
            Identifier(parts=all_parts[:prefix_len]), except_absent=False, database_name=database_name
        )
        if model_info is not None:
            break

    if model_info is None:
        raise ExecutorException(f"Model not found: {obj_name}")

    # One attribute is passed as a scalar, none as None, several as a list.
    if len(attrs) == 1:
        attrs = attrs[0]
    elif len(attrs) == 0:
        attrs = None

    df = self.session.model_controller.describe_model(
        self.session,
        model_info["project_name"],
        model_info["model_record"].name,
        attribute=attrs,
        version=model_info["model_record"].version,
    )
    return ExecuteAnswer(data=ResultSet.from_df(df, table_name=""))
def answer_create_kb_index(self, statement, database_name):
    """Handle `CREATE INDEX ON KNOWLEDGE_BASE`: build the KB's vector index."""
    project, kb = match_two_part_name(statement.name, default_db_name=database_name)
    self.session.kb_controller.create_index(table_name=kb, project_name=project)
    return ExecuteAnswer()
def answer_evaluate_kb(self, statement: EvaluateKnowledgeBase, database_name):
    """Handle `EVALUATE KNOWLEDGE_BASE`: return evaluation scores as a table."""
    project, kb = match_two_part_name(statement.name, default_db_name=database_name)
    scores = self.session.kb_controller.evaluate(table_name=kb, project_name=project, params=statement.params)
    return ExecuteAnswer(data=ResultSet.from_df(scores))
def _get_model_info(self, identifier, except_absent=True, database_name=None):
    """Resolve a model identifier to its db record and project name.

    Args:
        identifier (Identifier): one- or two-part model name, optionally with
            a version suffix; mutated in place to the resolved form.
        except_absent (bool): raise if the model cannot be resolved.
        database_name (str | None): default project for one-part names.

    Returns:
        dict | None: {"model_record": ..., "project_name": ...} or None when
        not found and `except_absent` is False.
    """
    if len(identifier.parts) == 1:
        # Qualify a bare model name with the current project.
        identifier.parts = [database_name, identifier.parts[0]]
        identifier.is_quoted = [False] + identifier.is_quoted

    database_name, model_name, model_version = resolve_model_identifier(identifier)
    # at least two parts in identifier
    identifier.parts[0] = database_name
    identifier.parts[1] = model_name

    # BUGFIX: removed the no-op `if database_name is None: database_name = database_name`
    # self-assignment that was dead code.

    if model_name is None:
        if except_absent:
            raise Exception(f"Model not found: {identifier.to_string()}")
        return None

    model_record = get_model_record(
        name=model_name,
        project_name=database_name,
        except_absent=except_absent,
        version=model_version,
        # Without an explicit version, only the active one matches.
        active=True if model_version is None else None,
    )
    if not model_record:
        return None
    return {"model_record": model_record, "project_name": database_name}
def _sync_predictor_check(self, phase_name):
    """Block a new retrain/finetune while another model has been training for less than 1h.

    Only enforced on cloud for user_class 0; a missing company_id bypasses
    the check entirely (test environments).
    """
    if ctx.company_id is None:
        # bypass for tests
        return
    is_cloud = self.session.config.get("cloud", False)
    if not (is_cloud and ctx.user_class == 0):
        return

    # Durations of all models currently generating/training (started, not stopped).
    running = [
        datetime.datetime.now() - model.training_start_at
        for model in get_model_records(active=None)
        if model.status in (PREDICTOR_STATUS.GENERATING, PREDICTOR_STATUS.TRAINING)
        and model.training_start_at is not None
        and model.training_stop_at is None
    ]
    if running and min(running) < datetime.timedelta(hours=1):
        raise ExecutorException(
            f"Can't start {phase_name} process while any other predictor is in status 'training' or 'generating'"
        )
def answer_retrain_predictor(self, statement, database_name):
    """Handle `RETRAIN`: retrain an existing model, optionally with new data/engine.

    Args:
        statement: RETRAIN ast; may carry a new query and USING clause.
        database_name (str): current default database/project.

    Returns:
        ExecuteAnswer: the resulting model row.

    Raises:
        EntityNotExistsError: if the source database or original ML engine is gone.
    """
    model_record = self._get_model_info(statement.name, database_name=database_name)["model_record"]

    if statement.query_str is None:
        # Reusing original training data: make sure its integration still exists.
        if model_record.data_integration_ref is not None:
            if model_record.data_integration_ref["type"] == "integration":
                integration = self.session.integration_controller.get_by_id(model_record.data_integration_ref["id"])
                if integration is None:
                    raise EntityNotExistsError("The database from which the model was trained no longer exists")
    elif statement.integration_name is None:
        # set to current project
        statement.integration_name = Identifier(database_name)

    ml_handler = None
    if statement.using is not None:
        # repack using with lower names
        statement.using = {k.lower(): v for k, v in statement.using.items()}

        if "engine" in statement.using:
            ml_integration_name = statement.using.pop("engine")
            ml_handler = self.session.integration_controller.get_ml_handler(ml_integration_name)

    # use current ml handler
    if ml_handler is None:
        integration_record = get_predictor_integration(model_record)
        if integration_record is None:
            # BUGFIX: message was garbled ("ML engine model was trained with does
            # not esxists"); now consistent with answer_finetune_predictor.
            raise EntityNotExistsError("The ML engine that the model was trained with does not exist")
        ml_handler = self.session.integration_controller.get_ml_handler(integration_record.name)

    self._sync_predictor_check(phase_name="retrain")
    df = self.session.model_controller.retrain_model(statement, ml_handler)

    return ExecuteAnswer(data=ResultSet.from_df(df))
@profiler.profile()
@mark_process("learn")
def answer_finetune_predictor(self, statement, database_name):
    """Handle `FINETUNE`: continue training an existing model on new data."""
    model_record = self._get_model_info(statement.name, database_name=database_name)["model_record"]

    if statement.using is not None:
        # Normalise USING keys to lower case.
        statement.using = {key.lower(): value for key, value in statement.using.items()}

    if statement.query_str is not None and statement.integration_name is None:
        # No explicit source: default to the current project.
        statement.integration_name = Identifier(database_name)

    # Always finetune with the ML engine the model was originally trained with.
    integration_record = get_predictor_integration(model_record)
    if integration_record is None:
        raise Exception("The ML engine that the model was trained with does not exist.")
    ml_handler = self.session.integration_controller.get_ml_handler(integration_record.name)

    self._sync_predictor_check(phase_name="finetune")
    df = self.session.model_controller.finetune_model(statement, ml_handler)
    return ExecuteAnswer(data=ResultSet.from_df(df))
def _create_integration(self, name: str, engine: str, connection_args: dict):
    """Validate, connection-check and register a new data integration.

    Flow: normalize path-type connection args (local file or downloadable
    url), connection-check via a temporary handler, verify the name is not
    already taken by an integration or a project, then register it and
    import any storage the handler exported during the check.

    Raises:
        ExecutorException: unknown/uninstalled engine, bad path args, or
            failed connection check.
        EntityExistsError: a database or project with this name exists.
    """
    # we have connection checkers not for any db. So do nothing if fail
    # TODO return rich error message

    if connection_args is None:
        connection_args = {}
    # Pessimistic default: any exception below leaves success=False.
    status = HandlerStatusResponse(success=False)

    storage = None
    try:
        handler_meta = self.session.integration_controller.get_handler_meta(engine)
        if handler_meta is None:
            raise ExecutorException(f"There is no engine '{engine}'")

        if handler_meta.get("import", {}).get("success") is not True:
            raise ExecutorException(
                f"The '{engine}' handler isn't installed.\n" + get_handler_install_message(engine)
            )

        accept_connection_args = handler_meta.get("connection_args")
        # NOTE(review): connection_args can no longer be None here (normalized
        # above), so the second half of this condition is always true — confirm.
        if accept_connection_args is not None and connection_args is not None:
            for arg_name, arg_value in connection_args.items():
                if arg_name not in accept_connection_args:
                    continue
                arg_meta = accept_connection_args[arg_name]
                arg_type = arg_meta.get("type")
                if arg_type == HANDLER_CONNECTION_ARG_TYPE.PATH:
                    # arg may be one of:
                    # str: '/home/file.pem'
                    # dict: {'path': '/home/file.pem'}
                    # dict: {'url': 'https://host.com/file'}
                    arg_value = connection_args[arg_name]
                    if isinstance(arg_value, (str, dict)) is False:
                        raise ExecutorException(f"Unknown type of arg: '{arg_value}'")
                    if isinstance(arg_value, str) or "path" in arg_value:
                        path = arg_value if isinstance(arg_value, str) else arg_value["path"]
                        if Path(path).is_file() is False:
                            raise ExecutorException(f"File not found at: '{path}'")
                    elif "url" in arg_value:
                        # Remote file: fetch it to a local path first.
                        path = download_file(arg_value["url"])
                    else:
                        raise ExecutorException(f"Argument '{arg_name}' must be path or url to the file")
                    # Replace the arg with the resolved local path.
                    connection_args[arg_name] = path

        handler = self.session.integration_controller.create_tmp_handler(
            name=name, engine=engine, connection_args=connection_args
        )
        status = handler.check_connection()
        if status.copy_storage:
            # Handler prepared files during the check; keep them for import below.
            storage = handler.handler_storage.export_files()
    except Exception as e:
        # Fold any failure into the status object; reported uniformly below.
        status.error_message = str(e)

    if status.success is False:
        raise ExecutorException(f"Can't connect to db: {status.error_message}")

    integration = self.session.integration_controller.get(name)
    if integration is not None:
        raise EntityExistsError("Database already exists", name)
    # The name must not collide with a project either.
    try:
        integration = ProjectController().get(name=name)
    except EntityNotExistsError:
        pass
    if integration is not None:
        raise EntityExistsError("Project exists with this name", name)

    self.session.integration_controller.add(name, engine, connection_args)
    if storage:
        handler = self.session.integration_controller.get_data_handler(name, connect=False)
        handler.handler_storage.import_files(storage)
def answer_create_ml_engine(self, statement: CreateMLEngine) -> ExecuteAnswer:
    """Handles the `CREATE ML_ENGINE` command, which creates a new ML integration (engine) in the system.

    Args:
        statement (CreateMLEngine): The AST object representing the CREATE ML_ENGINE command.

    Returns:
        ExecuteAnswer: The result of the ML engine creation operation.

    Raises:
        ValueError: If the ml_engine name format is invalid.
        EntityExistsError: If the engine exists and IF NOT EXISTS was not given.
        ExecutorException: If the handler is unknown or creation fails.
    """
    name = match_one_part_name(statement.name)

    handler = statement.handler
    params = statement.params
    if_not_exists = getattr(statement, "if_not_exists", False)

    integrations = self.session.integration_controller.get_all()
    if name in integrations:
        if not if_not_exists:
            raise EntityExistsError("Integration already exists", name)
        else:
            return ExecuteAnswer()

    handler_module_meta = self.session.integration_controller.get_handler_meta(handler)

    if handler_module_meta is None:
        raise ExecutorException(f"There is no engine '{handler}'")

    params_out = {}
    if params:
        for key, value in variables_controller.fill_parameters(params).items():
            # convert ast types to string
            if isinstance(value, (Constant, Identifier)):
                value = value.to_string()
            params_out[key] = value

    try:
        self.session.integration_controller.add(name=name, engine=handler, connection_args=params_out)
    except Exception as e:
        msg = str(e)
        if type(e) in (ImportError, ModuleNotFoundError):
            msg = dedent(
                f"""\
                The '{handler_module_meta["name"]}' handler cannot be used. Reason is:
                {handler_module_meta["import"]["error_message"] or msg}
                """
            )
            is_cloud = self.session.config.get("cloud", False)
            # BUGFIX: error_message may be None (e.g. BYOM raises these errors
            # for errors in user code) — coerce to "" before the substring
            # test to avoid `"..." in None` raising TypeError.
            import_error = handler_module_meta["import"]["error_message"] or ""
            if is_cloud is False and handler_module_meta["name"] != "byom" and "No module named" in import_error:
                logger.info(get_handler_install_message(handler_module_meta["name"]))
        # Roll back the partially-created engine before reporting the error.
        ast_drop = DropMLEngine(name=Identifier(name))
        self.answer_drop_ml_engine(ast_drop)
        logger.info(msg)
        raise ExecutorException(msg) from e

    return ExecuteAnswer()
def answer_drop_ml_engine(self, statement: DropMLEngine) -> ExecuteAnswer:
    """Handles the `DROP ML_ENGINE` command, removing an ML integration (engine).

    Args:
        statement (DropMLEngine): The AST object representing the DROP ML_ENGINE command.

    Raises:
        EntityNotExistsError: If the integration does not exist and IF EXISTS is not specified.
        ValueError: If the integration name is provided in an invalid format.

    Returns:
        ExecuteAnswer: The result of the ML engine deletion operation.
    """
    name = match_one_part_name(statement.name)

    if name in self.session.integration_controller.get_all():
        self.session.integration_controller.delete(name)
    elif not statement.if_exists:
        raise EntityNotExistsError("Integration does not exists", name)
    return ExecuteAnswer()
def answer_create_database(self, statement: CreateDatabase) -> ExecuteAnswer:
    """Create a new integration or, when ENGINE is 'mindsdb', a project.

    Args:
        statement (CreateDatabase): data for creating the database/project.

    Returns:
        ExecuteAnswer: 'ok' answer.
    """
    name = match_one_part_name(statement.name)
    engine = (statement.engine or "mindsdb").lower()
    connection_args = variables_controller.fill_parameters(statement.parameters)

    try:
        if engine == "mindsdb":
            ProjectController().add(name)
        else:
            self._create_integration(name, engine, connection_args)
    except EntityExistsError:
        # IF NOT EXISTS suppresses the duplicate error.
        if statement.if_not_exists is False:
            raise

    return ExecuteAnswer()
def answer_drop_database(self, statement: DropDatabase | DropDatasource) -> ExecuteAnswer:
    """Drop a database (project or integration) by name.

    Args:
        statement (DropDatabase | DropDatasource): the parsed DROP statement.

    Raises:
        Exception: if the database name format is invalid.
        EntityNotExistsError: if the database is missing and IF EXISTS was not given.

    Returns:
        ExecuteAnswer: the result of the drop operation.
    """
    name = match_one_part_name(statement.name)
    try:
        self.session.database_controller.delete(name, strict_case=statement.name.is_quoted[0])
    except EntityNotExistsError:
        # IF EXISTS turns a missing database into a no-op.
        if statement.if_exists is not True:
            raise
    return ExecuteAnswer()
def answer_alter_database(self, statement: AlterDatabase) -> ExecuteAnswer:
    """Handle `ALTER DATABASE`: update integration params and re-check the connection."""
    name = match_one_part_name(statement.name)
    strict = statement.name.is_quoted[0]
    self.session.database_controller.update(name, data=statement.params, strict_case=strict, check_connection=True)
    return ExecuteAnswer()
def answer_drop_tables(self, statement, database_name):
    """Answer on 'drop table [if exists] {name}'.

    Args:
        statement: DropTables ast; each table may be db-qualified.
        database_name: default database for unqualified table names.

    Raises:
        ExecutorException: if the target database does not exist.
    """
    for table in statement.tables:
        if len(table.parts) > 1:
            # Qualified name: split off the database part.
            db_name = table.parts[0]
            table = Identifier(parts=table.parts[1:])
        else:
            db_name = database_name

        dn = self.session.datahub[db_name]
        if dn is None:
            raise ExecutorException(f"Cannot delete a table from database '{db_name}': the database does not exist")

        # NOTE(review): db_name is always a non-None string at this point, so
        # the `elif`/`else` branches below look unreachable — the project-table
        # deletion path may be dead code. Confirm intended condition.
        if db_name is not None:
            dn.drop_table(table, if_exists=statement.if_exists)

        elif db_name in self.session.database_controller.get_dict(filter_type="project"):
            # TODO do we need feature: delete object from project via drop table?

            project = self.session.database_controller.get_project(db_name)
            project_tables = {key: val for key, val in project.get_tables().items() if val.get("deletable") is True}
            table_name = table.to_string()

            if table_name in project_tables:
                self.session.model_controller.delete_model(table_name, project_name=db_name)
            elif statement.if_exists is False:
                raise ExecutorException(f"Cannot delete a table from database '{db_name}': table does not exists")
        else:
            raise ExecutorException(f"Cannot delete a table from database '{db_name}'")

    return ExecuteAnswer()
def answer_create_or_alter_view(self, statement: CreateView | AlterView, database_name: str) -> ExecuteAnswer:
    """Process CREATE VIEW and ALTER VIEW commands.

    Args:
        statement (CreateView | AlterView): the view DDL ast.
        database_name (str): name of the current database.

    Returns:
        ExecuteAnswer: answer for the command.
    """
    project_name, view_name = match_two_part_name(statement.name, default_db_name=database_name)

    query_str = statement.query_str
    if isinstance(statement.from_table, Identifier):
        # FROM <integration> form: wrap the raw query string as a native query.
        wrapped = Select(
            targets=[Star()],
            from_table=NativeQuery(integration=statement.from_table, query=statement.query_str),
        )
        query_str = wrapped.to_string()

    project = self.session.database_controller.get_project(project_name)

    if isinstance(statement, CreateView):
        try:
            project.create_view(view_name, query=query_str, session=self.session)
        except EntityExistsError:
            # IF NOT EXISTS suppresses the duplicate error.
            if getattr(statement, "if_not_exists", False) is False:
                raise
        return ExecuteAnswer()

    if isinstance(statement, AlterView):
        try:
            project.update_view(view_name, query=query_str, strict_case=(not view_name.islower()))
        except EntityNotExistsError:
            raise ExecutorException(f"View {view_name} does not exist in {project_name}")
        return ExecuteAnswer()

    raise ValueError(f"Unknown view DDL statement: {statement}")
def answer_drop_view(self, statement: DropView, database_name: str) -> ExecuteAnswer:
    """Drop one or more views from the specified database/project.

    Args:
        statement (DropView): The parsed DROP VIEW statement containing view names and options.
        database_name (str): The name of the database (project) from which to drop the views.

    Raises:
        EntityNotExistsError: If a view does not exist and 'IF EXISTS' is not specified in the statement.
        ValueError: If the view name format is invalid.

    Returns:
        ExecuteAnswer: The result of the drop view operation.
    """
    for name in statement.names:
        # The two-element case BINDS `database_name` (and the quote flags)
        # via match capture patterns; the one-element case keeps the current
        # database and treats its name as unquoted.
        match name.parts, name.is_quoted:
            case [view_name], [view_name_quoted]:
                db_name_quoted = False
            case [database_name, view_name], [db_name_quoted, view_name_quoted]:
                pass
            case _:
                raise ValueError(f"Invalid view name: {name}")

        # NOTE(review): `database_name` is rebound (and possibly lowered) on
        # each iteration, so a later single-part name reuses the previous
        # iteration's database rather than the original default — confirm
        # this is intended for multi-view drops.
        if not db_name_quoted:
            database_name = database_name.lower()
        if not view_name_quoted:
            view_name = view_name.lower()

        project = self.session.database_controller.get_project(database_name, db_name_quoted)

        try:
            # Names were already case-normalized above, so match exactly here.
            project.drop_view(view_name, strict_case=True)
        except EntityNotExistsError:
            if statement.if_exists is not True:
                raise

    return ExecuteAnswer()
def answer_create_kb(self, statement: CreateKnowledgeBase, database_name: str):
    """Handle `CREATE KNOWLEDGE_BASE`.

    Args:
        statement (CreateKnowledgeBase): parsed command; `storage` optionally
            points at a vector-database table, `params` carries KB settings.
        database_name (str): current default project.

    Returns:
        ExecuteAnswer: empty 'ok' answer.

    Raises:
        ExecutorException: on unsupported options or a malformed storage name.
    """
    if statement.model:
        raise ExecutorException(
            "Creating a knowledge base using pre-existing models is no longer supported.\n"
            "Please pass the model parameters as a JSON object in the embedding_model field."
        )

    project_name, kb_name = match_two_part_name(statement.name, default_db_name=database_name)

    if statement.storage is not None:
        if len(statement.storage.parts) != 2:
            # BUGFIX: message previously lacked the separator before "Need",
            # producing "...{storage}Need the form...".
            raise ExecutorException(
                f"Invalid vector database table name: {statement.storage}. Need the form 'database_name.table_name'"
            )

    if statement.from_query is not None:
        # TODO: implement this
        raise ExecutorException("Create a knowledge base from a select is not supported yet")

    # create the knowledge base
    _ = self.session.kb_controller.add(
        name=kb_name,
        project_name=project_name,
        storage=statement.storage,
        params=variables_controller.fill_parameters(statement.params),
        if_not_exists=statement.if_not_exists,
    )

    return ExecuteAnswer()
def answer_alter_kb(self, statement: AlterKnowledgeBase, database_name: str):
    """Handle `ALTER KNOWLEDGE_BASE`: update parameters of an existing KB."""
    project, kb = match_two_part_name(statement.name, ensure_lower_case=True, default_db_name=database_name)
    filled_params = variables_controller.fill_parameters(statement.params)
    self.session.kb_controller.update(name=kb, project_name=project, params=filled_params)
    return ExecuteAnswer()
def answer_drop_kb(self, statement: DropKnowledgeBase, database_name: str) -> ExecuteAnswer:
    """Handle `DROP KNOWLEDGE_BASE`; IF EXISTS handling is delegated to the controller."""
    project, kb = match_two_part_name(statement.name, default_db_name=database_name)
    self.session.kb_controller.delete(name=kb, project_name=project, if_exists=statement.if_exists)
    return ExecuteAnswer()
def answer_create_skill(self, statement, database_name):
    """Handle `CREATE SKILL`."""
    project, skill_name = match_two_part_name(statement.name, default_db_name=database_name)
    try:
        self.session.skills_controller.add_skill(skill_name, project, statement.type, statement.params)
    except ValueError as e:
        # Project does not exist or skill already exists.
        raise ExecutorException(str(e))
    return ExecuteAnswer()
def answer_drop_skill(self, statement, database_name):
    """Handle `DROP SKILL`."""
    project, skill_name = match_two_part_name(statement.name, default_db_name=database_name)
    try:
        self.session.skills_controller.delete_skill(skill_name, project, strict_case=True)
    except ValueError as e:
        # Project does not exist or skill does not exist.
        raise ExecutorException(str(e))
    return ExecuteAnswer()
def answer_update_skill(self, statement, database_name):
    """Handle `UPDATE SKILL`: apply SET-clause changes to an existing skill."""
    project_name, name = match_two_part_name(statement.name, default_db_name=database_name)

    # Renamed local from `type` to avoid shadowing the builtin; the keyword
    # argument name passed to the controller is unchanged.
    skill_type = statement.params.pop("type", None)
    try:
        _ = self.session.skills_controller.update_skill(
            name, project_name=project_name, type=skill_type, params=statement.params
        )
    except ValueError as e:
        # Project does not exist or skill does not exist.
        raise ExecutorException(str(e))

    return ExecuteAnswer()
def answer_create_agent(self, statement, database_name):
    """Handle `CREATE AGENT`; honours IF NOT EXISTS."""
    project, agent_name = match_two_part_name(statement.name, default_db_name=database_name)

    skills = statement.params.pop("skills", [])
    provider = statement.params.pop("provider", None)
    try:
        self.session.agents_controller.add_agent(
            name=agent_name,
            project_name=project,
            model_name=statement.model,
            skills=skills,
            provider=provider,
            params=variables_controller.fill_parameters(statement.params),
        )
    except EntityExistsError as e:
        # IF NOT EXISTS turns a duplicate into a no-op.
        if statement.if_not_exists is not True:
            raise ExecutorException(str(e))
    except ValueError as e:
        # Project does not exist or agent already exists.
        raise ExecutorException(str(e))

    return ExecuteAnswer()
def answer_drop_agent(self, statement: DropAgent, database_name: str):
    """Handle `DROP AGENT`."""
    project, agent_name = match_two_part_name(statement.name, default_db_name=database_name)
    try:
        self.session.agents_controller.delete_agent(agent_name, project)
    except ValueError as e:
        # Project does not exist or agent does not exist.
        raise ExecutorException(str(e))
    return ExecuteAnswer()
def answer_update_agent(self, statement: UpdateAgent, database_name: str):
    """Handle `UPDATE AGENT`: apply SET-clause changes to an existing agent."""
    project, agent_name = match_two_part_name(statement.name, default_db_name=database_name)

    model = statement.params.pop("model", None)
    add_skills = statement.params.pop("skills_to_add", [])
    remove_skills = statement.params.pop("skills_to_remove", [])
    try:
        self.session.agents_controller.update_agent(
            agent_name,
            project_name=project,
            model_name=model,
            skills_to_add=add_skills,
            skills_to_remove=remove_skills,
            params=variables_controller.fill_parameters(statement.params),
        )
    except (EntityExistsError, EntityNotExistsError, ValueError) as e:
        # Project or agent does not exist, or the update is invalid.
        raise ExecutorException(str(e))

    return ExecuteAnswer()
@mark_process("learn")
def answer_create_predictor(self, statement: CreatePredictor, database_name: str):
    """Handle `CREATE MODEL`: train a model with the requested (or default) ML engine."""
    project_name, model_name = match_two_part_name(statement.name, default_db_name=database_name)
    statement.name.parts = [project_name, model_name]
    statement.name.is_quoted = [False, False]

    engine_name = "lightwood"  # default
    if statement.using is not None:
        # Normalise USING keys to lower case; ENGINE selects the ML handler.
        statement.using = {key.lower(): value for key, value in statement.using.items()}
        engine_name = statement.using.pop("engine", engine_name)

    if statement.query_str is not None and statement.integration_name is None:
        # No explicit data source: default to the current project.
        statement.integration_name = Identifier(database_name)

    try:
        ml_handler = self.session.integration_controller.get_ml_handler(engine_name)
    except EntityNotExistsError:
        # Engine not registered yet: create it under the handler's own name.
        self.answer_create_ml_engine(CreateMLEngine(name=Identifier(engine_name), handler=engine_name))
        ml_handler = self.session.integration_controller.get_ml_handler(engine_name)

    if getattr(statement, "is_replace", False) is True:
        # CREATE OR REPLACE: best-effort delete of any existing model.
        try:
            self.session.model_controller.delete_model(model_name, project_name=project_name)
        except EntityNotExistsError:
            pass

    try:
        df = self.session.model_controller.create_model(statement, ml_handler)
        return ExecuteAnswer(data=ResultSet.from_df(df))
    except EntityExistsError:
        if getattr(statement, "if_not_exists", False) is True:
            return ExecuteAnswer()
        raise
def answer_show_columns(
    self,
    target: Identifier,
    where: Optional[Operation] = None,
    like: Optional[str] = None,
    is_full=False,
    database_name=None,
):
    """Answer `SHOW [FULL] COLUMNS FROM <table>` via information_schema.COLUMNS."""
    if isinstance(target, Identifier) is False:
        raise TableNotExistError("The table name is required for the query.")

    # Resolve the schema: explicit in the identifier, else the current db,
    # else the configured default project.
    if len(target.parts) > 1:
        schema = target.parts[0]
    elif isinstance(database_name, str) and len(database_name) > 0:
        schema = database_name
    else:
        schema = self.session.config.get("default_project")
    table_name = target.parts[-1]

    conditions = [
        BinaryOperation(
            "and",
            args=[
                BinaryOperation("=", args=[Identifier("TABLE_SCHEMA"), Constant(schema)]),
                BinaryOperation("=", args=[Identifier("TABLE_NAME"), Constant(table_name)]),
            ],
        )
    ]
    if where is not None:
        conditions.append(where)
    if like is not None:
        conditions.append(BinaryOperation("like", args=[Identifier("View"), Constant(like)]))
    # Left-fold into the same nested AND shape as before.
    filters = reduce(lambda left, right: BinaryOperation("and", args=[left, right]), conditions)

    targets = [
        Identifier("COLUMN_NAME", alias=Identifier("Field")),
        Identifier("COLUMN_TYPE", alias=Identifier("Type")),
        Identifier("IS_NULLABLE", alias=Identifier("Null")),
        Identifier("COLUMN_KEY", alias=Identifier("Key")),
        Identifier("COLUMN_DEFAULT", alias=Identifier("Default")),
        Identifier("EXTRA", alias=Identifier("Extra")),
    ]
    if is_full:
        # FULL adds static collation/privileges/comment columns.
        targets += [
            Constant(None, alias=Identifier("Collation")),
            Constant("select", alias=Identifier("Privileges")),
            Constant(None, alias=Identifier("Comment")),
        ]

    new_statement = Select(
        targets=targets,
        from_table=Identifier(parts=["information_schema", "COLUMNS"]),
        where=filters,
    )
    query = SQLQuery(new_statement, session=self.session, database=database_name)
    return self.answer_select(query)
def answer_show_create_table(self, table):
    """Answer SHOW CREATE TABLE with a stub statement.

    MindsDB does not keep original DDL, so a placeholder
    'create table <name> ()' is returned for client compatibility.
    """
    header = [
        Column(table_name="", name="Table", type=TYPES.MYSQL_TYPE_VAR_STRING),
        Column(table_name="", name="Create Table", type=TYPES.MYSQL_TYPE_VAR_STRING),
    ]
    result = ResultSet(
        columns=header,
        values=[[table, f"create table {table} ()"]],
    )
    return ExecuteAnswer(data=result)
def answer_function_status(self):
    """Answer SHOW FUNCTION STATUS with an empty result set.

    MindsDB has no stored functions, so no rows are returned; only the
    header matters for client compatibility. The column layout follows
    MySQL's SHOW FUNCTION STATUS output.

    Returns:
        ExecuteAnswer: an answer carrying the header columns and no rows.
    """
    # One entry per output column: (name, table_name, type, charset).
    # Fix: the second column was previously named "Db" again (a copy-paste
    # of the first); MySQL's SHOW FUNCTION STATUS calls it "Name".
    col_defs = [
        ("Db", "schemata", "str", self.charset_text_type),
        ("Name", "routines", "str", self.charset_text_type),
        ("Type", "routines", "str", CHARSET_NUMBERS["utf8_bin"]),
        ("Definer", "routines", "str", CHARSET_NUMBERS["utf8_bin"]),
        ("Modified", "routines", TYPES.MYSQL_TYPE_TIMESTAMP, CHARSET_NUMBERS["binary"]),
        ("Created", "routines", TYPES.MYSQL_TYPE_TIMESTAMP, CHARSET_NUMBERS["binary"]),
        ("Security_type", "routines", TYPES.MYSQL_TYPE_STRING, CHARSET_NUMBERS["utf8_bin"]),
        ("Comment", "routines", TYPES.MYSQL_TYPE_BLOB, CHARSET_NUMBERS["utf8_bin"]),
        ("character_set_client", "character_sets", TYPES.MYSQL_TYPE_VAR_STRING, self.charset_text_type),
        ("collation_connection", "collations", TYPES.MYSQL_TYPE_VAR_STRING, self.charset_text_type),
        ("Database Collation", "collations", TYPES.MYSQL_TYPE_VAR_STRING, self.charset_text_type),
    ]
    columns = [
        Column(
            name=name,
            alias=name,
            table_name=table_name,
            table_alias="ROUTINES",
            type=col_type,
            database="mysql",
            charset=charset,
        )
        for name, table_name, col_type, charset in col_defs
    ]

    return ExecuteAnswer(data=ResultSet(columns=columns))
def answer_show_table_status(self, table_name):
    """Answer `SHOW TABLE STATUS LIKE '<table>'` with one synthetic row.

    NOTE: at this moment the parsed statement only looks like
    `SHOW TABLE STATUS LIKE 'table'`.
    NOTE: some columns carry {'database': 'mysql'} and others an empty
    database — that is correct; it mirrors what a real MySQL server sends.
    """
    text = self.charset_text_type
    binary = CHARSET_NUMBERS["binary"]
    # One entry per output column: (database, name, type, charset);
    # table_name is always 'tables'.
    col_defs = [
        ("mysql", "Name", TYPES.MYSQL_TYPE_VAR_STRING, text),
        ("", "Engine", TYPES.MYSQL_TYPE_VAR_STRING, text),
        ("", "Version", TYPES.MYSQL_TYPE_LONGLONG, binary),
        ("mysql", "Row_format", TYPES.MYSQL_TYPE_VAR_STRING, text),
        ("", "Rows", TYPES.MYSQL_TYPE_LONGLONG, binary),
        ("", "Avg_row_length", TYPES.MYSQL_TYPE_LONGLONG, binary),
        ("", "Data_length", TYPES.MYSQL_TYPE_LONGLONG, binary),
        ("", "Max_data_length", TYPES.MYSQL_TYPE_LONGLONG, binary),
        ("", "Index_length", TYPES.MYSQL_TYPE_LONGLONG, binary),
        ("", "Data_free", TYPES.MYSQL_TYPE_LONGLONG, binary),
        ("", "Auto_increment", TYPES.MYSQL_TYPE_LONGLONG, binary),
        ("", "Create_time", TYPES.MYSQL_TYPE_TIMESTAMP, binary),
        ("", "Update_time", TYPES.MYSQL_TYPE_TIMESTAMP, binary),
        ("", "Check_time", TYPES.MYSQL_TYPE_TIMESTAMP, binary),
        ("mysql", "Collation", TYPES.MYSQL_TYPE_VAR_STRING, text),
        ("", "Checksum", TYPES.MYSQL_TYPE_LONGLONG, binary),
        ("", "Create_options", TYPES.MYSQL_TYPE_VAR_STRING, text),
        ("", "Comment", TYPES.MYSQL_TYPE_BLOB, text),
    ]
    columns = [
        Column(
            database=database,
            table_name="tables",
            name=name,
            alias=name,
            type=col_type,
            charset=charset,
        )
        for database, name, col_type, charset in col_defs
    ]

    now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    data = [
        [
            table_name,  # Name
            "InnoDB",  # Engine
            10,  # Version
            "Dynamic",  # Row_format
            1,  # Rows
            16384,  # Avg_row_length
            16384,  # Data_length
            0,  # Max_data_length
            0,  # Index_length
            0,  # Data_free
            None,  # Auto_increment
            now,  # Create_time
            now,  # Update_time
            None,  # Check_time
            "utf8mb4_0900_ai_ci",  # Collation
            None,  # Checksum
            "",  # Create_options
            "",  # Comment
        ]
    ]
    return ExecuteAnswer(data=ResultSet(columns=columns, values=data))
def answer_show_warnings(self):
    """Answer SHOW WARNINGS: MindsDB keeps no warning log, so return an
    empty result set with the standard Level/Code/Message header."""
    columns = [
        Column(
            database="",
            table_name="",
            name="Level",
            alias="Level",
            type=TYPES.MYSQL_TYPE_VAR_STRING,
            charset=self.charset_text_type,
        ),
        Column(
            database="",
            table_name="",
            name="Code",
            alias="Code",
            type=TYPES.MYSQL_TYPE_LONG,
            charset=CHARSET_NUMBERS["binary"],
        ),
        Column(
            database="",
            table_name="",
            name="Message",
            alias="Message",
            type=TYPES.MYSQL_TYPE_VAR_STRING,
            charset=self.charset_text_type,
        ),
    ]
    return ExecuteAnswer(data=ResultSet(columns=columns))
def answer_create_table(self, statement, database_name):
    """Handle CREATE TABLE by delegating execution to SQLQuery."""
    SQLQuery(statement, database=database_name, execute=True, session=self.session)
    return ExecuteAnswer()
def answer_select(self, query):
    """Wrap the data already fetched by a SQLQuery into an ExecuteAnswer."""
    return ExecuteAnswer(data=query.fetched_data)
def answer_update_model_version(self, model_version, database_name):
    """Set the active version of a model.

    Expects an identifier shaped like `[project.]model.version`, where
    the last part is an integer version number.

    Raises:
        ExecutorException: if the identifier is malformed or the version
            part is not numeric.
    """
    if not isinstance(model_version, Identifier):
        raise ExecutorException(f"Please define version: {model_version}")

    parts = model_version.parts

    version_str = parts[-1]
    if not version_str.isdigit():
        raise ExecutorException(f"Unknown version: {version_str}")
    version = int(version_str)

    if len(parts) == 3:
        project_name, model_name = parts[0], parts[1]
    elif len(parts) == 2:
        # Unqualified model name: fall back to the current database.
        project_name, model_name = database_name, parts[0]
    else:
        raise ExecutorException(f"Unknown model: {model_version}")

    self.session.model_controller.set_model_active_version(project_name, model_name, version)
    return ExecuteAnswer()
def answer_drop_model(self, statement: DropPredictor, database_name: str) -> ExecuteAnswer:
    """Handle DROP MODEL (or DROP PREDICTOR).

    Removes a whole model or, when the identifier carries a version
    suffix, only that version.

    Args:
        statement (DropPredictor): the parsed DROP MODEL / DROP PREDICTOR node.
        database_name (str): current database/project, used when the model
            name is unqualified.

    Raises:
        EntityNotExistsError: if the model or version does not exist and
            IF EXISTS is not specified.
        ValueError: if the model name format is invalid.

    Returns:
        ExecuteAnswer: empty answer on success.
    """
    project_name, model_name, version = resolve_model_identifier(statement.name)
    if project_name is None:
        project_name = database_name

    if version is not None:
        # Remove just the requested version.
        try:
            self.session.model_controller.delete_model_version(project_name, model_name, version)
        except EntityNotExistsError:
            if not statement.if_exists:
                raise
    else:
        # Remove the whole model from its project.
        # NOTE(review): the broad catch here mirrors the existing behavior —
        # with IF EXISTS any failure is ignored, not only "not found".
        try:
            project = self.session.database_controller.get_project(project_name, strict_case=True)
            project.drop_model(model_name)
        except Exception:
            if not statement.if_exists:
                raise

    return ExecuteAnswer()
def change_default_db(self, db_name):
    """Switch the session's default database.

    Workaround for an mssql quirk: it keeps the connection for a long time,
    but after some time may send a packet with COM_INIT_DB=null; in that
    case the old database name is kept as the default.

    Args:
        db_name: name of the database to switch to, or the literal
            string "null" (ignored, see note above).

    Raises:
        BadDbError: if the database does not exist.
    """
    if db_name == "null":
        return
    if not self.session.database_controller.exists(db_name):
        # Fix: error-message grammar ("does not exists" -> "does not exist").
        raise BadDbError(f"Database {db_name} does not exist")
    self.session.database = db_name