message_file_parser.py 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. from typing import List, Union, Optional, Dict
  2. import requests
  3. from core.file.file_obj import FileObj, FileType, FileTransferMethod
  4. from core.file.upload_file_parser import SUPPORT_EXTENSIONS
  5. from extensions.ext_database import db
  6. from models.account import Account
  7. from models.model import MessageFile, EndUser, AppModelConfig, UploadFile
  8. class MessageFileParser:
  9. def __init__(self, tenant_id: str, app_id: str) -> None:
  10. self.tenant_id = tenant_id
  11. self.app_id = app_id
  12. def validate_and_transform_files_arg(self, files: List[dict], app_model_config: AppModelConfig,
  13. user: Union[Account, EndUser]) -> List[FileObj]:
  14. """
  15. validate and transform files arg
  16. :param files:
  17. :param app_model_config:
  18. :param user:
  19. :return:
  20. """
  21. file_upload_config = app_model_config.file_upload_dict
  22. for file in files:
  23. if not isinstance(file, dict):
  24. raise ValueError('Invalid file format, must be dict')
  25. if not file.get('type'):
  26. raise ValueError('Missing file type')
  27. FileType.value_of(file.get('type'))
  28. if not file.get('transfer_method'):
  29. raise ValueError('Missing file transfer method')
  30. FileTransferMethod.value_of(file.get('transfer_method'))
  31. if file.get('transfer_method') == FileTransferMethod.REMOTE_URL.value:
  32. if not file.get('url'):
  33. raise ValueError('Missing file url')
  34. if not file.get('url').startswith('http'):
  35. raise ValueError('Invalid file url')
  36. if file.get('transfer_method') == FileTransferMethod.LOCAL_FILE.value and not file.get('upload_file_id'):
  37. raise ValueError('Missing file upload_file_id')
  38. # transform files to file objs
  39. type_file_objs = self._to_file_objs(files, file_upload_config)
  40. # validate files
  41. new_files = []
  42. for file_type, file_objs in type_file_objs.items():
  43. if file_type == FileType.IMAGE:
  44. # parse and validate files
  45. image_config = file_upload_config.get('image')
  46. # check if image file feature is enabled
  47. if not image_config['enabled']:
  48. continue
  49. # Validate number of files
  50. if len(files) > image_config['number_limits']:
  51. raise ValueError(f"Number of image files exceeds the maximum limit {image_config['number_limits']}")
  52. for file_obj in file_objs:
  53. # Validate transfer method
  54. if file_obj.transfer_method.value not in image_config['transfer_methods']:
  55. raise ValueError(f'Invalid transfer method: {file_obj.transfer_method.value}')
  56. # Validate file type
  57. if file_obj.type != FileType.IMAGE:
  58. raise ValueError(f'Invalid file type: {file_obj.type}')
  59. if file_obj.transfer_method == FileTransferMethod.REMOTE_URL:
  60. # check remote url valid and is image
  61. result, error = self._check_image_remote_url(file_obj.url)
  62. if result is False:
  63. raise ValueError(error)
  64. elif file_obj.transfer_method == FileTransferMethod.LOCAL_FILE:
  65. # get upload file from upload_file_id
  66. upload_file = (db.session.query(UploadFile)
  67. .filter(
  68. UploadFile.id == file_obj.upload_file_id,
  69. UploadFile.tenant_id == self.tenant_id,
  70. UploadFile.created_by == user.id,
  71. UploadFile.created_by_role == ('account' if isinstance(user, Account) else 'end_user'),
  72. UploadFile.extension.in_(SUPPORT_EXTENSIONS)
  73. ).first())
  74. # check upload file is belong to tenant and user
  75. if not upload_file:
  76. raise ValueError('Invalid upload file')
  77. new_files.append(file_obj)
  78. # return all file objs
  79. return new_files
  80. def transform_message_files(self, files: List[MessageFile], app_model_config: Optional[AppModelConfig]) -> List[FileObj]:
  81. """
  82. transform message files
  83. :param files:
  84. :param app_model_config:
  85. :return:
  86. """
  87. # transform files to file objs
  88. type_file_objs = self._to_file_objs(files, app_model_config.file_upload_dict)
  89. # return all file objs
  90. return [file_obj for file_objs in type_file_objs.values() for file_obj in file_objs]
  91. def _to_file_objs(self, files: List[Union[Dict, MessageFile]],
  92. file_upload_config: dict) -> Dict[FileType, List[FileObj]]:
  93. """
  94. transform files to file objs
  95. :param files:
  96. :param file_upload_config:
  97. :return:
  98. """
  99. type_file_objs: Dict[FileType, List[FileObj]] = {
  100. # Currently only support image
  101. FileType.IMAGE: []
  102. }
  103. if not files:
  104. return type_file_objs
  105. # group by file type and convert file args or message files to FileObj
  106. for file in files:
  107. file_obj = self._to_file_obj(file, file_upload_config)
  108. if file_obj.type not in type_file_objs:
  109. continue
  110. type_file_objs[file_obj.type].append(file_obj)
  111. return type_file_objs
  112. def _to_file_obj(self, file: Union[dict, MessageFile], file_upload_config: dict) -> FileObj:
  113. """
  114. transform file to file obj
  115. :param file:
  116. :return:
  117. """
  118. if isinstance(file, dict):
  119. transfer_method = FileTransferMethod.value_of(file.get('transfer_method'))
  120. return FileObj(
  121. tenant_id=self.tenant_id,
  122. type=FileType.value_of(file.get('type')),
  123. transfer_method=transfer_method,
  124. url=file.get('url') if transfer_method == FileTransferMethod.REMOTE_URL else None,
  125. upload_file_id=file.get('upload_file_id') if transfer_method == FileTransferMethod.LOCAL_FILE else None,
  126. file_config=file_upload_config
  127. )
  128. else:
  129. return FileObj(
  130. id=file.id,
  131. tenant_id=self.tenant_id,
  132. type=FileType.value_of(file.type),
  133. transfer_method=FileTransferMethod.value_of(file.transfer_method),
  134. url=file.url,
  135. upload_file_id=file.upload_file_id or None,
  136. file_config=file_upload_config
  137. )
  138. def _check_image_remote_url(self, url):
  139. try:
  140. headers = {
  141. "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
  142. }
  143. response = requests.head(url, headers=headers, allow_redirects=True)
  144. if response.status_code == 200:
  145. return True, ""
  146. else:
  147. return False, "URL does not exist."
  148. except requests.RequestException as e:
  149. return False, f"Error checking URL: {e}"