import json
import logging
import os
from datetime import datetime, timedelta
from typing import Optional

from langfuse import Langfuse

from core.ops.base_trace_instance import BaseTraceInstance
from core.ops.entities.config_entity import LangfuseConfig
from core.ops.entities.trace_entity import (
    BaseTraceInfo,
    DatasetRetrievalTraceInfo,
    GenerateNameTraceInfo,
    MessageTraceInfo,
    ModerationTraceInfo,
    SuggestedQuestionTraceInfo,
    ToolTraceInfo,
    WorkflowTraceInfo,
)
from core.ops.langfuse_trace.entities.langfuse_trace_entity import (
    GenerationUsage,
    LangfuseGeneration,
    LangfuseSpan,
    LangfuseTrace,
    LevelEnum,
    UnitEnum,
)
from core.ops.utils import filter_none_values
from extensions.ext_database import db
from models.model import EndUser
from models.workflow import WorkflowNodeExecution

logger = logging.getLogger(__name__)
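
# Exports Dify trace entities (workflow runs, messages, moderation results,
# tool calls, and more) to Langfuse as traces, spans, and generations.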
class LangFuseDataTrace(BaseTraceInstance):
    def __init__(
        self,
        langfuse_config: LangfuseConfig,
    ):
        super().__init__(langfuse_config)
        self.langfuse_client = Langfuse(
            public_key=langfuse_config.public_key,
            secret_key=langfuse_config.secret_key,
            host=langfuse_config.host,
        )
        self.file_base_url = os.getenv("FILES_URL", "http://127.0.0.1:5001")

    def trace(self, trace_info: BaseTraceInfo):
        if isinstance(trace_info, WorkflowTraceInfo):
            self.workflow_trace(trace_info)
        if isinstance(trace_info, MessageTraceInfo):
            self.message_trace(trace_info)
        if isinstance(trace_info, ModerationTraceInfo):
            self.moderation_trace(trace_info)
        if isinstance(trace_info, SuggestedQuestionTraceInfo):
            self.suggested_question_trace(trace_info)
        if isinstance(trace_info, DatasetRetrievalTraceInfo):
            self.dataset_retrieval_trace(trace_info)
        if isinstance(trace_info, ToolTraceInfo):
            self.tool_trace(trace_info)
        if isinstance(trace_info, GenerateNameTraceInfo):
            self.generate_name_trace(trace_info)
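
    # One Langfuse trace per workflow run (reusing the message trace when a
    # message_id is present), one span per executed node, and an extra
    # generation for nodes that ran a chat-mode model.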
    def workflow_trace(self, trace_info: WorkflowTraceInfo):
        trace_id = trace_info.workflow_app_log_id if trace_info.workflow_app_log_id else trace_info.workflow_run_id
        if trace_info.message_id:
            trace_id = trace_info.message_id
            name = f"message_{trace_info.message_id}"
            trace_data = LangfuseTrace(
                id=trace_info.message_id,
                user_id=trace_info.tenant_id,
                name=name,
                input=trace_info.workflow_run_inputs,
                output=trace_info.workflow_run_outputs,
                metadata=trace_info.metadata,
                session_id=trace_info.conversation_id,
                tags=["message", "workflow"],
            )
            self.add_trace(langfuse_trace_data=trace_data)
            workflow_span_data = LangfuseSpan(
                id=trace_info.workflow_app_log_id if trace_info.workflow_app_log_id else trace_info.workflow_run_id,
                name=(
                    f"workflow_{trace_info.workflow_app_log_id}"
                    if trace_info.workflow_app_log_id
                    else f"workflow_{trace_info.workflow_run_id}"
                ),
                input=trace_info.workflow_run_inputs,
                output=trace_info.workflow_run_outputs,
                trace_id=trace_id,
                start_time=trace_info.start_time,
                end_time=trace_info.end_time,
                metadata=trace_info.metadata,
                level=LevelEnum.DEFAULT if trace_info.error == "" else LevelEnum.ERROR,
                status_message=trace_info.error if trace_info.error else "",
            )
            self.add_span(langfuse_span_data=workflow_span_data)
        else:
            trace_data = LangfuseTrace(
                id=trace_id,
                user_id=trace_info.tenant_id,
                name=(
                    f"workflow_{trace_info.workflow_app_log_id}"
                    if trace_info.workflow_app_log_id
                    else f"workflow_{trace_info.workflow_run_id}"
                ),
                input=trace_info.workflow_run_inputs,
                output=trace_info.workflow_run_outputs,
                metadata=trace_info.metadata,
                session_id=trace_info.conversation_id,
                tags=["workflow"],
            )
            self.add_trace(langfuse_trace_data=trace_data)

        # fetch all node executions for this workflow run
        workflow_nodes_executions = (
            db.session.query(WorkflowNodeExecution)
            .filter(WorkflowNodeExecution.workflow_run_id == trace_info.workflow_run_id)
            .order_by(WorkflowNodeExecution.index.desc())
            .all()
        )
        for node_execution in workflow_nodes_executions:
            node_execution_id = node_execution.id
            tenant_id = node_execution.tenant_id
            app_id = node_execution.app_id
            node_name = node_execution.title
            node_type = node_execution.node_type
            status = node_execution.status
            if node_type == "llm":
                # guard against missing process_data before loading prompts
                inputs = (
                    json.loads(node_execution.process_data).get("prompts", {})
                    if node_execution.process_data
                    else {}
                )
            else:
                inputs = json.loads(node_execution.inputs) if node_execution.inputs else {}
            outputs = json.loads(node_execution.outputs) if node_execution.outputs else {}
            created_at = node_execution.created_at if node_execution.created_at else datetime.now()
            elapsed_time = node_execution.elapsed_time
            finished_at = created_at + timedelta(seconds=elapsed_time)
            metadata = json.loads(node_execution.execution_metadata) if node_execution.execution_metadata else {}
            metadata.update(
                {
                    "workflow_run_id": trace_info.workflow_run_id,
                    "node_execution_id": node_execution_id,
                    "tenant_id": tenant_id,
                    "app_id": app_id,
                    "node_name": node_name,
                    "node_type": node_type,
                    "status": status,
                }
            )

            # add span
            if trace_info.message_id:
                span_data = LangfuseSpan(
                    id=node_execution_id,
                    name=f"{node_name}_{node_execution_id}",
                    input=inputs,
                    output=outputs,
                    trace_id=trace_id,
                    start_time=created_at,
                    end_time=finished_at,
                    metadata=metadata,
                    level=LevelEnum.DEFAULT if status == "succeeded" else LevelEnum.ERROR,
                    status_message=trace_info.error if trace_info.error else "",
                    parent_observation_id=(
                        trace_info.workflow_app_log_id
                        if trace_info.workflow_app_log_id
                        else trace_info.workflow_run_id
                    ),
                )
            else:
                span_data = LangfuseSpan(
                    id=node_execution_id,
                    name=f"{node_name}_{node_execution_id}",
                    input=inputs,
                    output=outputs,
                    trace_id=trace_id,
                    start_time=created_at,
                    end_time=finished_at,
                    metadata=metadata,
                    level=LevelEnum.DEFAULT if status == "succeeded" else LevelEnum.ERROR,
                    status_message=trace_info.error if trace_info.error else "",
                )
            self.add_span(langfuse_span_data=span_data)
            process_data = json.loads(node_execution.process_data) if node_execution.process_data else {}
            if process_data and process_data.get("model_mode") == "chat":
                total_token = metadata.get("total_tokens", 0)
                # add generation
                generation_usage = GenerationUsage(
                    totalTokens=total_token,
                )
                node_generation_data = LangfuseGeneration(
                    name=f"generation_{node_execution_id}",
                    trace_id=trace_id,
                    parent_observation_id=node_execution_id,
                    start_time=created_at,
                    end_time=finished_at,
                    input=inputs,
                    output=outputs,
                    metadata=metadata,
                    level=LevelEnum.DEFAULT if status == "succeeded" else LevelEnum.ERROR,
                    status_message=trace_info.error if trace_info.error else "",
                    usage=generation_usage,
                )
                self.add_generation(langfuse_generation_data=node_generation_data)
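
    # Records a chat message as a Langfuse trace plus a child generation
    # carrying token usage and total cost.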
    def message_trace(self, trace_info: MessageTraceInfo, **kwargs):
        # gather message data, metadata, and attached files
        file_list = trace_info.file_list
        metadata = trace_info.metadata
        message_data = trace_info.message_data
        message_id = message_data.id

        user_id = message_data.from_account_id
        if message_data.from_end_user_id:
            end_user_data: EndUser = (
                db.session.query(EndUser).filter(EndUser.id == message_data.from_end_user_id).first()
            )
            # fall back to the account id if the end user record is missing
            user_id = end_user_data.session_id if end_user_data else user_id
        trace_data = LangfuseTrace(
            id=message_id,
            user_id=user_id,
            name=f"message_{message_id}",
            input={
                "message": trace_info.inputs,
                "files": file_list,
                "message_tokens": trace_info.message_tokens,
                "answer_tokens": trace_info.answer_tokens,
                "total_tokens": trace_info.total_tokens,
                "error": trace_info.error,
                "provider_response_latency": message_data.provider_response_latency,
                "created_at": trace_info.start_time,
            },
            output=trace_info.outputs,
            metadata=metadata,
            session_id=message_data.conversation_id,
            tags=["message", str(trace_info.conversation_mode)],
            version=None,
            release=None,
            public=None,
        )
        self.add_trace(langfuse_trace_data=trace_data)

        # add a generation recording model usage and cost for this message
        generation_usage = GenerationUsage(
            totalTokens=trace_info.total_tokens,
            input=trace_info.message_tokens,
            output=trace_info.answer_tokens,
            total=trace_info.total_tokens,
            unit=UnitEnum.TOKENS,
            totalCost=message_data.total_price,
        )
        langfuse_generation_data = LangfuseGeneration(
            name=f"generation_{message_id}",
            trace_id=message_id,
            start_time=trace_info.start_time,
            end_time=trace_info.end_time,
            model=message_data.model_id,
            input=trace_info.inputs,
            output=message_data.answer,
            metadata=metadata,
            level=LevelEnum.DEFAULT if message_data.status != "error" else LevelEnum.ERROR,
            status_message=message_data.error if message_data.error else "",
            usage=generation_usage,
        )
        self.add_generation(langfuse_generation_data)
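
    # The remaining hooks attach moderation, suggested-question, dataset
    # retrieval, tool, and name-generation results to the owning message or
    # conversation trace.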
    def moderation_trace(self, trace_info: ModerationTraceInfo):
        span_data = LangfuseSpan(
            name="moderation",
            input=trace_info.inputs,
            output={
                "action": trace_info.action,
                "flagged": trace_info.flagged,
                "preset_response": trace_info.preset_response,
                "inputs": trace_info.inputs,
            },
            trace_id=trace_info.message_id,
            start_time=trace_info.start_time or trace_info.message_data.created_at,
            end_time=trace_info.end_time or trace_info.message_data.created_at,
            metadata=trace_info.metadata,
        )
        self.add_span(langfuse_span_data=span_data)

    def suggested_question_trace(self, trace_info: SuggestedQuestionTraceInfo):
        message_data = trace_info.message_data
        generation_usage = GenerationUsage(
            totalTokens=len(str(trace_info.suggested_question)),
            input=len(trace_info.inputs),
            output=len(trace_info.suggested_question),
            total=len(trace_info.suggested_question),
            unit=UnitEnum.CHARACTERS,
        )
        generation_data = LangfuseGeneration(
            name="suggested_question",
            input=trace_info.inputs,
            output=str(trace_info.suggested_question),
            trace_id=trace_info.message_id,
            start_time=trace_info.start_time,
            end_time=trace_info.end_time,
            metadata=trace_info.metadata,
            level=LevelEnum.DEFAULT if message_data.status != "error" else LevelEnum.ERROR,
            status_message=message_data.error if message_data.error else "",
            usage=generation_usage,
        )
        self.add_generation(langfuse_generation_data=generation_data)

    def dataset_retrieval_trace(self, trace_info: DatasetRetrievalTraceInfo):
        dataset_retrieval_span_data = LangfuseSpan(
            name="dataset_retrieval",
            input=trace_info.inputs,
            output={"documents": trace_info.documents},
            trace_id=trace_info.message_id,
            start_time=trace_info.start_time or trace_info.message_data.created_at,
            end_time=trace_info.end_time or trace_info.message_data.updated_at,
            metadata=trace_info.metadata,
        )
        self.add_span(langfuse_span_data=dataset_retrieval_span_data)

    def tool_trace(self, trace_info: ToolTraceInfo):
        tool_span_data = LangfuseSpan(
            name=trace_info.tool_name,
            input=trace_info.tool_inputs,
            output=trace_info.tool_outputs,
            trace_id=trace_info.message_id,
            start_time=trace_info.start_time,
            end_time=trace_info.end_time,
            metadata=trace_info.metadata,
            level=LevelEnum.DEFAULT if not trace_info.error else LevelEnum.ERROR,
            status_message=trace_info.error,
        )
        self.add_span(langfuse_span_data=tool_span_data)

    def generate_name_trace(self, trace_info: GenerateNameTraceInfo):
        name_generation_trace_data = LangfuseTrace(
            name="generate_name",
            input=trace_info.inputs,
            output=trace_info.outputs,
            user_id=trace_info.tenant_id,
            metadata=trace_info.metadata,
            session_id=trace_info.conversation_id,
        )
        self.add_trace(langfuse_trace_data=name_generation_trace_data)

        name_generation_span_data = LangfuseSpan(
            name="generate_name",
            input=trace_info.inputs,
            output=trace_info.outputs,
            trace_id=trace_info.conversation_id,
            start_time=trace_info.start_time,
            end_time=trace_info.end_time,
            metadata=trace_info.metadata,
        )
        self.add_span(langfuse_span_data=name_generation_span_data)
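
    # Thin wrappers around the Langfuse SDK: entities are dumped via pydantic
    # and None values are stripped before being sent.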
    def add_trace(self, langfuse_trace_data: Optional[LangfuseTrace] = None):
        format_trace_data = (
            filter_none_values(langfuse_trace_data.model_dump()) if langfuse_trace_data else {}
        )
        try:
            self.langfuse_client.trace(**format_trace_data)
            logger.debug("LangFuse Trace created successfully")
        except Exception as e:
            raise ValueError(f"LangFuse failed to create trace: {str(e)}")

    def add_span(self, langfuse_span_data: Optional[LangfuseSpan] = None):
        format_span_data = (
            filter_none_values(langfuse_span_data.model_dump()) if langfuse_span_data else {}
        )
        try:
            self.langfuse_client.span(**format_span_data)
            logger.debug("LangFuse Span created successfully")
        except Exception as e:
            raise ValueError(f"LangFuse failed to create span: {str(e)}")

    def update_span(self, span, langfuse_span_data: Optional[LangfuseSpan] = None):
        format_span_data = (
            filter_none_values(langfuse_span_data.model_dump()) if langfuse_span_data else {}
        )
        span.end(**format_span_data)

    def add_generation(self, langfuse_generation_data: Optional[LangfuseGeneration] = None):
        format_generation_data = (
            filter_none_values(langfuse_generation_data.model_dump())
            if langfuse_generation_data
            else {}
        )
        try:
            self.langfuse_client.generation(**format_generation_data)
            logger.debug("LangFuse Generation created successfully")
        except Exception as e:
            raise ValueError(f"LangFuse failed to create generation: {str(e)}")

    def update_generation(
        self, generation, langfuse_generation_data: Optional[LangfuseGeneration] = None
    ):
        format_generation_data = (
            filter_none_values(langfuse_generation_data.model_dump())
            if langfuse_generation_data
            else {}
        )
        generation.end(**format_generation_data)

    def api_check(self):
        try:
            return self.langfuse_client.auth_check()
        except Exception as e:
            logger.debug(f"LangFuse API check failed: {str(e)}")
            raise ValueError(f"LangFuse API check failed: {str(e)}")