import logging
from typing import Tuple, Optional

from core.app_runner.app_runner import AppRunner
from core.callback_handler.index_tool_callback_handler import DatasetIndexToolCallbackHandler
from core.entities.application_entities import ApplicationGenerateEntity, ModelConfigEntity, \
    AppOrchestrationConfigEntity, InvokeFrom, ExternalDataVariableEntity, DatasetEntity
from core.application_queue_manager import ApplicationQueueManager, PublishFrom
from core.features.annotation_reply import AnnotationReplyFeature
from core.features.dataset_retrieval import DatasetRetrievalFeature
from core.features.external_data_fetch import ExternalDataFetchFeature
from core.features.hosting_moderation import HostingModerationFeature
from core.features.moderation import ModerationFeature
from core.memory.token_buffer_memory import TokenBufferMemory
from core.model_manager import ModelInstance
from core.model_runtime.entities.message_entities import PromptMessage
from core.moderation.base import ModerationException
from core.prompt.prompt_transform import AppMode
from extensions.ext_database import db
from models.model import Conversation, Message, App, MessageAnnotation

logger = logging.getLogger(__name__)


class BasicApplicationRunner(AppRunner):
    """
    Basic Application Runner

    Orchestrates a single "basic" app generation: token budgeting, input
    moderation, annotation replies, external data fetching, dataset
    retrieval, hosting moderation, and finally the LLM invocation whose
    result is streamed back through the queue manager.
    """

    def run(self, application_generate_entity: ApplicationGenerateEntity,
            queue_manager: ApplicationQueueManager,
            conversation: Conversation,
            message: Message) -> None:
        """
        Run application
        :param application_generate_entity: application generate entity
        :param queue_manager: application queue manager
        :param conversation: conversation
        :param message: message
        :return:
        """
        app_record = db.session.query(App).filter(App.id == application_generate_entity.app_id).first()
        if not app_record:
            raise ValueError("App not found")

        app_orchestration_config = application_generate_entity.app_orchestration_config_entity

        inputs = application_generate_entity.inputs
        query = application_generate_entity.query
        files = application_generate_entity.files

        # Pre-calculate how many tokens of the model's context window remain
        # for the completion after the prompt is accounted for; raises early
        # if the prompt alone would overflow the window.
        self.get_pre_calculate_rest_tokens(
            app_record=app_record,
            model_config=app_orchestration_config.model_config,
            prompt_template_entity=app_orchestration_config.prompt_template,
            inputs=inputs,
            files=files,
            query=query
        )

        memory = None
        if application_generate_entity.conversation_id:
            # Continuing an existing conversation: expose its history as a
            # token-bounded buffer memory for prompt organization.
            model_instance = ModelInstance(
                provider_model_bundle=app_orchestration_config.model_config.provider_model_bundle,
                model=app_orchestration_config.model_config.model
            )

            memory = TokenBufferMemory(
                conversation=conversation,
                model_instance=model_instance
            )

        # First prompt pass (no dataset context yet): these messages are only
        # needed for the early-exit paths below (moderation rejection and
        # annotation reply both call direct_output with prompt_messages).
        prompt_messages, stop = self.organize_prompt_messages(
            app_record=app_record,
            model_config=app_orchestration_config.model_config,
            prompt_template_entity=app_orchestration_config.prompt_template,
            inputs=inputs,
            files=files,
            query=query,
            memory=memory
        )

        # Input moderation: may rewrite inputs/query, or raise to reject.
        try:
            _, inputs, query = self.moderation_for_inputs(
                app_id=app_record.id,
                tenant_id=application_generate_entity.tenant_id,
                app_orchestration_config_entity=app_orchestration_config,
                inputs=inputs,
                query=query,
            )
        except ModerationException as e:
            # Rejected input: emit the moderation's preset reply and stop.
            self.direct_output(
                queue_manager=queue_manager,
                app_orchestration_config=app_orchestration_config,
                prompt_messages=prompt_messages,
                text=str(e),
                stream=application_generate_entity.stream
            )
            return

        if query:
            # If a curated annotation matches the query, answer with it
            # directly and skip the model invocation entirely.
            annotation_reply = self.query_app_annotations_to_reply(
                app_record=app_record,
                message=message,
                query=query,
                user_id=application_generate_entity.user_id,
                invoke_from=application_generate_entity.invoke_from
            )

            if annotation_reply:
                queue_manager.publish_annotation_reply(
                    message_annotation_id=annotation_reply.id,
                    pub_from=PublishFrom.APPLICATION_MANAGER
                )

                self.direct_output(
                    queue_manager=queue_manager,
                    app_orchestration_config=app_orchestration_config,
                    prompt_messages=prompt_messages,
                    text=annotation_reply.content,
                    stream=application_generate_entity.stream
                )
                return

        # Fill in variable inputs from external data tools, if configured.
        external_data_tools = app_orchestration_config.external_data_variables
        if external_data_tools:
            inputs = self.fill_in_inputs_from_external_data_tools(
                tenant_id=app_record.tenant_id,
                app_id=app_record.id,
                external_data_tools=external_data_tools,
                inputs=inputs,
                query=query
            )

        # Retrieve context from datasets, if configured.
        context = None
        if app_orchestration_config.dataset:
            context = self.retrieve_dataset_context(
                tenant_id=app_record.tenant_id,
                app_record=app_record,
                queue_manager=queue_manager,
                model_config=app_orchestration_config.model_config,
                show_retrieve_source=app_orchestration_config.show_retrieve_source,
                dataset_config=app_orchestration_config.dataset,
                message=message,
                inputs=inputs,
                query=query,
                user_id=application_generate_entity.user_id,
                invoke_from=application_generate_entity.invoke_from,
                memory=memory
            )

        # Second prompt pass: inputs/query may have been rewritten above and
        # dataset context is now available, so rebuild the final messages.
        prompt_messages, stop = self.organize_prompt_messages(
            app_record=app_record,
            model_config=app_orchestration_config.model_config,
            prompt_template_entity=app_orchestration_config.prompt_template,
            inputs=inputs,
            files=files,
            query=query,
            context=context,
            memory=memory
        )

        # Hosting-level moderation on the assembled prompt; a truthy result
        # means a canned reply has already been emitted.
        hosting_moderation_result = self.check_hosting_moderation(
            application_generate_entity=application_generate_entity,
            queue_manager=queue_manager,
            prompt_messages=prompt_messages
        )

        if hosting_moderation_result:
            return

        # Re-calculate max tokens now that the final prompt size is known.
        self.recale_llm_max_tokens(
            model_config=app_orchestration_config.model_config,
            prompt_messages=prompt_messages
        )

        # Invoke the model and forward the (possibly streamed) result to the
        # queue for the API layer to consume.
        model_instance = ModelInstance(
            provider_model_bundle=app_orchestration_config.model_config.provider_model_bundle,
            model=app_orchestration_config.model_config.model
        )

        invoke_result = model_instance.invoke_llm(
            prompt_messages=prompt_messages,
            model_parameters=app_orchestration_config.model_config.parameters,
            stop=stop,
            stream=application_generate_entity.stream,
            user=application_generate_entity.user_id,
        )

        self._handle_invoke_result(
            invoke_result=invoke_result,
            queue_manager=queue_manager,
            stream=application_generate_entity.stream
        )

    def moderation_for_inputs(self, app_id: str,
                              tenant_id: str,
                              app_orchestration_config_entity: AppOrchestrationConfigEntity,
                              inputs: dict,
                              query: str) -> Tuple[bool, dict, str]:
        """
        Process sensitive_word_avoidance.
        :param app_id: app id
        :param tenant_id: tenant id
        :param app_orchestration_config_entity: app orchestration config entity
        :param inputs: inputs
        :param query: query
        :return: (flagged, possibly-rewritten inputs, possibly-rewritten query)
        """
        moderation_feature = ModerationFeature()
        return moderation_feature.check(
            app_id=app_id,
            tenant_id=tenant_id,
            app_orchestration_config_entity=app_orchestration_config_entity,
            inputs=inputs,
            query=query,
        )

    def query_app_annotations_to_reply(self, app_record: App,
                                       message: Message,
                                       query: str,
                                       user_id: str,
                                       invoke_from: InvokeFrom) -> Optional[MessageAnnotation]:
        """
        Query app annotations to reply
        :param app_record: app record
        :param message: message
        :param query: query
        :param user_id: user id
        :param invoke_from: invoke from
        :return: matching annotation, or None if no annotation applies
        """
        annotation_reply_feature = AnnotationReplyFeature()
        return annotation_reply_feature.query(
            app_record=app_record,
            message=message,
            query=query,
            user_id=user_id,
            invoke_from=invoke_from
        )

    def fill_in_inputs_from_external_data_tools(self, tenant_id: str,
                                                app_id: str,
                                                external_data_tools: list[ExternalDataVariableEntity],
                                                inputs: dict,
                                                query: str) -> dict:
        """
        Fill in variable inputs from external data tools if exists.
        :param tenant_id: workspace id
        :param app_id: app id
        :param external_data_tools: external data tools configs
        :param inputs: the inputs
        :param query: the query
        :return: the filled inputs
        """
        external_data_fetch_feature = ExternalDataFetchFeature()
        return external_data_fetch_feature.fetch(
            tenant_id=tenant_id,
            app_id=app_id,
            external_data_tools=external_data_tools,
            inputs=inputs,
            query=query
        )

    def retrieve_dataset_context(self, tenant_id: str,
                                 app_record: App,
                                 queue_manager: ApplicationQueueManager,
                                 model_config: ModelConfigEntity,
                                 dataset_config: DatasetEntity,
                                 show_retrieve_source: bool,
                                 message: Message,
                                 inputs: dict,
                                 query: str,
                                 user_id: str,
                                 invoke_from: InvokeFrom,
                                 memory: Optional[TokenBufferMemory] = None) -> Optional[str]:
        """
        Retrieve dataset context
        :param tenant_id: tenant id
        :param app_record: app record
        :param queue_manager: queue manager
        :param model_config: model config
        :param dataset_config: dataset config
        :param show_retrieve_source: show retrieve source
        :param message: message
        :param inputs: inputs
        :param query: query
        :param user_id: user id
        :param invoke_from: invoke from
        :param memory: memory
        :return: retrieved context text, or None
        """
        # Callback used to report retrieval hits back through the queue.
        hit_callback = DatasetIndexToolCallbackHandler(
            queue_manager,
            app_record.id,
            message.id,
            user_id,
            invoke_from
        )

        # In completion mode the retrieval query may come from a designated
        # input variable instead of the user query.
        if (app_record.mode == AppMode.COMPLETION.value and dataset_config
                and dataset_config.retrieve_config.query_variable):
            query = inputs.get(dataset_config.retrieve_config.query_variable, "")

        dataset_retrieval = DatasetRetrievalFeature()
        return dataset_retrieval.retrieve(
            tenant_id=tenant_id,
            model_config=model_config,
            config=dataset_config,
            query=query,
            invoke_from=invoke_from,
            show_retrieve_source=show_retrieve_source,
            hit_callback=hit_callback,
            memory=memory
        )

    def check_hosting_moderation(self, application_generate_entity: ApplicationGenerateEntity,
                                 queue_manager: ApplicationQueueManager,
                                 prompt_messages: list[PromptMessage]) -> bool:
        """
        Check hosting moderation
        :param application_generate_entity: application generate entity
        :param queue_manager: queue manager
        :param prompt_messages: prompt messages
        :return: True if the prompt was flagged (a canned reply was emitted)
        """
        hosting_moderation_feature = HostingModerationFeature()
        moderation_result = hosting_moderation_feature.check(
            application_generate_entity=application_generate_entity,
            prompt_messages=prompt_messages
        )

        if moderation_result:
            # Flagged: emit the canned refusal so the caller can stop.
            self.direct_output(
                queue_manager=queue_manager,
                app_orchestration_config=application_generate_entity.app_orchestration_config_entity,
                prompt_messages=prompt_messages,
                text="I apologize for any confusion, " \
                     "but I'm an AI assistant to be helpful, harmless, and honest.",
                stream=application_generate_entity.stream
            )

        return moderation_result