Coverage for mindsdb / api / mysql / mysql_proxy / data_types / mysql_packets / eof_packet.py: 38%

22 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1""" 

2******************************************************* 

3 * Copyright (C) 2017 MindsDB Inc. <copyright@mindsdb.com> 

4 * 

5 * This file is part of MindsDB Server. 

6 * 

7 * MindsDB Server can not be copied and/or distributed without the express 

8 * permission of MindsDB Inc 

9 ******************************************************* 

10""" 

11 

12from mindsdb.api.mysql.mysql_proxy.data_types.mysql_packet import Packet 

13from mindsdb.api.mysql.mysql_proxy.data_types.mysql_datum import Datum 

14 

15 

class EofPacket(Packet):
    '''
    EOF packet for the MySQL proxy.

    The body is built from three Datum fields, serialized in this order:
    ``eof_header`` (``int<1>``, value 0xfe), ``warning_count``
    (``int<2>``, always 0) and ``server_status`` (``int<2>``, taken from
    the optional ``status`` keyword argument, 0 when it is absent).

    Implementation based on:
    https://mariadb.com/kb/en/library/1-connecting-connecting/#initial-handshake-packet
    NOTE(review): the link above targets the handshake section; confirm
    whether a dedicated EOF packet reference should be cited instead.
    '''

    def setup(self):
        '''Create the Datum fields that make up the packet body.'''
        # Fall back to a zero status when the caller did not supply one.
        if 'status' in self._kwargs:
            server_status = self._kwargs['status']
        else:
            server_status = 0

        self.eof_header = Datum('int<1>', 0xfe)
        self.warning_count = Datum('int<2>', 0)
        self.server_status = Datum('int<2>', server_status)

    @property
    def body(self):
        '''
        Serialize the fields in wire order, hand the result to
        ``setBody()`` and return ``self._body``.
        '''
        field_names = ('eof_header', 'warning_count', 'server_status')
        payload = b''.join(
            getattr(self, name).toStringPacket() for name in field_names
        )

        self.setBody(payload)
        return self._body

    @staticmethod
    def test():
        '''Pretty-print the packet string of a default EofPacket (debug aid).'''
        from pprint import pprint
        pprint(str(EofPacket().get_packet_string()))

48 

49 

# Debug entry point: run the manual test only when this file is executed
# directly (e.g. from a debugger), never when it is imported as a module.
if __name__ == "__main__":
    EofPacket.test()