update_segment_keyword_index_task.py 2.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768
  1. import datetime
  2. import logging
  3. import time
  4. import click
  5. from celery import shared_task
  6. from werkzeug.exceptions import NotFound
  7. from core.index.index import IndexBuilder
  8. from extensions.ext_database import db
  9. from extensions.ext_redis import redis_client
  10. from models.dataset import DocumentSegment
  11. @shared_task(queue='dataset')
  12. def update_segment_keyword_index_task(segment_id: str):
  13. """
  14. Async update segment index
  15. :param segment_id:
  16. Usage: update_segment_keyword_index_task.delay(segment_id)
  17. """
  18. logging.info(click.style('Start update segment keyword index: {}'.format(segment_id), fg='green'))
  19. start_at = time.perf_counter()
  20. segment = db.session.query(DocumentSegment).filter(DocumentSegment.id == segment_id).first()
  21. if not segment:
  22. raise NotFound('Segment not found')
  23. indexing_cache_key = 'segment_{}_indexing'.format(segment.id)
  24. try:
  25. dataset = segment.dataset
  26. if not dataset:
  27. logging.info(click.style('Segment {} has no dataset, pass.'.format(segment.id), fg='cyan'))
  28. return
  29. dataset_document = segment.document
  30. if not dataset_document:
  31. logging.info(click.style('Segment {} has no document, pass.'.format(segment.id), fg='cyan'))
  32. return
  33. if not dataset_document.enabled or dataset_document.archived or dataset_document.indexing_status != 'completed':
  34. logging.info(click.style('Segment {} document status is invalid, pass.'.format(segment.id), fg='cyan'))
  35. return
  36. kw_index = IndexBuilder.get_index(dataset, 'economy')
  37. # delete from keyword index
  38. kw_index.delete_by_ids([segment.index_node_id])
  39. # save keyword index
  40. index = IndexBuilder.get_index(dataset, 'economy')
  41. if index:
  42. index.update_segment_keywords_index(segment.index_node_id, segment.keywords)
  43. end_at = time.perf_counter()
  44. logging.info(click.style('Segment update index: {} latency: {}'.format(segment.id, end_at - start_at), fg='green'))
  45. except Exception as e:
  46. logging.exception("update segment index failed")
  47. segment.enabled = False
  48. segment.disabled_at = datetime.datetime.utcnow()
  49. segment.status = 'error'
  50. segment.error = str(e)
  51. db.session.commit()
  52. finally:
  53. redis_client.delete(indexing_cache_key)