conversation_message_task.py 16 KB


  1. import decimal
  2. import json
  3. from typing import Optional, Union
  4. from core.callback_handler.entity.agent_loop import AgentLoop
  5. from core.callback_handler.entity.dataset_query import DatasetQueryObj
  6. from core.callback_handler.entity.llm_message import LLMMessage
  7. from core.callback_handler.entity.chain_result import ChainResult
  8. from core.model_providers.model_factory import ModelFactory
  9. from core.model_providers.models.entity.message import to_prompt_messages, MessageType
  10. from core.model_providers.models.llm.base import BaseLLM
  11. from core.prompt.prompt_builder import PromptBuilder
  12. from core.prompt.prompt_template import JinjaPromptTemplate
  13. from events.message_event import message_was_created
  14. from extensions.ext_database import db
  15. from extensions.ext_redis import redis_client
  16. from models.dataset import DatasetQuery
  17. from models.model import AppModelConfig, Conversation, Account, Message, EndUser, App, MessageAgentThought, MessageChain
  18. class ConversationMessageTask:
  19. def __init__(self, task_id: str, app: App, app_model_config: AppModelConfig, user: Account,
  20. inputs: dict, query: str, streaming: bool, model_instance: BaseLLM,
  21. conversation: Optional[Conversation] = None, is_override: bool = False):
  22. self.task_id = task_id
  23. self.app = app
  24. self.tenant_id = app.tenant_id
  25. self.app_model_config = app_model_config
  26. self.is_override = is_override
  27. self.user = user
  28. self.inputs = inputs
  29. self.query = query
  30. self.streaming = streaming
  31. self.conversation = conversation
  32. self.is_new_conversation = False
  33. self.model_instance = model_instance
  34. self.message = None
  35. self.model_dict = self.app_model_config.model_dict
  36. self.provider_name = self.model_dict.get('provider')
  37. self.model_name = self.model_dict.get('name')
  38. self.mode = app.mode
  39. self.init()
  40. self._pub_handler = PubHandler(
  41. user=self.user,
  42. task_id=self.task_id,
  43. message=self.message,
  44. conversation=self.conversation,
  45. chain_pub=False, # disabled currently
  46. agent_thought_pub=True
  47. )
  48. def init(self):
  49. override_model_configs = None
  50. if self.is_override:
  51. override_model_configs = self.app_model_config.to_dict()
  52. introduction = ''
  53. system_instruction = ''
  54. system_instruction_tokens = 0
  55. if self.mode == 'chat':
  56. introduction = self.app_model_config.opening_statement
  57. if introduction:
  58. prompt_template = JinjaPromptTemplate.from_template(template=introduction)
  59. prompt_inputs = {k: self.inputs[k] for k in prompt_template.input_variables if k in self.inputs}
  60. try:
  61. introduction = prompt_template.format(**prompt_inputs)
  62. except KeyError:
  63. pass
  64. if self.app_model_config.pre_prompt:
  65. system_message = PromptBuilder.to_system_message(self.app_model_config.pre_prompt, self.inputs)
  66. system_instruction = system_message.content
  67. model_instance = ModelFactory.get_text_generation_model(
  68. tenant_id=self.tenant_id,
  69. model_provider_name=self.provider_name,
  70. model_name=self.model_name
  71. )
  72. system_instruction_tokens = model_instance.get_num_tokens(to_prompt_messages([system_message]))
  73. if not self.conversation:
  74. self.is_new_conversation = True
  75. self.conversation = Conversation(
  76. app_id=self.app_model_config.app_id,
  77. app_model_config_id=self.app_model_config.id,
  78. model_provider=self.provider_name,
  79. model_id=self.model_name,
  80. override_model_configs=json.dumps(override_model_configs) if override_model_configs else None,
  81. mode=self.mode,
  82. name='',
  83. inputs=self.inputs,
  84. introduction=introduction,
  85. system_instruction=system_instruction,
  86. system_instruction_tokens=system_instruction_tokens,
  87. status='normal',
  88. from_source=('console' if isinstance(self.user, Account) else 'api'),
  89. from_end_user_id=(self.user.id if isinstance(self.user, EndUser) else None),
  90. from_account_id=(self.user.id if isinstance(self.user, Account) else None),
  91. )
  92. db.session.add(self.conversation)
  93. db.session.flush()
  94. self.message = Message(
  95. app_id=self.app_model_config.app_id,
  96. model_provider=self.provider_name,
  97. model_id=self.model_name,
  98. override_model_configs=json.dumps(override_model_configs) if override_model_configs else None,
  99. conversation_id=self.conversation.id,
  100. inputs=self.inputs,
  101. query=self.query,
  102. message="",
  103. message_tokens=0,
  104. message_unit_price=0,
  105. message_price_unit=0,
  106. answer="",
  107. answer_tokens=0,
  108. answer_unit_price=0,
  109. answer_price_unit=0,
  110. provider_response_latency=0,
  111. total_price=0,
  112. currency=self.model_instance.get_currency(),
  113. from_source=('console' if isinstance(self.user, Account) else 'api'),
  114. from_end_user_id=(self.user.id if isinstance(self.user, EndUser) else None),
  115. from_account_id=(self.user.id if isinstance(self.user, Account) else None),
  116. agent_based=self.app_model_config.agent_mode_dict.get('enabled'),
  117. )
  118. db.session.add(self.message)
  119. db.session.flush()
  120. def append_message_text(self, text: str):
  121. self._pub_handler.pub_text(text)
  122. def save_message(self, llm_message: LLMMessage, by_stopped: bool = False):
  123. message_tokens = llm_message.prompt_tokens
  124. answer_tokens = llm_message.completion_tokens
  125. message_unit_price = self.model_instance.get_tokens_unit_price(MessageType.HUMAN)
  126. message_price_unit = self.model_instance.get_price_unit(MessageType.HUMAN)
  127. answer_unit_price = self.model_instance.get_tokens_unit_price(MessageType.ASSISTANT)
  128. answer_price_unit = self.model_instance.get_price_unit(MessageType.ASSISTANT)
  129. message_total_price = self.model_instance.calc_tokens_price(message_tokens, MessageType.HUMAN)
  130. answer_total_price = self.model_instance.calc_tokens_price(answer_tokens, MessageType.ASSISTANT)
  131. total_price = message_total_price + answer_total_price
  132. self.message.message = llm_message.prompt
  133. self.message.message_tokens = message_tokens
  134. self.message.message_unit_price = message_unit_price
  135. self.message.message_price_unit = message_price_unit
  136. self.message.answer = PromptBuilder.process_template(llm_message.completion.strip()) if llm_message.completion else ''
  137. self.message.answer_tokens = answer_tokens
  138. self.message.answer_unit_price = answer_unit_price
  139. self.message.answer_price_unit = answer_price_unit
  140. self.message.provider_response_latency = llm_message.latency
  141. self.message.total_price = total_price
  142. db.session.commit()
  143. message_was_created.send(
  144. self.message,
  145. conversation=self.conversation,
  146. is_first_message=self.is_new_conversation
  147. )
  148. if not by_stopped:
  149. self.end()
  150. def init_chain(self, chain_result: ChainResult):
  151. message_chain = MessageChain(
  152. message_id=self.message.id,
  153. type=chain_result.type,
  154. input=json.dumps(chain_result.prompt),
  155. output=''
  156. )
  157. db.session.add(message_chain)
  158. db.session.flush()
  159. return message_chain
  160. def on_chain_end(self, message_chain: MessageChain, chain_result: ChainResult):
  161. message_chain.output = json.dumps(chain_result.completion)
  162. self._pub_handler.pub_chain(message_chain)
  163. def on_agent_start(self, message_chain: MessageChain, agent_loop: AgentLoop) -> MessageAgentThought:
  164. message_agent_thought = MessageAgentThought(
  165. message_id=self.message.id,
  166. message_chain_id=message_chain.id,
  167. position=agent_loop.position,
  168. thought=agent_loop.thought,
  169. tool=agent_loop.tool_name,
  170. tool_input=agent_loop.tool_input,
  171. message=agent_loop.prompt,
  172. message_price_unit=0,
  173. answer=agent_loop.completion,
  174. answer_price_unit=0,
  175. created_by_role=('account' if isinstance(self.user, Account) else 'end_user'),
  176. created_by=self.user.id
  177. )
  178. db.session.add(message_agent_thought)
  179. db.session.flush()
  180. self._pub_handler.pub_agent_thought(message_agent_thought)
  181. return message_agent_thought
  182. def on_agent_end(self, message_agent_thought: MessageAgentThought, agent_model_instant: BaseLLM,
  183. agent_loop: AgentLoop):
  184. agent_message_unit_price = agent_model_instant.get_tokens_unit_price(MessageType.HUMAN)
  185. agent_message_price_unit = agent_model_instant.get_price_unit(MessageType.HUMAN)
  186. agent_answer_unit_price = agent_model_instant.get_tokens_unit_price(MessageType.ASSISTANT)
  187. agent_answer_price_unit = agent_model_instant.get_price_unit(MessageType.ASSISTANT)
  188. loop_message_tokens = agent_loop.prompt_tokens
  189. loop_answer_tokens = agent_loop.completion_tokens
  190. loop_message_total_price = agent_model_instant.calc_tokens_price(loop_message_tokens, MessageType.HUMAN)
  191. loop_answer_total_price = agent_model_instant.calc_tokens_price(loop_answer_tokens, MessageType.ASSISTANT)
  192. loop_total_price = loop_message_total_price + loop_answer_total_price
  193. message_agent_thought.observation = agent_loop.tool_output
  194. message_agent_thought.tool_process_data = '' # currently not support
  195. message_agent_thought.message_token = loop_message_tokens
  196. message_agent_thought.message_unit_price = agent_message_unit_price
  197. message_agent_thought.message_price_unit = agent_message_price_unit
  198. message_agent_thought.answer_token = loop_answer_tokens
  199. message_agent_thought.answer_unit_price = agent_answer_unit_price
  200. message_agent_thought.answer_price_unit = agent_answer_price_unit
  201. message_agent_thought.latency = agent_loop.latency
  202. message_agent_thought.tokens = agent_loop.prompt_tokens + agent_loop.completion_tokens
  203. message_agent_thought.total_price = loop_total_price
  204. message_agent_thought.currency = agent_model_instant.get_currency()
  205. db.session.flush()
  206. def on_dataset_query_end(self, dataset_query_obj: DatasetQueryObj):
  207. dataset_query = DatasetQuery(
  208. dataset_id=dataset_query_obj.dataset_id,
  209. content=dataset_query_obj.query,
  210. source='app',
  211. source_app_id=self.app.id,
  212. created_by_role=('account' if isinstance(self.user, Account) else 'end_user'),
  213. created_by=self.user.id
  214. )
  215. db.session.add(dataset_query)
  216. def end(self):
  217. self._pub_handler.pub_end()
  218. class PubHandler:
  219. def __init__(self, user: Union[Account | EndUser], task_id: str,
  220. message: Message, conversation: Conversation,
  221. chain_pub: bool = False, agent_thought_pub: bool = False):
  222. self._channel = PubHandler.generate_channel_name(user, task_id)
  223. self._stopped_cache_key = PubHandler.generate_stopped_cache_key(user, task_id)
  224. self._task_id = task_id
  225. self._message = message
  226. self._conversation = conversation
  227. self._chain_pub = chain_pub
  228. self._agent_thought_pub = agent_thought_pub
  229. @classmethod
  230. def generate_channel_name(cls, user: Union[Account | EndUser], task_id: str):
  231. if not user:
  232. raise ValueError("user is required")
  233. user_str = 'account-' + str(user.id) if isinstance(user, Account) else 'end-user-' + str(user.id)
  234. return "generate_result:{}-{}".format(user_str, task_id)
  235. @classmethod
  236. def generate_stopped_cache_key(cls, user: Union[Account | EndUser], task_id: str):
  237. user_str = 'account-' + str(user.id) if isinstance(user, Account) else 'end-user-' + str(user.id)
  238. return "generate_result_stopped:{}-{}".format(user_str, task_id)
  239. def pub_text(self, text: str):
  240. content = {
  241. 'event': 'message',
  242. 'data': {
  243. 'task_id': self._task_id,
  244. 'message_id': str(self._message.id),
  245. 'text': text,
  246. 'mode': self._conversation.mode,
  247. 'conversation_id': str(self._conversation.id)
  248. }
  249. }
  250. redis_client.publish(self._channel, json.dumps(content))
  251. if self._is_stopped():
  252. self.pub_end()
  253. raise ConversationTaskStoppedException()
  254. def pub_chain(self, message_chain: MessageChain):
  255. if self._chain_pub:
  256. content = {
  257. 'event': 'chain',
  258. 'data': {
  259. 'task_id': self._task_id,
  260. 'message_id': self._message.id,
  261. 'chain_id': message_chain.id,
  262. 'type': message_chain.type,
  263. 'input': json.loads(message_chain.input),
  264. 'output': json.loads(message_chain.output),
  265. 'mode': self._conversation.mode,
  266. 'conversation_id': self._conversation.id
  267. }
  268. }
  269. redis_client.publish(self._channel, json.dumps(content))
  270. if self._is_stopped():
  271. self.pub_end()
  272. raise ConversationTaskStoppedException()
  273. def pub_agent_thought(self, message_agent_thought: MessageAgentThought):
  274. if self._agent_thought_pub:
  275. content = {
  276. 'event': 'agent_thought',
  277. 'data': {
  278. 'id': message_agent_thought.id,
  279. 'task_id': self._task_id,
  280. 'message_id': self._message.id,
  281. 'chain_id': message_agent_thought.message_chain_id,
  282. 'position': message_agent_thought.position,
  283. 'thought': message_agent_thought.thought,
  284. 'tool': message_agent_thought.tool,
  285. 'tool_input': message_agent_thought.tool_input,
  286. 'mode': self._conversation.mode,
  287. 'conversation_id': self._conversation.id
  288. }
  289. }
  290. redis_client.publish(self._channel, json.dumps(content))
  291. if self._is_stopped():
  292. self.pub_end()
  293. raise ConversationTaskStoppedException()
  294. def pub_end(self):
  295. content = {
  296. 'event': 'end',
  297. }
  298. redis_client.publish(self._channel, json.dumps(content))
  299. @classmethod
  300. def pub_error(cls, user: Union[Account | EndUser], task_id: str, e):
  301. content = {
  302. 'error': type(e).__name__,
  303. 'description': e.description if getattr(e, 'description', None) is not None else str(e)
  304. }
  305. channel = cls.generate_channel_name(user, task_id)
  306. redis_client.publish(channel, json.dumps(content))
  307. def _is_stopped(self):
  308. return redis_client.get(self._stopped_cache_key) is not None
  309. @classmethod
  310. def ping(cls, user: Union[Account | EndUser], task_id: str):
  311. content = {
  312. 'event': 'ping'
  313. }
  314. channel = cls.generate_channel_name(user, task_id)
  315. redis_client.publish(channel, json.dumps(content))
  316. @classmethod
  317. def stop(cls, user: Union[Account | EndUser], task_id: str):
  318. stopped_cache_key = cls.generate_stopped_cache_key(user, task_id)
  319. redis_client.setex(stopped_cache_key, 600, 1)
  320. class ConversationTaskStoppedException(Exception):
  321. pass