Coverage for mindsdb / integrations / handlers / openstreetmap_handler / openstreetmap_tables.py: 0%
105 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import pandas as pd
2from typing import Text, List, Dict
4from mindsdb_sql_parser import ast
5from mindsdb.integrations.libs.api_handler import APITable
7from mindsdb.integrations.utilities.sql_utils import extract_comparison_conditions
9from mindsdb.integrations.utilities.handlers.query_utilities import SELECTQueryParser, SELECTQueryExecutor
class OpenStreetMapNodeTable(APITable):
    """The OpenStreetMap Nodes Table implementation"""

    def select(self, query: ast.Select) -> pd.DataFrame:
        """Pulls data from the OpenStreetMap API endpoint.

        WHERE conditions are forwarded to ``get_nodes`` and translated into
        Overpass filters there; the comparison operator of each condition is
        not inspected. Column projection and ordering are applied locally
        through ``SELECTQueryExecutor``.

        Parameters
        ----------
        query : ast.Select
           Given SQL SELECT query

        Returns
        -------
        pd.DataFrame
            OpenStreetMap data matching the query

        Raises
        ------
        ValueError
            If a query target is neither ``*`` nor an identifier, if the
            query orders by a ``nodes.<column>`` that is not in the result,
            or if only part of a bounding box is supplied.
        """
        where_conditions = extract_comparison_conditions(query.where)

        # Fall back to 20 rows when the query carries no LIMIT.
        result_limit = query.limit.value if query.limit else 20

        nodes_df = pd.json_normalize(
            self.get_nodes(where_conditions=where_conditions, limit=result_limit)
        )

        selected_columns = []
        for target in query.targets:
            if isinstance(target, ast.Star):
                # Hand the executor a plain list, like the identifier branch does.
                selected_columns = nodes_df.columns.tolist()
                break
            elif isinstance(target, ast.Identifier):
                selected_columns.append(target.parts[-1])
            else:
                raise ValueError(f"Unknown query target {type(target)}")

        order_by_conditions = {}
        if query.order_by:
            order_by_conditions["columns"] = []
            order_by_conditions["ascending"] = []

            for an_order in query.order_by:
                parts = an_order.field.parts
                # Only identifiers qualified as ``nodes.<column>`` are applied;
                # anything else is skipped. The length guard avoids an
                # IndexError on a bare single-part ``nodes`` identifier.
                if len(parts) < 2 or parts[0] != 'nodes':
                    continue
                if parts[1] not in nodes_df.columns:
                    raise ValueError(
                        f"Order by unknown column {parts[1]}"
                    )
                order_by_conditions["columns"].append(parts[1])
                # Any direction other than an explicit "ASC" sorts descending.
                order_by_conditions["ascending"].append(an_order.direction == "ASC")

        select_statement_executor = SELECTQueryExecutor(
            nodes_df,
            selected_columns,
            [],
            order_by_conditions
        )
        return select_statement_executor.execute_query()

    def get_nodes(self, **kwargs) -> List[Dict]:
        """Fetch nodes and return them as plain dictionaries.

        Parameters
        ----------
        **kwargs
            ``where_conditions``: iterable of conditions whose item ``[1]`` is
            the column name and item ``[2]`` the value. ``area``, ``min_lat``,
            ``min_lon``, ``max_lat`` and ``max_lon`` are treated as spatial
            filters; every other column becomes a tag filter.
            ``limit``: maximum number of nodes requested from the API.

        Returns
        -------
        List[Dict]
            One dict per node with the keys ``id``, ``lat``, ``lon``, ``tags``.
        """
        where_conditions = kwargs.get('where_conditions', None) or []

        area, tags = None, {}
        bounds = {'min_lat': None, 'min_lon': None, 'max_lat': None, 'max_lon': None}
        for condition in where_conditions:
            column, value = condition[1], condition[2]
            if column == 'area':
                area = value
            elif column in bounds:
                bounds[column] = value
            else:
                tags[column] = value

        result = self.execute_osm_node_query(
            tags=tags,
            area=area,
            limit=kwargs.get('limit', None),
            **bounds
        )

        return [
            {
                "id": node.id,
                "lat": node.lat,
                "lon": node.lon,
                "tags": node.tags
            }
            for node in result.nodes
        ]

    @staticmethod
    def _escape_ql(value) -> str:
        """Escape backslashes and double quotes for a double-quoted Overpass QL string."""
        return str(value).replace('\\', '\\\\').replace('"', '\\"')

    def execute_osm_node_query(self, tags, area=None, min_lat=None, min_lon=None, max_lat=None, max_lon=None, limit=None):
        """Build an Overpass QL node query and run it through the handler connection.

        Parameters
        ----------
        tags : dict
            Tag key/value pairs; each becomes a ``["key"="value"]`` filter.
        area : str, optional
            Name of an area to restrict the search to.
        min_lat, min_lon, max_lat, max_lon : optional
            Bounding box. Either all four or none must be given.
        limit : int, optional
            Maximum number of elements requested in the ``out`` statement.

        Returns
        -------
        The object returned by the connection's ``query`` method.

        Raises
        ------
        ValueError
            If only some of the four bounding box values are supplied.
        """
        query_template = """
        [out:json];
        {area_clause}
        node{area_node_clause}{tags_clause}{bbox};
        out {limit};
        """

        # Values originate from the SQL WHERE clause, so escape them before
        # embedding them in quoted QL strings.
        tags_clause = "".join(
            '["{}"="{}"]'.format(self._escape_ql(tag_key), self._escape_ql(tag_value))
            for tag_key, tag_value in (tags or {}).items()
        )

        area_clause, area_node_clause = "", ""
        if area:
            area_clause = 'area[name="{}"]->.city;\n'.format(self._escape_ql(area))
            area_node_clause = "(area.city)"

        # Compare against None rather than truthiness so a coordinate of 0
        # (equator / prime meridian) is not mistaken for "not provided".
        bounds = (min_lat, min_lon, max_lat, max_lon)
        provided = [bound is not None for bound in bounds]
        bbox_clause = ""
        if all(provided):
            # Overpass QL expects the bounding box filter in parentheses:
            # (south,west,north,east).
            bbox_clause = "({},{},{},{})".format(*bounds)
        elif any(provided):
            raise ValueError(
                "A bounding box requires min_lat, min_lon, max_lat and max_lon together"
            )

        limit_clause = limit if limit else ""

        query = query_template.format(
            area_clause=area_clause,
            area_node_clause=area_node_clause,
            tags_clause=tags_clause,
            bbox=bbox_clause,
            limit=limit_clause
        )

        api = self.handler.connect()
        return api.query(query)
class OpenStreetMapWayTable(APITable):
    """The OpenStreetMap Ways Table implementation"""

    def select(self, query: ast.Select) -> pd.DataFrame:
        """Run a SELECT against the ``ways`` table.

        The query is parsed with ``SELECTQueryParser``, the ways are fetched
        through ``get_ways``, and projection, filtering and ordering are then
        applied by ``SELECTQueryExecutor``.

        Parameters
        ----------
        query : ast.Select
            Given SQL SELECT query

        Returns
        -------
        pd.DataFrame
            The frame produced by the executor.
        """
        parser = SELECTQueryParser(query, 'ways', self.get_columns())
        columns, conditions, ordering, row_limit = parser.parse_query()

        frame = pd.json_normalize(self.get_ways(limit=row_limit))

        executor = SELECTQueryExecutor(frame, columns, conditions, ordering)
        return executor.execute_query()

    def get_columns(self) -> List[Text]:
        """Return the column names of the normalized ways frame.

        Column discovery issues its own ``get_ways`` call.
        """
        sample = pd.json_normalize(self.get_ways(limit=1))
        return sample.columns.tolist()

    def get_ways(self, **kwargs) -> List[Dict]:
        """Query the API for ways and convert each one with ``to_dict()``.

        NOTE(review): ``kwargs`` (including ``limit``) is not read here.
        NOTE(review): the ``{{bbox}}`` placeholder is passed to the API
        verbatim; nothing substitutes it. The original call carried a
        commented-out ``bbox=self.connection_data['bbox']`` argument, so a
        substitution was presumably intended -- confirm.
        """
        connection = self.handler.connect()
        overpass_query = """
            way
            ({{bbox}});
            out;
            """
        response = connection.query(overpass_query)
        return [way.to_dict() for way in response.ways]
class OpenStreetMapRelationTable(APITable):
    """The OpenStreetMap Relations Table implementation"""

    def select(self, query: ast.Select) -> pd.DataFrame:
        """Run a SELECT against the ``relations`` table.

        Exposed under the name ``select`` so this table offers the same entry
        point as the nodes and ways tables. The query is parsed with
        ``SELECTQueryParser``, the relations are fetched through
        ``get_relations``, and projection, filtering and ordering are then
        applied by ``SELECTQueryExecutor``.

        Parameters
        ----------
        query : ast.Select
            Given SQL SELECT query

        Returns
        -------
        pd.DataFrame
            The frame produced by the executor.
        """
        select_statement_parser = SELECTQueryParser(
            query,
            'relations',
            self.get_columns()
        )
        selected_columns, where_conditions, order_by_conditions, result_limit = select_statement_parser.parse_query()

        relations_df = pd.json_normalize(self.get_relations(limit=result_limit))

        select_statement_executor = SELECTQueryExecutor(
            relations_df,
            selected_columns,
            where_conditions,
            order_by_conditions
        )
        return select_statement_executor.execute_query()

    def select_relations(self, query: ast.Select) -> pd.DataFrame:
        """Backward-compatible alias that delegates to ``select``."""
        return self.select(query)

    def get_columns(self) -> List[Text]:
        """Return the column names of the normalized relations frame.

        Column discovery issues its own ``get_relations`` call.
        """
        return pd.json_normalize(self.get_relations(limit=1)).columns.tolist()

    def get_relations(self, **kwargs) -> List[Dict]:
        """Query the API for relations and convert each one with ``to_dict()``.

        NOTE(review): ``kwargs`` (including ``limit``) is not read here.
        NOTE(review): the ``{{bbox}}`` placeholder is passed to the API
        verbatim; nothing substitutes it. The original call carried a
        commented-out ``bbox=self.connection_data['bbox']`` argument, so a
        substitution was presumably intended -- confirm.
        """
        api_session = self.handler.connect()
        relations = api_session.query("""
            relation
            ({{bbox}});
            out;
            """)
        return [relation.to_dict() for relation in relations.relations]