Coverage for mindsdb / integrations / handlers / web_handler / urlcrawl_helpers.py: 79%

164 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import concurrent.futures 

2import io 

3import re 

4import traceback 

5from threading import Lock 

6from typing import List 

7from urllib.parse import urljoin, urlparse, urlunparse 

8 

9import html2text 

10import fitz # PyMuPDF 

11import pandas as pd 

12import requests 

13from bs4 import BeautifulSoup 

14from mindsdb.utilities import log 

15 

16logger = log.getLogger(__name__) 

17 

18 

def pdf_to_markdown(response, gap_threshold=10):
    """
    Convert a PDF document to Markdown-style text.

    Args:
        response: a requests response object whose ``content`` holds the raw PDF bytes
        gap_threshold (int): vertical gap (in PDF units) between two text blocks that
            triggers an extra blank line in the output (default 10)

    Returns:
        str: the extracted text, with blank lines inserted between vertically
        separated blocks and between pages.

    Raises:
        Exception: if the PDF data cannot be opened or parsed.
    """
    try:
        file_stream = io.BytesIO(response.content)
        document = fitz.open(stream=file_stream, filetype="pdf")
    except Exception as e:
        raise Exception("Failed to process PDF data: " + str(e)) from e

    try:
        markdown_lines = []
        for page_num in range(len(document)):
            page = document.load_page(page_num)

            # Each block is (x0, y0, x1, y1, text, block_no, block_type).
            blocks = page.get_text("blocks")

            # Sort top-to-bottom, then left-to-right, to restore reading order.
            blocks.sort(key=lambda block: (block[1], block[0]))

            previous_block_bottom = 0
            for block in blocks:
                y0 = block[1]
                y1 = block[3]
                block_text = block[4]

                # A large vertical gap between blocks marks a paragraph break.
                if y0 - previous_block_bottom > gap_threshold:
                    markdown_lines.append("")

                markdown_lines.append(block_text)
                previous_block_bottom = y1

            # Blank line between pages.
            markdown_lines.append("")
    finally:
        # Release the document handle even if text extraction fails
        # (the original leaked it on any exception after opening).
        document.close()

    return "\n".join(markdown_lines)

66 

67 

def is_valid(url) -> bool:
    """
    Check whether *url* is a well-formed absolute URL.

    Args:
        url: the URL string to check

    Returns:
        bool: True when the URL has both a scheme and a network location.
    """
    parts = urlparse(url)
    has_scheme = bool(parts.scheme)
    has_netloc = bool(parts.netloc)
    return has_netloc and has_scheme

80 

81 

def parallel_get_all_website_links(urls) -> dict:
    """
    Fetch website links for every URL in *urls*.

    Args:
        urls (list): URLs to crawl

    Returns:
        dict: maps each URL to the crawl result of get_all_website_links.
        URLs whose crawl raised an exception are logged and omitted.
    """
    # For small batches the process-pool overhead outweighs the parallelism,
    # so crawl sequentially.
    if len(urls) <= 10:
        return {url: get_all_website_links(url) for url in urls}

    url_contents = {}
    with concurrent.futures.ProcessPoolExecutor() as executor:
        pending = {executor.submit(get_all_website_links, url): url for url in urls}
        for completed in concurrent.futures.as_completed(pending):
            url = pending[completed]
            try:
                url_contents[url] = completed.result()
            except Exception as exc:
                # Log and keep going so one bad URL doesn't abort the batch.
                logger.error(f"{url} generated an exception: {exc}")

    return url_contents

113 

114 

def get_all_website_links(url, headers: dict = None) -> dict:
    """
    Crawl a single URL and collect its same-domain links and content.

    Args:
        url (str): the URL to fetch
        headers (dict): optional HTTP headers to send with the request

    Returns:
        dict: with keys "url", "urls" (same-domain links found), "html_content",
        "text_content" (Markdown), and "error" (None on success, else a message).
    """
    logger.info("Crawling: {url} ...".format(url=url))  # fixed typo: was "rawling"
    urls = set()

    domain_name = urlparse(url).netloc
    try:
        session = requests.Session()

        # Add headers to mimic a real browser request so fewer sites block us.
        if headers is None:
            headers = {}
        if "User-Agent" not in headers:
            headers["User-Agent"] = (
                "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.3"
            )

        # Bound the request so a hung server can't stall the crawl forever
        # (the original had no timeout). Timeouts surface via the except below.
        response = session.get(url, headers=headers, timeout=30)
        if "cookie" in response.request.headers:
            session.cookies.update(response.cookies)

        content_type = response.headers.get("Content-Type", "").lower()

        if "application/pdf" in content_type:
            # PDFs have no HTML; store a placeholder and extract text instead.
            content_html = "PDF"
            content_text = pdf_to_markdown(response)
        else:
            content_html = response.text

            # Parse HTML content with BeautifulSoup
            soup = BeautifulSoup(content_html, "html.parser")
            content_text = get_readable_text_from_soup(soup)
            for a_tag in soup.find_all("a"):
                href = a_tag.attrs.get("href")
                if not href:
                    continue
                # Resolve relative links, then strip query/fragment parts.
                href = urljoin(url, href)
                parsed_href = urlparse(href)
                href = urlunparse((parsed_href.scheme, parsed_href.netloc, parsed_href.path, "", "", ""))
                if not is_valid(href):
                    continue
                if href in urls:
                    continue
                # Stay within the starting domain.
                if domain_name != parsed_href.netloc:
                    continue

                href = href.rstrip("/")
                urls.add(href)

    except Exception:
        error_message = traceback.format_exc().splitlines()[-1]
        logger.exception("An exception occurred:")
        return {
            "url": url,
            "urls": urls,
            "html_content": "",
            "text_content": "",
            "error": str(error_message),
        }

    return {
        "url": url,
        "urls": urls,
        "html_content": content_html,
        "text_content": content_text,
        "error": None,
    }

191 

192 

def get_readable_text_from_soup(soup) -> str:
    """
    Convert the HTML held by a BeautifulSoup object into Markdown text.

    Args:
        soup (BeautifulSoup): the parsed HTML document

    Returns:
        str: the document rendered as Markdown.
    """
    converter = html2text.HTML2Text()
    # Keep hyperlinks in the Markdown output rather than dropping them.
    converter.ignore_links = False
    markdown = converter.handle(str(soup))
    return markdown

206 

207 

# One lock shared by all crawler threads so the page-limit check-and-reserve
# below is actually atomic across threads.
_reviewed_urls_lock = Lock()


def get_all_website_links_recursively(
    url,
    reviewed_urls,
    limit=None,
    crawl_depth: int = 1,
    current_depth: int = 0,
    filters: List[str] = None,
    headers=None,
):
    """
    Recursively gather all links from a given website up to a specified limit.

    Args:
        url (str): The starting URL to fetch links from.
        reviewed_urls (dict): Tracks reviewed URLs and their crawl results; mutated in place.
        limit (int, optional): The maximum number of URLs to process.
        crawl_depth: How deep to crawl from each base URL. 0 = scrape given URLs only.
        current_depth: How deep we are currently crawling from the base URL.
        filters (List[str]): Crawl URLs that only match these regex patterns.
        headers (dict, optional): HTTP headers forwarded to every request.

    TODO: Refactor this function to use an iterative approach instead of recursion.
    """
    if limit is not None and len(reviewed_urls) >= limit:
        return reviewed_urls

    matches_filter = not filters or any(re.match(f, url) is not None for f in filters)
    if url not in reviewed_urls and matches_filter:
        try:
            reviewed_urls[url] = get_all_website_links(url, headers=headers)
        except Exception:
            error_message = traceback.format_exc().splitlines()[-1]
            logger.exception("An exception occurred:")
            reviewed_urls[url] = {
                "url": url,
                "urls": [],
                "html_content": "",
                "text_content": "",
                "error": str(error_message),
            }

    if crawl_depth is not None and crawl_depth == current_depth:
        return reviewed_urls

    to_rev_url_list = []

    # Collect new URLs to review that aren't already known.
    for new_url in reviewed_urls[url]["urls"]:
        if filters and not any(re.match(f, new_url) is not None for f in filters):
            continue
        # If this is already in the urls, no need to go and crawl it again.
        if new_url in reviewed_urls or new_url in to_rev_url_list:
            continue

        # Reserve the slot immediately so concurrent threads respect the limit;
        # the entry is filled in below. (The original created a fresh Lock per
        # iteration, which synchronized nothing.)
        with _reviewed_urls_lock:
            if limit is None or len(reviewed_urls) < limit:
                reviewed_urls[new_url] = {}
                to_rev_url_list.append(new_url)
            else:
                break

    if len(to_rev_url_list) > 0:
        new_revised_urls = parallel_get_all_website_links(to_rev_url_list)

        reviewed_urls.update(new_revised_urls)

        for new_url in new_revised_urls:
            # Forward headers on recursion (the original dropped them here,
            # so only depth-0 requests carried custom headers).
            get_all_website_links_recursively(
                new_url,
                reviewed_urls,
                limit,
                crawl_depth=crawl_depth,
                current_depth=current_depth + 1,
                filters=filters,
                headers=headers,
            )

287 

288 

def get_all_websites(
    urls, limit=1, html=False, crawl_depth: int = 1, filters: List[str] = None, headers: dict = None
) -> pd.DataFrame:
    """
    Crawl a list of websites and return a DataFrame containing the results.

    Args:
        urls (list): a list of URLs to crawl
        limit (int): absolute max number of web pages to crawl, regardless of depth
        crawl_depth (int): crawl depth for URLs
        html (bool): whether to include the raw HTML content in the results
        filters (List[str]): crawl only URLs matching these regex patterns
        headers (dict): headers of request

    Returns:
        pd.DataFrame: one row per crawled page, indexed by URL.

    Raises:
        Exception: when every crawled page produced an error.
    """
    reviewed_urls = {}

    def fetch_url(url, crawl_depth: int = 1, filters: List[str] = None):
        # URLs may arrive wrapped in single quotes (e.g. straight from the
        # SQL editor) — unwrap them first.
        if url.startswith("'") and url.endswith("'"):
            url = url[1:-1]
        url = url.rstrip("/")
        if urlparse(url).scheme == "":
            # No scheme given: try HTTPS first.
            url = "https://" + url
        get_all_website_links_recursively(
            url, reviewed_urls, limit, crawl_depth=crawl_depth, filters=filters, headers=headers
        )

    # Crawl every base URL in its own worker thread.
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = [executor.submit(fetch_url, url, crawl_depth=crawl_depth, filters=filters) for url in urls]
        for future in concurrent.futures.as_completed(futures):
            future.result()

    columns_to_ignore = ["urls"]
    if html is False:
        columns_to_ignore.append("html_content")
    df = dict_to_dataframe(reviewed_urls, columns_to_ignore=columns_to_ignore, index_name="url")

    # If no page succeeded at all, surface the first error to the caller.
    if not df.empty and df[df.error.isna()].empty:
        raise Exception(str(df.iloc[0].error))
    return df

336 

337 

def dict_to_dataframe(dict_of_dicts, columns_to_ignore=None, index_name=None) -> pd.DataFrame:
    """
    Build a DataFrame from a dict of row-dicts keyed by index value.

    Args:
        dict_of_dicts (dict): maps index values to row dictionaries
        columns_to_ignore (list): column names to drop when present
        index_name (str): name to give the index column
    Returns:
        pd.DataFrame: the assembled frame.
    """
    frame = pd.DataFrame.from_dict(dict_of_dicts, orient="index")

    # Drop unwanted columns, silently skipping any that don't exist.
    for column in columns_to_ignore or []:
        if column in frame.columns:
            frame = frame.drop(column, axis=1)

    if index_name:
        frame.index.name = index_name

    return frame