ext_storage.py 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194
  1. import os
  2. import shutil
  3. from collections.abc import Generator
  4. from contextlib import closing
  5. from datetime import datetime, timedelta, timezone
  6. from typing import Union
  7. import boto3
  8. from azure.storage.blob import AccountSasPermissions, BlobServiceClient, ResourceTypes, generate_account_sas
  9. from botocore.client import Config
  10. from botocore.exceptions import ClientError
  11. from flask import Flask
  12. class Storage:
  13. def __init__(self):
  14. self.storage_type = None
  15. self.bucket_name = None
  16. self.client = None
  17. self.folder = None
  18. def init_app(self, app: Flask):
  19. self.storage_type = app.config.get('STORAGE_TYPE')
  20. if self.storage_type == 's3':
  21. self.bucket_name = app.config.get('S3_BUCKET_NAME')
  22. self.client = boto3.client(
  23. 's3',
  24. aws_secret_access_key=app.config.get('S3_SECRET_KEY'),
  25. aws_access_key_id=app.config.get('S3_ACCESS_KEY'),
  26. endpoint_url=app.config.get('S3_ENDPOINT'),
  27. region_name=app.config.get('S3_REGION'),
  28. config=Config(s3={'addressing_style': app.config.get('S3_ADDRESS_STYLE')})
  29. )
  30. elif self.storage_type == 'azure-blob':
  31. self.bucket_name = app.config.get('AZURE_BLOB_CONTAINER_NAME')
  32. sas_token = generate_account_sas(
  33. account_name=app.config.get('AZURE_BLOB_ACCOUNT_NAME'),
  34. account_key=app.config.get('AZURE_BLOB_ACCOUNT_KEY'),
  35. resource_types=ResourceTypes(service=True, container=True, object=True),
  36. permission=AccountSasPermissions(read=True, write=True, delete=True, list=True, add=True, create=True),
  37. expiry=datetime.now(timezone.utc).replace(tzinfo=None) + timedelta(hours=1)
  38. )
  39. self.client = BlobServiceClient(account_url=app.config.get('AZURE_BLOB_ACCOUNT_URL'),
  40. credential=sas_token)
  41. else:
  42. self.folder = app.config.get('STORAGE_LOCAL_PATH')
  43. if not os.path.isabs(self.folder):
  44. self.folder = os.path.join(app.root_path, self.folder)
  45. def save(self, filename, data):
  46. if self.storage_type == 's3':
  47. self.client.put_object(Bucket=self.bucket_name, Key=filename, Body=data)
  48. elif self.storage_type == 'azure-blob':
  49. blob_container = self.client.get_container_client(container=self.bucket_name)
  50. blob_container.upload_blob(filename, data)
  51. else:
  52. if not self.folder or self.folder.endswith('/'):
  53. filename = self.folder + filename
  54. else:
  55. filename = self.folder + '/' + filename
  56. folder = os.path.dirname(filename)
  57. os.makedirs(folder, exist_ok=True)
  58. with open(os.path.join(os.getcwd(), filename), "wb") as f:
  59. f.write(data)
  60. def load(self, filename: str, stream: bool = False) -> Union[bytes, Generator]:
  61. if stream:
  62. return self.load_stream(filename)
  63. else:
  64. return self.load_once(filename)
  65. def load_once(self, filename: str) -> bytes:
  66. if self.storage_type == 's3':
  67. try:
  68. with closing(self.client) as client:
  69. data = client.get_object(Bucket=self.bucket_name, Key=filename)['Body'].read()
  70. except ClientError as ex:
  71. if ex.response['Error']['Code'] == 'NoSuchKey':
  72. raise FileNotFoundError("File not found")
  73. else:
  74. raise
  75. elif self.storage_type == 'azure-blob':
  76. blob = self.client.get_container_client(container=self.bucket_name)
  77. blob = blob.get_blob_client(blob=filename)
  78. data = blob.download_blob().readall()
  79. else:
  80. if not self.folder or self.folder.endswith('/'):
  81. filename = self.folder + filename
  82. else:
  83. filename = self.folder + '/' + filename
  84. if not os.path.exists(filename):
  85. raise FileNotFoundError("File not found")
  86. with open(filename, "rb") as f:
  87. data = f.read()
  88. return data
  89. def load_stream(self, filename: str) -> Generator:
  90. def generate(filename: str = filename) -> Generator:
  91. if self.storage_type == 's3':
  92. try:
  93. with closing(self.client) as client:
  94. response = client.get_object(Bucket=self.bucket_name, Key=filename)
  95. for chunk in response['Body'].iter_chunks():
  96. yield chunk
  97. except ClientError as ex:
  98. if ex.response['Error']['Code'] == 'NoSuchKey':
  99. raise FileNotFoundError("File not found")
  100. else:
  101. raise
  102. elif self.storage_type == 'azure-blob':
  103. blob = self.client.get_blob_client(container=self.bucket_name, blob=filename)
  104. with closing(blob.download_blob()) as blob_stream:
  105. while chunk := blob_stream.readall(4096):
  106. yield chunk
  107. else:
  108. if not self.folder or self.folder.endswith('/'):
  109. filename = self.folder + filename
  110. else:
  111. filename = self.folder + '/' + filename
  112. if not os.path.exists(filename):
  113. raise FileNotFoundError("File not found")
  114. with open(filename, "rb") as f:
  115. while chunk := f.read(4096): # Read in chunks of 4KB
  116. yield chunk
  117. return generate()
  118. def download(self, filename, target_filepath):
  119. if self.storage_type == 's3':
  120. with closing(self.client) as client:
  121. client.download_file(self.bucket_name, filename, target_filepath)
  122. elif self.storage_type == 'azure-blob':
  123. blob = self.client.get_blob_client(container=self.bucket_name, blob=filename)
  124. with open(target_filepath, "wb") as my_blob:
  125. blob_data = blob.download_blob()
  126. blob_data.readinto(my_blob)
  127. else:
  128. if not self.folder or self.folder.endswith('/'):
  129. filename = self.folder + filename
  130. else:
  131. filename = self.folder + '/' + filename
  132. if not os.path.exists(filename):
  133. raise FileNotFoundError("File not found")
  134. shutil.copyfile(filename, target_filepath)
  135. def exists(self, filename):
  136. if self.storage_type == 's3':
  137. with closing(self.client) as client:
  138. try:
  139. client.head_object(Bucket=self.bucket_name, Key=filename)
  140. return True
  141. except:
  142. return False
  143. elif self.storage_type == 'azure-blob':
  144. blob = self.client.get_blob_client(container=self.bucket_name, blob=filename)
  145. return blob.exists()
  146. else:
  147. if not self.folder or self.folder.endswith('/'):
  148. filename = self.folder + filename
  149. else:
  150. filename = self.folder + '/' + filename
  151. return os.path.exists(filename)
  152. def delete(self, filename):
  153. if self.storage_type == 's3':
  154. self.client.delete_object(Bucket=self.bucket_name, Key=filename)
  155. elif self.storage_type == 'azure-blob':
  156. blob_container = self.client.get_container_client(container=self.bucket_name)
  157. blob_container.delete_blob(filename)
  158. else:
  159. if not self.folder or self.folder.endswith('/'):
  160. filename = self.folder + filename
  161. else:
  162. filename = self.folder + '/' + filename
  163. if os.path.exists(filename):
  164. os.remove(filename)
  165. storage = Storage()
  166. def init_app(app: Flask):
  167. storage.init_app(app)