Coverage for mindsdb / integrations / handlers / binance_handler / binance_handler.py: 0%

73 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import pandas as pd 

2from typing import Dict 

3 

4from binance.spot import Spot 

5 

6from mindsdb.integrations.handlers.binance_handler.binance_tables import BinanceAggregatedTradesTable 

7from mindsdb.integrations.libs.api_handler import APIHandler 

8from mindsdb.integrations.libs.response import ( 

9 HandlerStatusResponse as StatusResponse, 

10 HandlerResponse as Response, 

11) 

12from mindsdb.utilities import log 

13from mindsdb_sql_parser import parse_sql 

14 

# Base URL handed to `binance.spot.Spot` as `base_url` when the handler builds its API client.
_BASE_BINANCE_US_URL = 'https://api.binance.us'

# Module-level logger used by the handler to report connection errors.
logger = log.getLogger(__name__)

18 

19 

class BinanceHandler(APIHandler):
    """A class for handling connections and interactions with the Binance API.

    Attributes:
        client (binance.spot.Spot): The `binance.spot.Spot` object for interacting with the Binance API.
        api_key (str): API key, only required for user data endpoints.
        api_secret (str): API secret, only required for user data endpoints.
        is_connected (bool): Whether or not the API client is connected to Binance.

    """

    def __init__(self, name: str = None, **kwargs):
        """Registers all API tables and prepares the handler for an API connection.

        Args:
            name (str): The handler name to use.
            **kwargs: May contain `connection_data`, a mapping with optional
                `api_key` and `api_secret` entries.
        """
        super().__init__(name)

        # A missing *or* explicitly-None `connection_data` both mean "no credentials";
        # without the `or {}` an explicit None would raise TypeError on the lookups below.
        args = kwargs.get('connection_data') or {}
        self.api_key = args.get('api_key')
        self.api_secret = args.get('api_secret')

        self.client = None
        self.is_connected = False

        aggregated_trade_data = BinanceAggregatedTradesTable(self)
        self._register_table('aggregated_trade_data', aggregated_trade_data)

    def connect(self) -> Spot:
        """Creates a new Binance Spot API client if needed and sets it as the client to use for requests.

        Returns:
            The newly created Binance Spot API client, or the current client if
            the handler is already connected.
        """
        if self.is_connected and self.client:
            return self.client

        if self.api_key and self.api_secret:
            self.client = Spot(key=self.api_key, secret=self.api_secret, base_url=_BASE_BINANCE_US_URL)
        else:
            # Both credentials are needed for an authenticated client; otherwise
            # fall back to a client created without key/secret.
            self.client = Spot(base_url=_BASE_BINANCE_US_URL)

        self.is_connected = True
        return self.client

    def check_connection(self) -> StatusResponse:
        """Checks connection to Binance API by sending a ping request.

        Returns:
            StatusResponse indicating whether or not the handler is connected.
            On failure, `error_message` holds the string form of the exception.
        """
        response = StatusResponse(False)

        try:
            client = self.connect()
            client.ping()
            response.success = True
        except Exception as e:
            logger.error(f'Error connecting to Binance API: {e}!')
            # Store text rather than the exception object itself so the
            # response carries a plain message.
            response.error_message = str(e)

        # A failed ping resets the flag so the next connect() builds a fresh client.
        self.is_connected = response.success
        return response

    def _get_klines(self, params: Dict = None) -> pd.DataFrame:
        """Fetches kline data for a symbol via `client.klines` based on given parameters.

        Args:
            params (Dict): Trade data params (symbol, interval, limit, start_time, end_time).
                `symbol` and `interval` are required; `limit` falls back to
                `BinanceAggregatedTradesTable.DEFAULT_AGGREGATE_TRADE_LIMIT`,
                `start_time` and `end_time` fall back to None.

        Returns:
            A pandas DataFrame with a leading `symbol` column followed by the
            kline fields (minus the last API field), with open/close times
            converted to seconds.

        Raises:
            ValueError: If `params` is missing (None) or lacks `symbol` or `interval`.
        """
        # The signature defaults to None; treat that as "no params" so the caller
        # gets the intended ValueError instead of a TypeError from `in None`.
        params = params or {}
        if 'symbol' not in params:
            raise ValueError('Missing "symbol" param to fetch trade data for.')
        if 'interval' not in params:
            raise ValueError('Missing "interval" param (1m, 1d, etc).')

        client = self.connect()
        symbol = params['symbol']
        interval = params['interval']
        limit = params.get('limit', BinanceAggregatedTradesTable.DEFAULT_AGGREGATE_TRADE_LIMIT)
        start_time = params.get('start_time')
        end_time = params.get('end_time')
        raw_klines = client.klines(
            symbol,
            interval,
            limit=limit,
            startTime=start_time,
            endTime=end_time)

        open_time_i = 0
        close_time_i = 6
        for kline in raw_klines:
            # To train we need timestamps to be in seconds since Unix epoch and Binance returns it in ms.
            kline[open_time_i] = int(kline[open_time_i] / 1000)
            kline[close_time_i] = int(kline[close_time_i] / 1000)

        df = pd.DataFrame(raw_klines)
        df.insert(0, 'symbol', [symbol] * len(raw_klines), True)
        # Remove last unnecessary column (unused API field). Skipped for an empty
        # result, where the only column is the `symbol` one inserted above.
        if len(raw_klines) > 0:
            df = df.drop(df.columns[-1], axis=1)
        return df

    def native_query(self, query: str = None) -> Response:
        """Parses a raw SQL string and runs it through the handler's `query` method.

        Args:
            query (str): The SQL text to parse with `parse_sql`.

        Returns:
            Whatever `self.query` returns for the parsed AST.
        """
        ast = parse_sql(query)
        return self.query(ast)

    def call_binance_api(self, method_name: str = None, params: Dict = None) -> pd.DataFrame:
        """Calls the Binance API method with the given params.

        Args:
            method_name (str): Method name to call (e.g. klines).
            params (Dict): Params to pass to the API call.

        Returns:
            Results as a pandas DataFrame.

        Raises:
            NotImplementedError: If `method_name` is anything other than 'klines'.
        """
        if method_name == 'klines':
            return self._get_klines(params)
        raise NotImplementedError(f'Method name {method_name} not supported by Binance API Handler')