Coverage for mindsdb / integrations / handlers / plaid_handler / plaid_tables.py: 0%

58 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import pandas as pd 

2from mindsdb.integrations.libs.api_handler import APITable 

3from mindsdb.integrations.utilities.sql_utils import extract_comparison_conditions 

4from mindsdb_sql_parser import ast 

5 

6 

class BalanceTable(APITable):
    '''A class representing the balance table.

    This class inherits from APITable and provides functionality to select data
    from the balance endpoint of the Plaid API and return it as a pandas DataFrame.

    Methods:
        select(ast.Select): Select data from the balance table and return it as a pandas DataFrame.
        get_columns(): Get the list of column names for the balance table.
        filter_columns(result, query): Restrict a DataFrame to the queried columns and apply LIMIT.

    '''

    def select(self, query: ast.Select):
        '''Select data from the balance table and return it as a pandas DataFrame.

        Only a ``last_updated_datetime = <value>`` condition is forwarded to the
        API call; conditions on any other column are ignored here.

        Args:
            query (ast.Select): The SQL query to be executed.

        Returns:
            pandas.DataFrame: A pandas DataFrame containing the selected data.

        Raises:
            Exception: If 'last_updated_datetime' is used with an operator other than '='.
        '''
        params = {}
        for op, column, value in extract_comparison_conditions(query.where):
            if column != 'last_updated_datetime':
                continue
            if op != '=':
                raise Exception("Only equals to '=' is Supported with 'last_updated_datetime'")
            params[column] = value

        result = self.handler.call_plaid_api(method_name='get_balance', params=params)

        # filter_columns returns a new frame, so its result must be kept;
        # otherwise the column projection and LIMIT are silently dropped.
        return self.filter_columns(query=query, result=result)

    def get_columns(self):
        '''Get the list of column names for the balance table.

        Returns:
            list: A list of column names for the balance table.

        '''
        return [
            'account_id',
            'account_name',
            'account_mask',
            'account_type',
            'account_subtype',
            'account_official_name',
            'balance_iso_currency_code',
            'balance_unofficial_currency_code',
            'balance_available',
            'balance_current',
            'balance_limit',
        ]

    def filter_columns(self, result: pd.DataFrame, query: ast.Select = None):
        '''Restrict a DataFrame to the queried columns and apply LIMIT.

        Args:
            result (pandas.DataFrame): The data to filter. It is not modified.
            query (ast.Select): The SQL query whose targets and limit are applied.
                When None, every column from get_columns() is returned and no
                limit is applied.

        Returns:
            pandas.DataFrame: A frame holding exactly the requested columns
            (lower-cased, in request order). Requested columns that are absent
            from ``result`` are filled with None.

        Raises:
            NotImplementedError: If a query target is neither ``*`` nor a plain identifier.
        '''
        if query is None:
            columns = self.get_columns()
        else:
            columns = []
            for target in query.targets:
                if isinstance(target, ast.Star):
                    # '*' overrides any explicitly listed columns.
                    columns = self.get_columns()
                    break
                if isinstance(target, ast.Identifier):
                    columns.append(target.parts[-1])
                else:
                    raise NotImplementedError

        columns = [name.lower() for name in columns]

        if len(result) == 0:
            result = pd.DataFrame([], columns=columns)
        else:
            missing = set(columns) - set(result.columns)
            if missing:
                # assign() works on a copy, so the caller's frame is left untouched.
                result = result.assign(**{col: None for col in missing})
            result = result[columns]

        if query is not None and query.limit is not None:
            return result.head(query.limit.value)

        return result

92 

93 

class TransactionTable(BalanceTable):
    '''A class representing the transaction table.

    This class inherits from BalanceTable (reusing its filter_columns) and provides
    functionality to select data from the transactions endpoint of the Plaid API
    and return it as a pandas DataFrame.

    Methods:
        select(ast.Select): Select data from the transaction table and return it as a pandas DataFrame.
        get_columns(): Get the list of column names for the transaction table.

    '''

    def select(self, query: ast.Select):
        '''Select data from the transaction table and return it as a pandas DataFrame.

        Conditions on 'start_date' and 'end_date' are passed to the API call as
        parameters (their operator is not inspected). Every other condition is
        applied to the returned data through DataFrame.query.

        Args:
            query (ast.Select): The SQL query to be executed.

        Returns:
            pandas.DataFrame: A pandas DataFrame containing the selected data.
        '''
        filters = []
        params = {}
        for op, column, value in extract_comparison_conditions(query.where):
            # DataFrame.query uses Python comparison syntax, so '=' becomes '=='.
            op = '==' if op == '=' else op

            if column in ('start_date', 'end_date'):
                params[column] = value
            elif column in ('date', 'authorized_date') or isinstance(value, str):
                # repr() produces a correctly quoted and escaped string literal,
                # so values containing quotes or backslashes cannot break out
                # of the literal inside the query expression.
                filters.append(f"({column}{op}{str(value)!r})")
            else:
                filters.append(f"({column}{op}{value})")

        result = self.handler.call_plaid_api(method_name='get_transactions', params=params)
        if filters:
            # NOTE(review): the column name and operator are still interpolated
            # as-is into an expression that pandas evaluates; confirm that
            # upstream parsing constrains them to safe values.
            result = result.query(' and '.join(filters))

        return self.filter_columns(query=query, result=result)

    def get_columns(self):
        '''Get the list of column names for the transaction table.

        Returns:
            list: A list of column names for the transaction table.
        '''
        return [
            'account_id',
            'transaction_id',
            'amount',
            'iso_currency_code',
            'check_number',
            'date',
            'authorized_date',
            'merchant_name',
            'payment_channel',
            'pending',
        ]