@@ -4,6 +4,8 @@ import time
 from collections.abc import Generator
 from typing import Any, Optional, Union, cast
 
+from constants.tts_auto_play_timeout import TTS_AUTO_PLAY_TIMEOUT, TTS_AUTO_PLAY_YIELD_CPU_TIME
+from core.app.apps.advanced_chat.app_generator_tts_publisher import AppGeneratorTTSPublisher, AudioTrunk
 from core.app.apps.base_app_queue_manager import AppQueueManager, PublishFrom
 from core.app.entities.app_invoke_entities import (
     AdvancedChatAppGenerateEntity,
@@ -33,6 +35,8 @@ from core.app.entities.task_entities import (
     ChatbotAppStreamResponse,
     ChatflowStreamGenerateRoute,
     ErrorStreamResponse,
+    MessageAudioEndStreamResponse,
+    MessageAudioStreamResponse,
     MessageEndStreamResponse,
     StreamResponse,
 )
@@ -71,13 +75,13 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc
     _iteration_nested_relations: dict[str, list[str]]
 
     def __init__(
-        self, application_generate_entity: AdvancedChatAppGenerateEntity,
-        workflow: Workflow,
-        queue_manager: AppQueueManager,
-        conversation: Conversation,
-        message: Message,
-        user: Union[Account, EndUser],
-        stream: bool
+            self, application_generate_entity: AdvancedChatAppGenerateEntity,
+            workflow: Workflow,
+            queue_manager: AppQueueManager,
+            conversation: Conversation,
+            message: Message,
+            user: Union[Account, EndUser],
+            stream: bool
     ) -> None:
         """
         Initialize AdvancedChatAppGenerateTaskPipeline.
@@ -129,7 +133,7 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc
             self._application_generate_entity.query
         )
 
-        generator = self._process_stream_response(
+        generator = self._wrapper_process_stream_response(
             trace_manager=self._application_generate_entity.trace_manager
         )
         if self._stream:
@@ -138,7 +142,7 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc
         return self._to_blocking_response(generator)
 
     def _to_blocking_response(self, generator: Generator[StreamResponse, None, None]) \
-        -> ChatbotAppBlockingResponse:
+            -> ChatbotAppBlockingResponse:
         """
         Process blocking response.
         :return:
@@ -169,7 +173,7 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc
         raise Exception('Queue listening stopped unexpectedly.')
 
     def _to_stream_response(self, generator: Generator[StreamResponse, None, None]) \
-        -> Generator[ChatbotAppStreamResponse, None, None]:
+            -> Generator[ChatbotAppStreamResponse, None, None]:
         """
         To stream response.
         :return:
@@ -182,14 +186,72 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc
                 stream_response=stream_response
             )
 
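+    # Poll the TTS publisher once; wrap any ready audio chunk in a stream response, else return None.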
+    def _listenAudioMsg(self, publisher, task_id: str):
+        if not publisher:
+            return None
+        audio_msg: AudioTrunk = publisher.checkAndGetAudio()
+        if audio_msg and audio_msg.status != "finish":
+            return MessageAudioStreamResponse(audio=audio_msg.audio, task_id=task_id)
+        return None
+
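+    # Wrap _process_stream_response, interleaving synthesized audio with the text stream when TTS auto-play is enabled.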
+    def _wrapper_process_stream_response(self, trace_manager: Optional[TraceQueueManager] = None) -> \
+            Generator[StreamResponse, None, None]:
+
+        publisher = None
+        task_id = self._application_generate_entity.task_id
+        tenant_id = self._application_generate_entity.app_config.tenant_id
+        features_dict = self._workflow.features_dict
+
+        if features_dict.get('text_to_speech') and features_dict['text_to_speech'].get('enabled') and features_dict[
+                'text_to_speech'].get('autoPlay') == 'enabled':
+            publisher = AppGeneratorTTSPublisher(tenant_id, features_dict['text_to_speech'].get('voice'))
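+        # forward each text response, first draining any audio chunks that are already synthesized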
+        for response in self._process_stream_response(publisher=publisher, trace_manager=trace_manager):
+            while True:
+                audio_response = self._listenAudioMsg(publisher, task_id=task_id)
+                if audio_response:
+                    yield audio_response
+                else:
+                    break
+            yield response
+
+        start_listener_time = time.time()
+        # stop waiting once no new audio arrives within TTS_AUTO_PLAY_TIMEOUT seconds
+        while (time.time() - start_listener_time) < TTS_AUTO_PLAY_TIMEOUT:
+            try:
+                if not publisher:
+                    break
+                audio_trunk = publisher.checkAndGetAudio()
+                if audio_trunk is None:
+                    # release the CPU
+                    # sleep 20 ms (40 ms => 1280-byte audio file, 20 ms => 640-byte audio file)
+                    time.sleep(TTS_AUTO_PLAY_YIELD_CPU_TIME)
+                    continue
+                if audio_trunk.status == "finish":
+                    break
+                else:
+                    start_listener_time = time.time()
+                    yield MessageAudioStreamResponse(audio=audio_trunk.audio, task_id=task_id)
+            except Exception as e:
+                logger.error(e)
+                break
+        yield MessageAudioEndStreamResponse(audio='', task_id=task_id)
+
     def _process_stream_response(
-        self, trace_manager: Optional[TraceQueueManager] = None
+            self,
+            publisher: Optional[AppGeneratorTTSPublisher],
+            trace_manager: Optional[TraceQueueManager] = None
     ) -> Generator[StreamResponse, None, None]:
         """
         Process stream response.
         :return:
         """
         for message in self._queue_manager.listen():
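+            # hand each queue message to the TTS publisher so it can synthesize audio from the streamed text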
+            if publisher:
+                publisher.publish(message=message)
             event = message.event
 
             if isinstance(event, QueueErrorEvent):
@@ -301,7 +363,7 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc
                     continue
 
                 if not self._is_stream_out_support(
-                    event=event
+                        event=event
                 ):
                     continue
 
@@ -318,7 +380,9 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc
                 yield self._ping_stream_response()
             else:
                 continue
-
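+        # a None message signals the TTS publisher that the text stream has ended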
+        if publisher:
+            publisher.publish(None)
         if self._conversation_name_generate_thread:
             self._conversation_name_generate_thread.join()
 
@@ -402,7 +466,7 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc
         return stream_generate_routes
 
     def _get_answer_start_at_node_ids(self, graph: dict, target_node_id: str) \
-        -> list[str]:
+            -> list[str]:
         """
         Get answer start at node id.
         :param graph: graph
@@ -457,7 +521,7 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc
                 start_node_id = target_node_id
                 start_node_ids.append(start_node_id)
             elif node_type == NodeType.START.value or \
-                node_iteration_id is not None and iteration_start_node_id == source_node.get('id'):
+                    node_iteration_id is not None and iteration_start_node_id == source_node.get('id'):
                 start_node_id = source_node_id
                 start_node_ids.append(start_node_id)
             else:
@@ -515,7 +579,7 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc
 
         # all route chunks are generated
         if self._task_state.current_stream_generate_state.current_route_position == len(
-            self._task_state.current_stream_generate_state.generate_route
+                self._task_state.current_stream_generate_state.generate_route
         ):
             self._task_state.current_stream_generate_state = None
 
@@ -525,7 +589,7 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc
         :return:
         """
         if not self._task_state.current_stream_generate_state:
-            return None
+            return
 
         route_chunks = self._task_state.current_stream_generate_state.generate_route[
             self._task_state.current_stream_generate_state.current_route_position:]
@@ -573,7 +637,7 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc
             # get route chunk node execution info
             route_chunk_node_execution_info = self._task_state.ran_node_execution_infos[route_chunk_node_id]
             if (route_chunk_node_execution_info.node_type == NodeType.LLM
-                and latest_node_execution_info.node_type == NodeType.LLM):
+                    and latest_node_execution_info.node_type == NodeType.LLM):
                 # only LLM support chunk stream output
                 self._task_state.current_stream_generate_state.current_route_position += 1
                 continue
@@ -643,7 +707,7 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc
 
         # all route chunks are generated
         if self._task_state.current_stream_generate_state.current_route_position == len(
-            self._task_state.current_stream_generate_state.generate_route
+                self._task_state.current_stream_generate_state.generate_route
         ):
             self._task_state.current_stream_generate_state = None