Coverage for mindsdb / integrations / handlers / pypi_handler / pypi_handler.py: 0%

27 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from mindsdb_sql_parser import parse_sql 

2 

3from mindsdb.integrations.handlers.pypi_handler.api import PyPI 

4from mindsdb.integrations.handlers.pypi_handler.pypi_tables import ( 

5 PyPIOverallTable, 

6 PyPIPythonMajorTable, 

7 PyPIPythonMinorTable, 

8 PyPIRecentTable, 

9 PyPISystemTable, 

10) 

11from mindsdb.integrations.libs.api_handler import APIHandler 

12from mindsdb.integrations.libs.response import HandlerStatusResponse as StatusResponse 

13 

14 

class PyPIHandler(APIHandler):
    """API handler that registers the PyPI tables and runs SQL against them.

    Instance state set in ``__init__``:
        connection: ``None`` until ``connect()`` stores the ``PyPI`` class.
        is_connected: outcome of the most recent ``check_connection()`` call
            (``False`` until one has been made).
    """

    def __init__(self, name: str, **kwargs) -> None:
        """Initialize the handler and register its tables.

        Args:
            name (str): handler's name
            **kwargs: accepted so callers may pass extra options; none of
                them are read by this initializer.
        """
        super().__init__(name)

        self.connection = None
        self.is_connected = False

        table_classes = (
            PyPIOverallTable,
            PyPIPythonMajorTable,
            PyPIPythonMinorTable,
            PyPIRecentTable,
            PyPISystemTable,
        )

        # Each table is registered under the ``name`` attribute declared on
        # its class and receives this handler as its constructor argument.
        for table_cls in table_classes:
            self._register_table(table_cls.name, table_cls(self))

    def check_connection(self) -> StatusResponse:
        """Ask ``PyPI.is_connected()`` for the status and record the outcome.

        Returns:
            StatusResponse: ``success`` is set to ``True`` when the reported
            ``"status"`` is truthy; otherwise ``error_message`` carries the
            reported ``"message"``.
        """
        response = StatusResponse(False)
        checking = PyPI.is_connected()
        connected = bool(checking["status"])

        if connected:
            response.success = True
        else:
            response.error_message = checking["message"]

        # Mirror the real outcome. Previously this flag was forced to True
        # even when the check failed and an error message was returned.
        self.is_connected = connected

        return response

    def connect(self) -> PyPI:
        """Make the connection object.

        The ``PyPI`` class itself (not an instance) is stored on
        ``self.connection`` and returned. ``is_connected`` is not modified
        here; only ``check_connection()`` updates it.

        Returns:
            PyPI: pypi class as the returned value
        """
        self.connection = PyPI
        return self.connection

    def native_query(self, query: str) -> StatusResponse:
        """Receive and process a raw query.

        The query text is parsed with ``parse_sql`` and the resulting AST is
        handed to ``self.query``.

        Parameters
        ----------
        query : str
            query in a native format

        Returns
        -------
        StatusResponse
            Request status
            NOTE(review): the value is whatever ``self.query`` returns;
            confirm that it matches this annotation.
        """
        ast = parse_sql(query)
        return self.query(ast)