| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482 | from datetime import datetimefrom enum import Enumfrom typing import Any, Optionalfrom pydantic import BaseModel, field_validatorfrom core.model_runtime.entities.llm_entities import LLMResult, LLMResultChunkfrom core.workflow.entities.node_entities import NodeRunMetadataKeyfrom core.workflow.graph_engine.entities.graph_runtime_state import GraphRuntimeStatefrom core.workflow.nodes import NodeTypefrom core.workflow.nodes.base import BaseNodeDataclass QueueEvent(str, Enum):    """    QueueEvent enum    """    LLM_CHUNK = "llm_chunk"    TEXT_CHUNK = "text_chunk"    AGENT_MESSAGE = "agent_message"    MESSAGE_REPLACE = "message_replace"    MESSAGE_END = "message_end"    ADVANCED_CHAT_MESSAGE_END = "advanced_chat_message_end"    WORKFLOW_STARTED = "workflow_started"    WORKFLOW_SUCCEEDED = "workflow_succeeded"    WORKFLOW_FAILED = "workflow_failed"    ITERATION_START = "iteration_start"    ITERATION_NEXT = "iteration_next"    ITERATION_COMPLETED = "iteration_completed"    NODE_STARTED = "node_started"    NODE_SUCCEEDED = "node_succeeded"    NODE_FAILED = "node_failed"    RETRIEVER_RESOURCES = "retriever_resources"    ANNOTATION_REPLY = "annotation_reply"    AGENT_THOUGHT = "agent_thought"    MESSAGE_FILE = "message_file"    PARALLEL_BRANCH_RUN_STARTED = "parallel_branch_run_started"    PARALLEL_BRANCH_RUN_SUCCEEDED = "parallel_branch_run_succeeded"    PARALLEL_BRANCH_RUN_FAILED = "parallel_branch_run_failed"    ERROR = "error"    PING = "ping"    STOP = "stop"class AppQueueEvent(BaseModel):    """    QueueEvent abstract entity    """    event: QueueEventclass QueueLLMChunkEvent(AppQueueEvent):    """    QueueLLMChunkEvent entity    Only for basic mode apps    """    event: QueueEvent = QueueEvent.LLM_CHUNK    chunk: LLMResultChunkclass QueueIterationStartEvent(AppQueueEvent):    """    QueueIterationStartEvent entity    """    event: QueueEvent = QueueEvent.ITERATION_START    node_execution_id: str    node_id: str    node_type: NodeType    node_data: BaseNodeData    parallel_id: Optional[str] = None    """parallel id if node is in parallel"""    parallel_start_node_id: Optional[str] = None    """parallel start node id if node is in parallel"""    parent_parallel_id: Optional[str] = None    """parent parallel id if node is in parallel"""    parent_parallel_start_node_id: Optional[str] = None    """parent parallel start node id if node is in parallel"""    start_at: datetime    node_run_index: int    inputs: Optional[dict[str, Any]] = None    predecessor_node_id: Optional[str] = None    metadata: Optional[dict[str, Any]] = Noneclass QueueIterationNextEvent(AppQueueEvent):    """    QueueIterationNextEvent entity    """    event: QueueEvent = QueueEvent.ITERATION_NEXT    index: int    node_execution_id: str    node_id: str    node_type: NodeType    node_data: BaseNodeData    parallel_id: Optional[str] = None    """parallel id if node is in parallel"""    parallel_start_node_id: Optional[str] = None    """parallel start node id if node is in parallel"""    parent_parallel_id: Optional[str] = None    """parent parallel id if node is in parallel"""    parent_parallel_start_node_id: Optional[str] = None    """parent parallel start node id if node is in parallel"""    node_run_index: int    output: Optional[Any] = None  # output for the current iteration    @field_validator("output", mode="before")    @classmethod    def set_output(cls, v):        """        Set output        """        if v is None:            return None        if isinstance(v, int | float | str | bool | dict | list):            return v        raise ValueError("output must be a valid type")class QueueIterationCompletedEvent(AppQueueEvent):    """    QueueIterationCompletedEvent entity    """    event: QueueEvent = QueueEvent.ITERATION_COMPLETED    node_execution_id: str    node_id: str    node_type: NodeType    node_data: BaseNodeData    parallel_id: Optional[str] = None    """parallel id if node is in parallel"""    parallel_start_node_id: Optional[str] = None    """parallel start node id if node is in parallel"""    parent_parallel_id: Optional[str] = None    """parent parallel id if node is in parallel"""    parent_parallel_start_node_id: Optional[str] = None    """parent parallel start node id if node is in parallel"""    start_at: datetime    node_run_index: int    inputs: Optional[dict[str, Any]] = None    outputs: Optional[dict[str, Any]] = None    metadata: Optional[dict[str, Any]] = None    steps: int = 0    error: Optional[str] = Noneclass QueueTextChunkEvent(AppQueueEvent):    """    QueueTextChunkEvent entity    """    event: QueueEvent = QueueEvent.TEXT_CHUNK    text: str    from_variable_selector: Optional[list[str]] = None    """from variable selector"""    in_iteration_id: Optional[str] = None    """iteration id if node is in iteration"""class QueueAgentMessageEvent(AppQueueEvent):    """    QueueMessageEvent entity    """    event: QueueEvent = QueueEvent.AGENT_MESSAGE    chunk: LLMResultChunkclass QueueMessageReplaceEvent(AppQueueEvent):    """    QueueMessageReplaceEvent entity    """    event: QueueEvent = QueueEvent.MESSAGE_REPLACE    text: strclass QueueRetrieverResourcesEvent(AppQueueEvent):    """    QueueRetrieverResourcesEvent entity    """    event: QueueEvent = QueueEvent.RETRIEVER_RESOURCES    retriever_resources: list[dict]    in_iteration_id: Optional[str] = None    """iteration id if node is in iteration"""class QueueAnnotationReplyEvent(AppQueueEvent):    """    QueueAnnotationReplyEvent entity    """    event: QueueEvent = QueueEvent.ANNOTATION_REPLY    message_annotation_id: strclass QueueMessageEndEvent(AppQueueEvent):    """    QueueMessageEndEvent entity    """    event: QueueEvent = QueueEvent.MESSAGE_END    llm_result: Optional[LLMResult] = Noneclass QueueAdvancedChatMessageEndEvent(AppQueueEvent):    """    QueueAdvancedChatMessageEndEvent entity    """    event: QueueEvent = QueueEvent.ADVANCED_CHAT_MESSAGE_ENDclass QueueWorkflowStartedEvent(AppQueueEvent):    """    QueueWorkflowStartedEvent entity    """    event: QueueEvent = QueueEvent.WORKFLOW_STARTED    graph_runtime_state: GraphRuntimeStateclass QueueWorkflowSucceededEvent(AppQueueEvent):    """    QueueWorkflowSucceededEvent entity    """    event: QueueEvent = QueueEvent.WORKFLOW_SUCCEEDED    outputs: Optional[dict[str, Any]] = Noneclass QueueWorkflowFailedEvent(AppQueueEvent):    """    QueueWorkflowFailedEvent entity    """    event: QueueEvent = QueueEvent.WORKFLOW_FAILED    error: strclass QueueNodeStartedEvent(AppQueueEvent):    """    QueueNodeStartedEvent entity    """    event: QueueEvent = QueueEvent.NODE_STARTED    node_execution_id: str    node_id: str    node_type: NodeType    node_data: BaseNodeData    node_run_index: int = 1    predecessor_node_id: Optional[str] = None    parallel_id: Optional[str] = None    """parallel id if node is in parallel"""    parallel_start_node_id: Optional[str] = None    """parallel start node id if node is in parallel"""    parent_parallel_id: Optional[str] = None    """parent parallel id if node is in parallel"""    parent_parallel_start_node_id: Optional[str] = None    """parent parallel start node id if node is in parallel"""    in_iteration_id: Optional[str] = None    """iteration id if node is in iteration"""    start_at: datetimeclass QueueNodeSucceededEvent(AppQueueEvent):    """    QueueNodeSucceededEvent entity    """    event: QueueEvent = QueueEvent.NODE_SUCCEEDED    node_execution_id: str    node_id: str    node_type: NodeType    node_data: BaseNodeData    parallel_id: Optional[str] = None    """parallel id if node is in parallel"""    parallel_start_node_id: Optional[str] = None    """parallel start node id if node is in parallel"""    parent_parallel_id: Optional[str] = None    """parent parallel id if node is in parallel"""    parent_parallel_start_node_id: Optional[str] = None    """parent parallel start node id if node is in parallel"""    in_iteration_id: Optional[str] = None    """iteration id if node is in iteration"""    start_at: datetime    inputs: Optional[dict[str, Any]] = None    process_data: Optional[dict[str, Any]] = None    outputs: Optional[dict[str, Any]] = None    execution_metadata: Optional[dict[NodeRunMetadataKey, Any]] = None    error: Optional[str] = Noneclass QueueNodeFailedEvent(AppQueueEvent):    """    QueueNodeFailedEvent entity    """    event: QueueEvent = QueueEvent.NODE_FAILED    node_execution_id: str    node_id: str    node_type: NodeType    node_data: BaseNodeData    parallel_id: Optional[str] = None    """parallel id if node is in parallel"""    parallel_start_node_id: Optional[str] = None    """parallel start node id if node is in parallel"""    parent_parallel_id: Optional[str] = None    """parent parallel id if node is in parallel"""    parent_parallel_start_node_id: Optional[str] = None    """parent parallel start node id if node is in parallel"""    in_iteration_id: Optional[str] = None    """iteration id if node is in iteration"""    start_at: datetime    inputs: Optional[dict[str, Any]] = None    process_data: Optional[dict[str, Any]] = None    outputs: Optional[dict[str, Any]] = None    error: strclass QueueAgentThoughtEvent(AppQueueEvent):    """    QueueAgentThoughtEvent entity    """    event: QueueEvent = QueueEvent.AGENT_THOUGHT    agent_thought_id: strclass QueueMessageFileEvent(AppQueueEvent):    """    QueueAgentThoughtEvent entity    """    event: QueueEvent = QueueEvent.MESSAGE_FILE    message_file_id: strclass QueueErrorEvent(AppQueueEvent):    """    QueueErrorEvent entity    """    event: QueueEvent = QueueEvent.ERROR    error: Any = Noneclass QueuePingEvent(AppQueueEvent):    """    QueuePingEvent entity    """    event: QueueEvent = QueueEvent.PINGclass QueueStopEvent(AppQueueEvent):    """    QueueStopEvent entity    """    class StopBy(Enum):        """        Stop by enum        """        USER_MANUAL = "user-manual"        ANNOTATION_REPLY = "annotation-reply"        OUTPUT_MODERATION = "output-moderation"        INPUT_MODERATION = "input-moderation"    event: QueueEvent = QueueEvent.STOP    stopped_by: StopBy    def get_stop_reason(self) -> str:        """        To stop reason        """        reason_mapping = {            QueueStopEvent.StopBy.USER_MANUAL: "Stopped by user.",            QueueStopEvent.StopBy.ANNOTATION_REPLY: "Stopped by annotation reply.",            QueueStopEvent.StopBy.OUTPUT_MODERATION: "Stopped by output moderation.",            QueueStopEvent.StopBy.INPUT_MODERATION: "Stopped by input moderation.",        }        return reason_mapping.get(self.stopped_by, "Stopped by unknown reason.")class QueueMessage(BaseModel):    """    QueueMessage abstract entity    """    task_id: str    app_mode: str    event: AppQueueEventclass MessageQueueMessage(QueueMessage):    """    MessageQueueMessage entity    """    message_id: str    conversation_id: strclass WorkflowQueueMessage(QueueMessage):    """    WorkflowQueueMessage entity    """    passclass QueueParallelBranchRunStartedEvent(AppQueueEvent):    """    QueueParallelBranchRunStartedEvent entity    """    event: QueueEvent = QueueEvent.PARALLEL_BRANCH_RUN_STARTED    parallel_id: str    parallel_start_node_id: str    parent_parallel_id: Optional[str] = None    """parent parallel id if node is in parallel"""    parent_parallel_start_node_id: Optional[str] = None    """parent parallel start node id if node is in parallel"""    in_iteration_id: Optional[str] = None    """iteration id if node is in iteration"""class QueueParallelBranchRunSucceededEvent(AppQueueEvent):    """    QueueParallelBranchRunSucceededEvent entity    """    event: QueueEvent = QueueEvent.PARALLEL_BRANCH_RUN_SUCCEEDED    parallel_id: str    parallel_start_node_id: str    parent_parallel_id: Optional[str] = None    """parent parallel id if node is in parallel"""    parent_parallel_start_node_id: Optional[str] = None    """parent parallel start node id if node is in parallel"""    in_iteration_id: Optional[str] = None    """iteration id if node is in iteration"""class QueueParallelBranchRunFailedEvent(AppQueueEvent):    """    QueueParallelBranchRunFailedEvent entity    """    event: QueueEvent = QueueEvent.PARALLEL_BRANCH_RUN_FAILED    parallel_id: str    parallel_start_node_id: str    parent_parallel_id: Optional[str] = None    """parent parallel id if node is in parallel"""    parent_parallel_start_node_id: Optional[str] = None    """parent parallel start node id if node is in parallel"""    in_iteration_id: Optional[str] = None    """iteration id if node is in iteration"""    error: str
 |