file_extractor.py 2.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667
  1. import tempfile
  2. from pathlib import Path
  3. from typing import List, Union, Optional
  4. import requests
  5. from langchain.document_loaders import TextLoader, Docx2txtLoader
  6. from langchain.schema import Document
  7. from core.data_loader.loader.csv import CSVLoader
  8. from core.data_loader.loader.excel import ExcelLoader
  9. from core.data_loader.loader.html import HTMLLoader
  10. from core.data_loader.loader.markdown import MarkdownLoader
  11. from core.data_loader.loader.pdf import PdfLoader
  12. from extensions.ext_storage import storage
  13. from models.model import UploadFile
  # MIME types listed as supported for URL-based content.
  # NOTE(review): nothing in the visible code reads this list (load_from_url
  # does not inspect the response Content-Type) -- presumably callers check it
  # before invoking FileExtractor.load_from_url; confirm against call sites.
  14. SUPPORT_URL_CONTENT_TYPES = ['application/pdf', 'text/plain']
  # Browser-like User-Agent header value sent by FileExtractor.load_from_url
  # with its HTTP GET request.
  15. USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
  class FileExtractor:
      """Turn an uploaded file, a URL or a local path into loaded documents.

      All entry points end in :meth:`load_from_file`, which picks a loader from
      the file extension. With ``return_text=True`` the ``page_content`` of
      every loaded document is joined with newlines and returned as a single
      string; otherwise the list produced by the loader is returned as-is.
      """

      @staticmethod
      def _temp_file_path(temp_dir: str, suffix: str) -> str:
          """Return a path for a randomly named file ending in ``suffix`` inside ``temp_dir``."""
          # Local import keeps this block self-contained; uuid replaces the
          # private CPython helper tempfile._get_candidate_names().
          import uuid

          return str(Path(temp_dir) / f"{uuid.uuid4().hex}{suffix}")

      @classmethod
      def load(cls, upload_file: UploadFile, return_text: bool = False) -> Union[List[Document], str]:
          """Download ``upload_file`` from storage and load its content.

          The object stored under ``upload_file.key`` is downloaded into a
          temporary directory (removed when loading finishes) under a random
          name that keeps the key's extension, then handed to
          :meth:`load_from_file` together with ``upload_file``.

          :param upload_file: record whose ``key`` identifies the stored object.
          :param return_text: return one newline-joined string instead of documents.
          :return: the loaded documents, or their joined text.
          """
          with tempfile.TemporaryDirectory() as temp_dir:
              suffix = Path(upload_file.key).suffix
              file_path = cls._temp_file_path(temp_dir, suffix)
              storage.download(upload_file.key, file_path)

              return cls.load_from_file(file_path, return_text, upload_file)

      @classmethod
      def load_from_url(cls, url: str, return_text: bool = False) -> Union[List[Document], str]:
          """Fetch ``url`` with an HTTP GET and load the response body.

          The body is written to a temporary file whose extension is taken from
          the *path* component of the URL, so a query string or fragment
          (``.../report.pdf?token=abc``) no longer hides the real extension.

          Note: the HTTP status code is not checked; whatever body the server
          returns is written to disk and parsed.

          :param url: address of the document to fetch.
          :param return_text: return one newline-joined string instead of documents.
          :return: the loaded documents, or their joined text.
          """
          # Local import keeps this block self-contained.
          from urllib.parse import urlparse

          response = requests.get(url, headers={
              "User-Agent": USER_AGENT
          })

          with tempfile.TemporaryDirectory() as temp_dir:
              # Use only the URL path: Path(url).suffix would include any
              # "?query" or "#fragment" and defeat the extension dispatch.
              suffix = Path(urlparse(url).path).suffix
              file_path = cls._temp_file_path(temp_dir, suffix)
              with open(file_path, 'wb') as file:
                  file.write(response.content)

              return cls.load_from_file(file_path, return_text)

      @classmethod
      def load_from_file(cls, file_path: str, return_text: bool = False,
                         upload_file: Optional[UploadFile] = None) -> Union[List[Document], str]:
          """Load ``file_path`` with the loader matching its extension.

          The extension is compared case-insensitively. ``.xlsx``, ``.pdf``,
          ``.md``/``.markdown``, ``.htm``/``.html``, ``.docx`` and ``.csv`` each
          have a dedicated loader; every other extension is read as plain text.

          :param file_path: path of the file on local disk.
          :param return_text: return one newline-joined string instead of documents.
          :param upload_file: optional upload record, forwarded only to the PDF loader.
          :return: the loaded documents, or their ``page_content`` joined with ``'\\n'``.
          """
          input_file = Path(file_path)
          delimiter = '\n'
          file_extension = input_file.suffix.lower()

          if file_extension == '.xlsx':
              loader = ExcelLoader(file_path)
          elif file_extension == '.pdf':
              loader = PdfLoader(file_path, upload_file=upload_file)
          elif file_extension in ['.md', '.markdown']:
              loader = MarkdownLoader(file_path, autodetect_encoding=True)
          elif file_extension in ['.htm', '.html']:
              loader = HTMLLoader(file_path)
          elif file_extension == '.docx':
              loader = Docx2txtLoader(file_path)
          elif file_extension == '.csv':
              loader = CSVLoader(file_path, autodetect_encoding=True)
          else:
              # Anything unrecognised is treated as plain text.
              loader = TextLoader(file_path, autodetect_encoding=True)

          documents = loader.load()
          if return_text:
              return delimiter.join(document.page_content for document in documents)
          return documents