Ver Fonte

feat: service api add llm usage (#2051)

takatost há 1 ano atrás
pai
commit
1a6ad05a23

+ 2 - 23
api/controllers/console/app/completion.py

@@ -163,29 +163,8 @@ def compact_response(response: Union[dict, Generator]) -> Response:
         return Response(response=json.dumps(response), status=200, mimetype='application/json')
     else:
         def generate() -> Generator:
-            try:
-                for chunk in response:
-                    yield chunk
-            except services.errors.conversation.ConversationNotExistsError:
-                yield "data: " + json.dumps(api.handle_error(NotFound("Conversation Not Exists.")).get_json()) + "\n\n"
-            except services.errors.conversation.ConversationCompletedError:
-                yield "data: " + json.dumps(api.handle_error(ConversationCompletedError()).get_json()) + "\n\n"
-            except services.errors.app_model_config.AppModelConfigBrokenError:
-                logging.exception("App model config broken.")
-                yield "data: " + json.dumps(api.handle_error(AppUnavailableError()).get_json()) + "\n\n"
-            except ProviderTokenNotInitError as ex:
-                yield "data: " + json.dumps(api.handle_error(ProviderNotInitializeError(ex.description)).get_json()) + "\n\n"
-            except QuotaExceededError:
-                yield "data: " + json.dumps(api.handle_error(ProviderQuotaExceededError()).get_json()) + "\n\n"
-            except ModelCurrentlyNotSupportError:
-                yield "data: " + json.dumps(api.handle_error(ProviderModelCurrentlyNotSupportError()).get_json()) + "\n\n"
-            except InvokeError as e:
-                yield "data: " + json.dumps(api.handle_error(CompletionRequestError(e.description)).get_json()) + "\n\n"
-            except ValueError as e:
-                yield "data: " + json.dumps(api.handle_error(e).get_json()) + "\n\n"
-            except Exception:
-                logging.exception("internal server error.")
-                yield "data: " + json.dumps(api.handle_error(InternalServerError()).get_json()) + "\n\n"
+            for chunk in response:
+                yield chunk
 
         return Response(stream_with_context(generate()), status=200,
                         mimetype='text/event-stream')

+ 2 - 21
api/controllers/console/app/message.py

@@ -241,27 +241,8 @@ def compact_response(response: Union[dict, Generator]) -> Response:
         return Response(response=json.dumps(response), status=200, mimetype='application/json')
     else:
         def generate() -> Generator:
-            try:
-                for chunk in response:
-                    yield chunk
-            except MessageNotExistsError:
-                yield "data: " + json.dumps(api.handle_error(NotFound("Message Not Exists.")).get_json()) + "\n\n"
-            except MoreLikeThisDisabledError:
-                yield "data: " + json.dumps(api.handle_error(AppMoreLikeThisDisabledError()).get_json()) + "\n\n"
-            except ProviderTokenNotInitError as ex:
-                yield "data: " + json.dumps(api.handle_error(ProviderNotInitializeError(ex.description)).get_json()) + "\n\n"
-            except QuotaExceededError:
-                yield "data: " + json.dumps(api.handle_error(ProviderQuotaExceededError()).get_json()) + "\n\n"
-            except ModelCurrentlyNotSupportError:
-                yield "data: " + json.dumps(
-                    api.handle_error(ProviderModelCurrentlyNotSupportError()).get_json()) + "\n\n"
-            except InvokeError as e:
-                yield "data: " + json.dumps(api.handle_error(CompletionRequestError(e.description)).get_json()) + "\n\n"
-            except ValueError as e:
-                yield "data: " + json.dumps(api.handle_error(e).get_json()) + "\n\n"
-            except Exception:
-                logging.exception("internal server error.")
-                yield "data: " + json.dumps(api.handle_error(InternalServerError()).get_json()) + "\n\n"
+            for chunk in response:
+                yield chunk
 
         return Response(stream_with_context(generate()), status=200,
                         mimetype='text/event-stream')

+ 2 - 23
api/controllers/console/explore/completion.py

@@ -158,29 +158,8 @@ def compact_response(response: Union[dict, Generator]) -> Response:
         return Response(response=json.dumps(response), status=200, mimetype='application/json')
     else:
         def generate() -> Generator:
-            try:
-                for chunk in response:
-                    yield chunk
-            except services.errors.conversation.ConversationNotExistsError:
-                yield "data: " + json.dumps(api.handle_error(NotFound("Conversation Not Exists.")).get_json()) + "\n\n"
-            except services.errors.conversation.ConversationCompletedError:
-                yield "data: " + json.dumps(api.handle_error(ConversationCompletedError()).get_json()) + "\n\n"
-            except services.errors.app_model_config.AppModelConfigBrokenError:
-                logging.exception("App model config broken.")
-                yield "data: " + json.dumps(api.handle_error(AppUnavailableError()).get_json()) + "\n\n"
-            except ProviderTokenNotInitError as ex:
-                yield "data: " + json.dumps(api.handle_error(ProviderNotInitializeError(ex.description)).get_json()) + "\n\n"
-            except QuotaExceededError:
-                yield "data: " + json.dumps(api.handle_error(ProviderQuotaExceededError()).get_json()) + "\n\n"
-            except ModelCurrentlyNotSupportError:
-                yield "data: " + json.dumps(api.handle_error(ProviderModelCurrentlyNotSupportError()).get_json()) + "\n\n"
-            except InvokeError as e:
-                yield "data: " + json.dumps(api.handle_error(CompletionRequestError(e.description)).get_json()) + "\n\n"
-            except ValueError as e:
-                yield "data: " + json.dumps(api.handle_error(e).get_json()) + "\n\n"
-            except Exception:
-                logging.exception("internal server error.")
-                yield "data: " + json.dumps(api.handle_error(InternalServerError()).get_json()) + "\n\n"
+            for chunk in response:
+                yield chunk
 
         return Response(stream_with_context(generate()), status=200,
                         mimetype='text/event-stream')

+ 2 - 20
api/controllers/console/explore/message.py

@@ -117,26 +117,8 @@ def compact_response(response: Union[dict, Generator]) -> Response:
         return Response(response=json.dumps(response), status=200, mimetype='application/json')
     else:
         def generate() -> Generator:
-            try:
-                for chunk in response:
-                    yield chunk
-            except MessageNotExistsError:
-                yield "data: " + json.dumps(api.handle_error(NotFound("Message Not Exists.")).get_json()) + "\n\n"
-            except MoreLikeThisDisabledError:
-                yield "data: " + json.dumps(api.handle_error(AppMoreLikeThisDisabledError()).get_json()) + "\n\n"
-            except ProviderTokenNotInitError as ex:
-                yield "data: " + json.dumps(api.handle_error(ProviderNotInitializeError(ex.description)).get_json()) + "\n\n"
-            except QuotaExceededError:
-                yield "data: " + json.dumps(api.handle_error(ProviderQuotaExceededError()).get_json()) + "\n\n"
-            except ModelCurrentlyNotSupportError:
-                yield "data: " + json.dumps(api.handle_error(ProviderModelCurrentlyNotSupportError()).get_json()) + "\n\n"
-            except InvokeError as e:
-                yield "data: " + json.dumps(api.handle_error(CompletionRequestError(e.description)).get_json()) + "\n\n"
-            except ValueError as e:
-                yield "data: " + json.dumps(api.handle_error(e).get_json()) + "\n\n"
-            except Exception:
-                logging.exception("internal server error.")
-                yield "data: " + json.dumps(api.handle_error(InternalServerError()).get_json()) + "\n\n"
+            for chunk in response:
+                yield chunk
 
         return Response(stream_with_context(generate()), status=200,
                         mimetype='text/event-stream')

+ 2 - 23
api/controllers/console/universal_chat/chat.py

@@ -109,29 +109,8 @@ def compact_response(response: Union[dict, Generator]) -> Response:
         return Response(response=json.dumps(response), status=200, mimetype='application/json')
     else:
         def generate() -> Generator:
-            try:
-                for chunk in response:
-                    yield chunk
-            except services.errors.conversation.ConversationNotExistsError:
-                yield "data: " + json.dumps(api.handle_error(NotFound("Conversation Not Exists.")).get_json()) + "\n\n"
-            except services.errors.conversation.ConversationCompletedError:
-                yield "data: " + json.dumps(api.handle_error(ConversationCompletedError()).get_json()) + "\n\n"
-            except services.errors.app_model_config.AppModelConfigBrokenError:
-                logging.exception("App model config broken.")
-                yield "data: " + json.dumps(api.handle_error(AppUnavailableError()).get_json()) + "\n\n"
-            except ProviderTokenNotInitError:
-                yield "data: " + json.dumps(api.handle_error(ProviderNotInitializeError()).get_json()) + "\n\n"
-            except QuotaExceededError:
-                yield "data: " + json.dumps(api.handle_error(ProviderQuotaExceededError()).get_json()) + "\n\n"
-            except ModelCurrentlyNotSupportError:
-                yield "data: " + json.dumps(api.handle_error(ProviderModelCurrentlyNotSupportError()).get_json()) + "\n\n"
-            except InvokeError as e:
-                yield "data: " + json.dumps(api.handle_error(CompletionRequestError(e.description)).get_json()) + "\n\n"
-            except ValueError as e:
-                yield "data: " + json.dumps(api.handle_error(e).get_json()) + "\n\n"
-            except Exception:
-                logging.exception("internal server error.")
-                yield "data: " + json.dumps(api.handle_error(InternalServerError()).get_json()) + "\n\n"
+            for chunk in response:
+                yield chunk
 
         return Response(stream_with_context(generate()), status=200,
                         mimetype='text/event-stream')

+ 1 - 0
api/controllers/service_api/__init__.py

@@ -6,5 +6,6 @@ bp = Blueprint('service_api', __name__, url_prefix='/v1')
 api = ExternalApi(bp)
 
 
+from . import index
 from .app import app, audio, completion, conversation, file, message
 from .dataset import dataset, document, segment

+ 8 - 24
api/controllers/service_api/app/completion.py

@@ -79,7 +79,12 @@ class CompletionStopApi(AppApiResource):
         if app_model.mode != 'completion':
             raise AppUnavailableError()
 
-        end_user_id = request.get_json().get('user')
+        parser = reqparse.RequestParser()
+        parser.add_argument('user', required=True, nullable=False, type=str, location='json')
+
+        args = parser.parse_args()
+
+        end_user_id = args.get('user')
 
         ApplicationQueueManager.set_stop_flag(task_id, InvokeFrom.SERVICE_API, end_user_id)
 
@@ -157,29 +162,8 @@ def compact_response(response: Union[dict, Generator]) -> Response:
         return Response(response=json.dumps(response), status=200, mimetype='application/json')
     else:
         def generate() -> Generator:
-            try:
-                for chunk in response:
-                    yield chunk
-            except services.errors.conversation.ConversationNotExistsError:
-                yield "data: " + json.dumps(api.handle_error(NotFound("Conversation Not Exists.")).get_json()) + "\n\n"
-            except services.errors.conversation.ConversationCompletedError:
-                yield "data: " + json.dumps(api.handle_error(ConversationCompletedError()).get_json()) + "\n\n"
-            except services.errors.app_model_config.AppModelConfigBrokenError:
-                logging.exception("App model config broken.")
-                yield "data: " + json.dumps(api.handle_error(AppUnavailableError()).get_json()) + "\n\n"
-            except ProviderTokenNotInitError as ex:
-                yield "data: " + json.dumps(api.handle_error(ProviderNotInitializeError(ex.description)).get_json()) + "\n\n"
-            except QuotaExceededError:
-                yield "data: " + json.dumps(api.handle_error(ProviderQuotaExceededError()).get_json()) + "\n\n"
-            except ModelCurrentlyNotSupportError:
-                yield "data: " + json.dumps(api.handle_error(ProviderModelCurrentlyNotSupportError()).get_json()) + "\n\n"
-            except InvokeError as e:
-                yield "data: " + json.dumps(api.handle_error(CompletionRequestError(e.description)).get_json()) + "\n\n"
-            except ValueError as e:
-                yield "data: " + json.dumps(api.handle_error(e).get_json()) + "\n\n"
-            except Exception:
-                logging.exception("internal server error.")
-                yield "data: " + json.dumps(api.handle_error(InternalServerError()).get_json()) + "\n\n"
+            for chunk in response:
+                yield chunk
 
         return Response(stream_with_context(generate()), status=200,
                         mimetype='text/event-stream')

+ 0 - 1
api/controllers/service_api/app/conversation.py

@@ -86,5 +86,4 @@ class ConversationRenameApi(AppApiResource):
 
 api.add_resource(ConversationRenameApi, '/conversations/<uuid:c_id>/name', endpoint='conversation_name')
 api.add_resource(ConversationApi, '/conversations')
-api.add_resource(ConversationApi, '/conversations/<uuid:c_id>', endpoint='conversation')
 api.add_resource(ConversationDetailApi, '/conversations/<uuid:c_id>', endpoint='conversation_detail')

+ 16 - 0
api/controllers/service_api/index.py

@@ -0,0 +1,16 @@
+from flask import current_app
+from flask_restful import Resource
+
+from controllers.service_api import api
+
+
+class IndexApi(Resource):
+    def get(self):
+        return {
+            "welcome": "Dify OpenAPI",
+            "api_version": "v1",
+            "server_version": current_app.config['CURRENT_VERSION']
+        }
+
+
+api.add_resource(IndexApi, '/')

+ 2 - 23
api/controllers/web/completion.py

@@ -146,29 +146,8 @@ def compact_response(response: Union[dict, Generator]) -> Response:
         return Response(response=json.dumps(response), status=200, mimetype='application/json')
     else:
         def generate() -> Generator:
-            try:
-                for chunk in response:
-                    yield chunk
-            except services.errors.conversation.ConversationNotExistsError:
-                yield "data: " + json.dumps(api.handle_error(NotFound("Conversation Not Exists.")).get_json()) + "\n\n"
-            except services.errors.conversation.ConversationCompletedError:
-                yield "data: " + json.dumps(api.handle_error(ConversationCompletedError()).get_json()) + "\n\n"
-            except services.errors.app_model_config.AppModelConfigBrokenError:
-                logging.exception("App model config broken.")
-                yield "data: " + json.dumps(api.handle_error(AppUnavailableError()).get_json()) + "\n\n"
-            except ProviderTokenNotInitError as ex:
-                yield "data: " + json.dumps(api.handle_error(ProviderNotInitializeError(ex.description)).get_json()) + "\n\n"
-            except QuotaExceededError:
-                yield "data: " + json.dumps(api.handle_error(ProviderQuotaExceededError()).get_json()) + "\n\n"
-            except ModelCurrentlyNotSupportError:
-                yield "data: " + json.dumps(api.handle_error(ProviderModelCurrentlyNotSupportError()).get_json()) + "\n\n"
-            except InvokeError as e:
-                yield "data: " + json.dumps(api.handle_error(CompletionRequestError(e.description)).get_json()) + "\n\n"
-            except ValueError as e:
-                yield "data: " + json.dumps(api.handle_error(e).get_json()) + "\n\n"
-            except Exception:
-                logging.exception("internal server error.")
-                yield "data: " + json.dumps(api.handle_error(InternalServerError()).get_json()) + "\n\n"
+            for chunk in response:
+                yield chunk
 
         return Response(stream_with_context(generate()), status=200,
                         mimetype='text/event-stream')

+ 2 - 20
api/controllers/web/message.py

@@ -151,26 +151,8 @@ def compact_response(response: Union[dict, Generator]) -> Response:
         return Response(response=json.dumps(response), status=200, mimetype='application/json')
     else:
         def generate() -> Generator:
-            try:
-                for chunk in response:
-                    yield chunk
-            except MessageNotExistsError:
-                yield "data: " + json.dumps(api.handle_error(NotFound("Message Not Exists.")).get_json()) + "\n\n"
-            except MoreLikeThisDisabledError:
-                yield "data: " + json.dumps(api.handle_error(AppMoreLikeThisDisabledError()).get_json()) + "\n\n"
-            except ProviderTokenNotInitError as ex:
-                yield "data: " + json.dumps(api.handle_error(ProviderNotInitializeError(ex.description)).get_json()) + "\n\n"
-            except QuotaExceededError:
-                yield "data: " + json.dumps(api.handle_error(ProviderQuotaExceededError()).get_json()) + "\n\n"
-            except ModelCurrentlyNotSupportError:
-                yield "data: " + json.dumps(api.handle_error(ProviderModelCurrentlyNotSupportError()).get_json()) + "\n\n"
-            except InvokeError as e:
-                yield "data: " + json.dumps(api.handle_error(CompletionRequestError(e.description)).get_json()) + "\n\n"
-            except ValueError as e:
-                yield "data: " + json.dumps(api.handle_error(e).get_json()) + "\n\n"
-            except Exception:
-                logging.exception("internal server error.")
-                yield "data: " + json.dumps(api.handle_error(InternalServerError()).get_json()) + "\n\n"
+            for chunk in response:
+                yield chunk
 
         return Response(stream_with_context(generate()), status=200,
                         mimetype='text/event-stream')

+ 95 - 5
api/core/app_runner/generate_task_pipeline.py

@@ -5,16 +5,18 @@ from typing import Generator, Optional, Union, cast
 
 from core.app_runner.moderation_handler import ModerationRule, OutputModerationHandler
 from core.application_queue_manager import ApplicationQueueManager, PublishFrom
-from core.entities.application_entities import ApplicationGenerateEntity
+from core.entities.application_entities import ApplicationGenerateEntity, InvokeFrom
 from core.entities.queue_entities import (AnnotationReplyEvent, QueueAgentThoughtEvent, QueueErrorEvent,
                                           QueueMessageEndEvent, QueueMessageEvent, QueueMessageReplaceEvent,
                                           QueuePingEvent, QueueRetrieverResourcesEvent, QueueStopEvent)
+from core.errors.error import ProviderTokenNotInitError, QuotaExceededError, ModelCurrentlyNotSupportError
 from core.model_runtime.entities.llm_entities import LLMResult, LLMResultChunk, LLMResultChunkDelta, LLMUsage
 from core.model_runtime.entities.message_entities import (AssistantPromptMessage, ImagePromptMessageContent,
                                                           PromptMessage, PromptMessageContentType, PromptMessageRole,
                                                           TextPromptMessageContent)
 from core.model_runtime.errors.invoke import InvokeAuthorizationError, InvokeError
 from core.model_runtime.model_providers.__base.large_language_model import LargeLanguageModel
+from core.model_runtime.utils.encoders import jsonable_encoder
 from core.prompt.prompt_template import PromptTemplateParser
 from events.message_event import message_was_created
 from extensions.ext_database import db
@@ -135,6 +137,8 @@ class GenerateTaskPipeline:
                         completion_tokens
                     )
 
+                self._task_state.metadata['usage'] = jsonable_encoder(self._task_state.llm_result.usage)
+
                 # response moderation
                 if self._output_moderation_handler:
                     self._output_moderation_handler.stop_thread()
@@ -145,12 +149,13 @@ class GenerateTaskPipeline:
                     )
 
                 # Save message
-                self._save_message(event.llm_result)
+                self._save_message(self._task_state.llm_result)
 
                 response = {
                     'event': 'message',
                     'task_id': self._application_generate_entity.task_id,
                     'id': self._message.id,
+                    'message_id': self._message.id,
                     'mode': self._conversation.mode,
                     'answer': event.llm_result.message.content,
                     'metadata': {},
@@ -161,7 +166,7 @@ class GenerateTaskPipeline:
                     response['conversation_id'] = self._conversation.id
 
                 if self._task_state.metadata:
-                    response['metadata'] = self._task_state.metadata
+                    response['metadata'] = self._get_response_metadata()
 
                 return response
             else:
@@ -176,7 +181,9 @@ class GenerateTaskPipeline:
             event = message.event
 
             if isinstance(event, QueueErrorEvent):
-                raise self._handle_error(event)
+                data = self._error_to_stream_response_data(self._handle_error(event))
+                yield self._yield_response(data)
+                break
             elif isinstance(event, (QueueStopEvent, QueueMessageEndEvent)):
                 if isinstance(event, QueueMessageEndEvent):
                     self._task_state.llm_result = event.llm_result
@@ -213,6 +220,8 @@ class GenerateTaskPipeline:
                         completion_tokens
                     )
 
+                self._task_state.metadata['usage'] = jsonable_encoder(self._task_state.llm_result.usage)
+
                 # response moderation
                 if self._output_moderation_handler:
                     self._output_moderation_handler.stop_thread()
@@ -244,13 +253,14 @@ class GenerateTaskPipeline:
                     'event': 'message_end',
                     'task_id': self._application_generate_entity.task_id,
                     'id': self._message.id,
+                    'message_id': self._message.id,
                 }
 
                 if self._conversation.mode == 'chat':
                     response['conversation_id'] = self._conversation.id
 
                 if self._task_state.metadata:
-                    response['metadata'] = self._task_state.metadata
+                    response['metadata'] = self._get_response_metadata()
 
                 yield self._yield_response(response)
             elif isinstance(event, QueueRetrieverResourcesEvent):
@@ -410,6 +420,86 @@ class GenerateTaskPipeline:
         else:
             return Exception(e.description if getattr(e, 'description', None) is not None else str(e))
 
+    def _error_to_stream_response_data(self, e: Exception) -> dict:
+        """
+        Error to stream response.
+        :param e: exception
+        :return:
+        """
+        if isinstance(e, ValueError):
+            data = {
+                'code': 'invalid_param',
+                'message': str(e),
+                'status': 400
+            }
+        elif isinstance(e, ProviderTokenNotInitError):
+            data = {
+                'code': 'provider_not_initialize',
+                'message': e.description,
+                'status': 400
+            }
+        elif isinstance(e, QuotaExceededError):
+            data = {
+                'code': 'provider_quota_exceeded',
+                'message': "Your quota for Dify Hosted Model Provider has been exhausted. "
+                           "Please go to Settings -> Model Provider to complete your own provider credentials.",
+                'status': 400
+            }
+        elif isinstance(e, ModelCurrentlyNotSupportError):
+            data = {
+                'code': 'model_currently_not_support',
+                'message': e.description,
+                'status': 400
+            }
+        elif isinstance(e, InvokeError):
+            data = {
+                'code': 'completion_request_error',
+                'message': e.description,
+                'status': 400
+            }
+        else:
+            logging.error(e)
+            data = {
+                'code': 'internal_server_error',
+                'message': 'Internal Server Error, please contact support.',
+                'status': 500
+            }
+
+        return {
+            'event': 'error',
+            'task_id': self._application_generate_entity.task_id,
+            'message_id': self._message.id,
+            **data
+        }
+
+    def _get_response_metadata(self) -> dict:
+        """
+        Get response metadata by invoke from.
+        :return:
+        """
+        metadata = {}
+
+        # show_retrieve_source
+        if 'retriever_resources' in self._task_state.metadata:
+            if self._application_generate_entity.invoke_from in [InvokeFrom.DEBUGGER, InvokeFrom.SERVICE_API]:
+                metadata['retriever_resources'] = self._task_state.metadata['retriever_resources']
+            else:
+                metadata['retriever_resources'] = []
+                for resource in self._task_state.metadata['retriever_resources']:
+                    metadata['retriever_resources'].append({
+                        'segment_id': resource['segment_id'],
+                        'position': resource['position'],
+                        'document_name': resource['document_name'],
+                        'score': resource['score'],
+                        'content': resource['content'],
+                    })
+
+        # show usage
+        if self._application_generate_entity.invoke_from in [InvokeFrom.DEBUGGER, InvokeFrom.SERVICE_API]:
+            metadata['usage'] = self._task_state.metadata['usage']
+
+        return metadata
+
     def _yield_response(self, response: dict) -> str:
         """
         Yield response.

+ 2 - 0
api/core/model_runtime/utils/encoders.py

@@ -151,6 +151,8 @@ def jsonable_encoder(
         return str(obj)
     if isinstance(obj, (str, int, float, type(None))):
         return obj
+    if isinstance(obj, Decimal):
+        return format(obj, 'f')
     if isinstance(obj, dict):
         encoded_dict = {}
         allowed_keys = set(obj.keys())

+ 4 - 0
api/libs/external_api.py

@@ -31,6 +31,10 @@ class ExternalApi(Api):
                 'message': getattr(e, 'description', http_status_message(status_code)),
                 'status': status_code
             }
+
+            if default_data['message'] and default_data['message'] == 'Failed to decode JSON object: Expecting value: line 1 column 1 (char 0)':
+                default_data['message'] = 'Invalid JSON payload received or JSON payload is empty.'
+
             headers = e.get_response().headers
         elif isinstance(e, ValueError):
             status_code = 400

+ 12 - 4
api/services/completion_service.py

@@ -27,10 +27,15 @@ class CompletionService:
         auto_generate_name = args['auto_generate_name'] \
             if 'auto_generate_name' in args else True
 
-        if app_model.mode != 'completion' and not query:
-            raise ValueError('query is required')
+        if app_model.mode != 'completion':
+            if not query:
+                raise ValueError('query is required')
 
-        query = query.replace('\x00', '')
+        if query:
+            if not isinstance(query, str):
+                raise ValueError('query must be a string')
+
+            query = query.replace('\x00', '')
 
         conversation_id = args['conversation_id'] if 'conversation_id' in args else None
 
@@ -230,6 +235,10 @@ class CompletionService:
 
             value = user_inputs[variable]
 
+            if value:
+                if not isinstance(value, str):
+                    raise ValueError(f"{variable} in input form must be a string")
+
             if input_type == "select":
                 options = input_config["options"] if "options" in input_config else []
                 if value not in options:
@@ -243,4 +252,3 @@ class CompletionService:
             filtered_inputs[variable] = value.replace('\x00', '') if value else None
 
         return filtered_inputs
-