application_queue_manager.py 8.7 KB


  1. import queue
  2. import time
  3. from enum import Enum
  4. from typing import Any, Generator
  5. from sqlalchemy.orm import DeclarativeMeta
  6. from core.entities.application_entities import InvokeFrom
  7. from core.entities.queue_entities import (
  8. AnnotationReplyEvent,
  9. AppQueueEvent,
  10. QueueAgentMessageEvent,
  11. QueueAgentThoughtEvent,
  12. QueueErrorEvent,
  13. QueueMessage,
  14. QueueMessageEndEvent,
  15. QueueMessageEvent,
  16. QueueMessageFileEvent,
  17. QueueMessageReplaceEvent,
  18. QueuePingEvent,
  19. QueueRetrieverResourcesEvent,
  20. QueueStopEvent,
  21. )
  22. from core.model_runtime.entities.llm_entities import LLMResult, LLMResultChunk
  23. from extensions.ext_redis import redis_client
  24. from models.model import MessageAgentThought, MessageFile
  25. class PublishFrom(Enum):
  26. APPLICATION_MANAGER = 1
  27. TASK_PIPELINE = 2
  28. class ApplicationQueueManager:
  29. def __init__(self, task_id: str,
  30. user_id: str,
  31. invoke_from: InvokeFrom,
  32. conversation_id: str,
  33. app_mode: str,
  34. message_id: str) -> None:
  35. if not user_id:
  36. raise ValueError("user is required")
  37. self._task_id = task_id
  38. self._user_id = user_id
  39. self._invoke_from = invoke_from
  40. self._conversation_id = str(conversation_id)
  41. self._app_mode = app_mode
  42. self._message_id = str(message_id)
  43. user_prefix = 'account' if self._invoke_from in [InvokeFrom.EXPLORE, InvokeFrom.DEBUGGER] else 'end-user'
  44. redis_client.setex(ApplicationQueueManager._generate_task_belong_cache_key(self._task_id), 1800, f"{user_prefix}-{self._user_id}")
  45. q = queue.Queue()
  46. self._q = q
  47. def listen(self) -> Generator:
  48. """
  49. Listen to queue
  50. :return:
  51. """
  52. # wait for 10 minutes to stop listen
  53. listen_timeout = 600
  54. start_time = time.time()
  55. last_ping_time = 0
  56. while True:
  57. try:
  58. message = self._q.get(timeout=1)
  59. if message is None:
  60. break
  61. yield message
  62. except queue.Empty:
  63. continue
  64. finally:
  65. elapsed_time = time.time() - start_time
  66. if elapsed_time >= listen_timeout or self._is_stopped():
  67. # publish two messages to make sure the client can receive the stop signal
  68. # and stop listening after the stop signal processed
  69. self.publish(
  70. QueueStopEvent(stopped_by=QueueStopEvent.StopBy.USER_MANUAL),
  71. PublishFrom.TASK_PIPELINE
  72. )
  73. self.stop_listen()
  74. if elapsed_time // 10 > last_ping_time:
  75. self.publish(QueuePingEvent(), PublishFrom.TASK_PIPELINE)
  76. last_ping_time = elapsed_time // 10
  77. def stop_listen(self) -> None:
  78. """
  79. Stop listen to queue
  80. :return:
  81. """
  82. self._q.put(None)
  83. def publish_chunk_message(self, chunk: LLMResultChunk, pub_from: PublishFrom) -> None:
  84. """
  85. Publish chunk message to channel
  86. :param chunk: chunk
  87. :param pub_from: publish from
  88. :return:
  89. """
  90. self.publish(QueueMessageEvent(
  91. chunk=chunk
  92. ), pub_from)
  93. def publish_agent_chunk_message(self, chunk: LLMResultChunk, pub_from: PublishFrom) -> None:
  94. """
  95. Publish agent chunk message to channel
  96. :param chunk: chunk
  97. :param pub_from: publish from
  98. :return:
  99. """
  100. self.publish(QueueAgentMessageEvent(
  101. chunk=chunk
  102. ), pub_from)
  103. def publish_message_replace(self, text: str, pub_from: PublishFrom) -> None:
  104. """
  105. Publish message replace
  106. :param text: text
  107. :param pub_from: publish from
  108. :return:
  109. """
  110. self.publish(QueueMessageReplaceEvent(
  111. text=text
  112. ), pub_from)
  113. def publish_retriever_resources(self, retriever_resources: list[dict], pub_from: PublishFrom) -> None:
  114. """
  115. Publish retriever resources
  116. :return:
  117. """
  118. self.publish(QueueRetrieverResourcesEvent(retriever_resources=retriever_resources), pub_from)
  119. def publish_annotation_reply(self, message_annotation_id: str, pub_from: PublishFrom) -> None:
  120. """
  121. Publish annotation reply
  122. :param message_annotation_id: message annotation id
  123. :param pub_from: publish from
  124. :return:
  125. """
  126. self.publish(AnnotationReplyEvent(message_annotation_id=message_annotation_id), pub_from)
  127. def publish_message_end(self, llm_result: LLMResult, pub_from: PublishFrom) -> None:
  128. """
  129. Publish message end
  130. :param llm_result: llm result
  131. :param pub_from: publish from
  132. :return:
  133. """
  134. self.publish(QueueMessageEndEvent(llm_result=llm_result), pub_from)
  135. self.stop_listen()
  136. def publish_agent_thought(self, message_agent_thought: MessageAgentThought, pub_from: PublishFrom) -> None:
  137. """
  138. Publish agent thought
  139. :param message_agent_thought: message agent thought
  140. :param pub_from: publish from
  141. :return:
  142. """
  143. self.publish(QueueAgentThoughtEvent(
  144. agent_thought_id=message_agent_thought.id
  145. ), pub_from)
  146. def publish_message_file(self, message_file: MessageFile, pub_from: PublishFrom) -> None:
  147. """
  148. Publish agent thought
  149. :param message_file: message file
  150. :param pub_from: publish from
  151. :return:
  152. """
  153. self.publish(QueueMessageFileEvent(
  154. message_file_id=message_file.id
  155. ), pub_from)
  156. def publish_error(self, e, pub_from: PublishFrom) -> None:
  157. """
  158. Publish error
  159. :param e: error
  160. :param pub_from: publish from
  161. :return:
  162. """
  163. self.publish(QueueErrorEvent(
  164. error=e
  165. ), pub_from)
  166. self.stop_listen()
  167. def publish(self, event: AppQueueEvent, pub_from: PublishFrom) -> None:
  168. """
  169. Publish event to queue
  170. :param event:
  171. :param pub_from:
  172. :return:
  173. """
  174. self._check_for_sqlalchemy_models(event.dict())
  175. message = QueueMessage(
  176. task_id=self._task_id,
  177. message_id=self._message_id,
  178. conversation_id=self._conversation_id,
  179. app_mode=self._app_mode,
  180. event=event
  181. )
  182. self._q.put(message)
  183. if isinstance(event, QueueStopEvent):
  184. self.stop_listen()
  185. if pub_from == PublishFrom.APPLICATION_MANAGER and self._is_stopped():
  186. raise ConversationTaskStoppedException()
  187. @classmethod
  188. def set_stop_flag(cls, task_id: str, invoke_from: InvokeFrom, user_id: str) -> None:
  189. """
  190. Set task stop flag
  191. :return:
  192. """
  193. result = redis_client.get(cls._generate_task_belong_cache_key(task_id))
  194. if result is None:
  195. return
  196. user_prefix = 'account' if invoke_from in [InvokeFrom.EXPLORE, InvokeFrom.DEBUGGER] else 'end-user'
  197. if result.decode('utf-8') != f"{user_prefix}-{user_id}":
  198. return
  199. stopped_cache_key = cls._generate_stopped_cache_key(task_id)
  200. redis_client.setex(stopped_cache_key, 600, 1)
  201. def _is_stopped(self) -> bool:
  202. """
  203. Check if task is stopped
  204. :return:
  205. """
  206. stopped_cache_key = ApplicationQueueManager._generate_stopped_cache_key(self._task_id)
  207. result = redis_client.get(stopped_cache_key)
  208. if result is not None:
  209. return True
  210. return False
  211. @classmethod
  212. def _generate_task_belong_cache_key(cls, task_id: str) -> str:
  213. """
  214. Generate task belong cache key
  215. :param task_id: task id
  216. :return:
  217. """
  218. return f"generate_task_belong:{task_id}"
  219. @classmethod
  220. def _generate_stopped_cache_key(cls, task_id: str) -> str:
  221. """
  222. Generate stopped cache key
  223. :param task_id: task id
  224. :return:
  225. """
  226. return f"generate_task_stopped:{task_id}"
  227. def _check_for_sqlalchemy_models(self, data: Any):
  228. # from entity to dict or list
  229. if isinstance(data, dict):
  230. for key, value in data.items():
  231. self._check_for_sqlalchemy_models(value)
  232. elif isinstance(data, list):
  233. for item in data:
  234. self._check_for_sqlalchemy_models(item)
  235. else:
  236. if isinstance(data, DeclarativeMeta) or hasattr(data, '_sa_instance_state'):
  237. raise TypeError("Critical Error: Passing SQLAlchemy Model instances "
  238. "that cause thread safety issues is not allowed.")
  239. class ConversationTaskStoppedException(Exception):
  240. pass