Coverage for mindsdb / api / executor / utilities / functions.py: 23%

20 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import urllib 

2import tempfile 

3from pathlib import Path 

4 

5import requests 

6 

7 

8# def get_column_in_case(columns, name): 

9# ''' 

10# ''' 

11# candidates = [] 

12# name_lower = name.lower() 

13# for column in columns: 

14# if column.lower() == name_lower: 

15# candidates.append(column) 

16# if len(candidates) != 1: 

17# return None 

18# return candidates[0] 

19 

20 

def download_file(url: str) -> str:
    """Download ``url`` into a fresh temporary directory and return the file path.

    The URL is parsed first and rejected if it has no scheme. Only then is a
    temporary directory (prefix ``mindsdb_file_download_``) created; the
    response body is written to a file named ``file`` inside it.

    The HTTP status code is not inspected: whatever body the server returns
    is written to disk.

    Args:
        url: Address of the resource to fetch; must include a scheme.

    Returns:
        Path of the downloaded file, as a string.

    Raises:
        Exception: If the URL cannot be parsed or has an empty scheme.
            Errors raised while downloading or writing the file propagate
            unchanged, after the temporary directory has been removed.
    """
    # Function-scope imports: a bare ``import urllib`` does not guarantee that
    # the ``urllib.parse`` submodule is loaded, and ``shutil`` is only needed
    # for the cleanup path below.
    import shutil
    from urllib.parse import urlparse

    try:
        scheme = urlparse(url).scheme
    except ValueError as e:
        raise Exception(f"Invalid url: {url}") from e
    except Exception as e:
        raise Exception(f"URL parsing error: {e}") from e

    # Validate before touching the filesystem so that a rejected URL does not
    # leave an empty temporary directory behind.
    if scheme == "":
        raise Exception(f"Unknown url schema: {url}")

    temp_dir = tempfile.mkdtemp(prefix="mindsdb_file_download_")
    try:
        response = requests.get(url)
        temp_file_path = Path(temp_dir).joinpath("file")
        with open(str(temp_file_path), "wb") as file:
            file.write(response.content)
    except Exception:
        # The caller never learns the directory name on failure, so remove it
        # here instead of leaking it; then let the original error propagate.
        shutil.rmtree(temp_dir, ignore_errors=True)
        raise
    return str(temp_file_path)