markdown.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. import logging
  2. import re
  3. from typing import Optional, List, Tuple, cast
  4. from langchain.document_loaders.base import BaseLoader
  5. from langchain.document_loaders.helpers import detect_file_encodings
  6. from langchain.schema import Document
  # Module-level logger named after this module; MarkdownLoader uses it for
  # debug output while retrying file reads with detected encodings.
  7. logger = logging.getLogger(__name__)
  class MarkdownLoader(BaseLoader):
      """Load a Markdown file as one document per header section.

      Args:
          file_path: Path to the file to load.
          remove_hyperlinks: Whether to remove hyperlinks from the text
              (the link text is kept, the link target is dropped).
          remove_images: Whether to remove images from the text.
          encoding: File encoding to use. If `None`, the file will be loaded
              with the default system encoding.
          autodetect_encoding: Whether to try to autodetect the file encoding
              if the specified encoding fails.
      """

      def __init__(
          self,
          file_path: str,
          remove_hyperlinks: bool = True,
          remove_images: bool = True,
          encoding: Optional[str] = None,
          autodetect_encoding: bool = True,
      ):
          """Initialize with file path."""
          self._file_path = file_path
          self._remove_hyperlinks = remove_hyperlinks
          self._remove_images = remove_images
          self._encoding = encoding
          self._autodetect_encoding = autodetect_encoding

      def load(self) -> List[Document]:
          """Parse the file and build one ``Document`` per section.

          Returns:
              A list of documents. A section with a header is rendered as
              ``"\\n\\n{header}\\n{text}"``; a section without a header
              contains only its stripped text.

          Raises:
              RuntimeError: If the file cannot be read or decoded.
          """
          documents = []
          for header, value in self.parse_tups(self._file_path):
              value = value.strip()
              if header is None:
                  documents.append(Document(page_content=value))
              else:
                  documents.append(Document(page_content=f"\n\n{header}\n{value}"))
          return documents

      def markdown_to_tups(self, markdown_text: str) -> List[Tuple[Optional[str], str]]:
          """Split markdown text into ``(header, text)`` tuples.

          A section starts at every line that begins with one or more ``#``
          characters followed by whitespace. Text that precedes the first
          header is returned as a section whose header is ``None`` (it is
          skipped only when it is blank).

          Args:
              markdown_text: The markdown source to split.

          Returns:
              The sections in document order. When the text has headers, each
              header has its leading ``#`` marker (and an optional closing run
              of ``#``) removed, and ``<...>`` tags are removed from the
              section text. When the text has no header at all, a single
              ``(None, text)`` tuple is returned with all newlines removed.
          """
          is_header = re.compile(r"^#+\s").match
          sections: List[Tuple[Optional[str], str]] = []
          current_header: Optional[str] = None
          buffered: List[str] = []

          for line in markdown_text.split("\n"):
              if is_header(line):
                  body = "".join(buffered)
                  # Flush the previous section. Text before the very first
                  # header has no header of its own; keep it rather than
                  # silently dropping it, unless it is only whitespace.
                  if current_header is not None or body.strip():
                      sections.append((current_header, body))
                  current_header = line
                  buffered = []
              else:
                  buffered.append(line + "\n")
          sections.append((current_header, "".join(buffered)))

          if current_header is None:
              # No header anywhere, so ``sections`` holds one headerless entry.
              # NOTE: newlines are deleted outright (not replaced by spaces),
              # so adjacent lines are joined; kept for backward compatibility.
              return [(None, text.replace("\n", "")) for _, text in sections]

          cleaned: List[Tuple[Optional[str], str]] = []
          for header, text in sections:
              if header is not None:
                  # Strip only the heading markup, so a '#' that is part of the
                  # title itself (e.g. "C#") survives.
                  header = re.sub(r"^#+\s+|\s+#+\s*$", "", header).strip()
              cleaned.append((header, re.sub(r"<.*?>", "", text)))
          return cleaned

      def remove_images(self, content: str) -> str:
          """Remove image embeds from markdown text.

          Both wiki-style embeds (``![[image.png]]``) and standard inline
          images (``![alt](url)``) are removed.

          Args:
              content: The markdown text to clean.

          Returns:
              ``content`` with image embeds deleted.
          """
          # Non-greedy so that two embeds on one line do not swallow the text
          # between them.
          content = re.sub(r"!\[\[.*?\]\]", "", content)
          return re.sub(r"!\[[^\]\n]*\]\([^)\n]*\)", "", content)

      def remove_hyperlinks(self, content: str) -> str:
          """Replace inline links ``[text](url)`` with their link text.

          Image embeds (``![alt](url)``) share the bracket syntax but are not
          links, so they are left untouched here.

          Args:
              content: The markdown text to clean.

          Returns:
              ``content`` with each inline link replaced by its text.
          """
          return re.sub(r"(?<!!)\[(.*?)\]\((.*?)\)", r"\1", content)

      def _read_with_detected_encoding(self, filepath: str, cause: Exception) -> str:
          """Read ``filepath`` using the first detected encoding that decodes it.

          Args:
              filepath: Path to the file to read.
              cause: The decoding error that triggered detection; chained to
                  the ``RuntimeError`` raised when no candidate works.

          Raises:
              RuntimeError: If none of the candidate encodings can decode
                  the file.
          """
          for detected in detect_file_encodings(filepath):
              logger.debug("Trying encoding: %s", detected.encoding)
              try:
                  with open(filepath, encoding=detected.encoding) as f:
                      return f.read()
              except UnicodeDecodeError:
                  continue
          # Every candidate failed: report it instead of silently treating the
          # file as empty.
          raise RuntimeError(f"Error loading {filepath}") from cause

      def parse_tups(self, filepath: str) -> List[Tuple[Optional[str], str]]:
          """Read a markdown file and parse it into ``(header, text)`` tuples.

          The file is read with the configured encoding. If decoding fails and
          ``autodetect_encoding`` is enabled, the encodings reported by
          ``detect_file_encodings`` are tried in order.

          Args:
              filepath: Path to the file to parse.

          Returns:
              The sections produced by ``markdown_to_tups`` after optional
              image and hyperlink removal.

          Raises:
              RuntimeError: If the file cannot be read or decoded.
          """
          try:
              with open(filepath, "r", encoding=self._encoding) as f:
                  content = f.read()
          except UnicodeDecodeError as e:
              if not self._autodetect_encoding:
                  raise RuntimeError(f"Error loading {filepath}") from e
              content = self._read_with_detected_encoding(filepath, e)
          except Exception as e:
              raise RuntimeError(f"Error loading {filepath}") from e

          # Images go first so that a linked image such as
          # ``[![alt](img)](url)`` collapses cleanly once links are removed.
          if self._remove_images:
              content = self.remove_images(content)
          if self._remove_hyperlinks:
              content = self.remove_hyperlinks(content)
          return self.markdown_to_tups(content)