Coverage for mindsdb / integrations / handlers / email_handler / email_ingestor.py: 0%

53 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import re 

2 

3from bs4 import BeautifulSoup 

4import bs4.element 

5import chardet 

6 

7import pandas as pd 

8 

9from mindsdb.integrations.handlers.email_handler.email_client import EmailClient 

10from mindsdb.integrations.handlers.email_handler.settings import EmailSearchOptions 

11 

12 

class EmailIngestor:
    """
    Parses emails into a DataFrame.
    Does some preprocessing on the raw HTML to extract meaningful text.
    """

    # Parent tag names whose text nodes are treated as not visible.
    _INVISIBLE_PARENTS = frozenset({"style", "script", "head", "title", "meta", "[document]"})

    # Format used to parse the "datetime" column in ingest().
    _DATE_FORMAT = "%a, %d %b %Y %H:%M:%S %z"

    def __init__(self, email_client: EmailClient, search_options: EmailSearchOptions):
        """
        Store the collaborators used by ingest().

        Args:
            email_client: Client whose ``search_email`` method is called to fetch emails.
            search_options: Options passed unchanged to ``email_client.search_email``.
        """
        self.email_client = email_client
        self.search_options = search_options

    def _is_tag_visible(self, element):
        """
        Return False for text nodes that sit under a non-content tag or are HTML comments.

        Args:
            element: A text node produced by BeautifulSoup.

        Returns:
            True if the node should be kept, False otherwise.
        """
        if element.parent.name in self._INVISIBLE_PARENTS:
            return False
        if isinstance(element, bs4.element.Comment):
            return False
        return True

    def _preprocess_raw_html(self, html: str) -> str:
        """
        Extract the visible text of an HTML document.

        Args:
            html: Raw HTML markup.

        Returns:
            The stripped visible text nodes joined by newlines.
        """
        soup = BeautifulSoup(html, "html.parser")
        # ``string=True`` selects every text node; it is the current name of the
        # deprecated ``text=True`` keyword.
        text_nodes = soup.find_all(string=True)
        return "\n".join(node.strip() for node in text_nodes if self._is_tag_visible(node))

    def _decode_body(self, raw: bytes) -> str:
        """
        Decode a bytes body into text, never raising on bad or unknown encodings.

        Args:
            raw: The undecoded email body.

        Returns:
            The decoded text. Undecodable bytes are dropped only as a last resort.
        """
        detected = chardet.detect(raw)["encoding"]
        if detected is None:
            # If chardet can't detect the encoding, we default to utf-8.
            candidates = ["utf-8"]
        elif "windows" in detected.lower():
            # Historically these bodies were always decoded as utf-8, so utf-8 is
            # still tried first; the detected code page is the fallback so that
            # non-utf-8 bytes are decoded instead of being thrown away.
            candidates = ["utf-8", detected]
        else:
            candidates = [detected]

        for codec in candidates:
            try:
                return raw.decode(codec)
            except (UnicodeDecodeError, LookupError):
                # UnicodeDecodeError: illegal bytes for this codec.
                # LookupError: chardet reported a name Python has no codec for.
                continue

        # If illegal characters are found, we ignore them.
        # This happens with emails that contain a mix of encodings.
        try:
            return raw.decode(candidates[-1], errors="ignore")
        except LookupError:
            return raw.decode("utf-8", errors="ignore")

    def _ingest_email_row(self, row: pd.Series) -> dict:
        """
        Convert one row of search results into a cleaned dict.

        Args:
            row: A row providing "id", "body", "body_content_type", "subject",
                "to_field", "from_field" and "date".

        Returns:
            A dict with keys "id", "body", "subject", "to_field", "from_field" and
            "datetime", in which every None value has been replaced by "".
        """
        body = row["body"]
        if body is None:
            # A missing body becomes an empty string, like every other None field.
            body_str = ""
        elif row["body_content_type"] == "html":
            # Extract meaningful text from raw HTML.
            body_str = self._preprocess_raw_html(body)
        else:
            body_str = body

        if isinstance(body_str, bytes):
            body_str = self._decode_body(body_str)

        # We split by paragraph so make sure there aren't too many newlines in a row.
        body_str = re.sub(r"[\r\n]\s*[\r\n]", "\n\n", body_str)

        email_data = {
            "id": row["id"],
            "body": body_str,
            "subject": row["subject"],
            "to_field": row["to_field"],
            "from_field": row["from_field"],
            "datetime": row["date"],
        }
        # Replacing None values {None: ""}
        return {key: "" if value is None else value for key, value in email_data.items()}

    def ingest(self) -> pd.DataFrame:
        """
        Fetch emails through the client and return them as a cleaned DataFrame.

        Returns:
            A DataFrame with one row per email. When a "datetime" column is present
            it is parsed into UTC timestamps; values that do not match the expected
            format are coerced to NaT. With no emails the DataFrame has no columns.
        """
        emails_df = self.email_client.search_email(self.search_options)
        all_email_data = [self._ingest_email_row(row) for _, row in emails_df.iterrows()]

        df = pd.DataFrame(all_email_data)

        if "datetime" in df.columns:
            # Remove the literal " (UTC)" suffix. regex=False is explicit because
            # with regex matching the parentheses would be read as a group and the
            # literal suffix would not be removed.
            df["datetime"] = df["datetime"].str.replace(" (UTC)", "", regex=False)

            # Convert datetime string to datetime object, and normalize timezone to UTC.
            df["datetime"] = pd.to_datetime(
                df["datetime"], utc=True, format=self._DATE_FORMAT, errors="coerce"
            )

        return df