local_storage.py 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
  1. import os
  2. import shutil
  3. from collections.abc import Generator
  4. from flask import Flask
  5. from extensions.storage.base_storage import BaseStorage
  6. class LocalStorage(BaseStorage):
  7. """Implementation for local storage."""
  8. def __init__(self, app: Flask):
  9. super().__init__(app)
  10. folder = self.app.config.get("STORAGE_LOCAL_PATH")
  11. if not os.path.isabs(folder):
  12. folder = os.path.join(app.root_path, folder)
  13. self.folder = folder
  14. def save(self, filename, data):
  15. if not self.folder or self.folder.endswith("/"):
  16. filename = self.folder + filename
  17. else:
  18. filename = self.folder + "/" + filename
  19. folder = os.path.dirname(filename)
  20. os.makedirs(folder, exist_ok=True)
  21. with open(os.path.join(os.getcwd(), filename), "wb") as f:
  22. f.write(data)
  23. def load_once(self, filename: str) -> bytes:
  24. if not self.folder or self.folder.endswith("/"):
  25. filename = self.folder + filename
  26. else:
  27. filename = self.folder + "/" + filename
  28. if not os.path.exists(filename):
  29. raise FileNotFoundError("File not found")
  30. with open(filename, "rb") as f:
  31. data = f.read()
  32. return data
  33. def load_stream(self, filename: str) -> Generator:
  34. def generate(filename: str = filename) -> Generator:
  35. if not self.folder or self.folder.endswith("/"):
  36. filename = self.folder + filename
  37. else:
  38. filename = self.folder + "/" + filename
  39. if not os.path.exists(filename):
  40. raise FileNotFoundError("File not found")
  41. with open(filename, "rb") as f:
  42. while chunk := f.read(4096): # Read in chunks of 4KB
  43. yield chunk
  44. return generate()
  45. def download(self, filename, target_filepath):
  46. if not self.folder or self.folder.endswith("/"):
  47. filename = self.folder + filename
  48. else:
  49. filename = self.folder + "/" + filename
  50. if not os.path.exists(filename):
  51. raise FileNotFoundError("File not found")
  52. shutil.copyfile(filename, target_filepath)
  53. def exists(self, filename):
  54. if not self.folder or self.folder.endswith("/"):
  55. filename = self.folder + filename
  56. else:
  57. filename = self.folder + "/" + filename
  58. return os.path.exists(filename)
  59. def delete(self, filename):
  60. if not self.folder or self.folder.endswith("/"):
  61. filename = self.folder + filename
  62. else:
  63. filename = self.folder + "/" + filename
  64. if os.path.exists(filename):
  65. os.remove(filename)