remove_app_and_related_data_task.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283
  1. import logging
  2. import time
  3. import click
  4. from celery import shared_task
  5. from sqlalchemy.exc import SQLAlchemyError
  6. from extensions.ext_database import db
  7. from models.dataset import AppDatasetJoin
  8. from models.model import (
  9. ApiToken,
  10. AppAnnotationHitHistory,
  11. AppAnnotationSetting,
  12. AppModelConfig,
  13. Conversation,
  14. EndUser,
  15. InstalledApp,
  16. Message,
  17. MessageAgentThought,
  18. MessageAnnotation,
  19. MessageChain,
  20. MessageFeedback,
  21. MessageFile,
  22. RecommendedApp,
  23. Site,
  24. TagBinding,
  25. TraceAppConfig,
  26. )
  27. from models.tools import WorkflowToolProvider
  28. from models.web import PinnedConversation, SavedMessage
  29. from models.workflow import Workflow, WorkflowAppLog, WorkflowNodeExecution, WorkflowRun
  @shared_task(queue='app_deletion', bind=True, max_retries=3)
  def remove_app_and_related_data_task(self, tenant_id: str, app_id: str):
      """Celery task that removes the data attached to an app, table by table.

      The per-table cleanup helpers are invoked one after another in a fixed
      order, each receiving ``tenant_id`` and ``app_id``. When all of them
      return, the elapsed time is logged. If any helper raises, the error is
      logged and the task is re-queued via ``self.retry`` with a 60 second
      countdown.

      :param tenant_id: ID of the tenant that owns the app.
      :param app_id: ID of the app whose related data is removed.
      """
      logging.info(click.style(f'Start deleting app and related data: {tenant_id}:{app_id}', fg='green'))
      started = time.perf_counter()
      try:
          # Ordered list of cleanup steps; every step shares the same signature.
          cleanup_steps = (
              _delete_app_model_configs,
              _delete_app_site,
              _delete_app_api_tokens,
              _delete_installed_apps,
              _delete_recommended_apps,
              _delete_app_annotation_data,
              _delete_app_dataset_joins,
              _delete_app_workflows,
              _delete_app_conversations,
              _delete_app_messages,
              _delete_workflow_tool_providers,
              _delete_app_tag_bindings,
              _delete_end_users,
              _delete_trace_app_configs,
          )
          for step in cleanup_steps:
              step(tenant_id, app_id)

          finished = time.perf_counter()
          logging.info(
              click.style(f'App and related data deleted: {app_id} latency: {finished - started}', fg='green')
          )
      except SQLAlchemyError as db_error:
          logging.exception(
              click.style(f"Database error occurred while deleting app {app_id} and related data", fg='red')
          )
          # Re-queue the task; it runs again after 60 seconds.
          raise self.retry(exc=db_error, countdown=60)
      except Exception as error:
          logging.exception(
              click.style(f"Error occurred while deleting app {app_id} and related data", fg='red')
          )
          # Re-queue the task; it runs again after 60 seconds.
          raise self.retry(exc=error, countdown=60)
  def _delete_app_model_configs(tenant_id: str, app_id: str):
      """Delete the ``AppModelConfig`` rows that reference ``app_id``.

      The lookup query selects at most 1000 ids per execution and is handed to
      ``_delete_records`` together with the per-record deleter. ``tenant_id``
      is part of the shared helper signature but is not used by the query.
      """
      lookup_sql = "select id from app_model_configs where app_id=:app_id limit 1000"
      lookup_params = {"app_id": app_id}

      def _remove(record_id: str):
          # Bulk delete by primary key; no in-session objects are synchronised.
          matching = db.session.query(AppModelConfig).filter(AppModelConfig.id == record_id)
          matching.delete(synchronize_session=False)

      _delete_records(lookup_sql, lookup_params, _remove, "app model config")
  def _delete_app_site(tenant_id: str, app_id: str):
      """Delete the ``Site`` rows that reference ``app_id``.

      The lookup query selects at most 1000 ids per execution and is handed to
      ``_delete_records`` together with the per-record deleter. ``tenant_id``
      is part of the shared helper signature but is not used by the query.
      """
      lookup_sql = "select id from sites where app_id=:app_id limit 1000"
      lookup_params = {"app_id": app_id}

      def _remove(record_id: str):
          # Bulk delete by primary key; no in-session objects are synchronised.
          matching = db.session.query(Site).filter(Site.id == record_id)
          matching.delete(synchronize_session=False)

      _delete_records(lookup_sql, lookup_params, _remove, "site")
  def _delete_app_api_tokens(tenant_id: str, app_id: str):
      """Delete the ``ApiToken`` rows that reference ``app_id``.

      The lookup query selects at most 1000 ids per execution and is handed to
      ``_delete_records`` together with the per-record deleter. ``tenant_id``
      is part of the shared helper signature but is not used by the query.
      """
      lookup_sql = "select id from api_tokens where app_id=:app_id limit 1000"
      lookup_params = {"app_id": app_id}

      def _remove(record_id: str):
          # Bulk delete by primary key; no in-session objects are synchronised.
          matching = db.session.query(ApiToken).filter(ApiToken.id == record_id)
          matching.delete(synchronize_session=False)

      _delete_records(lookup_sql, lookup_params, _remove, "api token")
  def _delete_installed_apps(tenant_id: str, app_id: str):
      """Delete the ``InstalledApp`` rows matching both ``tenant_id`` and ``app_id``.

      The lookup query selects at most 1000 ids per execution and is handed to
      ``_delete_records`` together with the per-record deleter.
      """
      lookup_sql = "select id from installed_apps where tenant_id=:tenant_id and app_id=:app_id limit 1000"
      lookup_params = {"tenant_id": tenant_id, "app_id": app_id}

      def _remove(record_id: str):
          # Bulk delete by primary key; no in-session objects are synchronised.
          matching = db.session.query(InstalledApp).filter(InstalledApp.id == record_id)
          matching.delete(synchronize_session=False)

      _delete_records(lookup_sql, lookup_params, _remove, "installed app")
  def _delete_recommended_apps(tenant_id: str, app_id: str):
      """Delete the ``RecommendedApp`` rows that reference ``app_id``.

      The lookup query selects at most 1000 ids per execution and is handed to
      ``_delete_records`` together with the per-record deleter. ``tenant_id``
      is part of the shared helper signature but is not used by the query.
      """
      lookup_sql = "select id from recommended_apps where app_id=:app_id limit 1000"
      lookup_params = {"app_id": app_id}

      def _remove(record_id: str):
          # Bulk delete by primary key; no in-session objects are synchronised.
          matching = db.session.query(RecommendedApp).filter(RecommendedApp.id == record_id)
          matching.delete(synchronize_session=False)

      _delete_records(lookup_sql, lookup_params, _remove, "recommended app")
  def _delete_app_annotation_data(tenant_id: str, app_id: str):
      """Delete annotation hit histories, then annotation settings, for ``app_id``.

      Two passes are made through ``_delete_records``: first over
      ``app_annotation_hit_histories``, then over ``app_annotation_settings``.
      Each lookup query selects at most 1000 ids per execution. ``tenant_id``
      is part of the shared helper signature but is not used by either query.
      """
      lookup_params = {"app_id": app_id}

      def _remove_hit_history(record_id: str):
          # Bulk delete by primary key; no in-session objects are synchronised.
          matching = db.session.query(AppAnnotationHitHistory).filter(AppAnnotationHitHistory.id == record_id)
          matching.delete(synchronize_session=False)

      def _remove_setting(record_id: str):
          # Bulk delete by primary key; no in-session objects are synchronised.
          matching = db.session.query(AppAnnotationSetting).filter(AppAnnotationSetting.id == record_id)
          matching.delete(synchronize_session=False)

      # Pass 1: annotation hit histories.
      _delete_records(
          "select id from app_annotation_hit_histories where app_id=:app_id limit 1000",
          lookup_params,
          _remove_hit_history,
          "annotation hit history",
      )

      # Pass 2: annotation settings.
      _delete_records(
          "select id from app_annotation_settings where app_id=:app_id limit 1000",
          {"app_id": app_id},
          _remove_setting,
          "annotation setting",
      )
  def _delete_app_dataset_joins(tenant_id: str, app_id: str):
      """Delete the ``AppDatasetJoin`` rows that reference ``app_id``.

      The lookup query selects at most 1000 ids per execution and is handed to
      ``_delete_records`` together with the per-record deleter. ``tenant_id``
      is part of the shared helper signature but is not used by the query.
      """
      lookup_sql = "select id from app_dataset_joins where app_id=:app_id limit 1000"
      lookup_params = {"app_id": app_id}

      def _remove(record_id: str):
          # Bulk delete by primary key; no in-session objects are synchronised.
          matching = db.session.query(AppDatasetJoin).filter(AppDatasetJoin.id == record_id)
          matching.delete(synchronize_session=False)

      _delete_records(lookup_sql, lookup_params, _remove, "dataset join")
  def _delete_app_workflows(tenant_id: str, app_id: str):
      """Delete the app's workflows together with rows keyed by each workflow id.

      For every workflow id found for ``tenant_id`` and ``app_id``, rows in
      ``WorkflowRun``, ``WorkflowNodeExecution`` and ``WorkflowAppLog`` whose
      ``workflow_id`` matches are deleted first, then the ``Workflow`` row
      itself. The lookup query selects at most 1000 ids per execution.
      """
      lookup_sql = "select id from workflows where tenant_id=:tenant_id and app_id=:app_id limit 1000"
      lookup_params = {"tenant_id": tenant_id, "app_id": app_id}

      def _remove(workflow_id: str):
          # Dependent tables first, the workflow row last.
          runs = db.session.query(WorkflowRun).filter(WorkflowRun.workflow_id == workflow_id)
          runs.delete(synchronize_session=False)

          node_executions = db.session.query(WorkflowNodeExecution).filter(
              WorkflowNodeExecution.workflow_id == workflow_id
          )
          node_executions.delete(synchronize_session=False)

          app_logs = db.session.query(WorkflowAppLog).filter(WorkflowAppLog.workflow_id == workflow_id)
          app_logs.delete(synchronize_session=False)

          workflow = db.session.query(Workflow).filter(Workflow.id == workflow_id)
          workflow.delete(synchronize_session=False)

      _delete_records(lookup_sql, lookup_params, _remove, "workflow")
  def _delete_app_conversations(tenant_id: str, app_id: str):
      """Delete the app's conversations and their pinned-conversation rows.

      For every conversation id found for ``app_id``, matching
      ``PinnedConversation`` rows are deleted first, then the ``Conversation``
      row. The lookup query selects at most 1000 ids per execution.
      ``tenant_id`` is part of the shared helper signature but is not used by
      the query.
      """
      lookup_sql = "select id from conversations where app_id=:app_id limit 1000"
      lookup_params = {"app_id": app_id}

      def _remove(conversation_id: str):
          pinned = db.session.query(PinnedConversation).filter(
              PinnedConversation.conversation_id == conversation_id
          )
          pinned.delete(synchronize_session=False)

          conversation = db.session.query(Conversation).filter(Conversation.id == conversation_id)
          conversation.delete(synchronize_session=False)

      _delete_records(lookup_sql, lookup_params, _remove, "conversation")
  def _delete_app_messages(tenant_id: str, app_id: str):
      """Delete the app's messages together with rows keyed by each message id.

      For every message id found for ``app_id``, rows in ``MessageFeedback``,
      ``MessageAnnotation``, ``MessageChain``, ``MessageAgentThought``,
      ``MessageFile`` and ``SavedMessage`` whose ``message_id`` matches are
      deleted first, then the ``Message`` row itself. The lookup query selects
      at most 1000 ids per execution. ``tenant_id`` is part of the shared
      helper signature but is not used by the query.
      """
      def del_message(message_id: str):
          # Dependent tables first, the message row last.
          db.session.query(MessageFeedback).filter(MessageFeedback.message_id == message_id).delete(
              synchronize_session=False)
          db.session.query(MessageAnnotation).filter(MessageAnnotation.message_id == message_id).delete(
              synchronize_session=False)
          db.session.query(MessageChain).filter(MessageChain.message_id == message_id).delete(
              synchronize_session=False)
          db.session.query(MessageAgentThought).filter(MessageAgentThought.message_id == message_id).delete(
              synchronize_session=False)
          db.session.query(MessageFile).filter(MessageFile.message_id == message_id).delete(
              synchronize_session=False)
          db.session.query(SavedMessage).filter(SavedMessage.message_id == message_id).delete(
              synchronize_session=False)
          # Pass synchronize_session=False here as well, matching every other
          # bulk delete in this module, so the default session synchronisation
          # strategy is not applied for this one statement only.
          db.session.query(Message).filter(Message.id == message_id).delete(synchronize_session=False)

      _delete_records(
          """select id from messages where app_id=:app_id limit 1000""",
          {"app_id": app_id},
          del_message,
          "message"
      )
  def _delete_workflow_tool_providers(tenant_id: str, app_id: str):
      """Delete the ``WorkflowToolProvider`` rows matching ``tenant_id`` and ``app_id``.

      The lookup query selects at most 1000 ids per execution and is handed to
      ``_delete_records`` together with the per-record deleter.
      """
      lookup_sql = "select id from tool_workflow_providers where tenant_id=:tenant_id and app_id=:app_id limit 1000"
      lookup_params = {"tenant_id": tenant_id, "app_id": app_id}

      def _remove(record_id: str):
          # Bulk delete by primary key; no in-session objects are synchronised.
          matching = db.session.query(WorkflowToolProvider).filter(WorkflowToolProvider.id == record_id)
          matching.delete(synchronize_session=False)

      _delete_records(lookup_sql, lookup_params, _remove, "tool workflow provider")
  def _delete_app_tag_bindings(tenant_id: str, app_id: str):
      """Delete the ``TagBinding`` rows whose ``target_id`` is the given app.

      Rows are looked up by ``tenant_id`` and by ``target_id`` equal to
      ``app_id``. The lookup query selects at most 1000 ids per execution and
      is handed to ``_delete_records`` together with the per-record deleter.
      """
      lookup_sql = "select id from tag_bindings where tenant_id=:tenant_id and target_id=:app_id limit 1000"
      lookup_params = {"tenant_id": tenant_id, "app_id": app_id}

      def _remove(record_id: str):
          # Bulk delete by primary key; no in-session objects are synchronised.
          matching = db.session.query(TagBinding).filter(TagBinding.id == record_id)
          matching.delete(synchronize_session=False)

      _delete_records(lookup_sql, lookup_params, _remove, "tag binding")
  def _delete_end_users(tenant_id: str, app_id: str):
      """Delete the ``EndUser`` rows matching both ``tenant_id`` and ``app_id``.

      The lookup query selects at most 1000 ids per execution and is handed to
      ``_delete_records`` together with the per-record deleter.
      """
      lookup_sql = "select id from end_users where tenant_id=:tenant_id and app_id=:app_id limit 1000"
      lookup_params = {"tenant_id": tenant_id, "app_id": app_id}

      def _remove(record_id: str):
          # Bulk delete by primary key; no in-session objects are synchronised.
          matching = db.session.query(EndUser).filter(EndUser.id == record_id)
          matching.delete(synchronize_session=False)

      _delete_records(lookup_sql, lookup_params, _remove, "end user")
  def _delete_trace_app_configs(tenant_id: str, app_id: str):
      """Delete the ``TraceAppConfig`` rows that reference ``app_id``.

      The lookup query selects at most 1000 ids per execution and is handed to
      ``_delete_records`` together with the per-record deleter. ``tenant_id``
      is part of the shared helper signature but is not used by the query.
      """
      lookup_sql = "select id from trace_app_config where app_id=:app_id limit 1000"
      lookup_params = {"app_id": app_id}

      def _remove(record_id: str):
          # Bulk delete by primary key; no in-session objects are synchronised.
          matching = db.session.query(TraceAppConfig).filter(TraceAppConfig.id == record_id)
          matching.delete(synchronize_session=False)

      _delete_records(lookup_sql, lookup_params, _remove, "trace app config")
  def _delete_records(query_sql: str, params: dict, delete_func: callable, name: str) -> None:
      """Select record ids in batches and delete each record individually.

      ``query_sql`` is executed with ``params`` and every returned row must
      expose an ``id`` attribute. For each id, ``delete_func`` is called with
      the id converted to ``str`` and the session is committed. A failure on
      one record is logged, the session is rolled back, and processing moves
      on to the next record.

      The loop stops when the query returns no rows. It also stops when a
      whole batch fails to delete, because the same rows would otherwise be
      selected again indefinitely.

      :param query_sql: SQL text that selects the ``id`` column of one batch.
      :param params: Bind parameters for ``query_sql``.
      :param delete_func: Callable invoked with a single record id (``str``).
      :param name: Human-readable record type used in log messages.
      """
      while True:
          with db.engine.begin() as conn:
              rs = conn.execute(db.text(query_sql), params)
              # ``rowcount`` is not dependable for SELECT statements on every
              # driver, so read the batch and test the rows themselves. Reading
              # it up front also releases the connection before deleting.
              record_ids = [str(row.id) for row in rs]

          if not record_ids:
              break

          deleted_count = 0
          for record_id in record_ids:
              try:
                  delete_func(record_id)
                  db.session.commit()
              except Exception:
                  logging.exception(f"Error occurred while deleting {name} {record_id}")
                  # Discard the failed transaction so the next record starts
                  # from a usable session.
                  db.session.rollback()
                  continue
              deleted_count += 1
              logging.info(click.style(f"Deleted {name} {record_id}", fg='green'))

          if deleted_count == 0:
              # Nothing in this batch could be removed; the next query would
              # return the same rows, so stop instead of looping forever.
              logging.error(
                  f"No {name} records could be deleted in the current batch "
                  f"of {len(record_ids)}; aborting further attempts"
              )
              break