Coverage for mindsdb / integrations / handlers / web_handler / urlcrawl_helpers.py: 79%
164 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import concurrent.futures
2import io
3import re
4import traceback
5from threading import Lock
6from typing import List
7from urllib.parse import urljoin, urlparse, urlunparse
9import html2text
10import fitz # PyMuPDF
11import pandas as pd
12import requests
13from bs4 import BeautifulSoup
14from mindsdb.utilities import log
16logger = log.getLogger(__name__)
def pdf_to_markdown(response, gap_threshold=10):
    """
    Convert a PDF document to Markdown-ish plain text.

    Args:
        response: HTTP response object whose ``content`` attribute holds the PDF bytes
        gap_threshold (int): vertical gap (in PDF points) between two text blocks
            that triggers an extra blank line in the output (default 10)

    Returns:
        A string containing the converted Markdown text.

    Raises:
        Exception -- if the PDF data cannot be processed.
    """
    try:
        file_stream = io.BytesIO(response.content)
        document = fitz.open(stream=file_stream, filetype="pdf")
    except Exception as e:
        # Chain the original exception so the root cause stays visible.
        raise Exception("Failed to process PDF data: " + str(e)) from e

    markdown_lines = []
    try:
        for page_num in range(len(document)):
            page = document.load_page(page_num)

            # Each block is (x0, y0, x1, y1, text, block_no, block_type).
            blocks = page.get_text("blocks")

            # Sort top-to-bottom, then left-to-right, to restore reading order.
            blocks.sort(key=lambda block: (block[1], block[0]))

            previous_block_bottom = 0
            for block in blocks:
                y0 = block[1]
                y1 = block[3]
                block_text = block[4]

                # Check if there's a large vertical gap between this block and the previous one
                if y0 - previous_block_bottom > gap_threshold:
                    markdown_lines.append("")

                markdown_lines.append(block_text)
                previous_block_bottom = y1

            # Blank separator between pages.
            markdown_lines.append("")
    finally:
        # Always release the PyMuPDF document, even if extraction fails mid-page
        # (previously it leaked on any exception after open()).
        document.close()

    return "\n".join(markdown_lines)
def is_valid(url) -> bool:
    """
    Validate that a URL carries both a scheme (e.g. https) and a host.

    Args:
        url: the URL string to check

    Returns:
        bool: True when the URL parses with a non-empty scheme and netloc.
    """
    parts = urlparse(url)
    return all((parts.scheme, parts.netloc))
def parallel_get_all_website_links(urls) -> dict:
    """
    Fetch website links for every URL in *urls*.

    Small batches (<= 10 URLs) are handled sequentially, since process-pool
    startup would cost more than it saves; larger batches fan out to a
    ProcessPoolExecutor.

    Args:
        urls (list): a list of URLs to fetch links from

    Returns:
        A dictionary mapping each URL to a list of links found on that URL.

    Raises:
        Exception: if an error occurs while fetching links from a URL.
    """
    # Sequential fast path for small batches.
    if len(urls) <= 10:
        return {url: get_all_website_links(url) for url in urls}

    url_contents = {}
    with concurrent.futures.ProcessPoolExecutor() as executor:
        pending = {executor.submit(get_all_website_links, url): url for url in urls}
        for finished in concurrent.futures.as_completed(pending):
            url = pending[finished]
            try:
                url_contents[url] = finished.result()
            except Exception as exc:
                # don't raise the exception, just log it, continue processing other urls
                logger.error(f"{url} generated an exception: {exc}")
    return url_contents
def get_all_website_links(url, headers: dict = None) -> dict:
    """
    Crawl a single URL and collect same-domain links plus the page content.

    Args:
        url (str): the URL to fetch links from
        headers (dict): a dictionary of headers to use when fetching links;
            a browser-like User-Agent is added when none is supplied

    Returns:
        A dictionary containing the URL, the extracted links, the HTML content,
        the text content, and any error that occurred (``error`` is None on success).
    """
    # Fixed log typo: was "rawling:".
    logger.info("Crawling: {url} ...".format(url=url))
    urls = set()
    domain_name = urlparse(url).netloc
    try:
        # Context manager guarantees the session's connection pool is released
        # (previously the session was never closed).
        with requests.Session() as session:
            # Add headers to mimic a real browser request
            if headers is None:
                headers = {}
            if "User-Agent" not in headers:
                headers["User-Agent"] = (
                    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.3"
                )

            response = session.get(url, headers=headers)
            # Keep server-set cookies on the session when the request carried cookies.
            if "cookie" in response.request.headers:
                session.cookies.update(response.cookies)

            content_type = response.headers.get("Content-Type", "").lower()

            if "application/pdf" in content_type:
                content_html = "PDF"
                content_text = pdf_to_markdown(response)
            else:
                content_html = response.text

                # Parse HTML content with BeautifulSoup
                soup = BeautifulSoup(content_html, "html.parser")
                content_text = get_readable_text_from_soup(soup)
                for a_tag in soup.find_all("a"):
                    href = a_tag.attrs.get("href")
                    if href == "" or href is None:
                        continue
                    # Resolve relative links, then strip params/query/fragment.
                    href = urljoin(url, href)
                    parsed_href = urlparse(href)
                    href = urlunparse((parsed_href.scheme, parsed_href.netloc, parsed_href.path, "", "", ""))
                    if not is_valid(href):
                        continue
                    if href in urls:
                        continue
                    # Only follow links on the same domain as the seed URL.
                    if domain_name != parsed_href.netloc:
                        continue

                    href = href.rstrip("/")
                    urls.add(href)

    except Exception:
        error_message = traceback.format_exc().splitlines()[-1]
        logger.exception("An exception occurred:")
        return {
            "url": url,
            "urls": urls,
            "html_content": "",
            "text_content": "",
            "error": str(error_message),
        }

    return {
        "url": url,
        "urls": urls,
        "html_content": content_html,
        "text_content": content_text,
        "error": None,
    }
def get_readable_text_from_soup(soup) -> str:
    """
    Render a BeautifulSoup document as Markdown text.

    Args:
        soup (BeautifulSoup): a BeautifulSoup object

    Returns:
        The extracted text in Markdown format, with links preserved.
    """
    converter = html2text.HTML2Text()
    converter.ignore_links = False
    return converter.handle(str(soup))
def get_all_website_links_recursively(
    url,
    reviewed_urls,
    limit=None,
    crawl_depth: int = 1,
    current_depth: int = 0,
    filters: List[str] = None,
    headers=None,
):
    """
    Recursively gathers all links from a given website up to a specified limit.

    Args:
        url (str): The starting URL to fetch links from.
        reviewed_urls (dict): A dictionary to keep track of reviewed URLs and associated data.
        limit (int, optional): The maximum number of URLs to process.
        crawl_depth: How deep to crawl from each base URL. 0 = scrape given URLs only
        current_depth: How deep we are currently crawling from the base URL.
        filters (List[str]): Crawl URLs that only match these regex patterns.
        headers (dict, optional): request headers forwarded to the fetcher.

    Returns:
        ``reviewed_urls`` on early exit (limit or depth reached), otherwise None;
        results accumulate in the shared ``reviewed_urls`` mapping either way.

    TODO: Refactor this function to use an iterative approach instead of recursion
    """
    if limit is not None and len(reviewed_urls) >= limit:
        return reviewed_urls

    matches_filter = not filters or any(re.match(f, url) is not None for f in filters)
    if url not in reviewed_urls and matches_filter:
        try:
            reviewed_urls[url] = get_all_website_links(url, headers=headers)
        except Exception:
            error_message = traceback.format_exc().splitlines()[-1]
            logger.exception("An exception occurred:")
            reviewed_urls[url] = {
                "url": url,
                "urls": [],
                "html_content": "",
                "text_content": "",
                "error": str(error_message),
            }

    if crawl_depth is not None and crawl_depth == current_depth:
        return reviewed_urls

    to_rev_url_list = []

    # NOTE(review): this lock is per-call, so it does not actually protect the
    # shared dict across the threads spawned by get_all_websites — a module-level
    # lock would be needed for that. Hoisted out of the loop so at least it is
    # no longer re-created on every iteration (which made it a pure no-op cost).
    url_list_lock = Lock()

    # create a list of new urls to review that don't exist in the already reviewed ones.
    # The .get(...) guards fix a KeyError: the seed url may be absent (filtered
    # out above) or still a {} placeholder inserted by a parent call.
    for new_url in reviewed_urls.get(url, {}).get("urls", []):
        if filters and not any(re.match(f, new_url) is not None for f in filters):
            continue
        # if this is already in the urls, then no need to go and crawl for it
        if new_url in reviewed_urls or new_url in to_rev_url_list:
            continue

        # insert immediately to count limit between threads. fill later
        with url_list_lock:
            if limit is None or len(reviewed_urls) < limit:
                reviewed_urls[new_url] = {}
                to_rev_url_list.append(new_url)
            else:
                break

    if len(to_rev_url_list) > 0:
        new_revised_urls = parallel_get_all_website_links(to_rev_url_list)
        reviewed_urls.update(new_revised_urls)

        for new_url in new_revised_urls:
            # Forward headers on recursion (previously dropped after depth 0).
            get_all_website_links_recursively(
                new_url,
                reviewed_urls,
                limit,
                crawl_depth=crawl_depth,
                current_depth=current_depth + 1,
                filters=filters,
                headers=headers,
            )
def get_all_websites(
    urls, limit=1, html=False, crawl_depth: int = 1, filters: List[str] = None, headers: dict = None
) -> pd.DataFrame:
    """
    Crawl a list of websites and return a DataFrame containing the results.

    Args:
        urls (list): a list of URLs to crawl
        limit (int): Absolute max number of web pages to crawl, regardless of crawl depth.
        crawl_depth (int): Crawl depth for URLs.
        html (bool): a boolean indicating whether to include the HTML content in the results
        filters (List[str]): Crawl URLs that only match these regex patterns.
        headers (dict): headers of request

    Returns:
        A DataFrame containing the results, one row per crawled URL, indexed by "url".
    """
    reviewed_urls = {}

    def fetch_url(url, crawl_depth: int = 1, filters: List[str] = None):
        # Allow URLs to be passed wrapped in quotation marks so they can be used
        # directly from the SQL editor.
        if url.startswith("'") and url.endswith("'"):
            url = url[1:-1]
        url = url.rstrip("/")
        if urlparse(url).scheme == "":
            # Try HTTPS first
            url = "https://" + url
        get_all_website_links_recursively(
            url, reviewed_urls, limit, crawl_depth=crawl_depth, filters=filters, headers=headers
        )

    # Use a ThreadPoolExecutor to run the helper function in parallel.
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = {executor.submit(fetch_url, url, crawl_depth=crawl_depth, filters=filters): url for url in urls}
        for future in concurrent.futures.as_completed(futures):
            future.result()

    # Always drop the intermediate link set; drop raw HTML unless requested.
    drop_columns = ["urls", "html_content"] if html is False else ["urls"]
    df = dict_to_dataframe(reviewed_urls, columns_to_ignore=drop_columns, index_name="url")

    # Surface an error only when every single crawled row failed.
    if not df.empty and df[df.error.isna()].empty:
        raise Exception(str(df.iloc[0].error))
    return df
def dict_to_dataframe(dict_of_dicts, columns_to_ignore=None, index_name=None) -> pd.DataFrame:
    """
    Convert a dictionary of dictionaries to a DataFrame.

    Args:
        dict_of_dicts (dict): a dictionary of dictionaries; outer keys become
            the index, inner dicts become the rows
        columns_to_ignore (list): a list of columns to drop if present
            (columns not in the frame are silently skipped)
        index_name (str): the name of the index column
    Returns:
        A DataFrame containing the data.
    """
    df = pd.DataFrame.from_dict(dict_of_dicts, orient="index")

    if columns_to_ignore:
        # Single vectorized drop; errors="ignore" skips absent columns,
        # replacing the per-column membership-test loop.
        df = df.drop(columns=columns_to_ignore, errors="ignore")

    if index_name:
        df.index.name = index_name

    return df