Coverage for mindsdb / api / mysql / mysql_proxy / data_types / mysql_packet.py: 23%

98 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1""" 

2******************************************************* 

3 * Copyright (C) 2017 MindsDB Inc. <copyright@mindsdb.com> 

4 * 

5 * This file is part of MindsDB Server. 

6 * 

7 * MindsDB Server can not be copied and/or distributed without the express 

8 * permission of MindsDB Inc 

9 ******************************************************* 

10""" 

11 

12import struct 

13 

14from mindsdb.api.mysql.mysql_proxy.libs.constants.mysql import MAX_PACKET_SIZE 

15from mindsdb.utilities import log 

16 

17logger = log.getLogger(__name__) 

18 

19 

20class Packet: 

21 def __init__( 

22 self, 

23 length=0, 

24 body="", 

25 packet_string=None, 

26 socket=None, 

27 session=None, 

28 proxy=None, 

29 **kwargs, 

30 ): 

31 self.mysql_socket = socket 

32 self.session = session 

33 self.proxy = proxy 

34 self._kwargs = kwargs 

35 

36 self.setup() 

37 if packet_string is not None: 

38 self.load_from_packet_string(packet_string) 

39 else: 

40 self.load_from_params(length, session.packet_sequence_number, body) 

41 

42 def setup(self, length=0, seq=0, body=None): 

43 ... 

44 

45 def load_from_params(self, length, seq, body): 

46 self._length = length 

47 self._seq = seq 

48 self._body = body 

49 

50 def setBody(self, body_string): 

51 self._body = body_string 

52 self._length = len(body_string) 

53 

54 def load_from_packet_string(self, packet_string): 

55 len_header = struct.unpack("i", packet_string[:3] + b"\x00")[0] 

56 count_header = int(packet_string[3]) 

57 body = packet_string[4:] 

58 self.load_from_params(length=len_header, seq=count_header, body=body) 

59 

60 def get_packet_string(self): 

61 body = self.body 

62 len_header = struct.pack("<i", self.length)[:3] # keep it 3 bytes 

63 count_header = struct.pack("B", self.seq) 

64 packet = len_header + count_header + body 

65 return packet 

66 

67 def get(self): 

68 self.session.logging.debug(f"Get packet: {self.__class__.__name__}") 

69 

70 len_header = MAX_PACKET_SIZE 

71 body = b"" 

72 count_header = 1 

73 while len_header == MAX_PACKET_SIZE: 

74 packet_string = self.mysql_socket.recv(4) 

75 if len(packet_string) < 4: 

76 self.session.logging.debug( 

77 f"Packet with less than 4 bytes in length: {packet_string}" 

78 ) 

79 return False 

80 len_header = struct.unpack("i", packet_string[:3] + b"\x00")[0] 

81 count_header = int(packet_string[3]) 

82 if len_header == 0: 

83 break 

84 body += self.mysql_socket.recv(len_header) 

85 self.session.logging.debug(f"Got packet: {str(body)}") 

86 self.session.packet_sequence_number = (int(count_header) + 1) % 256 

87 self.setup(len(body), count_header, body) 

88 return True 

89 

90 def send(self): 

91 string = self.get_packet_string() 

92 self.session.logging.debug(f"Sending packet: {self.__class__.__name__}") 

93 self.session.logging.debug(string) 

94 self.mysql_socket.sendall(string) 

95 

96 def accum(self): 

97 string = self.get_packet_string() 

98 return string 

99 

100 def pprintPacket(self, body=None): 

101 if body is None: 

102 body = self.body 

103 logger.info(str(self)) 

104 for i, x in enumerate(body): 

105 part = "[BODY]" 

106 logger.info( 

107 """{part}{i}:{h} ({inte}:{actual})""".format( 

108 part=part, i=i + 1, h=hex(ord(x)), inte=ord(x), actual=str(x) 

109 ) 

110 ) 

111 

112 def isEOF(self): 

113 if self.length == 0: 

114 return True 

115 else: 

116 return False 

117 

118 @property 

119 def length(self): 

120 # self._length = len(self.body) 

121 return self._length 

122 

123 @property 

124 def seq(self): 

125 return self._seq 

126 

127 @property 

128 def body(self): 

129 return self._body 

130 

131 @staticmethod 

132 def bodyStringToPackets(body_string): 

133 """ 

134 The method takes a string and turns it into mysql_packets 

135 

136 :param body_string: text to turn into mysql_packets 

137 :return: a list of mysql_packets 

138 

139 """ 

140 ret = [] 

141 body_len = len(body_string) 

142 mod = body_len % MAX_PACKET_SIZE 

143 num_packets = body_len // MAX_PACKET_SIZE + (1 if mod > 0 else 0) 

144 

145 for i in range(num_packets): 

146 left_limit = i * MAX_PACKET_SIZE 

147 right_limit = mod if i + 1 == num_packets else MAX_PACKET_SIZE * (i + 1) 

148 body = body_string[left_limit:right_limit] 

149 ret += [Packet(length=right_limit, seq=i + 1, body=body)] 

150 

151 return ret 

152 

153 def __str__(self): 

154 return str({"body": self.body, "length": self.length, "seq": self.seq}) 

155 

156 

157def test(): 

158 import pprint 

159 

160 pprint.pprint(Packet.bodyStringToPackets("abdds")[0].get_packet_string()) 

161 

162 

163# only run the test if this file is called from debugger 

164if __name__ == "__main__": 164 ↛ 165line 164 didn't jump to line 165 because the condition on line 164 was never true

165 test()