Coverage for mindsdb / integrations / handlers / binance_handler / binance_tables.py: 0%

92 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from mindsdb.integrations.libs.api_handler import APITable 

2from mindsdb.integrations.utilities.date_utils import interval_str_to_duration_ms, utc_date_str_to_timestamp_ms 

3from mindsdb.integrations.utilities.sql_utils import extract_comparison_conditions 

4from mindsdb_sql_parser import ast 

5 

6from concurrent.futures import ThreadPoolExecutor 

7from typing import Dict, List 

8 

9import pandas as pd 

10import time 

11 

12 

class BinanceAggregatedTradesTable(APITable):
    """Table exposing Binance kline data fetched through the handler's ``klines`` API method.

    Supported WHERE conditions:
        * ``symbol = ...`` and ``interval = ...`` (equality only).
        * ``open_time > ...`` (becomes the request's ``start_time``).
        * ``close_time < ...`` (becomes the request's ``end_time``).

    Other operators on these four columns raise ``NotImplementedError``;
    conditions on any other column are ignored.
    """

    # Default 1m intervals in aggregate data.
    DEFAULT_AGGREGATE_TRADE_INTERVAL = '1m'
    # Maximum number of klines requested per single API call.
    DEFAULT_AGGREGATE_TRADE_LIMIT = 1000
    # Binance Spot client has connection pool size of 10.
    MAX_THREAD_POOL_WORKERS = 10

    def _get_batch_klines(self, executor: ThreadPoolExecutor, total_results: int, params: Dict) -> pd.DataFrame:
        """Gets kline data in batches and combines the results together.

        The overall time window is split into consecutive sub-windows spanning at most
        ``DEFAULT_AGGREGATE_TRADE_LIMIT`` intervals each. One ``klines`` API call per
        sub-window is run on ``executor``, and the results are concatenated in the
        order the sub-windows were built (earliest first).

        Args:
            executor (ThreadPoolExecutor): Executor to use when mapping API calls as tasks.
            total_results (int): Total number of results to fetch.
            params (Dict): Overall request params to be split into batches. Must contain
                ``interval``; ``start_time`` / ``end_time`` (milliseconds) are optional.

        Returns:
            pd.DataFrame: All batch results concatenated into a single DataFrame.
        """
        batch_limit = BinanceAggregatedTradesTable.DEFAULT_AGGREGATE_TRADE_LIMIT
        interval_duration_ms = interval_str_to_duration_ms(params['interval'])

        if 'end_time' in params:
            overall_end_ms = params['end_time']
        else:
            # Default to get all klines before the current time.
            overall_end_ms = int(time.time() * 1000)

        if 'start_time' in params:
            overall_start_ms = params['start_time']
        else:
            # Infer start time based on the interval and how many klines we need to fetch.
            overall_start_ms = overall_end_ms - interval_duration_ms * total_results

        duration_per_api_call_ms = interval_duration_ms * batch_limit

        next_params = params.copy()
        next_params['start_time'] = overall_start_ms
        next_params['end_time'] = min(overall_end_ms, overall_start_ms + duration_per_api_call_ms)
        all_params = [next_params.copy()]
        # The first batch keeps params['limit']. select() only calls this method when
        # total_results exceeds batch_limit, in which case params['limit'] == batch_limit,
        # so the first batch is counted as a full one.
        results_so_far = batch_limit
        while next_params['end_time'] < overall_end_ms and results_so_far < total_results:
            next_params['limit'] = min(batch_limit, total_results - results_so_far)
            # Each sub-window starts where the previous one ended.
            # NOTE(review): if the API treats both bounds inclusively, the boundary kline lies in
            # two adjacent windows; the per-call limit presumably keeps it out of the earlier
            # one - confirm no duplicate rows are returned.
            next_params['start_time'] = next_params['end_time']
            next_params['end_time'] = min(overall_end_ms, next_params['start_time'] + duration_per_api_call_ms)
            all_params.append(next_params.copy())
            results_so_far += next_params['limit']

        # Executor.map yields results in the order of all_params, so batches stay in time order.
        aggregated_trade_subdatas = list(executor.map(
            lambda p: self.handler.call_binance_api(method_name='klines', params=p),
            all_params,
        ))
        if not aggregated_trade_subdatas:
            # Defensive only: all_params always contains at least one batch.
            return pd.DataFrame([])

        # Concatenate once: concatenating inside a loop copies the accumulated frame on
        # every iteration, which is quadratic in the number of batches.
        return pd.concat(aggregated_trade_subdatas)

    def _get_kline_params_from_conditions(self, conditions: List) -> Dict:
        """Gets kline API params from SQL WHERE conditions.

        Args:
            conditions (List): List of individual SQL WHERE conditions as
                ``(op, column, value)`` triples.

        Returns:
            Dict: Params to use for the Binance ``klines`` API call. Always contains
            ``interval`` and ``limit``; may contain ``symbol``, ``start_time`` and ``end_time``.

        Raises:
            NotImplementedError: If an unsupported operator is used on ``interval``,
                ``symbol``, ``open_time`` or ``close_time``.
        """
        params = {
            'interval': BinanceAggregatedTradesTable.DEFAULT_AGGREGATE_TRADE_INTERVAL,
            'limit': BinanceAggregatedTradesTable.DEFAULT_AGGREGATE_TRADE_LIMIT
        }

        # First pass: equality-only params. The interval must be resolved before the
        # time conditions are converted, because the close_time bound depends on it.
        for op, arg1, arg2 in conditions:
            if arg1 in ('interval', 'symbol'):
                if op != '=':
                    raise NotImplementedError(
                        f"Only '=' is supported for the '{arg1}' condition, got '{op}'"
                    )
                params[arg1] = arg2

        interval_duration_ms = interval_str_to_duration_ms(params['interval'])

        # Second pass: time bounds.
        for op, arg1, arg2 in conditions:
            if arg1 == 'open_time':
                utc_timestamp_ms = utc_date_str_to_timestamp_ms(arg2)
                if op != '>':
                    raise NotImplementedError(
                        f"Only '>' is supported for the 'open_time' condition, got '{op}'"
                    )
                params['start_time'] = utc_timestamp_ms
            elif arg1 == 'close_time':
                utc_timestamp_ms = utc_date_str_to_timestamp_ms(arg2)
                if op != '<':
                    raise NotImplementedError(
                        f"Only '<' is supported for the 'close_time' condition, got '{op}'"
                    )
                # Shift back one interval. This presumably accounts for the API's end_time
                # being compared against kline open times rather than close times - confirm.
                params['end_time'] = utc_timestamp_ms - interval_duration_ms

        return params

    def select(self, query: ast.Select) -> pd.DataFrame:
        """Selects data from the Binance API and returns it as a pandas DataFrame.

        Queries whose LIMIT exceeds ``DEFAULT_AGGREGATE_TRADE_LIMIT`` are split into
        several API calls that run concurrently and are then combined.

        Args:
            query (ast.Select): Given SQL SELECT query.

        Returns:
            pd.DataFrame: The requested columns of the Binance API results.

        Raises:
            NotImplementedError: If a select target is neither ``*`` nor a plain column
                identifier, or a WHERE condition uses an unsupported operator.
        """
        conditions = extract_comparison_conditions(query.where)
        params = self._get_kline_params_from_conditions(conditions)

        batch_limit = BinanceAggregatedTradesTable.DEFAULT_AGGREGATE_TRADE_LIMIT
        total_results = params['limit']
        if query.limit:
            total_results = query.limit.value
            params['limit'] = min(batch_limit, query.limit.value)

        if total_results > batch_limit:
            # Max 1000 klines per API call so we need to combine multiple API calls.
            with ThreadPoolExecutor(max_workers=BinanceAggregatedTradesTable.MAX_THREAD_POOL_WORKERS) as executor:
                aggregated_trades_data = self._get_batch_klines(executor, total_results, params)
        else:
            aggregated_trades_data = self.handler.call_binance_api(
                method_name='klines',
                params=params
            )

        # Only return the columns we need to.
        all_columns = self.get_columns()
        columns = []
        for target in query.targets:
            if isinstance(target, ast.Star):
                columns = all_columns
                break
            if isinstance(target, ast.Identifier):
                columns.append(target.parts[-1])
            else:
                raise NotImplementedError(
                    f"Unsupported select target type: {type(target).__name__}"
                )

        if len(aggregated_trades_data) == 0:
            return pd.DataFrame([], columns=columns)

        aggregated_trades_data.columns = all_columns
        # Remove columns not part of select in a single call; the remaining columns
        # keep the get_columns() order. Unknown requested names are ignored.
        selected = set(columns)
        return aggregated_trades_data.drop(
            columns=[col for col in all_columns if col not in selected]
        )

    def get_columns(self):
        """Gets all columns to be returned in pandas DataFrame responses.

        The order must match the column order of the ``klines`` API result, because
        ``select`` assigns these names positionally.
        """
        return [
            'symbol',
            'open_time',
            'open_price',
            'high_price',
            'low_price',
            'close_price',
            'volume',
            'close_time',
            'quote_asset_volume',
            'number_of_trades',
            'taker_buy_base_asset_volume',
            'taker_buy_quote_asset_volume'
        ]