Coverage for mindsdb / interfaces / knowledge_base / preprocessing / text_splitter.py: 92%
34 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1from typing import List
class TextSplitter:
    """Split long text into chunks that end on the best available separator."""

    def __init__(
        self,
        chunk_size: int = 1000,
        chunk_overlap: int = 200,
        separators: List[str] = None,
        k_range: float = 0.5,
        k_ratio: float = 1,
    ):
        """
        Split text into chunks. The logic:
         - Take a piece of text of chunk_size and look for a separator near its end.
         - The allowed range to find the separator is defined by k_range and k_ratio using formula:
              k_range * chunk_size / (num * k_ratio + 1)
           num - is the index of the separator in the list
         - if the separator is not in the range: switch to the next separator
         - if the chosen separator is blank (spaces or the empty string) the cut is
           treated as being in the middle of a sentence, and overlapping is used:
           - the found text is the current chunk
           - the search is repeated once with a less strict range (k_range * 1.5, k_ratio = 0)
           - if that search hits a non-blank separator less than chunk_overlap characters
             before the cut, the next chunk begins right after that separator

        :param chunk_size: size of the chunk, which must not be exceeded
        :param chunk_overlap: maximum number of characters the next chunk may step back
        :param separators: list of separators in order of priority
        :param k_range: defines the range to look for the separator
        :param k_ratio: defines how much to shrink the range for the next separator
        """
        if separators is None:
            separators = ["\n\n", "\n", ". ", " ", ""]
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.separators = separators
        self.k_range = k_range
        self.k_ratio = k_ratio

    def split_text(self, text: str) -> List[str]:
        """
        Split text into a list of chunks.

        Text shorter than chunk_size is returned as a single chunk (an empty
        input yields [""]). An empty remainder left after the last cut is
        dropped instead of being emitted as a trailing empty chunk.

        :param text: text to split
        :return: list of chunks in document order
        :raises RuntimeError: if no configured separator fits the allowed range
        """
        chunks = []
        while len(text) >= self.chunk_size:
            _, chunk, shift = self.get_next_chunk(text, self.k_range, self.k_ratio)
            chunks.append(chunk)
            text = text[shift:]

        # Keep the tail unless it is an empty leftover of a previous cut.
        if text or not chunks:
            chunks.append(text)
        return chunks

    def get_next_chunk(self, text: str, k_range: float, k_ratio: float):
        """
        Cut the next chunk from the beginning of text.

        :param text: remaining text; only the first chunk_size characters are inspected
        :param k_range: defines the range to look for the separator
        :param k_ratio: defines how much to shrink the range for the next separator
        :return: tuple (separator, chunk without the separator, shift), where shift
            is the offset in text at which the next search has to start
        :raises RuntimeError: if no separator is found inside its allowed range
        """
        chunk = text[: self.chunk_size]

        match = self._find_separator(chunk, k_range, k_ratio)
        if match is None:
            raise RuntimeError("Cannot split text")
        sep, pos = match
        shift = pos + len(sep)

        if sep.strip(" ") == "":
            # Blank separator: the cut is probably mid-sentence. Look once, with a
            # wider range, for a real separator to start the next chunk from.
            # This is a single bounded lookup, so it cannot recurse endlessly.
            relaxed = self._find_separator(chunk, k_range * 1.5, 0)
            if relaxed is not None:
                sep2, pos2 = relaxed
                shift2 = pos2 + len(sep2)
                if sep2.strip(" ") != "" and shift - shift2 < self.chunk_overlap:
                    # step back to the previous separator to create the overlap
                    shift = shift2

        return sep, chunk[:pos], shift

    def _find_separator(self, chunk: str, k_range: float, k_ratio: float):
        """Return (separator, position) of the first separator, in priority order, whose last occurrence lies in its allowed range, or None."""
        for num, sep in enumerate(self.separators):
            pos = chunk.rfind(sep)
            if pos < 0:
                # Separator is absent: never treat the -1 from rfind as a position.
                continue
            # Distance is measured from chunk_size, matching the documented formula.
            if self.chunk_size - pos < k_range * self.chunk_size / (num * k_ratio + 1):
                return sep, pos
        return None