fixed_text_splitter.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. """Functionality for splitting text."""
  2. from __future__ import annotations
  3. from typing import Any, List, Optional
  4. from core.model_runtime.model_providers.__base.tokenizers.gpt2_tokenzier import GPT2Tokenizer
  5. from langchain.text_splitter import (TS, AbstractSet, Collection, Literal, RecursiveCharacterTextSplitter,
  6. TokenTextSplitter, Type, Union)
  7. class EnhanceRecursiveCharacterTextSplitter(RecursiveCharacterTextSplitter):
  8. """
  9. This class is used to implement from_gpt2_encoder, to prevent using of tiktoken
  10. """
  11. @classmethod
  12. def from_gpt2_encoder(
  13. cls: Type[TS],
  14. encoding_name: str = "gpt2",
  15. model_name: Optional[str] = None,
  16. allowed_special: Union[Literal["all"], AbstractSet[str]] = set(),
  17. disallowed_special: Union[Literal["all"], Collection[str]] = "all",
  18. **kwargs: Any,
  19. ):
  20. def _token_encoder(text: str) -> int:
  21. return GPT2Tokenizer.get_num_tokens(text)
  22. if issubclass(cls, TokenTextSplitter):
  23. extra_kwargs = {
  24. "encoding_name": encoding_name,
  25. "model_name": model_name,
  26. "allowed_special": allowed_special,
  27. "disallowed_special": disallowed_special,
  28. }
  29. kwargs = {**kwargs, **extra_kwargs}
  30. return cls(length_function=_token_encoder, **kwargs)
  31. class FixedRecursiveCharacterTextSplitter(EnhanceRecursiveCharacterTextSplitter):
  32. def __init__(self, fixed_separator: str = "\n\n", separators: Optional[List[str]] = None, **kwargs: Any):
  33. """Create a new TextSplitter."""
  34. super().__init__(**kwargs)
  35. self._fixed_separator = fixed_separator
  36. self._separators = separators or ["\n\n", "\n", " ", ""]
  37. def split_text(self, text: str) -> List[str]:
  38. """Split incoming text and return chunks."""
  39. if self._fixed_separator:
  40. chunks = text.split(self._fixed_separator)
  41. else:
  42. chunks = list(text)
  43. final_chunks = []
  44. for chunk in chunks:
  45. if self._length_function(chunk) > self._chunk_size:
  46. final_chunks.extend(self.recursive_split_text(chunk))
  47. else:
  48. final_chunks.append(chunk)
  49. return final_chunks
  50. def recursive_split_text(self, text: str) -> List[str]:
  51. """Split incoming text and return chunks."""
  52. final_chunks = []
  53. # Get appropriate separator to use
  54. separator = self._separators[-1]
  55. for _s in self._separators:
  56. if _s == "":
  57. separator = _s
  58. break
  59. if _s in text:
  60. separator = _s
  61. break
  62. # Now that we have the separator, split the text
  63. if separator:
  64. splits = text.split(separator)
  65. else:
  66. splits = list(text)
  67. # Now go merging things, recursively splitting longer texts.
  68. _good_splits = []
  69. for s in splits:
  70. if self._length_function(s) < self._chunk_size:
  71. _good_splits.append(s)
  72. else:
  73. if _good_splits:
  74. merged_text = self._merge_splits(_good_splits, separator)
  75. final_chunks.extend(merged_text)
  76. _good_splits = []
  77. other_info = self.recursive_split_text(s)
  78. final_chunks.extend(other_info)
  79. if _good_splits:
  80. merged_text = self._merge_splits(_good_splits, separator)
  81. final_chunks.extend(merged_text)
  82. return final_chunks