dataset_retrieval.py 21 KB


  1. import threading
  2. from typing import Optional, cast
  3. from flask import Flask, current_app
  4. from core.app.app_config.entities import DatasetEntity, DatasetRetrieveConfigEntity
  5. from core.app.entities.app_invoke_entities import InvokeFrom, ModelConfigWithCredentialsEntity
  6. from core.callback_handler.index_tool_callback_handler import DatasetIndexToolCallbackHandler
  7. from core.entities.agent_entities import PlanningStrategy
  8. from core.memory.token_buffer_memory import TokenBufferMemory
  9. from core.model_manager import ModelInstance, ModelManager
  10. from core.model_runtime.entities.message_entities import PromptMessageTool
  11. from core.model_runtime.entities.model_entities import ModelFeature, ModelType
  12. from core.model_runtime.model_providers.__base.large_language_model import LargeLanguageModel
  13. from core.rag.datasource.retrieval_service import RetrievalService
  14. from core.rag.models.document import Document
  15. from core.rag.retrieval.router.multi_dataset_function_call_router import FunctionCallMultiDatasetRouter
  16. from core.rag.retrieval.router.multi_dataset_react_route import ReactMultiDatasetRouter
  17. from core.rerank.rerank import RerankRunner
  18. from core.tools.tool.dataset_retriever.dataset_multi_retriever_tool import DatasetMultiRetrieverTool
  19. from core.tools.tool.dataset_retriever.dataset_retriever_base_tool import DatasetRetrieverBaseTool
  20. from core.tools.tool.dataset_retriever.dataset_retriever_tool import DatasetRetrieverTool
  21. from extensions.ext_database import db
  22. from models.dataset import Dataset, DatasetQuery, DocumentSegment
  23. from models.dataset import Document as DatasetDocument
  24. default_retrieval_model = {
  25. 'search_method': 'semantic_search',
  26. 'reranking_enable': False,
  27. 'reranking_model': {
  28. 'reranking_provider_name': '',
  29. 'reranking_model_name': ''
  30. },
  31. 'top_k': 2,
  32. 'score_threshold_enabled': False
  33. }
  34. class DatasetRetrieval:
  35. def retrieve(self, app_id: str, user_id: str, tenant_id: str,
  36. model_config: ModelConfigWithCredentialsEntity,
  37. config: DatasetEntity,
  38. query: str,
  39. invoke_from: InvokeFrom,
  40. show_retrieve_source: bool,
  41. hit_callback: DatasetIndexToolCallbackHandler,
  42. memory: Optional[TokenBufferMemory] = None) -> Optional[str]:
  43. """
  44. Retrieve dataset.
  45. :param app_id: app_id
  46. :param user_id: user_id
  47. :param tenant_id: tenant id
  48. :param model_config: model config
  49. :param config: dataset config
  50. :param query: query
  51. :param invoke_from: invoke from
  52. :param show_retrieve_source: show retrieve source
  53. :param hit_callback: hit callback
  54. :param memory: memory
  55. :return:
  56. """
  57. dataset_ids = config.dataset_ids
  58. if len(dataset_ids) == 0:
  59. return None
  60. retrieve_config = config.retrieve_config
  61. # check model is support tool calling
  62. model_type_instance = model_config.provider_model_bundle.model_type_instance
  63. model_type_instance = cast(LargeLanguageModel, model_type_instance)
  64. model_manager = ModelManager()
  65. model_instance = model_manager.get_model_instance(
  66. tenant_id=tenant_id,
  67. model_type=ModelType.LLM,
  68. provider=model_config.provider,
  69. model=model_config.model
  70. )
  71. # get model schema
  72. model_schema = model_type_instance.get_model_schema(
  73. model=model_config.model,
  74. credentials=model_config.credentials
  75. )
  76. if not model_schema:
  77. return None
  78. planning_strategy = PlanningStrategy.REACT_ROUTER
  79. features = model_schema.features
  80. if features:
  81. if ModelFeature.TOOL_CALL in features \
  82. or ModelFeature.MULTI_TOOL_CALL in features:
  83. planning_strategy = PlanningStrategy.ROUTER
  84. available_datasets = []
  85. for dataset_id in dataset_ids:
  86. # get dataset from dataset id
  87. dataset = db.session.query(Dataset).filter(
  88. Dataset.tenant_id == tenant_id,
  89. Dataset.id == dataset_id
  90. ).first()
  91. # pass if dataset is not available
  92. if not dataset:
  93. continue
  94. # pass if dataset is not available
  95. if (dataset and dataset.available_document_count == 0
  96. and dataset.available_document_count == 0):
  97. continue
  98. available_datasets.append(dataset)
  99. all_documents = []
  100. user_from = 'account' if invoke_from in [InvokeFrom.EXPLORE, InvokeFrom.DEBUGGER] else 'end_user'
  101. if retrieve_config.retrieve_strategy == DatasetRetrieveConfigEntity.RetrieveStrategy.SINGLE:
  102. all_documents = self.single_retrieve(app_id, tenant_id, user_id, user_from, available_datasets, query,
  103. model_instance,
  104. model_config, planning_strategy)
  105. elif retrieve_config.retrieve_strategy == DatasetRetrieveConfigEntity.RetrieveStrategy.MULTIPLE:
  106. all_documents = self.multiple_retrieve(app_id, tenant_id, user_id, user_from,
  107. available_datasets, query, retrieve_config.top_k,
  108. retrieve_config.score_threshold,
  109. retrieve_config.reranking_model.get('reranking_provider_name'),
  110. retrieve_config.reranking_model.get('reranking_model_name'))
  111. document_score_list = {}
  112. for item in all_documents:
  113. if 'score' in item.metadata and item.metadata['score']:
  114. document_score_list[item.metadata['doc_id']] = item.metadata['score']
  115. document_context_list = []
  116. index_node_ids = [document.metadata['doc_id'] for document in all_documents]
  117. segments = DocumentSegment.query.filter(
  118. DocumentSegment.dataset_id.in_(dataset_ids),
  119. DocumentSegment.completed_at.isnot(None),
  120. DocumentSegment.status == 'completed',
  121. DocumentSegment.enabled == True,
  122. DocumentSegment.index_node_id.in_(index_node_ids)
  123. ).all()
  124. if segments:
  125. index_node_id_to_position = {id: position for position, id in enumerate(index_node_ids)}
  126. sorted_segments = sorted(segments,
  127. key=lambda segment: index_node_id_to_position.get(segment.index_node_id,
  128. float('inf')))
  129. for segment in sorted_segments:
  130. if segment.answer:
  131. document_context_list.append(f'question:{segment.content} answer:{segment.answer}')
  132. else:
  133. document_context_list.append(segment.content)
  134. if show_retrieve_source:
  135. context_list = []
  136. resource_number = 1
  137. for segment in sorted_segments:
  138. dataset = Dataset.query.filter_by(
  139. id=segment.dataset_id
  140. ).first()
  141. document = DatasetDocument.query.filter(DatasetDocument.id == segment.document_id,
  142. DatasetDocument.enabled == True,
  143. DatasetDocument.archived == False,
  144. ).first()
  145. if dataset and document:
  146. source = {
  147. 'position': resource_number,
  148. 'dataset_id': dataset.id,
  149. 'dataset_name': dataset.name,
  150. 'document_id': document.id,
  151. 'document_name': document.name,
  152. 'data_source_type': document.data_source_type,
  153. 'segment_id': segment.id,
  154. 'retriever_from': invoke_from.to_source(),
  155. 'score': document_score_list.get(segment.index_node_id, None)
  156. }
  157. if invoke_from.to_source() == 'dev':
  158. source['hit_count'] = segment.hit_count
  159. source['word_count'] = segment.word_count
  160. source['segment_position'] = segment.position
  161. source['index_node_hash'] = segment.index_node_hash
  162. if segment.answer:
  163. source['content'] = f'question:{segment.content} \nanswer:{segment.answer}'
  164. else:
  165. source['content'] = segment.content
  166. context_list.append(source)
  167. resource_number += 1
  168. if hit_callback:
  169. hit_callback.return_retriever_resource_info(context_list)
  170. return str("\n".join(document_context_list))
  171. return ''
  172. def single_retrieve(self, app_id: str,
  173. tenant_id: str,
  174. user_id: str,
  175. user_from: str,
  176. available_datasets: list,
  177. query: str,
  178. model_instance: ModelInstance,
  179. model_config: ModelConfigWithCredentialsEntity,
  180. planning_strategy: PlanningStrategy,
  181. ):
  182. tools = []
  183. for dataset in available_datasets:
  184. description = dataset.description
  185. if not description:
  186. description = 'useful for when you want to answer queries about the ' + dataset.name
  187. description = description.replace('\n', '').replace('\r', '')
  188. message_tool = PromptMessageTool(
  189. name=dataset.id,
  190. description=description,
  191. parameters={
  192. "type": "object",
  193. "properties": {},
  194. "required": [],
  195. }
  196. )
  197. tools.append(message_tool)
  198. dataset_id = None
  199. if planning_strategy == PlanningStrategy.REACT_ROUTER:
  200. react_multi_dataset_router = ReactMultiDatasetRouter()
  201. dataset_id = react_multi_dataset_router.invoke(query, tools, model_config, model_instance,
  202. user_id, tenant_id)
  203. elif planning_strategy == PlanningStrategy.ROUTER:
  204. function_call_router = FunctionCallMultiDatasetRouter()
  205. dataset_id = function_call_router.invoke(query, tools, model_config, model_instance)
  206. if dataset_id:
  207. # get retrieval model config
  208. dataset = db.session.query(Dataset).filter(
  209. Dataset.id == dataset_id
  210. ).first()
  211. if dataset:
  212. retrieval_model_config = dataset.retrieval_model \
  213. if dataset.retrieval_model else default_retrieval_model
  214. # get top k
  215. top_k = retrieval_model_config['top_k']
  216. # get retrieval method
  217. if dataset.indexing_technique == "economy":
  218. retrival_method = 'keyword_search'
  219. else:
  220. retrival_method = retrieval_model_config['search_method']
  221. # get reranking model
  222. reranking_model = retrieval_model_config['reranking_model'] \
  223. if retrieval_model_config['reranking_enable'] else None
  224. # get score threshold
  225. score_threshold = .0
  226. score_threshold_enabled = retrieval_model_config.get("score_threshold_enabled")
  227. if score_threshold_enabled:
  228. score_threshold = retrieval_model_config.get("score_threshold")
  229. results = RetrievalService.retrieve(retrival_method=retrival_method, dataset_id=dataset.id,
  230. query=query,
  231. top_k=top_k, score_threshold=score_threshold,
  232. reranking_model=reranking_model)
  233. self._on_query(query, [dataset_id], app_id, user_from, user_id)
  234. if results:
  235. self._on_retrival_end(results)
  236. return results
  237. return []
  238. def multiple_retrieve(self,
  239. app_id: str,
  240. tenant_id: str,
  241. user_id: str,
  242. user_from: str,
  243. available_datasets: list,
  244. query: str,
  245. top_k: int,
  246. score_threshold: float,
  247. reranking_provider_name: str,
  248. reranking_model_name: str):
  249. threads = []
  250. all_documents = []
  251. dataset_ids = [dataset.id for dataset in available_datasets]
  252. for dataset in available_datasets:
  253. retrieval_thread = threading.Thread(target=self._retriever, kwargs={
  254. 'flask_app': current_app._get_current_object(),
  255. 'dataset_id': dataset.id,
  256. 'query': query,
  257. 'top_k': top_k,
  258. 'all_documents': all_documents,
  259. })
  260. threads.append(retrieval_thread)
  261. retrieval_thread.start()
  262. for thread in threads:
  263. thread.join()
  264. # do rerank for searched documents
  265. model_manager = ModelManager()
  266. rerank_model_instance = model_manager.get_model_instance(
  267. tenant_id=tenant_id,
  268. provider=reranking_provider_name,
  269. model_type=ModelType.RERANK,
  270. model=reranking_model_name
  271. )
  272. rerank_runner = RerankRunner(rerank_model_instance)
  273. all_documents = rerank_runner.run(query, all_documents,
  274. score_threshold,
  275. top_k)
  276. self._on_query(query, dataset_ids, app_id, user_from, user_id)
  277. if all_documents:
  278. self._on_retrival_end(all_documents)
  279. return all_documents
  280. def _on_retrival_end(self, documents: list[Document]) -> None:
  281. """Handle retrival end."""
  282. for document in documents:
  283. query = db.session.query(DocumentSegment).filter(
  284. DocumentSegment.index_node_id == document.metadata['doc_id']
  285. )
  286. # if 'dataset_id' in document.metadata:
  287. if 'dataset_id' in document.metadata:
  288. query = query.filter(DocumentSegment.dataset_id == document.metadata['dataset_id'])
  289. # add hit count to document segment
  290. query.update(
  291. {DocumentSegment.hit_count: DocumentSegment.hit_count + 1},
  292. synchronize_session=False
  293. )
  294. db.session.commit()
  295. def _on_query(self, query: str, dataset_ids: list[str], app_id: str, user_from: str, user_id: str) -> None:
  296. """
  297. Handle query.
  298. """
  299. if not query:
  300. return
  301. for dataset_id in dataset_ids:
  302. dataset_query = DatasetQuery(
  303. dataset_id=dataset_id,
  304. content=query,
  305. source='app',
  306. source_app_id=app_id,
  307. created_by_role=user_from,
  308. created_by=user_id
  309. )
  310. db.session.add(dataset_query)
  311. db.session.commit()
  312. def _retriever(self, flask_app: Flask, dataset_id: str, query: str, top_k: int, all_documents: list):
  313. with flask_app.app_context():
  314. dataset = db.session.query(Dataset).filter(
  315. Dataset.id == dataset_id
  316. ).first()
  317. if not dataset:
  318. return []
  319. # get retrieval model , if the model is not setting , using default
  320. retrieval_model = dataset.retrieval_model if dataset.retrieval_model else default_retrieval_model
  321. if dataset.indexing_technique == "economy":
  322. # use keyword table query
  323. documents = RetrievalService.retrieve(retrival_method='keyword_search',
  324. dataset_id=dataset.id,
  325. query=query,
  326. top_k=top_k
  327. )
  328. if documents:
  329. all_documents.extend(documents)
  330. else:
  331. if top_k > 0:
  332. # retrieval source
  333. documents = RetrievalService.retrieve(retrival_method=retrieval_model['search_method'],
  334. dataset_id=dataset.id,
  335. query=query,
  336. top_k=top_k,
  337. score_threshold=retrieval_model['score_threshold']
  338. if retrieval_model['score_threshold_enabled'] else None,
  339. reranking_model=retrieval_model['reranking_model']
  340. if retrieval_model['reranking_enable'] else None
  341. )
  342. all_documents.extend(documents)
  343. def to_dataset_retriever_tool(self, tenant_id: str,
  344. dataset_ids: list[str],
  345. retrieve_config: DatasetRetrieveConfigEntity,
  346. return_resource: bool,
  347. invoke_from: InvokeFrom,
  348. hit_callback: DatasetIndexToolCallbackHandler) \
  349. -> Optional[list[DatasetRetrieverBaseTool]]:
  350. """
  351. A dataset tool is a tool that can be used to retrieve information from a dataset
  352. :param tenant_id: tenant id
  353. :param dataset_ids: dataset ids
  354. :param retrieve_config: retrieve config
  355. :param return_resource: return resource
  356. :param invoke_from: invoke from
  357. :param hit_callback: hit callback
  358. """
  359. tools = []
  360. available_datasets = []
  361. for dataset_id in dataset_ids:
  362. # get dataset from dataset id
  363. dataset = db.session.query(Dataset).filter(
  364. Dataset.tenant_id == tenant_id,
  365. Dataset.id == dataset_id
  366. ).first()
  367. # pass if dataset is not available
  368. if not dataset:
  369. continue
  370. # pass if dataset is not available
  371. if (dataset and dataset.available_document_count == 0
  372. and dataset.available_document_count == 0):
  373. continue
  374. available_datasets.append(dataset)
  375. if retrieve_config.retrieve_strategy == DatasetRetrieveConfigEntity.RetrieveStrategy.SINGLE:
  376. # get retrieval model config
  377. default_retrieval_model = {
  378. 'search_method': 'semantic_search',
  379. 'reranking_enable': False,
  380. 'reranking_model': {
  381. 'reranking_provider_name': '',
  382. 'reranking_model_name': ''
  383. },
  384. 'top_k': 2,
  385. 'score_threshold_enabled': False
  386. }
  387. for dataset in available_datasets:
  388. retrieval_model_config = dataset.retrieval_model \
  389. if dataset.retrieval_model else default_retrieval_model
  390. # get top k
  391. top_k = retrieval_model_config['top_k']
  392. # get score threshold
  393. score_threshold = None
  394. score_threshold_enabled = retrieval_model_config.get("score_threshold_enabled")
  395. if score_threshold_enabled:
  396. score_threshold = retrieval_model_config.get("score_threshold")
  397. tool = DatasetRetrieverTool.from_dataset(
  398. dataset=dataset,
  399. top_k=top_k,
  400. score_threshold=score_threshold,
  401. hit_callbacks=[hit_callback],
  402. return_resource=return_resource,
  403. retriever_from=invoke_from.to_source()
  404. )
  405. tools.append(tool)
  406. elif retrieve_config.retrieve_strategy == DatasetRetrieveConfigEntity.RetrieveStrategy.MULTIPLE:
  407. tool = DatasetMultiRetrieverTool.from_dataset(
  408. dataset_ids=[dataset.id for dataset in available_datasets],
  409. tenant_id=tenant_id,
  410. top_k=retrieve_config.top_k or 2,
  411. score_threshold=retrieve_config.score_threshold,
  412. hit_callbacks=[hit_callback],
  413. return_resource=return_resource,
  414. retriever_from=invoke_from.to_source(),
  415. reranking_provider_name=retrieve_config.reranking_model.get('reranking_provider_name'),
  416. reranking_model_name=retrieve_config.reranking_model.get('reranking_model_name')
  417. )
  418. tools.append(tool)
  419. return tools