google_storage.py 2.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162
import base64
import io
import json
from collections.abc import Generator
from contextlib import closing

from flask import Flask
from google.cloud import storage as GoogleCloudStorage

from extensions.storage.base_storage import BaseStorage
  8. class GoogleStorage(BaseStorage):
  9. """Implementation for google storage.
  10. """
  11. def __init__(self, app: Flask):
  12. super().__init__(app)
  13. app_config = self.app.config
  14. self.bucket_name = app_config.get('GOOGLE_STORAGE_BUCKET_NAME')
  15. service_account_json_str = app_config.get('GOOGLE_STORAGE_SERVICE_ACCOUNT_JSON_BASE64')
  16. # if service_account_json_str is empty, use Application Default Credentials
  17. if service_account_json_str:
  18. service_account_json = base64.b64decode(service_account_json_str).decode('utf-8')
  19. self.client = GoogleCloudStorage.Client.from_service_account_info(service_account_json)
  20. else:
  21. self.client = GoogleCloudStorage.Client()
  22. def save(self, filename, data):
  23. bucket = self.client.get_bucket(self.bucket_name)
  24. blob = bucket.blob(filename)
  25. with io.BytesIO(data) as stream:
  26. blob.upload_from_file(stream)
  27. def load_once(self, filename: str) -> bytes:
  28. bucket = self.client.get_bucket(self.bucket_name)
  29. blob = bucket.get_blob(filename)
  30. data = blob.download_as_bytes()
  31. return data
  32. def load_stream(self, filename: str) -> Generator:
  33. def generate(filename: str = filename) -> Generator:
  34. bucket = self.client.get_bucket(self.bucket_name)
  35. blob = bucket.get_blob(filename)
  36. with closing(blob.open(mode='rb')) as blob_stream:
  37. while chunk := blob_stream.read(4096):
  38. yield chunk
  39. return generate()
  40. def download(self, filename, target_filepath):
  41. bucket = self.client.get_bucket(self.bucket_name)
  42. blob = bucket.get_blob(filename)
  43. with open(target_filepath, "wb") as my_blob:
  44. blob_data = blob.download_blob()
  45. blob_data.readinto(my_blob)
  46. def exists(self, filename):
  47. bucket = self.client.get_bucket(self.bucket_name)
  48. blob = bucket.blob(filename)
  49. return blob.exists()
  50. def delete(self, filename):
  51. bucket = self.client.get_bucket(self.bucket_name)
  52. bucket.delete_blob(filename)