dataset_retrieval.py 22 KB


  1. import threading
  2. from typing import Optional, cast
  3. from flask import Flask, current_app
  4. from core.app.app_config.entities import DatasetEntity, DatasetRetrieveConfigEntity
  5. from core.app.entities.app_invoke_entities import InvokeFrom, ModelConfigWithCredentialsEntity
  6. from core.callback_handler.index_tool_callback_handler import DatasetIndexToolCallbackHandler
  7. from core.entities.agent_entities import PlanningStrategy
  8. from core.memory.token_buffer_memory import TokenBufferMemory
  9. from core.model_manager import ModelInstance, ModelManager
  10. from core.model_runtime.entities.message_entities import PromptMessageTool
  11. from core.model_runtime.entities.model_entities import ModelFeature, ModelType
  12. from core.model_runtime.model_providers.__base.large_language_model import LargeLanguageModel
  13. from core.ops.ops_trace_manager import TraceQueueManager, TraceTask, TraceTaskName
  14. from core.ops.utils import measure_time
  15. from core.rag.datasource.retrieval_service import RetrievalService
  16. from core.rag.models.document import Document
  17. from core.rag.rerank.rerank import RerankRunner
  18. from core.rag.retrieval.retrival_methods import RetrievalMethod
  19. from core.rag.retrieval.router.multi_dataset_function_call_router import FunctionCallMultiDatasetRouter
  20. from core.rag.retrieval.router.multi_dataset_react_route import ReactMultiDatasetRouter
  21. from core.tools.tool.dataset_retriever.dataset_multi_retriever_tool import DatasetMultiRetrieverTool
  22. from core.tools.tool.dataset_retriever.dataset_retriever_base_tool import DatasetRetrieverBaseTool
  23. from core.tools.tool.dataset_retriever.dataset_retriever_tool import DatasetRetrieverTool
  24. from extensions.ext_database import db
  25. from models.dataset import Dataset, DatasetQuery, DocumentSegment
  26. from models.dataset import Document as DatasetDocument
  27. default_retrieval_model = {
  28. 'search_method': RetrievalMethod.SEMANTIC_SEARCH,
  29. 'reranking_enable': False,
  30. 'reranking_model': {
  31. 'reranking_provider_name': '',
  32. 'reranking_model_name': ''
  33. },
  34. 'top_k': 2,
  35. 'score_threshold_enabled': False
  36. }
  37. class DatasetRetrieval:
  38. def __init__(self, application_generate_entity=None):
  39. self.application_generate_entity = application_generate_entity
  40. def retrieve(
  41. self, app_id: str, user_id: str, tenant_id: str,
  42. model_config: ModelConfigWithCredentialsEntity,
  43. config: DatasetEntity,
  44. query: str,
  45. invoke_from: InvokeFrom,
  46. show_retrieve_source: bool,
  47. hit_callback: DatasetIndexToolCallbackHandler,
  48. message_id: str,
  49. memory: Optional[TokenBufferMemory] = None,
  50. ) -> Optional[str]:
  51. """
  52. Retrieve dataset.
  53. :param app_id: app_id
  54. :param user_id: user_id
  55. :param tenant_id: tenant id
  56. :param model_config: model config
  57. :param config: dataset config
  58. :param query: query
  59. :param invoke_from: invoke from
  60. :param show_retrieve_source: show retrieve source
  61. :param hit_callback: hit callback
  62. :param message_id: message id
  63. :param memory: memory
  64. :return:
  65. """
  66. dataset_ids = config.dataset_ids
  67. if len(dataset_ids) == 0:
  68. return None
  69. retrieve_config = config.retrieve_config
  70. # check model is support tool calling
  71. model_type_instance = model_config.provider_model_bundle.model_type_instance
  72. model_type_instance = cast(LargeLanguageModel, model_type_instance)
  73. model_manager = ModelManager()
  74. model_instance = model_manager.get_model_instance(
  75. tenant_id=tenant_id,
  76. model_type=ModelType.LLM,
  77. provider=model_config.provider,
  78. model=model_config.model
  79. )
  80. # get model schema
  81. model_schema = model_type_instance.get_model_schema(
  82. model=model_config.model,
  83. credentials=model_config.credentials
  84. )
  85. if not model_schema:
  86. return None
  87. planning_strategy = PlanningStrategy.REACT_ROUTER
  88. features = model_schema.features
  89. if features:
  90. if ModelFeature.TOOL_CALL in features \
  91. or ModelFeature.MULTI_TOOL_CALL in features:
  92. planning_strategy = PlanningStrategy.ROUTER
  93. available_datasets = []
  94. for dataset_id in dataset_ids:
  95. # get dataset from dataset id
  96. dataset = db.session.query(Dataset).filter(
  97. Dataset.tenant_id == tenant_id,
  98. Dataset.id == dataset_id
  99. ).first()
  100. # pass if dataset is not available
  101. if not dataset:
  102. continue
  103. # pass if dataset is not available
  104. if (dataset and dataset.available_document_count == 0
  105. and dataset.available_document_count == 0):
  106. continue
  107. available_datasets.append(dataset)
  108. all_documents = []
  109. user_from = 'account' if invoke_from in [InvokeFrom.EXPLORE, InvokeFrom.DEBUGGER] else 'end_user'
  110. if retrieve_config.retrieve_strategy == DatasetRetrieveConfigEntity.RetrieveStrategy.SINGLE:
  111. all_documents = self.single_retrieve(
  112. app_id, tenant_id, user_id, user_from, available_datasets, query,
  113. model_instance,
  114. model_config, planning_strategy, message_id
  115. )
  116. elif retrieve_config.retrieve_strategy == DatasetRetrieveConfigEntity.RetrieveStrategy.MULTIPLE:
  117. all_documents = self.multiple_retrieve(
  118. app_id, tenant_id, user_id, user_from,
  119. available_datasets, query, retrieve_config.top_k,
  120. retrieve_config.score_threshold,
  121. retrieve_config.reranking_model.get('reranking_provider_name'),
  122. retrieve_config.reranking_model.get('reranking_model_name'),
  123. message_id,
  124. )
  125. document_score_list = {}
  126. for item in all_documents:
  127. if item.metadata.get('score'):
  128. document_score_list[item.metadata['doc_id']] = item.metadata['score']
  129. document_context_list = []
  130. index_node_ids = [document.metadata['doc_id'] for document in all_documents]
  131. segments = DocumentSegment.query.filter(
  132. DocumentSegment.dataset_id.in_(dataset_ids),
  133. DocumentSegment.completed_at.isnot(None),
  134. DocumentSegment.status == 'completed',
  135. DocumentSegment.enabled == True,
  136. DocumentSegment.index_node_id.in_(index_node_ids)
  137. ).all()
  138. if segments:
  139. index_node_id_to_position = {id: position for position, id in enumerate(index_node_ids)}
  140. sorted_segments = sorted(segments,
  141. key=lambda segment: index_node_id_to_position.get(segment.index_node_id,
  142. float('inf')))
  143. for segment in sorted_segments:
  144. if segment.answer:
  145. document_context_list.append(f'question:{segment.get_sign_content()} answer:{segment.answer}')
  146. else:
  147. document_context_list.append(segment.get_sign_content())
  148. if show_retrieve_source:
  149. context_list = []
  150. resource_number = 1
  151. for segment in sorted_segments:
  152. dataset = Dataset.query.filter_by(
  153. id=segment.dataset_id
  154. ).first()
  155. document = DatasetDocument.query.filter(DatasetDocument.id == segment.document_id,
  156. DatasetDocument.enabled == True,
  157. DatasetDocument.archived == False,
  158. ).first()
  159. if dataset and document:
  160. source = {
  161. 'position': resource_number,
  162. 'dataset_id': dataset.id,
  163. 'dataset_name': dataset.name,
  164. 'document_id': document.id,
  165. 'document_name': document.name,
  166. 'data_source_type': document.data_source_type,
  167. 'segment_id': segment.id,
  168. 'retriever_from': invoke_from.to_source(),
  169. 'score': document_score_list.get(segment.index_node_id, None)
  170. }
  171. if invoke_from.to_source() == 'dev':
  172. source['hit_count'] = segment.hit_count
  173. source['word_count'] = segment.word_count
  174. source['segment_position'] = segment.position
  175. source['index_node_hash'] = segment.index_node_hash
  176. if segment.answer:
  177. source['content'] = f'question:{segment.content} \nanswer:{segment.answer}'
  178. else:
  179. source['content'] = segment.content
  180. context_list.append(source)
  181. resource_number += 1
  182. if hit_callback:
  183. hit_callback.return_retriever_resource_info(context_list)
  184. return str("\n".join(document_context_list))
  185. return ''
  186. def single_retrieve(
  187. self, app_id: str,
  188. tenant_id: str,
  189. user_id: str,
  190. user_from: str,
  191. available_datasets: list,
  192. query: str,
  193. model_instance: ModelInstance,
  194. model_config: ModelConfigWithCredentialsEntity,
  195. planning_strategy: PlanningStrategy,
  196. message_id: Optional[str] = None,
  197. ):
  198. tools = []
  199. for dataset in available_datasets:
  200. description = dataset.description
  201. if not description:
  202. description = 'useful for when you want to answer queries about the ' + dataset.name
  203. description = description.replace('\n', '').replace('\r', '')
  204. message_tool = PromptMessageTool(
  205. name=dataset.id,
  206. description=description,
  207. parameters={
  208. "type": "object",
  209. "properties": {},
  210. "required": [],
  211. }
  212. )
  213. tools.append(message_tool)
  214. dataset_id = None
  215. if planning_strategy == PlanningStrategy.REACT_ROUTER:
  216. react_multi_dataset_router = ReactMultiDatasetRouter()
  217. dataset_id = react_multi_dataset_router.invoke(query, tools, model_config, model_instance,
  218. user_id, tenant_id)
  219. elif planning_strategy == PlanningStrategy.ROUTER:
  220. function_call_router = FunctionCallMultiDatasetRouter()
  221. dataset_id = function_call_router.invoke(query, tools, model_config, model_instance)
  222. if dataset_id:
  223. # get retrieval model config
  224. dataset = db.session.query(Dataset).filter(
  225. Dataset.id == dataset_id
  226. ).first()
  227. if dataset:
  228. retrieval_model_config = dataset.retrieval_model \
  229. if dataset.retrieval_model else default_retrieval_model
  230. # get top k
  231. top_k = retrieval_model_config['top_k']
  232. # get retrieval method
  233. if dataset.indexing_technique == "economy":
  234. retrival_method = 'keyword_search'
  235. else:
  236. retrival_method = retrieval_model_config['search_method']
  237. # get reranking model
  238. reranking_model = retrieval_model_config['reranking_model'] \
  239. if retrieval_model_config['reranking_enable'] else None
  240. # get score threshold
  241. score_threshold = .0
  242. score_threshold_enabled = retrieval_model_config.get("score_threshold_enabled")
  243. if score_threshold_enabled:
  244. score_threshold = retrieval_model_config.get("score_threshold")
  245. with measure_time() as timer:
  246. results = RetrievalService.retrieve(
  247. retrival_method=retrival_method, dataset_id=dataset.id,
  248. query=query,
  249. top_k=top_k, score_threshold=score_threshold,
  250. reranking_model=reranking_model
  251. )
  252. self._on_query(query, [dataset_id], app_id, user_from, user_id)
  253. if results:
  254. self._on_retrival_end(results, message_id, timer)
  255. return results
  256. return []
  257. def multiple_retrieve(
  258. self,
  259. app_id: str,
  260. tenant_id: str,
  261. user_id: str,
  262. user_from: str,
  263. available_datasets: list,
  264. query: str,
  265. top_k: int,
  266. score_threshold: float,
  267. reranking_provider_name: str,
  268. reranking_model_name: str,
  269. message_id: Optional[str] = None,
  270. ):
  271. threads = []
  272. all_documents = []
  273. dataset_ids = [dataset.id for dataset in available_datasets]
  274. for dataset in available_datasets:
  275. retrieval_thread = threading.Thread(target=self._retriever, kwargs={
  276. 'flask_app': current_app._get_current_object(),
  277. 'dataset_id': dataset.id,
  278. 'query': query,
  279. 'top_k': top_k,
  280. 'all_documents': all_documents,
  281. })
  282. threads.append(retrieval_thread)
  283. retrieval_thread.start()
  284. for thread in threads:
  285. thread.join()
  286. # do rerank for searched documents
  287. model_manager = ModelManager()
  288. rerank_model_instance = model_manager.get_model_instance(
  289. tenant_id=tenant_id,
  290. provider=reranking_provider_name,
  291. model_type=ModelType.RERANK,
  292. model=reranking_model_name
  293. )
  294. rerank_runner = RerankRunner(rerank_model_instance)
  295. with measure_time() as timer:
  296. all_documents = rerank_runner.run(
  297. query, all_documents,
  298. score_threshold,
  299. top_k
  300. )
  301. self._on_query(query, dataset_ids, app_id, user_from, user_id)
  302. if all_documents:
  303. self._on_retrival_end(all_documents, message_id, timer)
  304. return all_documents
  305. def _on_retrival_end(
  306. self, documents: list[Document], message_id: Optional[str] = None, timer: Optional[dict] = None
  307. ) -> None:
  308. """Handle retrival end."""
  309. for document in documents:
  310. query = db.session.query(DocumentSegment).filter(
  311. DocumentSegment.index_node_id == document.metadata['doc_id']
  312. )
  313. # if 'dataset_id' in document.metadata:
  314. if 'dataset_id' in document.metadata:
  315. query = query.filter(DocumentSegment.dataset_id == document.metadata['dataset_id'])
  316. # add hit count to document segment
  317. query.update(
  318. {DocumentSegment.hit_count: DocumentSegment.hit_count + 1},
  319. synchronize_session=False
  320. )
  321. db.session.commit()
  322. # get tracing instance
  323. trace_manager: TraceQueueManager = self.application_generate_entity.trace_manager if self.application_generate_entity else None
  324. if trace_manager:
  325. trace_manager.add_trace_task(
  326. TraceTask(
  327. TraceTaskName.DATASET_RETRIEVAL_TRACE,
  328. message_id=message_id,
  329. documents=documents,
  330. timer=timer
  331. )
  332. )
  333. def _on_query(self, query: str, dataset_ids: list[str], app_id: str, user_from: str, user_id: str) -> None:
  334. """
  335. Handle query.
  336. """
  337. if not query:
  338. return
  339. dataset_queries = []
  340. for dataset_id in dataset_ids:
  341. dataset_query = DatasetQuery(
  342. dataset_id=dataset_id,
  343. content=query,
  344. source='app',
  345. source_app_id=app_id,
  346. created_by_role=user_from,
  347. created_by=user_id
  348. )
  349. dataset_queries.append(dataset_query)
  350. if dataset_queries:
  351. db.session.add_all(dataset_queries)
  352. db.session.commit()
  353. def _retriever(self, flask_app: Flask, dataset_id: str, query: str, top_k: int, all_documents: list):
  354. with flask_app.app_context():
  355. dataset = db.session.query(Dataset).filter(
  356. Dataset.id == dataset_id
  357. ).first()
  358. if not dataset:
  359. return []
  360. # get retrieval model , if the model is not setting , using default
  361. retrieval_model = dataset.retrieval_model if dataset.retrieval_model else default_retrieval_model
  362. if dataset.indexing_technique == "economy":
  363. # use keyword table query
  364. documents = RetrievalService.retrieve(retrival_method='keyword_search',
  365. dataset_id=dataset.id,
  366. query=query,
  367. top_k=top_k
  368. )
  369. if documents:
  370. all_documents.extend(documents)
  371. else:
  372. if top_k > 0:
  373. # retrieval source
  374. documents = RetrievalService.retrieve(retrival_method=retrieval_model['search_method'],
  375. dataset_id=dataset.id,
  376. query=query,
  377. top_k=top_k,
  378. score_threshold=retrieval_model['score_threshold']
  379. if retrieval_model['score_threshold_enabled'] else None,
  380. reranking_model=retrieval_model['reranking_model']
  381. if retrieval_model['reranking_enable'] else None
  382. )
  383. all_documents.extend(documents)
  384. def to_dataset_retriever_tool(self, tenant_id: str,
  385. dataset_ids: list[str],
  386. retrieve_config: DatasetRetrieveConfigEntity,
  387. return_resource: bool,
  388. invoke_from: InvokeFrom,
  389. hit_callback: DatasetIndexToolCallbackHandler) \
  390. -> Optional[list[DatasetRetrieverBaseTool]]:
  391. """
  392. A dataset tool is a tool that can be used to retrieve information from a dataset
  393. :param tenant_id: tenant id
  394. :param dataset_ids: dataset ids
  395. :param retrieve_config: retrieve config
  396. :param return_resource: return resource
  397. :param invoke_from: invoke from
  398. :param hit_callback: hit callback
  399. """
  400. tools = []
  401. available_datasets = []
  402. for dataset_id in dataset_ids:
  403. # get dataset from dataset id
  404. dataset = db.session.query(Dataset).filter(
  405. Dataset.tenant_id == tenant_id,
  406. Dataset.id == dataset_id
  407. ).first()
  408. # pass if dataset is not available
  409. if not dataset:
  410. continue
  411. # pass if dataset is not available
  412. if (dataset and dataset.available_document_count == 0
  413. and dataset.available_document_count == 0):
  414. continue
  415. available_datasets.append(dataset)
  416. if retrieve_config.retrieve_strategy == DatasetRetrieveConfigEntity.RetrieveStrategy.SINGLE:
  417. # get retrieval model config
  418. default_retrieval_model = {
  419. 'search_method': RetrievalMethod.SEMANTIC_SEARCH,
  420. 'reranking_enable': False,
  421. 'reranking_model': {
  422. 'reranking_provider_name': '',
  423. 'reranking_model_name': ''
  424. },
  425. 'top_k': 2,
  426. 'score_threshold_enabled': False
  427. }
  428. for dataset in available_datasets:
  429. retrieval_model_config = dataset.retrieval_model \
  430. if dataset.retrieval_model else default_retrieval_model
  431. # get top k
  432. top_k = retrieval_model_config['top_k']
  433. # get score threshold
  434. score_threshold = None
  435. score_threshold_enabled = retrieval_model_config.get("score_threshold_enabled")
  436. if score_threshold_enabled:
  437. score_threshold = retrieval_model_config.get("score_threshold")
  438. tool = DatasetRetrieverTool.from_dataset(
  439. dataset=dataset,
  440. top_k=top_k,
  441. score_threshold=score_threshold,
  442. hit_callbacks=[hit_callback],
  443. return_resource=return_resource,
  444. retriever_from=invoke_from.to_source()
  445. )
  446. tools.append(tool)
  447. elif retrieve_config.retrieve_strategy == DatasetRetrieveConfigEntity.RetrieveStrategy.MULTIPLE:
  448. tool = DatasetMultiRetrieverTool.from_dataset(
  449. dataset_ids=[dataset.id for dataset in available_datasets],
  450. tenant_id=tenant_id,
  451. top_k=retrieve_config.top_k or 2,
  452. score_threshold=retrieve_config.score_threshold,
  453. hit_callbacks=[hit_callback],
  454. return_resource=return_resource,
  455. retriever_from=invoke_from.to_source(),
  456. reranking_provider_name=retrieve_config.reranking_model.get('reranking_provider_name'),
  457. reranking_model_name=retrieve_config.reranking_model.get('reranking_model_name')
  458. )
  459. tools.append(tool)
  460. return tools