clean_unused_datasets_task.py

import datetime
import time

import click
from flask import current_app
from werkzeug.exceptions import NotFound

import app
from core.index.index import IndexBuilder
from extensions.ext_database import db
from models.dataset import Dataset, DatasetQuery, Document


@app.celery.task(queue='dataset')
def clean_unused_datasets_task():
    click.echo(click.style('Start clean unused datasets indexes.', fg='green'))
    clean_days = int(current_app.config.get('CLEAN_DAY_SETTING'))
    start_at = time.perf_counter()

    # Datasets created before this cutoff and neither queried nor updated since
    # it are treated as unused; the window length comes from CLEAN_DAY_SETTING.
    thirty_days_ago = datetime.datetime.now() - datetime.timedelta(days=clean_days)

    page = 1
    while True:
        # Walk the candidate datasets in pages of 50; Flask-SQLAlchemy's
        # paginate() raises NotFound once the page number runs past the end.
        try:
            datasets = db.session.query(Dataset) \
                .filter(Dataset.created_at < thirty_days_ago) \
                .order_by(Dataset.created_at.desc()) \
                .paginate(page=page, per_page=50)
        except NotFound:
            break
        page += 1

        for dataset in datasets:
            # Skip datasets that have been queried within the window.
            dataset_query = db.session.query(DatasetQuery).filter(
                DatasetQuery.created_at > thirty_days_ago,
                DatasetQuery.dataset_id == dataset.id
            ).all()
            if not dataset_query:
                # Skip datasets whose documents were updated within the window.
                documents = db.session.query(Document).filter(
                    Document.dataset_id == dataset.id,
                    Document.indexing_status == 'completed',
                    Document.enabled == True,
                    Document.archived == False,
                    Document.updated_at > thirty_days_ago
                ).all()
                if not documents:
                    try:
                        # remove index
                        vector_index = IndexBuilder.get_index(dataset, 'high_quality')
                        kw_index = IndexBuilder.get_index(dataset, 'economy')

                        # delete from vector index
                        if vector_index:
                            if dataset.collection_binding_id:
                                # dataset shares a vector collection: drop only its group
                                vector_index.delete_by_group_id(dataset.id)
                            else:
                                # dedicated collection: drop it entirely
                                vector_index.delete()
                        kw_index.delete()

                        # disable the documents so the dataset is no longer served
                        update_params = {
                            Document.enabled: False
                        }
                        Document.query.filter_by(dataset_id=dataset.id).update(update_params)
                        db.session.commit()
                        click.echo(click.style('Cleaned unused dataset {} from db success!'.format(dataset.id),
                                               fg='green'))
                    except Exception as e:
                        click.echo(
                            click.style('clean dataset index error: {} {}'.format(e.__class__.__name__, str(e)),
                                        fg='red'))

    end_at = time.perf_counter()
    click.echo(click.style('Cleaned unused dataset from db success latency: {}'.format(end_at - start_at), fg='green'))
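
# --- Usage note (not part of the original file) ---
# A minimal sketch, assuming a standard Celery beat setup, of how a periodic
# cleanup task like this could be scheduled. The entry name, the dotted task
# path and the daily 03:00 schedule below are illustrative assumptions, not
# values taken from this repository's configuration.
#
#     from celery.schedules import crontab
#
#     app.celery.conf.beat_schedule = {
#         'clean-unused-datasets': {
#             'task': 'schedule.clean_unused_datasets_task.clean_unused_datasets_task',
#             'schedule': crontab(minute=0, hour=3),  # run once a day at 03:00
#         },
#     }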