remove_app_and_related_data_task.py 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. import logging
  2. import time
  3. import click
  4. from celery import shared_task
  5. from sqlalchemy import select
  6. from sqlalchemy.exc import SQLAlchemyError
  7. from extensions.ext_database import db
  8. from models.dataset import AppDatasetJoin
  9. from models.model import (
  10. ApiToken,
  11. App,
  12. AppAnnotationHitHistory,
  13. AppAnnotationSetting,
  14. AppModelConfig,
  15. Conversation,
  16. EndUser,
  17. InstalledApp,
  18. Message,
  19. MessageAgentThought,
  20. MessageAnnotation,
  21. MessageChain,
  22. MessageFeedback,
  23. MessageFile,
  24. RecommendedApp,
  25. Site,
  26. TagBinding,
  27. )
  28. from models.tools import WorkflowToolProvider
  29. from models.web import PinnedConversation, SavedMessage
  30. from models.workflow import Workflow, WorkflowAppLog, WorkflowNodeExecution, WorkflowRun
  31. @shared_task(queue='app_deletion', bind=True, max_retries=3)
  32. def remove_app_and_related_data_task(self, app_id: str):
  33. logging.info(click.style(f'Start deleting app and related data: {app_id}', fg='green'))
  34. start_at = time.perf_counter()
  35. deletion_cache_key = f'app_{app_id}_deletion'
  36. try:
  37. # Use a transaction to ensure all deletions succeed or none do
  38. with db.session.begin_nested():
  39. app = db.session.query(App).filter(App.id == app_id).first()
  40. if not app:
  41. logging.warning(click.style(f"App {app_id} not found", fg='yellow'))
  42. return
  43. # Delete related data
  44. _delete_app_model_configs(app_id)
  45. _delete_app_site(app_id)
  46. _delete_app_api_tokens(app_id)
  47. _delete_installed_apps(app_id)
  48. _delete_recommended_apps(app_id)
  49. _delete_app_annotation_data(app_id)
  50. _delete_app_dataset_joins(app_id)
  51. _delete_app_workflows(app_id)
  52. _delete_app_conversations(app_id)
  53. _delete_app_messages(app_id)
  54. _delete_workflow_tool_providers(app_id)
  55. _delete_app_tag_bindings(app_id)
  56. _delete_end_users(app_id)
  57. # Delete the app itself
  58. db.session.delete(app)
  59. # If we reach here, the transaction was successful
  60. db.session.commit()
  61. end_at = time.perf_counter()
  62. logging.info(click.style(f'App and related data deleted: {app_id} latency: {end_at - start_at}', fg='green'))
  63. except SQLAlchemyError as e:
  64. db.session.rollback()
  65. logging.exception(click.style(f"Database error occurred while deleting app {app_id} and related data", fg='red'))
  66. raise self.retry(exc=e, countdown=60) # Retry after 60 seconds
  67. except Exception as e:
  68. logging.exception(click.style(f"Error occurred while deleting app {app_id} and related data", fg='red'))
  69. raise self.retry(exc=e, countdown=60) # Retry after 60 seconds
  70. def _delete_app_model_configs(app_id: str):
  71. db.session.query(AppModelConfig).filter(AppModelConfig.app_id == app_id).delete()
  72. def _delete_app_site(app_id: str):
  73. db.session.query(Site).filter(Site.app_id == app_id).delete()
  74. def _delete_app_api_tokens(app_id: str):
  75. db.session.query(ApiToken).filter(ApiToken.app_id == app_id).delete()
  76. def _delete_installed_apps(app_id: str):
  77. db.session.query(InstalledApp).filter(InstalledApp.app_id == app_id).delete()
  78. def _delete_recommended_apps(app_id: str):
  79. db.session.query(RecommendedApp).filter(RecommendedApp.app_id == app_id).delete()
  80. def _delete_app_annotation_data(app_id: str):
  81. db.session.query(AppAnnotationHitHistory).filter(AppAnnotationHitHistory.app_id == app_id).delete()
  82. db.session.query(AppAnnotationSetting).filter(AppAnnotationSetting.app_id == app_id).delete()
  83. def _delete_app_dataset_joins(app_id: str):
  84. db.session.query(AppDatasetJoin).filter(AppDatasetJoin.app_id == app_id).delete()
  85. def _delete_app_workflows(app_id: str):
  86. db.session.query(WorkflowRun).filter(
  87. WorkflowRun.workflow_id.in_(
  88. db.session.query(Workflow.id).filter(Workflow.app_id == app_id)
  89. )
  90. ).delete(synchronize_session=False)
  91. db.session.query(WorkflowNodeExecution).filter(
  92. WorkflowNodeExecution.workflow_id.in_(
  93. db.session.query(Workflow.id).filter(Workflow.app_id == app_id)
  94. )
  95. ).delete(synchronize_session=False)
  96. db.session.query(WorkflowAppLog).filter(WorkflowAppLog.app_id == app_id).delete(synchronize_session=False)
  97. db.session.query(Workflow).filter(Workflow.app_id == app_id).delete(synchronize_session=False)
  98. def _delete_app_conversations(app_id: str):
  99. db.session.query(PinnedConversation).filter(
  100. PinnedConversation.conversation_id.in_(
  101. db.session.query(Conversation.id).filter(Conversation.app_id == app_id)
  102. )
  103. ).delete(synchronize_session=False)
  104. db.session.query(Conversation).filter(Conversation.app_id == app_id).delete()
  105. def _delete_app_messages(app_id: str):
  106. message_ids = select(Message.id).filter(Message.app_id == app_id).scalar_subquery()
  107. db.session.query(MessageFeedback).filter(MessageFeedback.message_id.in_(message_ids)).delete(synchronize_session=False)
  108. db.session.query(MessageAnnotation).filter(MessageAnnotation.message_id.in_(message_ids)).delete(synchronize_session=False)
  109. db.session.query(MessageChain).filter(MessageChain.message_id.in_(message_ids)).delete(synchronize_session=False)
  110. db.session.query(MessageAgentThought).filter(MessageAgentThought.message_id.in_(message_ids)).delete(synchronize_session=False)
  111. db.session.query(MessageFile).filter(MessageFile.message_id.in_(message_ids)).delete(synchronize_session=False)
  112. db.session.query(SavedMessage).filter(SavedMessage.message_id.in_(message_ids)).delete(synchronize_session=False)
  113. db.session.query(Message).filter(Message.app_id == app_id).delete(synchronize_session=False)
  114. def _delete_workflow_tool_providers(app_id: str):
  115. db.session.query(WorkflowToolProvider).filter(
  116. WorkflowToolProvider.app_id == app_id
  117. ).delete(synchronize_session=False)
  118. def _delete_app_tag_bindings(app_id: str):
  119. db.session.query(TagBinding).filter(
  120. TagBinding.target_id == app_id
  121. ).delete(synchronize_session=False)
  122. def _delete_end_users(app_id: str):
  123. db.session.query(EndUser).filter(EndUser.app_id == app_id).delete()