Coverage for mindsdb / integrations / handlers / mongodb_handler / utils / mongodb_query.py: 42%

38 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import datetime as dt 

2import json 

3from bson import ObjectId 

4 

5 

class MongoJSONEncoder(json.JSONEncoder):
    """JSON encoder that can also serialize datetimes and BSON ObjectIds.

    Both kinds of value are emitted as JSON strings whose text mimics the
    Mongo constructor call: ``"ISODate(<iso timestamp>)"`` and
    ``"ObjectId(<hex id>)"``.
    """

    def default(self, obj):
        """Return a serializable stand-in for a value ``json`` cannot encode.

        Args:
            obj: The value the standard encoder did not know how to handle.

        Returns:
            A string of the form ``ISODate(...)`` for ``datetime`` values or
            ``ObjectId(...)`` for ``ObjectId`` values.

        Raises:
            TypeError: Raised by the base class for any other type.
        """
        if isinstance(obj, dt.datetime):
            timestamp = obj.isoformat()
            return "ISODate(" + timestamp + ")"

        if isinstance(obj, ObjectId):
            return "ObjectId(%s)" % str(obj)

        # Anything else is unsupported; let the base class report it.
        return super().default(obj)

13 

14 

class MongoQuery:
    """Record a sequence of method calls made against one Mongo collection.

    Each recorded call is a *step*: a dict with a ``"method"`` name and an
    ``"args"`` list. Steps are kept in call order in ``self.pipeline`` and
    can be rendered as a single shell-style expression with ``str()``.

    Steps can be added explicitly with :meth:`add_step`, or dynamically by
    calling any public attribute name as a method, e.g. ``query.find({...})``.
    """

    def __init__(self, collection, pipline=None):
        """Create a query for ``collection``, optionally pre-filled with steps.

        Args:
            collection: Collection name; rendered after ``db.`` by ``__str__``.
            pipline: Optional iterable of step dicts, each validated through
                :meth:`add_step`. The misspelled parameter name is kept so
                existing keyword callers continue to work.

        Raises:
            AttributeError: If any supplied step is malformed.
        """
        self.collection = collection
        self.pipeline = []

        if pipline is not None:
            for step in pipline:
                self.add_step(step)

    def add_step(self, step):
        """Validate ``step`` and append it to the pipeline.

        A valid step looks like::

            {"method": "sort", "args": [{"c": 3}]}

        Args:
            step: Dict with a ``"method"`` key and an ``"args"`` key whose
                value is a list.

        Raises:
            AttributeError: If either key is missing or ``"args"`` is not a
                list.
        """
        if "method" not in step or "args" not in step or not isinstance(step["args"], list):
            raise AttributeError(f"Wrong step {step}")

        self.pipeline.append(step)

    def to_string(self):
        """Return the rendered query; same result as ``str(self)``."""
        return self.__str__()

    def __getattr__(self, item):
        """Return a callback that records a call to ``item`` as a step.

        This hook only runs for attributes that normal lookup did not find.
        Underscore-prefixed names are rejected: protocol probes such as
        ``__deepcopy__`` or ``__setstate__`` must see the attribute as
        missing, otherwise ``copy``/``pickle`` would invoke the recording
        callback, add a bogus step and receive ``None``.

        Args:
            item: Name of the attribute being looked up.

        Returns:
            A function accepting positional arguments. Calling it appends
            ``{"method": item, "args": [...]}`` to the pipeline and returns
            ``None``.

        Raises:
            AttributeError: If ``item`` starts with an underscore.
        """
        if item.startswith("_"):
            raise AttributeError(
                f"{type(self).__name__!r} object has no attribute {item!r}"
            )

        def fnc(*args):
            # Store args as a list so dynamically recorded steps have the
            # same shape that add_step() requires of explicit ones.
            self.add_step({"method": item, "args": list(args)})

        return fnc

    def __str__(self):
        """Render the recorded steps as one chained call expression.

        For a query on collection ``fish`` whose pipeline is::

            [
                {"method": "find", "args": [{"a": 1}, {"b": 2}]},
                {"method": "sort", "args": [{"c": 3}]},
            ]

        the result is::

            db.fish.find({"a": 1},{"b": 2}).sort({"c": 3})

        Every argument is encoded with ``MongoJSONEncoder``; the arguments of
        one call are joined with a bare comma (no space).
        """
        encoder = MongoJSONEncoder()
        parts = [f"db.{self.collection}"]
        for step in self.pipeline:
            rendered_args = ",".join(encoder.encode(arg) for arg in step["args"])
            parts.append(f".{step['method']}({rendered_args})")
        return "".join(parts)

    def __repr__(self):
        """Return a debugging representation with collection and raw pipeline."""
        return f"MongoQuery({self.collection}, {str(self.pipeline)})"

78 return f"MongoQuery({self.collection}, {str(self.pipeline)})"