Coverage for mindsdb / integrations / handlers / dynamodb_handler / dynamodb_handler.py: 83%
117 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1from typing import Text, List, Dict, Optional
3import boto3
4from boto3.dynamodb.types import TypeDeserializer
5from botocore.exceptions import ClientError
6from mindsdb_sql_parser.ast import Select, Insert, Join
7from mindsdb_sql_parser.ast.base import ASTNode
8import pandas as pd
10from mindsdb.integrations.libs.base import DatabaseHandler
11from mindsdb.integrations.libs.response import (
12 HandlerStatusResponse as StatusResponse,
13 HandlerResponse as Response,
14 RESPONSE_TYPE,
15)
16from mindsdb.utilities import log
19logger = log.getLogger(__name__)
class DynamoDBHandler(DatabaseHandler):
    """
    This handler handles connection and execution of the SQL statements on Amazon DynamoDB.

    Statements are sent to DynamoDB as PartiQL through a boto3 client. The client is created
    lazily by `connect()`; methods that open the connection themselves also close it again,
    while a connection that was already open is left open for the caller.
    """

    name = "dynamodb"

    def __init__(self, name: Text, connection_data: Optional[Dict], **kwargs):
        """
        Initializes the handler.

        Args:
            name (Text): The name of the handler instance.
            connection_data (Dict): The connection data required to connect to Amazon DynamoDB.
            kwargs: Arbitrary keyword arguments.
        """
        super().__init__(name)
        self.connection_data = connection_data
        self.kwargs = kwargs

        self.connection = None
        self.is_connected = False
        self.cache_thread_safe = True

    def __del__(self) -> None:
        """
        Closes the connection when the handler instance is deleted.
        """
        # `__init__` may have failed before `is_connected` was assigned, so read it defensively.
        if getattr(self, "is_connected", False):
            self.disconnect()

    def connect(self) -> boto3.client:
        """
        Establishes a connection to Amazon DynamoDB.

        Raises:
            ValueError: If the expected connection parameters are not provided.

        Returns:
            boto3.client: A client object to Amazon DynamoDB.
        """
        if self.is_connected is True:
            return self.connection

        # `connection_data` is Optional: treat None as "nothing provided" so that the documented
        # ValueError is raised below instead of a TypeError from the membership test.
        connection_data = self.connection_data or {}

        # Mandatory connection parameters.
        required_parameters = ["aws_access_key_id", "aws_secret_access_key", "region_name"]
        if not all(key in connection_data for key in required_parameters):
            logger.error(
                "Connection failed as required parameters (aws_access_key_id, aws_secret_access_key, region_name) have not been provided."
            )
            raise ValueError(
                "Required parameters (aws_access_key_id, aws_secret_access_key, region_name) must be provided."
            )

        config = {key: connection_data.get(key) for key in required_parameters}

        # Optional connection parameters.
        optional_parameters = ["aws_session_token"]
        for param in optional_parameters:
            if param in connection_data:
                config[param] = connection_data[param]

        # An exception is not raised even if the credentials are invalid, therefore, no error handling is required.
        self.connection = boto3.client("dynamodb", **config)
        self.is_connected = True

        return self.connection

    def disconnect(self) -> None:
        """
        Closes the connection to the Amazon DynamoDB if it's currently open.
        """
        if self.is_connected is False:
            return

        if self.connection is not None:
            self.connection.close()
        self.is_connected = False

    def check_connection(self) -> StatusResponse:
        """
        Checks the status of the connection to Amazon DynamoDB.

        Returns:
            StatusResponse: An object containing the success status and an error message if an error occurs.
        """
        response = StatusResponse(False)
        need_to_close = self.is_connected is False

        try:
            connection = self.connect()
            # Creating the client does not validate credentials; a real API call is needed for that.
            connection.list_tables()

            response.success = True
        except (ValueError, ClientError) as known_error:
            logger.error(f"Connection check to Amazon DynamoDB failed, {known_error}!")
            response.error_message = str(known_error)
        except Exception as unknown_error:
            logger.error(f"Connection check to Amazon DynamoDB failed due to an unknown error, {unknown_error}!")
            response.error_message = str(unknown_error)

        if response.success and need_to_close:
            self.disconnect()
        elif not response.success and self.is_connected:
            # Mark the handler as disconnected so the next `connect()` builds a fresh client.
            self.is_connected = False

        return response

    def native_query(self, query: Text) -> Response:
        """
        Executes a native SQL query (PartiQL) on Amazon DynamoDB and returns the result.

        All result pages are fetched by following the `NextToken` returned by DynamoDB.

        Args:
            query (Text): The SQL query to be executed.

        Raises:
            ValueError: If the connection cannot be created because required parameters are missing.

        Returns:
            Response: A response object containing the result of the query or an error message.
        """
        need_to_close = self.is_connected is False

        connection = self.connect()

        try:
            result = connection.execute_statement(Statement=query)
            records = self._parse_records(result.get("Items", []))

            # `NextToken` is both the "more pages" signal and the value to send back, so the loop
            # is driven by it. A page may be empty while later pages still contain items.
            while result.get("NextToken"):
                result = connection.execute_statement(Statement=query, NextToken=result["NextToken"])
                records.extend(self._parse_records(result.get("Items", [])))

            if records:
                response = Response(RESPONSE_TYPE.TABLE, data_frame=pd.json_normalize(records))
            else:
                response = Response(RESPONSE_TYPE.OK)
        except ClientError as client_error:
            logger.error(f"Error running query: {query} on DynamoDB, {client_error}!")
            response = Response(RESPONSE_TYPE.ERROR, error_message=str(client_error))
        except Exception as unknown_error:
            logger.error(f"Unknown error running query: {query} on DynamoDB, {unknown_error}!")
            response = Response(RESPONSE_TYPE.ERROR, error_message=str(unknown_error))
        finally:
            # Only close a connection this call opened; one that was already open stays open
            # so that `is_connected` keeps matching the real state of the client.
            if need_to_close:
                self.disconnect()

        return response

    def _parse_records(self, records: List[Dict]) -> List[Dict]:
        """
        Parses the records returned by the PartiQL query execution.

        Args:
            records (List[Dict]): A list of records returned by the PartiQL query execution.

        Returns:
            List[Dict]: One dictionary per record, with every attribute value deserialized
                from the DynamoDB typed representation into a plain Python value.
        """
        deserializer = TypeDeserializer()

        return [
            {attribute: deserializer.deserialize(value) for attribute, value in record.items()}
            for record in records
        ]

    def query(self, query: ASTNode) -> Response:
        """
        Executes a SQL query represented by an ASTNode on Amazon DynamoDB and retrieves the data.

        Args:
            query (ASTNode): An ASTNode representing the SQL query to be executed.

        Raises:
            ValueError: If the query is an INSERT, or a SELECT that uses LIMIT, OFFSET, GROUP BY,
                HAVING, a subquery or a JOIN.

        Returns:
            Response: The response from the `native_query` method, containing the result of the SQL query execution.
        """
        if isinstance(query, Select):
            error_message = None
            if query.limit or query.group_by or query.having or query.offset:
                error_message = "The provided SELECT query contains unsupported clauses. "

            if isinstance(query.from_table, Select):
                error_message = "The provided SELECT query contains subqueries, which are not supported. "

            if isinstance(query.from_table, Join):
                error_message = "The provided SELECT query contains JOIN clauses, which are not supported. "

            if error_message:
                error_message += "Please refer to the following documentation for running PartiQL SELECT queries against Amazon DynamoDB: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ql-reference.select.html"
                raise ValueError(error_message)

        # TODO: Add support for INSERT queries.
        elif isinstance(query, Insert):
            raise ValueError("Insert queries are not supported by this integration at the moment.")

        return self.native_query(query.to_string())

    def get_tables(self) -> Response:
        """
        Retrieves a list of all tables in Amazon DynamoDB.

        The listing is paginated by DynamoDB, so pages are followed through
        `LastEvaluatedTableName` until every table name has been collected.

        Raises:
            ValueError: If the connection cannot be created because required parameters are missing.

        Returns:
            Response: A response object containing a list of tables in Amazon DynamoDB.
        """
        need_to_close = self.is_connected is False

        # Connect on demand instead of assuming `self.connection` was already created.
        connection = self.connect()

        try:
            result = connection.list_tables()
            table_names = list(result.get("TableNames", []))

            while result.get("LastEvaluatedTableName"):
                result = connection.list_tables(ExclusiveStartTableName=result["LastEvaluatedTableName"])
                table_names.extend(result.get("TableNames", []))
        finally:
            if need_to_close:
                self.disconnect()

        df = pd.DataFrame(data=table_names, columns=["table_name"])

        return Response(RESPONSE_TYPE.TABLE, df)

    def get_columns(self, table_name: Text) -> Response:
        """
        Retrieves column (attribute) details for a specified table in Amazon DynamoDB.

        The columns are taken from the table's `AttributeDefinitions`.
        NOTE(review): per the DynamoDB documentation these describe only the attributes used in
        table and index key schemas, not every attribute stored in items - confirm this is intended.

        Args:
            table_name (Text): The name of the table for which to retrieve column information.

        Raises:
            ValueError: If the 'table_name' is not a valid string, or if the connection cannot be
                created because required parameters are missing.

        Returns:
            Response: A response object containing the column details.
        """
        if not table_name or not isinstance(table_name, str):
            raise ValueError("Invalid table name provided.")

        need_to_close = self.is_connected is False

        # Connect on demand instead of assuming `self.connection` was already created.
        connection = self.connect()

        try:
            result = connection.describe_table(TableName=table_name)
        finally:
            if need_to_close:
                self.disconnect()

        df = pd.DataFrame(result["Table"]["AttributeDefinitions"])
        df = df.rename(columns={"AttributeName": "column_name", "AttributeType": "data_type"})

        return Response(RESPONSE_TYPE.TABLE, df)