|
- import re
- import uuid
- from json import dumps as json_dumps
- from json import loads as json_loads
- from json.decoder import JSONDecodeError
- from requests import get
- from yaml import YAMLError, safe_load
- from core.tools.entities.common_entities import I18nObject
- from core.tools.entities.tool_bundle import ApiToolBundle
- from core.tools.entities.tool_entities import ApiProviderSchemaType, ToolParameter
- from core.tools.errors import ToolApiSchemaError, ToolNotSupportedError, ToolProviderNotFoundError
- class ApiBasedToolSchemaParser:
- @staticmethod
- def parse_openapi_to_tool_bundle(openapi: dict, extra_info: dict = None, warning: dict = None) -> list[ApiToolBundle]:
- warning = warning if warning is not None else {}
- extra_info = extra_info if extra_info is not None else {}
- # set description to extra_info
- extra_info['description'] = openapi['info'].get('description', '')
- if len(openapi['servers']) == 0:
- raise ToolProviderNotFoundError('No server found in the openapi yaml.')
- server_url = openapi['servers'][0]['url']
- # list all interfaces
- interfaces = []
- for path, path_item in openapi['paths'].items():
- methods = ['get', 'post', 'put', 'delete', 'patch', 'head', 'options', 'trace']
- for method in methods:
- if method in path_item:
- interfaces.append({
- 'path': path,
- 'method': method,
- 'operation': path_item[method],
- })
- # get all parameters
- bundles = []
- for interface in interfaces:
- # convert parameters
- parameters = []
- if 'parameters' in interface['operation']:
- for parameter in interface['operation']['parameters']:
- tool_parameter = ToolParameter(
- name=parameter['name'],
- label=I18nObject(
- en_US=parameter['name'],
- zh_Hans=parameter['name']
- ),
- human_description=I18nObject(
- en_US=parameter.get('description', ''),
- zh_Hans=parameter.get('description', '')
- ),
- type=ToolParameter.ToolParameterType.STRING,
- required=parameter.get('required', False),
- form=ToolParameter.ToolParameterForm.LLM,
- llm_description=parameter.get('description'),
- default=parameter['schema']['default'] if 'schema' in parameter and 'default' in parameter['schema'] else None,
- )
-
- # check if there is a type
- typ = ApiBasedToolSchemaParser._get_tool_parameter_type(parameter)
- if typ:
- tool_parameter.type = typ
- parameters.append(tool_parameter)
- # create tool bundle
- # check if there is a request body
- if 'requestBody' in interface['operation']:
- request_body = interface['operation']['requestBody']
- if 'content' in request_body:
- for content_type, content in request_body['content'].items():
- # if there is a reference, get the reference and overwrite the content
- if 'schema' not in content:
- continue
- if '$ref' in content['schema']:
- # get the reference
- root = openapi
- reference = content['schema']['$ref'].split('/')[1:]
- for ref in reference:
- root = root[ref]
- # overwrite the content
- interface['operation']['requestBody']['content'][content_type]['schema'] = root
- # parse body parameters
- if 'schema' in interface['operation']['requestBody']['content'][content_type]:
- body_schema = interface['operation']['requestBody']['content'][content_type]['schema']
- required = body_schema.get('required', [])
- properties = body_schema.get('properties', {})
- for name, property in properties.items():
- tool = ToolParameter(
- name=name,
- label=I18nObject(
- en_US=name,
- zh_Hans=name
- ),
- human_description=I18nObject(
- en_US=property.get('description', ''),
- zh_Hans=property.get('description', '')
- ),
- type=ToolParameter.ToolParameterType.STRING,
- required=name in required,
- form=ToolParameter.ToolParameterForm.LLM,
- llm_description=property.get('description', ''),
- default=property.get('default', None),
- )
- # check if there is a type
- typ = ApiBasedToolSchemaParser._get_tool_parameter_type(property)
- if typ:
- tool.type = typ
- parameters.append(tool)
- # check if parameters is duplicated
- parameters_count = {}
- for parameter in parameters:
- if parameter.name not in parameters_count:
- parameters_count[parameter.name] = 0
- parameters_count[parameter.name] += 1
- for name, count in parameters_count.items():
- if count > 1:
- warning['duplicated_parameter'] = f'Parameter {name} is duplicated.'
- # check if there is a operation id, use $path_$method as operation id if not
- if 'operationId' not in interface['operation']:
- # remove special characters like / to ensure the operation id is valid ^[a-zA-Z0-9_-]{1,64}$
- path = interface['path']
- if interface['path'].startswith('/'):
- path = interface['path'][1:]
- # remove special characters like / to ensure the operation id is valid ^[a-zA-Z0-9_-]{1,64}$
- path = re.sub(r'[^a-zA-Z0-9_-]', '', path)
- if not path:
- path = str(uuid.uuid4())
-
- interface['operation']['operationId'] = f'{path}_{interface["method"]}'
- bundles.append(ApiToolBundle(
- server_url=server_url + interface['path'],
- method=interface['method'],
- summary=interface['operation']['description'] if 'description' in interface['operation'] else
- interface['operation'].get('summary', None),
- operation_id=interface['operation']['operationId'],
- parameters=parameters,
- author='',
- icon=None,
- openapi=interface['operation'],
- ))
- return bundles
-
- @staticmethod
- def _get_tool_parameter_type(parameter: dict) -> ToolParameter.ToolParameterType:
- parameter = parameter or {}
- typ = None
- if 'type' in parameter:
- typ = parameter['type']
- elif 'schema' in parameter and 'type' in parameter['schema']:
- typ = parameter['schema']['type']
-
- if typ == 'integer' or typ == 'number':
- return ToolParameter.ToolParameterType.NUMBER
- elif typ == 'boolean':
- return ToolParameter.ToolParameterType.BOOLEAN
- elif typ == 'string':
- return ToolParameter.ToolParameterType.STRING
- @staticmethod
- def parse_openapi_yaml_to_tool_bundle(yaml: str, extra_info: dict = None, warning: dict = None) -> list[ApiToolBundle]:
- """
- parse openapi yaml to tool bundle
- :param yaml: the yaml string
- :return: the tool bundle
- """
- warning = warning if warning is not None else {}
- extra_info = extra_info if extra_info is not None else {}
- openapi: dict = safe_load(yaml)
- if openapi is None:
- raise ToolApiSchemaError('Invalid openapi yaml.')
- return ApiBasedToolSchemaParser.parse_openapi_to_tool_bundle(openapi, extra_info=extra_info, warning=warning)
-
- @staticmethod
- def parse_swagger_to_openapi(swagger: dict, extra_info: dict = None, warning: dict = None) -> dict:
- """
- parse swagger to openapi
- :param swagger: the swagger dict
- :return: the openapi dict
- """
- # convert swagger to openapi
- info = swagger.get('info', {
- 'title': 'Swagger',
- 'description': 'Swagger',
- 'version': '1.0.0'
- })
- servers = swagger.get('servers', [])
- if len(servers) == 0:
- raise ToolApiSchemaError('No server found in the swagger yaml.')
- openapi = {
- 'openapi': '3.0.0',
- 'info': {
- 'title': info.get('title', 'Swagger'),
- 'description': info.get('description', 'Swagger'),
- 'version': info.get('version', '1.0.0')
- },
- 'servers': swagger['servers'],
- 'paths': {},
- 'components': {
- 'schemas': {}
- }
- }
- # check paths
- if 'paths' not in swagger or len(swagger['paths']) == 0:
- raise ToolApiSchemaError('No paths found in the swagger yaml.')
- # convert paths
- for path, path_item in swagger['paths'].items():
- openapi['paths'][path] = {}
- for method, operation in path_item.items():
- if 'operationId' not in operation:
- raise ToolApiSchemaError(f'No operationId found in operation {method} {path}.')
-
- if ('summary' not in operation or len(operation['summary']) == 0) and \
- ('description' not in operation or len(operation['description']) == 0):
- warning['missing_summary'] = f'No summary or description found in operation {method} {path}.'
-
- openapi['paths'][path][method] = {
- 'operationId': operation['operationId'],
- 'summary': operation.get('summary', ''),
- 'description': operation.get('description', ''),
- 'parameters': operation.get('parameters', []),
- 'responses': operation.get('responses', {}),
- }
- if 'requestBody' in operation:
- openapi['paths'][path][method]['requestBody'] = operation['requestBody']
- # convert definitions
- for name, definition in swagger['definitions'].items():
- openapi['components']['schemas'][name] = definition
- return openapi
- @staticmethod
- def parse_openai_plugin_json_to_tool_bundle(json: str, extra_info: dict = None, warning: dict = None) -> list[ApiToolBundle]:
- """
- parse openapi plugin yaml to tool bundle
- :param json: the json string
- :return: the tool bundle
- """
- warning = warning if warning is not None else {}
- extra_info = extra_info if extra_info is not None else {}
- try:
- openai_plugin = json_loads(json)
- api = openai_plugin['api']
- api_url = api['url']
- api_type = api['type']
- except:
- raise ToolProviderNotFoundError('Invalid openai plugin json.')
-
- if api_type != 'openapi':
- raise ToolNotSupportedError('Only openapi is supported now.')
-
- # get openapi yaml
- response = get(api_url, headers={
- 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) '
- }, timeout=5)
- if response.status_code != 200:
- raise ToolProviderNotFoundError('cannot get openapi yaml from url.')
-
- return ApiBasedToolSchemaParser.parse_openapi_yaml_to_tool_bundle(response.text, extra_info=extra_info, warning=warning)
-
- @staticmethod
- def auto_parse_to_tool_bundle(content: str, extra_info: dict = None, warning: dict = None) -> tuple[list[ApiToolBundle], str]:
- """
- auto parse to tool bundle
- :param content: the content
- :return: tools bundle, schema_type
- """
- warning = warning if warning is not None else {}
- extra_info = extra_info if extra_info is not None else {}
- content = content.strip()
- loaded_content = None
- json_error = None
- yaml_error = None
-
- try:
- loaded_content = json_loads(content)
- except JSONDecodeError as e:
- json_error = e
- if loaded_content is None:
- try:
- loaded_content = safe_load(content)
- except YAMLError as e:
- yaml_error = e
- if loaded_content is None:
- raise ToolApiSchemaError(f'Invalid api schema, schema is neither json nor yaml. json error: {str(json_error)}, yaml error: {str(yaml_error)}')
- swagger_error = None
- openapi_error = None
- openapi_plugin_error = None
- schema_type = None
-
- try:
- openapi = ApiBasedToolSchemaParser.parse_openapi_to_tool_bundle(loaded_content, extra_info=extra_info, warning=warning)
- schema_type = ApiProviderSchemaType.OPENAPI.value
- return openapi, schema_type
- except ToolApiSchemaError as e:
- openapi_error = e
-
- # openai parse error, fallback to swagger
- try:
- converted_swagger = ApiBasedToolSchemaParser.parse_swagger_to_openapi(loaded_content, extra_info=extra_info, warning=warning)
- schema_type = ApiProviderSchemaType.SWAGGER.value
- return ApiBasedToolSchemaParser.parse_openapi_to_tool_bundle(converted_swagger, extra_info=extra_info, warning=warning), schema_type
- except ToolApiSchemaError as e:
- swagger_error = e
-
- # swagger parse error, fallback to openai plugin
- try:
- openapi_plugin = ApiBasedToolSchemaParser.parse_openai_plugin_json_to_tool_bundle(json_dumps(loaded_content), extra_info=extra_info, warning=warning)
- return openapi_plugin, ApiProviderSchemaType.OPENAI_PLUGIN.value
- except ToolNotSupportedError as e:
- # maybe it's not plugin at all
- openapi_plugin_error = e
- raise ToolApiSchemaError(f'Invalid api schema, openapi error: {str(openapi_error)}, swagger error: {str(swagger_error)}, openapi plugin error: {str(openapi_plugin_error)}')
|