Coverage for mindsdb / interfaces / functions / to_markdown.py: 0%
62 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1from io import BytesIO
2import os
3from typing import Union
4from urllib.parse import urlparse
5import xml.etree.ElementTree as ET
7from aipdf import ocr
8import mimetypes
9import requests
class ToMarkdown:
    """
    Extracts the content of documents of various formats in markdown format.

    Supported inputs are PDF files and XML / Nessus files, given either as a
    local file path or as an HTTP(S) URL.
    """

    # Seconds to wait on a remote server. Without an explicit timeout
    # ``requests`` waits indefinitely on an unresponsive host.
    _REQUEST_TIMEOUT = 30

    # Content types mapped explicitly to the extensions ``call`` understands.
    # ``mimetypes.guess_extension("application/xml")`` does not yield ".xml",
    # so relying on it alone rejects ordinary XML responses.
    _CONTENT_TYPE_EXTENSIONS = {
        "application/pdf": ".pdf",
        "application/xml": ".xml",
        "text/xml": ".xml",
    }

    # Generic content types that say nothing about the real format; for these
    # the extension in the URL path is the better hint.
    _GENERIC_CONTENT_TYPES = ("application/octet-stream", "binary/octet-stream")

    def __init__(self):
        """
        Initializes the ToMarkdown class.
        """

    def call(self, file_path_or_url: str, **kwargs) -> str:
        """
        Converts a file to markdown.

        Args:
            file_path_or_url: Local path or HTTP(S) URL of the document.
            **kwargs: Forwarded unchanged to the format-specific converter.

        Returns:
            The markdown rendering of the document.

        Raises:
            ValueError: If the file type is not PDF, XML or Nessus.
            RuntimeError: If a remote file cannot be retrieved.
        """
        file_extension = self._get_file_extension(file_path_or_url)
        # Compare case-insensitively so that e.g. "REPORT.PDF" is accepted.
        normalized_extension = (file_extension or "").lower()

        if normalized_extension == ".pdf":
            converter = self._pdf_to_markdown
        elif normalized_extension in (".xml", ".nessus"):
            converter = self._xml_to_markdown
        else:
            raise ValueError(f"Unsupported file type: {file_extension}.")

        # Fetch the content only once the type is known to be supported, so an
        # unsupported document is rejected without being read or downloaded.
        file_content = self._get_file_content(file_path_or_url)
        return converter(file_content, **kwargs)

    def _get_file_content(self, file_path_or_url: str) -> BytesIO:
        """
        Retrieves the content of a file.

        Args:
            file_path_or_url: Local path or HTTP(S) URL of the document.

        Returns:
            An in-memory binary buffer holding the whole file.

        Raises:
            RuntimeError: If the URL cannot be fetched or does not answer 200.
            OSError: If the local file cannot be opened or read.
        """
        parsed_url = urlparse(file_path_or_url)
        if parsed_url.scheme not in ("http", "https"):
            with open(file_path_or_url, "rb") as file:
                return BytesIO(file.read())

        try:
            response = requests.get(file_path_or_url, timeout=self._REQUEST_TIMEOUT)
        except requests.RequestException as e:
            # Same error type as _get_file_extension, so callers handle one
            # exception for every kind of download failure.
            raise RuntimeError(f"Unable to retrieve file from URL: {file_path_or_url}") from e

        if response.status_code != 200:
            raise RuntimeError(f"Unable to retrieve file from URL: {file_path_or_url}")
        return BytesIO(response.content)

    def _get_file_extension(self, file_path_or_url: str) -> str:
        """
        Retrieves the file extension from a file path or URL.

        For URLs the Content-Type of a HEAD response is consulted first and the
        extension of the URL path is used as a fallback.

        Args:
            file_path_or_url: Local path or HTTP(S) URL of the document.

        Returns:
            The extension including the leading dot, or an empty string when
            none can be determined.

        Raises:
            RuntimeError: If the HEAD request to the URL fails.
        """
        parsed_url = urlparse(file_path_or_url)
        if parsed_url.scheme not in ("http", "https"):
            return os.path.splitext(file_path_or_url)[1]

        try:
            # Make a HEAD request to get headers without downloading the file.
            response = requests.head(file_path_or_url, allow_redirects=True, timeout=self._REQUEST_TIMEOUT)
        except requests.RequestException as e:
            raise RuntimeError(f"Unable to retrieve file extension from URL: {file_path_or_url}") from e

        # A failed HEAD (e.g. 405 from a server that only allows GET) describes
        # the error page, not the document, so its Content-Type is ignored.
        if response.ok:
            content_type = response.headers.get("Content-Type", "").split(";")[0].strip().lower()
            if content_type and content_type not in self._GENERIC_CONTENT_TYPES:
                ext = self._CONTENT_TYPE_EXTENSIONS.get(content_type) or mimetypes.guess_extension(content_type)
                if ext:
                    return ext

        # Fallback to extracting extension from the URL path ("" if there is none).
        return os.path.splitext(parsed_url.path)[1]

    def _pdf_to_markdown(self, file_content: BytesIO, **kwargs) -> str:
        """
        Converts a PDF file to markdown.

        Args:
            file_content: Binary buffer with the PDF, as produced by
                ``_get_file_content``.
            **kwargs: Forwarded unchanged to ``aipdf.ocr``.

        Returns:
            The markdown of all pages, separated by horizontal rules.
        """
        markdown_pages = ocr(file_content, **kwargs)
        return "\n\n---\n\n".join(markdown_pages)

    def _xml_to_markdown(self, file_content: BytesIO, **kwargs) -> str:
        """
        Converts an XML (or Nessus) file to markdown.

        Every element becomes a heading whose level reflects its nesting depth,
        its attributes become a bullet list and its text a paragraph.

        Args:
            file_content: Binary buffer with the XML document, as produced by
                ``_get_file_content``.
            **kwargs: Accepted for signature parity with the other converters;
                not used.

        Returns:
            The markdown rendering of the XML tree.

        Raises:
            xml.etree.ElementTree.ParseError: If the document is not well-formed.
        """

        def parse_element(element: ET.Element, depth: int = 0) -> str:
            """
            Recursively parses an XML element and converts it to markdown.
            """
            markdown = [f"{'#' * (depth + 1)} {element.tag}"]
            markdown.extend(f"- **{key}**: {val}" for key, val in element.attrib.items())

            text = (element.text or "").strip()
            if text:
                markdown.append(f"\n{text}\n")

            markdown.extend(parse_element(child, depth + 1) for child in element)
            return "\n".join(markdown)

        # NOTE: xml.etree.ElementTree is not hardened against maliciously
        # constructed XML (e.g. entity-expansion attacks); documents from
        # untrusted sources should be vetted or parsed with a hardened parser.
        #
        # The raw bytes are handed to the parser so that it honours the encoding
        # declared in the XML prolog instead of assuming UTF-8.
        root = ET.fromstring(file_content.read())
        return parse_element(root)