import datetime
import time

import click
from sqlalchemy import func
from werkzeug.exceptions import NotFound

import app
from configs import dify_config
from core.rag.index_processor.index_processor_factory import IndexProcessorFactory
from extensions.ext_database import db
from models.dataset import Dataset, DatasetQuery, Document


@app.celery.task(queue="dataset")
def clean_unused_datasets_task():
    click.echo(click.style("Start cleaning unused dataset indexes.", fg="green"))
    clean_days = dify_config.CLEAN_DAY_SETTING
    start_at = time.perf_counter()
    # Cutoff date: anything untouched for `clean_days` days is a cleanup candidate.
    thirty_days_ago = datetime.datetime.now() - datetime.timedelta(days=clean_days)
    page = 1
    while True:
        try:
            # Per-dataset count of completed, enabled, non-archived documents
            # updated *after* the cutoff (recently active documents).
            document_subquery_new = (
                db.session.query(Document.dataset_id, func.count(Document.id).label("document_count"))
                .filter(
                    Document.indexing_status == "completed",
                    Document.enabled == True,
                    Document.archived == False,
                    Document.updated_at > thirty_days_ago,
                )
                .group_by(Document.dataset_id)
                .subquery()
            )

            # Per-dataset count of completed, enabled, non-archived documents
            # updated *before* the cutoff (stale documents).
            document_subquery_old = (
                db.session.query(Document.dataset_id, func.count(Document.id).label("document_count"))
                .filter(
                    Document.indexing_status == "completed",
                    Document.enabled == True,
                    Document.archived == False,
                    Document.updated_at < thirty_days_ago,
                )
                .group_by(Document.dataset_id)
                .subquery()
            )

            # Datasets older than the cutoff that still hold stale documents
            # but have no recently active ones.
            datasets = (
                db.session.query(Dataset)
                .outerjoin(document_subquery_new, Dataset.id == document_subquery_new.c.dataset_id)
                .outerjoin(document_subquery_old, Dataset.id == document_subquery_old.c.dataset_id)
                .filter(
                    Dataset.created_at < thirty_days_ago,
                    func.coalesce(document_subquery_new.c.document_count, 0) == 0,
                    func.coalesce(document_subquery_old.c.document_count, 0) > 0,
                )
                .order_by(Dataset.created_at.desc())
                .paginate(page=page, per_page=50)
            )
        except NotFound:
            break
        if not datasets.items:
            break
        page += 1
        for dataset in datasets:
            # Skip datasets that have been queried since the cutoff.
            dataset_query = (
                db.session.query(DatasetQuery)
                .filter(DatasetQuery.created_at > thirty_days_ago, DatasetQuery.dataset_id == dataset.id)
                .all()
            )
            if not dataset_query:
                try:
                    # Remove the dataset's vector index.
                    index_processor = IndexProcessorFactory(dataset.doc_form).init_index_processor()
                    index_processor.clean(dataset, None)

                    # Disable all documents belonging to the dataset.
                    update_params = {Document.enabled: False}
                    Document.query.filter_by(dataset_id=dataset.id).update(update_params)
                    db.session.commit()
                    click.echo(
                        click.style("Cleaned unused dataset {} from db successfully!".format(dataset.id), fg="green")
                    )
                except Exception as e:
                    click.echo(
                        click.style("clean dataset index error: {} {}".format(e.__class__.__name__, str(e)), fg="red")
                    )
    end_at = time.perf_counter()
    click.echo(
        click.style("Cleaned unused datasets from db successfully, latency: {}".format(end_at - start_at), fg="green")
    )
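

# Usage sketch (an assumption, not part of the original module): since this is a
# Celery task bound to the "dataset" queue, a deployment would typically run it
# on a fixed interval via Celery beat. The module path and interval below are
# hypothetical examples, not confirmed configuration.
#
#   app.celery.conf.beat_schedule = {
#       "clean_unused_datasets_task": {
#           "task": "schedule.clean_unused_datasets_task.clean_unused_datasets_task",
#           "schedule": datetime.timedelta(days=1),
#       },
#   }
#
# It can also be enqueued ad hoc from a shell connected to the broker:
#
#   clean_unused_datasets_task.delay()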