agent_runner.py 12 KB


  1. import logging
  2. from typing import cast, Optional, List
  3. from langchain import WikipediaAPIWrapper
  4. from langchain.callbacks.base import BaseCallbackHandler
  5. from langchain.tools import BaseTool, WikipediaQueryRun, Tool
  6. from pydantic import BaseModel, Field
  7. from core.agent.agent.agent_llm_callback import AgentLLMCallback
  8. from core.agent.agent_executor import PlanningStrategy, AgentConfiguration, AgentExecutor
  9. from core.application_queue_manager import ApplicationQueueManager
  10. from core.callback_handler.agent_loop_gather_callback_handler import AgentLoopGatherCallbackHandler
  11. from core.callback_handler.index_tool_callback_handler import DatasetIndexToolCallbackHandler
  12. from core.callback_handler.std_out_callback_handler import DifyStdOutCallbackHandler
  13. from core.entities.application_entities import ModelConfigEntity, InvokeFrom, \
  14. AgentEntity, AgentToolEntity, AppOrchestrationConfigEntity
  15. from core.memory.token_buffer_memory import TokenBufferMemory
  16. from core.model_runtime.entities.model_entities import ModelFeature, ModelType
  17. from core.model_runtime.model_providers import model_provider_factory
  18. from core.model_runtime.model_providers.__base.large_language_model import LargeLanguageModel
  19. from core.tool.current_datetime_tool import DatetimeTool
  20. from core.tool.dataset_retriever_tool import DatasetRetrieverTool
  21. from core.tool.provider.serpapi_provider import SerpAPIToolProvider
  22. from core.tool.serpapi_wrapper import OptimizedSerpAPIWrapper, OptimizedSerpAPIInput
  23. from core.tool.web_reader_tool import WebReaderTool
  24. from extensions.ext_database import db
  25. from models.dataset import Dataset
  26. from models.model import Message
  27. logger = logging.getLogger(__name__)
  28. class AgentRunnerFeature:
  29. def __init__(self, tenant_id: str,
  30. app_orchestration_config: AppOrchestrationConfigEntity,
  31. model_config: ModelConfigEntity,
  32. config: AgentEntity,
  33. queue_manager: ApplicationQueueManager,
  34. message: Message,
  35. user_id: str,
  36. agent_llm_callback: AgentLLMCallback,
  37. callback: AgentLoopGatherCallbackHandler,
  38. memory: Optional[TokenBufferMemory] = None,) -> None:
  39. """
  40. Agent runner
  41. :param tenant_id: tenant id
  42. :param app_orchestration_config: app orchestration config
  43. :param model_config: model config
  44. :param config: dataset config
  45. :param queue_manager: queue manager
  46. :param message: message
  47. :param user_id: user id
  48. :param agent_llm_callback: agent llm callback
  49. :param callback: callback
  50. :param memory: memory
  51. """
  52. self.tenant_id = tenant_id
  53. self.app_orchestration_config = app_orchestration_config
  54. self.model_config = model_config
  55. self.config = config
  56. self.queue_manager = queue_manager
  57. self.message = message
  58. self.user_id = user_id
  59. self.agent_llm_callback = agent_llm_callback
  60. self.callback = callback
  61. self.memory = memory
  62. def run(self, query: str,
  63. invoke_from: InvokeFrom) -> Optional[str]:
  64. """
  65. Retrieve agent loop result.
  66. :param query: query
  67. :param invoke_from: invoke from
  68. :return:
  69. """
  70. provider = self.config.provider
  71. model = self.config.model
  72. tool_configs = self.config.tools
  73. # check model is support tool calling
  74. provider_instance = model_provider_factory.get_provider_instance(provider=provider)
  75. model_type_instance = provider_instance.get_model_instance(ModelType.LLM)
  76. model_type_instance = cast(LargeLanguageModel, model_type_instance)
  77. # get model schema
  78. model_schema = model_type_instance.get_model_schema(
  79. model=model,
  80. credentials=self.model_config.credentials
  81. )
  82. if not model_schema:
  83. return None
  84. planning_strategy = PlanningStrategy.REACT
  85. features = model_schema.features
  86. if features:
  87. if ModelFeature.TOOL_CALL in features \
  88. or ModelFeature.MULTI_TOOL_CALL in features:
  89. planning_strategy = PlanningStrategy.FUNCTION_CALL
  90. tools = self.to_tools(
  91. tool_configs=tool_configs,
  92. invoke_from=invoke_from,
  93. callbacks=[self.callback, DifyStdOutCallbackHandler()],
  94. )
  95. if len(tools) == 0:
  96. return None
  97. agent_configuration = AgentConfiguration(
  98. strategy=planning_strategy,
  99. model_config=self.model_config,
  100. tools=tools,
  101. memory=self.memory,
  102. max_iterations=10,
  103. max_execution_time=400.0,
  104. early_stopping_method="generate",
  105. agent_llm_callback=self.agent_llm_callback,
  106. callbacks=[self.callback, DifyStdOutCallbackHandler()]
  107. )
  108. agent_executor = AgentExecutor(agent_configuration)
  109. try:
  110. # check if should use agent
  111. should_use_agent = agent_executor.should_use_agent(query)
  112. if not should_use_agent:
  113. return None
  114. result = agent_executor.run(query)
  115. return result.output
  116. except Exception as ex:
  117. logger.exception("agent_executor run failed")
  118. return None
  119. def to_tools(self, tool_configs: list[AgentToolEntity],
  120. invoke_from: InvokeFrom,
  121. callbacks: list[BaseCallbackHandler]) \
  122. -> Optional[List[BaseTool]]:
  123. """
  124. Convert tool configs to tools
  125. :param tool_configs: tool configs
  126. :param invoke_from: invoke from
  127. :param callbacks: callbacks
  128. """
  129. tools = []
  130. for tool_config in tool_configs:
  131. tool = None
  132. if tool_config.tool_id == "dataset":
  133. tool = self.to_dataset_retriever_tool(
  134. tool_config=tool_config.config,
  135. invoke_from=invoke_from
  136. )
  137. elif tool_config.tool_id == "web_reader":
  138. tool = self.to_web_reader_tool(
  139. tool_config=tool_config.config,
  140. invoke_from=invoke_from
  141. )
  142. elif tool_config.tool_id == "google_search":
  143. tool = self.to_google_search_tool(
  144. tool_config=tool_config.config,
  145. invoke_from=invoke_from
  146. )
  147. elif tool_config.tool_id == "wikipedia":
  148. tool = self.to_wikipedia_tool(
  149. tool_config=tool_config.config,
  150. invoke_from=invoke_from
  151. )
  152. elif tool_config.tool_id == "current_datetime":
  153. tool = self.to_current_datetime_tool(
  154. tool_config=tool_config.config,
  155. invoke_from=invoke_from
  156. )
  157. if tool:
  158. if tool.callbacks is not None:
  159. tool.callbacks.extend(callbacks)
  160. else:
  161. tool.callbacks = callbacks
  162. tools.append(tool)
  163. return tools
  164. def to_dataset_retriever_tool(self, tool_config: dict,
  165. invoke_from: InvokeFrom) \
  166. -> Optional[BaseTool]:
  167. """
  168. A dataset tool is a tool that can be used to retrieve information from a dataset
  169. :param tool_config: tool config
  170. :param invoke_from: invoke from
  171. """
  172. show_retrieve_source = self.app_orchestration_config.show_retrieve_source
  173. hit_callback = DatasetIndexToolCallbackHandler(
  174. queue_manager=self.queue_manager,
  175. app_id=self.message.app_id,
  176. message_id=self.message.id,
  177. user_id=self.user_id,
  178. invoke_from=invoke_from
  179. )
  180. # get dataset from dataset id
  181. dataset = db.session.query(Dataset).filter(
  182. Dataset.tenant_id == self.tenant_id,
  183. Dataset.id == tool_config.get("id")
  184. ).first()
  185. # pass if dataset is not available
  186. if not dataset:
  187. return None
  188. # pass if dataset is not available
  189. if (dataset and dataset.available_document_count == 0
  190. and dataset.available_document_count == 0):
  191. return None
  192. # get retrieval model config
  193. default_retrieval_model = {
  194. 'search_method': 'semantic_search',
  195. 'reranking_enable': False,
  196. 'reranking_model': {
  197. 'reranking_provider_name': '',
  198. 'reranking_model_name': ''
  199. },
  200. 'top_k': 2,
  201. 'score_threshold_enabled': False
  202. }
  203. retrieval_model_config = dataset.retrieval_model \
  204. if dataset.retrieval_model else default_retrieval_model
  205. # get top k
  206. top_k = retrieval_model_config['top_k']
  207. # get score threshold
  208. score_threshold = None
  209. score_threshold_enabled = retrieval_model_config.get("score_threshold_enabled")
  210. if score_threshold_enabled:
  211. score_threshold = retrieval_model_config.get("score_threshold")
  212. tool = DatasetRetrieverTool.from_dataset(
  213. dataset=dataset,
  214. top_k=top_k,
  215. score_threshold=score_threshold,
  216. hit_callbacks=[hit_callback],
  217. return_resource=show_retrieve_source,
  218. retriever_from=invoke_from.to_source()
  219. )
  220. return tool
  221. def to_web_reader_tool(self, tool_config: dict,
  222. invoke_from: InvokeFrom) -> Optional[BaseTool]:
  223. """
  224. A tool for reading web pages
  225. :param tool_config: tool config
  226. :param invoke_from: invoke from
  227. :return:
  228. """
  229. model_parameters = {
  230. "temperature": 0,
  231. "max_tokens": 500
  232. }
  233. tool = WebReaderTool(
  234. model_config=self.model_config,
  235. model_parameters=model_parameters,
  236. max_chunk_length=4000,
  237. continue_reading=True
  238. )
  239. return tool
  240. def to_google_search_tool(self, tool_config: dict,
  241. invoke_from: InvokeFrom) -> Optional[BaseTool]:
  242. """
  243. A tool for performing a Google search and extracting snippets and webpages
  244. :param tool_config: tool config
  245. :param invoke_from: invoke from
  246. :return:
  247. """
  248. tool_provider = SerpAPIToolProvider(tenant_id=self.tenant_id)
  249. func_kwargs = tool_provider.credentials_to_func_kwargs()
  250. if not func_kwargs:
  251. return None
  252. tool = Tool(
  253. name="google_search",
  254. description="A tool for performing a Google search and extracting snippets and webpages "
  255. "when you need to search for something you don't know or when your information "
  256. "is not up to date. "
  257. "Input should be a search query.",
  258. func=OptimizedSerpAPIWrapper(**func_kwargs).run,
  259. args_schema=OptimizedSerpAPIInput
  260. )
  261. return tool
  262. def to_current_datetime_tool(self, tool_config: dict,
  263. invoke_from: InvokeFrom) -> Optional[BaseTool]:
  264. """
  265. A tool for getting the current date and time
  266. :param tool_config: tool config
  267. :param invoke_from: invoke from
  268. :return:
  269. """
  270. return DatetimeTool()
  271. def to_wikipedia_tool(self, tool_config: dict,
  272. invoke_from: InvokeFrom) -> Optional[BaseTool]:
  273. """
  274. A tool for searching Wikipedia
  275. :param tool_config: tool config
  276. :param invoke_from: invoke from
  277. :return:
  278. """
  279. class WikipediaInput(BaseModel):
  280. query: str = Field(..., description="search query.")
  281. return WikipediaQueryRun(
  282. name="wikipedia",
  283. api_wrapper=WikipediaAPIWrapper(doc_content_chars_max=4000),
  284. args_schema=WikipediaInput
  285. )