deal_dataset_vector_index_task.py

import logging
import time

import click
from celery import shared_task

from core.rag.index_processor.index_processor_factory import IndexProcessorFactory
from core.rag.models.document import Document
from extensions.ext_database import db
from models.dataset import Dataset, DocumentSegment
from models.dataset import Document as DatasetDocument


@shared_task(queue='dataset')
def deal_dataset_vector_index_task(dataset_id: str, action: str):
    """
    Asynchronously add a dataset's documents to, or remove them from, the vector index.
    :param dataset_id: ID of the dataset to process
    :param action: 'add' or 'remove'
    Usage: deal_dataset_vector_index_task.delay(dataset_id, action)
    """
    logging.info(click.style('Start deal dataset vector index: {}'.format(dataset_id), fg='green'))
    start_at = time.perf_counter()

    try:
        dataset = Dataset.query.filter_by(
            id=dataset_id
        ).first()

        if not dataset:
            raise Exception('Dataset not found')

        index_type = dataset.doc_form
        index_processor = IndexProcessorFactory(index_type).init_index_processor()
        if action == "remove":
            index_processor.clean(dataset, None, with_keywords=False)
        elif action == "add":
            dataset_documents = db.session.query(DatasetDocument).filter(
                DatasetDocument.dataset_id == dataset_id,
                DatasetDocument.indexing_status == 'completed',
                DatasetDocument.enabled == True,
                DatasetDocument.archived == False,
            ).all()

            if dataset_documents:
                documents = []
                for dataset_document in dataset_documents:
                    # collect the enabled segments of each indexed document,
                    # in position order, so they can be re-added to the vector index
                    segments = db.session.query(DocumentSegment).filter(
                        DocumentSegment.document_id == dataset_document.id,
                        DocumentSegment.enabled == True
                    ).order_by(DocumentSegment.position.asc()).all()
                    for segment in segments:
                        document = Document(
                            page_content=segment.content,
                            metadata={
                                "doc_id": segment.index_node_id,
                                "doc_hash": segment.index_node_hash,
                                "document_id": segment.document_id,
                                "dataset_id": segment.dataset_id,
                            }
                        )
                        documents.append(document)

                # save vector index
                index_processor.load(dataset, documents, with_keywords=False)

        end_at = time.perf_counter()
        logging.info(
            click.style('Deal dataset vector index: {} latency: {}'.format(dataset_id, end_at - start_at), fg='green'))
    except Exception:
        logging.exception("Deal dataset vector index failed")
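

# ---------------------------------------------------------------------------
# Usage sketch (illustrative addition, not part of the original module). It
# assumes the module lives under a `tasks` package and that a Celery worker is
# consuming the 'dataset' queue; `dataset.id` stands in for a real Dataset
# row ID:
#
#     from tasks.deal_dataset_vector_index_task import deal_dataset_vector_index_task
#
#     deal_dataset_vector_index_task.delay(dataset.id, 'add')     # rebuild vectors
#     deal_dataset_vector_index_task.delay(dataset.id, 'remove')  # drop vectors
# ---------------------------------------------------------------------------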