# workflow_run_service.py
from typing import Optional

from extensions.ext_database import db
from libs.infinite_scroll_pagination import InfiniteScrollPagination
from models.model import App
from models.workflow import (
    WorkflowNodeExecution,
    WorkflowNodeExecutionTriggeredFrom,
    WorkflowRun,
    WorkflowRunTriggeredFrom,
)


  10. class WorkflowRunService:
  11. def get_paginate_advanced_chat_workflow_runs(self, app_model: App, args: dict) -> InfiniteScrollPagination:
  12. """
  13. Get advanced chat app workflow run list
  14. Only return triggered_from == advanced_chat
  15. :param app_model: app model
  16. :param args: request args
  17. """
  18. class WorkflowWithMessage:
  19. message_id: str
  20. conversation_id: str
  21. def __init__(self, workflow_run: WorkflowRun):
  22. self._workflow_run = workflow_run
  23. def __getattr__(self, item):
  24. return getattr(self._workflow_run, item)
  25. pagination = self.get_paginate_workflow_runs(app_model, args)
  26. with_message_workflow_runs = []
  27. for workflow_run in pagination.data:
  28. message = workflow_run.message
  29. with_message_workflow_run = WorkflowWithMessage(workflow_run=workflow_run)
  30. if message:
  31. with_message_workflow_run.message_id = message.id
  32. with_message_workflow_run.conversation_id = message.conversation_id
  33. with_message_workflow_runs.append(with_message_workflow_run)
  34. pagination.data = with_message_workflow_runs
  35. return pagination
  36. def get_paginate_workflow_runs(self, app_model: App, args: dict) -> InfiniteScrollPagination:
  37. """
  38. Get debug workflow run list
  39. Only return triggered_from == debugging
  40. :param app_model: app model
  41. :param args: request args
  42. """
  43. limit = int(args.get("limit", 20))
  44. base_query = db.session.query(WorkflowRun).filter(
  45. WorkflowRun.tenant_id == app_model.tenant_id,
  46. WorkflowRun.app_id == app_model.id,
  47. WorkflowRun.triggered_from == WorkflowRunTriggeredFrom.DEBUGGING.value,
  48. )
  49. if args.get("last_id"):
  50. last_workflow_run = base_query.filter(
  51. WorkflowRun.id == args.get("last_id"),
  52. ).first()
  53. if not last_workflow_run:
  54. raise ValueError("Last workflow run not exists")
  55. workflow_runs = (
  56. base_query.filter(
  57. WorkflowRun.created_at < last_workflow_run.created_at, WorkflowRun.id != last_workflow_run.id
  58. )
  59. .order_by(WorkflowRun.created_at.desc())
  60. .limit(limit)
  61. .all()
  62. )
  63. else:
  64. workflow_runs = base_query.order_by(WorkflowRun.created_at.desc()).limit(limit).all()
  65. has_more = False
  66. if len(workflow_runs) == limit:
  67. current_page_first_workflow_run = workflow_runs[-1]
  68. rest_count = base_query.filter(
  69. WorkflowRun.created_at < current_page_first_workflow_run.created_at,
  70. WorkflowRun.id != current_page_first_workflow_run.id,
  71. ).count()
  72. if rest_count > 0:
  73. has_more = True
  74. return InfiniteScrollPagination(data=workflow_runs, limit=limit, has_more=has_more)
  75. def get_workflow_run(self, app_model: App, run_id: str) -> WorkflowRun:
  76. """
  77. Get workflow run detail
  78. :param app_model: app model
  79. :param run_id: workflow run id
  80. """
  81. workflow_run = (
  82. db.session.query(WorkflowRun)
  83. .filter(
  84. WorkflowRun.tenant_id == app_model.tenant_id,
  85. WorkflowRun.app_id == app_model.id,
  86. WorkflowRun.id == run_id,
  87. )
  88. .first()
  89. )
  90. return workflow_run
  91. def get_workflow_run_node_executions(self, app_model: App, run_id: str) -> list[WorkflowNodeExecution]:
  92. """
  93. Get workflow run node execution list
  94. """
  95. workflow_run = self.get_workflow_run(app_model, run_id)
  96. if not workflow_run:
  97. return []
  98. node_executions = (
  99. db.session.query(WorkflowNodeExecution)
  100. .filter(
  101. WorkflowNodeExecution.tenant_id == app_model.tenant_id,
  102. WorkflowNodeExecution.app_id == app_model.id,
  103. WorkflowNodeExecution.workflow_id == workflow_run.workflow_id,
  104. WorkflowNodeExecution.triggered_from == WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN.value,
  105. WorkflowNodeExecution.workflow_run_id == run_id,
  106. )
  107. .order_by(WorkflowNodeExecution.index.desc())
  108. .all()
  109. )
  110. return node_executions