Coverage for mindsdb / api / mysql / mysql_proxy / data_types / mysql_packets / command_packet.py: 9%
79 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1"""
2*******************************************************
3 * Copyright (C) 2017 MindsDB Inc. <copyright@mindsdb.com>
4 *
5 * This file is part of MindsDB Server.
6 *
7 * MindsDB Server can not be copied and/or distributed without the express
8 * permission of MindsDB Inc
9 *******************************************************
10"""
12import struct
13import math
15from mindsdb.api.mysql.mysql_proxy.data_types.mysql_packet import Packet
16from mindsdb.api.mysql.mysql_proxy.data_types.mysql_datum import Datum
17from mindsdb.api.mysql.mysql_proxy.libs.constants.mysql import COMMANDS, getConstName, TYPES
class CommandPacket(Packet):
    """Client command packet: a command byte followed by its payload.

    ``setup`` decodes the command type from the packet body and then the
    command-specific fields, storing each of them as an attribute on the
    instance (``sql``, ``stmt_id``, ``parameters``, ``database``, ...).

    Implementation based on description:
    https://mariadb.com/kb/en/library/1-connecting-connecting/#initial-handshake-packet
    COM_STMT_EXECUTE payload layout:
    https://mariadb.com/kb/en/com_stmt_execute/
    """

    def _read_byte(self, buffer):
        """Split one unsigned byte off the front of ``buffer``.

        Returns:
            tuple: ``(value, rest)`` where ``value`` is the first byte as an
            int and ``rest`` is ``buffer`` without that byte.

        Raises:
            struct.error: if ``buffer`` is empty.
        """
        value = struct.unpack('<B', buffer[:1])[0]
        return value, buffer[1:]

    def read_params(self, buffer, num_params):
        """Decode bound parameter values and append them to ``self.parameters``.

        The buffer is read in this order: the NULL bitmap, the send-type
        byte, the per-parameter type bytes (only when the send-type byte is
        1), and finally the values of every non-NULL parameter.

        ``self.parameters`` must already be a list; ``setup`` creates it
        before calling this method.

        Args:
            buffer: remaining packet bytes, positioned at the NULL bitmap.
            num_params: number of parameters of the prepared statement.
                Nothing is read when it is not greater than zero.

        Raises:
            Exception: if a non-NULL parameter has a type without a known
                decoder, or if the parameter types were not sent in this
                packet (send-type byte != 1) so the value cannot be decoded.
        """
        if not num_params > 0:
            return

        # NULL bitmap: one bit per parameter, lowest bit of each byte first,
        # rounded up to a whole number of bytes.
        null_bytes = (num_params + 7) // 8
        nulls = []
        for _ in range(null_bytes):
            bitmap_byte, buffer = self._read_byte(buffer)
            nulls.extend((bitmap_byte & (1 << bit)) != 0 for bit in range(8))

        # Send-type byte: 1 means two bytes of type information follow for
        # every parameter.
        send_types, buffer = self._read_byte(buffer)

        types = []
        if send_types == 1:
            for _ in range(num_params):
                t, buffer = self._read_byte(buffer)
                # The second byte is kept under the key 'signed' but is not
                # consulted when decoding values below.
                s, buffer = self._read_byte(buffer)
                types.append(dict(
                    type=t,
                    signed=s
                ))

        # Wire type -> Datum format used to decode the value.
        datumtypes = {
            TYPES.MYSQL_TYPE_VAR_STRING: 'string<lenenc>',
            TYPES.MYSQL_TYPE_STRING: 'string<lenenc>',
            TYPES.MYSQL_TYPE_VARCHAR: 'string<lenenc>',

            TYPES.MYSQL_TYPE_TINY: 'int<1>',
            TYPES.MYSQL_TYPE_SHORT: 'int<2>',
            TYPES.MYSQL_TYPE_LONG: 'int<4>',
            TYPES.MYSQL_TYPE_LONGLONG: 'int<8>',
        }

        for i in range(num_params):
            if nulls[i]:
                # NULL parameters carry no value bytes.
                self.parameters.append(None)
                continue

            if i >= len(types):
                # Types were not part of this packet; without them the value
                # bytes cannot be interpreted. Fail with an explicit message
                # instead of an IndexError on the empty ``types`` list.
                raise Exception(
                    'Parameter types were not sent in COM_STMT_EXECUTE '
                    f'(send-type byte is {send_types}), '
                    f'unable to decode parameter {i}'
                )

            datum_type = datumtypes.get(types[i]['type'])
            if datum_type is None:
                # NOTE at this moment all sends as strings and it works
                raise Exception(f"Unsupported type {types[i]['type']}")

            x = Datum(datum_type)
            buffer = x.setFromBuff(buffer)
            value = x.value
            if isinstance(value, bytes):
                value = value.decode()

            self.parameters.append(value)

    def setup(self, length=0, count_header=1, body=''):
        """Parse a received command packet and populate the instance.

        Args:
            length: payload length from the packet header. When it is 0 the
                method returns immediately and sets nothing.
            count_header: sequence number from the packet header.
            body: raw packet payload; its first byte is the command type.

        Attributes set depend on the command type:
            * COM_QUERY / COM_STMT_PREPARE: ``sql``
            * COM_STMT_EXECUTE: ``stmt_id``, ``flags``, ``iteration_count``
              and ``parameters`` (the statement must already be present in
              ``self.session.prepared_stmts``)
            * COM_STMT_CLOSE: ``stmt_id``
            * COM_STMT_FETCH: ``stmt_id``, ``limit``
            * COM_INIT_DB: ``database``
            * anything else: ``data`` (the rest of the payload)
        """
        if length == 0:
            return

        self._length = length
        self._seq = count_header
        self._body = body

        self.type = Datum('int<1>')
        buffer = self.type.setFromBuff(body)
        command = self.type.value

        if command in (COMMANDS.COM_QUERY, COMMANDS.COM_STMT_PREPARE):
            self.sql = Datum('str<EOF>')
            self.sql.setFromBuff(buffer)
        elif command == COMMANDS.COM_STMT_EXECUTE:
            # https://mariadb.com/kb/en/com_stmt_execute/
            self.stmt_id = Datum('int<4>')
            buffer = self.stmt_id.setFromBuff(buffer)
            self.flags = Datum('int<1>')
            buffer = self.flags.setFromBuff(buffer)
            self.iteration_count = Datum('int<4>')
            buffer = self.iteration_count.setFromBuff(buffer)

            self.parameters = []

            # The parameter count is not part of this packet; it comes from
            # the statement stored when it was prepared.
            prepared_stmt = self.session.prepared_stmts[self.stmt_id.value]
            num_params = len(prepared_stmt['statement'].params)
            self.read_params(buffer, num_params)
        elif command == COMMANDS.COM_STMT_CLOSE:
            self.stmt_id = Datum('int<4>')
            self.stmt_id.setFromBuff(buffer)
        elif command == COMMANDS.COM_STMT_FETCH:
            self.stmt_id = Datum('int<4>')
            buffer = self.stmt_id.setFromBuff(buffer)
            self.limit = Datum('int<4>')
            self.limit.setFromBuff(buffer)
        elif command == COMMANDS.COM_INIT_DB:
            self.database = Datum('str<EOF>')
            self.database.setFromBuff(buffer)
        else:
            self.data = Datum('str<EOF>')
            self.data.setFromBuff(buffer)

    def __str__(self):
        """Return a debug representation: header, command name and all attributes.

        Reads ``self.length`` and ``self.seq``, which are not defined in this
        class (presumably provided by ``Packet`` - verify), and ``self.type``,
        which exists only after ``setup`` ran with a non-zero length.
        """
        return str({
            'header': {'length': self.length, 'seq': self.seq},
            'type': getConstName(COMMANDS, self.type.value),
            'vars': self.__dict__
        })