Coverage for mindsdb / api / mysql / mysql_proxy / data_types / mysql_packets / command_packet.py: 9%

79 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1""" 

2******************************************************* 

3 * Copyright (C) 2017 MindsDB Inc. <copyright@mindsdb.com> 

4 * 

5 * This file is part of MindsDB Server. 

6 * 

7 * MindsDB Server can not be copied and/or distributed without the express 

8 * permission of MindsDB Inc 

9 ******************************************************* 

10""" 

11 

12import struct 

13import math 

14 

15from mindsdb.api.mysql.mysql_proxy.data_types.mysql_packet import Packet 

16from mindsdb.api.mysql.mysql_proxy.data_types.mysql_datum import Datum 

17from mindsdb.api.mysql.mysql_proxy.libs.constants.mysql import COMMANDS, getConstName, TYPES 

18 

19 

class CommandPacket(Packet):
    """Client command packet.

    ``setup`` reads the first byte of the body as the command type and then
    decodes the rest of the body according to that command: SQL text for
    COM_QUERY / COM_STMT_PREPARE, statement id plus bound parameters for
    COM_STMT_EXECUTE, statement id for COM_STMT_CLOSE / COM_STMT_FETCH, a
    database name for COM_INIT_DB, and raw data for anything else.

    Reference left by the original authors (describes the handshake, not the
    command packets themselves):
    https://mariadb.com/kb/en/library/1-connecting-connecting/#initial-handshake-packet
    Layout of the COM_STMT_EXECUTE payload:
    https://mariadb.com/kb/en/com_stmt_execute/
    """

    def _read_byte(self, buffer):
        """Take one unsigned byte off the front of ``buffer``.

        Args:
            buffer: bytes-like object to read from.

        Returns:
            tuple: ``(value, rest)`` where ``value`` is the first byte as an
            int and ``rest`` is ``buffer`` without that byte.

        Raises:
            struct.error: if ``buffer`` is empty.
        """
        value = struct.unpack('<B', buffer[:1])[0]
        return value, buffer[1:]

    def read_params(self, buffer, num_params):
        """Decode bound parameter values and append them to ``self.parameters``.

        The payload is read in this order: NULL bitmap, one "types are sent"
        byte, optional per-parameter (type, flag) byte pairs, then the values
        of all non-NULL parameters.

        Args:
            buffer: bytes positioned at the start of the NULL bitmap.
            num_params: number of parameters of the prepared statement.

        Raises:
            Exception: if a non-NULL parameter has no type information
                (types were not sent with this packet) or its type is not
                one of the supported string/integer types.
            struct.error: if ``buffer`` ends before the header is read.
        """
        if num_params <= 0:
            return

        # NULL bitmap: one bit per parameter, lowest bit first, padded to
        # whole bytes.
        null_bytes = (num_params + 7) // 8
        nulls = []
        for _ in range(null_bytes):
            bitmap, buffer = self._read_byte(buffer)
            nulls.extend(((1 << bit) & bitmap) != 0 for bit in range(8))

        # Send-type byte: 1 means a (type, flag) pair follows for every
        # parameter.
        send_types, buffer = self._read_byte(buffer)

        types = []
        if send_types == 1:
            for _ in range(num_params):
                type_code, buffer = self._read_byte(buffer)
                flag, buffer = self._read_byte(buffer)
                # NOTE(review): the flag byte is stored under the historical
                # key 'signed' but is never consulted when decoding values.
                types.append(dict(
                    type=type_code,
                    signed=flag
                ))

        datumtypes = {
            TYPES.MYSQL_TYPE_VAR_STRING: 'string<lenenc>',
            TYPES.MYSQL_TYPE_STRING: 'string<lenenc>',
            TYPES.MYSQL_TYPE_VARCHAR: 'string<lenenc>',

            TYPES.MYSQL_TYPE_TINY: 'int<1>',
            TYPES.MYSQL_TYPE_SHORT: 'int<2>',
            TYPES.MYSQL_TYPE_LONG: 'int<4>',
            TYPES.MYSQL_TYPE_LONGLONG: 'int<8>',
        }

        for index in range(num_params):
            if nulls[index]:
                self.parameters.append(None)
                continue

            # Without type information the value cannot be decoded; fail with
            # a clear message instead of an IndexError on the empty list.
            if index >= len(types):
                raise Exception(
                    'Parameter types were not sent with the execute command; '
                    f'cannot decode value of parameter {index}'
                )

            type_code = types[index]['type']
            datum_type = datumtypes.get(type_code)
            if datum_type is None:
                # NOTE at this moment all sends as strings and it works
                raise Exception(f"Unsupported type {type_code}")

            datum = Datum(datum_type)
            buffer = datum.setFromBuff(buffer)
            value = datum.value
            if isinstance(value, bytes):
                value = value.decode()

            self.parameters.append(value)

    def setup(self, length=0, count_header=1, body=''):
        """Parse ``body`` and populate the command-specific attributes.

        Does nothing when ``length`` is 0. Otherwise stores the header values
        and sets ``self.type`` plus, depending on the command:
        ``sql``; ``stmt_id``, ``flags``, ``iteration_count`` and
        ``parameters``; ``stmt_id``; ``stmt_id`` and ``limit``; ``database``;
        or ``data``.

        Args:
            length: packet body length from the header.
            count_header: packet sequence number from the header.
            body: raw packet body, starting with the command byte.

        Raises:
            KeyError: for COM_STMT_EXECUTE when the statement id is not in
                ``self.session.prepared_stmts``.
            Exception: propagated from ``read_params``.
        """
        if length == 0:
            return

        self._length = length
        self._seq = count_header
        self._body = body

        self.type = Datum('int<1>')
        buffer = self.type.setFromBuff(body)

        if self.type.value in (COMMANDS.COM_QUERY, COMMANDS.COM_STMT_PREPARE):
            self.sql = Datum('str<EOF>')
            self.sql.setFromBuff(buffer)
        elif self.type.value == COMMANDS.COM_STMT_EXECUTE:
            # https://mariadb.com/kb/en/com_stmt_execute/
            self.stmt_id = Datum('int<4>')
            buffer = self.stmt_id.setFromBuff(buffer)
            self.flags = Datum('int<1>')
            buffer = self.flags.setFromBuff(buffer)
            self.iteration_count = Datum('int<4>')
            buffer = self.iteration_count.setFromBuff(buffer)

            self.parameters = []

            # The number of parameters comes from the statement registered in
            # the session, not from the packet itself.
            prepared_stmt = self.session.prepared_stmts[self.stmt_id.value]
            num_params = len(prepared_stmt['statement'].params)
            self.read_params(buffer, num_params)
        elif self.type.value == COMMANDS.COM_STMT_CLOSE:
            self.stmt_id = Datum('int<4>')
            self.stmt_id.setFromBuff(buffer)
        elif self.type.value == COMMANDS.COM_STMT_FETCH:
            self.stmt_id = Datum('int<4>')
            buffer = self.stmt_id.setFromBuff(buffer)
            self.limit = Datum('int<4>')
            self.limit.setFromBuff(buffer)
        elif self.type.value == COMMANDS.COM_INIT_DB:
            self.database = Datum('str<EOF>')
            self.database.setFromBuff(buffer)
        else:
            self.data = Datum('str<EOF>')
            self.data.setFromBuff(buffer)

    def __str__(self):
        """Return a dict-style dump of header, command name and attributes.

        ``self.length`` and ``self.seq`` are presumably provided by the
        ``Packet`` base class -- confirm there.
        """
        return str({
            'header': {'length': self.length, 'seq': self.seq},
            'type': getConstName(COMMANDS, self.type.value),
            'vars': self.__dict__
        })