12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728 |
- import json
- import logging
- import os
- import queue
- import threading
- import time
- from datetime import timedelta
- from enum import Enum
- from typing import Any, Optional, Union
- from uuid import UUID
- from flask import current_app
- from core.helper.encrypter import decrypt_token, encrypt_token, obfuscated_token
- from core.ops.entities.config_entity import (
- LangfuseConfig,
- LangSmithConfig,
- TracingProviderEnum,
- )
- from core.ops.entities.trace_entity import (
- DatasetRetrievalTraceInfo,
- GenerateNameTraceInfo,
- MessageTraceInfo,
- ModerationTraceInfo,
- SuggestedQuestionTraceInfo,
- ToolTraceInfo,
- WorkflowTraceInfo,
- )
- from core.ops.langfuse_trace.langfuse_trace import LangFuseDataTrace
- from core.ops.langsmith_trace.langsmith_trace import LangSmithDataTrace
- from core.ops.utils import get_message_data
- from extensions.ext_database import db
- from models.model import App, AppModelConfig, Conversation, Message, MessageAgentThought, MessageFile, TraceAppConfig
- from models.workflow import WorkflowAppLog, WorkflowRun
- from tasks.ops_trace_task import process_trace_tasks
# Registry of supported tracing providers, keyed by the provider's string value.
# For each provider:
#   config_class   - config model instantiated from the (decrypted) settings
#   secret_keys    - settings run through encrypt_token / decrypt_token / obfuscated_token
#   other_keys     - non-secret settings copied through as-is ("" when absent)
#   trace_instance - tracer class constructed with a config_class instance
provider_config_map = {
    TracingProviderEnum.LANGFUSE.value: dict(
        config_class=LangfuseConfig,
        secret_keys=["public_key", "secret_key"],
        other_keys=["host"],
        trace_instance=LangFuseDataTrace,
    ),
    TracingProviderEnum.LANGSMITH.value: dict(
        config_class=LangSmithConfig,
        secret_keys=["api_key"],
        other_keys=["project", "endpoint"],
        trace_instance=LangSmithDataTrace,
    ),
}
class OpsTraceManager:
    """Load, persist and instantiate the tracing-provider configuration of an app."""

    @staticmethod
    def _get_provider_keys(tracing_provider: str):
        """
        Return ``(config_class, secret_keys, other_keys)`` registered for a provider.

        :param tracing_provider: tracing provider (a key of provider_config_map)
        :raises KeyError: if the provider is not registered in provider_config_map
        """
        provider_config = provider_config_map[tracing_provider]
        return provider_config["config_class"], provider_config["secret_keys"], provider_config["other_keys"]

    @classmethod
    def encrypt_tracing_config(
        cls, tenant_id: str, tracing_provider: str, tracing_config: dict, current_trace_config=None
    ):
        """
        Encrypt the secret fields of a tracing config before it is stored.

        A secret value containing '*' is treated as a masked placeholder: the value
        from current_trace_config is kept instead (or the placeholder itself when the
        current config has no such key). Non-secret fields default to "".

        :param tenant_id: tenant id passed to encrypt_token
        :param tracing_provider: tracing provider (a key of provider_config_map)
        :param tracing_config: tracing config dictionary to be encrypted
        :param current_trace_config: current tracing configuration for keeping existing values; may be None
        :return: encrypted tracing configuration as a dict
        """
        config_class, secret_keys, other_keys = cls._get_provider_keys(tracing_provider)
        # The parameter defaults to None; treat that as "nothing stored yet" so a
        # masked value does not crash the lookup below.
        current_trace_config = current_trace_config or {}

        new_config = {}
        for key in secret_keys:
            if key not in tracing_config:
                continue
            value = tracing_config[key]
            if "*" in value:
                # Presumably an obfuscated secret echoed back unchanged by the client:
                # keep the stored value rather than encrypting the mask.
                new_config[key] = current_trace_config.get(key, value)
            else:
                new_config[key] = encrypt_token(tenant_id, value)
        for key in other_keys:
            new_config[key] = tracing_config.get(key, "")

        # Round-trip through the config class to validate the result.
        return config_class(**new_config).model_dump()

    @classmethod
    def decrypt_tracing_config(cls, tenant_id: str, tracing_provider: str, tracing_config: dict):
        """
        Decrypt the secret fields of a stored tracing config.

        :param tenant_id: tenant id passed to decrypt_token
        :param tracing_provider: tracing provider (a key of provider_config_map)
        :param tracing_config: stored (encrypted) tracing config
        :return: decrypted tracing configuration as a dict
        """
        config_class, secret_keys, other_keys = cls._get_provider_keys(tracing_provider)
        new_config = {
            key: decrypt_token(tenant_id, tracing_config[key]) for key in secret_keys if key in tracing_config
        }
        for key in other_keys:
            new_config[key] = tracing_config.get(key, "")
        return config_class(**new_config).model_dump()

    @classmethod
    def obfuscated_decrypt_token(cls, tracing_provider: str, decrypt_tracing_config: dict):
        """
        Mask the secret fields of an already-decrypted tracing config for display.

        :param tracing_provider: tracing provider (a key of provider_config_map)
        :param decrypt_tracing_config: decrypted tracing config
        :return: tracing configuration with secret fields passed through obfuscated_token
        """
        config_class, secret_keys, other_keys = cls._get_provider_keys(tracing_provider)
        new_config = {
            key: obfuscated_token(decrypt_tracing_config[key])
            for key in secret_keys
            if key in decrypt_tracing_config
        }
        for key in other_keys:
            new_config[key] = decrypt_tracing_config.get(key, "")
        return config_class(**new_config).model_dump()

    @classmethod
    def get_decrypted_tracing_config(cls, app_id: str, tracing_provider: str):
        """
        Load an app's stored tracing config for a provider and decrypt it.

        :param app_id: app id
        :param tracing_provider: tracing provider
        :return: decrypted config dict, or None when no config row or no app exists
        """
        trace_config_data = db.session.query(TraceAppConfig).filter(
            TraceAppConfig.app_id == app_id, TraceAppConfig.tracing_provider == tracing_provider
        ).first()
        if not trace_config_data:
            return None

        # The tenant id is needed to pick the decryption key.
        app = db.session.query(App).filter(App.id == app_id).first()
        if not app:
            return None

        return cls.decrypt_tracing_config(app.tenant_id, tracing_provider, trace_config_data.tracing_config)

    @classmethod
    def get_ops_trace_instance(
        cls,
        app_id: Optional[Union[UUID, str]] = None,
        message_id: Optional[str] = None,
        conversation_id: Optional[str] = None
    ):
        """
        Build the tracer configured for an app, if tracing is enabled.

        The app is resolved from message_id first, then conversation_id, falling back
        to app_id.

        :param app_id: app_id
        :param message_id: message_id
        :param conversation_id: conversation_id
        :return: tracer instance, or None when the app cannot be resolved, tracing is
            disabled, the provider is unknown, or no provider config is stored
        """
        if conversation_id is not None:
            conversation_data = db.session.query(Conversation).filter(
                Conversation.id == conversation_id
            ).first()
            if conversation_data:
                app_id = conversation_data.app_id

        if message_id is not None:
            record = db.session.query(Message).filter(Message.id == message_id).first()
            if record:
                app_id = record.app_id

        if isinstance(app_id, UUID):
            app_id = str(app_id)
        if app_id is None:
            return None

        app = db.session.query(App).filter(App.id == app_id).first()
        if app is None or not app.tracing:
            return None

        app_ops_trace_config = json.loads(app.tracing)
        if not app_ops_trace_config or not app_ops_trace_config.get("enabled"):
            return None

        # update_app_tracing_config allows a None provider; treat unknown ones as "off".
        tracing_provider = app_ops_trace_config.get("tracing_provider")
        if tracing_provider not in provider_config_map:
            return None

        # Decrypt only once tracing is known to be enabled.
        decrypt_trace_config = cls.get_decrypted_tracing_config(app_id, tracing_provider)
        if not decrypt_trace_config:
            return None

        trace_instance = provider_config_map[tracing_provider]["trace_instance"]
        config_class = provider_config_map[tracing_provider]["config_class"]
        return trace_instance(config_class(**decrypt_trace_config))

    @classmethod
    def get_app_config_through_message_id(cls, message_id: str):
        """
        Return the model config used by the conversation of a message.

        :param message_id: message id
        :return: the AppModelConfig row when the conversation references one, otherwise
            the conversation's override_model_configs; None when nothing is found
        """
        message_data = db.session.query(Message).filter(Message.id == message_id).first()
        if not message_data:
            return None

        conversation_data = db.session.query(Conversation).filter(
            Conversation.id == message_data.conversation_id
        ).first()
        if not conversation_data:
            return None

        if conversation_data.app_model_config_id:
            return db.session.query(AppModelConfig).filter(
                AppModelConfig.id == conversation_data.app_model_config_id
            ).first()
        if conversation_data.app_model_config_id is None and conversation_data.override_model_configs:
            return conversation_data.override_model_configs
        return None

    @classmethod
    def update_app_tracing_config(cls, app_id: str, enabled: bool, tracing_provider: str):
        """
        Persist an app's tracing switch and selected provider.

        :param app_id: app id
        :param enabled: enabled
        :param tracing_provider: tracing provider, or None
        :raises ValueError: if the provider is not registered or the app does not exist
        """
        if tracing_provider is not None and tracing_provider not in provider_config_map:
            raise ValueError(f"Invalid tracing provider: {tracing_provider}")

        app_config = db.session.query(App).filter(App.id == app_id).first()
        if not app_config:
            raise ValueError("App not found")

        app_config.tracing = json.dumps(
            {
                "enabled": enabled,
                "tracing_provider": tracing_provider,
            }
        )
        db.session.commit()

    @classmethod
    def get_app_tracing_config(cls, app_id: str):
        """
        Return an app's tracing switch and selected provider.

        :param app_id: app id
        :return: the stored tracing dict, or a disabled default when none is stored
        :raises ValueError: if the app does not exist
        """
        app = db.session.query(App).filter(App.id == app_id).first()
        if not app:
            raise ValueError("App not found")
        if not app.tracing:
            return {
                "enabled": False,
                "tracing_provider": None
            }
        return json.loads(app.tracing)

    @staticmethod
    def check_trace_config_is_effective(tracing_config: dict, tracing_provider: str):
        """
        Check a (decrypted) tracing config by calling the provider's api_check().

        :param tracing_config: tracing config
        :param tracing_provider: tracing provider
        :return: result of the tracer's api_check()
        """
        provider_config = provider_config_map[tracing_provider]
        config = provider_config["config_class"](**tracing_config)
        return provider_config["trace_instance"](config).api_check()
class TraceTaskName(str, Enum):
    """Kinds of trace task; TraceTask.preprocess dispatches on these values.

    Members are also plain strings, so they compare equal to their values.
    """

    CONVERSATION_TRACE = "conversation_trace"
    WORKFLOW_TRACE = "workflow_trace"
    MESSAGE_TRACE = "message_trace"
    MODERATION_TRACE = "moderation_trace"
    SUGGESTED_QUESTION_TRACE = "suggested_question_trace"
    DATASET_RETRIEVAL_TRACE = "dataset_retrieval_trace"
    TOOL_TRACE = "tool_trace"
    GENERATE_NAME_TRACE = "generate_name_trace"
class TraceTask:
    """A deferred tracing job.

    The constructor only stores its arguments; :meth:`execute` later queries the
    database and builds the trace info object selected by ``trace_type``.
    """

    def __init__(
        self,
        trace_type: Any,
        message_id: Optional[str] = None,
        workflow_run: Optional[WorkflowRun] = None,
        conversation_id: Optional[str] = None,
        timer: Optional[Any] = None,
        **kwargs
    ):
        """
        :param trace_type: TraceTaskName member (or its string value) selecting the builder
        :param message_id: id of the traced message, if any
        :param workflow_run: workflow run to trace (used by workflow traces)
        :param conversation_id: id of the traced conversation, if any
        :param timer: mapping read via .get("start") / .get("end") for span times
        :param kwargs: trace-type specific inputs forwarded to the builder
        """
        self.trace_type = trace_type
        self.message_id = message_id
        self.workflow_run = workflow_run
        self.conversation_id = conversation_id
        self.timer = timer
        self.kwargs = kwargs
        # Prefix for MessageFile.url when building absolute file links.
        self.file_base_url = os.getenv("FILES_URL", "http://127.0.0.1:5001")

    def execute(self):
        """Build and return the trace info for this task ({} when there is nothing to trace)."""
        _, trace_info = self.preprocess()
        return trace_info

    def preprocess(self):
        """
        Build the trace info selected by ``trace_type``.

        :return: (TraceTaskName, trace info); ('', {}) for an unknown trace type
        """
        trace_type = self.trace_type
        if trace_type == TraceTaskName.CONVERSATION_TRACE:
            return TraceTaskName.CONVERSATION_TRACE, self.conversation_trace(**self.kwargs)
        elif trace_type == TraceTaskName.WORKFLOW_TRACE:
            return TraceTaskName.WORKFLOW_TRACE, self.workflow_trace(self.workflow_run, self.conversation_id)
        elif trace_type == TraceTaskName.MESSAGE_TRACE:
            return TraceTaskName.MESSAGE_TRACE, self.message_trace(self.message_id)
        elif trace_type == TraceTaskName.MODERATION_TRACE:
            return TraceTaskName.MODERATION_TRACE, self.moderation_trace(self.message_id, self.timer, **self.kwargs)
        elif trace_type == TraceTaskName.SUGGESTED_QUESTION_TRACE:
            return TraceTaskName.SUGGESTED_QUESTION_TRACE, self.suggested_question_trace(
                self.message_id, self.timer, **self.kwargs
            )
        elif trace_type == TraceTaskName.DATASET_RETRIEVAL_TRACE:
            return TraceTaskName.DATASET_RETRIEVAL_TRACE, self.dataset_retrieval_trace(
                self.message_id, self.timer, **self.kwargs
            )
        elif trace_type == TraceTaskName.TOOL_TRACE:
            return TraceTaskName.TOOL_TRACE, self.tool_trace(self.message_id, self.timer, **self.kwargs)
        elif trace_type == TraceTaskName.GENERATE_NAME_TRACE:
            return TraceTaskName.GENERATE_NAME_TRACE, self.generate_name_trace(
                self.conversation_id, self.timer, **self.kwargs
            )
        return '', {}

    # ---- shared helpers -------------------------------------------------

    @staticmethod
    def _message_metadata(message_id, message_data) -> dict:
        """Metadata common to message-scoped traces."""
        return {
            "message_id": message_id,
            "ls_provider": message_data.model_provider,
            "ls_model_name": message_data.model_id,
            "status": message_data.status,
            "from_end_user_id": message_data.from_end_user_id,
            "from_account_id": message_data.from_account_id,
            "agent_based": message_data.agent_based,
            "workflow_run_id": message_data.workflow_run_id,
            "from_source": message_data.from_source,
        }

    @staticmethod
    def _get_workflow_app_log_id(workflow_run_id) -> Optional[str]:
        """Return the id of the WorkflowAppLog row for a workflow run as a string, or None."""
        if not workflow_run_id:
            return None
        workflow_app_log_data = db.session.query(WorkflowAppLog).filter_by(workflow_run_id=workflow_run_id).first()
        return str(workflow_app_log_data.id) if workflow_app_log_data else None

    # ---- process methods for different trace types ----------------------

    def conversation_trace(self, **kwargs):
        """Conversation traces carry their kwargs through unchanged (a plain dict)."""
        return kwargs

    def workflow_trace(self, workflow_run: WorkflowRun, conversation_id):
        """Build a WorkflowTraceInfo from a workflow run and its related log/message rows."""
        workflow_id = workflow_run.workflow_id
        tenant_id = workflow_run.tenant_id
        workflow_run_id = workflow_run.id
        workflow_run_elapsed_time = workflow_run.elapsed_time
        workflow_run_status = workflow_run.status
        workflow_run_inputs = json.loads(workflow_run.inputs) if workflow_run.inputs else {}
        workflow_run_outputs = json.loads(workflow_run.outputs) if workflow_run.outputs else {}
        workflow_run_version = workflow_run.version
        error = workflow_run.error or ""
        total_tokens = workflow_run.total_tokens
        file_list = workflow_run_inputs.get("sys.file") or []
        query = workflow_run_inputs.get("query") or workflow_run_inputs.get("sys.query") or ""

        workflow_app_log_id = self._get_workflow_app_log_id(workflow_run_id)

        # Chatflow runs are linked to a message; plain workflow runs are not.
        message_data = db.session.query(Message.id).filter_by(workflow_run_id=workflow_run_id).first()
        message_id = str(message_data.id) if message_data else None

        metadata = {
            "workflow_id": workflow_id,
            "conversation_id": conversation_id,
            "workflow_run_id": workflow_run_id,
            "tenant_id": tenant_id,
            "elapsed_time": workflow_run_elapsed_time,
            "status": workflow_run_status,
            "version": workflow_run_version,
            "total_tokens": total_tokens,
            "file_list": file_list,
            "triggered_from": workflow_run.triggered_from,
        }

        return WorkflowTraceInfo(
            workflow_data=workflow_run.to_dict(),
            conversation_id=conversation_id,
            workflow_id=workflow_id,
            tenant_id=tenant_id,
            workflow_run_id=workflow_run_id,
            workflow_run_elapsed_time=workflow_run_elapsed_time,
            workflow_run_status=workflow_run_status,
            workflow_run_inputs=workflow_run_inputs,
            workflow_run_outputs=workflow_run_outputs,
            workflow_run_version=workflow_run_version,
            error=error,
            total_tokens=total_tokens,
            file_list=file_list,
            query=query,
            metadata=metadata,
            workflow_app_log_id=workflow_app_log_id,
            message_id=message_id,
            start_time=workflow_run.created_at,
            end_time=workflow_run.finished_at,
        )

    def message_trace(self, message_id):
        """Build a MessageTraceInfo; returns {} when the message or its conversation is missing."""
        message_data = get_message_data(message_id)
        if not message_data:
            return {}

        conversation_mode_row = db.session.query(Conversation.mode).filter_by(
            id=message_data.conversation_id
        ).first()
        if not conversation_mode_row:
            return {}
        conversation_mode = conversation_mode_row[0]
        created_at = message_data.created_at
        inputs = message_data.message

        # Only the first attached file (if any) is reported.
        message_file_data = db.session.query(MessageFile).filter_by(message_id=message_id).first()
        file_list = []
        if message_file_data and message_file_data.url is not None:
            file_list.append(f"{self.file_base_url}/{message_file_data.url}")

        metadata = {
            "conversation_id": message_data.conversation_id,
            **self._message_metadata(message_id, message_data),
        }

        message_tokens = message_data.message_tokens
        return MessageTraceInfo(
            message_id=message_id,
            message_data=message_data.to_dict(),
            conversation_model=conversation_mode,
            message_tokens=message_tokens,
            answer_tokens=message_data.answer_tokens,
            total_tokens=message_tokens + message_data.answer_tokens,
            error=message_data.error or "",
            inputs=inputs,
            outputs=message_data.answer,
            file_list=file_list,
            start_time=created_at,
            end_time=created_at + timedelta(seconds=message_data.provider_response_latency),
            metadata=metadata,
            message_file_data=message_file_data,
            conversation_mode=conversation_mode,
        )

    def moderation_trace(self, message_id, timer, **kwargs):
        """Build a ModerationTraceInfo from kwargs["moderation_result"] and kwargs["inputs"]."""
        moderation_result = kwargs.get("moderation_result")
        inputs = kwargs.get("inputs")
        message_data = get_message_data(message_id)
        if not message_data:
            return {}

        metadata = {
            "message_id": message_id,
            "action": moderation_result.action,
            "preset_response": moderation_result.preset_response,
            "query": moderation_result.query,
        }

        # For workflow-backed messages the workflow app log id is reported as the message id.
        workflow_app_log_id = self._get_workflow_app_log_id(message_data.workflow_run_id)

        return ModerationTraceInfo(
            message_id=workflow_app_log_id or message_id,
            inputs=inputs,
            message_data=message_data.to_dict(),
            flagged=moderation_result.flagged,
            action=moderation_result.action,
            preset_response=moderation_result.preset_response,
            query=moderation_result.query,
            start_time=timer.get("start"),
            end_time=timer.get("end"),
            metadata=metadata,
        )

    def suggested_question_trace(self, message_id, timer, **kwargs):
        """Build a SuggestedQuestionTraceInfo from kwargs["suggested_question"]."""
        suggested_question = kwargs.get("suggested_question")
        message_data = get_message_data(message_id)
        if not message_data:
            return {}

        metadata = self._message_metadata(message_id, message_data)

        # For workflow-backed messages the workflow app log id is reported as the message id.
        workflow_app_log_id = self._get_workflow_app_log_id(message_data.workflow_run_id)

        return SuggestedQuestionTraceInfo(
            message_id=workflow_app_log_id or message_id,
            message_data=message_data.to_dict(),
            inputs=message_data.message,
            outputs=message_data.answer,
            start_time=timer.get("start"),
            end_time=timer.get("end"),
            metadata=metadata,
            total_tokens=message_data.message_tokens + message_data.answer_tokens,
            status=message_data.status,
            error=message_data.error,
            from_account_id=message_data.from_account_id,
            agent_based=message_data.agent_based,
            from_source=message_data.from_source,
            model_provider=message_data.model_provider,
            model_id=message_data.model_id,
            suggested_question=suggested_question,
            level=message_data.status,
            status_message=message_data.error,
        )

    def dataset_retrieval_trace(self, message_id, timer, **kwargs):
        """Build a DatasetRetrievalTraceInfo from kwargs["documents"] (missing -> no documents)."""
        documents = kwargs.get("documents") or []
        message_data = get_message_data(message_id)
        if not message_data:
            return {}

        metadata = self._message_metadata(message_id, message_data)

        return DatasetRetrievalTraceInfo(
            message_id=message_id,
            inputs=message_data.query or message_data.inputs,
            documents=[doc.model_dump() for doc in documents],
            start_time=timer.get("start"),
            end_time=timer.get("end"),
            metadata=metadata,
            message_data=message_data.to_dict(),
        )

    def tool_trace(self, message_id, timer, **kwargs):
        """
        Build a ToolTraceInfo for one tool call of an agent message.

        Tool config, timing, error and parameters come from the last agent thought
        whose ``tools`` contain kwargs["tool_name"]. When *timer* is falsy, that
        thought's start time (or the message's created/updated times when no thought
        matches) is used as the span.
        """
        tool_name = kwargs.get('tool_name')
        tool_inputs = kwargs.get('tool_inputs')
        tool_outputs = kwargs.get('tool_outputs')
        message_data = get_message_data(message_id)
        if not message_data:
            return {}

        tool_config = {}
        time_cost = 0
        error = None
        tool_parameters = {}
        created_time = message_data.created_at
        end_time = message_data.updated_at
        agent_thoughts: list[MessageAgentThought] = message_data.agent_thoughts
        for agent_thought in agent_thoughts:
            if tool_name in agent_thought.tools:
                created_time = agent_thought.created_at
                tool_meta_data = agent_thought.tool_meta.get(tool_name, {})
                tool_config = tool_meta_data.get('tool_config', {})
                time_cost = tool_meta_data.get('time_cost', 0)
                end_time = created_time + timedelta(seconds=time_cost)
                error = tool_meta_data.get('error', "")
                tool_parameters = tool_meta_data.get('tool_parameters', {})

        metadata = {
            "message_id": message_id,
            "tool_name": tool_name,
            "tool_inputs": tool_inputs,
            "tool_outputs": tool_outputs,
            "tool_config": tool_config,
            "time_cost": time_cost,
            "error": error,
            "tool_parameters": tool_parameters,
        }

        file_url = ""
        message_file_data = db.session.query(MessageFile).filter_by(message_id=message_id).first()
        if message_file_data:
            file_url = f"{self.file_base_url}/{message_file_data.url}"
            metadata.update(
                {
                    "message_file_id": message_file_data.id,
                    "created_by_role": message_file_data.created_by_role,
                    "created_user_id": message_file_data.created_by,
                    "type": message_file_data.type,
                }
            )

        return ToolTraceInfo(
            message_id=message_id,
            message_data=message_data.to_dict(),
            tool_name=tool_name,
            start_time=timer.get("start") if timer else created_time,
            end_time=timer.get("end") if timer else end_time,
            tool_inputs=tool_inputs,
            tool_outputs=tool_outputs,
            metadata=metadata,
            message_file_data=message_file_data,
            error=error,
            inputs=message_data.message,
            outputs=message_data.answer,
            tool_config=tool_config,
            time_cost=time_cost,
            tool_parameters=tool_parameters,
            file_url=file_url,
        )

    def generate_name_trace(self, conversation_id, timer, **kwargs):
        """Build a GenerateNameTraceInfo for a conversation-name generation call."""
        generate_conversation_name = kwargs.get("generate_conversation_name")
        inputs = kwargs.get("inputs")
        tenant_id = kwargs.get("tenant_id")

        metadata = {
            "conversation_id": conversation_id,
            "tenant_id": tenant_id,
        }

        return GenerateNameTraceInfo(
            conversation_id=conversation_id,
            inputs=inputs,
            outputs=generate_conversation_name,
            start_time=timer.get("start"),
            end_time=timer.get("end"),
            metadata=metadata,
            tenant_id=tenant_id,
        )
# Module-level state shared by every TraceQueueManager instance.
trace_manager_timer = None  # threading.Timer that drains the queue; created lazily by start_timer
trace_manager_queue = queue.Queue()  # pending TraceTask objects
# Delay (seconds) before the timer drains the queue, and max tasks drained per run.
trace_manager_interval = int(os.environ.get("TRACE_QUEUE_MANAGER_INTERVAL", 1))
trace_manager_batch_size = int(os.environ.get("TRACE_QUEUE_MANAGER_BATCH_SIZE", 100))
class TraceQueueManager:
    """Buffer TraceTasks in the module-level queue and hand them to Celery in batches.

    A one-shot ``threading.Timer`` (the module-level ``trace_manager_timer``) is
    (re)started whenever a task is added; when it fires it drains up to
    ``trace_manager_batch_size`` tasks and dispatches each via
    ``process_trace_tasks.delay``. The queue and timer are shared by all instances.
    """

    def __init__(self, app_id=None, conversation_id=None, message_id=None):
        global trace_manager_timer

        self.app_id = app_id
        self.conversation_id = conversation_id
        self.message_id = message_id
        # Pass by keyword: get_ops_trace_instance's positional order is
        # (app_id, message_id, conversation_id), not the order used here.
        self.trace_instance = OpsTraceManager.get_ops_trace_instance(
            app_id=app_id, conversation_id=conversation_id, message_id=message_id
        )
        # Keep the real app object so the timer thread can push an app context.
        self.flask_app = current_app._get_current_object()
        if trace_manager_timer is None:
            self.start_timer()

    def add_trace_task(self, trace_task: TraceTask):
        """Queue a task when tracing is enabled for this app, then make sure the timer runs."""
        global trace_manager_queue
        try:
            if self.trace_instance:
                # The queue is drained by whichever manager started the timer, so record
                # the ids this task belongs to instead of relying on that manager's.
                trace_task._queue_context = (self.app_id, self.conversation_id, self.message_id)
                trace_manager_queue.put(trace_task)
        except Exception:
            logging.exception("Error adding trace task")
        finally:
            self.start_timer()

    def collect_tasks(self):
        """Drain at most trace_manager_batch_size tasks from the shared queue without blocking."""
        global trace_manager_queue
        tasks = []
        while len(tasks) < trace_manager_batch_size:
            try:
                task = trace_manager_queue.get_nowait()
            except queue.Empty:
                break
            tasks.append(task)
            trace_manager_queue.task_done()
        return tasks

    def run(self):
        """Timer callback: send one batch of queued tasks to Celery (best effort)."""
        try:
            tasks = self.collect_tasks()
            if tasks:
                self.send_to_celery(tasks)
        except Exception:
            logging.exception("Error processing trace tasks")

    def start_timer(self):
        """Start the shared timer that calls run(), unless one is already alive."""
        global trace_manager_timer
        if trace_manager_timer is None or not trace_manager_timer.is_alive():
            trace_manager_timer = threading.Timer(
                trace_manager_interval, self.run
            )
            trace_manager_timer.name = f"trace_manager_timer_{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}"
            # Non-daemon, so interpreter shutdown waits for a pending flush.
            trace_manager_timer.daemon = False
            trace_manager_timer.start()

    def send_to_celery(self, tasks: list[TraceTask]):
        """
        Build each task's trace info inside an app context and enqueue it for Celery.

        A failure in one task is logged and does not stop the rest of the batch.
        """
        with self.flask_app.app_context():
            for task in tasks:
                try:
                    app_id, conversation_id, message_id = getattr(
                        task, "_queue_context", (self.app_id, self.conversation_id, self.message_id)
                    )
                    trace_info = task.execute()
                    # Builders may return {} (nothing to trace) or, for conversation
                    # traces, a plain dict; only pydantic models have model_dump().
                    if hasattr(trace_info, "model_dump"):
                        trace_info_data = trace_info.model_dump()
                    else:
                        trace_info_data = trace_info or {}
                    task_data = {
                        "app_id": app_id,
                        "conversation_id": conversation_id,
                        "message_id": message_id,
                        "trace_info_type": type(trace_info).__name__,
                        "trace_info": trace_info_data,
                    }
                    process_trace_tasks.delay(task_data)
                except Exception:
                    logging.exception("Error sending trace task to celery")
|