Coverage for mindsdb / integrations / handlers / mongodb_handler / utils / mongodb_parser.py: 5%

88 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import ast as py_ast 

2 

3import dateutil.parser 

4from bson import ObjectId 

5 

6from .mongodb_query import MongoQuery 

7 

8 

9class MongodbParser: 

10 """ 

11 Converts string into MongoQuery 

12 """ 

13 

14 def from_string(self, call_str): 

15 tree = py_ast.parse(call_str.strip(), mode="eval") 

16 calls = self.process(tree.body) 

17 # first call contents collection 

18 method1 = calls[0]["method"] 

19 if len(method1) < 2: 

20 raise IndexError("Collection not found") 

21 collection = method1[-2] 

22 

23 mquery = MongoQuery(collection) 

24 

25 # keep only last name 

26 calls[0]["method"] = [method1[-1]] 

27 

28 # convert method names: get first item of list 

29 for c in calls: 

30 mquery.add_step({"method": c["method"][0], "args": c["args"]}) 

31 

32 return mquery 

33 

34 def process(self, node): 

35 if isinstance(node, py_ast.Call): 

36 previous_call = None 

37 

38 args = [] 

39 for node2 in node.args: 

40 args.append(self.process(node2)) 

41 

42 # check functions 

43 if isinstance(node.func, py_ast.Name): 

44 # it is just name 

45 func = node.func.id 

46 

47 # special functions: 

48 if func == "ISODate": 

49 return dateutil.parser.isoparse(args[0]) 

50 if func == "ObjectId": 

51 return ObjectId(args[0]) 

52 elif isinstance(node.func, py_ast.Attribute): 

53 # it can be an attribute or pipeline 

54 previous_call, func = self.process_func_name(node.func) 

55 else: 

56 raise NotImplementedError(f"Unknown function type: {node.func}") 

57 

58 call = [{"method": func, "args": args}] 

59 if previous_call is not None: 

60 call = previous_call + call 

61 

62 return call 

63 

64 if isinstance(node, py_ast.List): 

65 elements = [] 

66 for node2 in node.elts: 

67 elements.append(self.process(node2)) 

68 return elements 

69 

70 if isinstance(node, py_ast.Dict): 

71 keys = [] 

72 for node2 in node.keys: 

73 if isinstance(node2, py_ast.Constant): 

74 value = node2.value 

75 elif isinstance(node2, py_ast.Str): # py37 

76 value = node2.s 

77 elif isinstance(node2, py_ast.Name): 

78 value = node2.id 

79 else: 

80 raise NotImplementedError(f"Unknown dict key {node2}") 

81 

82 keys.append(value) 

83 

84 values = [] 

85 for node2 in node.values: 

86 values.append(self.process(node2)) 

87 

88 return dict(zip(keys, values)) 

89 

90 if isinstance(node, py_ast.Name): 

91 # special attributes 

92 name = node.id 

93 if name == "true": 

94 return True 

95 elif name == "false": 

96 return False 

97 elif name == "null": 

98 return None 

99 

100 if isinstance(node, py_ast.Constant): 

101 return node.value 

102 

103 # ---- python 3.7 objects ----- 

104 if isinstance(node, py_ast.Str): 

105 return node.s 

106 

107 if isinstance(node, py_ast.Num): 

108 return node.n 

109 

110 # ----------------------------- 

111 

112 if isinstance(node, py_ast.UnaryOp): 

113 if isinstance(node.op, py_ast.USub): 

114 value = self.process(node.operand) 

115 return -value 

116 

117 raise NotImplementedError(f"Unknown node {node}") 

118 

119 def process_func_name(self, node): 

120 previous_call = None 

121 if isinstance(node, py_ast.Attribute): 

122 attribute = node 

123 # multilevel attribute 

124 

125 obj_name = [] 

126 while isinstance(attribute, py_ast.Attribute): 

127 obj_name.insert(0, attribute.attr) 

128 attribute = attribute.value 

129 

130 if isinstance(attribute, py_ast.Name): 

131 obj_name.insert(0, attribute.id) 

132 

133 if isinstance(attribute, py_ast.Call): 

134 # is pipeline 

135 previous_call = self.process(attribute) 

136 

137 return previous_call, obj_name