remove_app_and_related_data_task.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. import logging
  2. import time
  3. import click
  4. from celery import shared_task
  5. from sqlalchemy import select
  6. from sqlalchemy.exc import SQLAlchemyError
  7. from extensions.ext_database import db
  8. from models.dataset import AppDatasetJoin
  9. from models.model import (
  10. ApiToken,
  11. AppAnnotationHitHistory,
  12. AppAnnotationSetting,
  13. AppModelConfig,
  14. Conversation,
  15. EndUser,
  16. InstalledApp,
  17. Message,
  18. MessageAgentThought,
  19. MessageAnnotation,
  20. MessageChain,
  21. MessageFeedback,
  22. MessageFile,
  23. RecommendedApp,
  24. Site,
  25. TagBinding,
  26. )
  27. from models.tools import WorkflowToolProvider
  28. from models.web import PinnedConversation, SavedMessage
  29. from models.workflow import Workflow, WorkflowAppLog, WorkflowNodeExecution, WorkflowRun
  30. @shared_task(queue='app_deletion', bind=True, max_retries=3)
  31. def remove_app_and_related_data_task(self, app_id: str):
  32. logging.info(click.style(f'Start deleting app and related data: {app_id}', fg='green'))
  33. start_at = time.perf_counter()
  34. try:
  35. # Use a transaction to ensure all deletions succeed or none do
  36. with db.session.begin_nested():
  37. # Delete related data
  38. _delete_app_model_configs(app_id)
  39. _delete_app_site(app_id)
  40. _delete_app_api_tokens(app_id)
  41. _delete_installed_apps(app_id)
  42. _delete_recommended_apps(app_id)
  43. _delete_app_annotation_data(app_id)
  44. _delete_app_dataset_joins(app_id)
  45. _delete_app_workflows(app_id)
  46. _delete_app_conversations(app_id)
  47. _delete_app_messages(app_id)
  48. _delete_workflow_tool_providers(app_id)
  49. _delete_app_tag_bindings(app_id)
  50. _delete_end_users(app_id)
  51. # If we reach here, the transaction was successful
  52. db.session.commit()
  53. end_at = time.perf_counter()
  54. logging.info(click.style(f'App and related data deleted: {app_id} latency: {end_at - start_at}', fg='green'))
  55. except SQLAlchemyError as e:
  56. db.session.rollback()
  57. logging.exception(
  58. click.style(f"Database error occurred while deleting app {app_id} and related data", fg='red'))
  59. raise self.retry(exc=e, countdown=60) # Retry after 60 seconds
  60. except Exception as e:
  61. logging.exception(click.style(f"Error occurred while deleting app {app_id} and related data", fg='red'))
  62. raise self.retry(exc=e, countdown=60) # Retry after 60 seconds
  63. def _delete_app_model_configs(app_id: str):
  64. db.session.query(AppModelConfig).filter(AppModelConfig.app_id == app_id).delete()
  65. def _delete_app_site(app_id: str):
  66. db.session.query(Site).filter(Site.app_id == app_id).delete()
  67. def _delete_app_api_tokens(app_id: str):
  68. db.session.query(ApiToken).filter(ApiToken.app_id == app_id).delete()
  69. def _delete_installed_apps(app_id: str):
  70. db.session.query(InstalledApp).filter(InstalledApp.app_id == app_id).delete()
  71. def _delete_recommended_apps(app_id: str):
  72. db.session.query(RecommendedApp).filter(RecommendedApp.app_id == app_id).delete()
  73. def _delete_app_annotation_data(app_id: str):
  74. db.session.query(AppAnnotationHitHistory).filter(AppAnnotationHitHistory.app_id == app_id).delete()
  75. db.session.query(AppAnnotationSetting).filter(AppAnnotationSetting.app_id == app_id).delete()
  76. def _delete_app_dataset_joins(app_id: str):
  77. db.session.query(AppDatasetJoin).filter(AppDatasetJoin.app_id == app_id).delete()
  78. def _delete_app_workflows(app_id: str):
  79. db.session.query(WorkflowRun).filter(
  80. WorkflowRun.workflow_id.in_(
  81. db.session.query(Workflow.id).filter(Workflow.app_id == app_id)
  82. )
  83. ).delete(synchronize_session=False)
  84. db.session.query(WorkflowNodeExecution).filter(
  85. WorkflowNodeExecution.workflow_id.in_(
  86. db.session.query(Workflow.id).filter(Workflow.app_id == app_id)
  87. )
  88. ).delete(synchronize_session=False)
  89. db.session.query(WorkflowAppLog).filter(WorkflowAppLog.app_id == app_id).delete(synchronize_session=False)
  90. db.session.query(Workflow).filter(Workflow.app_id == app_id).delete(synchronize_session=False)
  91. def _delete_app_conversations(app_id: str):
  92. db.session.query(PinnedConversation).filter(
  93. PinnedConversation.conversation_id.in_(
  94. db.session.query(Conversation.id).filter(Conversation.app_id == app_id)
  95. )
  96. ).delete(synchronize_session=False)
  97. db.session.query(Conversation).filter(Conversation.app_id == app_id).delete()
  98. def _delete_app_messages(app_id: str):
  99. message_ids = select(Message.id).filter(Message.app_id == app_id).scalar_subquery()
  100. db.session.query(MessageFeedback).filter(MessageFeedback.message_id.in_(message_ids)).delete(
  101. synchronize_session=False)
  102. db.session.query(MessageAnnotation).filter(MessageAnnotation.message_id.in_(message_ids)).delete(
  103. synchronize_session=False)
  104. db.session.query(MessageChain).filter(MessageChain.message_id.in_(message_ids)).delete(synchronize_session=False)
  105. db.session.query(MessageAgentThought).filter(MessageAgentThought.message_id.in_(message_ids)).delete(
  106. synchronize_session=False)
  107. db.session.query(MessageFile).filter(MessageFile.message_id.in_(message_ids)).delete(synchronize_session=False)
  108. db.session.query(SavedMessage).filter(SavedMessage.message_id.in_(message_ids)).delete(synchronize_session=False)
  109. db.session.query(Message).filter(Message.app_id == app_id).delete(synchronize_session=False)
  110. def _delete_workflow_tool_providers(app_id: str):
  111. db.session.query(WorkflowToolProvider).filter(
  112. WorkflowToolProvider.app_id == app_id
  113. ).delete(synchronize_session=False)
  114. def _delete_app_tag_bindings(app_id: str):
  115. db.session.query(TagBinding).filter(
  116. TagBinding.target_id == app_id
  117. ).delete(synchronize_session=False)
  118. def _delete_end_users(app_id: str):
  119. db.session.query(EndUser).filter(EndUser.app_id == app_id).delete()