Coverage for mindsdb / api / mysql / mysql_proxy / executor / mysql_executor.py: 66%

71 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from mindsdb_sql_parser import parse_sql 

2from mindsdb_sql_parser.exceptions import ParsingException 

3from mindsdb_sql_parser.ast.base import ASTNode 

4import mindsdb.utilities.profiler as profiler 

5from mindsdb.api.executor.sql_query import SQLQuery 

6from mindsdb.api.executor.sql_query.result_set import Column 

7from mindsdb.api.executor.planner import utils as planner_utils 

8from mindsdb.api.executor.data_types.answer import ExecuteAnswer 

9from mindsdb.api.executor.command_executor import ExecuteCommands 

10from mindsdb.api.executor.exceptions import SqlSyntaxError 

11from mindsdb.api.mysql.mysql_proxy.libs.constants.mysql import MYSQL_DATA_TYPE 

12from mindsdb.utilities import log 

13 

14logger = log.getLogger(__name__) 

15 

16 

17class Executor: 

18 def __init__(self, session, sqlserver): 

19 self.session = session 

20 self.sqlserver = sqlserver 

21 

22 self.query: ASTNode = None 

23 

24 self.columns: list[Column] = [] 

25 self.params: list[Column] = [] 

26 self.data = None 

27 self.server_status = None 

28 self.is_executed = False 

29 self.error_message = None 

30 self.error_code = None 

31 self.executor_answer: ExecuteAnswer = None 

32 

33 self.sql = "" 

34 self.sql_lower = "" 

35 

36 context = {"connection_id": self.sqlserver.connection_id} 

37 self.command_executor = ExecuteCommands(self.session, context) 

38 

39 def change_default_db(self, new_db): 

40 self.command_executor.change_default_db(new_db) 

41 

42 def stmt_prepare(self, sql): 

43 self.parse(sql) 

44 

45 # if not params 

46 params = planner_utils.get_query_params(self.query) 

47 if len(params) == 0: 

48 # execute immediately 

49 self.do_execute() 

50 

51 else: 

52 # plan query 

53 # TODO less complex. 

54 # planner is inside SQLQuery now. 

55 

56 sqlquery = SQLQuery(self.query, session=self.session, execute=False) 

57 

58 sqlquery.prepare_query() 

59 

60 self.params = [Column(name=p.value, alias=p.value, type=MYSQL_DATA_TYPE.TEXT) for p in params] 

61 

62 # TODO: 

63 # select * from mindsdb.models doesn't invoke prepare_steps and columns_list is empty 

64 self.columns = sqlquery.columns_list 

65 

66 def stmt_execute(self, param_values): 

67 if self.is_executed: 

68 return 

69 

70 # fill params 

71 self.query = planner_utils.fill_query_params(self.query, param_values) 

72 

73 # execute query 

74 self.do_execute() 

75 

76 @profiler.profile() 

77 def query_execute(self, sql): 

78 self.parse(sql) 

79 self.do_execute() 

80 

81 @profiler.profile() 

82 def parse(self, sql): 

83 self.sql = sql 

84 sql_lower = sql.lower() 

85 self.sql_lower = sql_lower.replace("`", "") 

86 

87 try: 

88 self.query = parse_sql(sql) 

89 except ParsingException as mdb_error: 

90 # not all statements are parsed by parse_sql 

91 logger.warning("Failed to parse SQL query") 

92 logger.debug(f"Query that cannot be parsed: {sql}") 

93 

94 raise SqlSyntaxError(f"The SQL statement cannot be parsed - {sql}: {mdb_error}") from mdb_error 

95 except Exception: 

96 logger.exception(f"Unexpected error while parsing SQL query: {sql}") 

97 raise 

98 

99 @profiler.profile() 

100 def do_execute(self): 

101 # it can be already run at prepare state 

102 if self.is_executed: 102 ↛ 103line 102 didn't jump to line 103 because the condition on line 102 was never true

103 return 

104 

105 executor_answer: ExecuteAnswer = self.command_executor.execute_command(self.query) 

106 self.executor_answer = executor_answer 

107 

108 self.is_executed = True