create_segment_to_index_task.py

import datetime
import logging
import time
from typing import Optional, List

import click
from celery import shared_task
from langchain.schema import Document
from werkzeug.exceptions import NotFound

from core.index.index import IndexBuilder
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from models.dataset import DocumentSegment


@shared_task(queue='dataset')
def create_segment_to_index_task(segment_id: str, keywords: Optional[List[str]] = None):
  15. """
  16. Async create segment to index
  17. :param segment_id:
  18. :param keywords:
  19. Usage: create_segment_to_index_task.delay(segment_id)
  20. """
    logging.info(click.style('Start create segment to index: {}'.format(segment_id), fg='green'))
    start_at = time.perf_counter()

    segment = db.session.query(DocumentSegment).filter(DocumentSegment.id == segment_id).first()
    if not segment:
        raise NotFound('Segment not found')

    # only process segments that are still waiting to be indexed
    if segment.status != 'waiting':
        return

    indexing_cache_key = 'segment_{}_indexing'.format(segment.id)

    try:
        # update segment status to indexing
        update_params = {
            DocumentSegment.status: "indexing",
            DocumentSegment.indexing_at: datetime.datetime.utcnow()
        }
        DocumentSegment.query.filter_by(id=segment.id).update(update_params)
        db.session.commit()

        document = Document(
            page_content=segment.content,
            metadata={
                "doc_id": segment.index_node_id,
                "doc_hash": segment.index_node_hash,
                "document_id": segment.document_id,
                "dataset_id": segment.dataset_id,
            }
        )

        dataset = segment.dataset
        if not dataset:
            logging.info(click.style('Segment {} has no dataset, pass.'.format(segment.id), fg='cyan'))
            return

        dataset_document = segment.document
        if not dataset_document:
            logging.info(click.style('Segment {} has no document, pass.'.format(segment.id), fg='cyan'))
            return

        if not dataset_document.enabled or dataset_document.archived or dataset_document.indexing_status != 'completed':
            logging.info(click.style('Segment {} document status is invalid, pass.'.format(segment.id), fg='cyan'))
            return

        # save vector index
        index = IndexBuilder.get_index(dataset, 'high_quality')
        if index:
            index.add_texts([document], duplicate_check=True)

        # save keyword index
        index = IndexBuilder.get_index(dataset, 'economy')
        if index:
            if keywords and len(keywords) > 0:
                index.create_segment_keywords(segment.index_node_id, keywords)
            else:
                index.add_texts([document])

        # update segment to completed
        update_params = {
            DocumentSegment.status: "completed",
            DocumentSegment.completed_at: datetime.datetime.utcnow()
        }
        DocumentSegment.query.filter_by(id=segment.id).update(update_params)
        db.session.commit()

        end_at = time.perf_counter()
        logging.info(click.style('Segment created to index: {} latency: {}'.format(segment.id, end_at - start_at), fg='green'))
    except Exception as e:
        # mark the segment as errored and disabled so it is not served while broken
        logging.exception("create segment to index failed")
        segment.enabled = False
        segment.disabled_at = datetime.datetime.utcnow()
        segment.status = 'error'
        segment.error = str(e)
        db.session.commit()
    finally:
        # always release the per-segment indexing flag in Redis
        redis_client.delete(indexing_cache_key)
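
# --- Usage sketch (not part of the original file) ----------------------------
# A minimal, hedged dispatch example. It assumes the caller runs inside the app
# context, that the import path `tasks.create_segment_to_index_task` matches
# where this file lives, and that the caller sets the same Redis flag this
# task clears in its `finally` block (the 600-second TTL is an assumption).
#
#     from extensions.ext_redis import redis_client
#     from tasks.create_segment_to_index_task import create_segment_to_index_task
#
#     indexing_cache_key = 'segment_{}_indexing'.format(segment.id)
#     redis_client.setex(indexing_cache_key, 600, 1)   # mark "indexing in progress"
#     create_segment_to_index_task.delay(segment.id)   # runs on the 'dataset' Celery queue
#
#     # or pass pre-extracted keywords for the keyword ('economy') index:
#     create_segment_to_index_task.delay(segment.id, keywords=['billing', 'refund'])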