ops_trace_manager.py 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687
  1. import json
  2. import os
  3. import queue
  4. import threading
  5. from datetime import timedelta
  6. from enum import Enum
  7. from typing import Any, Optional, Union
  8. from uuid import UUID
  9. from flask import Flask, current_app
  10. from core.helper.encrypter import decrypt_token, encrypt_token, obfuscated_token
  11. from core.ops.base_trace_instance import BaseTraceInstance
  12. from core.ops.entities.config_entity import (
  13. LangfuseConfig,
  14. LangSmithConfig,
  15. TracingProviderEnum,
  16. )
  17. from core.ops.entities.trace_entity import (
  18. DatasetRetrievalTraceInfo,
  19. GenerateNameTraceInfo,
  20. MessageTraceInfo,
  21. ModerationTraceInfo,
  22. SuggestedQuestionTraceInfo,
  23. ToolTraceInfo,
  24. WorkflowTraceInfo,
  25. )
  26. from core.ops.langfuse_trace.langfuse_trace import LangFuseDataTrace
  27. from core.ops.langsmith_trace.langsmith_trace import LangSmithDataTrace
  28. from core.ops.utils import get_message_data
  29. from extensions.ext_database import db
  30. from models.model import App, AppModelConfig, Conversation, Message, MessageAgentThought, MessageFile, TraceAppConfig
  31. from models.workflow import WorkflowAppLog, WorkflowRun
  32. provider_config_map = {
  33. TracingProviderEnum.LANGFUSE.value: {
  34. 'config_class': LangfuseConfig,
  35. 'secret_keys': ['public_key', 'secret_key'],
  36. 'other_keys': ['host'],
  37. 'trace_instance': LangFuseDataTrace
  38. },
  39. TracingProviderEnum.LANGSMITH.value: {
  40. 'config_class': LangSmithConfig,
  41. 'secret_keys': ['api_key'],
  42. 'other_keys': ['project', 'endpoint'],
  43. 'trace_instance': LangSmithDataTrace
  44. }
  45. }
  46. class OpsTraceManager:
  47. @classmethod
  48. def encrypt_tracing_config(
  49. cls, tenant_id: str, tracing_provider: str, tracing_config: dict, current_trace_config=None
  50. ):
  51. """
  52. Encrypt tracing config.
  53. :param tenant_id: tenant id
  54. :param tracing_provider: tracing provider
  55. :param tracing_config: tracing config dictionary to be encrypted
  56. :param current_trace_config: current tracing configuration for keeping existing values
  57. :return: encrypted tracing configuration
  58. """
  59. # Get the configuration class and the keys that require encryption
  60. config_class, secret_keys, other_keys = provider_config_map[tracing_provider]['config_class'], \
  61. provider_config_map[tracing_provider]['secret_keys'], provider_config_map[tracing_provider]['other_keys']
  62. new_config = {}
  63. # Encrypt necessary keys
  64. for key in secret_keys:
  65. if key in tracing_config:
  66. if '*' in tracing_config[key]:
  67. # If the key contains '*', retain the original value from the current config
  68. new_config[key] = current_trace_config.get(key, tracing_config[key])
  69. else:
  70. # Otherwise, encrypt the key
  71. new_config[key] = encrypt_token(tenant_id, tracing_config[key])
  72. for key in other_keys:
  73. new_config[key] = tracing_config.get(key, "")
  74. # Create a new instance of the config class with the new configuration
  75. encrypted_config = config_class(**new_config)
  76. return encrypted_config.model_dump()
  77. @classmethod
  78. def decrypt_tracing_config(cls, tenant_id: str, tracing_provider: str, tracing_config: dict):
  79. """
  80. Decrypt tracing config
  81. :param tenant_id: tenant id
  82. :param tracing_provider: tracing provider
  83. :param tracing_config: tracing config
  84. :return:
  85. """
  86. config_class, secret_keys, other_keys = provider_config_map[tracing_provider]['config_class'], \
  87. provider_config_map[tracing_provider]['secret_keys'], provider_config_map[tracing_provider]['other_keys']
  88. new_config = {}
  89. for key in secret_keys:
  90. if key in tracing_config:
  91. new_config[key] = decrypt_token(tenant_id, tracing_config[key])
  92. for key in other_keys:
  93. new_config[key] = tracing_config.get(key, "")
  94. return config_class(**new_config).model_dump()
  95. @classmethod
  96. def obfuscated_decrypt_token(cls, tracing_provider: str, decrypt_tracing_config:dict):
  97. """
  98. Decrypt tracing config
  99. :param tracing_provider: tracing provider
  100. :param decrypt_tracing_config: tracing config
  101. :return:
  102. """
  103. config_class, secret_keys, other_keys = provider_config_map[tracing_provider]['config_class'], \
  104. provider_config_map[tracing_provider]['secret_keys'], provider_config_map[tracing_provider]['other_keys']
  105. new_config = {}
  106. for key in secret_keys:
  107. if key in decrypt_tracing_config:
  108. new_config[key] = obfuscated_token(decrypt_tracing_config[key])
  109. for key in other_keys:
  110. new_config[key] = decrypt_tracing_config.get(key, "")
  111. return config_class(**new_config).model_dump()
  112. @classmethod
  113. def get_decrypted_tracing_config(cls, app_id: str, tracing_provider: str):
  114. """
  115. Get decrypted tracing config
  116. :param app_id: app id
  117. :param tracing_provider: tracing provider
  118. :return:
  119. """
  120. trace_config_data: TraceAppConfig = db.session.query(TraceAppConfig).filter(
  121. TraceAppConfig.app_id == app_id, TraceAppConfig.tracing_provider == tracing_provider
  122. ).first()
  123. if not trace_config_data:
  124. return None
  125. # decrypt_token
  126. tenant_id = db.session.query(App).filter(App.id == app_id).first().tenant_id
  127. decrypt_tracing_config = cls.decrypt_tracing_config(
  128. tenant_id, tracing_provider, trace_config_data.tracing_config
  129. )
  130. return decrypt_tracing_config
  131. @classmethod
  132. def get_ops_trace_instance(
  133. cls,
  134. app_id: Optional[Union[UUID, str]] = None,
  135. message_id: Optional[str] = None,
  136. conversation_id: Optional[str] = None
  137. ):
  138. """
  139. Get ops trace through model config
  140. :param app_id: app_id
  141. :param message_id: message_id
  142. :param conversation_id: conversation_id
  143. :return:
  144. """
  145. if conversation_id is not None:
  146. conversation_data: Conversation = db.session.query(Conversation).filter(
  147. Conversation.id == conversation_id
  148. ).first()
  149. if conversation_data:
  150. app_id = conversation_data.app_id
  151. if message_id is not None:
  152. record: Message = db.session.query(Message).filter(Message.id == message_id).first()
  153. app_id = record.app_id
  154. if isinstance(app_id, UUID):
  155. app_id = str(app_id)
  156. if app_id is None:
  157. return None
  158. app: App = db.session.query(App).filter(
  159. App.id == app_id
  160. ).first()
  161. app_ops_trace_config = json.loads(app.tracing) if app.tracing else None
  162. if app_ops_trace_config is not None:
  163. tracing_provider = app_ops_trace_config.get('tracing_provider')
  164. else:
  165. return None
  166. # decrypt_token
  167. decrypt_trace_config = cls.get_decrypted_tracing_config(app_id, tracing_provider)
  168. if app_ops_trace_config.get('enabled'):
  169. trace_instance, config_class = provider_config_map[tracing_provider]['trace_instance'], \
  170. provider_config_map[tracing_provider]['config_class']
  171. tracing_instance = trace_instance(config_class(**decrypt_trace_config))
  172. return tracing_instance
  173. return None
  174. @classmethod
  175. def get_app_config_through_message_id(cls, message_id: str):
  176. app_model_config = None
  177. message_data = db.session.query(Message).filter(Message.id == message_id).first()
  178. conversation_id = message_data.conversation_id
  179. conversation_data = db.session.query(Conversation).filter(Conversation.id == conversation_id).first()
  180. if conversation_data.app_model_config_id:
  181. app_model_config = db.session.query(AppModelConfig).filter(
  182. AppModelConfig.id == conversation_data.app_model_config_id
  183. ).first()
  184. elif conversation_data.app_model_config_id is None and conversation_data.override_model_configs:
  185. app_model_config = conversation_data.override_model_configs
  186. return app_model_config
  187. @classmethod
  188. def update_app_tracing_config(cls, app_id: str, enabled: bool, tracing_provider: str):
  189. """
  190. Update app tracing config
  191. :param app_id: app id
  192. :param enabled: enabled
  193. :param tracing_provider: tracing provider
  194. :return:
  195. """
  196. # auth check
  197. if tracing_provider not in provider_config_map.keys() and tracing_provider is not None:
  198. raise ValueError(f"Invalid tracing provider: {tracing_provider}")
  199. app_config: App = db.session.query(App).filter(App.id == app_id).first()
  200. app_config.tracing = json.dumps(
  201. {
  202. "enabled": enabled,
  203. "tracing_provider": tracing_provider,
  204. }
  205. )
  206. db.session.commit()
  207. @classmethod
  208. def get_app_tracing_config(cls, app_id: str):
  209. """
  210. Get app tracing config
  211. :param app_id: app id
  212. :return:
  213. """
  214. app: App = db.session.query(App).filter(App.id == app_id).first()
  215. if not app.tracing:
  216. return {
  217. "enabled": False,
  218. "tracing_provider": None
  219. }
  220. app_trace_config = json.loads(app.tracing)
  221. return app_trace_config
  222. @staticmethod
  223. def check_trace_config_is_effective(tracing_config: dict, tracing_provider: str):
  224. """
  225. Check trace config is effective
  226. :param tracing_config: tracing config
  227. :param tracing_provider: tracing provider
  228. :return:
  229. """
  230. config_type, trace_instance = provider_config_map[tracing_provider]['config_class'], \
  231. provider_config_map[tracing_provider]['trace_instance']
  232. tracing_config = config_type(**tracing_config)
  233. return trace_instance(tracing_config).api_check()
  234. class TraceTaskName(str, Enum):
  235. CONVERSATION_TRACE = 'conversation_trace'
  236. WORKFLOW_TRACE = 'workflow_trace'
  237. MESSAGE_TRACE = 'message_trace'
  238. MODERATION_TRACE = 'moderation_trace'
  239. SUGGESTED_QUESTION_TRACE = 'suggested_question_trace'
  240. DATASET_RETRIEVAL_TRACE = 'dataset_retrieval_trace'
  241. TOOL_TRACE = 'tool_trace'
  242. GENERATE_NAME_TRACE = 'generate_name_trace'
  243. class TraceTask:
  244. def __init__(
  245. self,
  246. trace_type: Any,
  247. message_id: Optional[str] = None,
  248. workflow_run: Optional[WorkflowRun] = None,
  249. conversation_id: Optional[str] = None,
  250. timer: Optional[Any] = None,
  251. **kwargs
  252. ):
  253. self.trace_type = trace_type
  254. self.message_id = message_id
  255. self.workflow_run = workflow_run
  256. self.conversation_id = conversation_id
  257. self.timer = timer
  258. self.kwargs = kwargs
  259. self.file_base_url = os.getenv("FILES_URL", "http://127.0.0.1:5001")
  260. def execute(self, trace_instance: BaseTraceInstance):
  261. method_name, trace_info = self.preprocess()
  262. if trace_instance:
  263. method = trace_instance.trace
  264. method(trace_info)
  265. def preprocess(self):
  266. if self.trace_type == TraceTaskName.CONVERSATION_TRACE:
  267. return TraceTaskName.CONVERSATION_TRACE, self.conversation_trace(**self.kwargs)
  268. if self.trace_type == TraceTaskName.WORKFLOW_TRACE:
  269. return TraceTaskName.WORKFLOW_TRACE, self.workflow_trace(self.workflow_run, self.conversation_id)
  270. elif self.trace_type == TraceTaskName.MESSAGE_TRACE:
  271. return TraceTaskName.MESSAGE_TRACE, self.message_trace(self.message_id)
  272. elif self.trace_type == TraceTaskName.MODERATION_TRACE:
  273. return TraceTaskName.MODERATION_TRACE, self.moderation_trace(self.message_id, self.timer, **self.kwargs)
  274. elif self.trace_type == TraceTaskName.SUGGESTED_QUESTION_TRACE:
  275. return TraceTaskName.SUGGESTED_QUESTION_TRACE, self.suggested_question_trace(
  276. self.message_id, self.timer, **self.kwargs
  277. )
  278. elif self.trace_type == TraceTaskName.DATASET_RETRIEVAL_TRACE:
  279. return TraceTaskName.DATASET_RETRIEVAL_TRACE, self.dataset_retrieval_trace(
  280. self.message_id, self.timer, **self.kwargs
  281. )
  282. elif self.trace_type == TraceTaskName.TOOL_TRACE:
  283. return TraceTaskName.TOOL_TRACE, self.tool_trace(self.message_id, self.timer, **self.kwargs)
  284. elif self.trace_type == TraceTaskName.GENERATE_NAME_TRACE:
  285. return TraceTaskName.GENERATE_NAME_TRACE, self.generate_name_trace(
  286. self.conversation_id, self.timer, **self.kwargs
  287. )
  288. else:
  289. return '', {}
  290. # process methods for different trace types
  291. def conversation_trace(self, **kwargs):
  292. return kwargs
  293. def workflow_trace(self, workflow_run: WorkflowRun, conversation_id):
  294. workflow_id = workflow_run.workflow_id
  295. tenant_id = workflow_run.tenant_id
  296. workflow_run_id = workflow_run.id
  297. workflow_run_elapsed_time = workflow_run.elapsed_time
  298. workflow_run_status = workflow_run.status
  299. workflow_run_inputs = (
  300. json.loads(workflow_run.inputs) if workflow_run.inputs else {}
  301. )
  302. workflow_run_outputs = (
  303. json.loads(workflow_run.outputs) if workflow_run.outputs else {}
  304. )
  305. workflow_run_version = workflow_run.version
  306. error = workflow_run.error if workflow_run.error else ""
  307. total_tokens = workflow_run.total_tokens
  308. file_list = workflow_run_inputs.get("sys.file") if workflow_run_inputs.get("sys.file") else []
  309. query = workflow_run_inputs.get("query") or workflow_run_inputs.get("sys.query") or ""
  310. # get workflow_app_log_id
  311. workflow_app_log_data = db.session.query(WorkflowAppLog).filter_by(workflow_run_id=workflow_run.id).first()
  312. workflow_app_log_id = str(workflow_app_log_data.id) if workflow_app_log_data else None
  313. # get message_id
  314. message_data = db.session.query(Message.id).filter_by(workflow_run_id=workflow_run_id).first()
  315. message_id = str(message_data.id) if message_data else None
  316. metadata = {
  317. "workflow_id": workflow_id,
  318. "conversation_id": conversation_id,
  319. "workflow_run_id": workflow_run_id,
  320. "tenant_id": tenant_id,
  321. "elapsed_time": workflow_run_elapsed_time,
  322. "status": workflow_run_status,
  323. "version": workflow_run_version,
  324. "total_tokens": total_tokens,
  325. "file_list": file_list,
  326. "triggered_form": workflow_run.triggered_from,
  327. }
  328. workflow_trace_info = WorkflowTraceInfo(
  329. workflow_data=workflow_run,
  330. conversation_id=conversation_id,
  331. workflow_id=workflow_id,
  332. tenant_id=tenant_id,
  333. workflow_run_id=workflow_run_id,
  334. workflow_run_elapsed_time=workflow_run_elapsed_time,
  335. workflow_run_status=workflow_run_status,
  336. workflow_run_inputs=workflow_run_inputs,
  337. workflow_run_outputs=workflow_run_outputs,
  338. workflow_run_version=workflow_run_version,
  339. error=error,
  340. total_tokens=total_tokens,
  341. file_list=file_list,
  342. query=query,
  343. metadata=metadata,
  344. workflow_app_log_id=workflow_app_log_id,
  345. message_id=message_id,
  346. start_time=workflow_run.created_at,
  347. end_time=workflow_run.finished_at,
  348. )
  349. return workflow_trace_info
  350. def message_trace(self, message_id):
  351. message_data = get_message_data(message_id)
  352. if not message_data:
  353. return {}
  354. conversation_mode = db.session.query(Conversation.mode).filter_by(id=message_data.conversation_id).first()
  355. conversation_mode = conversation_mode[0]
  356. created_at = message_data.created_at
  357. inputs = message_data.message
  358. # get message file data
  359. message_file_data = db.session.query(MessageFile).filter_by(message_id=message_id).first()
  360. file_list = []
  361. if message_file_data and message_file_data.url is not None:
  362. file_url = f"{self.file_base_url}/{message_file_data.url}" if message_file_data else ""
  363. file_list.append(file_url)
  364. metadata = {
  365. "conversation_id": message_data.conversation_id,
  366. "ls_provider": message_data.model_provider,
  367. "ls_model_name": message_data.model_id,
  368. "status": message_data.status,
  369. "from_end_user_id": message_data.from_account_id,
  370. "from_account_id": message_data.from_account_id,
  371. "agent_based": message_data.agent_based,
  372. "workflow_run_id": message_data.workflow_run_id,
  373. "from_source": message_data.from_source,
  374. "message_id": message_id,
  375. }
  376. message_tokens = message_data.message_tokens
  377. message_trace_info = MessageTraceInfo(
  378. message_data=message_data,
  379. conversation_model=conversation_mode,
  380. message_tokens=message_tokens,
  381. answer_tokens=message_data.answer_tokens,
  382. total_tokens=message_tokens + message_data.answer_tokens,
  383. error=message_data.error if message_data.error else "",
  384. inputs=inputs,
  385. outputs=message_data.answer,
  386. file_list=file_list,
  387. start_time=created_at,
  388. end_time=created_at + timedelta(seconds=message_data.provider_response_latency),
  389. metadata=metadata,
  390. message_file_data=message_file_data,
  391. conversation_mode=conversation_mode,
  392. )
  393. return message_trace_info
  394. def moderation_trace(self, message_id, timer, **kwargs):
  395. moderation_result = kwargs.get("moderation_result")
  396. inputs = kwargs.get("inputs")
  397. message_data = get_message_data(message_id)
  398. if not message_data:
  399. return {}
  400. metadata = {
  401. "message_id": message_id,
  402. "action": moderation_result.action,
  403. "preset_response": moderation_result.preset_response,
  404. "query": moderation_result.query,
  405. }
  406. # get workflow_app_log_id
  407. workflow_app_log_id = None
  408. if message_data.workflow_run_id:
  409. workflow_app_log_data = db.session.query(WorkflowAppLog).filter_by(
  410. workflow_run_id=message_data.workflow_run_id
  411. ).first()
  412. workflow_app_log_id = str(workflow_app_log_data.id) if workflow_app_log_data else None
  413. moderation_trace_info = ModerationTraceInfo(
  414. message_id=workflow_app_log_id if workflow_app_log_id else message_id,
  415. inputs=inputs,
  416. message_data=message_data,
  417. flagged=moderation_result.flagged,
  418. action=moderation_result.action,
  419. preset_response=moderation_result.preset_response,
  420. query=moderation_result.query,
  421. start_time=timer.get("start"),
  422. end_time=timer.get("end"),
  423. metadata=metadata,
  424. )
  425. return moderation_trace_info
  426. def suggested_question_trace(self, message_id, timer, **kwargs):
  427. suggested_question = kwargs.get("suggested_question")
  428. message_data = get_message_data(message_id)
  429. if not message_data:
  430. return {}
  431. metadata = {
  432. "message_id": message_id,
  433. "ls_provider": message_data.model_provider,
  434. "ls_model_name": message_data.model_id,
  435. "status": message_data.status,
  436. "from_end_user_id": message_data.from_account_id,
  437. "from_account_id": message_data.from_account_id,
  438. "agent_based": message_data.agent_based,
  439. "workflow_run_id": message_data.workflow_run_id,
  440. "from_source": message_data.from_source,
  441. }
  442. # get workflow_app_log_id
  443. workflow_app_log_id = None
  444. if message_data.workflow_run_id:
  445. workflow_app_log_data = db.session.query(WorkflowAppLog).filter_by(
  446. workflow_run_id=message_data.workflow_run_id
  447. ).first()
  448. workflow_app_log_id = str(workflow_app_log_data.id) if workflow_app_log_data else None
  449. suggested_question_trace_info = SuggestedQuestionTraceInfo(
  450. message_id=workflow_app_log_id if workflow_app_log_id else message_id,
  451. message_data=message_data,
  452. inputs=message_data.message,
  453. outputs=message_data.answer,
  454. start_time=timer.get("start"),
  455. end_time=timer.get("end"),
  456. metadata=metadata,
  457. total_tokens=message_data.message_tokens + message_data.answer_tokens,
  458. status=message_data.status,
  459. error=message_data.error,
  460. from_account_id=message_data.from_account_id,
  461. agent_based=message_data.agent_based,
  462. from_source=message_data.from_source,
  463. model_provider=message_data.model_provider,
  464. model_id=message_data.model_id,
  465. suggested_question=suggested_question,
  466. level=message_data.status,
  467. status_message=message_data.error,
  468. )
  469. return suggested_question_trace_info
  470. def dataset_retrieval_trace(self, message_id, timer, **kwargs):
  471. documents = kwargs.get("documents")
  472. message_data = get_message_data(message_id)
  473. if not message_data:
  474. return {}
  475. metadata = {
  476. "message_id": message_id,
  477. "ls_provider": message_data.model_provider,
  478. "ls_model_name": message_data.model_id,
  479. "status": message_data.status,
  480. "from_end_user_id": message_data.from_account_id,
  481. "from_account_id": message_data.from_account_id,
  482. "agent_based": message_data.agent_based,
  483. "workflow_run_id": message_data.workflow_run_id,
  484. "from_source": message_data.from_source,
  485. }
  486. dataset_retrieval_trace_info = DatasetRetrievalTraceInfo(
  487. message_id=message_id,
  488. inputs=message_data.query if message_data.query else message_data.inputs,
  489. documents=documents,
  490. start_time=timer.get("start"),
  491. end_time=timer.get("end"),
  492. metadata=metadata,
  493. message_data=message_data,
  494. )
  495. return dataset_retrieval_trace_info
  496. def tool_trace(self, message_id, timer, **kwargs):
  497. tool_name = kwargs.get('tool_name')
  498. tool_inputs = kwargs.get('tool_inputs')
  499. tool_outputs = kwargs.get('tool_outputs')
  500. message_data = get_message_data(message_id)
  501. if not message_data:
  502. return {}
  503. tool_config = {}
  504. time_cost = 0
  505. error = None
  506. tool_parameters = {}
  507. created_time = message_data.created_at
  508. end_time = message_data.updated_at
  509. agent_thoughts: list[MessageAgentThought] = message_data.agent_thoughts
  510. for agent_thought in agent_thoughts:
  511. if tool_name in agent_thought.tools:
  512. created_time = agent_thought.created_at
  513. tool_meta_data = agent_thought.tool_meta.get(tool_name, {})
  514. tool_config = tool_meta_data.get('tool_config', {})
  515. time_cost = tool_meta_data.get('time_cost', 0)
  516. end_time = created_time + timedelta(seconds=time_cost)
  517. error = tool_meta_data.get('error', "")
  518. tool_parameters = tool_meta_data.get('tool_parameters', {})
  519. metadata = {
  520. "message_id": message_id,
  521. "tool_name": tool_name,
  522. "tool_inputs": tool_inputs,
  523. "tool_outputs": tool_outputs,
  524. "tool_config": tool_config,
  525. "time_cost": time_cost,
  526. "error": error,
  527. "tool_parameters": tool_parameters,
  528. }
  529. file_url = ""
  530. message_file_data = db.session.query(MessageFile).filter_by(message_id=message_id).first()
  531. if message_file_data:
  532. message_file_id = message_file_data.id if message_file_data else None
  533. type = message_file_data.type
  534. created_by_role = message_file_data.created_by_role
  535. created_user_id = message_file_data.created_by
  536. file_url = f"{self.file_base_url}/{message_file_data.url}"
  537. metadata.update(
  538. {
  539. "message_file_id": message_file_id,
  540. "created_by_role": created_by_role,
  541. "created_user_id": created_user_id,
  542. "type": type,
  543. }
  544. )
  545. tool_trace_info = ToolTraceInfo(
  546. message_id=message_id,
  547. message_data=message_data,
  548. tool_name=tool_name,
  549. start_time=timer.get("start") if timer else created_time,
  550. end_time=timer.get("end") if timer else end_time,
  551. tool_inputs=tool_inputs,
  552. tool_outputs=tool_outputs,
  553. metadata=metadata,
  554. message_file_data=message_file_data,
  555. error=error,
  556. inputs=message_data.message,
  557. outputs=message_data.answer,
  558. tool_config=tool_config,
  559. time_cost=time_cost,
  560. tool_parameters=tool_parameters,
  561. file_url=file_url,
  562. )
  563. return tool_trace_info
  564. def generate_name_trace(self, conversation_id, timer, **kwargs):
  565. generate_conversation_name = kwargs.get("generate_conversation_name")
  566. inputs = kwargs.get("inputs")
  567. tenant_id = kwargs.get("tenant_id")
  568. start_time = timer.get("start")
  569. end_time = timer.get("end")
  570. metadata = {
  571. "conversation_id": conversation_id,
  572. "tenant_id": tenant_id,
  573. }
  574. generate_name_trace_info = GenerateNameTraceInfo(
  575. conversation_id=conversation_id,
  576. inputs=inputs,
  577. outputs=generate_conversation_name,
  578. start_time=start_time,
  579. end_time=end_time,
  580. metadata=metadata,
  581. tenant_id=tenant_id,
  582. )
  583. return generate_name_trace_info
  584. class TraceQueueManager:
  585. def __init__(self, app_id=None, conversation_id=None, message_id=None):
  586. tracing_instance = OpsTraceManager.get_ops_trace_instance(app_id, conversation_id, message_id)
  587. self.queue = queue.Queue()
  588. self.is_running = True
  589. self.thread = threading.Thread(
  590. target=self.process_queue, kwargs={
  591. 'flask_app': current_app._get_current_object(),
  592. 'trace_instance': tracing_instance
  593. }
  594. )
  595. self.thread.start()
  596. def stop(self):
  597. self.is_running = False
  598. def process_queue(self, flask_app: Flask, trace_instance: BaseTraceInstance):
  599. with flask_app.app_context():
  600. while self.is_running:
  601. try:
  602. task = self.queue.get(timeout=60)
  603. task.execute(trace_instance)
  604. self.queue.task_done()
  605. except queue.Empty:
  606. self.stop()
  607. def add_trace_task(self, trace_task: TraceTask):
  608. self.queue.put(trace_task)