Coverage for mindsdb / api / mysql / mysql_proxy / data_types / mysql_packet.py: 23%
98 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1"""
2*******************************************************
3 * Copyright (C) 2017 MindsDB Inc. <copyright@mindsdb.com>
4 *
5 * This file is part of MindsDB Server.
6 *
7 * MindsDB Server can not be copied and/or distributed without the express
8 * permission of MindsDB Inc
9 *******************************************************
10"""
12import struct
14from mindsdb.api.mysql.mysql_proxy.libs.constants.mysql import MAX_PACKET_SIZE
15from mindsdb.utilities import log
17logger = log.getLogger(__name__)
20class Packet:
21 def __init__(
22 self,
23 length=0,
24 body="",
25 packet_string=None,
26 socket=None,
27 session=None,
28 proxy=None,
29 **kwargs,
30 ):
31 self.mysql_socket = socket
32 self.session = session
33 self.proxy = proxy
34 self._kwargs = kwargs
36 self.setup()
37 if packet_string is not None:
38 self.load_from_packet_string(packet_string)
39 else:
40 self.load_from_params(length, session.packet_sequence_number, body)
42 def setup(self, length=0, seq=0, body=None):
43 ...
45 def load_from_params(self, length, seq, body):
46 self._length = length
47 self._seq = seq
48 self._body = body
50 def setBody(self, body_string):
51 self._body = body_string
52 self._length = len(body_string)
54 def load_from_packet_string(self, packet_string):
55 len_header = struct.unpack("i", packet_string[:3] + b"\x00")[0]
56 count_header = int(packet_string[3])
57 body = packet_string[4:]
58 self.load_from_params(length=len_header, seq=count_header, body=body)
60 def get_packet_string(self):
61 body = self.body
62 len_header = struct.pack("<i", self.length)[:3] # keep it 3 bytes
63 count_header = struct.pack("B", self.seq)
64 packet = len_header + count_header + body
65 return packet
67 def get(self):
68 self.session.logging.debug(f"Get packet: {self.__class__.__name__}")
70 len_header = MAX_PACKET_SIZE
71 body = b""
72 count_header = 1
73 while len_header == MAX_PACKET_SIZE:
74 packet_string = self.mysql_socket.recv(4)
75 if len(packet_string) < 4:
76 self.session.logging.debug(
77 f"Packet with less than 4 bytes in length: {packet_string}"
78 )
79 return False
80 len_header = struct.unpack("i", packet_string[:3] + b"\x00")[0]
81 count_header = int(packet_string[3])
82 if len_header == 0:
83 break
84 body += self.mysql_socket.recv(len_header)
85 self.session.logging.debug(f"Got packet: {str(body)}")
86 self.session.packet_sequence_number = (int(count_header) + 1) % 256
87 self.setup(len(body), count_header, body)
88 return True
90 def send(self):
91 string = self.get_packet_string()
92 self.session.logging.debug(f"Sending packet: {self.__class__.__name__}")
93 self.session.logging.debug(string)
94 self.mysql_socket.sendall(string)
96 def accum(self):
97 string = self.get_packet_string()
98 return string
100 def pprintPacket(self, body=None):
101 if body is None:
102 body = self.body
103 logger.info(str(self))
104 for i, x in enumerate(body):
105 part = "[BODY]"
106 logger.info(
107 """{part}{i}:{h} ({inte}:{actual})""".format(
108 part=part, i=i + 1, h=hex(ord(x)), inte=ord(x), actual=str(x)
109 )
110 )
112 def isEOF(self):
113 if self.length == 0:
114 return True
115 else:
116 return False
118 @property
119 def length(self):
120 # self._length = len(self.body)
121 return self._length
123 @property
124 def seq(self):
125 return self._seq
127 @property
128 def body(self):
129 return self._body
131 @staticmethod
132 def bodyStringToPackets(body_string):
133 """
134 The method takes a string and turns it into mysql_packets
136 :param body_string: text to turn into mysql_packets
137 :return: a list of mysql_packets
139 """
140 ret = []
141 body_len = len(body_string)
142 mod = body_len % MAX_PACKET_SIZE
143 num_packets = body_len // MAX_PACKET_SIZE + (1 if mod > 0 else 0)
145 for i in range(num_packets):
146 left_limit = i * MAX_PACKET_SIZE
147 right_limit = mod if i + 1 == num_packets else MAX_PACKET_SIZE * (i + 1)
148 body = body_string[left_limit:right_limit]
149 ret += [Packet(length=right_limit, seq=i + 1, body=body)]
151 return ret
153 def __str__(self):
154 return str({"body": self.body, "length": self.length, "seq": self.seq})
157def test():
158 import pprint
160 pprint.pprint(Packet.bodyStringToPackets("abdds")[0].get_packet_string())
163# only run the test if this file is called from debugger
164if __name__ == "__main__": 164 ↛ 165line 164 didn't jump to line 165 because the condition on line 164 was never true
165 test()