Coverage for mindsdb / integrations / handlers / file_handler / file_handler.py: 83%

117 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import os 

2import shutil 

3import tempfile 

4 

5import pandas as pd 

6from mindsdb_sql_parser import parse_sql 

7from mindsdb_sql_parser.ast import CreateTable, DropTables, Insert, Select, Identifier 

8from mindsdb_sql_parser.ast.base import ASTNode 

9 

10from mindsdb.api.executor.utilities.sql import query_dfs 

11from mindsdb.integrations.libs.base import DatabaseHandler 

12from mindsdb.integrations.libs.response import RESPONSE_TYPE 

13from mindsdb.integrations.libs.response import HandlerResponse as Response 

14from mindsdb.integrations.libs.response import HandlerStatusResponse as StatusResponse 

15from mindsdb.utilities import log 

16 

17 

# Module-level logger, named after this module via log.getLogger(__name__).
18logger = log.getLogger(__name__) 

19 

# Fallback used by FileHandler.__init__ when connection_data has no "chunk_size" key.
20DEFAULT_CHUNK_SIZE = 500 

# Fallback used by FileHandler.__init__ when connection_data has no "chunk_overlap" key.
21DEFAULT_CHUNK_OVERLAP = 250 

22 

23 

def clean_cell(val):
    """
    Normalise an "empty" cell value to ``None``.

    A value counts as empty when its string form is empty, consists only of
    whitespace (any length, including tabs and non-breaking spaces), or is
    exactly one of the markers ``"NaN"``, ``"nan"`` or ``"NA"``.

    Args:
        val: The cell value to inspect; any type is accepted because the
            check is done on ``str(val)``.

    Returns:
        ``None`` for an empty cell, otherwise ``val`` itself, unchanged.
    """
    text = str(val)
    # strip() removes every kind of whitespace, so blank cells of any width
    # are caught instead of only the zero- and one-space cases.
    if not text.strip():
        return None
    # Marker comparison stays exact: padded text such as " NA " is real data.
    if text in ("NaN", "nan", "NA"):
        return None
    return val

28 

29 

class FileHandler(DatabaseHandler):
    """
    Handler for files.

    Translates parsed SQL statements (DROP TABLE, CREATE TABLE, SELECT and
    INSERT) into calls on the ``file_controller`` supplied at construction
    time. Every read and write of file data goes through that controller.
    """

    name = "files"

    def __init__(
        self,
        name=None,
        file_storage=None,
        connection_data=None,
        file_controller=None,
        **kwargs,
    ):
        """
        Store the collaborators and the options found in ``connection_data``.

        Args:
            name: Handler name, forwarded to the base class constructor.
            file_storage: Stored as ``self.fs_store``.
            connection_data: Optional mapping of options. The keys read here
                are ``custom_parser`` (default ``None``), ``clean_rows``
                (default ``True``), ``chunk_size`` (default
                ``DEFAULT_CHUNK_SIZE``) and ``chunk_overlap`` (default
                ``DEFAULT_CHUNK_OVERLAP``). ``None`` means "no options".
            file_controller: Object used for all file operations.
            **kwargs: Accepted and ignored.
        """
        super().__init__(name)
        # Use None as the default instead of a shared mutable dict literal.
        if connection_data is None:
            connection_data = {}

        self.parser = parse_sql
        self.fs_store = file_storage
        self.custom_parser = connection_data.get("custom_parser", None)
        self.clean_rows = connection_data.get("clean_rows", True)
        self.chunk_size = connection_data.get("chunk_size", DEFAULT_CHUNK_SIZE)
        self.chunk_overlap = connection_data.get("chunk_overlap", DEFAULT_CHUNK_OVERLAP)
        self.file_controller = file_controller
        # NOTE(review): presumably read by code outside this class - confirm.
        self.cache_thread_safe = True

    def connect(self, **kwargs):
        """Do nothing; this handler has no connection to open."""
        return

    def disconnect(self, **kwargs):
        """Do nothing; this handler has no connection to close."""
        return

    def check_connection(self) -> StatusResponse:
        """Return a ``StatusResponse`` constructed with ``True``."""
        return StatusResponse(True)

    def _get_table_page_names(self, table: Identifier):
        """
        Split a table identifier into a file name and an optional page name.

        Args:
            table: Identifier whose ``parts`` hold the (possibly dotted) name.

        Returns:
            ``(table_name, page_name)``. For a multi-part name such as
            ``file_name.sheet_name`` these are the last two parts; for a
            single-part name ``page_name`` is ``None``.
        """
        parts = table.parts
        if len(parts) > 1:
            return parts[-2], parts[-1]
        return parts[-1], None

    def query(self, query: ASTNode) -> Response:
        """
        Execute a parsed SQL statement against the stored files.

        Args:
            query: A ``DropTables``, ``CreateTable``, ``Select`` or ``Insert``
                AST node.

        Returns:
            ``OK`` for a successful drop/create/insert, ``TABLE`` with the
            result frame for a select, or ``ERROR`` with a message for a
            rejected or unsupported statement.

        Raises:
            RuntimeError: For a ``Select`` in which no table could be loaded.
        """
        # The check order matches the original if/elif chain.
        if isinstance(query, DropTables):
            return self._handle_drop_tables(query)
        if isinstance(query, CreateTable):
            return self._handle_create_table(query)
        if isinstance(query, Select):
            return self._handle_select(query)
        if isinstance(query, Insert):
            return self._handle_insert(query)
        return Response(
            RESPONSE_TYPE.ERROR,
            error_message="Only 'select', 'insert', 'create' and 'drop' queries allowed for files",
        )

    def _handle_drop_tables(self, query) -> Response:
        """Delete every file named in a DROP TABLE statement."""
        for table_identifier in query.tables:
            parts = table_identifier.parts
            # A two-part name must be qualified with this handler's own name.
            if len(parts) == 2 and parts[0] != self.name:
                return Response(
                    RESPONSE_TYPE.ERROR,
                    error_message=f"Can't delete table from database '{parts[0]}'",
                )
            table_name = parts[-1]
            try:
                self.file_controller.delete_file(table_name)
            except FileNotFoundError as e:
                # With IF EXISTS a missing file is not an error; move on.
                if not query.if_exists:
                    return Response(
                        RESPONSE_TYPE.ERROR,
                        error_message=f"Can't delete table '{table_name}': {e}",
                    )
            except Exception as e:
                return Response(
                    RESPONSE_TYPE.ERROR,
                    error_message=f"Can't delete table '{table_name}': {e}",
                )
        return Response(RESPONSE_TYPE.OK)

    def _handle_create_table(self, query) -> Response:
        """Create an empty CSV-backed file holding only the declared columns."""
        # Validate the name first so an invalid one skips the file lookup.
        if len(query.name.parts) != 1:
            return Response(
                RESPONSE_TYPE.ERROR,
                error_message="Table name cannot contain more than one namespace",
            )

        table_name = query.name.parts[-1]
        if table_name in self.file_controller.get_files_names():
            if not query.is_replace:
                return Response(
                    RESPONSE_TYPE.ERROR,
                    error_message=f"Table '{table_name}' already exists",
                )
            self.file_controller.delete_file(table_name)

        temp_dir_path = tempfile.mkdtemp(prefix="mindsdb_file_")
        try:
            # NOTE(review): table_name is used verbatim in the path; confirm
            # identifiers cannot contain path separators.
            temp_file_path = os.path.join(temp_dir_path, f"{table_name}.csv")

            # Header-only CSV: the columns from the query, no rows.
            empty_df = pd.DataFrame(columns=[col.name for col in query.columns])
            empty_df.to_csv(temp_file_path, index=False)

            self.file_controller.save_file(table_name, temp_file_path, file_name=f"{table_name}.csv")
        except Exception as unknown_error:
            return Response(
                RESPONSE_TYPE.ERROR,
                error_message=f"Error creating table '{table_name}': {unknown_error}",
            )
        finally:
            # Always remove the temp dir, whether or not saving succeeded.
            shutil.rmtree(temp_dir_path, ignore_errors=True)

        return Response(RESPONSE_TYPE.OK)

    def _handle_select(self, query) -> Response:
        """Load each referenced file into a DataFrame and run the SELECT on them."""
        # Imported locally, exactly as the original code does.
        from mindsdb.integrations.utilities.query_traversal import query_traversal

        tables = {}

        def find_tables(node, is_table, **args):
            # Only table identifiers matter; other nodes are left untouched.
            if not (is_table and isinstance(node, Identifier)):
                return
            table_name, page_name = self._get_table_page_names(node)
            try:
                df = self.file_controller.get_file_data(table_name, page_name)
            except FileNotFoundError:
                # Unknown file: skip it, the emptiness check below reports it.
                return

            if page_name is not None:
                table_name = f"{page_name}_{table_name}"
            # Rewrite the identifier in place so it matches the key in `tables`.
            node.parts = [table_name]
            tables[table_name] = df

        query_traversal(query, find_tables)

        if not tables:
            raise RuntimeError(f"No tables in query: {query}")

        result_df = query_dfs(tables, query)
        return Response(RESPONSE_TYPE.TABLE, data_frame=result_df)

    def _handle_insert(self, query) -> Response:
        """Append the rows of an INSERT statement to the target file."""
        table_name, page_name = self._get_table_page_names(query.table)

        current_df = self.file_controller.get_file_data(table_name, page_name)

        # Build a frame from the statement's values, labelled with its columns.
        new_rows = pd.DataFrame(query.values, columns=[col.name for col in query.columns])

        # ignore_index=True renumbers the combined rows from zero.
        combined = pd.concat([current_df, new_rows], ignore_index=True)

        self.file_controller.set_file_data(table_name, combined, page_name=page_name)

        return Response(RESPONSE_TYPE.OK)

    def native_query(self, query: str) -> Response:
        """
        Parse a raw SQL string with ``self.parser`` and execute it.

        Args:
            query: SQL text.

        Returns:
            Whatever ``self.query`` returns for the parsed statement.
        """
        ast = self.parser(query)
        return self.query(ast)

    def get_tables(self) -> Response:
        """
        List all files.

        Returns:
            A ``TABLE`` response with one row per file and the columns
            ``TABLE_NAME``, ``TABLE_ROWS`` and ``TABLE_TYPE`` (always
            ``"BASE TABLE"``).
        """
        files_meta = self.file_controller.get_files()
        data = [
            {
                "TABLE_NAME": x["name"],
                "TABLE_ROWS": x["row_count"],
                "TABLE_TYPE": "BASE TABLE",
            }
            for x in files_meta
        ]
        return Response(RESPONSE_TYPE.TABLE, data_frame=pd.DataFrame(data))

    def get_columns(self, table_name) -> Response:
        """
        Describe the columns of one file.

        Args:
            table_name: Name passed to ``file_controller.get_file_meta``.

        Returns:
            A ``TABLE`` response with one row per column: ``Field`` is the
            column name with surrounding whitespace stripped, ``Type`` is
            always ``"str"``.
        """
        file_meta = self.file_controller.get_file_meta(table_name)
        fields = [
            {
                # A column entry is either a dict with a "name" key or a plain string.
                "Field": (col["name"] if isinstance(col, dict) else col).strip(),
                "Type": "str",
            }
            for col in file_meta["columns"]
        ]
        return Response(RESPONSE_TYPE.TABLE, data_frame=pd.DataFrame(fields))

223 return result