deal_dataset_vector_index_task.py

import logging
import time

import click
from celery import shared_task

from core.rag.index_processor.index_processor_factory import IndexProcessorFactory
from core.rag.models.document import Document
from extensions.ext_database import db
from models.dataset import Dataset, DocumentSegment
from models.dataset import Document as DatasetDocument


@shared_task(queue='dataset')
def deal_dataset_vector_index_task(dataset_id: str, action: str):
    """
    Async task: add, update or remove the vector index of a dataset.
    :param dataset_id: dataset id
    :param action: action to perform ('add', 'update' or 'remove')
    Usage: deal_dataset_vector_index_task.delay(dataset_id, action)
           (a caller sketch is appended at the end of this module)
    """
    logging.info(click.style('Start deal dataset vector index: {}'.format(dataset_id), fg='green'))
    start_at = time.perf_counter()

    try:
        dataset = Dataset.query.filter_by(
            id=dataset_id
        ).first()

        if not dataset:
            raise Exception('Dataset not found')

        index_type = dataset.doc_form
        index_processor = IndexProcessorFactory(index_type).init_index_processor()

        if action == "remove":
            # drop the dataset's vector index entirely
            index_processor.clean(dataset, None, with_keywords=False)
        elif action == "add":
            dataset_documents = db.session.query(DatasetDocument).filter(
                DatasetDocument.dataset_id == dataset_id,
                DatasetDocument.indexing_status == 'completed',
                DatasetDocument.enabled == True,
                DatasetDocument.archived == False,
            ).all()

            if dataset_documents:
                # mark the documents as indexing before rebuilding their vectors
                dataset_documents_ids = [doc.id for doc in dataset_documents]
                db.session.query(DatasetDocument).filter(DatasetDocument.id.in_(dataset_documents_ids)) \
                    .update({"indexing_status": "indexing"}, synchronize_session=False)
                db.session.commit()

                for dataset_document in dataset_documents:
                    try:
                        # add from vector index
                        segments = db.session.query(DocumentSegment).filter(
                            DocumentSegment.document_id == dataset_document.id,
                            DocumentSegment.enabled == True
                        ).order_by(DocumentSegment.position.asc()).all()
                        if segments:
                            documents = []
                            for segment in segments:
                                document = Document(
                                    page_content=segment.content,
                                    metadata={
                                        "doc_id": segment.index_node_id,
                                        "doc_hash": segment.index_node_hash,
                                        "document_id": segment.document_id,
                                        "dataset_id": segment.dataset_id,
                                    }
                                )
                                documents.append(document)
                            # save vector index
                            index_processor.load(dataset, documents, with_keywords=False)
                        db.session.query(DatasetDocument).filter(DatasetDocument.id == dataset_document.id) \
                            .update({"indexing_status": "completed"}, synchronize_session=False)
                        db.session.commit()
                    except Exception as e:
                        # record the per-document failure and continue with the next document
                        db.session.query(DatasetDocument).filter(DatasetDocument.id == dataset_document.id) \
                            .update({"indexing_status": "error", "error": str(e)}, synchronize_session=False)
                        db.session.commit()
        elif action == 'update':
            dataset_documents = db.session.query(DatasetDocument).filter(
                DatasetDocument.dataset_id == dataset_id,
                DatasetDocument.indexing_status == 'completed',
                DatasetDocument.enabled == True,
                DatasetDocument.archived == False,
            ).all()
            # add new index
            if dataset_documents:
                # update document status
                dataset_documents_ids = [doc.id for doc in dataset_documents]
                db.session.query(DatasetDocument).filter(DatasetDocument.id.in_(dataset_documents_ids)) \
                    .update({"indexing_status": "indexing"}, synchronize_session=False)
                db.session.commit()

                # clean index
                index_processor.clean(dataset, None, with_keywords=False)

                for dataset_document in dataset_documents:
                    # update from vector index
                    try:
                        segments = db.session.query(DocumentSegment).filter(
                            DocumentSegment.document_id == dataset_document.id,
                            DocumentSegment.enabled == True
                        ).order_by(DocumentSegment.position.asc()).all()
                        if segments:
                            documents = []
                            for segment in segments:
                                document = Document(
                                    page_content=segment.content,
                                    metadata={
                                        "doc_id": segment.index_node_id,
                                        "doc_hash": segment.index_node_hash,
                                        "document_id": segment.document_id,
                                        "dataset_id": segment.dataset_id,
                                    }
                                )
                                documents.append(document)
                            # save vector index
                            index_processor.load(dataset, documents, with_keywords=False)
                        db.session.query(DatasetDocument).filter(DatasetDocument.id == dataset_document.id) \
                            .update({"indexing_status": "completed"}, synchronize_session=False)
                        db.session.commit()
                    except Exception as e:
                        db.session.query(DatasetDocument).filter(DatasetDocument.id == dataset_document.id) \
                            .update({"indexing_status": "error", "error": str(e)}, synchronize_session=False)
                        db.session.commit()

        end_at = time.perf_counter()
        logging.info(
            click.style('Deal dataset vector index: {} latency: {}'.format(dataset_id, end_at - start_at), fg='green'))
    except Exception:
        logging.exception("Deal dataset vector index failed")
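

# Hypothetical caller sketch (illustrative only): how application code might
# dispatch this task, following the "Usage" note in the docstring. The
# dispatcher name and its boolean flags are assumptions; only the
# .delay(dataset_id, action) call and the 'add' / 'update' / 'remove' action
# values come from the task above.
def _example_dispatch(dataset_id: str, wants_vector_index: bool, had_vector_index: bool):
    if wants_vector_index and not had_vector_index:
        # the dataset gains a vector index: build it from completed documents
        deal_dataset_vector_index_task.delay(dataset_id, 'add')
    elif not wants_vector_index and had_vector_index:
        # the dataset no longer uses a vector index: drop it
        deal_dataset_vector_index_task.delay(dataset_id, 'remove')
    else:
        # index settings changed: clean and rebuild the existing index
        deal_dataset_vector_index_task.delay(dataset_id, 'update')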