serpapi_provider.py 2.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. from typing import Optional
  2. from core.tool.provider.base import BaseToolProvider
  3. from core.tool.provider.errors import ToolValidateFailedError
  4. from core.tool.serpapi_wrapper import OptimizedSerpAPIWrapper
  5. from models.tool import ToolProviderName
  6. class SerpAPIToolProvider(BaseToolProvider):
  7. def get_provider_name(self) -> ToolProviderName:
  8. """
  9. Returns the name of the provider.
  10. :return:
  11. """
  12. return ToolProviderName.SERPAPI
  13. def get_credentials(self, obfuscated: bool = False) -> Optional[dict]:
  14. """
  15. Returns the credentials for SerpAPI as a dictionary.
  16. :param obfuscated: obfuscate credentials if True
  17. :return:
  18. """
  19. tool_provider = self.get_provider(must_enabled=True)
  20. if not tool_provider:
  21. return None
  22. credentials = tool_provider.credentials
  23. if not credentials:
  24. return None
  25. if credentials.get('api_key'):
  26. credentials['api_key'] = self.decrypt_token(credentials.get('api_key'), obfuscated)
  27. return credentials
  28. def credentials_to_func_kwargs(self) -> Optional[dict]:
  29. """
  30. Returns the credentials function kwargs as a dictionary.
  31. :return:
  32. """
  33. credentials = self.get_credentials()
  34. if not credentials:
  35. return None
  36. return {
  37. 'serpapi_api_key': credentials.get('api_key')
  38. }
  39. def credentials_validate(self, credentials: dict):
  40. """
  41. Validates the given credentials.
  42. :param credentials:
  43. :return:
  44. """
  45. if 'api_key' not in credentials or not credentials.get('api_key'):
  46. raise ToolValidateFailedError("SerpAPI api_key is required.")
  47. api_key = credentials.get('api_key')
  48. try:
  49. OptimizedSerpAPIWrapper(serpapi_api_key=api_key).run(query='test')
  50. except Exception as e:
  51. raise ToolValidateFailedError("SerpAPI api_key is invalid. {}".format(e))
  52. def encrypt_credentials(self, credentials: dict) -> Optional[dict]:
  53. """
  54. Encrypts the given credentials.
  55. :param credentials:
  56. :return:
  57. """
  58. credentials['api_key'] = self.encrypt_token(credentials.get('api_key'))
  59. return credentials