Coverage for mindsdb / integrations / handlers / coinbase_handler / coinbase_handler.py: 0%

73 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import time 

2import hmac 

3import base64 

4import hashlib 

5import datetime 

6from typing import Dict 

7 

8import pandas as pd 

9import requests 

10 

11from mindsdb.integrations.handlers.coinbase_handler.coinbase_tables import CoinBaseAggregatedTradesTable 

12from mindsdb.integrations.libs.api_handler import APIHandler 

13from mindsdb.integrations.libs.response import ( 

14 HandlerStatusResponse as StatusResponse, 

15 HandlerResponse as Response, 

16) 

17 

18from mindsdb_sql_parser import parse_sql 

19 

# Base URL that the handler prepends to every request path it builds
# (get_coinbase_candle concatenates this constant with the product/candles path).
# NOTE(review): the leading "20" looks like line-number residue from the
# coverage-report export rather than part of the identifier — confirm against
# the real source file.
20_BASE_COINBASE_US_URL = 'https://api.exchange.coinbase.com' 

21 

22 

class CoinBaseHandler(APIHandler):
    """A class for handling connections and interactions with the CoinBase API.

    Attributes:
        api_key (str): API key, or None when not supplied.
        api_secret (str): Base64-encoded API secret, or None when not supplied.
        api_passphrase (str): API passphrase, or None when not supplied.
        client: Placeholder for an API client; this class never assigns one.
        is_connected (bool): Whether the handler considers itself connected.
    """

    # Seconds to wait on the CoinBase API before abandoning a request, so a
    # stalled connection cannot block the caller forever.
    _REQUEST_TIMEOUT_SECONDS = 30

    def __init__(self, name: str = None, **kwargs):
        """Registers all API tables and prepares the handler for an API connection.

        Args:
            name (str): The handler name to use.
            **kwargs: May contain ``connection_data``, a dict with the optional
                keys ``api_key``, ``api_secret`` and ``api_passphrase``.
        """
        super().__init__(name)

        # A missing and an explicitly-None connection_data both mean
        # "no credentials supplied".
        args = kwargs.get('connection_data') or {}
        self.api_key = args.get('api_key')
        self.api_secret = args.get('api_secret')
        self.api_passphrase = args.get('api_passphrase')
        self.client = None
        self.is_connected = False

        coinbase_candle_data = CoinBaseAggregatedTradesTable(self)
        self._register_table('coinbase_candle_data', coinbase_candle_data)

    def connect(self):
        """Marks the handler as connected.

        No API client is created here: requests are issued directly with
        ``requests`` in ``get_coinbase_candle``.

        Returns:
            The current value of ``self.client`` (None unless set externally).
        """
        self.is_connected = True
        return self.client

    def check_connection(self) -> StatusResponse:
        """Reports the handler as connected.

        No request is sent to the API; the returned status is always
        successful.

        Returns:
            StatusResponse: A successful status response.
        """
        response = StatusResponse(True)
        self.is_connected = response.success
        return response

    def get_coinbase_candle(self, symbol: str, granularity: int) -> pd.DataFrame:
        """Fetches candle data for a product from the CoinBase API.

        Args:
            symbol (str): Product identifier, e.g. ``BTC-USD``.
            granularity (int): Candle width in seconds; one of
                60, 300, 900, 3600, 21600, 86400.

        Returns:
            pd.DataFrame: One row per candle with the columns ``symbol``,
            ``low``, ``high``, ``open``, ``close``, ``volume``, ``timestamp``
            and ``timestamp_iso``. Empty when the API returns no candles.

        Raises:
            requests.HTTPError: If the API answers with an error status.
            requests.Timeout: If the API does not answer in time.
        """
        # The window starts one candle-width before now. It is expressed in
        # UTC with a "Z" suffix: a fixed "-04:00" suffix on local time is only
        # correct on hosts in UTC-4, and a "+hh:mm" offset would need
        # URL-escaping inside the query string.
        now_utc = datetime.datetime.now(datetime.timezone.utc)
        start_time = now_utc - datetime.timedelta(seconds=granularity)
        start_time_iso = start_time.strftime("%Y-%m-%dT%H:%M:%SZ")

        path = "/products/" + symbol + "/candles?granularity=" + str(granularity) + "&start=" + start_time_iso
        # The signature covers the exact path (query string included) that is sent.
        headers = self.generate_api_headers("GET", path)
        response = requests.get(
            _BASE_COINBASE_US_URL + path,
            headers=headers,
            timeout=self._REQUEST_TIMEOUT_SECONDS,
        )
        # Fail loudly on an error status instead of trying to iterate an
        # error payload as if it were a list of candles.
        response.raise_for_status()

        rows = []
        for candle in response.json():
            # Each candle is unpacked as [time, low, high, open, close, volume].
            timestamp, low, high, open_price, close, volume = candle
            rows.append({
                "symbol": symbol,
                "low": low,
                "high": high,
                "open": open_price,
                "close": close,
                "volume": volume,
                "timestamp": timestamp,
                # Naive, host-local rendering of the epoch timestamp.
                "timestamp_iso": datetime.datetime.fromtimestamp(timestamp).isoformat(),
            })
        return pd.DataFrame(rows)

    def _get_candle(self, params: Dict = None) -> pd.DataFrame:
        """Gets aggregate trade data for a symbol based on given parameters.

        Args:
            params (Dict): Trade data params; must contain ``symbol`` and
                ``interval`` (60, 300, 900, 3600, 21600, 86400).

        Returns:
            pd.DataFrame: The candles returned by ``get_coinbase_candle``.

        Raises:
            ValueError: If ``symbol`` or ``interval`` is missing.
        """
        # Treat an omitted params argument like an empty dict so the caller
        # gets the descriptive ValueError rather than a TypeError.
        params = params or {}
        if 'symbol' not in params:
            raise ValueError('Missing "symbol" param to fetch trade data for.')
        if 'interval' not in params:
            raise ValueError('Missing "interval" param (60, 300, 900, 3600, 21600, 86400).')

        return self.get_coinbase_candle(params['symbol'], int(params['interval']))

    def native_query(self, query: str = None) -> Response:
        """Parses a raw SQL string and runs it through the handler's ``query``.

        Args:
            query (str): SQL text to parse and execute.

        Returns:
            Response: Whatever ``self.query`` returns for the parsed statement.
        """
        ast = parse_sql(query)
        return self.query(ast)

    def generate_api_headers(self, method: str, path: str) -> dict:
        """Builds the signed request headers for one API call.

        The signature is the base64-encoded HMAC-SHA256 of
        ``timestamp + method + path``, keyed with the base64-decoded API secret.

        Args:
            method (str): HTTP method, e.g. ``GET``.
            path (str): Request path including the query string.

        Returns:
            dict: Header names mapped to values; ``CB-ACCESS-SIGN`` is bytes.
        """
        timestamp = str(int(time.time()))
        message = timestamp + method + path
        secret = base64.b64decode(self.api_secret)
        digest = hmac.new(secret, message.encode(), hashlib.sha256).digest()
        return {
            "Content-Type": "application/json",
            "CB-ACCESS-SIGN": base64.b64encode(digest),
            "CB-ACCESS-KEY": self.api_key,
            "CB-ACCESS-TIMESTAMP": timestamp,
            "CB-VERSION": "2015-04-08",
            "CB-ACCESS-PASSPHRASE": self.api_passphrase,
        }

    def call_coinbase_api(self, method_name: str = None, params: Dict = None) -> pd.DataFrame:
        """Calls the CoinBase API method with the given params.

        Args:
            method_name (str): Method name to call; only ``get_candle`` is supported.
            params (Dict): Params to pass to the API call.

        Returns:
            pd.DataFrame: Results of the API call.

        Raises:
            NotImplementedError: If ``method_name`` is not supported.
        """
        if method_name == 'get_candle':
            return self._get_candle(params)
        raise NotImplementedError('Method name {} not supported by CoinBase API Handler'.format(method_name))