123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125 |
- import logging
- from mimetypes import guess_extension
- from typing import Optional
- from core.file import File, FileTransferMethod, FileType
- from core.tools.entities.tool_entities import ToolInvokeMessage
- from core.tools.tool_file_manager import ToolFileManager
- logger = logging.getLogger(__name__)
- class ToolFileMessageTransformer:
- @classmethod
- def transform_tool_invoke_messages(
- cls, messages: list[ToolInvokeMessage], user_id: str, tenant_id: str, conversation_id: str | None
- ) -> list[ToolInvokeMessage]:
- """
- Transform tool message and handle file download
- """
- result = []
- for message in messages:
- if message.type in {ToolInvokeMessage.MessageType.TEXT, ToolInvokeMessage.MessageType.LINK}:
- result.append(message)
- elif message.type == ToolInvokeMessage.MessageType.IMAGE and isinstance(message.message, str):
- # try to download image
- try:
- file = ToolFileManager.create_file_by_url(
- user_id=user_id, tenant_id=tenant_id, conversation_id=conversation_id, file_url=message.message
- )
- url = f'/files/tools/{file.id}{guess_extension(file.mimetype) or ".png"}'
- result.append(
- ToolInvokeMessage(
- type=ToolInvokeMessage.MessageType.IMAGE_LINK,
- message=url,
- save_as=message.save_as,
- meta=message.meta.copy() if message.meta is not None else {},
- )
- )
- except Exception as e:
- logger.exception(e)
- result.append(
- ToolInvokeMessage(
- type=ToolInvokeMessage.MessageType.TEXT,
- message=f"Failed to download image: {message.message}, please try to download it manually.",
- meta=message.meta.copy() if message.meta is not None else {},
- save_as=message.save_as,
- )
- )
- elif message.type == ToolInvokeMessage.MessageType.BLOB:
- # get mime type and save blob to storage
- assert message.meta is not None
- mimetype = message.meta.get("mime_type", "octet/stream")
- # if message is str, encode it to bytes
- if isinstance(message.message, str):
- message.message = message.message.encode("utf-8")
- # FIXME: should do a type check here.
- assert isinstance(message.message, bytes)
- file = ToolFileManager.create_file_by_raw(
- user_id=user_id,
- tenant_id=tenant_id,
- conversation_id=conversation_id,
- file_binary=message.message,
- mimetype=mimetype,
- )
- url = cls.get_tool_file_url(tool_file_id=file.id, extension=guess_extension(file.mimetype))
- # check if file is image
- if "image" in mimetype:
- result.append(
- ToolInvokeMessage(
- type=ToolInvokeMessage.MessageType.IMAGE_LINK,
- message=url,
- save_as=message.save_as,
- meta=message.meta.copy() if message.meta is not None else {},
- )
- )
- else:
- result.append(
- ToolInvokeMessage(
- type=ToolInvokeMessage.MessageType.LINK,
- message=url,
- save_as=message.save_as,
- meta=message.meta.copy() if message.meta is not None else {},
- )
- )
- elif message.type == ToolInvokeMessage.MessageType.FILE:
- assert message.meta is not None
- file = message.meta.get("file")
- if isinstance(file, File):
- if file.transfer_method == FileTransferMethod.TOOL_FILE:
- assert file.related_id is not None
- url = cls.get_tool_file_url(tool_file_id=file.related_id, extension=file.extension)
- if file.type == FileType.IMAGE:
- result.append(
- ToolInvokeMessage(
- type=ToolInvokeMessage.MessageType.IMAGE_LINK,
- message=url,
- save_as=message.save_as,
- meta=message.meta.copy() if message.meta is not None else {},
- )
- )
- else:
- result.append(
- ToolInvokeMessage(
- type=ToolInvokeMessage.MessageType.LINK,
- message=url,
- save_as=message.save_as,
- meta=message.meta.copy() if message.meta is not None else {},
- )
- )
- else:
- result.append(message)
- else:
- result.append(message)
- return result
- @classmethod
- def get_tool_file_url(cls, tool_file_id: str, extension: Optional[str]) -> str:
- return f'/files/tools/{tool_file_id}{extension or ".bin"}'
|