Prechádzať zdrojové kódy

feat: correctly delete applications using Celery workers (#5787)

Charles Zhou 9 mesiacov pred
rodič
commit
cb09dbef66

+ 1 - 1
.devcontainer/post_create_command.sh

@@ -3,7 +3,7 @@
 cd web && npm install
 cd web && npm install
 
 
 echo 'alias start-api="cd /workspaces/dify/api && flask run --host 0.0.0.0 --port=5001 --debug"' >> ~/.bashrc
 echo 'alias start-api="cd /workspaces/dify/api && flask run --host 0.0.0.0 --port=5001 --debug"' >> ~/.bashrc
-echo 'alias start-worker="cd /workspaces/dify/api && celery -A app.celery worker -P gevent -c 1 --loglevel INFO -Q dataset,generation,mail,ops_trace"' >> ~/.bashrc
+echo 'alias start-worker="cd /workspaces/dify/api && celery -A app.celery worker -P gevent -c 1 --loglevel INFO -Q dataset,generation,mail,ops_trace,app_deletion"' >> ~/.bashrc
 echo 'alias start-web="cd /workspaces/dify/web && npm run dev"' >> ~/.bashrc
 echo 'alias start-web="cd /workspaces/dify/web && npm run dev"' >> ~/.bashrc
 echo 'alias start-containers="cd /workspaces/dify/docker && docker-compose -f docker-compose.middleware.yaml -p dify up -d"' >> ~/.bashrc
 echo 'alias start-containers="cd /workspaces/dify/docker && docker-compose -f docker-compose.middleware.yaml -p dify up -d"' >> ~/.bashrc
 
 

+ 1 - 1
.vscode/launch.json

@@ -48,7 +48,7 @@
                 "--loglevel",
                 "--loglevel",
                 "info",
                 "info",
                 "-Q",
                 "-Q",
-                "dataset,generation,mail,ops_trace"
+                "dataset,generation,mail,ops_trace,app_deletion"
             ]
             ]
         },
         },
     ]
     ]

+ 1 - 1
api/README.md

@@ -66,7 +66,7 @@
 10. If you need to debug local async processing, please start the worker service.
 10. If you need to debug local async processing, please start the worker service.
 
 
    ```bash
    ```bash
-   poetry run python -m celery -A app.celery worker -P gevent -c 1 --loglevel INFO -Q dataset,generation,mail,ops_trace
+   poetry run python -m celery -A app.celery worker -P gevent -c 1 --loglevel INFO -Q dataset,generation,mail,ops_trace,app_deletion
    ```
    ```
 
 
    The started celery app handles the async tasks, e.g. dataset importing and documents indexing.
    The started celery app handles the async tasks, e.g. dataset importing and documents indexing.

+ 1 - 1
api/docker/entrypoint.sh

@@ -9,7 +9,7 @@ fi
 
 
 if [[ "${MODE}" == "worker" ]]; then
 if [[ "${MODE}" == "worker" ]]; then
   celery -A app.celery worker -P ${CELERY_WORKER_CLASS:-gevent} -c ${CELERY_WORKER_AMOUNT:-1} --loglevel INFO \
   celery -A app.celery worker -P ${CELERY_WORKER_CLASS:-gevent} -c ${CELERY_WORKER_AMOUNT:-1} --loglevel INFO \
-    -Q ${CELERY_QUEUES:-dataset,generation,mail,ops_trace}
+    -Q ${CELERY_QUEUES:-dataset,generation,mail,ops_trace,app_deletion}
 elif [[ "${MODE}" == "beat" ]]; then
 elif [[ "${MODE}" == "beat" ]]; then
   celery -A app.celery beat --loglevel INFO
   celery -A app.celery beat --loglevel INFO
 else
 else

+ 0 - 3
api/events/app_event.py

@@ -3,9 +3,6 @@ from blinker import signal
 # sender: app
 # sender: app
 app_was_created = signal('app-was-created')
 app_was_created = signal('app-was-created')
 
 
-# sender: app
-app_was_deleted = signal('app-was-deleted')
-
 # sender: app, kwargs: app_model_config
 # sender: app, kwargs: app_model_config
 app_model_config_was_updated = signal('app-model-config-was-updated')
 app_model_config_was_updated = signal('app-model-config-was-updated')
 
 

+ 0 - 3
api/events/event_handlers/__init__.py

@@ -4,10 +4,7 @@ from .create_document_index import handle
 from .create_installed_app_when_app_created import handle
 from .create_installed_app_when_app_created import handle
 from .create_site_record_when_app_created import handle
 from .create_site_record_when_app_created import handle
 from .deduct_quota_when_messaeg_created import handle
 from .deduct_quota_when_messaeg_created import handle
-from .delete_installed_app_when_app_deleted import handle
-from .delete_site_record_when_app_deleted import handle
 from .delete_tool_parameters_cache_when_sync_draft_workflow import handle
 from .delete_tool_parameters_cache_when_sync_draft_workflow import handle
-from .delete_workflow_as_tool_when_app_deleted import handle
 from .update_app_dataset_join_when_app_model_config_updated import handle
 from .update_app_dataset_join_when_app_model_config_updated import handle
 from .update_app_dataset_join_when_app_published_workflow_updated import handle
 from .update_app_dataset_join_when_app_published_workflow_updated import handle
 from .update_provider_last_used_at_when_messaeg_created import handle
 from .update_provider_last_used_at_when_messaeg_created import handle

+ 0 - 12
api/events/event_handlers/delete_installed_app_when_app_deleted.py

@@ -1,12 +0,0 @@
-from events.app_event import app_was_deleted
-from extensions.ext_database import db
-from models.model import InstalledApp
-
-
-@app_was_deleted.connect
-def handle(sender, **kwargs):
-    app = sender
-    installed_apps = db.session.query(InstalledApp).filter(InstalledApp.app_id == app.id).all()
-    for installed_app in installed_apps:
-        db.session.delete(installed_app)
-    db.session.commit()

+ 0 - 11
api/events/event_handlers/delete_site_record_when_app_deleted.py

@@ -1,11 +0,0 @@
-from events.app_event import app_was_deleted
-from extensions.ext_database import db
-from models.model import Site
-
-
-@app_was_deleted.connect
-def handle(sender, **kwargs):
-    app = sender
-    site = db.session.query(Site).filter(Site.app_id == app.id).first()
-    db.session.delete(site)
-    db.session.commit()

+ 0 - 14
api/events/event_handlers/delete_workflow_as_tool_when_app_deleted.py

@@ -1,14 +0,0 @@
-from events.app_event import app_was_deleted
-from extensions.ext_database import db
-from models.tools import WorkflowToolProvider
-
-
-@app_was_deleted.connect
-def handle(sender, **kwargs):
-    app = sender
-    workflow_tools = db.session.query(WorkflowToolProvider).filter(
-        WorkflowToolProvider.app_id == app.id
-    ).all()
-    for workflow_tool in workflow_tools:
-        db.session.delete(workflow_tool)
-    db.session.commit()

+ 4 - 13
api/services/app_service.py

@@ -16,13 +16,14 @@ from core.model_runtime.entities.model_entities import ModelPropertyKey, ModelTy
 from core.model_runtime.model_providers.__base.large_language_model import LargeLanguageModel
 from core.model_runtime.model_providers.__base.large_language_model import LargeLanguageModel
 from core.tools.tool_manager import ToolManager
 from core.tools.tool_manager import ToolManager
 from core.tools.utils.configuration import ToolParameterConfigurationManager
 from core.tools.utils.configuration import ToolParameterConfigurationManager
-from events.app_event import app_model_config_was_updated, app_was_created, app_was_deleted
+from events.app_event import app_model_config_was_updated, app_was_created
 from extensions.ext_database import db
 from extensions.ext_database import db
 from models.account import Account
 from models.account import Account
 from models.model import App, AppMode, AppModelConfig
 from models.model import App, AppMode, AppModelConfig
 from models.tools import ApiToolProvider
 from models.tools import ApiToolProvider
 from services.tag_service import TagService
 from services.tag_service import TagService
 from services.workflow_service import WorkflowService
 from services.workflow_service import WorkflowService
+from tasks.remove_app_and_related_data_task import remove_app_and_related_data_task
 
 
 
 
 class AppService:
 class AppService:
@@ -393,18 +394,8 @@ class AppService:
         Delete app
         Delete app
         :param app: App instance
         :param app: App instance
         """
         """
-        db.session.delete(app)
-        db.session.commit()
-
-        app_was_deleted.send(app)
-
-        # todo async delete related data by event
-        # app_model_configs, site, api_tokens, installed_apps, recommended_apps BY app
-        # app_annotation_hit_histories, app_annotation_settings, app_dataset_joins BY app
-        # workflows, workflow_runs, workflow_node_executions, workflow_app_logs BY app
-        # conversations, pinned_conversations, messages BY app
-        # message_feedbacks, message_annotations, message_chains BY message
-        # message_agent_thoughts, message_files, saved_messages BY message
+        # Trigger asynchronous deletion of app and related data
+        remove_app_and_related_data_task.delay(app.id)
 
 
     def get_app_meta(self, app_model: App) -> dict:
     def get_app_meta(self, app_model: App) -> dict:
         """
         """

+ 150 - 0
api/tasks/remove_app_and_related_data_task.py

@@ -0,0 +1,150 @@
+import logging
+import time
+
+import click
+from celery import shared_task
+from sqlalchemy import select
+from sqlalchemy.exc import SQLAlchemyError
+
+from extensions.ext_database import db
+from models.dataset import AppDatasetJoin
+from models.model import (
+    ApiToken,
+    App,
+    AppAnnotationHitHistory,
+    AppAnnotationSetting,
+    AppModelConfig,
+    Conversation,
+    EndUser,
+    InstalledApp,
+    Message,
+    MessageAgentThought,
+    MessageAnnotation,
+    MessageChain,
+    MessageFeedback,
+    MessageFile,
+    RecommendedApp,
+    Site,
+    TagBinding,
+)
+from models.tools import WorkflowToolProvider
+from models.web import PinnedConversation, SavedMessage
+from models.workflow import Workflow, WorkflowAppLog, WorkflowNodeExecution, WorkflowRun
+
+
+@shared_task(queue='app_deletion', bind=True, max_retries=3)
+def remove_app_and_related_data_task(self, app_id: str):
+    logging.info(click.style(f'Start deleting app and related data: {app_id}', fg='green'))
+    start_at = time.perf_counter()
+
+    deletion_cache_key = f'app_{app_id}_deletion'
+
+    try:
+        # Use a transaction to ensure all deletions succeed or none do
+        with db.session.begin_nested():
+            app = db.session.query(App).filter(App.id == app_id).first()
+            if not app:
+                logging.warning(click.style(f"App {app_id} not found", fg='yellow'))
+                return
+
+            # Delete related data
+            _delete_app_model_configs(app_id)
+            _delete_app_site(app_id)
+            _delete_app_api_tokens(app_id)
+            _delete_installed_apps(app_id)
+            _delete_recommended_apps(app_id)
+            _delete_app_annotation_data(app_id)
+            _delete_app_dataset_joins(app_id)
+            _delete_app_workflows(app_id)
+            _delete_app_conversations(app_id)
+            _delete_app_messages(app_id)
+            _delete_workflow_tool_providers(app_id)
+            _delete_app_tag_bindings(app_id)
+            _delete_end_users(app_id)
+
+            # Delete the app itself
+            db.session.delete(app)
+
+
+        # If we reach here, the transaction was successful
+        db.session.commit()
+
+        end_at = time.perf_counter()
+        logging.info(click.style(f'App and related data deleted: {app_id} latency: {end_at - start_at}', fg='green'))
+
+    except SQLAlchemyError as e:
+        db.session.rollback()
+        logging.exception(click.style(f"Database error occurred while deleting app {app_id} and related data", fg='red'))
+        raise self.retry(exc=e, countdown=60)  # Retry after 60 seconds
+
+    except Exception as e:
+        logging.exception(click.style(f"Error occurred while deleting app {app_id} and related data", fg='red'))
+        raise self.retry(exc=e, countdown=60)  # Retry after 60 seconds
+
+
+def _delete_app_model_configs(app_id: str):
+    db.session.query(AppModelConfig).filter(AppModelConfig.app_id == app_id).delete()
+
+def _delete_app_site(app_id: str):
+    db.session.query(Site).filter(Site.app_id == app_id).delete()
+
+def _delete_app_api_tokens(app_id: str):
+    db.session.query(ApiToken).filter(ApiToken.app_id == app_id).delete()
+
+def _delete_installed_apps(app_id: str):
+    db.session.query(InstalledApp).filter(InstalledApp.app_id == app_id).delete()
+
+def _delete_recommended_apps(app_id: str):
+    db.session.query(RecommendedApp).filter(RecommendedApp.app_id == app_id).delete()
+
+def _delete_app_annotation_data(app_id: str):
+    db.session.query(AppAnnotationHitHistory).filter(AppAnnotationHitHistory.app_id == app_id).delete()
+    db.session.query(AppAnnotationSetting).filter(AppAnnotationSetting.app_id == app_id).delete()
+
+def _delete_app_dataset_joins(app_id: str):
+    db.session.query(AppDatasetJoin).filter(AppDatasetJoin.app_id == app_id).delete()
+
+def _delete_app_workflows(app_id: str):
+    db.session.query(WorkflowRun).filter(
+        WorkflowRun.workflow_id.in_(
+            db.session.query(Workflow.id).filter(Workflow.app_id == app_id)
+        )
+    ).delete(synchronize_session=False)
+    db.session.query(WorkflowNodeExecution).filter(
+        WorkflowNodeExecution.workflow_id.in_(
+            db.session.query(Workflow.id).filter(Workflow.app_id == app_id)
+        )
+    ).delete(synchronize_session=False)
+    db.session.query(WorkflowAppLog).filter(WorkflowAppLog.app_id == app_id).delete(synchronize_session=False)
+    db.session.query(Workflow).filter(Workflow.app_id == app_id).delete(synchronize_session=False)
+
+def _delete_app_conversations(app_id: str):
+    db.session.query(PinnedConversation).filter(
+        PinnedConversation.conversation_id.in_(
+            db.session.query(Conversation.id).filter(Conversation.app_id == app_id)
+        )
+    ).delete(synchronize_session=False)
+    db.session.query(Conversation).filter(Conversation.app_id == app_id).delete()
+
+def _delete_app_messages(app_id: str):
+    message_ids = select(Message.id).filter(Message.app_id == app_id).scalar_subquery()
+    db.session.query(MessageFeedback).filter(MessageFeedback.message_id.in_(message_ids)).delete(synchronize_session=False)
+    db.session.query(MessageAnnotation).filter(MessageAnnotation.message_id.in_(message_ids)).delete(synchronize_session=False)
+    db.session.query(MessageChain).filter(MessageChain.message_id.in_(message_ids)).delete(synchronize_session=False)
+    db.session.query(MessageAgentThought).filter(MessageAgentThought.message_id.in_(message_ids)).delete(synchronize_session=False)
+    db.session.query(MessageFile).filter(MessageFile.message_id.in_(message_ids)).delete(synchronize_session=False)
+    db.session.query(SavedMessage).filter(SavedMessage.message_id.in_(message_ids)).delete(synchronize_session=False)
+    db.session.query(Message).filter(Message.app_id == app_id).delete(synchronize_session=False)
+
+def _delete_workflow_tool_providers(app_id: str):
+    db.session.query(WorkflowToolProvider).filter(
+        WorkflowToolProvider.app_id == app_id
+        ).delete(synchronize_session=False)
+
+def _delete_app_tag_bindings(app_id: str):
+    db.session.query(TagBinding).filter(
+        TagBinding.target_id == app_id
+    ).delete(synchronize_session=False)
+    
+def _delete_end_users(app_id: str):
+    db.session.query(EndUser).filter(EndUser.app_id == app_id).delete()