Coverage for mindsdb / integrations / handlers / dockerhub_handler / dockerhub.py: 0%

44 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import json 

2import requests 

3 

4 

class DockerHubClient:
    """Small client for the Docker Hub v2 REST API.

    Every request helper returns a dict with these keys:

    * ``content`` -- the decoded JSON body on HTTP 200, otherwise ``{}``.
    * ``code`` -- the HTTP status code of the response.
    * ``error`` -- the raw response text (only present when the status is not 200).
    """

    # Seconds to wait on the server before giving up. Without a timeout,
    # ``requests`` waits forever on a stalled connection. Override on the
    # class or on an instance to tune it.
    request_timeout = 30

    def __init__(self):
        # Token obtained from ``login``; ``None`` means unauthenticated requests.
        self.auth_token = None
        self.docker_hub_base_endpoint = "https://hub.docker.com/v2/"

    def make_request(self, url, method='GET', data=None):
        """Send an HTTP request and wrap the outcome in a result dict.

        Args:
            url: Absolute URL to call.
            method: ``'GET'`` or ``'POST'``.
            data: Optional payload. For POST it is JSON-encoded and sent as
                the request body; for GET it is sent as query parameters.

        Returns:
            ``{'content': <decoded JSON>, 'code': 200}`` on HTTP 200,
            otherwise ``{'content': {}, 'code': <status>, 'error': <text>}``.

        Raises:
            ValueError: If ``method`` is not ``'GET'`` or ``'POST'``; also
                raised by ``json.loads`` when a 200 response body is not
                valid JSON.

        Exceptions raised by ``requests`` itself (connection failures,
        timeouts) are not caught here and propagate to the caller.
        """
        if method not in ('GET', 'POST'):
            raise ValueError('Invalid HTTP request method')

        headers = {'Content-type': 'application/json'}
        if self.auth_token:
            headers['Authorization'] = 'JWT ' + self.auth_token

        request_kwargs = {'headers': headers, 'timeout': self.request_timeout}
        if data:
            if method == 'POST':
                request_kwargs['data'] = json.dumps(data)
            else:
                # Passing the payload positionally to ``requests.get`` would
                # bind a JSON string to ``params``; send it as real query
                # parameters instead.
                request_kwargs['params'] = data

        request_method = getattr(requests, method.lower())
        resp = request_method(url, **request_kwargs)

        if resp.status_code == 200:
            return {'content': json.loads(resp.content.decode()), 'code': 200}
        return {'content': {}, 'code': resp.status_code, 'error': resp.text}

    def login(self, username=None, password=None):
        """Authenticate against ``users/login/`` and remember the token.

        Any previously stored token is cleared first, so a failed login
        leaves the client unauthenticated.

        Returns:
            The result dict produced by ``make_request``.
        """
        data = {'username': username, 'password': password}
        self.auth_token = None
        resp = self.make_request(self.docker_hub_base_endpoint + 'users/login/', 'POST', data)
        if resp['code'] == 200:
            self.auth_token = resp['content']['token']
        return resp

    def get_images_summary(self, namespace, repo):
        """Request the ``images-summary`` resource of ``namespace/repo``."""
        url = f'{self.docker_hub_base_endpoint}namespaces/{namespace}/repositories/{repo}/images-summary'
        return self.make_request(url)

    def get_repo_images(self, namespace, repo):
        """Request the ``images`` resource of ``namespace/repo``."""
        url = f'{self.docker_hub_base_endpoint}namespaces/{namespace}/repositories/{repo}/images'
        return self.make_request(url)

    def get_repo_tag(self, namespace, repo, tag):
        """Request a single tag resource of ``namespace/repo``."""
        url = f'{self.docker_hub_base_endpoint}namespaces/{namespace}/repositories/{repo}/tags/{tag}'
        return self.make_request(url)

    def get_repo_tags(self, namespace, repo):
        """Request the ``tags`` resource of ``namespace/repo``."""
        url = f'{self.docker_hub_base_endpoint}namespaces/{namespace}/repositories/{repo}/tags'
        return self.make_request(url)

    def get_org_settings(self, namespace):
        """Request the ``settings`` resource of the organization ``namespace``."""
        url = f'{self.docker_hub_base_endpoint}orgs/{namespace}/settings'
        return self.make_request(url)