Coverage for mindsdb / integrations / handlers / dynamodb_handler / dynamodb_handler.py: 83%

117 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from typing import Text, List, Dict, Optional 

2 

3import boto3 

4from boto3.dynamodb.types import TypeDeserializer 

5from botocore.exceptions import ClientError 

6from mindsdb_sql_parser.ast import Select, Insert, Join 

7from mindsdb_sql_parser.ast.base import ASTNode 

8import pandas as pd 

9 

10from mindsdb.integrations.libs.base import DatabaseHandler 

11from mindsdb.integrations.libs.response import ( 

12 HandlerStatusResponse as StatusResponse, 

13 HandlerResponse as Response, 

14 RESPONSE_TYPE, 

15) 

16from mindsdb.utilities import log 

17 

18 

# Module-level logger; the handler uses it to report connection and query failures.
logger = log.getLogger(__name__)

20 

21 

class DynamoDBHandler(DatabaseHandler):
    """
    This handler handles connection and execution of the SQL statements on Amazon DynamoDB.

    Queries are executed as PartiQL statements through the boto3 DynamoDB client.
    """

    name = "dynamodb"

    def __init__(self, name: Text, connection_data: Optional[Dict], **kwargs):
        """
        Initializes the handler.

        Args:
            name (Text): The name of the handler instance.
            connection_data (Dict): The connection data required to connect to Amazon DynamoDB.
            kwargs: Arbitrary keyword arguments.
        """
        super().__init__(name)
        self.connection_data = connection_data
        self.kwargs = kwargs

        # boto3 DynamoDB client; created lazily by `connect`.
        self.connection = None
        self.is_connected = False
        self.cache_thread_safe = True

    def __del__(self) -> None:
        """
        Closes the connection when the handler instance is deleted.
        """
        # getattr: `__init__` may have failed before `is_connected` was assigned.
        if getattr(self, "is_connected", False):
            self.disconnect()

    def connect(self) -> boto3.client:
        """
        Establishes a connection to Amazon DynamoDB.

        If the handler is already connected, the existing client is returned.

        Raises:
            ValueError: If the expected connection parameters are not provided.

        Returns:
            boto3.client: A client object to Amazon DynamoDB.
        """
        if self.is_connected is True:
            return self.connection

        # `connection_data` is Optional; treat a missing dict as "no parameters provided"
        # so the caller gets the documented ValueError instead of a TypeError.
        connection_data = self.connection_data or {}

        # Mandatory connection parameters.
        required_parameters = ["aws_access_key_id", "aws_secret_access_key", "region_name"]
        if not all(key in connection_data for key in required_parameters):
            logger.error(
                "Connection failed as required parameters (aws_access_key_id, aws_secret_access_key, region_name) have not been provided."
            )
            raise ValueError(
                "Required parameters (aws_access_key_id, aws_secret_access_key, region_name) must be provided."
            )

        config = {key: connection_data.get(key) for key in required_parameters}

        # Optional connection parameters.
        optional_parameters = ["aws_session_token"]
        for param in optional_parameters:
            if param in connection_data:
                config[param] = connection_data[param]

        # An exception is not raised even if the credentials are invalid, therefore, no error handling is required.
        self.connection = boto3.client("dynamodb", **config)

        self.is_connected = True

        return self.connection

    def disconnect(self) -> None:
        """
        Closes the connection to the Amazon DynamoDB if it's currently open.
        """
        if self.is_connected is False:
            return

        self.connection.close()
        self.is_connected = False

    def check_connection(self) -> StatusResponse:
        """
        Checks the status of the connection to Amazon DynamoDB.

        The check connects (if needed) and issues a `list_tables` call, since creating
        the client alone does not validate the credentials.

        Returns:
            StatusResponse: An object containing the success status and an error message if an error occurs.
        """
        response = StatusResponse(False)
        need_to_close = self.is_connected is False

        try:
            connection = self.connect()
            connection.list_tables()

            response.success = True
        except (ValueError, ClientError) as known_error:
            logger.error(f"Connection check to Amazon DynamoDB failed, {known_error}!")
            response.error_message = str(known_error)
        except Exception as unknown_error:
            logger.error(f"Connection check to Amazon DynamoDB failed due to an unknown error, {unknown_error}!")
            response.error_message = str(unknown_error)

        if response.success and need_to_close:
            self.disconnect()

        elif not response.success and self.is_connected:
            # The client was created but the check failed; mark it unusable.
            self.is_connected = False

        return response

    def native_query(self, query: Text) -> Response:
        """
        Executes a native SQL query (PartiQL) on Amazon DynamoDB and returns the result.

        All result pages are fetched by following `NextToken`. If no items are returned
        across all pages, an OK response (no table) is produced.

        Args:
            query (Text): The SQL query to be executed.

        Raises:
            ValueError: If the connection parameters are missing (raised by `connect`).

        Returns:
            Response: A response object containing the result of the query or an error message.
        """
        need_to_close = self.is_connected is False

        connection = self.connect()

        try:
            result = connection.execute_statement(Statement=query)
            # TODO: Can parsing be optimized?
            records = self._parse_records(result.get("Items", []))

            # PartiQL results are paginated via NextToken. A page may be empty
            # (e.g. due to filtering) while later pages still contain items, so keep
            # fetching until no token is returned.
            while "NextToken" in result:
                result = connection.execute_statement(Statement=query, NextToken=result["NextToken"])
                records.extend(self._parse_records(result.get("Items", [])))

            if records:
                response = Response(RESPONSE_TYPE.TABLE, data_frame=pd.json_normalize(records))
            else:
                response = Response(RESPONSE_TYPE.OK)
        except ClientError as client_error:
            logger.error(f"Error running query: {query} on DynamoDB!")
            response = Response(RESPONSE_TYPE.ERROR, error_message=str(client_error))
        except Exception as unknown_error:
            logger.error(f"Unknown error running query: {query} on DynamoDB!")
            response = Response(RESPONSE_TYPE.ERROR, error_message=str(unknown_error))

        # Only close the client if this call opened it; a pre-existing connection
        # must stay usable (and `is_connected` must stay truthful).
        if need_to_close is True:
            self.disconnect()

        return response

    def _parse_records(self, records: List[Dict]) -> List[Dict]:
        """
        Parses the records returned by the PartiQL query execution.

        Each attribute value is converted from DynamoDB's typed wire format
        into a plain Python value using boto3's `TypeDeserializer`.

        Args:
            records (List[Dict]): A list of records returned by the PartiQL query execution.

        Returns:
            List[Dict]: A new list with one dictionary of deserialized attributes per record.
        """
        deserializer = TypeDeserializer()

        return [{k: deserializer.deserialize(v) for k, v in record.items()} for record in records]

    def query(self, query: ASTNode) -> Response:
        """
        Executes a SQL query represented by an ASTNode on Amazon DynamoDB and retrieves the data.

        Args:
            query (ASTNode): An ASTNode representing the SQL query to be executed.

        Raises:
            ValueError: If the query is a SELECT with clauses PartiQL does not support here
                (LIMIT, GROUP BY, HAVING, OFFSET, subqueries, JOINs), or if it is an INSERT.

        Returns:
            Response: The response from the `native_query` method, containing the result of the SQL query execution.
        """
        if isinstance(query, Select):
            error_message = None
            if query.limit or query.group_by or query.having or query.offset:
                error_message = "The provided SELECT query contains unsupported clauses. "

            if isinstance(query.from_table, Select):
                error_message = "The provided SELECT query contains subqueries, which are not supported. "

            if isinstance(query.from_table, Join):
                error_message = "The provided SELECT query contains JOIN clauses, which are not supported. "

            if error_message:
                error_message += "Please refer to the following documentation for running PartiQL SELECT queries against Amazon DynamoDB: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ql-reference.select.html"
                raise ValueError(error_message)

        # TODO: Add support for INSERT queries.
        elif isinstance(query, Insert):
            raise ValueError("Insert queries are not supported by this integration at the moment.")

        return self.native_query(query.to_string())

    def get_tables(self) -> Response:
        """
        Retrieves a list of all tables in Amazon DynamoDB.

        Follows `LastEvaluatedTableName` so that accounts with more tables than a
        single `ListTables` page returns are fully listed.

        Returns:
            Response: A response object containing a list of tables in Amazon DynamoDB.
        """
        # Ensure a client exists; `self.connection` is None until `connect` is called.
        connection = self.connect()

        table_names = []
        request_kwargs = {}
        while True:
            result = connection.list_tables(**request_kwargs)
            table_names.extend(result.get("TableNames", []))
            if "LastEvaluatedTableName" not in result:
                break
            request_kwargs["ExclusiveStartTableName"] = result["LastEvaluatedTableName"]

        df = pd.DataFrame(data=table_names, columns=["table_name"])

        return Response(RESPONSE_TYPE.TABLE, df)

    def get_columns(self, table_name: Text) -> Response:
        """
        Retrieves column (attribute) details for a specified table in Amazon DynamoDB.

        Only the attributes listed in the table's `AttributeDefinitions` are returned.

        Args:
            table_name (Text): The name of the table for which to retrieve column information.

        Raises:
            ValueError: If the 'table_name' is not a valid string.

        Returns:
            Response: A response object containing the column details.
        """
        if not table_name or not isinstance(table_name, str):
            raise ValueError("Invalid table name provided.")

        # Ensure a client exists; `self.connection` is None until `connect` is called.
        connection = self.connect()

        result = connection.describe_table(TableName=table_name)

        df = pd.DataFrame(result["Table"]["AttributeDefinitions"])

        df = df.rename(columns={"AttributeName": "column_name", "AttributeType": "data_type"})

        return Response(RESPONSE_TYPE.TABLE, df)