add_document_to_index_task.py

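# Celery task that adds a fully indexed document to its dataset's indexes:
# each enabled DocumentSegment becomes a llama_index Node, and the nodes are
# written to the dataset's vector index and keyword-table index.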
import datetime
import logging
import time

import click
from celery import shared_task
from llama_index.data_structs import Node
from llama_index.data_structs.node_v2 import DocumentRelationship
from werkzeug.exceptions import NotFound

from core.index.keyword_table_index import KeywordTableIndex
from core.index.vector_index import VectorIndex
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from models.dataset import Document, DocumentSegment

@shared_task
def add_document_to_index_task(document_id: str):
    """
    Async add document to index.
    :param document_id: ID of the document to index
    Usage: add_document_to_index_task.delay(document_id)
    """
    logging.info(click.style('Start add document to index: {}'.format(document_id), fg='green'))
    start_at = time.perf_counter()

    document = db.session.query(Document).filter(Document.id == document_id).first()
    if not document:
        raise NotFound('Document not found')

    # Only documents whose indexing pipeline has finished can be added.
    if document.indexing_status != 'completed':
        return

    indexing_cache_key = 'document_{}_indexing'.format(document.id)
    try:
        # Load the document's enabled segments in display order.
        segments = db.session.query(DocumentSegment).filter(
            DocumentSegment.document_id == document.id,
            DocumentSegment.enabled == True
        ).order_by(DocumentSegment.position.asc()).all()

        # Build one index node per segment, chaining consecutive nodes with
        # PREVIOUS/NEXT relationships so neighboring segments can be traversed.
        nodes = []
        previous_node = None
        for segment in segments:
            relationships = {
                DocumentRelationship.SOURCE: document.id
            }

            if previous_node:
                relationships[DocumentRelationship.PREVIOUS] = previous_node.doc_id
                previous_node.relationships[DocumentRelationship.NEXT] = segment.index_node_id

            node = Node(
                doc_id=segment.index_node_id,
                doc_hash=segment.index_node_hash,
                text=segment.content,
                extra_info=None,
                node_info=None,
                relationships=relationships
            )

            previous_node = node
            nodes.append(node)
        dataset = document.dataset

        if not dataset:
            raise Exception('Document has no dataset')

        vector_index = VectorIndex(dataset=dataset)
        keyword_table_index = KeywordTableIndex(dataset=dataset)

        # Save the vector index; only datasets using the "high_quality"
        # indexing technique maintain embeddings.
        if dataset.indexing_technique == "high_quality":
            vector_index.add_nodes(
                nodes=nodes,
                duplicate_check=True
            )

        # Save the keyword index, which is maintained for every dataset.
        keyword_table_index.add_nodes(nodes)

        end_at = time.perf_counter()
        logging.info(
            click.style('Document added to index: {} latency: {}'.format(document.id, end_at - start_at), fg='green'))
    except Exception as e:
        logging.exception("add document to index failed")

        # On failure, disable the document and record the error for review.
        document.enabled = False
        document.disabled_at = datetime.datetime.utcnow()
        document.status = 'error'
        document.error = str(e)
        db.session.commit()
    finally:
        # Always clear the in-progress flag so the document cannot get stuck
        # in an indexing state.
        redis_client.delete(indexing_cache_key)
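

# Usage sketch: enqueue this task from application code once a document's
# indexing_status reaches 'completed', as the docstring's Usage line shows.
# The import path below is an assumption for illustration; adjust it to
# wherever this module lives in your project.
#
#     from tasks.add_document_to_index_task import add_document_to_index_task
#
#     add_document_to_index_task.delay(document.id)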