document_indexing_task.py 1.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556
  1. import datetime
  2. import logging
  3. import time
  4. import click
  5. from celery import shared_task
  6. from werkzeug.exceptions import NotFound
  7. from core.indexing_runner import IndexingRunner, DocumentIsPausedException
  8. from core.llm.error import ProviderTokenNotInitError
  9. from extensions.ext_database import db
  10. from models.dataset import Document
  11. @shared_task
  12. def document_indexing_task(dataset_id: str, document_id: str):
  13. """
  14. Async process document
  15. :param dataset_id:
  16. :param document_id:
  17. Usage: document_indexing_task.delay(dataset_id, document_id)
  18. """
  19. logging.info(click.style('Start process document: {}'.format(document_id), fg='green'))
  20. start_at = time.perf_counter()
  21. document = db.session.query(Document).filter(
  22. Document.id == document_id,
  23. Document.dataset_id == dataset_id
  24. ).first()
  25. if not document:
  26. raise NotFound('Document not found')
  27. document.indexing_status = 'parsing'
  28. document.processing_started_at = datetime.datetime.utcnow()
  29. db.session.commit()
  30. try:
  31. indexing_runner = IndexingRunner()
  32. indexing_runner.run(document)
  33. end_at = time.perf_counter()
  34. logging.info(click.style('Processed document: {} latency: {}'.format(document.id, end_at - start_at), fg='green'))
  35. except DocumentIsPausedException:
  36. logging.info(click.style('Document paused, document id: {}'.format(document.id), fg='yellow'))
  37. except ProviderTokenNotInitError as e:
  38. document.indexing_status = 'error'
  39. document.error = str(e.description)
  40. document.stopped_at = datetime.datetime.utcnow()
  41. db.session.commit()
  42. except Exception as e:
  43. logging.exception("consume document failed")
  44. document.indexing_status = 'error'
  45. document.error = str(e)
  46. document.stopped_at = datetime.datetime.utcnow()
  47. db.session.commit()