Coverage for mindsdb / interfaces / functions / to_markdown.py: 0%

62 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from io import BytesIO 

2import os 

3from typing import Union 

4from urllib.parse import urlparse 

5import xml.etree.ElementTree as ET 

6 

7from aipdf import ocr 

8import mimetypes 

9import requests 

10 

11 

class ToMarkdown:
    """
    Extracts the content of documents of various formats in markdown format.

    PDF documents (``.pdf``) are converted with ``aipdf.ocr``; XML documents
    (``.xml`` and ``.nessus``) are rendered as nested markdown headings. The
    input may be a local file path or an HTTP(S) URL.
    """

    # Value passed as ``timeout`` to every HTTP request (seconds). requests
    # applies no timeout unless one is given, so an unresponsive server would
    # otherwise block the caller indefinitely.
    REQUEST_TIMEOUT = 30

    # Extensions (lower-case, with leading dot) handled by each converter.
    _PDF_EXTENSIONS = (".pdf",)
    _XML_EXTENSIONS = (".xml", ".nessus")

    # Media types resolved explicitly instead of through ``mimetypes``, whose
    # answer depends on the platform's MIME tables and may map the XML media
    # types to an unrelated suffix.
    _CONTENT_TYPE_EXTENSIONS = {
        "application/pdf": ".pdf",
        "application/xml": ".xml",
        "text/xml": ".xml",
    }

    def __init__(self):
        """
        Initializes the ToMarkdown class.
        """

    def call(self, file_path_or_url: str, **kwargs) -> str:
        """
        Converts a file to markdown.

        Args:
            file_path_or_url: Local path or HTTP(S) URL of the document.
            **kwargs: Forwarded unchanged to the selected converter.

        Returns:
            The markdown rendering of the document.

        Raises:
            ValueError: If the detected file type is not supported.
            RuntimeError: If a remote file or its type cannot be retrieved.
        """
        file_extension = self._get_file_extension(file_path_or_url)
        file_content = self._get_file_content(file_path_or_url)

        # Match case-insensitively so "REPORT.PDF" is treated like "report.pdf";
        # the error message still reports the extension exactly as detected.
        normalized_extension = file_extension.lower()

        if normalized_extension in self._PDF_EXTENSIONS:
            return self._pdf_to_markdown(file_content, **kwargs)

        if normalized_extension in self._XML_EXTENSIONS:
            return self._xml_to_markdown(file_content, **kwargs)

        raise ValueError(f"Unsupported file type: {file_extension}.")

    def _get_file_content(self, file_path_or_url: str) -> BytesIO:
        """
        Retrieves the content of a file.

        Args:
            file_path_or_url: Local path or HTTP(S) URL of the document.

        Returns:
            An in-memory binary stream holding the complete file content.

        Raises:
            RuntimeError: If the URL does not answer with HTTP status 200.
        """
        parsed_url = urlparse(file_path_or_url)
        if parsed_url.scheme in ("http", "https"):
            response = requests.get(file_path_or_url, timeout=self.REQUEST_TIMEOUT)
            if response.status_code != 200:
                raise RuntimeError(f"Unable to retrieve file from URL: {file_path_or_url}")
            return BytesIO(response.content)

        with open(file_path_or_url, "rb") as file:
            return BytesIO(file.read())

    def _get_file_extension(self, file_path_or_url: str) -> str:
        """
        Retrieves the file extension from a file path or URL.

        For URLs the extension is derived from the Content-Type header of a
        HEAD request and from the URL path (see ``_resolve_url_extension``).

        Args:
            file_path_or_url: Local path or HTTP(S) URL of the document.

        Returns:
            The extension including the leading dot, or an empty string when
            none can be determined.

        Raises:
            RuntimeError: If the HEAD request to the URL fails.
        """
        parsed_url = urlparse(file_path_or_url)
        if parsed_url.scheme not in ("http", "https"):
            return os.path.splitext(file_path_or_url)[1]

        try:
            # Make a HEAD request to get headers without downloading the file.
            response = requests.head(
                file_path_or_url,
                allow_redirects=True,
                timeout=self.REQUEST_TIMEOUT,
            )
        except requests.RequestException as e:
            raise RuntimeError(f"Unable to retrieve file extension from URL: {file_path_or_url}") from e

        content_type = response.headers.get("Content-Type", "")
        return self._resolve_url_extension(content_type, parsed_url.path)

    @classmethod
    def _resolve_url_extension(cls, content_type: str, url_path: str) -> str:
        """
        Picks the most useful extension for a remote document.

        The extension implied by the Content-Type header wins when it names a
        supported format. Otherwise the URL path suffix is used, because
        servers frequently answer with a generic type such as
        ``application/octet-stream`` or with an HTML error page for HEAD
        requests. The header-derived extension is the last resort.

        Args:
            content_type: Raw Content-Type header value (may be empty).
            url_path: Path component of the URL.

        Returns:
            The chosen extension, or an empty string when neither source
            provides one.
        """
        media_type = content_type.split(";")[0].strip().lower()
        header_extension = ""
        if media_type:
            header_extension = (
                cls._CONTENT_TYPE_EXTENSIONS.get(media_type)
                or mimetypes.guess_extension(media_type)
                or ""
            )

        path_extension = os.path.splitext(url_path)[1]

        if header_extension.lower() in cls._PDF_EXTENSIONS + cls._XML_EXTENSIONS:
            return header_extension
        return path_extension or header_extension

    def _pdf_to_markdown(self, file_content: "Union[requests.Response, BytesIO]", **kwargs) -> str:
        """
        Converts a PDF file to markdown.

        Args:
            file_content: The PDF content, as produced by ``_get_file_content``.
            **kwargs: Forwarded unchanged to ``aipdf.ocr``.

        Returns:
            The markdown of all pages, separated by horizontal rules.
        """
        markdown_pages = ocr(file_content, **kwargs)
        return "\n\n---\n\n".join(markdown_pages)

    def _xml_to_markdown(self, file_content: "Union[requests.Response, BytesIO]", **kwargs) -> str:
        """
        Converts an XML (or Nessus) file to markdown.

        Every element becomes a heading whose level equals its nesting depth,
        followed by its attributes as a bullet list and its stripped text.

        Args:
            file_content: Stream exposing ``read()`` that yields the document.
            **kwargs: Accepted for signature parity with the other converters;
                not used.

        Returns:
            The markdown rendering of the whole element tree.

        Raises:
            xml.etree.ElementTree.ParseError: If the document is not
                well-formed XML.
        """

        def parse_element(element: ET.Element, depth: int = 0) -> str:
            """
            Recursively parses an XML element and converts it to markdown.
            """
            markdown = [f"{'#' * (depth + 1)} {element.tag}"]
            markdown.extend(f"- **{key}**: {val}" for key, val in element.attrib.items())

            text = (element.text or "").strip()
            if text:
                markdown.append(f"\n{text}\n")

            markdown.extend(parse_element(child, depth + 1) for child in element)
            return "\n".join(markdown)

        # Hand the raw bytes to the parser instead of decoding them as UTF-8
        # first, so the encoding named in the XML declaration (or a byte-order
        # mark) is honoured.
        # NOTE(review): the Python docs warn that xml.etree.ElementTree is not
        # hardened against maliciously constructed documents; confirm whether
        # untrusted files can reach this path.
        root = ET.fromstring(file_content.read())
        return parse_element(root)

112 return markdown_content