remove_app_and_related_data_task.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328
  1. import logging
  2. import time
  3. from collections.abc import Callable
  4. import click
  5. from celery import shared_task
  6. from sqlalchemy import delete
  7. from sqlalchemy.exc import SQLAlchemyError
  8. from extensions.ext_database import db
  9. from models.dataset import AppDatasetJoin
  10. from models.model import (
  11. ApiToken,
  12. AppAnnotationHitHistory,
  13. AppAnnotationSetting,
  14. AppModelConfig,
  15. Conversation,
  16. EndUser,
  17. InstalledApp,
  18. Message,
  19. MessageAgentThought,
  20. MessageAnnotation,
  21. MessageChain,
  22. MessageFeedback,
  23. MessageFile,
  24. RecommendedApp,
  25. Site,
  26. TagBinding,
  27. TraceAppConfig,
  28. )
  29. from models.tools import WorkflowToolProvider
  30. from models.web import PinnedConversation, SavedMessage
  31. from models.workflow import ConversationVariable, Workflow, WorkflowAppLog, WorkflowNodeExecution, WorkflowRun
  32. @shared_task(queue='app_deletion', bind=True, max_retries=3)
  33. def remove_app_and_related_data_task(self, tenant_id: str, app_id: str):
  34. logging.info(click.style(f'Start deleting app and related data: {tenant_id}:{app_id}', fg='green'))
  35. start_at = time.perf_counter()
  36. try:
  37. # Delete related data
  38. _delete_app_model_configs(tenant_id, app_id)
  39. _delete_app_site(tenant_id, app_id)
  40. _delete_app_api_tokens(tenant_id, app_id)
  41. _delete_installed_apps(tenant_id, app_id)
  42. _delete_recommended_apps(tenant_id, app_id)
  43. _delete_app_annotation_data(tenant_id, app_id)
  44. _delete_app_dataset_joins(tenant_id, app_id)
  45. _delete_app_workflows(tenant_id, app_id)
  46. _delete_app_workflow_runs(tenant_id, app_id)
  47. _delete_app_workflow_node_executions(tenant_id, app_id)
  48. _delete_app_workflow_app_logs(tenant_id, app_id)
  49. _delete_app_conversations(tenant_id, app_id)
  50. _delete_app_messages(tenant_id, app_id)
  51. _delete_workflow_tool_providers(tenant_id, app_id)
  52. _delete_app_tag_bindings(tenant_id, app_id)
  53. _delete_end_users(tenant_id, app_id)
  54. _delete_trace_app_configs(tenant_id, app_id)
  55. _delete_conversation_variables(app_id=app_id)
  56. end_at = time.perf_counter()
  57. logging.info(click.style(f'App and related data deleted: {app_id} latency: {end_at - start_at}', fg='green'))
  58. except SQLAlchemyError as e:
  59. logging.exception(
  60. click.style(f"Database error occurred while deleting app {app_id} and related data", fg='red'))
  61. raise self.retry(exc=e, countdown=60) # Retry after 60 seconds
  62. except Exception as e:
  63. logging.exception(click.style(f"Error occurred while deleting app {app_id} and related data", fg='red'))
  64. raise self.retry(exc=e, countdown=60) # Retry after 60 seconds
  65. def _delete_app_model_configs(tenant_id: str, app_id: str):
  66. def del_model_config(model_config_id: str):
  67. db.session.query(AppModelConfig).filter(AppModelConfig.id == model_config_id).delete(synchronize_session=False)
  68. _delete_records(
  69. """select id from app_model_configs where app_id=:app_id limit 1000""",
  70. {"app_id": app_id},
  71. del_model_config,
  72. "app model config"
  73. )
  74. def _delete_app_site(tenant_id: str, app_id: str):
  75. def del_site(site_id: str):
  76. db.session.query(Site).filter(Site.id == site_id).delete(synchronize_session=False)
  77. _delete_records(
  78. """select id from sites where app_id=:app_id limit 1000""",
  79. {"app_id": app_id},
  80. del_site,
  81. "site"
  82. )
  83. def _delete_app_api_tokens(tenant_id: str, app_id: str):
  84. def del_api_token(api_token_id: str):
  85. db.session.query(ApiToken).filter(ApiToken.id == api_token_id).delete(synchronize_session=False)
  86. _delete_records(
  87. """select id from api_tokens where app_id=:app_id limit 1000""",
  88. {"app_id": app_id},
  89. del_api_token,
  90. "api token"
  91. )
  92. def _delete_installed_apps(tenant_id: str, app_id: str):
  93. def del_installed_app(installed_app_id: str):
  94. db.session.query(InstalledApp).filter(InstalledApp.id == installed_app_id).delete(synchronize_session=False)
  95. _delete_records(
  96. """select id from installed_apps where tenant_id=:tenant_id and app_id=:app_id limit 1000""",
  97. {"tenant_id": tenant_id, "app_id": app_id},
  98. del_installed_app,
  99. "installed app"
  100. )
  101. def _delete_recommended_apps(tenant_id: str, app_id: str):
  102. def del_recommended_app(recommended_app_id: str):
  103. db.session.query(RecommendedApp).filter(RecommendedApp.id == recommended_app_id).delete(
  104. synchronize_session=False)
  105. _delete_records(
  106. """select id from recommended_apps where app_id=:app_id limit 1000""",
  107. {"app_id": app_id},
  108. del_recommended_app,
  109. "recommended app"
  110. )
  111. def _delete_app_annotation_data(tenant_id: str, app_id: str):
  112. def del_annotation_hit_history(annotation_hit_history_id: str):
  113. db.session.query(AppAnnotationHitHistory).filter(
  114. AppAnnotationHitHistory.id == annotation_hit_history_id).delete(synchronize_session=False)
  115. _delete_records(
  116. """select id from app_annotation_hit_histories where app_id=:app_id limit 1000""",
  117. {"app_id": app_id},
  118. del_annotation_hit_history,
  119. "annotation hit history"
  120. )
  121. def del_annotation_setting(annotation_setting_id: str):
  122. db.session.query(AppAnnotationSetting).filter(AppAnnotationSetting.id == annotation_setting_id).delete(
  123. synchronize_session=False)
  124. _delete_records(
  125. """select id from app_annotation_settings where app_id=:app_id limit 1000""",
  126. {"app_id": app_id},
  127. del_annotation_setting,
  128. "annotation setting"
  129. )
  130. def _delete_app_dataset_joins(tenant_id: str, app_id: str):
  131. def del_dataset_join(dataset_join_id: str):
  132. db.session.query(AppDatasetJoin).filter(AppDatasetJoin.id == dataset_join_id).delete(synchronize_session=False)
  133. _delete_records(
  134. """select id from app_dataset_joins where app_id=:app_id limit 1000""",
  135. {"app_id": app_id},
  136. del_dataset_join,
  137. "dataset join"
  138. )
  139. def _delete_app_workflows(tenant_id: str, app_id: str):
  140. def del_workflow(workflow_id: str):
  141. db.session.query(Workflow).filter(Workflow.id == workflow_id).delete(synchronize_session=False)
  142. _delete_records(
  143. """select id from workflows where tenant_id=:tenant_id and app_id=:app_id limit 1000""",
  144. {"tenant_id": tenant_id, "app_id": app_id},
  145. del_workflow,
  146. "workflow"
  147. )
  148. def _delete_app_workflow_runs(tenant_id: str, app_id: str):
  149. def del_workflow_run(workflow_run_id: str):
  150. db.session.query(WorkflowRun).filter(WorkflowRun.id == workflow_run_id).delete(synchronize_session=False)
  151. _delete_records(
  152. """select id from workflow_runs where tenant_id=:tenant_id and app_id=:app_id limit 1000""",
  153. {"tenant_id": tenant_id, "app_id": app_id},
  154. del_workflow_run,
  155. "workflow run"
  156. )
  157. def _delete_app_workflow_node_executions(tenant_id: str, app_id: str):
  158. def del_workflow_node_execution(workflow_node_execution_id: str):
  159. db.session.query(WorkflowNodeExecution).filter(
  160. WorkflowNodeExecution.id == workflow_node_execution_id).delete(synchronize_session=False)
  161. _delete_records(
  162. """select id from workflow_node_executions where tenant_id=:tenant_id and app_id=:app_id limit 1000""",
  163. {"tenant_id": tenant_id, "app_id": app_id},
  164. del_workflow_node_execution,
  165. "workflow node execution"
  166. )
  167. def _delete_app_workflow_app_logs(tenant_id: str, app_id: str):
  168. def del_workflow_app_log(workflow_app_log_id: str):
  169. db.session.query(WorkflowAppLog).filter(WorkflowAppLog.id == workflow_app_log_id).delete(synchronize_session=False)
  170. _delete_records(
  171. """select id from workflow_app_logs where tenant_id=:tenant_id and app_id=:app_id limit 1000""",
  172. {"tenant_id": tenant_id, "app_id": app_id},
  173. del_workflow_app_log,
  174. "workflow app log"
  175. )
  176. def _delete_app_conversations(tenant_id: str, app_id: str):
  177. def del_conversation(conversation_id: str):
  178. db.session.query(PinnedConversation).filter(PinnedConversation.conversation_id == conversation_id).delete(
  179. synchronize_session=False)
  180. db.session.query(Conversation).filter(Conversation.id == conversation_id).delete(synchronize_session=False)
  181. _delete_records(
  182. """select id from conversations where app_id=:app_id limit 1000""",
  183. {"app_id": app_id},
  184. del_conversation,
  185. "conversation"
  186. )
  187. def _delete_conversation_variables(*, app_id: str):
  188. stmt = delete(ConversationVariable).where(ConversationVariable.app_id == app_id)
  189. with db.engine.connect() as conn:
  190. conn.execute(stmt)
  191. conn.commit()
  192. logging.info(click.style(f"Deleted conversation variables for app {app_id}", fg='green'))
  193. def _delete_app_messages(tenant_id: str, app_id: str):
  194. def del_message(message_id: str):
  195. db.session.query(MessageFeedback).filter(MessageFeedback.message_id == message_id).delete(
  196. synchronize_session=False)
  197. db.session.query(MessageAnnotation).filter(MessageAnnotation.message_id == message_id).delete(
  198. synchronize_session=False)
  199. db.session.query(MessageChain).filter(MessageChain.message_id == message_id).delete(
  200. synchronize_session=False)
  201. db.session.query(MessageAgentThought).filter(MessageAgentThought.message_id == message_id).delete(
  202. synchronize_session=False)
  203. db.session.query(MessageFile).filter(MessageFile.message_id == message_id).delete(synchronize_session=False)
  204. db.session.query(SavedMessage).filter(SavedMessage.message_id == message_id).delete(
  205. synchronize_session=False)
  206. db.session.query(Message).filter(Message.id == message_id).delete()
  207. _delete_records(
  208. """select id from messages where app_id=:app_id limit 1000""",
  209. {"app_id": app_id},
  210. del_message,
  211. "message"
  212. )
  213. def _delete_workflow_tool_providers(tenant_id: str, app_id: str):
  214. def del_tool_provider(tool_provider_id: str):
  215. db.session.query(WorkflowToolProvider).filter(WorkflowToolProvider.id == tool_provider_id).delete(
  216. synchronize_session=False)
  217. _delete_records(
  218. """select id from tool_workflow_providers where tenant_id=:tenant_id and app_id=:app_id limit 1000""",
  219. {"tenant_id": tenant_id, "app_id": app_id},
  220. del_tool_provider,
  221. "tool workflow provider"
  222. )
  223. def _delete_app_tag_bindings(tenant_id: str, app_id: str):
  224. def del_tag_binding(tag_binding_id: str):
  225. db.session.query(TagBinding).filter(TagBinding.id == tag_binding_id).delete(synchronize_session=False)
  226. _delete_records(
  227. """select id from tag_bindings where tenant_id=:tenant_id and target_id=:app_id limit 1000""",
  228. {"tenant_id": tenant_id, "app_id": app_id},
  229. del_tag_binding,
  230. "tag binding"
  231. )
  232. def _delete_end_users(tenant_id: str, app_id: str):
  233. def del_end_user(end_user_id: str):
  234. db.session.query(EndUser).filter(EndUser.id == end_user_id).delete(synchronize_session=False)
  235. _delete_records(
  236. """select id from end_users where tenant_id=:tenant_id and app_id=:app_id limit 1000""",
  237. {"tenant_id": tenant_id, "app_id": app_id},
  238. del_end_user,
  239. "end user"
  240. )
  241. def _delete_trace_app_configs(tenant_id: str, app_id: str):
  242. def del_trace_app_config(trace_app_config_id: str):
  243. db.session.query(TraceAppConfig).filter(TraceAppConfig.id == trace_app_config_id).delete(
  244. synchronize_session=False)
  245. _delete_records(
  246. """select id from trace_app_config where app_id=:app_id limit 1000""",
  247. {"app_id": app_id},
  248. del_trace_app_config,
  249. "trace app config"
  250. )
  251. def _delete_records(query_sql: str, params: dict, delete_func: Callable, name: str) -> None:
  252. while True:
  253. with db.engine.begin() as conn:
  254. rs = conn.execute(db.text(query_sql), params)
  255. if rs.rowcount == 0:
  256. break
  257. for i in rs:
  258. record_id = str(i.id)
  259. try:
  260. delete_func(record_id)
  261. db.session.commit()
  262. logging.info(click.style(f"Deleted {name} {record_id}", fg='green'))
  263. except Exception:
  264. logging.exception(f"Error occurred while deleting {name} {record_id}")
  265. continue
  266. rs.close()