Coverage for mindsdb / api / mysql / mysql_proxy / data_types / mysql_packets / ok_packet.py: 21%

51 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1""" 

2******************************************************* 

3 * Copyright (C) 2017 MindsDB Inc. <copyright@mindsdb.com> 

4 * 

5 * This file is part of MindsDB Server. 

6 * 

7 * MindsDB Server can not be copied and/or distributed without the express 

8 * permission of MindsDB Inc 

9 ******************************************************* 

10""" 

11 

12import struct 

13 

14from mindsdb.api.mysql.mysql_proxy.data_types.mysql_packet import Packet 

15from mindsdb.api.mysql.mysql_proxy.data_types.mysql_datum import Datum 

16from mindsdb.api.mysql.mysql_proxy.libs.constants.mysql import SESSION_TRACK, SERVER_STATUS 

17 

18 

class OkPacket(Packet):
    """Server-to-client OK packet, optionally carrying session-state changes.

    Implementation based on:
    https://mariadb.com/kb/en/library/1-connecting-connecting/#initial-handshake-packet

    Wire layout:

        int<1> 0x00 : OK_Packet header or (0xFE if CLIENT_DEPRECATE_EOF is set)
        int<lenenc> affected rows
        int<lenenc> last insert id
        int<2> server status
        int<2> warning count
        if session_tracking_supported (see CLIENT_SESSION_TRACK)
            string<lenenc> info
            if (status flags & SERVER_SESSION_STATE_CHANGED)
                string<lenenc> session state info
                string<lenenc> value of variable
        else
            string<EOF> info

    Keyword arguments read from ``self._kwargs`` by :meth:`setup`:

    * ``eof`` -- when exactly ``True`` the header byte is 0xFE instead of 0x00.
    * ``affected_rows`` -- affected row count; a missing or falsy value means 0.
    * ``status`` -- server status flags; defaults to 0x0002.
    * ``state_track`` -- iterable of ``(name, value)`` pairs reported as
      changed system variables; ``None`` (the default) omits the section.
    """

    @staticmethod
    def _lenenc_int(value):
        """Return *value* encoded as a MySQL length-encoded integer.

        Values below 251 take a single byte; larger values get a 0xFC / 0xFD /
        0xFE prefix followed by 2 / 3 / 8 little-endian bytes respectively.
        """
        if value < 0xFB:
            return struct.pack('<B', value)
        if value < (1 << 16):
            return b'\xfc' + struct.pack('<H', value)
        if value < (1 << 24):
            # There is no 3-byte struct code: pack 4 bytes and drop the top one.
            return b'\xfd' + struct.pack('<I', value)[:3]
        return b'\xfe' + struct.pack('<Q', value)

    def setup(self):
        """Build the packet fields from the keyword arguments.

        Sets ``ok_header``, ``affected_rows``, ``last_insert_id``,
        ``server_status``, ``warning_count``, ``state_track`` and ``info``.
        ``state_track`` ends up either ``None`` or pre-serialized ``bytes``;
        every other field is a ``Datum``.
        """
        kwargs = self._kwargs

        eof = kwargs.get('eof', False)
        self.ok_header = Datum('int<1>', 0xFE if eof is True else 0)
        self.affected_rows = Datum('int<lenenc>', kwargs.get('affected_rows') or 0)
        self.last_insert_id = Datum('int<lenenc>', 0)
        self.warning_count = Datum('int<2>', 0)

        status = kwargs.get('status', 0x0002)
        self.state_track = None

        state_track = kwargs.get('state_track')  # [[key, value], ...]
        if state_track is not None:
            # Tell the client that session-state information follows.
            status = status | SERVER_STATUS.SERVER_SESSION_STATE_CHANGED

            # NOTE at this moment just system variables
            entry_type = struct.pack('<B', SESSION_TRACK.SESSION_TRACK_SYSTEM_VARIABLES)
            entries = []
            for name, value in state_track:
                payload = (
                    Datum('string<lenenc>', name).toStringPacket()
                    + Datum('string<lenenc>', value).toStringPacket()
                )
                # Each entry is: type byte, payload length, payload.
                entries.append(entry_type + self._lenenc_int(len(payload)) + payload)
            session_state = b''.join(entries)

            # Leading empty string is 'info' - human readable status information.
            self.state_track = (
                Datum('string<lenenc>', '').toStringPacket()
                + self._lenenc_int(len(session_state))
                + session_state
            )

        self.server_status = Datum('int<2>', status)
        self.info = Datum('string<EOF>')

    @property
    def body(self):
        """Serialize the fields in wire order, store the result and return it.

        Fields that are ``None`` are skipped, ``bytes`` fields are emitted
        verbatim, and anything else is serialized via ``toStringPacket()``.
        The joined payload is handed to ``setBody`` and ``self._body`` is
        returned.
        """
        order = (
            'ok_header',
            'affected_rows',
            'last_insert_id',
            'server_status',
            'warning_count',
            'state_track',
            'info',
        )
        chunks = []
        for key in order:
            item = getattr(self, key)
            if item is None:
                continue
            chunks.append(item if isinstance(item, bytes) else item.toStringPacket())

        self.setBody(b''.join(chunks))
        return self._body

    @staticmethod
    def test():
        """Pretty-print a sample packet that tracks three charset variables."""
        import pprint

        sample = OkPacket(
            state_track=[
                ['character_set_client', 'utf8'],
                ['character_set_connection', 'utf8'],
                ['character_set_results', 'utf8'],
            ]
        )
        pprint.pprint(str(sample.get_packet_string()))

102 

103 

# Debug entry point: print a sample OK packet when this module is run directly.
if __name__ == "__main__":
    OkPacket.test()