remove_segment_from_index_task.py 1.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758
  1. import logging
  2. import time
  3. import click
  4. from celery import shared_task
  5. from werkzeug.exceptions import NotFound
  6. from core.index.keyword_table_index import KeywordTableIndex
  7. from core.index.vector_index import VectorIndex
  8. from extensions.ext_database import db
  9. from extensions.ext_redis import redis_client
  10. from models.dataset import DocumentSegment
  11. @shared_task
  12. def remove_segment_from_index_task(segment_id: str):
  13. """
  14. Async Remove segment from index
  15. :param segment_id:
  16. Usage: remove_segment_from_index.delay(segment_id)
  17. """
  18. logging.info(click.style('Start remove segment from index: {}'.format(segment_id), fg='green'))
  19. start_at = time.perf_counter()
  20. segment = db.session.query(DocumentSegment).filter(DocumentSegment.id == segment_id).first()
  21. if not segment:
  22. raise NotFound('Segment not found')
  23. if segment.status != 'completed':
  24. return
  25. indexing_cache_key = 'segment_{}_indexing'.format(segment.id)
  26. try:
  27. dataset = segment.dataset
  28. if not dataset:
  29. raise Exception('Segment has no dataset')
  30. vector_index = VectorIndex(dataset=dataset)
  31. keyword_table_index = KeywordTableIndex(dataset=dataset)
  32. # delete from vector index
  33. if dataset.indexing_technique == "high_quality":
  34. vector_index.del_nodes([segment.index_node_id])
  35. # delete from keyword index
  36. keyword_table_index.del_nodes([segment.index_node_id])
  37. end_at = time.perf_counter()
  38. logging.info(click.style('Segment removed from index: {} latency: {}'.format(segment.id, end_at - start_at), fg='green'))
  39. except Exception:
  40. logging.exception("remove segment from index failed")
  41. segment.enabled = True
  42. db.session.commit()
  43. finally:
  44. redis_client.delete(indexing_cache_key)