Coverage for mindsdb / integrations / handlers / email_handler / email_ingestor.py: 0%
53 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import re
3from bs4 import BeautifulSoup
4import bs4.element
5import chardet
7import pandas as pd
9from mindsdb.integrations.handlers.email_handler.email_client import EmailClient
10from mindsdb.integrations.handlers.email_handler.settings import EmailSearchOptions
class EmailIngestor:
    """
    Parses emails into a DataFrame.
    Does some preprocessing on the raw HTML to extract meaningful text.
    """

    def __init__(self, email_client: EmailClient, search_options: EmailSearchOptions):
        """
        Store the collaborators used by ``ingest``.

        Args:
            email_client: Client whose ``search_email`` method is called to fetch emails.
            search_options: Options passed unchanged to ``email_client.search_email``.
        """
        self.email_client = email_client
        self.search_options = search_options

    def _is_tag_visible(self, element):
        """
        Return True if a text node should be kept as human-readable text.

        Text nested directly under non-rendered tags (style, script, head, ...)
        and HTML comments are rejected.
        """
        if element.parent.name in ["style", "script", "head", "title", "meta", "[document]"]:
            return False
        if isinstance(element, bs4.element.Comment):
            return False
        return True

    def _preprocess_raw_html(self, html: str) -> str:
        """
        Extract the visible text of an HTML document.

        Args:
            html: Raw HTML markup.

        Returns:
            The stripped visible text nodes joined with newlines.
        """
        soup = BeautifulSoup(html, "html.parser")
        # `string=True` is the current spelling of the deprecated `text=True` argument.
        texts = soup.find_all(string=True)
        visible_texts = filter(self._is_tag_visible, texts)
        return "\n".join(t.strip() for t in visible_texts)

    def _decode_body(self, raw: bytes) -> str:
        """
        Decode a bytes email body into text on a best-effort basis.

        The encoding is guessed with chardet; undetected and "windows-*"
        encodings are treated as utf-8. Bytes that cannot be decoded are dropped
        rather than raising.
        """
        encoding = chardet.detect(raw)["encoding"]
        if encoding is None or "windows" in encoding.lower():
            # If chardet can't detect the encoding we default to utf-8; windows-*
            # code pages are deliberately treated as utf-8 as well.
            encoding = "utf-8"
        try:
            return raw.decode(encoding)
        except UnicodeDecodeError:
            # If illegal characters are found, we ignore them
            # (seen with emails that mix several encodings).
            return raw.decode(encoding, errors="ignore")
        except LookupError:
            # chardet reported a codec name that Python does not know.
            return raw.decode("utf-8", errors="ignore")

    def _ingest_email_row(self, row: pd.Series) -> dict:
        """
        Convert one raw email row into a plain dict of cleaned fields.

        Args:
            row: Row providing "id", "body", "body_content_type", "subject",
                "to_field", "from_field" and "date".

        Returns:
            Dict with keys "id", "body", "subject", "to_field", "from_field" and
            "datetime", where every None value has been replaced by "".
        """
        # Work on a local value so the caller's row is never mutated.
        body = row["body"]
        if body is not None and row["body_content_type"] == "html":
            # Extract meaningful text from raw HTML.
            body = self._preprocess_raw_html(body)

        if isinstance(body, bytes):
            body = self._decode_body(body)
        elif body is None:
            # A missing body is treated like every other missing field: empty string.
            body = ""

        # We split by paragraph so make sure there aren't too many newlines in a row.
        body = re.sub(r"[\r\n]\s*[\r\n]", "\n\n", body)

        email_data = {
            "id": row["id"],
            "body": body,
            "subject": row["subject"],
            "to_field": row["to_field"],
            "from_field": row["from_field"],
            "datetime": row["date"],
        }
        # Replacing None values {None: ""}
        return {key: "" if value is None else value for key, value in email_data.items()}

    def ingest(self) -> pd.DataFrame:
        """
        Fetch emails through the client and return them as a cleaned DataFrame.

        Returns:
            One row per email with the fields produced by ``_ingest_email_row``;
            the "datetime" column is parsed into UTC timestamps (unparseable
            values become NaT). An empty search result yields an empty DataFrame.
        """
        emails_df = self.email_client.search_email(self.search_options)
        all_email_data = [self._ingest_email_row(row) for _, row in emails_df.iterrows()]

        df = pd.DataFrame(all_email_data)

        # With no emails there is no "datetime" column to normalize.
        if "datetime" in df.columns:
            # Strip the literal " (UTC)" suffix. regex=False keeps the parentheses
            # literal regardless of the pandas version's default.
            df["datetime"] = df["datetime"].str.replace(" (UTC)", "", regex=False)

            # Convert datetime string to datetime object, and normalize timezone to UTC.
            df["datetime"] = pd.to_datetime(
                df["datetime"], utc=True, format="%a, %d %b %Y %H:%M:%S %z", errors="coerce"
            )

        return df