add_document_to_index_task.py

import datetime
import logging
import time

import click
from celery import shared_task
from langchain.schema import Document
from werkzeug.exceptions import NotFound

from core.index.index import IndexBuilder
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from models.dataset import Document as DatasetDocument
from models.dataset import DocumentSegment


@shared_task(queue='dataset')
def add_document_to_index_task(dataset_document_id: str):
    """
    Async add document to index
    :param dataset_document_id: dataset document ID

    Usage: add_document_to_index_task.delay(dataset_document_id)
    """
    logging.info(click.style('Start add document to index: {}'.format(dataset_document_id), fg='green'))
    start_at = time.perf_counter()

    dataset_document = db.session.query(DatasetDocument).filter(DatasetDocument.id == dataset_document_id).first()
    if not dataset_document:
        raise NotFound('Document not found')

    # Only documents whose indexing has already completed can be added to the index.
    if dataset_document.indexing_status != 'completed':
        return

    # Cache key marking this document as "indexing in progress"; cleared in the finally block.
    indexing_cache_key = 'document_{}_indexing'.format(dataset_document.id)
    try:
        # Load the document's enabled segments in positional order.
        segments = db.session.query(DocumentSegment).filter(
            DocumentSegment.document_id == dataset_document.id,
            DocumentSegment.enabled == True
        ).order_by(DocumentSegment.position.asc()).all()

        # Wrap each segment in a langchain Document; the metadata carries the
        # index node identifiers so results can be traced back to the segment,
        # document and dataset they came from.
        documents = []
        for segment in segments:
            document = Document(
                page_content=segment.content,
                metadata={
                    "doc_id": segment.index_node_id,
                    "doc_hash": segment.index_node_hash,
                    "document_id": segment.document_id,
                    "dataset_id": segment.dataset_id,
                }
            )

            documents.append(document)
        dataset = dataset_document.dataset
        if not dataset:
            raise Exception('Document has no dataset')

        # save vector index
        index = IndexBuilder.get_index(dataset, 'high_quality')
        if index:
            index.add_texts(documents)

        # save keyword index
        index = IndexBuilder.get_index(dataset, 'economy')
        if index:
            index.add_texts(documents)
        end_at = time.perf_counter()
        logging.info(
            click.style('Document added to index: {} latency: {}'.format(dataset_document.id, end_at - start_at),
                        fg='green'))
    except Exception as e:
        # On failure, disable the document and record the error for later inspection.
        logging.exception("add document to index failed")
        dataset_document.enabled = False
        dataset_document.disabled_at = datetime.datetime.utcnow()
        dataset_document.status = 'error'
        dataset_document.error = str(e)
        db.session.commit()
    finally:
        # Always release the "indexing in progress" flag, success or failure.
        redis_client.delete(indexing_cache_key)
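
# A minimal producer-side sketch (an assumption, not part of the original file):
# the caller is presumably expected to set the Redis "indexing in progress" flag
# before dispatching the task, since the finally block above deletes that key.
# The 600-second TTL here is illustrative only.
#
#     indexing_cache_key = 'document_{}_indexing'.format(dataset_document.id)
#     redis_client.setex(indexing_cache_key, 600, 1)
#     add_document_to_index_task.delay(dataset_document.id)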