remote_retrieval.py 2.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071
  1. import logging
  2. from typing import Optional
  3. import requests
  4. from configs import dify_config
  5. from services.recommend_app.buildin.buildin_retrieval import BuildInRecommendAppRetrieval
  6. from services.recommend_app.recommend_app_base import RecommendAppRetrievalBase
  7. from services.recommend_app.recommend_app_type import RecommendAppType
  8. logger = logging.getLogger(__name__)
  class RemoteRecommendAppRetrieval(RecommendAppRetrievalBase):
      """
      Retrieve recommended apps from the remote template service.

      The remote base URL is read from
      ``dify_config.HOSTED_FETCH_APP_TEMPLATES_REMOTE_DOMAIN``. When a remote
      call raises, the public ``get_*`` methods log a warning and fall back to
      ``BuildInRecommendAppRetrieval``.
      """

      def get_recommend_app_detail(self, app_id: str):
          """
          Return the detail of one recommended app, preferring the remote source.

          :param app_id: App ID
          :return: the remote result (which is ``None`` when the remote answers
              with a non-200 status), or the built-in result when the remote
              call raised.
          """
          try:
              result = self.fetch_recommended_app_detail_from_dify_official(app_id)
          except Exception as e:
              # Deliberate best-effort: any remote failure degrades to built-in data.
              logger.warning(
                  "fetch recommended app detail from dify official failed: %s, switch to built-in.", e
              )
              result = BuildInRecommendAppRetrieval.fetch_recommended_app_detail_from_builtin(app_id)
          return result

      def get_recommended_apps_and_categories(self, language: str) -> dict:
          """
          Return recommended apps and categories, preferring the remote source.

          :param language: language
          :return: the remote result, or the built-in result when the remote
              call raised (including a non-200 response, which raises ValueError).
          """
          try:
              result = self.fetch_recommended_apps_from_dify_official(language)
          except Exception as e:
              # Deliberate best-effort: any remote failure degrades to built-in data.
              logger.warning(
                  "fetch recommended apps from dify official failed: %s, switch to built-in.", e
              )
              result = BuildInRecommendAppRetrieval.fetch_recommended_apps_from_builtin(language)
          return result

      def get_type(self) -> str:
          """Return the retrieval type identifier, ``RecommendAppType.REMOTE``."""
          return RecommendAppType.REMOTE

      @classmethod
      def fetch_recommended_app_detail_from_dify_official(cls, app_id: str) -> Optional[dict]:
          """
          Fetch recommended app detail from dify official.

          :param app_id: App ID
          :return: the decoded JSON body on HTTP 200, otherwise ``None``.

          NOTE(review): a non-200 response returns ``None`` instead of raising,
          so ``get_recommend_app_detail`` does not fall back to built-in in that
          case -- confirm this is intended.
          """
          domain = dify_config.HOSTED_FETCH_APP_TEMPLATES_REMOTE_DOMAIN
          url = f"{domain}/apps/{app_id}"
          # requests timeout tuple: (connect, read) in seconds.
          response = requests.get(url, timeout=(3, 10))
          if response.status_code != 200:
              return None
          return response.json()

      @classmethod
      def fetch_recommended_apps_from_dify_official(cls, language: str) -> dict:
          """
          Fetch recommended apps from dify official.

          :param language: language
          :return: the decoded JSON body; when it has a ``"categories"`` key,
              that value is replaced by its sorted list.
          :raises ValueError: when the response status code is not 200.
          """
          domain = dify_config.HOSTED_FETCH_APP_TEMPLATES_REMOTE_DOMAIN
          url = f"{domain}/apps"
          # Pass the language via ``params`` so requests URL-encodes it rather
          # than splicing the raw value into the query string.
          response = requests.get(url, params={"language": language}, timeout=(3, 10))
          if response.status_code != 200:
              raise ValueError(f"fetch recommended apps failed, status code: {response.status_code}")

          result = response.json()
          if "categories" in result:
              result["categories"] = sorted(result["categories"])
          return result