# deal_dataset_vector_index_task.py
  1. import logging
  2. import time
  3. import click
  4. from celery import shared_task
  5. from langchain.schema import Document
  6. from core.index.index import IndexBuilder
  7. from extensions.ext_database import db
  8. from models.dataset import DocumentSegment, Dataset
  9. from models.dataset import Document as DatasetDocument
  10. @shared_task
  11. def deal_dataset_vector_index_task(dataset_id: str, action: str):
  12. """
  13. Async deal dataset from index
  14. :param dataset_id: dataset_id
  15. :param action: action
  16. Usage: deal_dataset_vector_index_task.delay(dataset_id, action)
  17. """
  18. logging.info(click.style('Start deal dataset vector index: {}'.format(dataset_id), fg='green'))
  19. start_at = time.perf_counter()
  20. try:
  21. dataset = Dataset.query.filter_by(
  22. id=dataset_id
  23. ).first()
  24. if not dataset:
  25. raise Exception('Dataset not found')
  26. if action == "remove":
  27. index = IndexBuilder.get_index(dataset, 'high_quality', ignore_high_quality_check=True)
  28. index.delete()
  29. elif action == "add":
  30. dataset_documents = db.session.query(DatasetDocument).filter(
  31. DatasetDocument.dataset_id == dataset_id,
  32. DatasetDocument.indexing_status == 'completed',
  33. DatasetDocument.enabled == True,
  34. DatasetDocument.archived == False,
  35. ).all()
  36. if dataset_documents:
  37. # save vector index
  38. index = IndexBuilder.get_index(dataset, 'high_quality', ignore_high_quality_check=True)
  39. for dataset_document in dataset_documents:
  40. # delete from vector index
  41. segments = db.session.query(DocumentSegment).filter(
  42. DocumentSegment.document_id == dataset_document.id,
  43. DocumentSegment.enabled == True
  44. ) .order_by(DocumentSegment.position.asc()).all()
  45. documents = []
  46. for segment in segments:
  47. document = Document(
  48. page_content=segment.content,
  49. metadata={
  50. "doc_id": segment.index_node_id,
  51. "doc_hash": segment.index_node_hash,
  52. "document_id": segment.document_id,
  53. "dataset_id": segment.dataset_id,
  54. }
  55. )
  56. documents.append(document)
  57. # save vector index
  58. index.add_texts(documents)
  59. end_at = time.perf_counter()
  60. logging.info(
  61. click.style('Deal dataset vector index: {} latency: {}'.format(dataset_id, end_at - start_at), fg='green'))
  62. except Exception:
  63. logging.exception("Deal dataset vector index failed")