Coverage for mindsdb / integrations / handlers / coinbase_handler / coinbase_handler.py: 0%
73 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import time
2import hmac
3import base64
4import hashlib
5import datetime
6from typing import Dict
8import pandas as pd
9import requests
11from mindsdb.integrations.handlers.coinbase_handler.coinbase_tables import CoinBaseAggregatedTradesTable
12from mindsdb.integrations.libs.api_handler import APIHandler
13from mindsdb.integrations.libs.response import (
14 HandlerStatusResponse as StatusResponse,
15 HandlerResponse as Response,
16)
18from mindsdb_sql_parser import parse_sql
# Root URL that every CoinBase Exchange request path in this module is appended to.
_BASE_COINBASE_US_URL = "https://api.exchange.coinbase.com"
class CoinBaseHandler(APIHandler):
    """A class for handling connections and interactions with the CoinBase API.

    Attributes:
        api_key (str): API key, or None when not configured.
        api_secret (str): Base64-encoded API secret, or None when not configured.
        api_passphrase (str): API passphrase, or None when not configured.
        client: Value returned by ``connect``; this class only ever sets it to None.
        is_connected (bool): Whether the handler has been marked as connected.
    """

    def __init__(self, name: str = None, **kwargs):
        """Stores the credentials and registers all API tables.

        Args:
            name (str): The handler name to use.
            **kwargs: May contain ``connection_data``, a dict with the optional
                keys ``api_key``, ``api_secret`` and ``api_passphrase``.
        """
        super().__init__(name)

        # ``or {}`` also covers an explicit ``connection_data=None``.
        args = kwargs.get('connection_data') or {}
        self.api_key = args.get('api_key')
        self.api_secret = args.get('api_secret')
        self.api_passphrase = args.get('api_passphrase')
        self.client = None
        self.is_connected = False

        coinbase_candle_data = CoinBaseAggregatedTradesTable(self)
        self._register_table('coinbase_candle_data', coinbase_candle_data)

    def connect(self):
        """Marks the handler as connected.

        No client object is created here; requests are sent directly with
        ``requests`` in ``get_coinbase_candle``.

        Returns:
            The current value of ``self.client``.
        """
        self.is_connected = True
        return self.client

    def check_connection(self) -> StatusResponse:
        """Builds a ``StatusResponse(True)`` and mirrors its ``success`` flag.

        No request is sent to CoinBase by this method.

        Returns:
            StatusResponse: The response object that was created.
        """
        response = StatusResponse(True)
        self.is_connected = response.success
        return response

    # symbol: BTC-USD
    # granularity 60, 300, 900, 3600, 21600, 86400
    def get_coinbase_candle(self, symbol: str, granularity: int) -> pd.DataFrame:
        """Fetches candles for a product from ``/products/<symbol>/candles``.

        The request carries ``granularity`` and a ``start`` timestamp set one
        ``granularity`` interval before the current time.

        Args:
            symbol (str): Product id, e.g. ``BTC-USD``.
            granularity (int): Candle width in seconds
                (60, 300, 900, 3600, 21600 or 86400).

        Returns:
            pd.DataFrame: One row per candle in the response, with the columns
            ``symbol``, ``low``, ``high``, ``open``, ``close``, ``volume``,
            ``timestamp`` and ``timestamp_iso``.

        Raises:
            requests.HTTPError: If the API answers with an error status.
        """
        # Compute the start in UTC and mark it with "Z". Appending a fixed
        # "-04:00" to a naive local time is only right on a UTC-4 host.
        now_utc = datetime.datetime.now(datetime.timezone.utc)
        start_time = now_utc - datetime.timedelta(seconds=granularity)
        start_time_iso = start_time.strftime("%Y-%m-%dT%H:%M:%SZ")

        # The signed path must be exactly the path + query that is sent.
        path = f"/products/{symbol}/candles?granularity={granularity}&start={start_time_iso}"
        headers = self.generate_api_headers("GET", path)
        response = requests.get(_BASE_COINBASE_US_URL + path, headers=headers)
        # Error payloads are not candle lists; fail here with the HTTP error
        # instead of an unrelated error while unpacking the body.
        response.raise_for_status()

        rows = []
        for candle in response.json():
            # Each candle is unpacked as (time, low, high, open, close, volume).
            timestamp, low, high, open_price, close, volume = candle
            rows.append({
                "symbol": symbol,
                "low": low,
                "high": high,
                "open": open_price,
                "close": close,
                "volume": volume,
                "timestamp": timestamp,
                # Naive local-time rendering, kept unchanged for existing consumers.
                "timestamp_iso": datetime.datetime.fromtimestamp(timestamp, None).isoformat(),
            })
        return pd.DataFrame(rows)

    def _get_candle(self, params: Dict = None) -> pd.DataFrame:
        """Gets candle data for a symbol based on given parameters.

        Args:
            params (Dict): Must contain ``symbol`` and ``interval``; the
                interval is converted with ``int()``.

        Returns:
            pd.DataFrame: The result of ``get_coinbase_candle``.

        Raises:
            ValueError: If ``symbol`` or ``interval`` is missing, including
                when ``params`` is None.
        """
        # Treat the default None like an empty dict so the checks below raise
        # the intended ValueError instead of a TypeError.
        params = params or {}
        if 'symbol' not in params:
            raise ValueError('Missing "symbol" param to fetch trade data for.')
        if 'interval' not in params:
            raise ValueError('Missing "interval" param (60, 300, 900, 3600, 21600, 86400).')

        return self.get_coinbase_candle(params['symbol'], int(params['interval']))

    def native_query(self, query: str = None) -> Response:
        """Parses ``query`` with ``parse_sql`` and runs it through ``self.query``.

        Args:
            query (str): SQL text to parse.

        Returns:
            Response: Whatever ``self.query`` returns for the parsed AST.
        """
        ast = parse_sql(query)
        return self.query(ast)

    def generate_api_headers(self, method: str, path: str) -> dict:
        """Builds the request headers, signed when credentials are configured.

        The signature is the base64-encoded HMAC-SHA256 of
        ``timestamp + method + path``, keyed with the base64-decoded API secret.

        Args:
            method (str): HTTP method included in the signed message.
            path (str): Request path (with query string) included in the
                signed message.

        Returns:
            dict: Header names mapped to string values. Only ``Content-Type``
            is present when any of key, secret or passphrase is missing.
        """
        headers = {"Content-Type": "application/json"}
        # Credentials are optional in __init__, so do not crash decoding a
        # missing secret.
        # NOTE(review): assumes the candles endpoint accepts unsigned
        # requests - confirm against the CoinBase Exchange docs.
        if None in (self.api_key, self.api_secret, self.api_passphrase):
            return headers

        timestamp = str(int(time.time()))
        message = timestamp + method + path
        digest = hmac.new(
            base64.b64decode(self.api_secret),
            message.encode(),
            hashlib.sha256,
        ).digest()
        # Decode so every header value is a str rather than a bytes/str mix.
        signature = base64.b64encode(digest).decode()

        headers.update({
            "CB-ACCESS-SIGN": signature,
            "CB-ACCESS-KEY": self.api_key,
            "CB-ACCESS-TIMESTAMP": timestamp,
            "CB-VERSION": "2015-04-08",
            "CB-ACCESS-PASSPHRASE": self.api_passphrase,
        })
        return headers

    def call_coinbase_api(self, method_name: str = None, params: Dict = None) -> pd.DataFrame:
        """Calls the CoinBase API method with the given params.

        Args:
            method_name (str): Method name to call; only ``get_candle`` is supported.
            params (Dict): Params to pass to the API call.

        Returns:
            pd.DataFrame: Results of the API call.

        Raises:
            NotImplementedError: If ``method_name`` is not ``get_candle``.
        """
        if method_name == 'get_candle':
            return self._get_candle(params)
        raise NotImplementedError(f'Method name {method_name} not supported by CoinBase API Handler')