Coverage for mindsdb / integrations / handlers / salesforce_handler / salesforce_tables.py: 85%
88 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1from typing import Dict, List, Text
3from mindsdb_sql_parser.ast import Select, Star, Identifier
4import pandas as pd
5from salesforce_api.exceptions import RestRequestCouldNotBeUnderstoodError
7from mindsdb.integrations.libs.api_handler import MetaAPIResource
8from mindsdb.integrations.utilities.sql_utils import FilterCondition, FilterOperator
9from mindsdb.utilities import log
12logger = log.getLogger(__name__)
def create_table_class(resource_name: Text) -> MetaAPIResource:
    """
    Creates a table class for the given Salesforce resource.

    The returned class closes over ``resource_name``: every method below uses it to address
    the resource through the Salesforce client obtained from ``self.handler.connect()``.

    Args:
        resource_name (Text): The name of the Salesforce resource the table class is bound to.

    Returns:
        MetaAPIResource: The ``AnyTable`` class itself (not an instance), bound to ``resource_name``.
    """

    class AnyTable(MetaAPIResource):
        """
        This is the table abstraction for any resource of the Salesforce API.
        """

        def __init__(self, *args, table_name=None, **kwargs):
            """
            Initializes the AnyTable class.

            Args:
                *args: Variable length argument list.
                table_name (str): The name of the table that represents the Salesforce resource.
                **kwargs: Arbitrary keyword arguments.
            """
            super().__init__(*args, table_name=table_name, **kwargs)
            # Cache for the result of `describe()`; filled lazily by `_get_resource_metadata`.
            self.resource_metadata = None

        def select(self, query: Select) -> pd.DataFrame:
            """
            Executes a SELECT SQL query represented by an ASTNode object on the Salesforce resource and retrieves the data (if any).

            The query object is modified in place: its FROM table is set to the resource name, a leading `*`
            target is expanded to explicit columns, and column aliases are removed before rendering.

            Args:
                query (ASTNode): An ASTNode object representing the SQL query to be executed.

            Returns:
                pd.DataFrame: A DataFrame containing the data retrieved from the Salesforce resource.
            """
            query.from_table = resource_name

            # SOQL does not support * in SELECT queries. Replace * with column names.
            if isinstance(query.targets[0], Star):
                query.targets = [Identifier(column) for column in self.get_columns()]

            # SOQL does not support column aliases. Remove them from the query and remember them
            # so that they can be re-applied to the resulting DataFrame.
            column_aliases = {}
            for column in query.targets:
                if column.alias is not None:
                    column_aliases[column.parts[-1]] = column.alias.parts[-1]
                    column.alias = None

            client = self.handler.connect()

            # SOQL does not support backticks. Remove backticks.
            query_str = query.to_string().replace("`", "")
            results = client.sobjects.query(query_str)

            # Drop the 'attributes' entry of each record so that it does not become a column.
            # `pop` with a default tolerates records that do not carry the key.
            for result in results:
                result.pop("attributes", None)

            df = pd.DataFrame(results)
            return df.rename(columns=column_aliases)

        def add(self, item: Dict) -> None:
            """
            Adds a new item to the Salesforce resource.

            Args:
                item (Dict): The data to be inserted into the Salesforce resource.
            """
            client = self.handler.connect()
            getattr(client.sobjects, resource_name).insert(item)

        def modify(self, conditions: List[FilterCondition], values: Dict) -> None:
            """
            Modifies items in the Salesforce resource based on the specified conditions.

            Args:
                conditions (List[FilterCondition]): The conditions based on which the items are to be modified.
                values (Dict): The values to be updated in the items.

            Raises:
                ValueError: If the conditions are not a single 'equals' or 'in' filter on the 'Id' column.
            """
            client = self.handler.connect()

            record_ids = self._validate_conditions(conditions)

            resource = getattr(client.sobjects, resource_name)
            for record_id in record_ids:
                resource.update(record_id, values)

        def remove(self, conditions: List[FilterCondition]) -> None:
            """
            Removes items from the Salesforce resource based on the specified conditions.

            Args:
                conditions (List[FilterCondition]): The conditions based on which the items are to be removed.

            Raises:
                ValueError: If the conditions are not a single 'equals' or 'in' filter on the 'Id' column.
            """
            client = self.handler.connect()

            record_ids = self._validate_conditions(conditions)

            resource = getattr(client.sobjects, resource_name)
            for record_id in record_ids:
                resource.delete(record_id)

        def _validate_conditions(self, conditions: List[FilterCondition]) -> List:
            """
            Validates the conditions used for filtering items in the Salesforce resource.

            Args:
                conditions (List[FilterCondition]): The conditions to be validated.

            Returns:
                List: The 'Id' values taken from the single accepted condition. A scalar value is wrapped
                    in a one-element list; a list, tuple or set is returned as a new list.

            Raises:
                ValueError: If there is not exactly one condition, if it targets a column other than 'Id',
                    or if its operator is neither 'equals' nor 'in'.
            """
            # Salesforce API does not support filtering items based on attributes other than 'Id'. Raise an error if any other column is used.
            if len(conditions) != 1 or conditions[0].column != "Id":
                raise ValueError("Only the 'Id' column can be used to filter items.")

            condition = conditions[0]

            # Only the 'equals' and 'in' operators can be used on the 'Id' column. Raise an error if any other operator is used.
            if condition.op not in [FilterOperator.EQUAL, FilterOperator.IN]:
                raise ValueError("Only the 'equals' and 'in' operators can be used on the 'Id' column.")

            value = condition.value
            if isinstance(value, (list, tuple, set)):
                return list(value)
            return [value]

        def _get_resource_metadata(self) -> Dict:
            """
            Retrieves metadata about the Salesforce resource.

            The result of `describe()` is stored on the instance after the first call, so later calls
            on the same instance do not issue another request.

            Returns:
                Dict: A dictionary containing metadata about the Salesforce resource.
            """
            if self.resource_metadata is None:
                client = self.handler.connect()
                self.resource_metadata = getattr(client.sobjects, resource_name).describe()
            return self.resource_metadata

        def get_columns(self) -> List[Text]:
            """
            Retrieves the attributes (columns) of the Salesforce resource.

            Returns:
                List[Text]: A list of Attributes (columns) of the Salesforce resource.
            """
            return [field["name"] for field in self._get_resource_metadata()["fields"]]

        def meta_get_tables(self, table_name: str, main_metadata: Dict) -> Dict:
            """
            Retrieves table metadata for the Salesforce resource.

            Args:
                table_name (str): The name given to the table that represents the Salesforce resource.
                main_metadata (Dict): The main metadata dictionary containing information about all Salesforce resources.

            Returns:
                Dict: A dictionary containing table metadata for the Salesforce resource. When the resource
                    cannot be found in `main_metadata`, the description is empty and the row count is None.
            """
            client = self.handler.connect()

            table_metadata = {
                "table_name": table_name,
                "table_type": "BASE TABLE",
                "table_description": "",
                "row_count": None,
            }

            # Look the resource up by name, ignoring case. This is best-effort: any problem with the
            # shape of `main_metadata` is logged and the default metadata is returned.
            try:
                resource_metadata = next(
                    (
                        resource
                        for resource in main_metadata
                        if resource["name"].lower() == resource_name.lower()
                    ),
                    None,
                )
            except Exception as e:
                logger.warning(f"Failed to get resource metadata for {resource_name}: {e}")
                return table_metadata

            if resource_metadata is None:
                logger.warning(f"Failed to get resource metadata for {resource_name}: resource not found")
                return table_metadata

            table_metadata["table_description"] = resource_metadata.get("label", "")

            # Try to count the rows. If Salesforce rejects the request, log it and leave the row count as None.
            try:
                table_metadata["row_count"] = client.sobjects.query(f"SELECT COUNT(Id) FROM {resource_name}")[0][
                    "expr0"
                ]
            except RestRequestCouldNotBeUnderstoodError as request_error:
                logger.warning(f"Failed to get row count for {resource_name}: {request_error}")

            return table_metadata

        def meta_get_columns(self, table_name: str) -> List[Dict]:
            """
            Retrieves column metadata for the Salesforce resource.

            Args:
                table_name (str): The name given to the table that represents the Salesforce resource.

            Returns:
                List[Dict]: A list of dictionaries containing column metadata for the Salesforce resource.
            """
            resource_metadata = self._get_resource_metadata()

            return [
                {
                    "table_name": table_name,
                    "column_name": field["name"],
                    "data_type": field["type"],
                    "is_nullable": field.get("nillable", False),
                    "default_value": field.get("defaultValue", ""),
                    "description": field.get("inlineHelpText", ""),
                }
                for field in resource_metadata["fields"]
            ]

        def meta_get_primary_keys(self, table_name: str) -> List[Dict]:
            """
            Retrieves the primary keys for the Salesforce resource.

            Args:
                table_name (str): The name given to the table that represents the Salesforce resource.

            Returns:
                List[Dict]: A list of dictionaries containing primary key metadata for the Salesforce resource.
                    It always holds a single entry for the 'Id' column.
            """
            return [
                {
                    "table_name": table_name,
                    "column_name": "Id",
                }
            ]

        def meta_get_foreign_keys(self, table_name: str, all_tables: List[str]) -> List[Dict]:
            """
            Retrieves the foreign keys for the Salesforce resource.

            Args:
                table_name (str): The name given to the table that represents the Salesforce resource.
                all_tables (List[str]): A list of all table names in the Salesforce database.

            Returns:
                List[Dict]: A list of dictionaries containing foreign key metadata for the Salesforce resource.
            """
            resource_metadata = self._get_resource_metadata()

            # Build the lookup once instead of scanning the list for every child relationship.
            supported_tables = set(all_tables)

            foreign_key_metadata = []
            for child_relationship in resource_metadata.get("childRelationships", []):
                # Skip if the child relationship is not one of the supported tables.
                child_table_name = child_relationship["childSObject"]
                if child_table_name not in supported_tables:
                    continue

                foreign_key_metadata.append(
                    {
                        "parent_table_name": table_name,
                        "parent_column_name": "Id",
                        "child_table_name": child_table_name,
                        "child_column_name": child_relationship["field"],
                    }
                )

            return foreign_key_metadata

    return AnyTable