Coverage for mindsdb / integrations / handlers / binance_handler / binance_tables.py: 0%
92 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1from mindsdb.integrations.libs.api_handler import APITable
2from mindsdb.integrations.utilities.date_utils import interval_str_to_duration_ms, utc_date_str_to_timestamp_ms
3from mindsdb.integrations.utilities.sql_utils import extract_comparison_conditions
4from mindsdb_sql_parser import ast
6from concurrent.futures import ThreadPoolExecutor
7from typing import Dict, List
9import pandas as pd
10import time
class BinanceAggregatedTradesTable(APITable):
    """Table exposing Binance kline data fetched through the handler's ``klines`` API method.

    WHERE conditions understood by this table:
        * ``symbol = '...'``
        * ``interval = '...'`` (defaults to ``DEFAULT_AGGREGATE_TRADE_INTERVAL``)
        * ``open_time > '<utc date>'``
        * ``close_time < '<utc date>'``
    Any other operator on these four columns raises ``NotImplementedError``;
    conditions on other columns are ignored.
    """

    # Default 1m intervals in aggregate data.
    DEFAULT_AGGREGATE_TRADE_INTERVAL = '1m'
    # Max number of klines requested per API call; larger LIMITs are split into batches of this size.
    DEFAULT_AGGREGATE_TRADE_LIMIT = 1000
    # Binance Spot client has connection pool size of 10.
    MAX_THREAD_POOL_WORKERS = 10

    def _get_batch_klines(self, executor: ThreadPoolExecutor, total_results: int, params: Dict) -> pd.DataFrame:
        """Gets kline data in batches and combines the results together.

        The overall time window is split into consecutive sub-windows of
        ``DEFAULT_AGGREGATE_TRADE_LIMIT`` intervals each. One ``klines`` API call
        per sub-window is issued through ``executor``, and the results are
        concatenated in window order.

        Args:
            executor (ThreadPoolExecutor): Executor to use when mapping API calls as tasks.
            total_results (int): Total number of results to fetch.
            params (Dict): Overall request params to be split into batches. Must contain
                ``interval``; ``start_time`` / ``end_time`` (epoch ms) are optional.

        Returns:
            pd.DataFrame: All batch results concatenated in window order.
        """
        limit_per_call = BinanceAggregatedTradesTable.DEFAULT_AGGREGATE_TRADE_LIMIT
        interval_duration_ms = interval_str_to_duration_ms(params['interval'])

        if 'end_time' not in params:
            # Default to get all klines before the current time.
            overall_end_ms = int(time.time() * 1000)
        else:
            overall_end_ms = params['end_time']

        if 'start_time' not in params:
            # Infer start time based on the interval and how many klines we need to fetch.
            overall_start_ms = overall_end_ms - interval_duration_ms * total_results
        else:
            overall_start_ms = params['start_time']

        # Time span covered by a single API call.
        duration_per_api_call_ms = interval_duration_ms * limit_per_call

        next_params = params.copy()
        next_params['start_time'] = overall_start_ms
        next_params['end_time'] = min(overall_end_ms, overall_start_ms + duration_per_api_call_ms)
        all_params = [next_params.copy()]
        # The first batch is counted as a full page; its 'limit' is inherited from
        # `params`, which select() sets to DEFAULT_AGGREGATE_TRADE_LIMIT when batching.
        results_so_far = limit_per_call
        while next_params['end_time'] < overall_end_ms and results_so_far < total_results:
            # The last batch only requests the klines that are still missing.
            next_params['limit'] = min(limit_per_call, total_results - results_so_far)
            # Each window starts where the previous one ended.
            next_params['start_time'] = next_params['end_time']
            next_params['end_time'] = min(overall_end_ms, next_params['start_time'] + duration_per_api_call_ms)
            all_params.append(next_params.copy())
            results_so_far += next_params['limit']

        # Executor.map yields results in the order of `all_params`, so batches stay chronological.
        aggregated_trade_subdatas = list(executor.map(
            lambda batch_params: self.handler.call_binance_api(method_name='klines', params=batch_params),
            all_params
        ))
        if not aggregated_trade_subdatas:
            return pd.DataFrame([])

        # Concatenate once instead of pairwise in a loop, which would re-copy the
        # accumulated frame for every batch (quadratic in the number of batches).
        return pd.concat(aggregated_trade_subdatas)

    def _get_kline_params_from_conditions(self, conditions: List) -> Dict:
        """Gets klines API params from SQL WHERE conditions.

        Args:
            conditions (List): ``(op, column, value)`` triples, as unpacked from the
                output of ``extract_comparison_conditions``.

        Returns:
            Dict: Params for the ``klines`` API call. Always contains ``interval`` and
            ``limit``; ``symbol``, ``start_time`` and ``end_time`` (epoch ms) are added
            when the corresponding conditions are present.

        Raises:
            NotImplementedError: If ``interval``/``symbol`` use an operator other than ``=``,
                ``open_time`` an operator other than ``>``, or ``close_time`` an operator
                other than ``<``.
        """
        params = {
            'interval': BinanceAggregatedTradesTable.DEFAULT_AGGREGATE_TRADE_INTERVAL,
            'limit': BinanceAggregatedTradesTable.DEFAULT_AGGREGATE_TRADE_LIMIT
        }

        # First pass: interval and symbol. The interval must be resolved before the
        # close_time condition is translated, because that translation depends on it.
        for op, arg1, arg2 in conditions:
            if arg1 == 'interval':
                if op != '=':
                    raise NotImplementedError
                params['interval'] = arg2
            elif arg1 == 'symbol':
                if op != '=':
                    raise NotImplementedError
                params['symbol'] = arg2
        interval_duration_ms = interval_str_to_duration_ms(params['interval'])

        # Second pass: time bounds.
        for op, arg1, arg2 in conditions:
            if arg1 == 'open_time':
                utc_timestamp_ms = utc_date_str_to_timestamp_ms(arg2)
                if op != '>':
                    raise NotImplementedError
                params['start_time'] = utc_timestamp_ms
            elif arg1 == 'close_time':
                utc_timestamp_ms = utc_date_str_to_timestamp_ms(arg2)
                if op != '<':
                    raise NotImplementedError
                # Shifted back by one interval. NOTE(review): presumably so that the API
                # end bound lines up with the open time of the last kline closing before
                # the given time — confirm against the Binance klines semantics.
                params['end_time'] = utc_timestamp_ms - interval_duration_ms

        return params

    def _get_selected_columns(self, query: ast.Select) -> List[str]:
        """Resolves the SELECT targets of `query` to column names.

        ``*`` expands to every table column. Identifiers contribute their last name
        part (``t.open_time`` -> ``open_time``). Duplicates are dropped, keeping
        first-occurrence order.

        Raises:
            NotImplementedError: If a target before any ``*`` is not a plain identifier.
        """
        columns = []
        for target in query.targets:
            if isinstance(target, ast.Star):
                return self.get_columns()
            if isinstance(target, ast.Identifier):
                columns.append(target.parts[-1])
            else:
                raise NotImplementedError
        return list(dict.fromkeys(columns))

    def select(self, query: ast.Select) -> pd.DataFrame:
        """Selects data from the Binance API and returns it as a pandas DataFrame.

        Args:
            query (ast.Select): Given SQL SELECT query.

        Returns:
            pd.DataFrame: Kline rows restricted to the selected columns, in SELECT order.

        Raises:
            NotImplementedError: For unsupported SELECT targets or WHERE operators.
        """
        conditions = extract_comparison_conditions(query.where)
        params = self._get_kline_params_from_conditions(conditions)
        # Resolve targets before calling the API so unsupported queries fail fast.
        columns = self._get_selected_columns(query)

        total_results = params['limit']
        if query.limit:
            total_results = query.limit.value
            params['limit'] = min(BinanceAggregatedTradesTable.DEFAULT_AGGREGATE_TRADE_LIMIT, query.limit.value)

        if total_results > BinanceAggregatedTradesTable.DEFAULT_AGGREGATE_TRADE_LIMIT:
            # Max 1000 klines per API call so we need to combine multiple API calls.
            with ThreadPoolExecutor(max_workers=BinanceAggregatedTradesTable.MAX_THREAD_POOL_WORKERS) as executor:
                aggregated_trades_data = self._get_batch_klines(executor, total_results, params)
        else:
            aggregated_trades_data = self.handler.call_binance_api(
                method_name='klines',
                params=params
            )

        if len(aggregated_trades_data) == 0:
            return pd.DataFrame([], columns=columns)

        all_columns = self.get_columns()
        aggregated_trades_data.columns = all_columns
        # Project in SELECT order, consistent with the empty-result branch above.
        # Names that are not table columns are skipped, as the previous drop-based
        # projection did.
        known_columns = set(all_columns)
        return aggregated_trades_data[[col for col in columns if col in known_columns]]

    def get_columns(self):
        """Gets all columns to be returned in pandas DataFrame responses."""
        return [
            'symbol',
            'open_time',
            'open_price',
            'high_price',
            'low_price',
            'close_price',
            'volume',
            'close_time',
            'quote_asset_volume',
            'number_of_trades',
            'taker_buy_base_asset_volume',
            'taker_buy_quote_asset_volume'
        ]