Coverage for mindsdb / api / mysql / mysql_proxy / data_types / mysql_packets / ok_packet.py: 21%
51 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1"""
2*******************************************************
3 * Copyright (C) 2017 MindsDB Inc. <copyright@mindsdb.com>
4 *
5 * This file is part of MindsDB Server.
6 *
7 * MindsDB Server can not be copied and/or distributed without the express
8 * permission of MindsDB Inc
9 *******************************************************
10"""
12import struct
14from mindsdb.api.mysql.mysql_proxy.data_types.mysql_packet import Packet
15from mindsdb.api.mysql.mysql_proxy.data_types.mysql_datum import Datum
16from mindsdb.api.mysql.mysql_proxy.libs.constants.mysql import SESSION_TRACK, SERVER_STATUS
class OkPacket(Packet):
    """OK packet sent by the server to report that a command succeeded.

    Implementation based on:
    https://mariadb.com/kb/en/library/1-connecting-connecting/#initial-handshake-packet

    Wire layout::

        int<1> 0x00 : OK_Packet header or (0xFE if CLIENT_DEPRECATE_EOF is set)
        int<lenenc> affected rows
        int<lenenc> last insert id
        int<2> server status
        int<2> warning count
        if session_tracking_supported (see CLIENT_SESSION_TRACK)
            string<lenenc> info
            if (status flags & SERVER_SESSION_STATE_CHANGED)
                string<lenenc> session state info
                string<lenenc> value of variable
        else
            string<EOF> info

    Keyword arguments read from ``self._kwargs`` by :meth:`setup`:

    * ``eof`` -- when exactly ``True`` the header byte is 0xFE instead of 0.
    * ``affected_rows`` -- affected row count; missing or falsy means 0.
    * ``status`` -- server status flags; defaults to 0x0002.
    * ``state_track`` -- optional iterable of ``[name, value]`` pairs
      describing changed system variables.
    """

    def setup(self):
        """Create the packet fields from ``self._kwargs``.

        When ``state_track`` is given, SERVER_SESSION_STATE_CHANGED is OR-ed
        into the server status and ``self.state_track`` holds the already
        serialized session-state section; otherwise ``self.state_track`` is
        ``None`` and :attr:`body` skips it.
        """
        eof = self._kwargs.get('eof', False)
        self.ok_header = Datum('int<1>', 0xFE if eof is True else 0)
        self.affected_rows = Datum('int<lenenc>', self._kwargs.get('affected_rows') or 0)
        self.last_insert_id = Datum('int<lenenc>', 0)
        status = self._kwargs.get('status', 0x0002)
        self.server_status = Datum('int<2>', status)
        self.warning_count = Datum('int<2>', 0)

        self.state_track = None
        state_track = self._kwargs.get('state_track')  # [[name, value], ...]
        if state_track is not None:
            status = status | SERVER_STATUS.SERVER_SESSION_STATE_CHANGED
            self.server_status = Datum('int<2>', status)
            self.state_track = self._encode_state_track(state_track)

        self.info = Datum('string<EOF>')

    @staticmethod
    def _encode_state_track(state_track):
        """Serialize ``[name, value]`` pairs into the session-state section.

        Returns the bytes of an empty ``string<lenenc>`` 'info' field followed
        by the length-prefixed list of state-change entries.

        Every length prefix is written as ``int<lenenc>``. Keeping only the
        first byte of a native-endian ``int`` is not enough: it corrupts any
        length above 250 and picks the wrong byte on big-endian hosts. For
        lengths below 251 the length-encoded form is that same single byte.
        """
        # NOTE at this moment just system variables
        type_byte = struct.pack('B', SESSION_TRACK.SESSION_TRACK_SYSTEM_VARIABLES)

        entries = []
        for name, value in state_track:
            data = (
                Datum('string<lenenc>', name).toStringPacket()
                + Datum('string<lenenc>', value).toStringPacket()
            )
            entries.append(
                type_byte + Datum('int<lenenc>', len(data)).toStringPacket() + data
            )
        changes = b''.join(entries)

        # 'info' - human readable status information (always empty here)
        info = Datum('string<lenenc>', '').toStringPacket()
        return info + Datum('int<lenenc>', len(changes)).toStringPacket() + changes

    @property
    def body(self):
        """Serialize all fields in wire order, store and return the body.

        Fields that are ``None`` are skipped, ``bytes`` fields are emitted
        verbatim and every other field is serialized through its
        ``toStringPacket()``. The result is passed to ``self.setBody`` and
        ``self._body`` is returned.
        """
        order = (
            'ok_header',
            'affected_rows',
            'last_insert_id',
            'server_status',
            'warning_count',
            'state_track',
            'info',
        )
        sections = []
        for key in order:
            item = getattr(self, key)
            if item is None:
                continue
            sections.append(item if isinstance(item, bytes) else item.toStringPacket())

        self.setBody(b''.join(sections))
        return self._body

    @staticmethod
    def test():
        """Pretty-print a sample OK packet carrying three charset variables."""
        import pprint
        pprint.pprint(str(OkPacket(state_track=[['character_set_client', 'utf8'], ['character_set_connection', 'utf8'], ['character_set_results', 'utf8']]).get_packet_string()))
# Run the sample-packet demo only when this module is executed directly
# (for example from a debugger); importing the module does nothing.
if __name__ == "__main__":
    OkPacket.test()