deal_dataset_vector_index_task.py

import logging
import time

import click
from celery import shared_task

from core.rag.index_processor.index_processor_factory import IndexProcessorFactory
from core.rag.models.document import Document
from extensions.ext_database import db
from models.dataset import Dataset, DocumentSegment
from models.dataset import Document as DatasetDocument


@shared_task(queue="dataset")
def deal_dataset_vector_index_task(dataset_id: str, action: str):
    """
    Asynchronously add, update, or remove a dataset's vector index.

    :param dataset_id: dataset_id
    :param action: one of "add", "update" or "remove"

    Usage: deal_dataset_vector_index_task.delay(dataset_id, action)
    """
    logging.info(click.style("Start deal dataset vector index: {}".format(dataset_id), fg="green"))
    start_at = time.perf_counter()

    try:
        dataset = Dataset.query.filter_by(id=dataset_id).first()

        if not dataset:
            raise Exception("Dataset not found")

        index_type = dataset.doc_form
        index_processor = IndexProcessorFactory(index_type).init_index_processor()

        if action == "remove":
            # drop the dataset's vectors from the index (keyword index untouched)
            index_processor.clean(dataset, None, with_keywords=False)
        elif action == "add":
            # re-index all completed, enabled, non-archived documents
            dataset_documents = (
                db.session.query(DatasetDocument)
                .filter(
                    DatasetDocument.dataset_id == dataset_id,
                    DatasetDocument.indexing_status == "completed",
                    DatasetDocument.enabled == True,
                    DatasetDocument.archived == False,
                )
                .all()
            )

            if dataset_documents:
                # mark documents as indexing while the vectors are rebuilt
                dataset_documents_ids = [doc.id for doc in dataset_documents]
                db.session.query(DatasetDocument).filter(DatasetDocument.id.in_(dataset_documents_ids)).update(
                    {"indexing_status": "indexing"}, synchronize_session=False
                )
                db.session.commit()

                for dataset_document in dataset_documents:
                    try:
                        # add from vector index
                        segments = (
                            db.session.query(DocumentSegment)
                            .filter(DocumentSegment.document_id == dataset_document.id, DocumentSegment.enabled == True)
                            .order_by(DocumentSegment.position.asc())
                            .all()
                        )
                        if segments:
                            documents = []
                            for segment in segments:
                                document = Document(
                                    page_content=segment.content,
                                    metadata={
                                        "doc_id": segment.index_node_id,
                                        "doc_hash": segment.index_node_hash,
                                        "document_id": segment.document_id,
                                        "dataset_id": segment.dataset_id,
                                    },
                                )
                                documents.append(document)
                            # save vector index
                            index_processor.load(dataset, documents, with_keywords=False)

                        db.session.query(DatasetDocument).filter(DatasetDocument.id == dataset_document.id).update(
                            {"indexing_status": "completed"}, synchronize_session=False
                        )
                        db.session.commit()
                    except Exception as e:
                        db.session.query(DatasetDocument).filter(DatasetDocument.id == dataset_document.id).update(
                            {"indexing_status": "error", "error": str(e)}, synchronize_session=False
                        )
                        db.session.commit()
        elif action == "update":
            dataset_documents = (
                db.session.query(DatasetDocument)
                .filter(
                    DatasetDocument.dataset_id == dataset_id,
                    DatasetDocument.indexing_status == "completed",
                    DatasetDocument.enabled == True,
                    DatasetDocument.archived == False,
                )
                .all()
            )
            # add new index
            if dataset_documents:
                # update document status
                dataset_documents_ids = [doc.id for doc in dataset_documents]
                db.session.query(DatasetDocument).filter(DatasetDocument.id.in_(dataset_documents_ids)).update(
                    {"indexing_status": "indexing"}, synchronize_session=False
                )
                db.session.commit()

                # clean index
                index_processor.clean(dataset, None, with_keywords=False)

                for dataset_document in dataset_documents:
                    # update from vector index
                    try:
                        segments = (
                            db.session.query(DocumentSegment)
                            .filter(DocumentSegment.document_id == dataset_document.id, DocumentSegment.enabled == True)
                            .order_by(DocumentSegment.position.asc())
                            .all()
                        )
                        if segments:
                            documents = []
                            for segment in segments:
                                document = Document(
                                    page_content=segment.content,
                                    metadata={
                                        "doc_id": segment.index_node_id,
                                        "doc_hash": segment.index_node_hash,
                                        "document_id": segment.document_id,
                                        "dataset_id": segment.dataset_id,
                                    },
                                )
                                documents.append(document)
                            # save vector index
                            index_processor.load(dataset, documents, with_keywords=False)

                        db.session.query(DatasetDocument).filter(DatasetDocument.id == dataset_document.id).update(
                            {"indexing_status": "completed"}, synchronize_session=False
                        )
                        db.session.commit()
                    except Exception as e:
                        db.session.query(DatasetDocument).filter(DatasetDocument.id == dataset_document.id).update(
                            {"indexing_status": "error", "error": str(e)}, synchronize_session=False
                        )
                        db.session.commit()

        end_at = time.perf_counter()
        logging.info(
            click.style("Deal dataset vector index: {} latency: {}".format(dataset_id, end_at - start_at), fg="green")
        )
    except Exception:
        logging.exception("Deal dataset vector index failed")
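
A minimal dispatch sketch for context, not part of the module above. The only interface confirmed by the docstring is deal_dataset_vector_index_task.delay(dataset_id, action) with action being "add", "update" or "remove"; the import path (tasks.deal_dataset_vector_index_task) and the dataset id below are assumptions for illustration.

    # Hedged usage sketch: enqueue the Celery task from application code.
    # Assumed import path and placeholder dataset id; only .delay(dataset_id, action)
    # is confirmed by the task's docstring.
    from tasks.deal_dataset_vector_index_task import deal_dataset_vector_index_task

    dataset_id = "00000000-0000-0000-0000-000000000000"  # hypothetical dataset id

    # Rebuild vectors for every completed, enabled, non-archived document:
    deal_dataset_vector_index_task.delay(dataset_id, "update")

    # Or drop the dataset's vectors entirely:
    # deal_dataset_vector_index_task.delay(dataset_id, "remove")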