Coverage for mindsdb / integrations / handlers / mongodb_handler / utils / mongodb_query.py: 42%
38 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import datetime as dt
2import json
3from bson import ObjectId
class MongoJSONEncoder(json.JSONEncoder):
    """JSON encoder that additionally accepts ``datetime`` and ``ObjectId`` values.

    Both kinds of value are replaced by a plain string naming the matching
    Mongo constructor, so they appear in the encoded output as JSON strings,
    e.g. ``"ISODate(2020-01-02T03:04:05)"``.
    """

    def default(self, obj):
        """Return a JSON-serializable replacement for *obj*.

        Args:
            obj: a value the standard encoder could not serialize.

        Returns:
            ``"ISODate(<isoformat>)"`` for ``datetime`` instances and
            ``"ObjectId(<str(obj)>)"`` for ``ObjectId`` instances.

        Any other type is delegated to ``json.JSONEncoder.default``.
        """
        if isinstance(obj, dt.datetime):
            iso_text = obj.isoformat()
            return "ISODate(" + iso_text + ")"

        if isinstance(obj, ObjectId):
            id_text = str(obj)
            return "ObjectId(" + id_text + ")"

        # Not a type handled here: let the base encoder decide.
        return super().default(obj)
class MongoQuery:
    """Records a chain of method calls against one MongoDB collection.

    Every call is kept as a step ``{"method": <name>, "args": <arguments>}``
    in ``self.pipeline``. ``str()`` renders the recorded chain as
    ``db.<collection>.<method>(<args>)...``.
    """

    def __init__(self, collection, pipline=None):
        """Create a query for *collection*, optionally preloaded with steps.

        Args:
            collection: collection name used when the query is rendered.
            pipline: optional iterable of step dicts; each one is validated
                and stored through ``add_step``. The spelling is kept as-is
                because it is part of the keyword interface.

        Raises:
            AttributeError: if any supplied step is malformed (see ``add_step``).
        """
        self.collection = collection
        self.pipeline = []

        if pipline is None:
            pipline = []
        for step in pipline:
            self.add_step(step)

    def add_step(self, step):
        """Validate *step* and append it to the pipeline.

        A step has the form::

            {
                'method': 'sort',
                'args': [{c: 3}]
            }

        Raises:
            AttributeError: if ``"method"`` or ``"args"`` is missing, or
                ``"args"`` is not a list.
        """
        if "method" not in step or "args" not in step or not isinstance(step["args"], list):
            raise AttributeError(f"Wrong step {step}")

        self.pipeline.append(step)

    def to_string(self):
        """Return the same text as ``str(self)``."""
        return self.__str__()

    def __getattr__(self, item):
        """Return a callback that records a call to the method named *item*.

        Python only calls this for names that normal attribute lookup did not
        find, so ``query.find({...})`` appends
        ``{"method": "find", "args": ({...},)}`` to the pipeline. The callback
        stores its positional arguments as a tuple and returns ``None``.

        Raises:
            AttributeError: for dunder names (``__deepcopy__``,
                ``__setstate__``, ...). Those are protocol probes made by
                ``copy``, ``pickle``, ``hasattr`` and similar machinery;
                answering them with a recorder would append bogus steps and
                break copying of the query.
        """
        if item.startswith("__") and item.endswith("__"):
            raise AttributeError(f"{type(self).__name__!r} object has no attribute {item!r}")

        def fnc(*args):
            self.pipeline.append({"method": item, "args": args})

        return fnc

    def __str__(self):
        """Render the recorded calls as a single call-chain string.

        A query on collection ``fish`` with the pipeline::

            [
                {'method': 'find', 'args': [{'a': 1}, {'b': 2}]},
                {'method': 'sort', 'args': [{'c': 3}]},
            ]

        is rendered as::

            db.fish.find({"a": 1},{"b": 2}).sort({"c": 3})

        Each argument is encoded with ``MongoJSONEncoder`` and the arguments
        of one call are joined with ``,`` (no space).
        """
        encoder = MongoJSONEncoder()
        parts = [f"db.{self.collection}"]
        for step in self.pipeline:
            rendered_args = ",".join(encoder.encode(arg) for arg in step["args"])
            parts.append(f".{step['method']}({rendered_args})")
        return "".join(parts)

    def __repr__(self):
        """Return a debug representation showing the collection and raw pipeline."""
        return f"MongoQuery({self.collection}, {str(self.pipeline)})"