# deal_dataset_vector_index_task.py
import logging
import time

import click
from celery import shared_task
from langchain.schema import Document

from core.index.index import IndexBuilder
from extensions.ext_database import db
from models.dataset import Dataset, DocumentSegment
from models.dataset import Document as DatasetDocument
  10. @shared_task(queue='dataset')
  11. def deal_dataset_vector_index_task(dataset_id: str, action: str):
  12. """
  13. Async deal dataset from index
  14. :param dataset_id: dataset_id
  15. :param action: action
  16. Usage: deal_dataset_vector_index_task.delay(dataset_id, action)
  17. """
  18. logging.info(click.style('Start deal dataset vector index: {}'.format(dataset_id), fg='green'))
  19. start_at = time.perf_counter()
  20. try:
  21. dataset = Dataset.query.filter_by(
  22. id=dataset_id
  23. ).first()
  24. if not dataset:
  25. raise Exception('Dataset not found')
  26. if action == "remove":
  27. index = IndexBuilder.get_index(dataset, 'high_quality', ignore_high_quality_check=True)
  28. index.delete()
  29. elif action == "add":
  30. dataset_documents = db.session.query(DatasetDocument).filter(
  31. DatasetDocument.dataset_id == dataset_id,
  32. DatasetDocument.indexing_status == 'completed',
  33. DatasetDocument.enabled == True,
  34. DatasetDocument.archived == False,
  35. ).all()
  36. if dataset_documents:
  37. # save vector index
  38. index = IndexBuilder.get_index(dataset, 'high_quality', ignore_high_quality_check=True)
  39. documents = []
  40. for dataset_document in dataset_documents:
  41. # delete from vector index
  42. segments = db.session.query(DocumentSegment).filter(
  43. DocumentSegment.document_id == dataset_document.id,
  44. DocumentSegment.enabled == True
  45. ) .order_by(DocumentSegment.position.asc()).all()
  46. for segment in segments:
  47. document = Document(
  48. page_content=segment.content,
  49. metadata={
  50. "doc_id": segment.index_node_id,
  51. "doc_hash": segment.index_node_hash,
  52. "document_id": segment.document_id,
  53. "dataset_id": segment.dataset_id,
  54. }
  55. )
  56. documents.append(document)
  57. # save vector index
  58. index.add_texts(documents)
  59. end_at = time.perf_counter()
  60. logging.info(
  61. click.style('Deal dataset vector index: {} latency: {}'.format(dataset_id, end_at - start_at), fg='green'))
  62. except Exception:
  63. logging.exception("Deal dataset vector index failed")