from typing import Optional

from core.app.entities.app_invoke_entities import ModelConfigWithCredentialsEntity
from core.memory.token_buffer_memory import TokenBufferMemory
from core.model_manager import ModelInstance
from core.model_runtime.entities.message_entities import PromptMessage
from core.model_runtime.entities.model_entities import ModelPropertyKey
from core.prompt.entities.advanced_prompt_entities import MemoryConfig


class PromptTransform:
    def _append_chat_histories(self, memory: TokenBufferMemory,
                               memory_config: MemoryConfig,
                               prompt_messages: list[PromptMessage],
                               model_config: ModelConfigWithCredentialsEntity) -> list[PromptMessage]:
        rest_tokens = self._calculate_rest_token(prompt_messages, model_config)
        histories = self._get_history_messages_list_from_memory(memory, memory_config, rest_tokens)
        prompt_messages.extend(histories)

        return prompt_messages

    def _calculate_rest_token(self, prompt_messages: list[PromptMessage],
                              model_config: ModelConfigWithCredentialsEntity) -> int:
        rest_tokens = 2000

        model_context_tokens = model_config.model_schema.model_properties.get(ModelPropertyKey.CONTEXT_SIZE)
        if model_context_tokens:
            model_instance = ModelInstance(
                provider_model_bundle=model_config.provider_model_bundle,
                model=model_config.model
            )

            curr_message_tokens = model_instance.get_llm_num_tokens(
                prompt_messages
            )

            max_tokens = 0
            for parameter_rule in model_config.model_schema.parameter_rules:
                if (parameter_rule.name == 'max_tokens'
                        or (parameter_rule.use_template and parameter_rule.use_template == 'max_tokens')):
                    max_tokens = (model_config.parameters.get(parameter_rule.name)
                                  or model_config.parameters.get(parameter_rule.use_template)) or 0

            rest_tokens = model_context_tokens - max_tokens - curr_message_tokens
            rest_tokens = max(rest_tokens, 0)

        return rest_tokens

    def _get_history_messages_from_memory(self, memory: TokenBufferMemory,
                                          memory_config: MemoryConfig,
                                          max_token_limit: int,
                                          human_prefix: Optional[str] = None,
                                          ai_prefix: Optional[str] = None) -> str:
        """Get memory messages."""
        kwargs = {
            "max_token_limit": max_token_limit
        }

        if human_prefix:
            kwargs['human_prefix'] = human_prefix

        if ai_prefix:
            kwargs['ai_prefix'] = ai_prefix

        if memory_config.window.enabled and memory_config.window.size is not None and memory_config.window.size > 0:
            kwargs['message_limit'] = memory_config.window.size

        return memory.get_history_prompt_text(
            **kwargs
        )

    def _get_history_messages_list_from_memory(self, memory: TokenBufferMemory,
                                               memory_config: MemoryConfig,
                                               max_token_limit: int) -> list[PromptMessage]:
        """Get memory messages."""
        return memory.get_history_prompt_messages(
            max_token_limit=max_token_limit,
            message_limit=memory_config.window.size
            if (memory_config.window.enabled
                and memory_config.window.size is not None
                and memory_config.window.size > 0)
            else None
        )