delete_annotation_index_task.py 1.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041
  1. import logging
  2. import time
  3. import click
  4. from celery import shared_task
  5. from core.rag.datasource.vdb.vector_factory import Vector
  6. from models.dataset import Dataset
  7. from services.dataset_service import DatasetCollectionBindingService
  8. @shared_task(queue="dataset")
  9. def delete_annotation_index_task(annotation_id: str, app_id: str, tenant_id: str, collection_binding_id: str):
  10. """
  11. Async delete annotation index task
  12. """
  13. logging.info(click.style("Start delete app annotation index: {}".format(app_id), fg="green"))
  14. start_at = time.perf_counter()
  15. try:
  16. dataset_collection_binding = DatasetCollectionBindingService.get_dataset_collection_binding_by_id_and_type(
  17. collection_binding_id, "annotation"
  18. )
  19. dataset = Dataset(
  20. id=app_id,
  21. tenant_id=tenant_id,
  22. indexing_technique="high_quality",
  23. collection_binding_id=dataset_collection_binding.id,
  24. )
  25. try:
  26. vector = Vector(dataset, attributes=["doc_id", "annotation_id", "app_id"])
  27. vector.delete_by_metadata_field("annotation_id", annotation_id)
  28. except Exception:
  29. logging.exception("Delete annotation index failed when annotation deleted.")
  30. end_at = time.perf_counter()
  31. logging.info(
  32. click.style("App annotations index deleted : {} latency: {}".format(app_id, end_at - start_at), fg="green")
  33. )
  34. except Exception as e:
  35. logging.exception("Annotation deleted index failed:{}".format(str(e)))