123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536 |
- import logging
- from collections.abc import Callable, Generator, Iterable, Sequence
- from typing import IO, Any, Optional, Union, cast
- from configs import dify_config
- from core.entities.embedding_type import EmbeddingInputType
- from core.entities.provider_configuration import ProviderConfiguration, ProviderModelBundle
- from core.entities.provider_entities import ModelLoadBalancingConfiguration
- from core.errors.error import ProviderTokenNotInitError
- from core.model_runtime.callbacks.base_callback import Callback
- from core.model_runtime.entities.llm_entities import LLMResult
- from core.model_runtime.entities.message_entities import PromptMessage, PromptMessageTool
- from core.model_runtime.entities.model_entities import ModelType
- from core.model_runtime.entities.rerank_entities import RerankResult
- from core.model_runtime.entities.text_embedding_entities import TextEmbeddingResult
- from core.model_runtime.errors.invoke import InvokeAuthorizationError, InvokeConnectionError, InvokeRateLimitError
- from core.model_runtime.model_providers.__base.large_language_model import LargeLanguageModel
- from core.model_runtime.model_providers.__base.moderation_model import ModerationModel
- from core.model_runtime.model_providers.__base.rerank_model import RerankModel
- from core.model_runtime.model_providers.__base.speech2text_model import Speech2TextModel
- from core.model_runtime.model_providers.__base.text_embedding_model import TextEmbeddingModel
- from core.model_runtime.model_providers.__base.tts_model import TTSModel
- from core.provider_manager import ProviderManager
- from extensions.ext_redis import redis_client
- from models.provider import ProviderType
- logger = logging.getLogger(__name__)
- class ModelInstance:
- """
- Model instance class
- """
- def __init__(self, provider_model_bundle: ProviderModelBundle, model: str) -> None:
- self.provider_model_bundle = provider_model_bundle
- self.model = model
- self.provider = provider_model_bundle.configuration.provider.provider
- self.credentials = self._fetch_credentials_from_bundle(provider_model_bundle, model)
- self.model_type_instance = self.provider_model_bundle.model_type_instance
- self.load_balancing_manager = self._get_load_balancing_manager(
- configuration=provider_model_bundle.configuration,
- model_type=provider_model_bundle.model_type_instance.model_type,
- model=model,
- credentials=self.credentials,
- )
- @staticmethod
- def _fetch_credentials_from_bundle(provider_model_bundle: ProviderModelBundle, model: str) -> dict:
- """
- Fetch credentials from provider model bundle
- :param provider_model_bundle: provider model bundle
- :param model: model name
- :return:
- """
- configuration = provider_model_bundle.configuration
- model_type = provider_model_bundle.model_type_instance.model_type
- credentials = configuration.get_current_credentials(model_type=model_type, model=model)
- if credentials is None:
- raise ProviderTokenNotInitError(f"Model {model} credentials is not initialized.")
- return credentials
- @staticmethod
- def _get_load_balancing_manager(
- configuration: ProviderConfiguration, model_type: ModelType, model: str, credentials: dict
- ) -> Optional["LBModelManager"]:
- """
- Get load balancing model credentials
- :param configuration: provider configuration
- :param model_type: model type
- :param model: model name
- :param credentials: model credentials
- :return:
- """
- if configuration.model_settings and configuration.using_provider_type == ProviderType.CUSTOM:
- current_model_setting = None
- # check if model is disabled by admin
- for model_setting in configuration.model_settings:
- if model_setting.model_type == model_type and model_setting.model == model:
- current_model_setting = model_setting
- break
- # check if load balancing is enabled
- if current_model_setting and current_model_setting.load_balancing_configs:
- # use load balancing proxy to choose credentials
- lb_model_manager = LBModelManager(
- tenant_id=configuration.tenant_id,
- provider=configuration.provider.provider,
- model_type=model_type,
- model=model,
- load_balancing_configs=current_model_setting.load_balancing_configs,
- managed_credentials=credentials if configuration.custom_configuration.provider else None,
- )
- return lb_model_manager
- return None
- def invoke_llm(
- self,
- prompt_messages: list[PromptMessage],
- model_parameters: Optional[dict] = None,
- tools: Sequence[PromptMessageTool] | None = None,
- stop: Optional[list[str]] = None,
- stream: bool = True,
- user: Optional[str] = None,
- callbacks: Optional[list[Callback]] = None,
- ) -> Union[LLMResult, Generator]:
- """
- Invoke large language model
- :param prompt_messages: prompt messages
- :param model_parameters: model parameters
- :param tools: tools for tool calling
- :param stop: stop words
- :param stream: is stream response
- :param user: unique user id
- :param callbacks: callbacks
- :return: full response or stream response chunk generator result
- """
- if not isinstance(self.model_type_instance, LargeLanguageModel):
- raise Exception("Model type instance is not LargeLanguageModel")
- self.model_type_instance = cast(LargeLanguageModel, self.model_type_instance)
- return self._round_robin_invoke(
- function=self.model_type_instance.invoke,
- model=self.model,
- credentials=self.credentials,
- prompt_messages=prompt_messages,
- model_parameters=model_parameters,
- tools=tools,
- stop=stop,
- stream=stream,
- user=user,
- callbacks=callbacks,
- )
- def get_llm_num_tokens(
- self, prompt_messages: list[PromptMessage], tools: Optional[list[PromptMessageTool]] = None
- ) -> int:
- """
- Get number of tokens for llm
- :param prompt_messages: prompt messages
- :param tools: tools for tool calling
- :return:
- """
- if not isinstance(self.model_type_instance, LargeLanguageModel):
- raise Exception("Model type instance is not LargeLanguageModel")
- self.model_type_instance = cast(LargeLanguageModel, self.model_type_instance)
- return self._round_robin_invoke(
- function=self.model_type_instance.get_num_tokens,
- model=self.model,
- credentials=self.credentials,
- prompt_messages=prompt_messages,
- tools=tools,
- )
- def invoke_text_embedding(
- self, texts: list[str], user: Optional[str] = None, input_type: EmbeddingInputType = EmbeddingInputType.DOCUMENT
- ) -> TextEmbeddingResult:
- """
- Invoke large language model
- :param texts: texts to embed
- :param user: unique user id
- :param input_type: input type
- :return: embeddings result
- """
- if not isinstance(self.model_type_instance, TextEmbeddingModel):
- raise Exception("Model type instance is not TextEmbeddingModel")
- self.model_type_instance = cast(TextEmbeddingModel, self.model_type_instance)
- return self._round_robin_invoke(
- function=self.model_type_instance.invoke,
- model=self.model,
- credentials=self.credentials,
- texts=texts,
- user=user,
- input_type=input_type,
- )
- def get_text_embedding_num_tokens(self, texts: list[str]) -> int:
- """
- Get number of tokens for text embedding
- :param texts: texts to embed
- :return:
- """
- if not isinstance(self.model_type_instance, TextEmbeddingModel):
- raise Exception("Model type instance is not TextEmbeddingModel")
- self.model_type_instance = cast(TextEmbeddingModel, self.model_type_instance)
- return self._round_robin_invoke(
- function=self.model_type_instance.get_num_tokens,
- model=self.model,
- credentials=self.credentials,
- texts=texts,
- )
- def invoke_rerank(
- self,
- query: str,
- docs: list[str],
- score_threshold: Optional[float] = None,
- top_n: Optional[int] = None,
- user: Optional[str] = None,
- ) -> RerankResult:
- """
- Invoke rerank model
- :param query: search query
- :param docs: docs for reranking
- :param score_threshold: score threshold
- :param top_n: top n
- :param user: unique user id
- :return: rerank result
- """
- if not isinstance(self.model_type_instance, RerankModel):
- raise Exception("Model type instance is not RerankModel")
- self.model_type_instance = cast(RerankModel, self.model_type_instance)
- return self._round_robin_invoke(
- function=self.model_type_instance.invoke,
- model=self.model,
- credentials=self.credentials,
- query=query,
- docs=docs,
- score_threshold=score_threshold,
- top_n=top_n,
- user=user,
- )
- def invoke_moderation(self, text: str, user: Optional[str] = None) -> bool:
- """
- Invoke moderation model
- :param text: text to moderate
- :param user: unique user id
- :return: false if text is safe, true otherwise
- """
- if not isinstance(self.model_type_instance, ModerationModel):
- raise Exception("Model type instance is not ModerationModel")
- self.model_type_instance = cast(ModerationModel, self.model_type_instance)
- return self._round_robin_invoke(
- function=self.model_type_instance.invoke,
- model=self.model,
- credentials=self.credentials,
- text=text,
- user=user,
- )
- def invoke_speech2text(self, file: IO[bytes], user: Optional[str] = None) -> str:
- """
- Invoke large language model
- :param file: audio file
- :param user: unique user id
- :return: text for given audio file
- """
- if not isinstance(self.model_type_instance, Speech2TextModel):
- raise Exception("Model type instance is not Speech2TextModel")
- self.model_type_instance = cast(Speech2TextModel, self.model_type_instance)
- return self._round_robin_invoke(
- function=self.model_type_instance.invoke,
- model=self.model,
- credentials=self.credentials,
- file=file,
- user=user,
- )
- def invoke_tts(self, content_text: str, tenant_id: str, voice: str, user: Optional[str] = None) -> Iterable[bytes]:
- """
- Invoke large language tts model
- :param content_text: text content to be translated
- :param tenant_id: user tenant id
- :param voice: model timbre
- :param user: unique user id
- :return: text for given audio file
- """
- if not isinstance(self.model_type_instance, TTSModel):
- raise Exception("Model type instance is not TTSModel")
- self.model_type_instance = cast(TTSModel, self.model_type_instance)
- return self._round_robin_invoke(
- function=self.model_type_instance.invoke,
- model=self.model,
- credentials=self.credentials,
- content_text=content_text,
- user=user,
- tenant_id=tenant_id,
- voice=voice,
- )
- def _round_robin_invoke(self, function: Callable[..., Any], *args, **kwargs):
- """
- Round-robin invoke
- :param function: function to invoke
- :param args: function args
- :param kwargs: function kwargs
- :return:
- """
- if not self.load_balancing_manager:
- return function(*args, **kwargs)
- last_exception = None
- while True:
- lb_config = self.load_balancing_manager.fetch_next()
- if not lb_config:
- if not last_exception:
- raise ProviderTokenNotInitError("Model credentials is not initialized.")
- else:
- raise last_exception
- try:
- if "credentials" in kwargs:
- del kwargs["credentials"]
- return function(*args, **kwargs, credentials=lb_config.credentials)
- except InvokeRateLimitError as e:
- # expire in 60 seconds
- self.load_balancing_manager.cooldown(lb_config, expire=60)
- last_exception = e
- continue
- except (InvokeAuthorizationError, InvokeConnectionError) as e:
- # expire in 10 seconds
- self.load_balancing_manager.cooldown(lb_config, expire=10)
- last_exception = e
- continue
- except Exception as e:
- raise e
- def get_tts_voices(self, language: Optional[str] = None) -> list:
- """
- Invoke large language tts model voices
- :param language: tts language
- :return: tts model voices
- """
- if not isinstance(self.model_type_instance, TTSModel):
- raise Exception("Model type instance is not TTSModel")
- self.model_type_instance = cast(TTSModel, self.model_type_instance)
- return self.model_type_instance.get_tts_model_voices(
- model=self.model, credentials=self.credentials, language=language
- )
- class ModelManager:
- def __init__(self) -> None:
- self._provider_manager = ProviderManager()
- def get_model_instance(self, tenant_id: str, provider: str, model_type: ModelType, model: str) -> ModelInstance:
- """
- Get model instance
- :param tenant_id: tenant id
- :param provider: provider name
- :param model_type: model type
- :param model: model name
- :return:
- """
- if not provider:
- return self.get_default_model_instance(tenant_id, model_type)
- provider_model_bundle = self._provider_manager.get_provider_model_bundle(
- tenant_id=tenant_id, provider=provider, model_type=model_type
- )
- return ModelInstance(provider_model_bundle, model)
- def get_default_provider_model_name(self, tenant_id: str, model_type: ModelType) -> tuple[str, str]:
- """
- Return first provider and the first model in the provider
- :param tenant_id: tenant id
- :param model_type: model type
- :return: provider name, model name
- """
- return self._provider_manager.get_first_provider_first_model(tenant_id, model_type)
- def get_default_model_instance(self, tenant_id: str, model_type: ModelType) -> ModelInstance:
- """
- Get default model instance
- :param tenant_id: tenant id
- :param model_type: model type
- :return:
- """
- default_model_entity = self._provider_manager.get_default_model(tenant_id=tenant_id, model_type=model_type)
- if not default_model_entity:
- raise ProviderTokenNotInitError(f"Default model not found for {model_type}")
- return self.get_model_instance(
- tenant_id=tenant_id,
- provider=default_model_entity.provider.provider,
- model_type=model_type,
- model=default_model_entity.model,
- )
- class LBModelManager:
- def __init__(
- self,
- tenant_id: str,
- provider: str,
- model_type: ModelType,
- model: str,
- load_balancing_configs: list[ModelLoadBalancingConfiguration],
- managed_credentials: Optional[dict] = None,
- ) -> None:
- """
- Load balancing model manager
- :param tenant_id: tenant_id
- :param provider: provider
- :param model_type: model_type
- :param model: model name
- :param load_balancing_configs: all load balancing configurations
- :param managed_credentials: credentials if load balancing configuration name is __inherit__
- """
- self._tenant_id = tenant_id
- self._provider = provider
- self._model_type = model_type
- self._model = model
- self._load_balancing_configs = load_balancing_configs
- for load_balancing_config in self._load_balancing_configs[:]: # Iterate over a shallow copy of the list
- if load_balancing_config.name == "__inherit__":
- if not managed_credentials:
- # remove __inherit__ if managed credentials is not provided
- self._load_balancing_configs.remove(load_balancing_config)
- else:
- load_balancing_config.credentials = managed_credentials
- def fetch_next(self) -> Optional[ModelLoadBalancingConfiguration]:
- """
- Get next model load balancing config
- Strategy: Round Robin
- :return:
- """
- cache_key = "model_lb_index:{}:{}:{}:{}".format(
- self._tenant_id, self._provider, self._model_type.value, self._model
- )
- cooldown_load_balancing_configs = []
- max_index = len(self._load_balancing_configs)
- while True:
- current_index = redis_client.incr(cache_key)
- current_index = cast(int, current_index)
- if current_index >= 10000000:
- current_index = 1
- redis_client.set(cache_key, current_index)
- redis_client.expire(cache_key, 3600)
- if current_index > max_index:
- current_index = current_index % max_index
- real_index = current_index - 1
- if real_index > max_index:
- real_index = 0
- config = self._load_balancing_configs[real_index]
- if self.in_cooldown(config):
- cooldown_load_balancing_configs.append(config)
- if len(cooldown_load_balancing_configs) >= len(self._load_balancing_configs):
- # all configs are in cooldown
- return None
- continue
- if dify_config.DEBUG:
- logger.info(
- f"Model LB\nid: {config.id}\nname:{config.name}\n"
- f"tenant_id: {self._tenant_id}\nprovider: {self._provider}\n"
- f"model_type: {self._model_type.value}\nmodel: {self._model}"
- )
- return config
- return None
- def cooldown(self, config: ModelLoadBalancingConfiguration, expire: int = 60) -> None:
- """
- Cooldown model load balancing config
- :param config: model load balancing config
- :param expire: cooldown time
- :return:
- """
- cooldown_cache_key = "model_lb_index:cooldown:{}:{}:{}:{}:{}".format(
- self._tenant_id, self._provider, self._model_type.value, self._model, config.id
- )
- redis_client.setex(cooldown_cache_key, expire, "true")
- def in_cooldown(self, config: ModelLoadBalancingConfiguration) -> bool:
- """
- Check if model load balancing config is in cooldown
- :param config: model load balancing config
- :return:
- """
- cooldown_cache_key = "model_lb_index:cooldown:{}:{}:{}:{}:{}".format(
- self._tenant_id, self._provider, self._model_type.value, self._model, config.id
- )
- res = redis_client.exists(cooldown_cache_key)
- res = cast(bool, res)
- return res
- @staticmethod
- def get_config_in_cooldown_and_ttl(
- tenant_id: str, provider: str, model_type: ModelType, model: str, config_id: str
- ) -> tuple[bool, int]:
- """
- Get model load balancing config is in cooldown and ttl
- :param tenant_id: workspace id
- :param provider: provider name
- :param model_type: model type
- :param model: model name
- :param config_id: model load balancing config id
- :return:
- """
- cooldown_cache_key = "model_lb_index:cooldown:{}:{}:{}:{}:{}".format(
- tenant_id, provider, model_type.value, model, config_id
- )
- ttl = redis_client.ttl(cooldown_cache_key)
- if ttl == -2:
- return False, 0
- ttl = cast(int, ttl)
- return True, ttl
|