Coverage for mindsdb / integrations / handlers / mongodb_handler / utils / mongodb_parser.py: 5%
88 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import ast as py_ast
3import dateutil.parser
4from bson import ObjectId
6from .mongodb_query import MongoQuery
9class MongodbParser:
10 """
11 Converts string into MongoQuery
12 """
14 def from_string(self, call_str):
15 tree = py_ast.parse(call_str.strip(), mode="eval")
16 calls = self.process(tree.body)
17 # first call contents collection
18 method1 = calls[0]["method"]
19 if len(method1) < 2:
20 raise IndexError("Collection not found")
21 collection = method1[-2]
23 mquery = MongoQuery(collection)
25 # keep only last name
26 calls[0]["method"] = [method1[-1]]
28 # convert method names: get first item of list
29 for c in calls:
30 mquery.add_step({"method": c["method"][0], "args": c["args"]})
32 return mquery
34 def process(self, node):
35 if isinstance(node, py_ast.Call):
36 previous_call = None
38 args = []
39 for node2 in node.args:
40 args.append(self.process(node2))
42 # check functions
43 if isinstance(node.func, py_ast.Name):
44 # it is just name
45 func = node.func.id
47 # special functions:
48 if func == "ISODate":
49 return dateutil.parser.isoparse(args[0])
50 if func == "ObjectId":
51 return ObjectId(args[0])
52 elif isinstance(node.func, py_ast.Attribute):
53 # it can be an attribute or pipeline
54 previous_call, func = self.process_func_name(node.func)
55 else:
56 raise NotImplementedError(f"Unknown function type: {node.func}")
58 call = [{"method": func, "args": args}]
59 if previous_call is not None:
60 call = previous_call + call
62 return call
64 if isinstance(node, py_ast.List):
65 elements = []
66 for node2 in node.elts:
67 elements.append(self.process(node2))
68 return elements
70 if isinstance(node, py_ast.Dict):
71 keys = []
72 for node2 in node.keys:
73 if isinstance(node2, py_ast.Constant):
74 value = node2.value
75 elif isinstance(node2, py_ast.Str): # py37
76 value = node2.s
77 elif isinstance(node2, py_ast.Name):
78 value = node2.id
79 else:
80 raise NotImplementedError(f"Unknown dict key {node2}")
82 keys.append(value)
84 values = []
85 for node2 in node.values:
86 values.append(self.process(node2))
88 return dict(zip(keys, values))
90 if isinstance(node, py_ast.Name):
91 # special attributes
92 name = node.id
93 if name == "true":
94 return True
95 elif name == "false":
96 return False
97 elif name == "null":
98 return None
100 if isinstance(node, py_ast.Constant):
101 return node.value
103 # ---- python 3.7 objects -----
104 if isinstance(node, py_ast.Str):
105 return node.s
107 if isinstance(node, py_ast.Num):
108 return node.n
110 # -----------------------------
112 if isinstance(node, py_ast.UnaryOp):
113 if isinstance(node.op, py_ast.USub):
114 value = self.process(node.operand)
115 return -value
117 raise NotImplementedError(f"Unknown node {node}")
119 def process_func_name(self, node):
120 previous_call = None
121 if isinstance(node, py_ast.Attribute):
122 attribute = node
123 # multilevel attribute
125 obj_name = []
126 while isinstance(attribute, py_ast.Attribute):
127 obj_name.insert(0, attribute.attr)
128 attribute = attribute.value
130 if isinstance(attribute, py_ast.Name):
131 obj_name.insert(0, attribute.id)
133 if isinstance(attribute, py_ast.Call):
134 # is pipeline
135 previous_call = self.process(attribute)
137 return previous_call, obj_name