Coverage for mindsdb / integrations / handlers / salesforce_handler / salesforce_tables.py: 85%

88 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from typing import Dict, List, Text 

2 

3from mindsdb_sql_parser.ast import Select, Star, Identifier 

4import pandas as pd 

5from salesforce_api.exceptions import RestRequestCouldNotBeUnderstoodError 

6 

7from mindsdb.integrations.libs.api_handler import MetaAPIResource 

8from mindsdb.integrations.utilities.sql_utils import FilterCondition, FilterOperator 

9from mindsdb.utilities import log 

10 

11 

12logger = log.getLogger(__name__) 

13 

14 

15def create_table_class(resource_name: Text) -> MetaAPIResource: 

16 """ 

17 Creates a table class for the given Salesforce resource. 

18 """ 

19 

20 class AnyTable(MetaAPIResource): 

21 """ 

22 This is the table abstraction for any resource of the Salesforce API. 

23 """ 

24 

25 def __init__(self, *args, table_name=None, **kwargs): 

26 """ 

27 Initializes the AnyTable class. 

28 

29 Args: 

30 *args: Variable length argument list. 

31 table_name (str): The name of the table that represents the Salesforce resource. 

32 **kwargs: Arbitrary keyword arguments. 

33 """ 

34 super().__init__(*args, table_name=table_name, **kwargs) 

35 self.resource_metadata = None 

36 

37 def select(self, query: Select) -> pd.DataFrame: 

38 """ 

39 Executes a SELECT SQL query represented by an ASTNode object on the Salesforce resource and retrieves the data (if any). 

40 

41 Args: 

42 query (ASTNode): An ASTNode object representing the SQL query to be executed. 

43 

44 Returns: 

45 pd.DataFrame: A DataFrame containing the data retrieved from the Salesforce resource. 

46 """ 

47 query.from_table = resource_name 

48 

49 # SOQL does not support * in SELECT queries. Replace * with column names. 

50 if isinstance(query.targets[0], Star): 

51 query.targets = [Identifier(column) for column in self.get_columns()] 

52 

53 # SOQL does not support column aliases. Remove column aliases. 

54 column_aliases = {} 

55 for column in query.targets: 

56 if column.alias is not None: 

57 column_aliases[column.parts[-1]] = column.alias.parts[-1] 

58 column.alias = None 

59 

60 client = self.handler.connect() 

61 

62 query_str = query.to_string() 

63 

64 # SOQL does not support backticks. Remove backticks. 

65 query_str = query_str.replace("`", "") 

66 results = client.sobjects.query(query_str) 

67 

68 for result in results: 

69 del result["attributes"] 

70 

71 df = pd.DataFrame(results) 

72 df.rename(columns=column_aliases, inplace=True) 

73 

74 return df 

75 

76 def add(self, item: Dict) -> None: 

77 """ 

78 Adds a new item to the Salesforce resource. 

79 

80 Args: 

81 contact (Dict): The data to be inserted into the Salesforce resource. 

82 """ 

83 client = self.handler.connect() 

84 getattr(client.sobjects, resource_name).insert(item) 

85 

86 def modify(self, conditions: List[FilterCondition], values: Dict) -> None: 

87 """ 

88 Modifies items in the Salesforce resource based on the specified conditions. 

89 

90 Args: 

91 conditions (List[FilterCondition]): The conditions based on which the items are to be modified. 

92 values (Dict): The values to be updated in the items. 

93 """ 

94 client = self.handler.connect() 

95 

96 ids = self._validate_conditions(conditions) 

97 

98 for id in ids: 

99 getattr(client.sobjects, resource_name).update(id, values) 

100 

101 def remove(self, conditions: List[FilterCondition]) -> None: 

102 """ 

103 Removes items from the Salesforce resource based on the specified conditions. 

104 

105 Args: 

106 conditions (List[FilterCondition]): The conditions based on which the items are to be removed. 

107 """ 

108 client = self.handler.connect() 

109 

110 ids = self._validate_conditions(conditions) 

111 

112 for id in ids: 

113 getattr(client.sobjects, resource_name).delete(id) 

114 

115 def _validate_conditions(self, conditions: List[FilterCondition]) -> None: 

116 """ 

117 Validates the conditions used for filtering items in the Salesforce resource. 

118 

119 Args: 

120 conditions (List[FilterCondition]): The conditions to be validated. 

121 """ 

122 # Salesforce API does not support filtering items based on attributes other than 'Id'. Raise an error if any other column is used. 

123 if len(conditions) != 1 or conditions[0].column != "Id": 

124 raise ValueError("Only the 'Id' column can be used to filter items.") 

125 

126 # Only the 'equals' and 'in' operators can be used on the 'Id' column for deletion. Raise an error if any other operator is used. 

127 if conditions[0].op not in [FilterOperator.EQUAL, FilterOperator.IN]: 

128 raise ValueError("Only the 'equals' and 'in' operators can be used on the 'Id' column.") 

129 

130 return conditions[0].value if isinstance(conditions[0].value, list) else [conditions[0].value] 

131 

132 def _get_resource_metadata(self) -> Dict: 

133 """ 

134 Retrieves metadata about the Salesforce resource. 

135 

136 Returns: 

137 Dict: A dictionary containing metadata about the Salesforce resource. 

138 """ 

139 if self.resource_metadata: 139 ↛ 140line 139 didn't jump to line 140 because the condition on line 139 was never true

140 return self.resource_metadata 

141 

142 client = self.handler.connect() 

143 return getattr(client.sobjects, resource_name).describe() 

144 

145 def get_columns(self) -> List[Text]: 

146 """ 

147 Retrieves the attributes (columns) of the Salesforce resource. 

148 

149 Returns: 

150 List[Text]: A list of Attributes (columns) of the Salesforce resource. 

151 """ 

152 return [field["name"] for field in self._get_resource_metadata()["fields"]] 

153 

154 def meta_get_tables(self, table_name: str, main_metadata: Dict) -> Dict: 

155 """ 

156 Retrieves table metadata for the Salesforce resource. 

157 

158 Args: 

159 table_name (str): The name given to the table that represents the Salesforce resource. 

160 main_metadata (Dict): The main metadata dictionary containing information about all Salesforce resources. 

161 

162 Returns: 

163 Dict: A dictionary containing table metadata for the Salesforce resource. 

164 """ 

165 client = self.handler.connect() 

166 

167 try: 

168 resource_metadata = next( 

169 (resource for resource in main_metadata if resource["name"].lower() == resource_name), 

170 ) 

171 except Exception as e: 

172 logger.warning(f"Failed to get resource metadata for {resource_name}: {e}") 

173 return { 

174 "table_name": table_name, 

175 "table_type": "BASE TABLE", 

176 "table_description": "", 

177 "row_count": None, 

178 } 

179 # Get row count if Id column is aggregatable. 

180 row_count = None 

181 # if next(field for field in resource_metadata['fields'] if field['name'] == 'Id').get('aggregatable', False): 

182 try: 

183 row_count = client.sobjects.query(f"SELECT COUNT(Id) FROM {resource_name}")[0]["expr0"] 

184 except RestRequestCouldNotBeUnderstoodError as request_error: 

185 logger.warning(f"Failed to get row count for {resource_name}: {request_error}") 

186 

187 return { 

188 "table_name": table_name, 

189 "table_type": "BASE TABLE", 

190 "table_description": resource_metadata.get("label", ""), 

191 "row_count": row_count, 

192 } 

193 

194 def meta_get_columns(self, table_name: str) -> List[Dict]: 

195 """ 

196 Retrieves column metadata for the Salesforce resource. 

197 

198 Args: 

199 table_name (str): The name given to the table that represents the Salesforce resource. 

200 

201 Returns: 

202 List[Dict]: A list of dictionaries containing column metadata for the Salesforce resource. 

203 """ 

204 resource_metadata = self._get_resource_metadata() 

205 

206 column_metadata = [] 

207 for field in resource_metadata["fields"]: 

208 column_metadata.append( 

209 { 

210 "table_name": table_name, 

211 "column_name": field["name"], 

212 "data_type": field["type"], 

213 "is_nullable": field.get("nillable", False), 

214 "default_value": field.get("defaultValue", ""), 

215 "description": field.get("inlineHelpText", ""), 

216 } 

217 ) 

218 

219 return column_metadata 

220 

221 def meta_get_primary_keys(self, table_name: str) -> List[Dict]: 

222 """ 

223 Retrieves the primary keys for the Salesforce resource. 

224 

225 Args: 

226 table_name (str): The name given to the table that represents the Salesforce resource. 

227 

228 Returns: 

229 List[Dict]: A list of dictionaries containing primary key metadata for the Salesforce resource. 

230 """ 

231 return [ 

232 { 

233 "table_name": table_name, 

234 "column_name": "Id", 

235 } 

236 ] 

237 

238 def meta_get_foreign_keys(self, table_name: str, all_tables: List[str]) -> List[Dict]: 

239 """ 

240 Retrieves the foreign keys for the Salesforce resource. 

241 

242 Args: 

243 table_name (str): The name given to the table that represents the Salesforce resource. 

244 all_tables (List[str]): A list of all table names in the Salesforce database. 

245 

246 Returns: 

247 List[Dict]: A list of dictionaries containing foreign key metadata for the Salesforce resource. 

248 """ 

249 resource_metadata = self._get_resource_metadata() 

250 

251 foreign_key_metadata = [] 

252 for child_relationship in resource_metadata.get("childRelationships", []): 

253 # Skip if the child relationship is not one of the supported tables. 

254 child_table_name = child_relationship["childSObject"] 

255 if child_table_name not in all_tables: 

256 continue 

257 

258 foreign_key_metadata.append( 

259 { 

260 "parent_table_name": table_name, 

261 "parent_column_name": "Id", 

262 "child_table_name": child_table_name, 

263 "child_column_name": child_relationship["field"], 

264 } 

265 ) 

266 

267 return foreign_key_metadata 

268 

269 return AnyTable