application_queue_manager.py 8.7 KB


  1. import queue
  2. import time
  3. from collections.abc import Generator
  4. from enum import Enum
  5. from typing import Any
  6. from sqlalchemy.orm import DeclarativeMeta
  7. from core.entities.application_entities import InvokeFrom
  8. from core.entities.queue_entities import (
  9. AnnotationReplyEvent,
  10. AppQueueEvent,
  11. QueueAgentMessageEvent,
  12. QueueAgentThoughtEvent,
  13. QueueErrorEvent,
  14. QueueMessage,
  15. QueueMessageEndEvent,
  16. QueueMessageEvent,
  17. QueueMessageFileEvent,
  18. QueueMessageReplaceEvent,
  19. QueuePingEvent,
  20. QueueRetrieverResourcesEvent,
  21. QueueStopEvent,
  22. )
  23. from core.model_runtime.entities.llm_entities import LLMResult, LLMResultChunk
  24. from extensions.ext_redis import redis_client
  25. from models.model import MessageAgentThought, MessageFile
  26. class PublishFrom(Enum):
  27. APPLICATION_MANAGER = 1
  28. TASK_PIPELINE = 2
  29. class ApplicationQueueManager:
  30. def __init__(self, task_id: str,
  31. user_id: str,
  32. invoke_from: InvokeFrom,
  33. conversation_id: str,
  34. app_mode: str,
  35. message_id: str) -> None:
  36. if not user_id:
  37. raise ValueError("user is required")
  38. self._task_id = task_id
  39. self._user_id = user_id
  40. self._invoke_from = invoke_from
  41. self._conversation_id = str(conversation_id)
  42. self._app_mode = app_mode
  43. self._message_id = str(message_id)
  44. user_prefix = 'account' if self._invoke_from in [InvokeFrom.EXPLORE, InvokeFrom.DEBUGGER] else 'end-user'
  45. redis_client.setex(ApplicationQueueManager._generate_task_belong_cache_key(self._task_id), 1800, f"{user_prefix}-{self._user_id}")
  46. q = queue.Queue()
  47. self._q = q
  48. def listen(self) -> Generator:
  49. """
  50. Listen to queue
  51. :return:
  52. """
  53. # wait for 10 minutes to stop listen
  54. listen_timeout = 600
  55. start_time = time.time()
  56. last_ping_time = 0
  57. while True:
  58. try:
  59. message = self._q.get(timeout=1)
  60. if message is None:
  61. break
  62. yield message
  63. except queue.Empty:
  64. continue
  65. finally:
  66. elapsed_time = time.time() - start_time
  67. if elapsed_time >= listen_timeout or self._is_stopped():
  68. # publish two messages to make sure the client can receive the stop signal
  69. # and stop listening after the stop signal processed
  70. self.publish(
  71. QueueStopEvent(stopped_by=QueueStopEvent.StopBy.USER_MANUAL),
  72. PublishFrom.TASK_PIPELINE
  73. )
  74. self.stop_listen()
  75. if elapsed_time // 10 > last_ping_time:
  76. self.publish(QueuePingEvent(), PublishFrom.TASK_PIPELINE)
  77. last_ping_time = elapsed_time // 10
  78. def stop_listen(self) -> None:
  79. """
  80. Stop listen to queue
  81. :return:
  82. """
  83. self._q.put(None)
  84. def publish_chunk_message(self, chunk: LLMResultChunk, pub_from: PublishFrom) -> None:
  85. """
  86. Publish chunk message to channel
  87. :param chunk: chunk
  88. :param pub_from: publish from
  89. :return:
  90. """
  91. self.publish(QueueMessageEvent(
  92. chunk=chunk
  93. ), pub_from)
  94. def publish_agent_chunk_message(self, chunk: LLMResultChunk, pub_from: PublishFrom) -> None:
  95. """
  96. Publish agent chunk message to channel
  97. :param chunk: chunk
  98. :param pub_from: publish from
  99. :return:
  100. """
  101. self.publish(QueueAgentMessageEvent(
  102. chunk=chunk
  103. ), pub_from)
  104. def publish_message_replace(self, text: str, pub_from: PublishFrom) -> None:
  105. """
  106. Publish message replace
  107. :param text: text
  108. :param pub_from: publish from
  109. :return:
  110. """
  111. self.publish(QueueMessageReplaceEvent(
  112. text=text
  113. ), pub_from)
  114. def publish_retriever_resources(self, retriever_resources: list[dict], pub_from: PublishFrom) -> None:
  115. """
  116. Publish retriever resources
  117. :return:
  118. """
  119. self.publish(QueueRetrieverResourcesEvent(retriever_resources=retriever_resources), pub_from)
  120. def publish_annotation_reply(self, message_annotation_id: str, pub_from: PublishFrom) -> None:
  121. """
  122. Publish annotation reply
  123. :param message_annotation_id: message annotation id
  124. :param pub_from: publish from
  125. :return:
  126. """
  127. self.publish(AnnotationReplyEvent(message_annotation_id=message_annotation_id), pub_from)
  128. def publish_message_end(self, llm_result: LLMResult, pub_from: PublishFrom) -> None:
  129. """
  130. Publish message end
  131. :param llm_result: llm result
  132. :param pub_from: publish from
  133. :return:
  134. """
  135. self.publish(QueueMessageEndEvent(llm_result=llm_result), pub_from)
  136. self.stop_listen()
  137. def publish_agent_thought(self, message_agent_thought: MessageAgentThought, pub_from: PublishFrom) -> None:
  138. """
  139. Publish agent thought
  140. :param message_agent_thought: message agent thought
  141. :param pub_from: publish from
  142. :return:
  143. """
  144. self.publish(QueueAgentThoughtEvent(
  145. agent_thought_id=message_agent_thought.id
  146. ), pub_from)
  147. def publish_message_file(self, message_file: MessageFile, pub_from: PublishFrom) -> None:
  148. """
  149. Publish agent thought
  150. :param message_file: message file
  151. :param pub_from: publish from
  152. :return:
  153. """
  154. self.publish(QueueMessageFileEvent(
  155. message_file_id=message_file.id
  156. ), pub_from)
  157. def publish_error(self, e, pub_from: PublishFrom) -> None:
  158. """
  159. Publish error
  160. :param e: error
  161. :param pub_from: publish from
  162. :return:
  163. """
  164. self.publish(QueueErrorEvent(
  165. error=e
  166. ), pub_from)
  167. self.stop_listen()
  168. def publish(self, event: AppQueueEvent, pub_from: PublishFrom) -> None:
  169. """
  170. Publish event to queue
  171. :param event:
  172. :param pub_from:
  173. :return:
  174. """
  175. self._check_for_sqlalchemy_models(event.dict())
  176. message = QueueMessage(
  177. task_id=self._task_id,
  178. message_id=self._message_id,
  179. conversation_id=self._conversation_id,
  180. app_mode=self._app_mode,
  181. event=event
  182. )
  183. self._q.put(message)
  184. if isinstance(event, QueueStopEvent):
  185. self.stop_listen()
  186. if pub_from == PublishFrom.APPLICATION_MANAGER and self._is_stopped():
  187. raise ConversationTaskStoppedException()
  188. @classmethod
  189. def set_stop_flag(cls, task_id: str, invoke_from: InvokeFrom, user_id: str) -> None:
  190. """
  191. Set task stop flag
  192. :return:
  193. """
  194. result = redis_client.get(cls._generate_task_belong_cache_key(task_id))
  195. if result is None:
  196. return
  197. user_prefix = 'account' if invoke_from in [InvokeFrom.EXPLORE, InvokeFrom.DEBUGGER] else 'end-user'
  198. if result.decode('utf-8') != f"{user_prefix}-{user_id}":
  199. return
  200. stopped_cache_key = cls._generate_stopped_cache_key(task_id)
  201. redis_client.setex(stopped_cache_key, 600, 1)
  202. def _is_stopped(self) -> bool:
  203. """
  204. Check if task is stopped
  205. :return:
  206. """
  207. stopped_cache_key = ApplicationQueueManager._generate_stopped_cache_key(self._task_id)
  208. result = redis_client.get(stopped_cache_key)
  209. if result is not None:
  210. return True
  211. return False
  212. @classmethod
  213. def _generate_task_belong_cache_key(cls, task_id: str) -> str:
  214. """
  215. Generate task belong cache key
  216. :param task_id: task id
  217. :return:
  218. """
  219. return f"generate_task_belong:{task_id}"
  220. @classmethod
  221. def _generate_stopped_cache_key(cls, task_id: str) -> str:
  222. """
  223. Generate stopped cache key
  224. :param task_id: task id
  225. :return:
  226. """
  227. return f"generate_task_stopped:{task_id}"
  228. def _check_for_sqlalchemy_models(self, data: Any):
  229. # from entity to dict or list
  230. if isinstance(data, dict):
  231. for key, value in data.items():
  232. self._check_for_sqlalchemy_models(value)
  233. elif isinstance(data, list):
  234. for item in data:
  235. self._check_for_sqlalchemy_models(item)
  236. else:
  237. if isinstance(data, DeclarativeMeta) or hasattr(data, '_sa_instance_state'):
  238. raise TypeError("Critical Error: Passing SQLAlchemy Model instances "
  239. "that cause thread safety issues is not allowed.")
  240. class ConversationTaskStoppedException(Exception):
  241. pass