Coverage for mindsdb / integrations / handlers / npm_handler / npm_handler.py: 0%

26 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from mindsdb_sql_parser import parse_sql 

2 

3from mindsdb.integrations.handlers.npm_handler.api import NPM 

4from mindsdb.integrations.handlers.npm_handler.npm_tables import ( 

5 NPMMetadataTable, 

6 NPMMaintainersTable, 

7 NPMKeywordsTable, 

8 NPMDependenciesTable, 

9 NPMDevDependenciesTable, 

10 NPMOptionalDependenciesTable, 

11 NPMGithubStatsTable, 

12) 

13from mindsdb.integrations.libs.api_handler import APIHandler 

14from mindsdb.integrations.libs.response import HandlerStatusResponse as StatusResponse 

15 

16 

class NPMHandler(APIHandler):
    """API handler that exposes the ``NPM*Table`` classes as queryable tables.

    On construction, one instance of each table class is registered with the
    handler under that class's ``name`` attribute.
    """

    def __init__(self, name: str, **kwargs) -> None:
        """Initialise handler state and register every NPM table.

        Args:
            name: Handler name, forwarded to ``APIHandler.__init__``.
            **kwargs: Accepted but not used by this constructor.
        """
        super().__init__(name)
        self.connection = None
        self.is_connected = False

        table_classes = (
            NPMMetadataTable,
            NPMMaintainersTable,
            NPMKeywordsTable,
            NPMDependenciesTable,
            NPMDevDependenciesTable,
            NPMOptionalDependenciesTable,
            NPMGithubStatsTable,
        )
        for table_cls in table_classes:
            # Each table is bound to this handler instance at registration.
            self._register_table(table_cls.name, table_cls(self))

    def check_connection(self) -> StatusResponse:
        """Check connectivity via ``NPM.is_connected()``.

        Returns:
            A ``StatusResponse`` whose ``success`` is True when
            ``NPM.is_connected()`` is truthy and False otherwise.
        """
        response = StatusResponse(False)
        response.success = bool(NPM.is_connected())
        # Keep the handler flag in step with the check result; it was
        # previously set to True even when the check failed.
        self.is_connected = response.success
        return response

    def connect(self) -> NPM:
        """Store and return the connection object.

        The ``NPM`` class itself (not an instance) is stored on
        ``self.connection`` and returned.
        """
        self.connection = NPM
        return self.connection

    def native_query(self, query: str) -> StatusResponse:
        """Parse a raw SQL string and run it through ``self.query``.

        Args:
            query: Raw query text, parsed with ``parse_sql``.

        Returns:
            Whatever ``self.query`` returns for the parsed AST.
            NOTE(review): annotated as ``StatusResponse``; confirm against
            the actual return type of ``APIHandler.query``.
        """
        ast = parse_sql(query)
        return self.query(ast)

53 return self.query(ast)