clean_dataset_task.py

import logging
import time

import click
from celery import shared_task

from core.rag.index_processor.index_processor_factory import IndexProcessorFactory
from extensions.ext_database import db
from extensions.ext_storage import storage
from models.dataset import (
    AppDatasetJoin,
    Dataset,
    DatasetProcessRule,
    DatasetQuery,
    Document,
    DocumentSegment,
)
from models.model import UploadFile


@shared_task(queue="dataset")
def clean_dataset_task(
    dataset_id: str,
    tenant_id: str,
    indexing_technique: str,
    index_struct: str,
    collection_binding_id: str,
    doc_form: str,
):
  27. """
  28. Clean dataset when dataset deleted.
  29. :param dataset_id: dataset id
  30. :param tenant_id: tenant id
  31. :param indexing_technique: indexing technique
  32. :param index_struct: index struct dict
  33. :param collection_binding_id: collection binding id
  34. :param doc_form: dataset form
  35. Usage: clean_dataset_task.delay(dataset_id, tenant_id, indexing_technique, index_struct)
  36. """
    logging.info(click.style("Start cleaning dataset when dataset deleted: {}".format(dataset_id), fg="green"))
    start_at = time.perf_counter()

    try:
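        # Rebuild a transient, in-memory Dataset from the task arguments: this
        # task runs after the dataset has been deleted (see docstring), so the
        # row can no longer be loaded from the database, yet the index processor
        # still needs these fields to locate the index/collection to drop.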
        dataset = Dataset(
            id=dataset_id,
            tenant_id=tenant_id,
            indexing_technique=indexing_technique,
            index_struct=index_struct,
            collection_binding_id=collection_binding_id,
        )
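
        # Load documents and segments up front; they drive both the index
        # cleanup below and the uploaded-file deletion at the end of the task.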
        documents = db.session.query(Document).filter(Document.dataset_id == dataset_id).all()
        segments = db.session.query(DocumentSegment).filter(DocumentSegment.dataset_id == dataset_id).all()

        if not documents:
            logging.info(click.style("No documents found for dataset: {}".format(dataset_id), fg="green"))
        else:
            logging.info(click.style("Cleaning documents for dataset: {}".format(dataset_id), fg="green"))
            # Specify the index type before initializing the index processor
            if doc_form is None:
                raise ValueError("Index type must be specified.")
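            # Build the index processor for this dataset's doc_form and clean
            # the index; passing None for node_ids means the cleanup is not
            # limited to specific segments.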
            index_processor = IndexProcessorFactory(doc_form).init_index_processor()
            index_processor.clean(dataset, None)

            for document in documents:
                db.session.delete(document)

            for segment in segments:
                db.session.delete(segment)
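
        # Drop the per-dataset bookkeeping rows (processing rules, query
        # history, app-dataset bindings) in bulk.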
        db.session.query(DatasetProcessRule).filter(DatasetProcessRule.dataset_id == dataset_id).delete()
        db.session.query(DatasetQuery).filter(DatasetQuery.dataset_id == dataset_id).delete()
        db.session.query(AppDatasetJoin).filter(AppDatasetJoin.dataset_id == dataset_id).delete()

        # Delete the uploaded source files backing the documents. Each file is
        # handled in its own try/except so a single storage failure does not
        # abort the rest of the cleanup.
        if documents:
            for document in documents:
                try:
                    if document.data_source_type == "upload_file":
                        if document.data_source_info:
                            data_source_info = document.data_source_info_dict
                            if data_source_info and "upload_file_id" in data_source_info:
                                file_id = data_source_info["upload_file_id"]
                                file = (
                                    db.session.query(UploadFile)
                                    .filter(UploadFile.tenant_id == document.tenant_id, UploadFile.id == file_id)
                                    .first()
                                )
                                if not file:
                                    continue
                                storage.delete(file.key)
                                db.session.delete(file)
                except Exception:
                    continue
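
        # A single commit flushes all pending ORM deletes in one transaction.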
        db.session.commit()

        end_at = time.perf_counter()
        logging.info(
            click.style(
                "Cleaned dataset when dataset deleted: {} latency: {}".format(dataset_id, end_at - start_at), fg="green"
            )
        )
    except Exception:
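        # logging.exception records the full traceback; no retry is configured
        # on this task, so a failure here ends the cleanup attempt.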
        logging.exception("Failed to clean dataset when dataset deleted: {}".format(dataset_id))