conversation_message_task.py 20 KB


  1. import json
  2. import time
  3. from typing import Optional, Union, List
  4. from core.callback_handler.entity.agent_loop import AgentLoop
  5. from core.callback_handler.entity.dataset_query import DatasetQueryObj
  6. from core.callback_handler.entity.llm_message import LLMMessage
  7. from core.callback_handler.entity.chain_result import ChainResult
  8. from core.file.file_obj import FileObj
  9. from core.model_providers.model_factory import ModelFactory
  10. from core.model_providers.models.entity.message import to_prompt_messages, MessageType, PromptMessageFile
  11. from core.model_providers.models.llm.base import BaseLLM
  12. from core.prompt.prompt_builder import PromptBuilder
  13. from core.prompt.prompt_template import PromptTemplateParser
  14. from events.message_event import message_was_created
  15. from extensions.ext_database import db
  16. from extensions.ext_redis import redis_client
  17. from models.dataset import DatasetQuery
  18. from models.model import AppModelConfig, Conversation, Account, Message, EndUser, App, MessageAgentThought, \
  19. MessageChain, DatasetRetrieverResource, MessageFile
  20. class ConversationMessageTask:
  21. def __init__(self, task_id: str, app: App, app_model_config: AppModelConfig, user: Account,
  22. inputs: dict, query: str, files: List[FileObj], streaming: bool,
  23. model_instance: BaseLLM, conversation: Optional[Conversation] = None, is_override: bool = False,
  24. auto_generate_name: bool = True):
  25. self.start_at = time.perf_counter()
  26. self.task_id = task_id
  27. self.app = app
  28. self.tenant_id = app.tenant_id
  29. self.app_model_config = app_model_config
  30. self.is_override = is_override
  31. self.user = user
  32. self.inputs = inputs
  33. self.query = query
  34. self.files = files
  35. self.streaming = streaming
  36. self.conversation = conversation
  37. self.is_new_conversation = False
  38. self.model_instance = model_instance
  39. self.message = None
  40. self.retriever_resource = None
  41. self.auto_generate_name = auto_generate_name
  42. self.model_dict = self.app_model_config.model_dict
  43. self.provider_name = self.model_dict.get('provider')
  44. self.model_name = self.model_dict.get('name')
  45. self.mode = app.mode
  46. self.init()
  47. self._pub_handler = PubHandler(
  48. user=self.user,
  49. task_id=self.task_id,
  50. message=self.message,
  51. conversation=self.conversation,
  52. chain_pub=False, # disabled currently
  53. agent_thought_pub=True
  54. )
  55. def init(self):
  56. override_model_configs = None
  57. if self.is_override:
  58. override_model_configs = self.app_model_config.to_dict()
  59. introduction = ''
  60. system_instruction = ''
  61. system_instruction_tokens = 0
  62. if self.mode == 'chat':
  63. introduction = self.app_model_config.opening_statement
  64. if introduction:
  65. prompt_template = PromptTemplateParser(template=introduction)
  66. prompt_inputs = {k: self.inputs[k] for k in prompt_template.variable_keys if k in self.inputs}
  67. try:
  68. introduction = prompt_template.format(prompt_inputs)
  69. except KeyError:
  70. pass
  71. if self.app_model_config.pre_prompt:
  72. system_message = PromptBuilder.to_system_message(self.app_model_config.pre_prompt, self.inputs)
  73. system_instruction = system_message.content
  74. model_instance = ModelFactory.get_text_generation_model(
  75. tenant_id=self.tenant_id,
  76. model_provider_name=self.provider_name,
  77. model_name=self.model_name
  78. )
  79. system_instruction_tokens = model_instance.get_num_tokens(to_prompt_messages([system_message]))
  80. if not self.conversation:
  81. self.is_new_conversation = True
  82. self.conversation = Conversation(
  83. app_id=self.app.id,
  84. app_model_config_id=self.app_model_config.id,
  85. model_provider=self.provider_name,
  86. model_id=self.model_name,
  87. override_model_configs=json.dumps(override_model_configs) if override_model_configs else None,
  88. mode=self.mode,
  89. name='New conversation',
  90. inputs=self.inputs,
  91. introduction=introduction,
  92. system_instruction=system_instruction,
  93. system_instruction_tokens=system_instruction_tokens,
  94. status='normal',
  95. from_source=('console' if isinstance(self.user, Account) else 'api'),
  96. from_end_user_id=(self.user.id if isinstance(self.user, EndUser) else None),
  97. from_account_id=(self.user.id if isinstance(self.user, Account) else None),
  98. )
  99. db.session.add(self.conversation)
  100. db.session.commit()
  101. self.message = Message(
  102. app_id=self.app.id,
  103. model_provider=self.provider_name,
  104. model_id=self.model_name,
  105. override_model_configs=json.dumps(override_model_configs) if override_model_configs else None,
  106. conversation_id=self.conversation.id,
  107. inputs=self.inputs,
  108. query=self.query,
  109. message="",
  110. message_tokens=0,
  111. message_unit_price=0,
  112. message_price_unit=0,
  113. answer="",
  114. answer_tokens=0,
  115. answer_unit_price=0,
  116. answer_price_unit=0,
  117. provider_response_latency=0,
  118. total_price=0,
  119. currency=self.model_instance.get_currency(),
  120. from_source=('console' if isinstance(self.user, Account) else 'api'),
  121. from_end_user_id=(self.user.id if isinstance(self.user, EndUser) else None),
  122. from_account_id=(self.user.id if isinstance(self.user, Account) else None),
  123. agent_based=self.app_model_config.agent_mode_dict.get('enabled'),
  124. )
  125. db.session.add(self.message)
  126. db.session.commit()
  127. for file in self.files:
  128. message_file = MessageFile(
  129. message_id=self.message.id,
  130. type=file.type.value,
  131. transfer_method=file.transfer_method.value,
  132. url=file.url,
  133. upload_file_id=file.upload_file_id,
  134. created_by_role=('account' if isinstance(self.user, Account) else 'end_user'),
  135. created_by=self.user.id
  136. )
  137. db.session.add(message_file)
  138. db.session.commit()
  139. def append_message_text(self, text: str):
  140. if text is not None:
  141. self._pub_handler.pub_text(text)
  142. def save_message(self, llm_message: LLMMessage, by_stopped: bool = False):
  143. message_tokens = llm_message.prompt_tokens
  144. answer_tokens = llm_message.completion_tokens
  145. message_unit_price = self.model_instance.get_tokens_unit_price(MessageType.USER)
  146. message_price_unit = self.model_instance.get_price_unit(MessageType.USER)
  147. answer_unit_price = self.model_instance.get_tokens_unit_price(MessageType.ASSISTANT)
  148. answer_price_unit = self.model_instance.get_price_unit(MessageType.ASSISTANT)
  149. message_total_price = self.model_instance.calc_tokens_price(message_tokens, MessageType.USER)
  150. answer_total_price = self.model_instance.calc_tokens_price(answer_tokens, MessageType.ASSISTANT)
  151. total_price = message_total_price + answer_total_price
  152. self.message.message = llm_message.prompt
  153. self.message.message_tokens = message_tokens
  154. self.message.message_unit_price = message_unit_price
  155. self.message.message_price_unit = message_price_unit
  156. self.message.answer = PromptTemplateParser.remove_template_variables(
  157. llm_message.completion.strip()) if llm_message.completion else ''
  158. self.message.answer_tokens = answer_tokens
  159. self.message.answer_unit_price = answer_unit_price
  160. self.message.answer_price_unit = answer_price_unit
  161. self.message.provider_response_latency = time.perf_counter() - self.start_at
  162. self.message.total_price = total_price
  163. db.session.commit()
  164. message_was_created.send(
  165. self.message,
  166. conversation=self.conversation,
  167. is_first_message=self.is_new_conversation,
  168. auto_generate_name=self.auto_generate_name
  169. )
  170. if not by_stopped:
  171. self.end()
  172. def init_chain(self, chain_result: ChainResult):
  173. message_chain = MessageChain(
  174. message_id=self.message.id,
  175. type=chain_result.type,
  176. input=json.dumps(chain_result.prompt),
  177. output=''
  178. )
  179. db.session.add(message_chain)
  180. db.session.commit()
  181. return message_chain
  182. def on_chain_end(self, message_chain: MessageChain, chain_result: ChainResult):
  183. message_chain.output = json.dumps(chain_result.completion)
  184. db.session.commit()
  185. self._pub_handler.pub_chain(message_chain)
  186. def on_agent_start(self, message_chain: MessageChain, agent_loop: AgentLoop) -> MessageAgentThought:
  187. message_agent_thought = MessageAgentThought(
  188. message_id=self.message.id,
  189. message_chain_id=message_chain.id,
  190. position=agent_loop.position,
  191. thought=agent_loop.thought,
  192. tool=agent_loop.tool_name,
  193. tool_input=agent_loop.tool_input,
  194. message=agent_loop.prompt,
  195. message_price_unit=0,
  196. answer=agent_loop.completion,
  197. answer_price_unit=0,
  198. created_by_role=('account' if isinstance(self.user, Account) else 'end_user'),
  199. created_by=self.user.id
  200. )
  201. db.session.add(message_agent_thought)
  202. db.session.commit()
  203. self._pub_handler.pub_agent_thought(message_agent_thought)
  204. return message_agent_thought
  205. def on_agent_end(self, message_agent_thought: MessageAgentThought, agent_model_instance: BaseLLM,
  206. agent_loop: AgentLoop):
  207. agent_message_unit_price = agent_model_instance.get_tokens_unit_price(MessageType.USER)
  208. agent_message_price_unit = agent_model_instance.get_price_unit(MessageType.USER)
  209. agent_answer_unit_price = agent_model_instance.get_tokens_unit_price(MessageType.ASSISTANT)
  210. agent_answer_price_unit = agent_model_instance.get_price_unit(MessageType.ASSISTANT)
  211. loop_message_tokens = agent_loop.prompt_tokens
  212. loop_answer_tokens = agent_loop.completion_tokens
  213. loop_message_total_price = agent_model_instance.calc_tokens_price(loop_message_tokens, MessageType.USER)
  214. loop_answer_total_price = agent_model_instance.calc_tokens_price(loop_answer_tokens, MessageType.ASSISTANT)
  215. loop_total_price = loop_message_total_price + loop_answer_total_price
  216. message_agent_thought.observation = agent_loop.tool_output
  217. message_agent_thought.tool_process_data = '' # currently not support
  218. message_agent_thought.message_token = loop_message_tokens
  219. message_agent_thought.message_unit_price = agent_message_unit_price
  220. message_agent_thought.message_price_unit = agent_message_price_unit
  221. message_agent_thought.answer_token = loop_answer_tokens
  222. message_agent_thought.answer_unit_price = agent_answer_unit_price
  223. message_agent_thought.answer_price_unit = agent_answer_price_unit
  224. message_agent_thought.latency = agent_loop.latency
  225. message_agent_thought.tokens = agent_loop.prompt_tokens + agent_loop.completion_tokens
  226. message_agent_thought.total_price = loop_total_price
  227. message_agent_thought.currency = agent_model_instance.get_currency()
  228. db.session.commit()
  229. def on_dataset_query_end(self, dataset_query_obj: DatasetQueryObj):
  230. dataset_query = DatasetQuery(
  231. dataset_id=dataset_query_obj.dataset_id,
  232. content=dataset_query_obj.query,
  233. source='app',
  234. source_app_id=self.app.id,
  235. created_by_role=('account' if isinstance(self.user, Account) else 'end_user'),
  236. created_by=self.user.id
  237. )
  238. db.session.add(dataset_query)
  239. db.session.commit()
  240. def on_dataset_query_finish(self, resource: List):
  241. if resource and len(resource) > 0:
  242. for item in resource:
  243. dataset_retriever_resource = DatasetRetrieverResource(
  244. message_id=self.message.id,
  245. position=item.get('position'),
  246. dataset_id=item.get('dataset_id'),
  247. dataset_name=item.get('dataset_name'),
  248. document_id=item.get('document_id'),
  249. document_name=item.get('document_name'),
  250. data_source_type=item.get('data_source_type'),
  251. segment_id=item.get('segment_id'),
  252. score=item.get('score') if 'score' in item else None,
  253. hit_count=item.get('hit_count') if 'hit_count' else None,
  254. word_count=item.get('word_count') if 'word_count' in item else None,
  255. segment_position=item.get('segment_position') if 'segment_position' in item else None,
  256. index_node_hash=item.get('index_node_hash') if 'index_node_hash' in item else None,
  257. content=item.get('content'),
  258. retriever_from=item.get('retriever_from'),
  259. created_by=self.user.id
  260. )
  261. db.session.add(dataset_retriever_resource)
  262. db.session.commit()
  263. self.retriever_resource = resource
  264. def on_message_replace(self, text: str):
  265. if text is not None:
  266. self._pub_handler.pub_message_replace(text)
  267. def message_end(self):
  268. self._pub_handler.pub_message_end(self.retriever_resource)
  269. def end(self):
  270. self._pub_handler.pub_message_end(self.retriever_resource)
  271. self._pub_handler.pub_end()
  272. class PubHandler:
  273. def __init__(self, user: Union[Account, EndUser], task_id: str,
  274. message: Message, conversation: Conversation,
  275. chain_pub: bool = False, agent_thought_pub: bool = False):
  276. self._channel = PubHandler.generate_channel_name(user, task_id)
  277. self._stopped_cache_key = PubHandler.generate_stopped_cache_key(user, task_id)
  278. self._task_id = task_id
  279. self._message = message
  280. self._conversation = conversation
  281. self._chain_pub = chain_pub
  282. self._agent_thought_pub = agent_thought_pub
  283. @classmethod
  284. def generate_channel_name(cls, user: Union[Account, EndUser], task_id: str):
  285. if not user:
  286. raise ValueError("user is required")
  287. user_str = 'account-' + str(user.id) if isinstance(user, Account) else 'end-user-' + str(user.id)
  288. return "generate_result:{}-{}".format(user_str, task_id)
  289. @classmethod
  290. def generate_stopped_cache_key(cls, user: Union[Account, EndUser], task_id: str):
  291. user_str = 'account-' + str(user.id) if isinstance(user, Account) else 'end-user-' + str(user.id)
  292. return "generate_result_stopped:{}-{}".format(user_str, task_id)
  293. def pub_text(self, text: str):
  294. content = {
  295. 'event': 'message',
  296. 'data': {
  297. 'task_id': self._task_id,
  298. 'message_id': str(self._message.id),
  299. 'text': text,
  300. 'mode': self._conversation.mode,
  301. 'conversation_id': str(self._conversation.id)
  302. }
  303. }
  304. redis_client.publish(self._channel, json.dumps(content))
  305. if self._is_stopped():
  306. self.pub_end()
  307. raise ConversationTaskStoppedException()
  308. def pub_message_replace(self, text: str):
  309. content = {
  310. 'event': 'message_replace',
  311. 'data': {
  312. 'task_id': self._task_id,
  313. 'message_id': str(self._message.id),
  314. 'text': text,
  315. 'mode': self._conversation.mode,
  316. 'conversation_id': str(self._conversation.id)
  317. }
  318. }
  319. redis_client.publish(self._channel, json.dumps(content))
  320. if self._is_stopped():
  321. self.pub_end()
  322. raise ConversationTaskStoppedException()
  323. def pub_chain(self, message_chain: MessageChain):
  324. if self._chain_pub:
  325. content = {
  326. 'event': 'chain',
  327. 'data': {
  328. 'task_id': self._task_id,
  329. 'message_id': self._message.id,
  330. 'chain_id': message_chain.id,
  331. 'type': message_chain.type,
  332. 'input': json.loads(message_chain.input),
  333. 'output': json.loads(message_chain.output),
  334. 'mode': self._conversation.mode,
  335. 'conversation_id': self._conversation.id
  336. }
  337. }
  338. redis_client.publish(self._channel, json.dumps(content))
  339. if self._is_stopped():
  340. self.pub_end()
  341. raise ConversationTaskStoppedException()
  342. def pub_agent_thought(self, message_agent_thought: MessageAgentThought):
  343. if self._agent_thought_pub:
  344. content = {
  345. 'event': 'agent_thought',
  346. 'data': {
  347. 'id': message_agent_thought.id,
  348. 'task_id': self._task_id,
  349. 'message_id': self._message.id,
  350. 'chain_id': message_agent_thought.message_chain_id,
  351. 'position': message_agent_thought.position,
  352. 'thought': message_agent_thought.thought,
  353. 'tool': message_agent_thought.tool,
  354. 'tool_input': message_agent_thought.tool_input,
  355. 'mode': self._conversation.mode,
  356. 'conversation_id': self._conversation.id
  357. }
  358. }
  359. redis_client.publish(self._channel, json.dumps(content))
  360. if self._is_stopped():
  361. self.pub_end()
  362. raise ConversationTaskStoppedException()
  363. def pub_message_end(self, retriever_resource: List):
  364. content = {
  365. 'event': 'message_end',
  366. 'data': {
  367. 'task_id': self._task_id,
  368. 'message_id': self._message.id,
  369. 'mode': self._conversation.mode,
  370. 'conversation_id': self._conversation.id
  371. }
  372. }
  373. if retriever_resource:
  374. content['data']['retriever_resources'] = retriever_resource
  375. redis_client.publish(self._channel, json.dumps(content))
  376. if self._is_stopped():
  377. self.pub_end()
  378. raise ConversationTaskStoppedException()
  379. def pub_end(self):
  380. content = {
  381. 'event': 'end',
  382. }
  383. redis_client.publish(self._channel, json.dumps(content))
  384. @classmethod
  385. def pub_error(cls, user: Union[Account, EndUser], task_id: str, e):
  386. content = {
  387. 'error': type(e).__name__,
  388. 'description': e.description if getattr(e, 'description', None) is not None else str(e)
  389. }
  390. channel = cls.generate_channel_name(user, task_id)
  391. redis_client.publish(channel, json.dumps(content))
  392. def _is_stopped(self):
  393. return redis_client.get(self._stopped_cache_key) is not None
  394. @classmethod
  395. def ping(cls, user: Union[Account, EndUser], task_id: str):
  396. content = {
  397. 'event': 'ping'
  398. }
  399. channel = cls.generate_channel_name(user, task_id)
  400. redis_client.publish(channel, json.dumps(content))
  401. @classmethod
  402. def stop(cls, user: Union[Account, EndUser], task_id: str):
  403. stopped_cache_key = cls.generate_stopped_cache_key(user, task_id)
  404. redis_client.setex(stopped_cache_key, 600, 1)
  405. class ConversationTaskStoppedException(Exception):
  406. pass
  407. class ConversationTaskInterruptException(Exception):
  408. pass