clean_dataset_task.py
import logging
import time

import click
from celery import shared_task

from core.rag.index_processor.index_processor_factory import IndexProcessorFactory
from extensions.ext_database import db
from extensions.ext_storage import storage
from models.dataset import (
    AppDatasetJoin,
    Dataset,
    DatasetProcessRule,
    DatasetQuery,
    Document,
    DocumentSegment,
)
from models.model import UploadFile


@shared_task(queue='dataset')
def clean_dataset_task(dataset_id: str, tenant_id: str, indexing_technique: str,
                       index_struct: str, collection_binding_id: str, doc_form: str):
    """
    Clean dataset when dataset deleted.
    :param dataset_id: dataset id
    :param tenant_id: tenant id
    :param indexing_technique: indexing technique
    :param index_struct: serialized index struct
    :param collection_binding_id: collection binding id
    :param doc_form: dataset form

    Usage: clean_dataset_task.delay(dataset_id, tenant_id, indexing_technique,
                                    index_struct, collection_binding_id, doc_form)
    """
    logging.info(click.style('Start clean dataset when dataset deleted: {}'.format(dataset_id), fg='green'))
    start_at = time.perf_counter()

    try:
        dataset = Dataset(
            id=dataset_id,
            tenant_id=tenant_id,
            indexing_technique=indexing_technique,
            index_struct=index_struct,
            collection_binding_id=collection_binding_id,
        )
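        # Note: this Dataset object is transient -- per the docstring the task
        # runs after the dataset row has been deleted, so it is rebuilt from
        # the task arguments purely to give the index processor the metadata
        # it needs.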
        documents = db.session.query(Document).filter(Document.dataset_id == dataset_id).all()
        segments = db.session.query(DocumentSegment).filter(DocumentSegment.dataset_id == dataset_id).all()

        if not documents:
            logging.info(click.style('No documents found for dataset: {}'.format(dataset_id), fg='green'))
        else:
            logging.info(click.style('Cleaning documents for dataset: {}'.format(dataset_id), fg='green'))
            # doc_form selects which index processor handles the cleanup, so it
            # must be present before the factory is called.
            if doc_form is None:
                raise ValueError("Index type must be specified.")
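            # Passing None as the second argument asks the processor to drop
            # the dataset's entire index rather than individual segments.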
            index_processor = IndexProcessorFactory(doc_form).init_index_processor()
            index_processor.clean(dataset, None)

            for document in documents:
                db.session.delete(document)

            for segment in segments:
                db.session.delete(segment)
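
        # Rows that reference the dataset only by id are bulk-deleted without
        # loading them into the session first.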
        db.session.query(DatasetProcessRule).filter(DatasetProcessRule.dataset_id == dataset_id).delete()
        db.session.query(DatasetQuery).filter(DatasetQuery.dataset_id == dataset_id).delete()
        db.session.query(AppDatasetJoin).filter(AppDatasetJoin.dataset_id == dataset_id).delete()

        # Delete uploaded source files from storage. This is best-effort: a
        # failure on one file must not abort the rest of the cleanup.
        if documents:
            for document in documents:
                try:
                    if document.data_source_type == 'upload_file' and document.data_source_info:
                        data_source_info = document.data_source_info_dict
                        if data_source_info and 'upload_file_id' in data_source_info:
                            file_id = data_source_info['upload_file_id']
                            file = db.session.query(UploadFile).filter(
                                UploadFile.tenant_id == document.tenant_id,
                                UploadFile.id == file_id
                            ).first()
                            if not file:
                                continue
                            storage.delete(file.key)
                            db.session.delete(file)
                except Exception:
                    continue

        # A single commit keeps the database cleanup atomic; the storage
        # deletes above happen outside the transaction.
        db.session.commit()

        end_at = time.perf_counter()
        logging.info(
            click.style('Cleaned dataset when dataset deleted: {} latency: {}'.format(dataset_id, end_at - start_at),
                        fg='green'))
    except Exception:
        logging.exception("Failed to clean dataset when dataset deleted")
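

# Example call site (illustrative sketch only -- the attribute names below
# mirror the task parameters and assume a Dataset row still loaded at the
# moment of deletion):
#
#     clean_dataset_task.delay(
#         dataset.id,
#         dataset.tenant_id,
#         dataset.indexing_technique,
#         dataset.index_struct,
#         dataset.collection_binding_id,
#         dataset.doc_form,
#     )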