Coverage for mindsdb / integrations / handlers / openstreetmap_handler / openstreetmap_tables.py: 0%

105 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import pandas as pd 

2from typing import Text, List, Dict 

3 

4from mindsdb_sql_parser import ast 

5from mindsdb.integrations.libs.api_handler import APITable 

6 

7from mindsdb.integrations.utilities.sql_utils import extract_comparison_conditions 

8 

9from mindsdb.integrations.utilities.handlers.query_utilities import SELECTQueryParser, SELECTQueryExecutor 

10 

11 

class OpenStreetMapNodeTable(APITable):
    """The OpenStreetMap Nodes Table implementation.

    Rows are nodes returned by the Overpass QL query built in
    ``execute_osm_node_query`` and sent through ``self.handler.connect().query``.
    WHERE conditions on ``area``, ``min_lat``, ``min_lon``, ``max_lat`` and
    ``max_lon`` become area / bounding-box filters; any other
    ``column = value`` condition becomes a tag filter.
    """

    # Columns every node row has (see get_nodes); used to shape an empty result.
    _BASE_COLUMNS = ("id", "lat", "lon")

    # WHERE columns that configure the bounding box instead of acting as tag filters.
    _BBOX_COLUMNS = ("min_lat", "min_lon", "max_lat", "max_lon")

    def select(self, query: ast.Select) -> pd.DataFrame:
        """Pulls data from the OpenStreetMap API endpoint.

        Parameters
        ----------
        query : ast.Select
            Given SQL SELECT query

        Returns
        -------
        pd.DataFrame
            OpenStreetMap data matching the query. When no node matches, an
            empty frame with the requested columns is returned.

        Raises
        ------
        ValueError
            If the query contains an unsupported target, condition or
            ORDER BY column
        """
        # Validate the projection before calling the API; None means SELECT *.
        selected_columns = self._parse_targets(query.targets)

        where_conditions = extract_comparison_conditions(query.where)
        result_limit = query.limit.value if query.limit else 20

        nodes_df = pd.json_normalize(
            self.get_nodes(where_conditions=where_conditions, limit=result_limit)
        )

        if nodes_df.empty:
            # json_normalize([]) yields a frame without columns, so selecting or
            # sorting by name would fail; answer with an empty frame instead.
            columns = list(self._BASE_COLUMNS) if selected_columns is None else selected_columns
            return pd.DataFrame(columns=columns)

        if selected_columns is None:
            selected_columns = list(nodes_df.columns)

        order_by_conditions = self._parse_order_by(query.order_by, nodes_df.columns)

        select_statement_executor = SELECTQueryExecutor(
            nodes_df,
            selected_columns,
            [],  # WHERE was already translated into the API query by get_nodes
            order_by_conditions
        )
        return select_statement_executor.execute_query()

    @staticmethod
    def _parse_targets(targets):
        """Return the selected column names, or None for ``SELECT *``.

        Raises
        ------
        ValueError
            If a target is neither ``*`` nor a column identifier.
        """
        columns = []
        for target in targets:
            if isinstance(target, ast.Star):
                return None
            if isinstance(target, ast.Identifier):
                columns.append(target.parts[-1])
            else:
                raise ValueError(f"Unknown query target {type(target)}")
        return columns

    @staticmethod
    def _resolve_column(parts, columns):
        """Map identifier parts to an existing column name, or None if none matches.

        Tries the full dotted name (``tags.name``), then the name without its
        table qualifier (``nodes.lat`` -> ``lat``), then the last part alone.
        """
        for candidate in (".".join(parts), ".".join(parts[1:]), parts[-1]):
            if candidate and candidate in columns:
                return candidate
        return None

    @classmethod
    def _parse_order_by(cls, order_by, columns) -> Dict:
        """Build the ``{"columns": [...], "ascending": [...]}`` dict for SELECTQueryExecutor.

        Raises
        ------
        ValueError
            If an ORDER BY item is not a column identifier or names an unknown column.
        """
        if not order_by:
            return {}

        conditions = {"columns": [], "ascending": []}
        for an_order in order_by:
            parts = getattr(an_order.field, "parts", None)
            if not parts:
                raise ValueError(f"Unsupported ORDER BY expression {an_order.field}")

            column = cls._resolve_column(parts, columns)
            if column is None:
                raise ValueError(f"Order by unknown column {'.'.join(parts)}")

            conditions["columns"].append(column)
            # Anything other than an explicit DESC (including an unspecified
            # direction) sorts ascending, as in SQL.
            conditions["ascending"].append(str(an_order.direction).upper() != "DESC")
        return conditions

    def get_nodes(self, **kwargs) -> List[Dict]:
        """Fetch nodes matching the given WHERE conditions.

        Parameters
        ----------
        where_conditions : list, optional
            ``[operator, column, value]`` triples as returned by
            ``extract_comparison_conditions``. ``area`` and the bounding-box
            columns configure the query; every other column is a tag filter.
        limit : int, optional
            Maximum number of nodes to request (falsy means no limit).

        Returns
        -------
        List[Dict]
            One dict per node with ``id``, ``lat``, ``lon`` and ``tags``.

        Raises
        ------
        ValueError
            If a condition uses an operator other than ``=``, or the bounding
            box is only partially specified.
        """
        area, tags = None, {}
        bbox = dict.fromkeys(self._BBOX_COLUMNS)

        for condition in kwargs.get('where_conditions') or []:
            operator, column, value = condition[0], condition[1], condition[2]
            if operator != '=':
                # The operator is not carried into the Overpass query, so anything
                # but equality would silently return wrong rows.
                raise ValueError(
                    f"Unsupported operator '{operator}' on column '{column}', only '=' is supported"
                )

            if column == 'area':
                area = value
            elif column in bbox:
                bbox[column] = value
            else:
                tags[column] = value

        result = self.execute_osm_node_query(
            tags=tags,
            area=area,
            limit=kwargs.get('limit', None),
            **bbox
        )

        return [
            {"id": node.id, "lat": node.lat, "lon": node.lon, "tags": node.tags}
            for node in result.nodes
        ]

    @staticmethod
    def _escape_ql(value) -> str:
        """Return ``value`` as text safe to embed in a double-quoted Overpass QL string."""
        return str(value).replace("\\", "\\\\").replace('"', '\\"')

    def execute_osm_node_query(self, tags, area=None, min_lat=None, min_lon=None, max_lat=None, max_lon=None, limit=None):
        """Build an Overpass QL node query and run it through the handler's API.

        Parameters
        ----------
        tags : dict
            Tag key -> required value; each becomes a ``["key"="value"]`` filter.
        area : str, optional
            Name of an area the nodes must lie in.
        min_lat, min_lon, max_lat, max_lon : optional
            Bounding box; either all four or none must be given.
        limit : int, optional
            Maximum number of nodes in the output (falsy means no limit).

        Returns
        -------
        The object returned by ``api.query``; ``get_nodes`` reads its ``nodes``.

        Raises
        ------
        ValueError
            If only part of the bounding box is given.
        """
        query_template = """
[out:json];
{area_clause}
node{area_node_clause}{tags_clause}{bbox};
out {limit};
"""

        tags_clause = "".join(
            '["{}"="{}"]'.format(self._escape_ql(tag_key), self._escape_ql(tag_value))
            for tag_key, tag_value in (tags or {}).items()
        )

        area_clause, area_node_clause = "", ""
        if area:
            area_clause = 'area[name="{}"]->.city;\n'.format(self._escape_ql(area))
            area_node_clause = "(area.city)"

        bbox_values = (min_lat, min_lon, max_lat, max_lon)
        bbox_clause = ""
        # Compare against None so that 0-degree coordinates are accepted.
        if any(value is not None for value in bbox_values):
            if any(value is None for value in bbox_values):
                raise ValueError(
                    "A bounding box requires all of min_lat, min_lon, max_lat and max_lon"
                )
            # Overpass QL bbox filter is (south,west,north,east); parentheses are required.
            bbox_clause = "({},{},{},{})".format(*bbox_values)

        limit_clause = limit if limit else ""

        query = query_template.format(
            area_clause=area_clause,
            area_node_clause=area_node_clause,
            tags_clause=tags_clause,
            bbox=bbox_clause,
            limit=limit_clause
        )

        api = self.handler.connect()
        return api.query(query)

164 

165 

class OpenStreetMapWayTable(APITable):
    """The OpenStreetMap Ways Table implementation.

    WHERE, ORDER BY and column selection are evaluated client-side by
    ``SELECTQueryExecutor`` on the ways returned by ``get_ways``.
    """

    def select(self, query: ast.Select) -> pd.DataFrame:
        """Answer a SELECT over the ways table.

        Parameters
        ----------
        query : ast.Select
            Given SQL SELECT query

        Returns
        -------
        pd.DataFrame
            Ways after filtering, sorting, projection and LIMIT.
        """
        select_statement_parser = SELECTQueryParser(
            query,
            'ways',
            self.get_columns()
        )
        selected_columns, where_conditions, order_by_conditions, result_limit = select_statement_parser.parse_query()

        # Filtering and sorting run locally, so LIMIT is only pushed to the API
        # when neither follows; otherwise a truncated sample would be filtered/sorted.
        push_down_limit = not where_conditions and not order_by_conditions
        ways_df = pd.json_normalize(self.get_ways(limit=result_limit if push_down_limit else None))

        select_statement_executor = SELECTQueryExecutor(
            ways_df,
            selected_columns,
            where_conditions,
            order_by_conditions
        )
        ways_df = select_statement_executor.execute_query()

        # LIMIT is applied last, after filtering and sorting, as in SQL.
        if result_limit is not None:
            ways_df = ways_df.head(result_limit)
        return ways_df

    def get_columns(self) -> List[Text]:
        """Return the column names of a single fetched way, flattened by json_normalize."""
        return pd.json_normalize(self.get_ways(limit=1)).columns.tolist()

    def get_ways(self, **kwargs) -> List[Dict]:
        """Fetch ways from the API.

        Parameters
        ----------
        limit : int, optional
            When truthy, the number of ways requested via ``out <limit>;``.

        Returns
        -------
        List[Dict]
            ``to_dict()`` of every way in the API response.
        """
        limit = kwargs.get('limit')
        out_statement = "out {};".format(limit) if limit else "out;"

        api_session = self.handler.connect()
        # NOTE(review): "{{bbox}}" is sent literally (this string is never
        # .format()-ed). It looks like an Overpass Turbo placeholder rather than
        # something the API substitutes; an earlier hint suggested passing
        # self.connection_data['bbox'] - confirm how the bbox should be supplied.
        ways = api_session.query("way\n({{bbox}});\n" + out_statement)
        # NOTE(review): assumes the returned way objects expose to_dict() - verify.
        return [way.to_dict() for way in ways.ways]

204 

205 

class OpenStreetMapRelationTable(APITable):
    """The OpenStreetMap Relations Table implementation.

    WHERE, ORDER BY and column selection are evaluated client-side by
    ``SELECTQueryExecutor`` on the relations returned by ``get_relations``.
    """

    def select(self, query: ast.Select) -> pd.DataFrame:
        """Answer a SELECT over the relations table.

        Parameters
        ----------
        query : ast.Select
            Given SQL SELECT query

        Returns
        -------
        pd.DataFrame
            Relations after filtering, sorting, projection and LIMIT.
        """
        select_statement_parser = SELECTQueryParser(
            query,
            'relations',
            self.get_columns()
        )
        selected_columns, where_conditions, order_by_conditions, result_limit = select_statement_parser.parse_query()

        # Filtering and sorting run locally, so LIMIT is only pushed to the API
        # when neither follows; otherwise a truncated sample would be filtered/sorted.
        push_down_limit = not where_conditions and not order_by_conditions
        relations_df = pd.json_normalize(self.get_relations(limit=result_limit if push_down_limit else None))

        select_statement_executor = SELECTQueryExecutor(
            relations_df,
            selected_columns,
            where_conditions,
            order_by_conditions
        )
        relations_df = select_statement_executor.execute_query()

        # LIMIT is applied last, after filtering and sorting, as in SQL.
        if result_limit is not None:
            relations_df = relations_df.head(result_limit)
        return relations_df

    def select_relations(self, query: ast.Select) -> pd.DataFrame:
        """Backward-compatible alias for ``select``."""
        return self.select(query)

    def get_columns(self) -> List[Text]:
        """Return the column names of a single fetched relation, flattened by json_normalize."""
        return pd.json_normalize(self.get_relations(limit=1)).columns.tolist()

    def get_relations(self, **kwargs) -> List[Dict]:
        """Fetch relations from the API.

        Parameters
        ----------
        limit : int, optional
            When truthy, the number of relations requested via ``out <limit>;``.

        Returns
        -------
        List[Dict]
            ``to_dict()`` of every relation in the API response.
        """
        limit = kwargs.get('limit')
        out_statement = "out {};".format(limit) if limit else "out;"

        api_session = self.handler.connect()
        # NOTE(review): "{{bbox}}" is sent literally (this string is never
        # .format()-ed). It looks like an Overpass Turbo placeholder rather than
        # something the API substitutes; an earlier hint suggested passing
        # self.connection_data['bbox'] - confirm how the bbox should be supplied.
        relations = api_session.query("relation\n({{bbox}});\n" + out_statement)
        # NOTE(review): assumes the returned relation objects expose to_dict() - verify.
        return [relation.to_dict() for relation in relations.relations]