Coverage for mindsdb / integrations / handlers / faunadb_handler / faunadb_handler.py: 0%

130 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import json 

2import pandas as pd 

3from typing import List 

4 

5from faunadb import query as q 

6from faunadb.client import FaunaClient 

7from mindsdb_sql_parser import Select, Insert, CreateTable, Delete 

8from mindsdb_sql_parser.ast.select.star import Star 

9from mindsdb_sql_parser.ast.base import ASTNode 

10 

11from mindsdb.integrations.libs.response import ( 

12 RESPONSE_TYPE, 

13 HandlerResponse as Response, 

14 HandlerStatusResponse as StatusResponse, 

15) 

16from mindsdb.integrations.libs.base import DatabaseHandler 

17 

18from mindsdb.utilities import log 

19 

20logger = log.getLogger(__name__) 

21 

22 

class FaunaDBHandler(DatabaseHandler):
    """This handler handles connection and execution of the FaunaDB statements.

    Connection parameters are read from the ``connection_data`` keyword
    argument: ``fauna_secret`` (required) plus either ``fauna_endpoint`` or
    the ``fauna_scheme`` / ``fauna_domain`` / ``fauna_port`` triple.
    """

    name = "faunadb"

    def __init__(self, name: str, **kwargs):
        """Validate the connection parameters and open the client.

        Args:
            name (str): Name of the handler instance.
            **kwargs: Expected to contain ``connection_data``, a dict holding
                the ``fauna_*`` connection parameters.

        Raises:
            ValueError: If the secret is missing, or if the endpoint and the
                scheme/domain/port triple are both missing or both supplied.
        """
        super().__init__(name)

        # Set these first so that `__del__` is safe even when validation fails.
        self._client = None
        self.is_connected = False

        # Tolerate a missing `connection_data` so the caller gets the explicit
        # "secret is required" error below instead of an AttributeError.
        self._connection_data = kwargs.get("connection_data") or {}

        self._client_config = {
            key: self._connection_data.get(key)
            for key in (
                "fauna_secret",
                "fauna_scheme",
                "fauna_domain",
                "fauna_port",
                "fauna_endpoint",
            )
        }

        scheme = self._client_config["fauna_scheme"]
        domain = self._client_config["fauna_domain"]
        port = self._client_config["fauna_port"]
        endpoint = self._client_config["fauna_endpoint"]

        # should have the secret (an empty secret is as unusable as a missing one)
        if not self._client_config["fauna_secret"]:
            raise ValueError("FaunaDB secret is required for FaunaDB connection!")

        # either scheme + domain + port or endpoint is required
        # but not both
        if not endpoint and not (scheme and domain and port):
            raise ValueError(
                "Either scheme + domain + port or endpoint is required for FaunaDB connection!"
            )
        if endpoint and (scheme or domain or port):
            raise ValueError(
                "Either scheme + domain + port or endpoint is required for FaunaDB connection, but not both!"
            )

        self.connect()

    def _get_client(self):
        """Build a ``FaunaClient`` from the validated client configuration.

        Returns:
            FaunaClient: Client configured either from the endpoint or from
            the scheme/domain/port triple.

        Raises:
            Exception: If the client configuration has not been set.
        """
        config = self._client_config
        if config is None:
            raise Exception("Client config is not set!")

        # Use truthiness, exactly like the validation in `__init__`, so that an
        # empty-string endpoint does not win over a valid scheme/domain/port.
        if config["fauna_endpoint"]:
            return FaunaClient(
                secret=config["fauna_secret"],
                endpoint=config["fauna_endpoint"],
            )

        return FaunaClient(
            secret=config["fauna_secret"],
            scheme=config["fauna_scheme"],
            domain=config["fauna_domain"],
            port=config["fauna_port"],
        )

    def _require_client(self):
        """Return a usable client, connecting first if necessary.

        Raises:
            ConnectionError: If no client could be created.
        """
        client = self.connect()
        if client is None:
            raise ConnectionError("FaunaDB client is not available, check the connection parameters!")
        return client

    def __del__(self):
        # `getattr` guards against a partially constructed instance.
        if getattr(self, "is_connected", False) is True:
            self.disconnect()

    def connect(self):
        """Connect to a FaunaDB database.

        Returns:
            The client on success, or ``None`` when the client could not be
            created (the error is logged, not raised).
        """
        if self.is_connected is True:
            return self._client

        try:
            self._client = self._get_client()
        except Exception as e:
            logger.error(f"Error connecting to FaunaDB client, {e}!")
            self.is_connected = False
            return None

        self.is_connected = True
        return self._client

    def disconnect(self):
        """Close the database connection."""
        if self.is_connected is False:
            return

        self._client = None
        self.is_connected = False

    def check_connection(self):
        """Check the connection to the FaunaDB database.

        Returns:
            StatusResponse: ``success`` is True when the ping succeeded;
            otherwise ``error_message`` carries the failure text.
        """
        response_code = StatusResponse(False)
        # Remember whether this call has to open the connection itself, so it
        # can be closed again afterwards.
        need_to_close = self.is_connected is False

        try:
            # Go through `connect()`: the client is None after `disconnect()`,
            # so pinging `self._client` directly could never succeed then.
            self._require_client().ping()
            response_code.success = True
        except Exception as e:
            logger.error(f"Error connecting to FaunaDB , {e}!")
            response_code.error_message = str(e)
        finally:
            if response_code.success is True and need_to_close:
                self.disconnect()
            if response_code.success is False and self.is_connected is True:
                self.is_connected = False

        return response_code

    def query(self, query: ASTNode) -> Response:
        """Render and execute a SQL query.

        Args:
            query (ASTNode): The SQL query.

        Returns:
            Response: The query result.

        Raises:
            NotImplementedError: If the statement type is not one of
                Select, Insert, CreateTable or Delete.
        """
        if isinstance(query, Select):
            # TODO: research how to parse individual columns from document
            # fields = [f.to_string().split()[2] for f in fields]
            result = self.select(
                str(query.from_table),
                query.targets,
                query.where,
                query.offset,
                query.limit,
            )
            df = pd.json_normalize(result["data"])
            return Response(RESPONSE_TYPE.TABLE, df)

        if isinstance(query, Insert):
            fields = [col.name for col in query.columns]
            self.insert(str(query.table), fields, query.values)
            return Response(resp_type=RESPONSE_TYPE.OK)

        if isinstance(query, CreateTable):
            self.create_table(str(query.name))
            return Response(resp_type=RESPONSE_TYPE.OK)

        if isinstance(query, Delete):
            self.delete_document(str(query.table), query.where)
            return Response(resp_type=RESPONSE_TYPE.OK)

        # NOT working for integration tables yet:
        # elif isinstance(query, DropTables):
        #     collection_name = str(query.tables)
        #     self.drop_table(collection_name)

        # Fail with a clear message instead of touching an unbound `result`.
        raise NotImplementedError(
            f"Query type {type(query).__name__} is not supported by the FaunaDB handler!"
        )

    def select(
        self,
        table_name: str,
        columns: List[str] = None,
        conditions: List[str] = None,
        offset: int = None,
        limit: int = None,
    ) -> dict:
        """Fetch documents from a collection.

        Args:
            table_name (str): Name of the collection.
            columns: Select targets; ``None`` or a leading ``Star`` fetches
                whole documents.
            conditions: Currently not applied to the FaunaDB query.
            offset: Currently not applied to the FaunaDB query.
            limit: Currently not applied to the FaunaDB query.

        Returns:
            dict: The raw result of the FaunaDB query; `query` reads its
            ``"data"`` entry.
        """
        documents = q.paginate(q.documents(q.collection(str(table_name))))

        # select * from db_name.collection_name
        # `columns is None` (the declared default) is treated like `*` rather
        # than failing on `len(None)`.
        if columns is None or (len(columns) > 0 and isinstance(columns[0], Star)):
            fauna_query = q.map_(
                q.lambda_("ref", q.get(q.var("ref"))),
                documents,
            )
        else:
            # select id, name ,etc from db_name.collection_name
            # NOTE(review): `query` passes the parser's target nodes here, not
            # plain strings, and this projection looks unfinished (see the TODO
            # in `query`) - confirm against FQL `Select` before relying on it.
            fauna_query = q.map_(
                q.lambda_("doc", {"data": q.select(columns, q.var("doc"))}),
                documents,
            )

        return self._require_client().query(fauna_query)

    def insert(self, table_name: str, fields, values) -> Response:
        """Create one document per inserted row.

        When the only column is ``data``, each row's first value is parsed as
        JSON and may hold a single object or a list of objects. Otherwise each
        row is zipped with ``fields`` to form the document data.

        Args:
            table_name (str): Name of the collection.
            fields: Column names of the INSERT statement.
            values: Rows of the INSERT statement.

        Returns:
            Response: An OK response once all documents are created.
        """
        client = self._require_client()

        if len(fields) == 1 and fields[0] == "data":
            documents = []
            for row in values:
                payload = json.loads(row[0])
                # A single JSON object is one document; otherwise iterate the payload.
                documents.extend([payload] if isinstance(payload, dict) else payload)
        else:
            documents = [dict(zip(fields, row)) for row in values]

        for data in documents:
            client.query(
                q.create(
                    q.collection(table_name),
                    {"data": data},
                )
            )

        return Response(resp_type=RESPONSE_TYPE.OK)

    def create_table(self, table_name: str, if_not_exists=True) -> Response:
        """
        Create a collection with the given name in the FaunaDB database.

        NOTE(review): ``if_not_exists`` is accepted but not used by the query.
        """
        fauna_query = q.create_collection({"name": table_name})
        self._require_client().query(fauna_query)
        return Response(resp_type=RESPONSE_TYPE.OK)

    def delete_document(self, table_name: str, conditions: ASTNode) -> Response:
        """
        Delete a document with the given id in the FaunaDB database.

        Args:
            table_name (str): Name of the collection.
            conditions: WHERE clause of the DELETE statement; its second
                argument's ``value`` is used as the document id.

        Raises:
            ValueError: If the WHERE clause is missing or uses an operator
                other than ``=``.
        """
        args = getattr(conditions, "args", None)
        if not args or len(args) < 2:
            raise ValueError("DELETE on FaunaDB requires a WHERE condition with the document id!")

        # only = operator supported right now, can add more. Reject anything
        # else explicitly: treating e.g. `!=` as `=` would delete the very
        # document the condition excludes.
        if getattr(conditions, "op", "=") != "=":
            raise ValueError("Only the '=' operator is supported in DELETE conditions for FaunaDB!")

        # get the id of the document
        ref = args[1].value
        fauna_query = q.delete(q.ref(q.collection(table_name), ref))
        self._require_client().query(fauna_query)
        return Response(resp_type=RESPONSE_TYPE.OK)

    def drop_table(self, table_name: str, if_exists=True) -> Response:
        """
        Delete a collection from the FaunaDB database.

        NOTE(review): ``if_exists`` is accepted but not used by the query.
        """
        fauna_query = q.delete(q.collection(table_name))
        self._require_client().query(fauna_query)
        return Response(resp_type=RESPONSE_TYPE.OK)

    def get_tables(self) -> Response:
        """
        Get the list of collections in the FaunaDB database.

        Returns:
            Response: A table with a single ``table_name`` column, or an error
            response when the lookup fails.
        """
        try:
            result = self._require_client().query(q.paginate(q.collections()))
            collections = [collection.id() for collection in result["data"]]
            return Response(
                resp_type=RESPONSE_TYPE.TABLE,
                data_frame=pd.DataFrame(
                    collections,
                    columns=["table_name"],
                ),
            )
        except Exception as e:
            logger.error(f"Error getting tables from FaunaDB: {e}")
            return Response(
                resp_type=RESPONSE_TYPE.ERROR,
                error_message=f"Error getting tables from FaunaDB: {e}",
            )