ops_trace_manager.py 28 KB


  1. import json
  2. import logging
  3. import os
  4. import queue
  5. import threading
  6. import time
  7. from datetime import timedelta
  8. from typing import Any, Optional, Union
  9. from uuid import UUID
  10. from flask import current_app
  11. from core.helper.encrypter import decrypt_token, encrypt_token, obfuscated_token
  12. from core.ops.entities.config_entity import (
  13. LangfuseConfig,
  14. LangSmithConfig,
  15. TracingProviderEnum,
  16. )
  17. from core.ops.entities.trace_entity import (
  18. DatasetRetrievalTraceInfo,
  19. GenerateNameTraceInfo,
  20. MessageTraceInfo,
  21. ModerationTraceInfo,
  22. SuggestedQuestionTraceInfo,
  23. ToolTraceInfo,
  24. TraceTaskName,
  25. WorkflowTraceInfo,
  26. )
  27. from core.ops.langfuse_trace.langfuse_trace import LangFuseDataTrace
  28. from core.ops.langsmith_trace.langsmith_trace import LangSmithDataTrace
  29. from core.ops.utils import get_message_data
  30. from extensions.ext_database import db
  31. from models.model import App, AppModelConfig, Conversation, Message, MessageAgentThought, MessageFile, TraceAppConfig
  32. from models.workflow import WorkflowAppLog, WorkflowRun
  33. from tasks.ops_trace_task import process_trace_tasks
  34. provider_config_map = {
  35. TracingProviderEnum.LANGFUSE.value: {
  36. "config_class": LangfuseConfig,
  37. "secret_keys": ["public_key", "secret_key"],
  38. "other_keys": ["host", "project_key"],
  39. "trace_instance": LangFuseDataTrace,
  40. },
  41. TracingProviderEnum.LANGSMITH.value: {
  42. "config_class": LangSmithConfig,
  43. "secret_keys": ["api_key"],
  44. "other_keys": ["project", "endpoint"],
  45. "trace_instance": LangSmithDataTrace,
  46. },
  47. }
  48. class OpsTraceManager:
  49. @classmethod
  50. def encrypt_tracing_config(
  51. cls, tenant_id: str, tracing_provider: str, tracing_config: dict, current_trace_config=None
  52. ):
  53. """
  54. Encrypt tracing config.
  55. :param tenant_id: tenant id
  56. :param tracing_provider: tracing provider
  57. :param tracing_config: tracing config dictionary to be encrypted
  58. :param current_trace_config: current tracing configuration for keeping existing values
  59. :return: encrypted tracing configuration
  60. """
  61. # Get the configuration class and the keys that require encryption
  62. config_class, secret_keys, other_keys = (
  63. provider_config_map[tracing_provider]["config_class"],
  64. provider_config_map[tracing_provider]["secret_keys"],
  65. provider_config_map[tracing_provider]["other_keys"],
  66. )
  67. new_config = {}
  68. # Encrypt necessary keys
  69. for key in secret_keys:
  70. if key in tracing_config:
  71. if "*" in tracing_config[key]:
  72. # If the key contains '*', retain the original value from the current config
  73. new_config[key] = current_trace_config.get(key, tracing_config[key])
  74. else:
  75. # Otherwise, encrypt the key
  76. new_config[key] = encrypt_token(tenant_id, tracing_config[key])
  77. for key in other_keys:
  78. new_config[key] = tracing_config.get(key, "")
  79. # Create a new instance of the config class with the new configuration
  80. encrypted_config = config_class(**new_config)
  81. return encrypted_config.model_dump()
  82. @classmethod
  83. def decrypt_tracing_config(cls, tenant_id: str, tracing_provider: str, tracing_config: dict):
  84. """
  85. Decrypt tracing config
  86. :param tenant_id: tenant id
  87. :param tracing_provider: tracing provider
  88. :param tracing_config: tracing config
  89. :return:
  90. """
  91. config_class, secret_keys, other_keys = (
  92. provider_config_map[tracing_provider]["config_class"],
  93. provider_config_map[tracing_provider]["secret_keys"],
  94. provider_config_map[tracing_provider]["other_keys"],
  95. )
  96. new_config = {}
  97. for key in secret_keys:
  98. if key in tracing_config:
  99. new_config[key] = decrypt_token(tenant_id, tracing_config[key])
  100. for key in other_keys:
  101. new_config[key] = tracing_config.get(key, "")
  102. return config_class(**new_config).model_dump()
  103. @classmethod
  104. def obfuscated_decrypt_token(cls, tracing_provider: str, decrypt_tracing_config: dict):
  105. """
  106. Decrypt tracing config
  107. :param tracing_provider: tracing provider
  108. :param decrypt_tracing_config: tracing config
  109. :return:
  110. """
  111. config_class, secret_keys, other_keys = (
  112. provider_config_map[tracing_provider]["config_class"],
  113. provider_config_map[tracing_provider]["secret_keys"],
  114. provider_config_map[tracing_provider]["other_keys"],
  115. )
  116. new_config = {}
  117. for key in secret_keys:
  118. if key in decrypt_tracing_config:
  119. new_config[key] = obfuscated_token(decrypt_tracing_config[key])
  120. for key in other_keys:
  121. new_config[key] = decrypt_tracing_config.get(key, "")
  122. return config_class(**new_config).model_dump()
  123. @classmethod
  124. def get_decrypted_tracing_config(cls, app_id: str, tracing_provider: str):
  125. """
  126. Get decrypted tracing config
  127. :param app_id: app id
  128. :param tracing_provider: tracing provider
  129. :return:
  130. """
  131. trace_config_data: TraceAppConfig = (
  132. db.session.query(TraceAppConfig)
  133. .filter(TraceAppConfig.app_id == app_id, TraceAppConfig.tracing_provider == tracing_provider)
  134. .first()
  135. )
  136. if not trace_config_data:
  137. return None
  138. # decrypt_token
  139. tenant_id = db.session.query(App).filter(App.id == app_id).first().tenant_id
  140. decrypt_tracing_config = cls.decrypt_tracing_config(
  141. tenant_id, tracing_provider, trace_config_data.tracing_config
  142. )
  143. return decrypt_tracing_config
  144. @classmethod
  145. def get_ops_trace_instance(
  146. cls,
  147. app_id: Optional[Union[UUID, str]] = None,
  148. ):
  149. """
  150. Get ops trace through model config
  151. :param app_id: app_id
  152. :return:
  153. """
  154. if isinstance(app_id, UUID):
  155. app_id = str(app_id)
  156. if app_id is None:
  157. return None
  158. app: App = db.session.query(App).filter(App.id == app_id).first()
  159. app_ops_trace_config = json.loads(app.tracing) if app.tracing else None
  160. if app_ops_trace_config is not None:
  161. tracing_provider = app_ops_trace_config.get("tracing_provider")
  162. else:
  163. return None
  164. # decrypt_token
  165. decrypt_trace_config = cls.get_decrypted_tracing_config(app_id, tracing_provider)
  166. if app_ops_trace_config.get("enabled"):
  167. trace_instance, config_class = (
  168. provider_config_map[tracing_provider]["trace_instance"],
  169. provider_config_map[tracing_provider]["config_class"],
  170. )
  171. tracing_instance = trace_instance(config_class(**decrypt_trace_config))
  172. return tracing_instance
  173. return None
  174. @classmethod
  175. def get_app_config_through_message_id(cls, message_id: str):
  176. app_model_config = None
  177. message_data = db.session.query(Message).filter(Message.id == message_id).first()
  178. conversation_id = message_data.conversation_id
  179. conversation_data = db.session.query(Conversation).filter(Conversation.id == conversation_id).first()
  180. if conversation_data.app_model_config_id:
  181. app_model_config = (
  182. db.session.query(AppModelConfig)
  183. .filter(AppModelConfig.id == conversation_data.app_model_config_id)
  184. .first()
  185. )
  186. elif conversation_data.app_model_config_id is None and conversation_data.override_model_configs:
  187. app_model_config = conversation_data.override_model_configs
  188. return app_model_config
  189. @classmethod
  190. def update_app_tracing_config(cls, app_id: str, enabled: bool, tracing_provider: str):
  191. """
  192. Update app tracing config
  193. :param app_id: app id
  194. :param enabled: enabled
  195. :param tracing_provider: tracing provider
  196. :return:
  197. """
  198. # auth check
  199. if tracing_provider not in provider_config_map and tracing_provider is not None:
  200. raise ValueError(f"Invalid tracing provider: {tracing_provider}")
  201. app_config: App = db.session.query(App).filter(App.id == app_id).first()
  202. app_config.tracing = json.dumps(
  203. {
  204. "enabled": enabled,
  205. "tracing_provider": tracing_provider,
  206. }
  207. )
  208. db.session.commit()
  209. @classmethod
  210. def get_app_tracing_config(cls, app_id: str):
  211. """
  212. Get app tracing config
  213. :param app_id: app id
  214. :return:
  215. """
  216. app: App = db.session.query(App).filter(App.id == app_id).first()
  217. if not app.tracing:
  218. return {"enabled": False, "tracing_provider": None}
  219. app_trace_config = json.loads(app.tracing)
  220. return app_trace_config
  221. @staticmethod
  222. def check_trace_config_is_effective(tracing_config: dict, tracing_provider: str):
  223. """
  224. Check trace config is effective
  225. :param tracing_config: tracing config
  226. :param tracing_provider: tracing provider
  227. :return:
  228. """
  229. config_type, trace_instance = (
  230. provider_config_map[tracing_provider]["config_class"],
  231. provider_config_map[tracing_provider]["trace_instance"],
  232. )
  233. tracing_config = config_type(**tracing_config)
  234. return trace_instance(tracing_config).api_check()
  235. @staticmethod
  236. def get_trace_config_project_key(tracing_config: dict, tracing_provider: str):
  237. """
  238. get trace config is project key
  239. :param tracing_config: tracing config
  240. :param tracing_provider: tracing provider
  241. :return:
  242. """
  243. config_type, trace_instance = (
  244. provider_config_map[tracing_provider]["config_class"],
  245. provider_config_map[tracing_provider]["trace_instance"],
  246. )
  247. tracing_config = config_type(**tracing_config)
  248. return trace_instance(tracing_config).get_project_key()
  249. @staticmethod
  250. def get_trace_config_project_url(tracing_config: dict, tracing_provider: str):
  251. """
  252. get trace config is project key
  253. :param tracing_config: tracing config
  254. :param tracing_provider: tracing provider
  255. :return:
  256. """
  257. config_type, trace_instance = (
  258. provider_config_map[tracing_provider]["config_class"],
  259. provider_config_map[tracing_provider]["trace_instance"],
  260. )
  261. tracing_config = config_type(**tracing_config)
  262. return trace_instance(tracing_config).get_project_url()
  263. class TraceTask:
  264. def __init__(
  265. self,
  266. trace_type: Any,
  267. message_id: Optional[str] = None,
  268. workflow_run: Optional[WorkflowRun] = None,
  269. conversation_id: Optional[str] = None,
  270. user_id: Optional[str] = None,
  271. timer: Optional[Any] = None,
  272. **kwargs,
  273. ):
  274. self.trace_type = trace_type
  275. self.message_id = message_id
  276. self.workflow_run = workflow_run
  277. self.conversation_id = conversation_id
  278. self.user_id = user_id
  279. self.timer = timer
  280. self.kwargs = kwargs
  281. self.file_base_url = os.getenv("FILES_URL", "http://127.0.0.1:5001")
  282. self.app_id = None
  283. def execute(self):
  284. return self.preprocess()
  285. def preprocess(self):
  286. preprocess_map = {
  287. TraceTaskName.CONVERSATION_TRACE: lambda: self.conversation_trace(**self.kwargs),
  288. TraceTaskName.WORKFLOW_TRACE: lambda: self.workflow_trace(
  289. self.workflow_run, self.conversation_id, self.user_id
  290. ),
  291. TraceTaskName.MESSAGE_TRACE: lambda: self.message_trace(self.message_id),
  292. TraceTaskName.MODERATION_TRACE: lambda: self.moderation_trace(self.message_id, self.timer, **self.kwargs),
  293. TraceTaskName.SUGGESTED_QUESTION_TRACE: lambda: self.suggested_question_trace(
  294. self.message_id, self.timer, **self.kwargs
  295. ),
  296. TraceTaskName.DATASET_RETRIEVAL_TRACE: lambda: self.dataset_retrieval_trace(
  297. self.message_id, self.timer, **self.kwargs
  298. ),
  299. TraceTaskName.TOOL_TRACE: lambda: self.tool_trace(self.message_id, self.timer, **self.kwargs),
  300. TraceTaskName.GENERATE_NAME_TRACE: lambda: self.generate_name_trace(
  301. self.conversation_id, self.timer, **self.kwargs
  302. ),
  303. }
  304. return preprocess_map.get(self.trace_type, lambda: None)()
  305. # process methods for different trace types
  306. def conversation_trace(self, **kwargs):
  307. return kwargs
  308. def workflow_trace(self, workflow_run: WorkflowRun, conversation_id, user_id):
  309. workflow_id = workflow_run.workflow_id
  310. tenant_id = workflow_run.tenant_id
  311. workflow_run_id = workflow_run.id
  312. workflow_run_elapsed_time = workflow_run.elapsed_time
  313. workflow_run_status = workflow_run.status
  314. workflow_run_inputs = json.loads(workflow_run.inputs) if workflow_run.inputs else {}
  315. workflow_run_outputs = json.loads(workflow_run.outputs) if workflow_run.outputs else {}
  316. workflow_run_version = workflow_run.version
  317. error = workflow_run.error or ""
  318. total_tokens = workflow_run.total_tokens
  319. file_list = workflow_run_inputs.get("sys.file") or []
  320. query = workflow_run_inputs.get("query") or workflow_run_inputs.get("sys.query") or ""
  321. # get workflow_app_log_id
  322. workflow_app_log_data = (
  323. db.session.query(WorkflowAppLog)
  324. .filter_by(tenant_id=tenant_id, app_id=workflow_run.app_id, workflow_run_id=workflow_run.id)
  325. .first()
  326. )
  327. workflow_app_log_id = str(workflow_app_log_data.id) if workflow_app_log_data else None
  328. # get message_id
  329. message_data = (
  330. db.session.query(Message.id)
  331. .filter_by(conversation_id=conversation_id, workflow_run_id=workflow_run_id)
  332. .first()
  333. )
  334. message_id = str(message_data.id) if message_data else None
  335. metadata = {
  336. "workflow_id": workflow_id,
  337. "conversation_id": conversation_id,
  338. "workflow_run_id": workflow_run_id,
  339. "tenant_id": tenant_id,
  340. "elapsed_time": workflow_run_elapsed_time,
  341. "status": workflow_run_status,
  342. "version": workflow_run_version,
  343. "total_tokens": total_tokens,
  344. "file_list": file_list,
  345. "triggered_form": workflow_run.triggered_from,
  346. "user_id": user_id,
  347. }
  348. workflow_trace_info = WorkflowTraceInfo(
  349. workflow_data=workflow_run.to_dict(),
  350. conversation_id=conversation_id,
  351. workflow_id=workflow_id,
  352. tenant_id=tenant_id,
  353. workflow_run_id=workflow_run_id,
  354. workflow_run_elapsed_time=workflow_run_elapsed_time,
  355. workflow_run_status=workflow_run_status,
  356. workflow_run_inputs=workflow_run_inputs,
  357. workflow_run_outputs=workflow_run_outputs,
  358. workflow_run_version=workflow_run_version,
  359. error=error,
  360. total_tokens=total_tokens,
  361. file_list=file_list,
  362. query=query,
  363. metadata=metadata,
  364. workflow_app_log_id=workflow_app_log_id,
  365. message_id=message_id,
  366. start_time=workflow_run.created_at,
  367. end_time=workflow_run.finished_at,
  368. )
  369. return workflow_trace_info
  370. def message_trace(self, message_id):
  371. message_data = get_message_data(message_id)
  372. if not message_data:
  373. return {}
  374. conversation_mode = db.session.query(Conversation.mode).filter_by(id=message_data.conversation_id).first()
  375. conversation_mode = conversation_mode[0]
  376. created_at = message_data.created_at
  377. inputs = message_data.message
  378. # get message file data
  379. message_file_data = db.session.query(MessageFile).filter_by(message_id=message_id).first()
  380. file_list = []
  381. if message_file_data and message_file_data.url is not None:
  382. file_url = f"{self.file_base_url}/{message_file_data.url}" if message_file_data else ""
  383. file_list.append(file_url)
  384. metadata = {
  385. "conversation_id": message_data.conversation_id,
  386. "ls_provider": message_data.model_provider,
  387. "ls_model_name": message_data.model_id,
  388. "status": message_data.status,
  389. "from_end_user_id": message_data.from_account_id,
  390. "from_account_id": message_data.from_account_id,
  391. "agent_based": message_data.agent_based,
  392. "workflow_run_id": message_data.workflow_run_id,
  393. "from_source": message_data.from_source,
  394. "message_id": message_id,
  395. }
  396. message_tokens = message_data.message_tokens
  397. message_trace_info = MessageTraceInfo(
  398. message_id=message_id,
  399. message_data=message_data.to_dict(),
  400. conversation_model=conversation_mode,
  401. message_tokens=message_tokens,
  402. answer_tokens=message_data.answer_tokens,
  403. total_tokens=message_tokens + message_data.answer_tokens,
  404. error=message_data.error or "",
  405. inputs=inputs,
  406. outputs=message_data.answer,
  407. file_list=file_list,
  408. start_time=created_at,
  409. end_time=created_at + timedelta(seconds=message_data.provider_response_latency),
  410. metadata=metadata,
  411. message_file_data=message_file_data,
  412. conversation_mode=conversation_mode,
  413. )
  414. return message_trace_info
  415. def moderation_trace(self, message_id, timer, **kwargs):
  416. moderation_result = kwargs.get("moderation_result")
  417. inputs = kwargs.get("inputs")
  418. message_data = get_message_data(message_id)
  419. if not message_data:
  420. return {}
  421. metadata = {
  422. "message_id": message_id,
  423. "action": moderation_result.action,
  424. "preset_response": moderation_result.preset_response,
  425. "query": moderation_result.query,
  426. }
  427. # get workflow_app_log_id
  428. workflow_app_log_id = None
  429. if message_data.workflow_run_id:
  430. workflow_app_log_data = (
  431. db.session.query(WorkflowAppLog).filter_by(workflow_run_id=message_data.workflow_run_id).first()
  432. )
  433. workflow_app_log_id = str(workflow_app_log_data.id) if workflow_app_log_data else None
  434. moderation_trace_info = ModerationTraceInfo(
  435. message_id=workflow_app_log_id or message_id,
  436. inputs=inputs,
  437. message_data=message_data.to_dict(),
  438. flagged=moderation_result.flagged,
  439. action=moderation_result.action,
  440. preset_response=moderation_result.preset_response,
  441. query=moderation_result.query,
  442. start_time=timer.get("start"),
  443. end_time=timer.get("end"),
  444. metadata=metadata,
  445. )
  446. return moderation_trace_info
  447. def suggested_question_trace(self, message_id, timer, **kwargs):
  448. suggested_question = kwargs.get("suggested_question")
  449. message_data = get_message_data(message_id)
  450. if not message_data:
  451. return {}
  452. metadata = {
  453. "message_id": message_id,
  454. "ls_provider": message_data.model_provider,
  455. "ls_model_name": message_data.model_id,
  456. "status": message_data.status,
  457. "from_end_user_id": message_data.from_account_id,
  458. "from_account_id": message_data.from_account_id,
  459. "agent_based": message_data.agent_based,
  460. "workflow_run_id": message_data.workflow_run_id,
  461. "from_source": message_data.from_source,
  462. }
  463. # get workflow_app_log_id
  464. workflow_app_log_id = None
  465. if message_data.workflow_run_id:
  466. workflow_app_log_data = (
  467. db.session.query(WorkflowAppLog).filter_by(workflow_run_id=message_data.workflow_run_id).first()
  468. )
  469. workflow_app_log_id = str(workflow_app_log_data.id) if workflow_app_log_data else None
  470. suggested_question_trace_info = SuggestedQuestionTraceInfo(
  471. message_id=workflow_app_log_id or message_id,
  472. message_data=message_data.to_dict(),
  473. inputs=message_data.message,
  474. outputs=message_data.answer,
  475. start_time=timer.get("start"),
  476. end_time=timer.get("end"),
  477. metadata=metadata,
  478. total_tokens=message_data.message_tokens + message_data.answer_tokens,
  479. status=message_data.status,
  480. error=message_data.error,
  481. from_account_id=message_data.from_account_id,
  482. agent_based=message_data.agent_based,
  483. from_source=message_data.from_source,
  484. model_provider=message_data.model_provider,
  485. model_id=message_data.model_id,
  486. suggested_question=suggested_question,
  487. level=message_data.status,
  488. status_message=message_data.error,
  489. )
  490. return suggested_question_trace_info
  491. def dataset_retrieval_trace(self, message_id, timer, **kwargs):
  492. documents = kwargs.get("documents")
  493. message_data = get_message_data(message_id)
  494. if not message_data:
  495. return {}
  496. metadata = {
  497. "message_id": message_id,
  498. "ls_provider": message_data.model_provider,
  499. "ls_model_name": message_data.model_id,
  500. "status": message_data.status,
  501. "from_end_user_id": message_data.from_account_id,
  502. "from_account_id": message_data.from_account_id,
  503. "agent_based": message_data.agent_based,
  504. "workflow_run_id": message_data.workflow_run_id,
  505. "from_source": message_data.from_source,
  506. }
  507. dataset_retrieval_trace_info = DatasetRetrievalTraceInfo(
  508. message_id=message_id,
  509. inputs=message_data.query or message_data.inputs,
  510. documents=[doc.model_dump() for doc in documents],
  511. start_time=timer.get("start"),
  512. end_time=timer.get("end"),
  513. metadata=metadata,
  514. message_data=message_data.to_dict(),
  515. )
  516. return dataset_retrieval_trace_info
  517. def tool_trace(self, message_id, timer, **kwargs):
  518. tool_name = kwargs.get("tool_name")
  519. tool_inputs = kwargs.get("tool_inputs")
  520. tool_outputs = kwargs.get("tool_outputs")
  521. message_data = get_message_data(message_id)
  522. if not message_data:
  523. return {}
  524. tool_config = {}
  525. time_cost = 0
  526. error = None
  527. tool_parameters = {}
  528. created_time = message_data.created_at
  529. end_time = message_data.updated_at
  530. agent_thoughts: list[MessageAgentThought] = message_data.agent_thoughts
  531. for agent_thought in agent_thoughts:
  532. if tool_name in agent_thought.tools:
  533. created_time = agent_thought.created_at
  534. tool_meta_data = agent_thought.tool_meta.get(tool_name, {})
  535. tool_config = tool_meta_data.get("tool_config", {})
  536. time_cost = tool_meta_data.get("time_cost", 0)
  537. end_time = created_time + timedelta(seconds=time_cost)
  538. error = tool_meta_data.get("error", "")
  539. tool_parameters = tool_meta_data.get("tool_parameters", {})
  540. metadata = {
  541. "message_id": message_id,
  542. "tool_name": tool_name,
  543. "tool_inputs": tool_inputs,
  544. "tool_outputs": tool_outputs,
  545. "tool_config": tool_config,
  546. "time_cost": time_cost,
  547. "error": error,
  548. "tool_parameters": tool_parameters,
  549. }
  550. file_url = ""
  551. message_file_data = db.session.query(MessageFile).filter_by(message_id=message_id).first()
  552. if message_file_data:
  553. message_file_id = message_file_data.id if message_file_data else None
  554. type = message_file_data.type
  555. created_by_role = message_file_data.created_by_role
  556. created_user_id = message_file_data.created_by
  557. file_url = f"{self.file_base_url}/{message_file_data.url}"
  558. metadata.update(
  559. {
  560. "message_file_id": message_file_id,
  561. "created_by_role": created_by_role,
  562. "created_user_id": created_user_id,
  563. "type": type,
  564. }
  565. )
  566. tool_trace_info = ToolTraceInfo(
  567. message_id=message_id,
  568. message_data=message_data.to_dict(),
  569. tool_name=tool_name,
  570. start_time=timer.get("start") if timer else created_time,
  571. end_time=timer.get("end") if timer else end_time,
  572. tool_inputs=tool_inputs,
  573. tool_outputs=tool_outputs,
  574. metadata=metadata,
  575. message_file_data=message_file_data,
  576. error=error,
  577. inputs=message_data.message,
  578. outputs=message_data.answer,
  579. tool_config=tool_config,
  580. time_cost=time_cost,
  581. tool_parameters=tool_parameters,
  582. file_url=file_url,
  583. )
  584. return tool_trace_info
  585. def generate_name_trace(self, conversation_id, timer, **kwargs):
  586. generate_conversation_name = kwargs.get("generate_conversation_name")
  587. inputs = kwargs.get("inputs")
  588. tenant_id = kwargs.get("tenant_id")
  589. start_time = timer.get("start")
  590. end_time = timer.get("end")
  591. metadata = {
  592. "conversation_id": conversation_id,
  593. "tenant_id": tenant_id,
  594. }
  595. generate_name_trace_info = GenerateNameTraceInfo(
  596. conversation_id=conversation_id,
  597. inputs=inputs,
  598. outputs=generate_conversation_name,
  599. start_time=start_time,
  600. end_time=end_time,
  601. metadata=metadata,
  602. tenant_id=tenant_id,
  603. )
  604. return generate_name_trace_info
  605. trace_manager_timer = None
  606. trace_manager_queue = queue.Queue()
  607. trace_manager_interval = int(os.getenv("TRACE_QUEUE_MANAGER_INTERVAL", 5))
  608. trace_manager_batch_size = int(os.getenv("TRACE_QUEUE_MANAGER_BATCH_SIZE", 100))
  609. class TraceQueueManager:
  610. def __init__(self, app_id=None, user_id=None):
  611. global trace_manager_timer
  612. self.app_id = app_id
  613. self.user_id = user_id
  614. self.trace_instance = OpsTraceManager.get_ops_trace_instance(app_id)
  615. self.flask_app = current_app._get_current_object()
  616. if trace_manager_timer is None:
  617. self.start_timer()
  618. def add_trace_task(self, trace_task: TraceTask):
  619. global trace_manager_timer, trace_manager_queue
  620. try:
  621. if self.trace_instance:
  622. trace_task.app_id = self.app_id
  623. trace_manager_queue.put(trace_task)
  624. except Exception as e:
  625. logging.debug(f"Error adding trace task: {e}")
  626. finally:
  627. self.start_timer()
  628. def collect_tasks(self):
  629. global trace_manager_queue
  630. tasks = []
  631. while len(tasks) < trace_manager_batch_size and not trace_manager_queue.empty():
  632. task = trace_manager_queue.get_nowait()
  633. tasks.append(task)
  634. trace_manager_queue.task_done()
  635. return tasks
  636. def run(self):
  637. try:
  638. tasks = self.collect_tasks()
  639. if tasks:
  640. self.send_to_celery(tasks)
  641. except Exception as e:
  642. logging.debug(f"Error processing trace tasks: {e}")
  643. def start_timer(self):
  644. global trace_manager_timer
  645. if trace_manager_timer is None or not trace_manager_timer.is_alive():
  646. trace_manager_timer = threading.Timer(trace_manager_interval, self.run)
  647. trace_manager_timer.name = f"trace_manager_timer_{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}"
  648. trace_manager_timer.daemon = False
  649. trace_manager_timer.start()
  650. def send_to_celery(self, tasks: list[TraceTask]):
  651. with self.flask_app.app_context():
  652. for task in tasks:
  653. trace_info = task.execute()
  654. task_data = {
  655. "app_id": task.app_id,
  656. "trace_info_type": type(trace_info).__name__,
  657. "trace_info": trace_info.model_dump() if trace_info else {},
  658. }
  659. process_trace_tasks.delay(task_data)