ops_trace_manager.py 28 KB


  1. import json
  2. import logging
  3. import os
  4. import queue
  5. import threading
  6. import time
  7. from datetime import timedelta
  8. from typing import Any, Optional, Union
  9. from uuid import UUID
  10. from flask import current_app
  11. from core.helper.encrypter import decrypt_token, encrypt_token, obfuscated_token
  12. from core.ops.entities.config_entity import (
  13. LangfuseConfig,
  14. LangSmithConfig,
  15. TracingProviderEnum,
  16. )
  17. from core.ops.entities.trace_entity import (
  18. DatasetRetrievalTraceInfo,
  19. GenerateNameTraceInfo,
  20. MessageTraceInfo,
  21. ModerationTraceInfo,
  22. SuggestedQuestionTraceInfo,
  23. ToolTraceInfo,
  24. TraceTaskName,
  25. WorkflowTraceInfo,
  26. )
  27. from core.ops.langfuse_trace.langfuse_trace import LangFuseDataTrace
  28. from core.ops.langsmith_trace.langsmith_trace import LangSmithDataTrace
  29. from core.ops.utils import get_message_data
  30. from extensions.ext_database import db
  31. from models.model import App, AppModelConfig, Conversation, Message, MessageAgentThought, MessageFile, TraceAppConfig
  32. from models.workflow import WorkflowAppLog, WorkflowRun
  33. from tasks.ops_trace_task import process_trace_tasks
  34. provider_config_map = {
  35. TracingProviderEnum.LANGFUSE.value: {
  36. "config_class": LangfuseConfig,
  37. "secret_keys": ["public_key", "secret_key"],
  38. "other_keys": ["host", "project_key"],
  39. "trace_instance": LangFuseDataTrace,
  40. },
  41. TracingProviderEnum.LANGSMITH.value: {
  42. "config_class": LangSmithConfig,
  43. "secret_keys": ["api_key"],
  44. "other_keys": ["project", "endpoint"],
  45. "trace_instance": LangSmithDataTrace,
  46. },
  47. }
  48. class OpsTraceManager:
  49. @classmethod
  50. def encrypt_tracing_config(
  51. cls, tenant_id: str, tracing_provider: str, tracing_config: dict, current_trace_config=None
  52. ):
  53. """
  54. Encrypt tracing config.
  55. :param tenant_id: tenant id
  56. :param tracing_provider: tracing provider
  57. :param tracing_config: tracing config dictionary to be encrypted
  58. :param current_trace_config: current tracing configuration for keeping existing values
  59. :return: encrypted tracing configuration
  60. """
  61. # Get the configuration class and the keys that require encryption
  62. config_class, secret_keys, other_keys = (
  63. provider_config_map[tracing_provider]["config_class"],
  64. provider_config_map[tracing_provider]["secret_keys"],
  65. provider_config_map[tracing_provider]["other_keys"],
  66. )
  67. new_config = {}
  68. # Encrypt necessary keys
  69. for key in secret_keys:
  70. if key in tracing_config:
  71. if "*" in tracing_config[key]:
  72. # If the key contains '*', retain the original value from the current config
  73. new_config[key] = current_trace_config.get(key, tracing_config[key])
  74. else:
  75. # Otherwise, encrypt the key
  76. new_config[key] = encrypt_token(tenant_id, tracing_config[key])
  77. for key in other_keys:
  78. new_config[key] = tracing_config.get(key, "")
  79. # Create a new instance of the config class with the new configuration
  80. encrypted_config = config_class(**new_config)
  81. return encrypted_config.model_dump()
  82. @classmethod
  83. def decrypt_tracing_config(cls, tenant_id: str, tracing_provider: str, tracing_config: dict):
  84. """
  85. Decrypt tracing config
  86. :param tenant_id: tenant id
  87. :param tracing_provider: tracing provider
  88. :param tracing_config: tracing config
  89. :return:
  90. """
  91. config_class, secret_keys, other_keys = (
  92. provider_config_map[tracing_provider]["config_class"],
  93. provider_config_map[tracing_provider]["secret_keys"],
  94. provider_config_map[tracing_provider]["other_keys"],
  95. )
  96. new_config = {}
  97. for key in secret_keys:
  98. if key in tracing_config:
  99. new_config[key] = decrypt_token(tenant_id, tracing_config[key])
  100. for key in other_keys:
  101. new_config[key] = tracing_config.get(key, "")
  102. return config_class(**new_config).model_dump()
  103. @classmethod
  104. def obfuscated_decrypt_token(cls, tracing_provider: str, decrypt_tracing_config: dict):
  105. """
  106. Decrypt tracing config
  107. :param tracing_provider: tracing provider
  108. :param decrypt_tracing_config: tracing config
  109. :return:
  110. """
  111. config_class, secret_keys, other_keys = (
  112. provider_config_map[tracing_provider]["config_class"],
  113. provider_config_map[tracing_provider]["secret_keys"],
  114. provider_config_map[tracing_provider]["other_keys"],
  115. )
  116. new_config = {}
  117. for key in secret_keys:
  118. if key in decrypt_tracing_config:
  119. new_config[key] = obfuscated_token(decrypt_tracing_config[key])
  120. for key in other_keys:
  121. new_config[key] = decrypt_tracing_config.get(key, "")
  122. return config_class(**new_config).model_dump()
  123. @classmethod
  124. def get_decrypted_tracing_config(cls, app_id: str, tracing_provider: str):
  125. """
  126. Get decrypted tracing config
  127. :param app_id: app id
  128. :param tracing_provider: tracing provider
  129. :return:
  130. """
  131. trace_config_data: TraceAppConfig = (
  132. db.session.query(TraceAppConfig)
  133. .filter(TraceAppConfig.app_id == app_id, TraceAppConfig.tracing_provider == tracing_provider)
  134. .first()
  135. )
  136. if not trace_config_data:
  137. return None
  138. # decrypt_token
  139. tenant_id = db.session.query(App).filter(App.id == app_id).first().tenant_id
  140. decrypt_tracing_config = cls.decrypt_tracing_config(
  141. tenant_id, tracing_provider, trace_config_data.tracing_config
  142. )
  143. return decrypt_tracing_config
  144. @classmethod
  145. def get_ops_trace_instance(
  146. cls,
  147. app_id: Optional[Union[UUID, str]] = None,
  148. ):
  149. """
  150. Get ops trace through model config
  151. :param app_id: app_id
  152. :return:
  153. """
  154. if isinstance(app_id, UUID):
  155. app_id = str(app_id)
  156. if app_id is None:
  157. return None
  158. app: App = db.session.query(App).filter(App.id == app_id).first()
  159. if app is None:
  160. return None
  161. app_ops_trace_config = json.loads(app.tracing) if app.tracing else None
  162. if app_ops_trace_config is None:
  163. return None
  164. tracing_provider = app_ops_trace_config.get("tracing_provider")
  165. if tracing_provider is None or tracing_provider not in provider_config_map:
  166. return None
  167. # decrypt_token
  168. decrypt_trace_config = cls.get_decrypted_tracing_config(app_id, tracing_provider)
  169. if app_ops_trace_config.get("enabled"):
  170. trace_instance, config_class = (
  171. provider_config_map[tracing_provider]["trace_instance"],
  172. provider_config_map[tracing_provider]["config_class"],
  173. )
  174. tracing_instance = trace_instance(config_class(**decrypt_trace_config))
  175. return tracing_instance
  176. return None
  177. @classmethod
  178. def get_app_config_through_message_id(cls, message_id: str):
  179. app_model_config = None
  180. message_data = db.session.query(Message).filter(Message.id == message_id).first()
  181. conversation_id = message_data.conversation_id
  182. conversation_data = db.session.query(Conversation).filter(Conversation.id == conversation_id).first()
  183. if conversation_data.app_model_config_id:
  184. app_model_config = (
  185. db.session.query(AppModelConfig)
  186. .filter(AppModelConfig.id == conversation_data.app_model_config_id)
  187. .first()
  188. )
  189. elif conversation_data.app_model_config_id is None and conversation_data.override_model_configs:
  190. app_model_config = conversation_data.override_model_configs
  191. return app_model_config
  192. @classmethod
  193. def update_app_tracing_config(cls, app_id: str, enabled: bool, tracing_provider: str):
  194. """
  195. Update app tracing config
  196. :param app_id: app id
  197. :param enabled: enabled
  198. :param tracing_provider: tracing provider
  199. :return:
  200. """
  201. # auth check
  202. if tracing_provider not in provider_config_map and tracing_provider is not None:
  203. raise ValueError(f"Invalid tracing provider: {tracing_provider}")
  204. app_config: App = db.session.query(App).filter(App.id == app_id).first()
  205. app_config.tracing = json.dumps(
  206. {
  207. "enabled": enabled,
  208. "tracing_provider": tracing_provider,
  209. }
  210. )
  211. db.session.commit()
  212. @classmethod
  213. def get_app_tracing_config(cls, app_id: str):
  214. """
  215. Get app tracing config
  216. :param app_id: app id
  217. :return:
  218. """
  219. app: App = db.session.query(App).filter(App.id == app_id).first()
  220. if not app.tracing:
  221. return {"enabled": False, "tracing_provider": None}
  222. app_trace_config = json.loads(app.tracing)
  223. return app_trace_config
  224. @staticmethod
  225. def check_trace_config_is_effective(tracing_config: dict, tracing_provider: str):
  226. """
  227. Check trace config is effective
  228. :param tracing_config: tracing config
  229. :param tracing_provider: tracing provider
  230. :return:
  231. """
  232. config_type, trace_instance = (
  233. provider_config_map[tracing_provider]["config_class"],
  234. provider_config_map[tracing_provider]["trace_instance"],
  235. )
  236. tracing_config = config_type(**tracing_config)
  237. return trace_instance(tracing_config).api_check()
  238. @staticmethod
  239. def get_trace_config_project_key(tracing_config: dict, tracing_provider: str):
  240. """
  241. get trace config is project key
  242. :param tracing_config: tracing config
  243. :param tracing_provider: tracing provider
  244. :return:
  245. """
  246. config_type, trace_instance = (
  247. provider_config_map[tracing_provider]["config_class"],
  248. provider_config_map[tracing_provider]["trace_instance"],
  249. )
  250. tracing_config = config_type(**tracing_config)
  251. return trace_instance(tracing_config).get_project_key()
  252. @staticmethod
  253. def get_trace_config_project_url(tracing_config: dict, tracing_provider: str):
  254. """
  255. get trace config is project key
  256. :param tracing_config: tracing config
  257. :param tracing_provider: tracing provider
  258. :return:
  259. """
  260. config_type, trace_instance = (
  261. provider_config_map[tracing_provider]["config_class"],
  262. provider_config_map[tracing_provider]["trace_instance"],
  263. )
  264. tracing_config = config_type(**tracing_config)
  265. return trace_instance(tracing_config).get_project_url()
  266. class TraceTask:
  267. def __init__(
  268. self,
  269. trace_type: Any,
  270. message_id: Optional[str] = None,
  271. workflow_run: Optional[WorkflowRun] = None,
  272. conversation_id: Optional[str] = None,
  273. user_id: Optional[str] = None,
  274. timer: Optional[Any] = None,
  275. **kwargs,
  276. ):
  277. self.trace_type = trace_type
  278. self.message_id = message_id
  279. self.workflow_run = workflow_run
  280. self.conversation_id = conversation_id
  281. self.user_id = user_id
  282. self.timer = timer
  283. self.kwargs = kwargs
  284. self.file_base_url = os.getenv("FILES_URL", "http://127.0.0.1:5001")
  285. self.app_id = None
  286. def execute(self):
  287. return self.preprocess()
  288. def preprocess(self):
  289. preprocess_map = {
  290. TraceTaskName.CONVERSATION_TRACE: lambda: self.conversation_trace(**self.kwargs),
  291. TraceTaskName.WORKFLOW_TRACE: lambda: self.workflow_trace(
  292. self.workflow_run, self.conversation_id, self.user_id
  293. ),
  294. TraceTaskName.MESSAGE_TRACE: lambda: self.message_trace(self.message_id),
  295. TraceTaskName.MODERATION_TRACE: lambda: self.moderation_trace(self.message_id, self.timer, **self.kwargs),
  296. TraceTaskName.SUGGESTED_QUESTION_TRACE: lambda: self.suggested_question_trace(
  297. self.message_id, self.timer, **self.kwargs
  298. ),
  299. TraceTaskName.DATASET_RETRIEVAL_TRACE: lambda: self.dataset_retrieval_trace(
  300. self.message_id, self.timer, **self.kwargs
  301. ),
  302. TraceTaskName.TOOL_TRACE: lambda: self.tool_trace(self.message_id, self.timer, **self.kwargs),
  303. TraceTaskName.GENERATE_NAME_TRACE: lambda: self.generate_name_trace(
  304. self.conversation_id, self.timer, **self.kwargs
  305. ),
  306. }
  307. return preprocess_map.get(self.trace_type, lambda: None)()
  308. # process methods for different trace types
  309. def conversation_trace(self, **kwargs):
  310. return kwargs
  311. def workflow_trace(self, workflow_run: WorkflowRun, conversation_id, user_id):
  312. workflow_id = workflow_run.workflow_id
  313. tenant_id = workflow_run.tenant_id
  314. workflow_run_id = workflow_run.id
  315. workflow_run_elapsed_time = workflow_run.elapsed_time
  316. workflow_run_status = workflow_run.status
  317. workflow_run_inputs = workflow_run.inputs_dict
  318. workflow_run_outputs = workflow_run.outputs_dict
  319. workflow_run_version = workflow_run.version
  320. error = workflow_run.error or ""
  321. total_tokens = workflow_run.total_tokens
  322. file_list = workflow_run_inputs.get("sys.file") or []
  323. query = workflow_run_inputs.get("query") or workflow_run_inputs.get("sys.query") or ""
  324. # get workflow_app_log_id
  325. workflow_app_log_data = (
  326. db.session.query(WorkflowAppLog)
  327. .filter_by(tenant_id=tenant_id, app_id=workflow_run.app_id, workflow_run_id=workflow_run.id)
  328. .first()
  329. )
  330. workflow_app_log_id = str(workflow_app_log_data.id) if workflow_app_log_data else None
  331. # get message_id
  332. message_data = (
  333. db.session.query(Message.id)
  334. .filter_by(conversation_id=conversation_id, workflow_run_id=workflow_run_id)
  335. .first()
  336. )
  337. message_id = str(message_data.id) if message_data else None
  338. metadata = {
  339. "workflow_id": workflow_id,
  340. "conversation_id": conversation_id,
  341. "workflow_run_id": workflow_run_id,
  342. "tenant_id": tenant_id,
  343. "elapsed_time": workflow_run_elapsed_time,
  344. "status": workflow_run_status,
  345. "version": workflow_run_version,
  346. "total_tokens": total_tokens,
  347. "file_list": file_list,
  348. "triggered_form": workflow_run.triggered_from,
  349. "user_id": user_id,
  350. }
  351. workflow_trace_info = WorkflowTraceInfo(
  352. workflow_data=workflow_run.to_dict(),
  353. conversation_id=conversation_id,
  354. workflow_id=workflow_id,
  355. tenant_id=tenant_id,
  356. workflow_run_id=workflow_run_id,
  357. workflow_run_elapsed_time=workflow_run_elapsed_time,
  358. workflow_run_status=workflow_run_status,
  359. workflow_run_inputs=workflow_run_inputs,
  360. workflow_run_outputs=workflow_run_outputs,
  361. workflow_run_version=workflow_run_version,
  362. error=error,
  363. total_tokens=total_tokens,
  364. file_list=file_list,
  365. query=query,
  366. metadata=metadata,
  367. workflow_app_log_id=workflow_app_log_id,
  368. message_id=message_id,
  369. start_time=workflow_run.created_at,
  370. end_time=workflow_run.finished_at,
  371. )
  372. return workflow_trace_info
  373. def message_trace(self, message_id):
  374. message_data = get_message_data(message_id)
  375. if not message_data:
  376. return {}
  377. conversation_mode = db.session.query(Conversation.mode).filter_by(id=message_data.conversation_id).first()
  378. conversation_mode = conversation_mode[0]
  379. created_at = message_data.created_at
  380. inputs = message_data.message
  381. # get message file data
  382. message_file_data = db.session.query(MessageFile).filter_by(message_id=message_id).first()
  383. file_list = []
  384. if message_file_data and message_file_data.url is not None:
  385. file_url = f"{self.file_base_url}/{message_file_data.url}" if message_file_data else ""
  386. file_list.append(file_url)
  387. metadata = {
  388. "conversation_id": message_data.conversation_id,
  389. "ls_provider": message_data.model_provider,
  390. "ls_model_name": message_data.model_id,
  391. "status": message_data.status,
  392. "from_end_user_id": message_data.from_account_id,
  393. "from_account_id": message_data.from_account_id,
  394. "agent_based": message_data.agent_based,
  395. "workflow_run_id": message_data.workflow_run_id,
  396. "from_source": message_data.from_source,
  397. "message_id": message_id,
  398. }
  399. message_tokens = message_data.message_tokens
  400. message_trace_info = MessageTraceInfo(
  401. message_id=message_id,
  402. message_data=message_data.to_dict(),
  403. conversation_model=conversation_mode,
  404. message_tokens=message_tokens,
  405. answer_tokens=message_data.answer_tokens,
  406. total_tokens=message_tokens + message_data.answer_tokens,
  407. error=message_data.error or "",
  408. inputs=inputs,
  409. outputs=message_data.answer,
  410. file_list=file_list,
  411. start_time=created_at,
  412. end_time=created_at + timedelta(seconds=message_data.provider_response_latency),
  413. metadata=metadata,
  414. message_file_data=message_file_data,
  415. conversation_mode=conversation_mode,
  416. )
  417. return message_trace_info
  418. def moderation_trace(self, message_id, timer, **kwargs):
  419. moderation_result = kwargs.get("moderation_result")
  420. inputs = kwargs.get("inputs")
  421. message_data = get_message_data(message_id)
  422. if not message_data:
  423. return {}
  424. metadata = {
  425. "message_id": message_id,
  426. "action": moderation_result.action,
  427. "preset_response": moderation_result.preset_response,
  428. "query": moderation_result.query,
  429. }
  430. # get workflow_app_log_id
  431. workflow_app_log_id = None
  432. if message_data.workflow_run_id:
  433. workflow_app_log_data = (
  434. db.session.query(WorkflowAppLog).filter_by(workflow_run_id=message_data.workflow_run_id).first()
  435. )
  436. workflow_app_log_id = str(workflow_app_log_data.id) if workflow_app_log_data else None
  437. moderation_trace_info = ModerationTraceInfo(
  438. message_id=workflow_app_log_id or message_id,
  439. inputs=inputs,
  440. message_data=message_data.to_dict(),
  441. flagged=moderation_result.flagged,
  442. action=moderation_result.action,
  443. preset_response=moderation_result.preset_response,
  444. query=moderation_result.query,
  445. start_time=timer.get("start"),
  446. end_time=timer.get("end"),
  447. metadata=metadata,
  448. )
  449. return moderation_trace_info
  450. def suggested_question_trace(self, message_id, timer, **kwargs):
  451. suggested_question = kwargs.get("suggested_question")
  452. message_data = get_message_data(message_id)
  453. if not message_data:
  454. return {}
  455. metadata = {
  456. "message_id": message_id,
  457. "ls_provider": message_data.model_provider,
  458. "ls_model_name": message_data.model_id,
  459. "status": message_data.status,
  460. "from_end_user_id": message_data.from_account_id,
  461. "from_account_id": message_data.from_account_id,
  462. "agent_based": message_data.agent_based,
  463. "workflow_run_id": message_data.workflow_run_id,
  464. "from_source": message_data.from_source,
  465. }
  466. # get workflow_app_log_id
  467. workflow_app_log_id = None
  468. if message_data.workflow_run_id:
  469. workflow_app_log_data = (
  470. db.session.query(WorkflowAppLog).filter_by(workflow_run_id=message_data.workflow_run_id).first()
  471. )
  472. workflow_app_log_id = str(workflow_app_log_data.id) if workflow_app_log_data else None
  473. suggested_question_trace_info = SuggestedQuestionTraceInfo(
  474. message_id=workflow_app_log_id or message_id,
  475. message_data=message_data.to_dict(),
  476. inputs=message_data.message,
  477. outputs=message_data.answer,
  478. start_time=timer.get("start"),
  479. end_time=timer.get("end"),
  480. metadata=metadata,
  481. total_tokens=message_data.message_tokens + message_data.answer_tokens,
  482. status=message_data.status,
  483. error=message_data.error,
  484. from_account_id=message_data.from_account_id,
  485. agent_based=message_data.agent_based,
  486. from_source=message_data.from_source,
  487. model_provider=message_data.model_provider,
  488. model_id=message_data.model_id,
  489. suggested_question=suggested_question,
  490. level=message_data.status,
  491. status_message=message_data.error,
  492. )
  493. return suggested_question_trace_info
  494. def dataset_retrieval_trace(self, message_id, timer, **kwargs):
  495. documents = kwargs.get("documents")
  496. message_data = get_message_data(message_id)
  497. if not message_data:
  498. return {}
  499. metadata = {
  500. "message_id": message_id,
  501. "ls_provider": message_data.model_provider,
  502. "ls_model_name": message_data.model_id,
  503. "status": message_data.status,
  504. "from_end_user_id": message_data.from_account_id,
  505. "from_account_id": message_data.from_account_id,
  506. "agent_based": message_data.agent_based,
  507. "workflow_run_id": message_data.workflow_run_id,
  508. "from_source": message_data.from_source,
  509. }
  510. dataset_retrieval_trace_info = DatasetRetrievalTraceInfo(
  511. message_id=message_id,
  512. inputs=message_data.query or message_data.inputs,
  513. documents=[doc.model_dump() for doc in documents],
  514. start_time=timer.get("start"),
  515. end_time=timer.get("end"),
  516. metadata=metadata,
  517. message_data=message_data.to_dict(),
  518. )
  519. return dataset_retrieval_trace_info
  520. def tool_trace(self, message_id, timer, **kwargs):
  521. tool_name = kwargs.get("tool_name")
  522. tool_inputs = kwargs.get("tool_inputs")
  523. tool_outputs = kwargs.get("tool_outputs")
  524. message_data = get_message_data(message_id)
  525. if not message_data:
  526. return {}
  527. tool_config = {}
  528. time_cost = 0
  529. error = None
  530. tool_parameters = {}
  531. created_time = message_data.created_at
  532. end_time = message_data.updated_at
  533. agent_thoughts: list[MessageAgentThought] = message_data.agent_thoughts
  534. for agent_thought in agent_thoughts:
  535. if tool_name in agent_thought.tools:
  536. created_time = agent_thought.created_at
  537. tool_meta_data = agent_thought.tool_meta.get(tool_name, {})
  538. tool_config = tool_meta_data.get("tool_config", {})
  539. time_cost = tool_meta_data.get("time_cost", 0)
  540. end_time = created_time + timedelta(seconds=time_cost)
  541. error = tool_meta_data.get("error", "")
  542. tool_parameters = tool_meta_data.get("tool_parameters", {})
  543. metadata = {
  544. "message_id": message_id,
  545. "tool_name": tool_name,
  546. "tool_inputs": tool_inputs,
  547. "tool_outputs": tool_outputs,
  548. "tool_config": tool_config,
  549. "time_cost": time_cost,
  550. "error": error,
  551. "tool_parameters": tool_parameters,
  552. }
  553. file_url = ""
  554. message_file_data = db.session.query(MessageFile).filter_by(message_id=message_id).first()
  555. if message_file_data:
  556. message_file_id = message_file_data.id if message_file_data else None
  557. type = message_file_data.type
  558. created_by_role = message_file_data.created_by_role
  559. created_user_id = message_file_data.created_by
  560. file_url = f"{self.file_base_url}/{message_file_data.url}"
  561. metadata.update(
  562. {
  563. "message_file_id": message_file_id,
  564. "created_by_role": created_by_role,
  565. "created_user_id": created_user_id,
  566. "type": type,
  567. }
  568. )
  569. tool_trace_info = ToolTraceInfo(
  570. message_id=message_id,
  571. message_data=message_data.to_dict(),
  572. tool_name=tool_name,
  573. start_time=timer.get("start") if timer else created_time,
  574. end_time=timer.get("end") if timer else end_time,
  575. tool_inputs=tool_inputs,
  576. tool_outputs=tool_outputs,
  577. metadata=metadata,
  578. message_file_data=message_file_data,
  579. error=error,
  580. inputs=message_data.message,
  581. outputs=message_data.answer,
  582. tool_config=tool_config,
  583. time_cost=time_cost,
  584. tool_parameters=tool_parameters,
  585. file_url=file_url,
  586. )
  587. return tool_trace_info
  588. def generate_name_trace(self, conversation_id, timer, **kwargs):
  589. generate_conversation_name = kwargs.get("generate_conversation_name")
  590. inputs = kwargs.get("inputs")
  591. tenant_id = kwargs.get("tenant_id")
  592. start_time = timer.get("start")
  593. end_time = timer.get("end")
  594. metadata = {
  595. "conversation_id": conversation_id,
  596. "tenant_id": tenant_id,
  597. }
  598. generate_name_trace_info = GenerateNameTraceInfo(
  599. conversation_id=conversation_id,
  600. inputs=inputs,
  601. outputs=generate_conversation_name,
  602. start_time=start_time,
  603. end_time=end_time,
  604. metadata=metadata,
  605. tenant_id=tenant_id,
  606. )
  607. return generate_name_trace_info
  608. trace_manager_timer = None
  609. trace_manager_queue = queue.Queue()
  610. trace_manager_interval = int(os.getenv("TRACE_QUEUE_MANAGER_INTERVAL", 5))
  611. trace_manager_batch_size = int(os.getenv("TRACE_QUEUE_MANAGER_BATCH_SIZE", 100))
  612. class TraceQueueManager:
  613. def __init__(self, app_id=None, user_id=None):
  614. global trace_manager_timer
  615. self.app_id = app_id
  616. self.user_id = user_id
  617. self.trace_instance = OpsTraceManager.get_ops_trace_instance(app_id)
  618. self.flask_app = current_app._get_current_object()
  619. if trace_manager_timer is None:
  620. self.start_timer()
  621. def add_trace_task(self, trace_task: TraceTask):
  622. global trace_manager_timer, trace_manager_queue
  623. try:
  624. if self.trace_instance:
  625. trace_task.app_id = self.app_id
  626. trace_manager_queue.put(trace_task)
  627. except Exception as e:
  628. logging.exception(f"Error adding trace task: {e}")
  629. finally:
  630. self.start_timer()
  631. def collect_tasks(self):
  632. global trace_manager_queue
  633. tasks = []
  634. while len(tasks) < trace_manager_batch_size and not trace_manager_queue.empty():
  635. task = trace_manager_queue.get_nowait()
  636. tasks.append(task)
  637. trace_manager_queue.task_done()
  638. return tasks
  639. def run(self):
  640. try:
  641. tasks = self.collect_tasks()
  642. if tasks:
  643. self.send_to_celery(tasks)
  644. except Exception as e:
  645. logging.exception(f"Error processing trace tasks: {e}")
  646. def start_timer(self):
  647. global trace_manager_timer
  648. if trace_manager_timer is None or not trace_manager_timer.is_alive():
  649. trace_manager_timer = threading.Timer(trace_manager_interval, self.run)
  650. trace_manager_timer.name = f"trace_manager_timer_{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}"
  651. trace_manager_timer.daemon = False
  652. trace_manager_timer.start()
  653. def send_to_celery(self, tasks: list[TraceTask]):
  654. with self.flask_app.app_context():
  655. for task in tasks:
  656. trace_info = task.execute()
  657. task_data = {
  658. "app_id": task.app_id,
  659. "trace_info_type": type(trace_info).__name__,
  660. "trace_info": trace_info.model_dump() if trace_info else {},
  661. }
  662. process_trace_tasks.delay(task_data)