Coverage for mindsdb / integrations / handlers / faunadb_handler / faunadb_handler.py: 0%
130 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import json
2import pandas as pd
3from typing import List
5from faunadb import query as q
6from faunadb.client import FaunaClient
7from mindsdb_sql_parser import Select, Insert, CreateTable, Delete
8from mindsdb_sql_parser.ast.select.star import Star
9from mindsdb_sql_parser.ast.base import ASTNode
11from mindsdb.integrations.libs.response import (
12 RESPONSE_TYPE,
13 HandlerResponse as Response,
14 HandlerStatusResponse as StatusResponse,
15)
16from mindsdb.integrations.libs.base import DatabaseHandler
18from mindsdb.utilities import log
20logger = log.getLogger(__name__)
class FaunaDBHandler(DatabaseHandler):
    """Handle connection to FaunaDB and execution of FaunaDB statements.

    The handler translates a small subset of parsed SQL statements
    (``SELECT``, ``INSERT``, ``CREATE TABLE`` and ``DELETE``) into FaunaDB
    queries, treating FaunaDB collections as tables.
    """

    name = "faunadb"

    def __init__(self, name: str, **kwargs):
        """Validate the connection settings and open the client.

        Args:
            name: Handler instance name, forwarded to the base class.
            **kwargs: Must contain ``connection_data``, a mapping with
                ``fauna_secret`` and either ``fauna_endpoint`` or all of
                ``fauna_scheme`` / ``fauna_domain`` / ``fauna_port``.

        Raises:
            Exception: If the secret is missing, or if the endpoint and the
                scheme/domain/port triple are both given or both incomplete.
        """
        super().__init__(name)

        # Initialise connection state before validating, so that __del__ and
        # disconnect() work even when validation below raises.
        self._client = None
        self.is_connected = False

        # A missing connection_data is treated as empty so that the caller
        # gets the "secret is required" error instead of an AttributeError.
        self._connection_data = kwargs.get("connection_data") or {}

        self._client_config = {
            key: self._connection_data.get(key)
            for key in (
                "fauna_secret",
                "fauna_scheme",
                "fauna_domain",
                "fauna_port",
                "fauna_endpoint",
            )
        }

        scheme = self._client_config["fauna_scheme"]
        domain = self._client_config["fauna_domain"]
        port = self._client_config["fauna_port"]
        endpoint = self._client_config["fauna_endpoint"]

        # should have the secret
        if self._client_config["fauna_secret"] is None:
            raise Exception("FaunaDB secret is required for FaunaDB connection!")
        # either scheme + domain + port or endpoint is required
        # but not both
        if not endpoint and not (scheme and domain and port):
            raise Exception(
                "Either scheme + domain + port or endpoint is required for FaunaDB connection!"
            )
        elif endpoint and (scheme or domain or port):
            raise Exception(
                "Either scheme + domain + port or endpoint is required for FaunaDB connection, but not both!"
            )

        self.connect()

    def _get_client(self):
        """Build a ``FaunaClient`` from the stored client configuration.

        Returns:
            A client created from the endpoint when one is configured,
            otherwise from scheme, domain and port.

        Raises:
            Exception: If the client configuration has not been set.
        """
        client_config = self._client_config
        if client_config is None:
            raise Exception("Client config is not set!")

        # Truthiness (not ``is not None``) mirrors the validation in __init__,
        # so an empty-string endpoint falls back to scheme/domain/port.
        if client_config["fauna_endpoint"]:
            return FaunaClient(
                secret=client_config["fauna_secret"],
                endpoint=client_config["fauna_endpoint"],
            )

        return FaunaClient(
            secret=client_config["fauna_secret"],
            scheme=client_config["fauna_scheme"],
            domain=client_config["fauna_domain"],
            port=client_config["fauna_port"],
        )

    def __del__(self):
        # getattr guards against instances whose __init__ failed before the
        # attribute was assigned.
        if getattr(self, "is_connected", False) is True:
            self.disconnect()

    def connect(self):
        """Connect to a FaunaDB database.

        Returns:
            The existing client when already connected, a newly created
            client on success, or ``None`` when client creation fails (the
            error is logged, not raised).
        """
        if self.is_connected is True:
            return self._client

        try:
            self._client = self._get_client()
            self.is_connected = True
            return self._client
        except Exception as e:
            logger.error(f"Error connecting to FaunaDB client, {e}!")
            self.is_connected = False

    def disconnect(self):
        """Drop the client reference and mark the handler as disconnected."""
        if self.is_connected is False:
            return

        self._client = None
        self.is_connected = False

    def check_connection(self):
        """Check the connection to the FaunaDB database.

        Connects first when necessary, pings the server, and restores the
        disconnected state afterwards if the handler was not connected on
        entry.

        Returns:
            StatusResponse: ``success`` is True when the ping succeeded;
            otherwise ``error_message`` holds the failure text.
        """
        response_code = StatusResponse(False)
        need_to_close = self.is_connected is False

        try:
            # Without this the ping would run against a None client whenever
            # the handler had been disconnected.
            client = self.connect()
            if client is None:
                raise Exception("FaunaDB client could not be created")
            client.ping()
            response_code.success = True
        except Exception as e:
            logger.error(f"Error connecting to FaunaDB , {e}!")
            response_code.error_message = str(e)
        finally:
            if response_code.success is True and need_to_close:
                self.disconnect()
            if response_code.success is False and self.is_connected is True:
                self.is_connected = False

        return response_code

    def query(self, query: ASTNode) -> Response:
        """Render and execute a SQL query.

        Args:
            query (ASTNode): The SQL query.

        Returns:
            Response: A table response for ``SELECT``; an OK response for
            ``INSERT``, ``CREATE TABLE`` and ``DELETE``.

        Raises:
            NotImplementedError: If the statement type is not supported.
        """
        if isinstance(query, Select):
            collection = str(query.from_table)
            # TODO: research how to parse individual columns from document
            result = self.select(
                collection, query.targets, query.where, query.offset, query.limit
            )
            df = pd.json_normalize(result["data"])
            return Response(RESPONSE_TYPE.TABLE, df)

        if isinstance(query, Insert):
            collection = str(query.table)
            fields = [col.name for col in query.columns]
            self.insert(collection, fields, query.values)
            return Response(resp_type=RESPONSE_TYPE.OK)

        if isinstance(query, CreateTable):
            self.create_table(str(query.name))
            return Response(resp_type=RESPONSE_TYPE.OK)

        if isinstance(query, Delete):
            self.delete_document(str(query.table), query.where)
            return Response(resp_type=RESPONSE_TYPE.OK)

        # NOTE: DROP TABLE is not wired up here yet (not working for
        # integration tables); drop_table() exists but is not dispatched to.
        # Fail explicitly instead of falling through to an unbound result.
        raise NotImplementedError(
            f"Unsupported query type for FaunaDB: {type(query).__name__}"
        )

    def select(
        self,
        table_name: str,
        columns: List[str] = None,
        conditions: List[str] = None,
        offset: int = None,
        limit: int = None,
    ) -> dict:
        """Fetch documents from a collection.

        Args:
            table_name: Name of the FaunaDB collection.
            columns: Select targets; ``None`` or a leading ``Star`` selects
                whole documents.
            conditions: Currently not applied to the FaunaDB query.
            offset: Currently not applied to the FaunaDB query.
            limit: Currently not applied to the FaunaDB query.

        Returns:
            The raw result of the FaunaDB query.
        """
        # select * from db_name.collection_name
        # columns=None (the declared default) is treated as "all columns"
        # rather than crashing on len(None).
        select_all = columns is None or (
            len(columns) > 0 and isinstance(columns[0], Star)
        )

        if select_all:
            fauna_query = q.map_(
                q.lambda_("ref", q.get(q.var("ref"))),
                q.paginate(q.documents(q.collection(str(table_name)))),
            )
        else:
            # select id, name ,etc from db_name.collection_name
            # NOTE(review): columns are passed through as received from the
            # parser; confirm this path works for explicit column lists.
            fauna_query = q.map_(
                q.lambda_("doc", {"data": q.select(columns, q.var("doc"))}),
                q.paginate(q.documents(q.collection(table_name))),
            )

        return self._client.query(fauna_query)

    def insert(self, table_name: str, fields, values) -> Response:
        """Create one document per inserted row.

        When the only column is ``data``, each row's single value is parsed
        as JSON; a JSON object yields one document and a JSON list yields one
        document per element. Otherwise each row becomes a document mapping
        column names to values.

        Args:
            table_name: Name of the FaunaDB collection.
            fields: Column names of the INSERT statement.
            values: Rows of the INSERT statement.

        Returns:
            Response: An OK response once all documents are created.
        """
        collection = q.collection(table_name)

        if len(fields) == 1 and fields[0] == "data":
            for row in values:
                payload = json.loads(row[0])
                documents = [payload] if isinstance(payload, dict) else payload
                for document in documents:
                    self._client.query(q.create(collection, {"data": document}))
        else:
            for row in values:
                document = dict(zip(fields, row))
                self._client.query(q.create(collection, {"data": document}))

        return Response(resp_type=RESPONSE_TYPE.OK)

    def create_table(self, table_name: str, if_not_exists=True) -> Response:
        """
        Create a collection with the given name in the FaunaDB database.

        ``if_not_exists`` is accepted for interface compatibility but is
        currently not consulted.
        """
        fauna_query = q.create_collection({"name": table_name})
        self._client.query(fauna_query)
        return Response(resp_type=RESPONSE_TYPE.OK)

    def delete_document(self, table_name: str, conditions: List[str]) -> Response:
        """
        Delete a document with the given id in the FaunaDB database.

        The id is read from the right-hand side of the WHERE condition
        (only the ``=`` operator is supported right now).

        Raises:
            ValueError: If no WHERE condition is supplied.
        """
        if conditions is None:
            raise ValueError(
                "A WHERE condition with the document id is required to delete from FaunaDB!"
            )

        # get the id of the document (only = operator supported right now, can add more)
        ref = conditions.args[1].value
        fauna_query = q.delete(q.ref(q.collection(table_name), ref))
        self._client.query(fauna_query)
        return Response(resp_type=RESPONSE_TYPE.OK)

    def drop_table(self, table_name: str, if_exists=True) -> Response:
        """
        Delete a collection from the FaunaDB database.

        ``if_exists`` is accepted for interface compatibility but is
        currently not consulted.
        """
        fauna_query = q.delete(q.collection(table_name))
        self._client.query(fauna_query)
        return Response(resp_type=RESPONSE_TYPE.OK)

    def get_tables(self) -> Response:
        """
        Get the list of collections in the FaunaDB database.

        Returns:
            Response: A table response with a single ``table_name`` column,
            or an error response if the query fails.
        """
        try:
            result = self._client.query(q.paginate(q.collections()))
            collections = [collection.id() for collection in result["data"]]
            return Response(
                resp_type=RESPONSE_TYPE.TABLE,
                data_frame=pd.DataFrame(
                    collections,
                    columns=["table_name"],
                ),
            )
        except Exception as e:
            logger.error(f"Error getting tables from FaunaDB: {e}")
            return Response(
                resp_type=RESPONSE_TYPE.ERROR,
                error_message=f"Error getting tables from FaunaDB: {e}",
            )