# add_document_to_index_task.py
  1. import datetime
  2. import logging
  3. import time
  4. import click
  5. from celery import shared_task
  6. from werkzeug.exceptions import NotFound
  7. from core.rag.index_processor.index_processor_factory import IndexProcessorFactory
  8. from core.rag.models.document import Document
  9. from extensions.ext_database import db
  10. from extensions.ext_redis import redis_client
  11. from models.dataset import Document as DatasetDocument
  12. from models.dataset import DocumentSegment
  13. @shared_task(queue="dataset")
  14. def add_document_to_index_task(dataset_document_id: str):
  15. """
  16. Async Add document to index
  17. :param document_id:
  18. Usage: add_document_to_index.delay(document_id)
  19. """
  20. logging.info(click.style("Start add document to index: {}".format(dataset_document_id), fg="green"))
  21. start_at = time.perf_counter()
  22. dataset_document = db.session.query(DatasetDocument).filter(DatasetDocument.id == dataset_document_id).first()
  23. if not dataset_document:
  24. raise NotFound("Document not found")
  25. if dataset_document.indexing_status != "completed":
  26. return
  27. indexing_cache_key = "document_{}_indexing".format(dataset_document.id)
  28. try:
  29. segments = (
  30. db.session.query(DocumentSegment)
  31. .filter(DocumentSegment.document_id == dataset_document.id, DocumentSegment.enabled == True)
  32. .order_by(DocumentSegment.position.asc())
  33. .all()
  34. )
  35. documents = []
  36. for segment in segments:
  37. document = Document(
  38. page_content=segment.content,
  39. metadata={
  40. "doc_id": segment.index_node_id,
  41. "doc_hash": segment.index_node_hash,
  42. "document_id": segment.document_id,
  43. "dataset_id": segment.dataset_id,
  44. },
  45. )
  46. documents.append(document)
  47. dataset = dataset_document.dataset
  48. if not dataset:
  49. raise Exception("Document has no dataset")
  50. index_type = dataset.doc_form
  51. index_processor = IndexProcessorFactory(index_type).init_index_processor()
  52. index_processor.load(dataset, documents)
  53. end_at = time.perf_counter()
  54. logging.info(
  55. click.style(
  56. "Document added to index: {} latency: {}".format(dataset_document.id, end_at - start_at), fg="green"
  57. )
  58. )
  59. except Exception as e:
  60. logging.exception("add document to index failed")
  61. dataset_document.enabled = False
  62. dataset_document.disabled_at = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
  63. dataset_document.status = "error"
  64. dataset_document.error = str(e)
  65. db.session.commit()
  66. finally:
  67. redis_client.delete(indexing_cache_key)