blod.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. """Schema for Blobs and Blob Loaders.
  2. The goal is to facilitate decoupling of content loading from content parsing code.
  3. In addition, content loading code should provide a lazy loading interface by default.
  4. """
  5. from __future__ import annotations
  6. import contextlib
  7. import mimetypes
  8. from abc import ABC, abstractmethod
  9. from collections.abc import Generator, Iterable, Mapping
  10. from io import BufferedReader, BytesIO
  11. from pathlib import PurePath
  12. from typing import Any, Optional, Union
  13. from pydantic import BaseModel, ConfigDict, model_validator
  # Type alias for values accepted as a file-system location in this module:
  # either a plain string or a pathlib pure path object. Used for the
  # ``Blob.path`` field and the ``Blob.from_path`` constructor.
  PathLike = Union[str, PurePath]
  class Blob(BaseModel):
      """A blob is used to represent raw data by either reference or value.

      Provides an interface to materialize the blob in different representations,
      and helps to decouple the development of data loaders from the downstream
      parsing of the raw data.

      A blob holds its content in ``data`` (by value) and/or points at it through
      ``path`` (by reference). When ``data`` is ``None`` and ``path`` is set, the
      accessors read the file at ``path`` each time they are called.

      Instances are immutable (``frozen=True`` in the model config).

      Inspired by: https://developer.mozilla.org/en-US/docs/Web/API/Blob
      """

      data: Union[bytes, str, None] = None  # Raw data
      mimetype: Optional[str] = None  # Not to be confused with a file extension
      encoding: str = "utf-8"  # Use utf-8 as default encoding, if decoding to string
      # Location where the original content was found.
      # Represents a location on the local file system.
      # Useful for situations where downstream code assumes it must work with
      # file paths rather than in-memory content.
      path: Optional[PathLike] = None

      model_config = ConfigDict(arbitrary_types_allowed=True, frozen=True)

      @property
      def source(self) -> Optional[str]:
          """The source location of the blob as a string if known, otherwise None."""
          return str(self.path) if self.path else None

      @model_validator(mode="before")
      @classmethod
      def check_blob_is_valid(cls, values: Mapping[str, Any]) -> Mapping[str, Any]:
          """Verify that either data or path is provided.

          Only the *presence* of the keys in the raw input is checked, not their
          values; ``from_path`` relies on this by passing ``data=None`` explicitly
          together with a path.

          Args:
              values: the raw input handed to the model before field validation

          Returns:
              The input values, unchanged

          Raises:
              ValueError: if neither "data" nor "path" is present in the input
          """
          if "data" not in values and "path" not in values:
              raise ValueError("Either data or path must be provided")
          return values

      def as_string(self) -> str:
          """Read data as a string.

          Returns:
              The file content decoded with ``encoding`` when the blob is a
              reference (no data, but a path), the decoded bytes when ``data`` is
              bytes, or ``data`` itself when it is already a string.

          Raises:
              ValueError: if the blob has neither usable data nor a path
          """
          if self.data is None and self.path:
              with open(str(self.path), encoding=self.encoding) as f:
                  return f.read()
          elif isinstance(self.data, bytes):
              return self.data.decode(self.encoding)
          elif isinstance(self.data, str):
              return self.data
          else:
              raise ValueError(f"Unable to get string for blob {self}")

      def as_bytes(self) -> bytes:
          """Read data as bytes.

          Returns:
              ``data`` itself when it is bytes, ``data`` encoded with ``encoding``
              when it is a string, or the raw file content when the blob is a
              reference (no data, but a path).

          Raises:
              ValueError: if the blob has neither usable data nor a path
          """
          if isinstance(self.data, bytes):
              return self.data
          elif isinstance(self.data, str):
              return self.data.encode(self.encoding)
          elif self.data is None and self.path:
              with open(str(self.path), "rb") as f:
                  return f.read()
          else:
              raise ValueError(f"Unable to get bytes for blob {self}")

      @contextlib.contextmanager
      def as_bytes_io(self) -> Generator[Union[BytesIO, BufferedReader], None, None]:
          """Read data as a byte stream.

          Intended to be used in a ``with`` statement. For a path-backed blob the
          underlying file is opened in binary mode and closed when the ``with``
          block exits.

          Yields:
              An in-memory ``BytesIO`` when the blob holds data (string data is
              encoded with ``encoding`` first, mirroring ``as_bytes``), or the
              opened binary file when the blob is a reference to a path.

          Raises:
              NotImplementedError: if the blob has neither usable data nor a path
          """
          if isinstance(self.data, bytes):
              yield BytesIO(self.data)
          elif isinstance(self.data, str):
              # Keep parity with as_bytes(): string data is encoded, not rejected.
              yield BytesIO(self.data.encode(self.encoding))
          elif self.data is None and self.path:
              with open(str(self.path), "rb") as f:
                  yield f
          else:
              raise NotImplementedError(f"Unable to convert blob {self}")

      @classmethod
      def from_path(
          cls,
          path: PathLike,
          *,
          encoding: str = "utf-8",
          mime_type: Optional[str] = None,
          guess_type: bool = True,
      ) -> Blob:
          """Load the blob from a path like object.

          The file is not read here; the returned blob only references ``path``.

          Args:
              path: path like object to file to be read
              encoding: Encoding to use if decoding the bytes into a string
              mime_type: if provided, will be set as the mime-type of the data
              guess_type: If True, the mimetype will be guessed from the file
                  extension, if a mime-type was not provided

          Returns:
              Blob instance
          """
          if mime_type is None and guess_type:
              # guess_type() returns a (type, encoding) pair; only the type is kept.
              _mimetype = mimetypes.guess_type(path)[0]
          else:
              _mimetype = mime_type
          # We do not load the data immediately, instead we treat the blob as a
          # reference to the underlying data.
          return cls(data=None, mimetype=_mimetype, encoding=encoding, path=path)

      @classmethod
      def from_data(
          cls,
          data: Union[str, bytes],
          *,
          encoding: str = "utf-8",
          mime_type: Optional[str] = None,
          path: Optional[PathLike] = None,
      ) -> Blob:
          """Initialize the blob from in-memory data.

          Args:
              data: the in-memory data associated with the blob
              encoding: Encoding to use if decoding the bytes into a string
              mime_type: if provided, will be set as the mime-type of the data
              path: if provided, will be set as the source from which the data came

          Returns:
              Blob instance
          """
          return cls(data=data, mimetype=mime_type, encoding=encoding, path=path)

      def __repr__(self) -> str:
          """Define the blob representation.

          Returns:
              ``"Blob <id>"``, followed by the source location when one is known.
              The content itself is intentionally not included.
          """
          str_repr = f"Blob {id(self)}"
          if self.source:
              str_repr += f" {self.source}"
          return str_repr
  class BlobLoader(ABC):
      """Abstract base class that blob loader implementations derive from.

      A concrete loader knows how to fetch raw content from some data source,
      selected by whatever criteria it supports, and hands that content back
      lazily as a stream of blobs.
      """

      @abstractmethod
      def yield_blobs(self) -> Iterable[Blob]:
          """Lazily produce the raw data, one Blob object at a time.

          Subclasses must override this method.

          Returns:
              A generator over blobs
          """