Coverage for mindsdb / api / mysql / mysql_proxy / data_types / mysql_packets / switch_auth_packet.py: 36%

24 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1""" 

2******************************************************* 

3 * Copyright (C) 2017 MindsDB Inc. <copyright@mindsdb.com> 

4 * 

5 * This file is part of MindsDB Server. 

6 * 

7 * MindsDB Server can not be copied and/or distributed without the express 

8 * permission of MindsDB Inc 

9 ******************************************************* 

10""" 

11 

12from mindsdb.api.mysql.mysql_proxy.data_types.mysql_packet import Packet 

13from mindsdb.api.mysql.mysql_proxy.data_types.mysql_datum import Datum 

14 

15 

16class SwitchOutPacket(Packet): 

17 ''' 

18 Implementation based on: 

19 https://mariadb.com/kb/en/library/1-connecting-connecting/#initial-handshake-packet 

20 ''' 

21 

22 def setup(self): 

23 status = 0 if 'status' not in self._kwargs else self._kwargs['status'] # noqa 

24 seed = self._kwargs['seed'] 

25 method = self._kwargs['method'] 

26 self.eof_header = Datum('int<1>', int('0xfe', 0)) 

27 self.authentication_plugin_name = Datum('string<NUL>', method) 

28 self.seed = Datum('string<NUL>', seed) 

29 

30 @property 

31 def body(self): 

32 

33 order = [ 

34 'eof_header', 

35 'authentication_plugin_name', 

36 'seed' 

37 ] 

38 

39 string = b'' 

40 for key in order: 

41 string += getattr(self, key).toStringPacket() 

42 

43 self.setBody(string) 

44 return self._body 

45 

46 @staticmethod 

47 def test(): 

48 import pprint 

49 pprint.pprint(str(SwitchOutPacket().get_packet_string())) 

50 

51 

52if __name__ == "__main__": 52 ↛ 53line 52 didn't jump to line 53 because the condition on line 52 was never true

53 SwitchOutPacket.test()