upload_file_parser.py 2.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879
  1. import base64
  2. import hashlib
  3. import hmac
  4. import logging
  5. import os
  6. import time
  7. from typing import Optional
  8. from flask import current_app
  9. from extensions.ext_storage import storage
  10. IMAGE_EXTENSIONS = ['jpg', 'jpeg', 'png', 'webp', 'gif', 'svg']
  11. IMAGE_EXTENSIONS.extend([ext.upper() for ext in IMAGE_EXTENSIONS])
  12. class UploadFileParser:
  13. @classmethod
  14. def get_image_data(cls, upload_file, force_url: bool = False) -> Optional[str]:
  15. if not upload_file:
  16. return None
  17. if upload_file.extension not in IMAGE_EXTENSIONS:
  18. return None
  19. if current_app.config['MULTIMODAL_SEND_IMAGE_FORMAT'] == 'url' or force_url:
  20. return cls.get_signed_temp_image_url(upload_file)
  21. else:
  22. # get image file base64
  23. try:
  24. data = storage.load(upload_file.key)
  25. except FileNotFoundError:
  26. logging.error(f'File not found: {upload_file.key}')
  27. return None
  28. encoded_string = base64.b64encode(data).decode('utf-8')
  29. return f'data:{upload_file.mime_type};base64,{encoded_string}'
  30. @classmethod
  31. def get_signed_temp_image_url(cls, upload_file) -> str:
  32. """
  33. get signed url from upload file
  34. :param upload_file: UploadFile object
  35. :return:
  36. """
  37. base_url = current_app.config.get('FILES_URL')
  38. image_preview_url = f'{base_url}/files/{upload_file.id}/image-preview'
  39. timestamp = str(int(time.time()))
  40. nonce = os.urandom(16).hex()
  41. data_to_sign = f"image-preview|{upload_file.id}|{timestamp}|{nonce}"
  42. secret_key = current_app.config['SECRET_KEY'].encode()
  43. sign = hmac.new(secret_key, data_to_sign.encode(), hashlib.sha256).digest()
  44. encoded_sign = base64.urlsafe_b64encode(sign).decode()
  45. return f"{image_preview_url}?timestamp={timestamp}&nonce={nonce}&sign={encoded_sign}"
  46. @classmethod
  47. def verify_image_file_signature(cls, upload_file_id: str, timestamp: str, nonce: str, sign: str) -> bool:
  48. """
  49. verify signature
  50. :param upload_file_id: file id
  51. :param timestamp: timestamp
  52. :param nonce: nonce
  53. :param sign: signature
  54. :return:
  55. """
  56. data_to_sign = f"image-preview|{upload_file_id}|{timestamp}|{nonce}"
  57. secret_key = current_app.config['SECRET_KEY'].encode()
  58. recalculated_sign = hmac.new(secret_key, data_to_sign.encode(), hashlib.sha256).digest()
  59. recalculated_encoded_sign = base64.urlsafe_b64encode(recalculated_sign).decode()
  60. # verify signature
  61. if sign != recalculated_encoded_sign:
  62. return False
  63. current_time = int(time.time())
  64. return current_time - int(timestamp) <= 300 # expired after 5 minutes