| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321 | 
							- from abc import abstractmethod
 
- from os import listdir, path
 
- from typing import Any
 
- from core.tools.entities.api_entities import UserToolProviderCredentials
 
- from core.tools.entities.tool_entities import ToolParameter, ToolProviderCredentials, ToolProviderType
 
- from core.tools.entities.values import ToolLabelEnum, default_tool_label_dict
 
- from core.tools.errors import (
 
-     ToolNotFoundError,
 
-     ToolParameterValidationError,
 
-     ToolProviderCredentialValidationError,
 
-     ToolProviderNotFoundError,
 
- )
 
- from core.tools.provider.tool_provider import ToolProviderController
 
- from core.tools.tool.builtin_tool import BuiltinTool
 
- from core.tools.tool.tool import Tool
 
- from core.tools.utils.yaml_utils import load_yaml_file
 
- from core.utils.module_import_helper import load_single_subclass_from_source
 
- class BuiltinToolProviderController(ToolProviderController):
 
-     def __init__(self, **data: Any) -> None:
 
-         if self.provider_type == ToolProviderType.API or self.provider_type == ToolProviderType.APP:
 
-             super().__init__(**data)
 
-             return
 
-         
 
-         # load provider yaml
 
-         provider = self.__class__.__module__.split('.')[-1]
 
-         yaml_path = path.join(path.dirname(path.realpath(__file__)), 'builtin', provider, f'{provider}.yaml')
 
-         try:
 
-             provider_yaml = load_yaml_file(yaml_path)
 
-         except Exception as e:
 
-             raise ToolProviderNotFoundError(f'can not load provider yaml for {provider}: {e}')
 
-         if 'credentials_for_provider' in provider_yaml and provider_yaml['credentials_for_provider'] is not None:
 
-             # set credentials name
 
-             for credential_name in provider_yaml['credentials_for_provider']:
 
-                 provider_yaml['credentials_for_provider'][credential_name]['name'] = credential_name
 
-         super().__init__(**{
 
-             'identity': provider_yaml['identity'],
 
-             'credentials_schema': provider_yaml['credentials_for_provider'] if 'credentials_for_provider' in provider_yaml else None,
 
-         })
 
-     def _get_builtin_tools(self) -> list[Tool]:
 
-         """
 
-             returns a list of tools that the provider can provide
 
-             :return: list of tools
 
-         """
 
-         if self.tools:
 
-             return self.tools
 
-         
 
-         provider = self.identity.name
 
-         tool_path = path.join(path.dirname(path.realpath(__file__)), "builtin", provider, "tools")
 
-         # get all the yaml files in the tool path
 
-         tool_files = list(filter(lambda x: x.endswith(".yaml") and not x.startswith("__"), listdir(tool_path)))
 
-         tools = []
 
-         for tool_file in tool_files:
 
-             # get tool name
 
-             tool_name = tool_file.split(".")[0]
 
-             tool = load_yaml_file(path.join(tool_path, tool_file))
 
-             # get tool class, import the module
 
-             assistant_tool_class = load_single_subclass_from_source(
 
-                 module_name=f'core.tools.provider.builtin.{provider}.tools.{tool_name}',
 
-                 script_path=path.join(path.dirname(path.realpath(__file__)),
 
-                                        'builtin', provider, 'tools', f'{tool_name}.py'),
 
-                 parent_type=BuiltinTool)
 
-             tool["identity"]["provider"] = provider
 
-             tools.append(assistant_tool_class(**tool))
 
-         self.tools = tools
 
-         return tools
 
-     
 
-     def get_credentials_schema(self) -> dict[str, ToolProviderCredentials]:
 
-         """
 
-             returns the credentials schema of the provider
 
-             :return: the credentials schema
 
-         """
 
-         if not self.credentials_schema:
 
-             return {}
 
-         
 
-         return self.credentials_schema.copy()
 
-     
 
-     def user_get_credentials_schema(self) -> UserToolProviderCredentials:
 
-         """
 
-             returns the credentials schema of the provider, this method is used for user
 
-             :return: the credentials schema
 
-         """
 
-         credentials = self.credentials_schema.copy()
 
-         return UserToolProviderCredentials(credentials=credentials)
 
-     def get_tools(self) -> list[Tool]:
 
-         """
 
-             returns a list of tools that the provider can provide
 
-             :return: list of tools
 
-         """
 
-         return self._get_builtin_tools()
 
-     
 
-     def get_tool(self, tool_name: str) -> Tool:
 
-         """
 
-             returns the tool that the provider can provide
 
-         """
 
-         return next(filter(lambda x: x.identity.name == tool_name, self.get_tools()), None)
 
-     def get_parameters(self, tool_name: str) -> list[ToolParameter]:
 
-         """
 
-             returns the parameters of the tool
 
-             :param tool_name: the name of the tool, defined in `get_tools`
 
-             :return: list of parameters
 
-         """
 
-         tool = next(filter(lambda x: x.identity.name == tool_name, self.get_tools()), None)
 
-         if tool is None:
 
-             raise ToolNotFoundError(f'tool {tool_name} not found')
 
-         return tool.parameters
 
-     @property
 
-     def need_credentials(self) -> bool:
 
-         """
 
-             returns whether the provider needs credentials
 
-             :return: whether the provider needs credentials
 
-         """
 
-         return self.credentials_schema is not None and \
 
-             len(self.credentials_schema) != 0
 
-     @property
 
-     def provider_type(self) -> ToolProviderType:
 
-         """
 
-             returns the type of the provider
 
-             :return: type of the provider
 
-         """
 
-         return ToolProviderType.BUILT_IN
 
-     
 
-     @property
 
-     def tool_labels(self) -> list[str]:
 
-         """
 
-             returns the labels of the provider
 
-             :return: labels of the provider
 
-         """
 
-         label_enums = self._get_tool_labels()
 
-         return [default_tool_label_dict[label].name for label in label_enums]
 
-     
 
-     def _get_tool_labels(self) -> list[ToolLabelEnum]:
 
-         """
 
-             returns the labels of the provider
 
-         """
 
-         return self.identity.tags or []
 
-     def validate_parameters(self, tool_id: int, tool_name: str, tool_parameters: dict[str, Any]) -> None:
 
-         """
 
-             validate the parameters of the tool and set the default value if needed
 
-             :param tool_name: the name of the tool, defined in `get_tools`
 
-             :param tool_parameters: the parameters of the tool
 
-         """
 
-         tool_parameters_schema = self.get_parameters(tool_name)
 
-         
 
-         tool_parameters_need_to_validate: dict[str, ToolParameter] = {}
 
-         for parameter in tool_parameters_schema:
 
-             tool_parameters_need_to_validate[parameter.name] = parameter
 
-         for parameter in tool_parameters:
 
-             if parameter not in tool_parameters_need_to_validate:
 
-                 raise ToolParameterValidationError(f'parameter {parameter} not found in tool {tool_name}')
 
-             
 
-             # check type
 
-             parameter_schema = tool_parameters_need_to_validate[parameter]
 
-             if parameter_schema.type == ToolParameter.ToolParameterType.STRING:
 
-                 if not isinstance(tool_parameters[parameter], str):
 
-                     raise ToolParameterValidationError(f'parameter {parameter} should be string')
 
-             
 
-             elif parameter_schema.type == ToolParameter.ToolParameterType.NUMBER:
 
-                 if not isinstance(tool_parameters[parameter], int | float):
 
-                     raise ToolParameterValidationError(f'parameter {parameter} should be number')
 
-                 
 
-                 if parameter_schema.min is not None and tool_parameters[parameter] < parameter_schema.min:
 
-                     raise ToolParameterValidationError(f'parameter {parameter} should be greater than {parameter_schema.min}')
 
-                 
 
-                 if parameter_schema.max is not None and tool_parameters[parameter] > parameter_schema.max:
 
-                     raise ToolParameterValidationError(f'parameter {parameter} should be less than {parameter_schema.max}')
 
-                 
 
-             elif parameter_schema.type == ToolParameter.ToolParameterType.BOOLEAN:
 
-                 if not isinstance(tool_parameters[parameter], bool):
 
-                     raise ToolParameterValidationError(f'parameter {parameter} should be boolean')
 
-                 
 
-             elif parameter_schema.type == ToolParameter.ToolParameterType.SELECT:
 
-                 if not isinstance(tool_parameters[parameter], str):
 
-                     raise ToolParameterValidationError(f'parameter {parameter} should be string')
 
-                 
 
-                 options = parameter_schema.options
 
-                 if not isinstance(options, list):
 
-                     raise ToolParameterValidationError(f'parameter {parameter} options should be list')
 
-                 
 
-                 if tool_parameters[parameter] not in [x.value for x in options]:
 
-                     raise ToolParameterValidationError(f'parameter {parameter} should be one of {options}')
 
-                 
 
-             tool_parameters_need_to_validate.pop(parameter)
 
-         for parameter in tool_parameters_need_to_validate:
 
-             parameter_schema = tool_parameters_need_to_validate[parameter]
 
-             if parameter_schema.required:
 
-                 raise ToolParameterValidationError(f'parameter {parameter} is required')
 
-             
 
-             # the parameter is not set currently, set the default value if needed
 
-             if parameter_schema.default is not None:
 
-                 default_value = parameter_schema.default
 
-                 # parse default value into the correct type
 
-                 if parameter_schema.type == ToolParameter.ToolParameterType.STRING or \
 
-                     parameter_schema.type == ToolParameter.ToolParameterType.SELECT:
 
-                     default_value = str(default_value)
 
-                 elif parameter_schema.type == ToolParameter.ToolParameterType.NUMBER:
 
-                     default_value = float(default_value)
 
-                 elif parameter_schema.type == ToolParameter.ToolParameterType.BOOLEAN:
 
-                     default_value = bool(default_value)
 
-                 tool_parameters[parameter] = default_value
 
-     def validate_credentials_format(self, credentials: dict[str, Any]) -> None:
 
-         """
 
-             validate the format of the credentials of the provider and set the default value if needed
 
-             :param credentials: the credentials of the tool
 
-         """
 
-         credentials_schema = self.credentials_schema
 
-         if credentials_schema is None:
 
-             return
 
-         
 
-         credentials_need_to_validate: dict[str, ToolProviderCredentials] = {}
 
-         for credential_name in credentials_schema:
 
-             credentials_need_to_validate[credential_name] = credentials_schema[credential_name]
 
-         for credential_name in credentials:
 
-             if credential_name not in credentials_need_to_validate:
 
-                 raise ToolProviderCredentialValidationError(f'credential {credential_name} not found in provider {self.identity.name}')
 
-             
 
-             # check type
 
-             credential_schema = credentials_need_to_validate[credential_name]
 
-             if credential_schema == ToolProviderCredentials.CredentialsType.SECRET_INPUT or \
 
-                 credential_schema == ToolProviderCredentials.CredentialsType.TEXT_INPUT:
 
-                 if not isinstance(credentials[credential_name], str):
 
-                     raise ToolProviderCredentialValidationError(f'credential {credential_schema.label.en_US} should be string')
 
-             
 
-             elif credential_schema.type == ToolProviderCredentials.CredentialsType.SELECT:
 
-                 if not isinstance(credentials[credential_name], str):
 
-                     raise ToolProviderCredentialValidationError(f'credential {credential_schema.label.en_US} should be string')
 
-                 
 
-                 options = credential_schema.options
 
-                 if not isinstance(options, list):
 
-                     raise ToolProviderCredentialValidationError(f'credential {credential_schema.label.en_US} options should be list')
 
-                 
 
-                 if credentials[credential_name] not in [x.value for x in options]:
 
-                     raise ToolProviderCredentialValidationError(f'credential {credential_schema.label.en_US} should be one of {options}')
 
-             elif credential_schema.type == ToolProviderCredentials.CredentialsType.BOOLEAN:
 
-                 if isinstance(credentials[credential_name], bool):
 
-                     pass
 
-                 elif isinstance(credentials[credential_name], str):
 
-                     if credentials[credential_name].lower() == 'true':
 
-                         credentials[credential_name] = True
 
-                     elif credentials[credential_name].lower() == 'false':
 
-                         credentials[credential_name] = False
 
-                     else:
 
-                         raise ToolProviderCredentialValidationError(f'credential {credential_schema.label.en_US} should be boolean')
 
-                 elif isinstance(credentials[credential_name], int):
 
-                     if credentials[credential_name] == 1:
 
-                         credentials[credential_name] = True
 
-                     elif credentials[credential_name] == 0:
 
-                         credentials[credential_name] = False
 
-                     else:
 
-                         raise ToolProviderCredentialValidationError(f'credential {credential_schema.label.en_US} should be boolean')
 
-                 else:
 
-                     raise ToolProviderCredentialValidationError(f'credential {credential_schema.label.en_US} should be boolean')
 
-             if credentials[credential_name] or credentials[credential_name] == False:
 
-                 credentials_need_to_validate.pop(credential_name)
 
-         for credential_name in credentials_need_to_validate:
 
-             credential_schema = credentials_need_to_validate[credential_name]
 
-             if credential_schema.required:
 
-                 raise ToolProviderCredentialValidationError(f'credential {credential_schema.label.en_US} is required')
 
-             
 
-             # the credential is not set currently, set the default value if needed
 
-             if credential_schema.default is not None:
 
-                 default_value = credential_schema.default
 
-                 # parse default value into the correct type
 
-                 if credential_schema.type == ToolProviderCredentials.CredentialsType.SECRET_INPUT or \
 
-                     credential_schema.type == ToolProviderCredentials.CredentialsType.TEXT_INPUT or \
 
-                     credential_schema.type == ToolProviderCredentials.CredentialsType.SELECT:
 
-                     default_value = str(default_value)
 
-                 credentials[credential_name] = default_value
 
-     
 
-     def validate_credentials(self, credentials: dict[str, Any]) -> None:
 
-         """
 
-             validate the credentials of the provider
 
-             :param tool_name: the name of the tool, defined in `get_tools`
 
-             :param credentials: the credentials of the tool
 
-         """
 
-         # validate credentials format
 
-         self.validate_credentials_format(credentials)
 
-         # validate credentials
 
-         self._validate_credentials(credentials)
 
-     @abstractmethod
 
-     def _validate_credentials(self, credentials: dict[str, Any]) -> None:
 
-         """
 
-             validate the credentials of the provider
 
-             :param tool_name: the name of the tool, defined in `get_tools`
 
-             :param credentials: the credentials of the tool
 
-         """
 
-         pass
 
 
  |