1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071 |
- import logging
- from typing import Optional
- import requests
- from configs import dify_config
- from services.recommend_app.buildin.buildin_retrieval import BuildInRecommendAppRetrieval
- from services.recommend_app.recommend_app_base import RecommendAppRetrievalBase
- from services.recommend_app.recommend_app_type import RecommendAppType
- logger = logging.getLogger(__name__)
- class RemoteRecommendAppRetrieval(RecommendAppRetrievalBase):
- """
- Retrieval recommended app from dify official
- """
- def get_recommend_app_detail(self, app_id: str):
- try:
- result = self.fetch_recommended_app_detail_from_dify_official(app_id)
- except Exception as e:
- logger.warning(f"fetch recommended app detail from dify official failed: {e}, switch to built-in.")
- result = BuildInRecommendAppRetrieval.fetch_recommended_app_detail_from_builtin(app_id)
- return result
- def get_recommended_apps_and_categories(self, language: str) -> dict:
- try:
- result = self.fetch_recommended_apps_from_dify_official(language)
- except Exception as e:
- logger.warning(f"fetch recommended apps from dify official failed: {e}, switch to built-in.")
- result = BuildInRecommendAppRetrieval.fetch_recommended_apps_from_builtin(language)
- return result
- def get_type(self) -> str:
- return RecommendAppType.REMOTE
- @classmethod
- def fetch_recommended_app_detail_from_dify_official(cls, app_id: str) -> Optional[dict]:
- """
- Fetch recommended app detail from dify official.
- :param app_id: App ID
- :return:
- """
- domain = dify_config.HOSTED_FETCH_APP_TEMPLATES_REMOTE_DOMAIN
- url = f"{domain}/apps/{app_id}"
- response = requests.get(url, timeout=(3, 10))
- if response.status_code != 200:
- return None
- return response.json()
- @classmethod
- def fetch_recommended_apps_from_dify_official(cls, language: str) -> dict:
- """
- Fetch recommended apps from dify official.
- :param language: language
- :return:
- """
- domain = dify_config.HOSTED_FETCH_APP_TEMPLATES_REMOTE_DOMAIN
- url = f"{domain}/apps?language={language}"
- response = requests.get(url, timeout=(3, 10))
- if response.status_code != 200:
- raise ValueError(f"fetch recommended apps failed, status code: {response.status_code}")
- result = response.json()
- if "categories" in result:
- result["categories"] = sorted(result["categories"])
- return result
|