ops_trace_manager.py 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728
  1. import json
  2. import logging
  3. import os
  4. import queue
  5. import threading
  6. import time
  7. from datetime import timedelta
  8. from enum import Enum
  9. from typing import Any, Optional, Union
  10. from uuid import UUID
  11. from flask import current_app
  12. from core.helper.encrypter import decrypt_token, encrypt_token, obfuscated_token
  13. from core.ops.entities.config_entity import (
  14. LangfuseConfig,
  15. LangSmithConfig,
  16. TracingProviderEnum,
  17. )
  18. from core.ops.entities.trace_entity import (
  19. DatasetRetrievalTraceInfo,
  20. GenerateNameTraceInfo,
  21. MessageTraceInfo,
  22. ModerationTraceInfo,
  23. SuggestedQuestionTraceInfo,
  24. ToolTraceInfo,
  25. WorkflowTraceInfo,
  26. )
  27. from core.ops.langfuse_trace.langfuse_trace import LangFuseDataTrace
  28. from core.ops.langsmith_trace.langsmith_trace import LangSmithDataTrace
  29. from core.ops.utils import get_message_data
  30. from extensions.ext_database import db
  31. from models.model import App, AppModelConfig, Conversation, Message, MessageAgentThought, MessageFile, TraceAppConfig
  32. from models.workflow import WorkflowAppLog, WorkflowRun
  33. from tasks.ops_trace_task import process_trace_tasks
  34. provider_config_map = {
  35. TracingProviderEnum.LANGFUSE.value: {
  36. 'config_class': LangfuseConfig,
  37. 'secret_keys': ['public_key', 'secret_key'],
  38. 'other_keys': ['host'],
  39. 'trace_instance': LangFuseDataTrace
  40. },
  41. TracingProviderEnum.LANGSMITH.value: {
  42. 'config_class': LangSmithConfig,
  43. 'secret_keys': ['api_key'],
  44. 'other_keys': ['project', 'endpoint'],
  45. 'trace_instance': LangSmithDataTrace
  46. }
  47. }
  48. class OpsTraceManager:
  49. @classmethod
  50. def encrypt_tracing_config(
  51. cls, tenant_id: str, tracing_provider: str, tracing_config: dict, current_trace_config=None
  52. ):
  53. """
  54. Encrypt tracing config.
  55. :param tenant_id: tenant id
  56. :param tracing_provider: tracing provider
  57. :param tracing_config: tracing config dictionary to be encrypted
  58. :param current_trace_config: current tracing configuration for keeping existing values
  59. :return: encrypted tracing configuration
  60. """
  61. # Get the configuration class and the keys that require encryption
  62. config_class, secret_keys, other_keys = provider_config_map[tracing_provider]['config_class'], \
  63. provider_config_map[tracing_provider]['secret_keys'], provider_config_map[tracing_provider]['other_keys']
  64. new_config = {}
  65. # Encrypt necessary keys
  66. for key in secret_keys:
  67. if key in tracing_config:
  68. if '*' in tracing_config[key]:
  69. # If the key contains '*', retain the original value from the current config
  70. new_config[key] = current_trace_config.get(key, tracing_config[key])
  71. else:
  72. # Otherwise, encrypt the key
  73. new_config[key] = encrypt_token(tenant_id, tracing_config[key])
  74. for key in other_keys:
  75. new_config[key] = tracing_config.get(key, "")
  76. # Create a new instance of the config class with the new configuration
  77. encrypted_config = config_class(**new_config)
  78. return encrypted_config.model_dump()
  79. @classmethod
  80. def decrypt_tracing_config(cls, tenant_id: str, tracing_provider: str, tracing_config: dict):
  81. """
  82. Decrypt tracing config
  83. :param tenant_id: tenant id
  84. :param tracing_provider: tracing provider
  85. :param tracing_config: tracing config
  86. :return:
  87. """
  88. config_class, secret_keys, other_keys = provider_config_map[tracing_provider]['config_class'], \
  89. provider_config_map[tracing_provider]['secret_keys'], provider_config_map[tracing_provider]['other_keys']
  90. new_config = {}
  91. for key in secret_keys:
  92. if key in tracing_config:
  93. new_config[key] = decrypt_token(tenant_id, tracing_config[key])
  94. for key in other_keys:
  95. new_config[key] = tracing_config.get(key, "")
  96. return config_class(**new_config).model_dump()
  97. @classmethod
  98. def obfuscated_decrypt_token(cls, tracing_provider: str, decrypt_tracing_config: dict):
  99. """
  100. Decrypt tracing config
  101. :param tracing_provider: tracing provider
  102. :param decrypt_tracing_config: tracing config
  103. :return:
  104. """
  105. config_class, secret_keys, other_keys = provider_config_map[tracing_provider]['config_class'], \
  106. provider_config_map[tracing_provider]['secret_keys'], provider_config_map[tracing_provider]['other_keys']
  107. new_config = {}
  108. for key in secret_keys:
  109. if key in decrypt_tracing_config:
  110. new_config[key] = obfuscated_token(decrypt_tracing_config[key])
  111. for key in other_keys:
  112. new_config[key] = decrypt_tracing_config.get(key, "")
  113. return config_class(**new_config).model_dump()
  114. @classmethod
  115. def get_decrypted_tracing_config(cls, app_id: str, tracing_provider: str):
  116. """
  117. Get decrypted tracing config
  118. :param app_id: app id
  119. :param tracing_provider: tracing provider
  120. :return:
  121. """
  122. trace_config_data: TraceAppConfig = db.session.query(TraceAppConfig).filter(
  123. TraceAppConfig.app_id == app_id, TraceAppConfig.tracing_provider == tracing_provider
  124. ).first()
  125. if not trace_config_data:
  126. return None
  127. # decrypt_token
  128. tenant_id = db.session.query(App).filter(App.id == app_id).first().tenant_id
  129. decrypt_tracing_config = cls.decrypt_tracing_config(
  130. tenant_id, tracing_provider, trace_config_data.tracing_config
  131. )
  132. return decrypt_tracing_config
  133. @classmethod
  134. def get_ops_trace_instance(
  135. cls,
  136. app_id: Optional[Union[UUID, str]] = None,
  137. message_id: Optional[str] = None,
  138. conversation_id: Optional[str] = None
  139. ):
  140. """
  141. Get ops trace through model config
  142. :param app_id: app_id
  143. :param message_id: message_id
  144. :param conversation_id: conversation_id
  145. :return:
  146. """
  147. if conversation_id is not None:
  148. conversation_data: Conversation = db.session.query(Conversation).filter(
  149. Conversation.id == conversation_id
  150. ).first()
  151. if conversation_data:
  152. app_id = conversation_data.app_id
  153. if message_id is not None:
  154. record: Message = db.session.query(Message).filter(Message.id == message_id).first()
  155. app_id = record.app_id
  156. if isinstance(app_id, UUID):
  157. app_id = str(app_id)
  158. if app_id is None:
  159. return None
  160. app: App = db.session.query(App).filter(
  161. App.id == app_id
  162. ).first()
  163. app_ops_trace_config = json.loads(app.tracing) if app.tracing else None
  164. if app_ops_trace_config is not None:
  165. tracing_provider = app_ops_trace_config.get('tracing_provider')
  166. else:
  167. return None
  168. # decrypt_token
  169. decrypt_trace_config = cls.get_decrypted_tracing_config(app_id, tracing_provider)
  170. if app_ops_trace_config.get('enabled'):
  171. trace_instance, config_class = provider_config_map[tracing_provider]['trace_instance'], \
  172. provider_config_map[tracing_provider]['config_class']
  173. tracing_instance = trace_instance(config_class(**decrypt_trace_config))
  174. return tracing_instance
  175. return None
  176. @classmethod
  177. def get_app_config_through_message_id(cls, message_id: str):
  178. app_model_config = None
  179. message_data = db.session.query(Message).filter(Message.id == message_id).first()
  180. conversation_id = message_data.conversation_id
  181. conversation_data = db.session.query(Conversation).filter(Conversation.id == conversation_id).first()
  182. if conversation_data.app_model_config_id:
  183. app_model_config = db.session.query(AppModelConfig).filter(
  184. AppModelConfig.id == conversation_data.app_model_config_id
  185. ).first()
  186. elif conversation_data.app_model_config_id is None and conversation_data.override_model_configs:
  187. app_model_config = conversation_data.override_model_configs
  188. return app_model_config
  189. @classmethod
  190. def update_app_tracing_config(cls, app_id: str, enabled: bool, tracing_provider: str):
  191. """
  192. Update app tracing config
  193. :param app_id: app id
  194. :param enabled: enabled
  195. :param tracing_provider: tracing provider
  196. :return:
  197. """
  198. # auth check
  199. if tracing_provider not in provider_config_map.keys() and tracing_provider is not None:
  200. raise ValueError(f"Invalid tracing provider: {tracing_provider}")
  201. app_config: App = db.session.query(App).filter(App.id == app_id).first()
  202. app_config.tracing = json.dumps(
  203. {
  204. "enabled": enabled,
  205. "tracing_provider": tracing_provider,
  206. }
  207. )
  208. db.session.commit()
  209. @classmethod
  210. def get_app_tracing_config(cls, app_id: str):
  211. """
  212. Get app tracing config
  213. :param app_id: app id
  214. :return:
  215. """
  216. app: App = db.session.query(App).filter(App.id == app_id).first()
  217. if not app.tracing:
  218. return {
  219. "enabled": False,
  220. "tracing_provider": None
  221. }
  222. app_trace_config = json.loads(app.tracing)
  223. return app_trace_config
  224. @staticmethod
  225. def check_trace_config_is_effective(tracing_config: dict, tracing_provider: str):
  226. """
  227. Check trace config is effective
  228. :param tracing_config: tracing config
  229. :param tracing_provider: tracing provider
  230. :return:
  231. """
  232. config_type, trace_instance = provider_config_map[tracing_provider]['config_class'], \
  233. provider_config_map[tracing_provider]['trace_instance']
  234. tracing_config = config_type(**tracing_config)
  235. return trace_instance(tracing_config).api_check()
  236. class TraceTaskName(str, Enum):
  237. CONVERSATION_TRACE = 'conversation_trace'
  238. WORKFLOW_TRACE = 'workflow_trace'
  239. MESSAGE_TRACE = 'message_trace'
  240. MODERATION_TRACE = 'moderation_trace'
  241. SUGGESTED_QUESTION_TRACE = 'suggested_question_trace'
  242. DATASET_RETRIEVAL_TRACE = 'dataset_retrieval_trace'
  243. TOOL_TRACE = 'tool_trace'
  244. GENERATE_NAME_TRACE = 'generate_name_trace'
  245. class TraceTask:
  246. def __init__(
  247. self,
  248. trace_type: Any,
  249. message_id: Optional[str] = None,
  250. workflow_run: Optional[WorkflowRun] = None,
  251. conversation_id: Optional[str] = None,
  252. timer: Optional[Any] = None,
  253. **kwargs
  254. ):
  255. self.trace_type = trace_type
  256. self.message_id = message_id
  257. self.workflow_run = workflow_run
  258. self.conversation_id = conversation_id
  259. self.timer = timer
  260. self.kwargs = kwargs
  261. self.file_base_url = os.getenv("FILES_URL", "http://127.0.0.1:5001")
  262. def execute(self):
  263. method_name, trace_info = self.preprocess()
  264. return trace_info
  265. def preprocess(self):
  266. if self.trace_type == TraceTaskName.CONVERSATION_TRACE:
  267. return TraceTaskName.CONVERSATION_TRACE, self.conversation_trace(**self.kwargs)
  268. if self.trace_type == TraceTaskName.WORKFLOW_TRACE:
  269. return TraceTaskName.WORKFLOW_TRACE, self.workflow_trace(self.workflow_run, self.conversation_id)
  270. elif self.trace_type == TraceTaskName.MESSAGE_TRACE:
  271. return TraceTaskName.MESSAGE_TRACE, self.message_trace(self.message_id)
  272. elif self.trace_type == TraceTaskName.MODERATION_TRACE:
  273. return TraceTaskName.MODERATION_TRACE, self.moderation_trace(self.message_id, self.timer, **self.kwargs)
  274. elif self.trace_type == TraceTaskName.SUGGESTED_QUESTION_TRACE:
  275. return TraceTaskName.SUGGESTED_QUESTION_TRACE, self.suggested_question_trace(
  276. self.message_id, self.timer, **self.kwargs
  277. )
  278. elif self.trace_type == TraceTaskName.DATASET_RETRIEVAL_TRACE:
  279. return TraceTaskName.DATASET_RETRIEVAL_TRACE, self.dataset_retrieval_trace(
  280. self.message_id, self.timer, **self.kwargs
  281. )
  282. elif self.trace_type == TraceTaskName.TOOL_TRACE:
  283. return TraceTaskName.TOOL_TRACE, self.tool_trace(self.message_id, self.timer, **self.kwargs)
  284. elif self.trace_type == TraceTaskName.GENERATE_NAME_TRACE:
  285. return TraceTaskName.GENERATE_NAME_TRACE, self.generate_name_trace(
  286. self.conversation_id, self.timer, **self.kwargs
  287. )
  288. else:
  289. return '', {}
  290. # process methods for different trace types
  291. def conversation_trace(self, **kwargs):
  292. return kwargs
  293. def workflow_trace(self, workflow_run: WorkflowRun, conversation_id):
  294. workflow_id = workflow_run.workflow_id
  295. tenant_id = workflow_run.tenant_id
  296. workflow_run_id = workflow_run.id
  297. workflow_run_elapsed_time = workflow_run.elapsed_time
  298. workflow_run_status = workflow_run.status
  299. workflow_run_inputs = (
  300. json.loads(workflow_run.inputs) if workflow_run.inputs else {}
  301. )
  302. workflow_run_outputs = (
  303. json.loads(workflow_run.outputs) if workflow_run.outputs else {}
  304. )
  305. workflow_run_version = workflow_run.version
  306. error = workflow_run.error if workflow_run.error else ""
  307. total_tokens = workflow_run.total_tokens
  308. file_list = workflow_run_inputs.get("sys.file") if workflow_run_inputs.get("sys.file") else []
  309. query = workflow_run_inputs.get("query") or workflow_run_inputs.get("sys.query") or ""
  310. # get workflow_app_log_id
  311. workflow_app_log_data = db.session.query(WorkflowAppLog).filter_by(workflow_run_id=workflow_run.id).first()
  312. workflow_app_log_id = str(workflow_app_log_data.id) if workflow_app_log_data else None
  313. # get message_id
  314. message_data = db.session.query(Message.id).filter_by(workflow_run_id=workflow_run_id).first()
  315. message_id = str(message_data.id) if message_data else None
  316. metadata = {
  317. "workflow_id": workflow_id,
  318. "conversation_id": conversation_id,
  319. "workflow_run_id": workflow_run_id,
  320. "tenant_id": tenant_id,
  321. "elapsed_time": workflow_run_elapsed_time,
  322. "status": workflow_run_status,
  323. "version": workflow_run_version,
  324. "total_tokens": total_tokens,
  325. "file_list": file_list,
  326. "triggered_form": workflow_run.triggered_from,
  327. }
  328. workflow_trace_info = WorkflowTraceInfo(
  329. workflow_data=workflow_run.to_dict(),
  330. conversation_id=conversation_id,
  331. workflow_id=workflow_id,
  332. tenant_id=tenant_id,
  333. workflow_run_id=workflow_run_id,
  334. workflow_run_elapsed_time=workflow_run_elapsed_time,
  335. workflow_run_status=workflow_run_status,
  336. workflow_run_inputs=workflow_run_inputs,
  337. workflow_run_outputs=workflow_run_outputs,
  338. workflow_run_version=workflow_run_version,
  339. error=error,
  340. total_tokens=total_tokens,
  341. file_list=file_list,
  342. query=query,
  343. metadata=metadata,
  344. workflow_app_log_id=workflow_app_log_id,
  345. message_id=message_id,
  346. start_time=workflow_run.created_at,
  347. end_time=workflow_run.finished_at,
  348. )
  349. return workflow_trace_info
  350. def message_trace(self, message_id):
  351. message_data = get_message_data(message_id)
  352. if not message_data:
  353. return {}
  354. conversation_mode = db.session.query(Conversation.mode).filter_by(id=message_data.conversation_id).first()
  355. conversation_mode = conversation_mode[0]
  356. created_at = message_data.created_at
  357. inputs = message_data.message
  358. # get message file data
  359. message_file_data = db.session.query(MessageFile).filter_by(message_id=message_id).first()
  360. file_list = []
  361. if message_file_data and message_file_data.url is not None:
  362. file_url = f"{self.file_base_url}/{message_file_data.url}" if message_file_data else ""
  363. file_list.append(file_url)
  364. metadata = {
  365. "conversation_id": message_data.conversation_id,
  366. "ls_provider": message_data.model_provider,
  367. "ls_model_name": message_data.model_id,
  368. "status": message_data.status,
  369. "from_end_user_id": message_data.from_account_id,
  370. "from_account_id": message_data.from_account_id,
  371. "agent_based": message_data.agent_based,
  372. "workflow_run_id": message_data.workflow_run_id,
  373. "from_source": message_data.from_source,
  374. "message_id": message_id,
  375. }
  376. message_tokens = message_data.message_tokens
  377. message_trace_info = MessageTraceInfo(
  378. message_id=message_id,
  379. message_data=message_data.to_dict(),
  380. conversation_model=conversation_mode,
  381. message_tokens=message_tokens,
  382. answer_tokens=message_data.answer_tokens,
  383. total_tokens=message_tokens + message_data.answer_tokens,
  384. error=message_data.error if message_data.error else "",
  385. inputs=inputs,
  386. outputs=message_data.answer,
  387. file_list=file_list,
  388. start_time=created_at,
  389. end_time=created_at + timedelta(seconds=message_data.provider_response_latency),
  390. metadata=metadata,
  391. message_file_data=message_file_data,
  392. conversation_mode=conversation_mode,
  393. )
  394. return message_trace_info
  395. def moderation_trace(self, message_id, timer, **kwargs):
  396. moderation_result = kwargs.get("moderation_result")
  397. inputs = kwargs.get("inputs")
  398. message_data = get_message_data(message_id)
  399. if not message_data:
  400. return {}
  401. metadata = {
  402. "message_id": message_id,
  403. "action": moderation_result.action,
  404. "preset_response": moderation_result.preset_response,
  405. "query": moderation_result.query,
  406. }
  407. # get workflow_app_log_id
  408. workflow_app_log_id = None
  409. if message_data.workflow_run_id:
  410. workflow_app_log_data = db.session.query(WorkflowAppLog).filter_by(
  411. workflow_run_id=message_data.workflow_run_id
  412. ).first()
  413. workflow_app_log_id = str(workflow_app_log_data.id) if workflow_app_log_data else None
  414. moderation_trace_info = ModerationTraceInfo(
  415. message_id=workflow_app_log_id if workflow_app_log_id else message_id,
  416. inputs=inputs,
  417. message_data=message_data.to_dict(),
  418. flagged=moderation_result.flagged,
  419. action=moderation_result.action,
  420. preset_response=moderation_result.preset_response,
  421. query=moderation_result.query,
  422. start_time=timer.get("start"),
  423. end_time=timer.get("end"),
  424. metadata=metadata,
  425. )
  426. return moderation_trace_info
  427. def suggested_question_trace(self, message_id, timer, **kwargs):
  428. suggested_question = kwargs.get("suggested_question")
  429. message_data = get_message_data(message_id)
  430. if not message_data:
  431. return {}
  432. metadata = {
  433. "message_id": message_id,
  434. "ls_provider": message_data.model_provider,
  435. "ls_model_name": message_data.model_id,
  436. "status": message_data.status,
  437. "from_end_user_id": message_data.from_account_id,
  438. "from_account_id": message_data.from_account_id,
  439. "agent_based": message_data.agent_based,
  440. "workflow_run_id": message_data.workflow_run_id,
  441. "from_source": message_data.from_source,
  442. }
  443. # get workflow_app_log_id
  444. workflow_app_log_id = None
  445. if message_data.workflow_run_id:
  446. workflow_app_log_data = db.session.query(WorkflowAppLog).filter_by(
  447. workflow_run_id=message_data.workflow_run_id
  448. ).first()
  449. workflow_app_log_id = str(workflow_app_log_data.id) if workflow_app_log_data else None
  450. suggested_question_trace_info = SuggestedQuestionTraceInfo(
  451. message_id=workflow_app_log_id if workflow_app_log_id else message_id,
  452. message_data=message_data.to_dict(),
  453. inputs=message_data.message,
  454. outputs=message_data.answer,
  455. start_time=timer.get("start"),
  456. end_time=timer.get("end"),
  457. metadata=metadata,
  458. total_tokens=message_data.message_tokens + message_data.answer_tokens,
  459. status=message_data.status,
  460. error=message_data.error,
  461. from_account_id=message_data.from_account_id,
  462. agent_based=message_data.agent_based,
  463. from_source=message_data.from_source,
  464. model_provider=message_data.model_provider,
  465. model_id=message_data.model_id,
  466. suggested_question=suggested_question,
  467. level=message_data.status,
  468. status_message=message_data.error,
  469. )
  470. return suggested_question_trace_info
  471. def dataset_retrieval_trace(self, message_id, timer, **kwargs):
  472. documents = kwargs.get("documents")
  473. message_data = get_message_data(message_id)
  474. if not message_data:
  475. return {}
  476. metadata = {
  477. "message_id": message_id,
  478. "ls_provider": message_data.model_provider,
  479. "ls_model_name": message_data.model_id,
  480. "status": message_data.status,
  481. "from_end_user_id": message_data.from_account_id,
  482. "from_account_id": message_data.from_account_id,
  483. "agent_based": message_data.agent_based,
  484. "workflow_run_id": message_data.workflow_run_id,
  485. "from_source": message_data.from_source,
  486. }
  487. dataset_retrieval_trace_info = DatasetRetrievalTraceInfo(
  488. message_id=message_id,
  489. inputs=message_data.query if message_data.query else message_data.inputs,
  490. documents=[doc.model_dump() for doc in documents],
  491. start_time=timer.get("start"),
  492. end_time=timer.get("end"),
  493. metadata=metadata,
  494. message_data=message_data.to_dict(),
  495. )
  496. return dataset_retrieval_trace_info
  497. def tool_trace(self, message_id, timer, **kwargs):
  498. tool_name = kwargs.get('tool_name')
  499. tool_inputs = kwargs.get('tool_inputs')
  500. tool_outputs = kwargs.get('tool_outputs')
  501. message_data = get_message_data(message_id)
  502. if not message_data:
  503. return {}
  504. tool_config = {}
  505. time_cost = 0
  506. error = None
  507. tool_parameters = {}
  508. created_time = message_data.created_at
  509. end_time = message_data.updated_at
  510. agent_thoughts: list[MessageAgentThought] = message_data.agent_thoughts
  511. for agent_thought in agent_thoughts:
  512. if tool_name in agent_thought.tools:
  513. created_time = agent_thought.created_at
  514. tool_meta_data = agent_thought.tool_meta.get(tool_name, {})
  515. tool_config = tool_meta_data.get('tool_config', {})
  516. time_cost = tool_meta_data.get('time_cost', 0)
  517. end_time = created_time + timedelta(seconds=time_cost)
  518. error = tool_meta_data.get('error', "")
  519. tool_parameters = tool_meta_data.get('tool_parameters', {})
  520. metadata = {
  521. "message_id": message_id,
  522. "tool_name": tool_name,
  523. "tool_inputs": tool_inputs,
  524. "tool_outputs": tool_outputs,
  525. "tool_config": tool_config,
  526. "time_cost": time_cost,
  527. "error": error,
  528. "tool_parameters": tool_parameters,
  529. }
  530. file_url = ""
  531. message_file_data = db.session.query(MessageFile).filter_by(message_id=message_id).first()
  532. if message_file_data:
  533. message_file_id = message_file_data.id if message_file_data else None
  534. type = message_file_data.type
  535. created_by_role = message_file_data.created_by_role
  536. created_user_id = message_file_data.created_by
  537. file_url = f"{self.file_base_url}/{message_file_data.url}"
  538. metadata.update(
  539. {
  540. "message_file_id": message_file_id,
  541. "created_by_role": created_by_role,
  542. "created_user_id": created_user_id,
  543. "type": type,
  544. }
  545. )
  546. tool_trace_info = ToolTraceInfo(
  547. message_id=message_id,
  548. message_data=message_data.to_dict(),
  549. tool_name=tool_name,
  550. start_time=timer.get("start") if timer else created_time,
  551. end_time=timer.get("end") if timer else end_time,
  552. tool_inputs=tool_inputs,
  553. tool_outputs=tool_outputs,
  554. metadata=metadata,
  555. message_file_data=message_file_data,
  556. error=error,
  557. inputs=message_data.message,
  558. outputs=message_data.answer,
  559. tool_config=tool_config,
  560. time_cost=time_cost,
  561. tool_parameters=tool_parameters,
  562. file_url=file_url,
  563. )
  564. return tool_trace_info
  565. def generate_name_trace(self, conversation_id, timer, **kwargs):
  566. generate_conversation_name = kwargs.get("generate_conversation_name")
  567. inputs = kwargs.get("inputs")
  568. tenant_id = kwargs.get("tenant_id")
  569. start_time = timer.get("start")
  570. end_time = timer.get("end")
  571. metadata = {
  572. "conversation_id": conversation_id,
  573. "tenant_id": tenant_id,
  574. }
  575. generate_name_trace_info = GenerateNameTraceInfo(
  576. conversation_id=conversation_id,
  577. inputs=inputs,
  578. outputs=generate_conversation_name,
  579. start_time=start_time,
  580. end_time=end_time,
  581. metadata=metadata,
  582. tenant_id=tenant_id,
  583. )
  584. return generate_name_trace_info
  585. trace_manager_timer = None
  586. trace_manager_queue = queue.Queue()
  587. trace_manager_interval = int(os.getenv("TRACE_QUEUE_MANAGER_INTERVAL", 1))
  588. trace_manager_batch_size = int(os.getenv("TRACE_QUEUE_MANAGER_BATCH_SIZE", 100))
  589. class TraceQueueManager:
  590. def __init__(self, app_id=None, conversation_id=None, message_id=None):
  591. global trace_manager_timer
  592. self.app_id = app_id
  593. self.conversation_id = conversation_id
  594. self.message_id = message_id
  595. self.trace_instance = OpsTraceManager.get_ops_trace_instance(app_id, conversation_id, message_id)
  596. self.flask_app = current_app._get_current_object()
  597. if trace_manager_timer is None:
  598. self.start_timer()
  599. def add_trace_task(self, trace_task: TraceTask):
  600. global trace_manager_timer
  601. global trace_manager_queue
  602. try:
  603. if self.trace_instance:
  604. trace_manager_queue.put(trace_task)
  605. except Exception as e:
  606. logging.debug(f"Error adding trace task: {e}")
  607. finally:
  608. self.start_timer()
  609. def collect_tasks(self):
  610. global trace_manager_queue
  611. tasks = []
  612. while len(tasks) < trace_manager_batch_size and not trace_manager_queue.empty():
  613. task = trace_manager_queue.get_nowait()
  614. tasks.append(task)
  615. trace_manager_queue.task_done()
  616. return tasks
  617. def run(self):
  618. try:
  619. tasks = self.collect_tasks()
  620. if tasks:
  621. self.send_to_celery(tasks)
  622. except Exception as e:
  623. logging.debug(f"Error processing trace tasks: {e}")
  624. def start_timer(self):
  625. global trace_manager_timer
  626. if trace_manager_timer is None or not trace_manager_timer.is_alive():
  627. trace_manager_timer = threading.Timer(
  628. trace_manager_interval, self.run
  629. )
  630. trace_manager_timer.name = f"trace_manager_timer_{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}"
  631. trace_manager_timer.daemon = False
  632. trace_manager_timer.start()
  633. def send_to_celery(self, tasks: list[TraceTask]):
  634. with self.flask_app.app_context():
  635. for task in tasks:
  636. trace_info = task.execute()
  637. task_data = {
  638. "app_id": self.app_id,
  639. "conversation_id": self.conversation_id,
  640. "message_id": self.message_id,
  641. "trace_info_type": type(trace_info).__name__,
  642. "trace_info": trace_info.model_dump() if trace_info else {},
  643. }
  644. process_trace_tasks.delay(task_data)