Coverage for mindsdb / integrations / handlers / dockerhub_handler / dockerhub.py: 0%
44 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import json
2import requests
class DockerHubClient:
    """Small client for the Docker Hub HTTP API rooted at ``/v2/``.

    Every request helper returns a dict of the form
    ``{'content': <decoded JSON body>, 'code': 200}`` when the server answers
    with HTTP 200, and ``{'content': {}, 'code': <status>, 'error': <text>}``
    for any other status.
    """

    # Seconds to wait on the server before giving up. Without a timeout the
    # requests library waits forever on a stalled connection.
    DEFAULT_TIMEOUT = 30

    def __init__(self, timeout=DEFAULT_TIMEOUT):
        """Create an unauthenticated client.

        Args:
            timeout: Per-request timeout in seconds, forwarded to
                ``requests``. Pass ``None`` to wait indefinitely.
        """
        # Filled in by a successful login(); sent as a JWT on later requests.
        self.auth_token = None
        self.docker_hub_base_endpoint = "https://hub.docker.com/v2/"
        self.timeout = timeout

    def make_request(self, url, method='GET', data=None):
        """Send a JSON request and wrap the outcome in a result dict.

        Args:
            url: Absolute URL to call.
            method: ``'GET'`` or ``'POST'``.
            data: Optional JSON-serialisable payload; sent as the request
                body when non-empty.

        Returns:
            ``{'content': ..., 'code': 200}`` on HTTP 200, otherwise
            ``{'content': {}, 'code': <status>, 'error': <response text>}``.

        Raises:
            ValueError: If ``method`` is not ``'GET'`` or ``'POST'``.
        """
        if method not in ('GET', 'POST'):
            raise ValueError('Invalid HTTP request method')

        headers = {'Content-type': 'application/json'}
        if self.auth_token:
            headers['Authorization'] = 'JWT ' + self.auth_token

        # Empty or missing payloads are sent without a body.
        body = json.dumps(data) if data else None

        # Keyword arguments matter here: the second positional parameter of
        # requests.get is `params`, so passing the payload positionally would
        # put it in the query string for GET requests instead of the body.
        resp = requests.request(
            method,
            url,
            data=body,
            headers=headers,
            timeout=self.timeout,
        )

        if resp.status_code == 200:
            return {'content': json.loads(resp.content.decode()), 'code': 200}
        return {'content': {}, 'code': resp.status_code, 'error': resp.text}

    def login(self, username=None, password=None):
        """Authenticate and remember the returned token for later requests.

        Any previously stored token is discarded before the attempt, so a
        failed login leaves the client unauthenticated.

        Returns:
            The result dict produced by ``make_request``.
        """
        data = {'username': username, 'password': password}
        self.auth_token = None
        resp = self.make_request(self.docker_hub_base_endpoint + 'users/login/', 'POST', data)
        if resp['code'] == 200:
            self.auth_token = resp['content']['token']
        return resp

    def get_images_summary(self, namespace, repo):
        """Request the ``images-summary`` resource of ``namespace/repo``."""
        url = f'{self.docker_hub_base_endpoint}namespaces/{namespace}/repositories/{repo}/images-summary'
        return self.make_request(url)

    def get_repo_images(self, namespace, repo):
        """Request the ``images`` resource of ``namespace/repo``."""
        url = f'{self.docker_hub_base_endpoint}namespaces/{namespace}/repositories/{repo}/images'
        return self.make_request(url)

    def get_repo_tag(self, namespace, repo, tag):
        """Request a single tag of ``namespace/repo`` by name."""
        url = f'{self.docker_hub_base_endpoint}namespaces/{namespace}/repositories/{repo}/tags/{tag}'
        return self.make_request(url)

    def get_repo_tags(self, namespace, repo):
        """Request the ``tags`` resource of ``namespace/repo``."""
        url = f'{self.docker_hub_base_endpoint}namespaces/{namespace}/repositories/{repo}/tags'
        return self.make_request(url)

    def get_org_settings(self, namespace):
        """Request the ``settings`` resource of the organisation ``namespace``."""
        url = f'{self.docker_hub_base_endpoint}orgs/{namespace}/settings'
        return self.make_request(url)