workflow_app_service.py 2.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667
  1. import uuid
  2. from flask_sqlalchemy.pagination import Pagination
  3. from sqlalchemy import and_, or_
  4. from extensions.ext_database import db
  5. from models import App, EndUser, WorkflowAppLog, WorkflowRun
  6. from models.enums import CreatedByRole
  7. from models.workflow import WorkflowRunStatus
  8. class WorkflowAppService:
  9. def get_paginate_workflow_app_logs(self, app_model: App, args: dict) -> Pagination:
  10. """
  11. Get paginate workflow app logs
  12. :param app: app model
  13. :param args: request args
  14. :return:
  15. """
  16. query = db.select(WorkflowAppLog).where(
  17. WorkflowAppLog.tenant_id == app_model.tenant_id, WorkflowAppLog.app_id == app_model.id
  18. )
  19. status = WorkflowRunStatus.value_of(args.get("status", "")) if args.get("status") else None
  20. keyword = args["keyword"]
  21. if keyword or status:
  22. query = query.join(WorkflowRun, WorkflowRun.id == WorkflowAppLog.workflow_run_id)
  23. if keyword:
  24. keyword_like_val = f"%{args['keyword'][:30]}%"
  25. keyword_conditions = [
  26. WorkflowRun.inputs.ilike(keyword_like_val),
  27. WorkflowRun.outputs.ilike(keyword_like_val),
  28. # filter keyword by end user session id if created by end user role
  29. and_(WorkflowRun.created_by_role == "end_user", EndUser.session_id.ilike(keyword_like_val)),
  30. ]
  31. # filter keyword by workflow run id
  32. keyword_uuid = self._safe_parse_uuid(keyword)
  33. if keyword_uuid:
  34. keyword_conditions.append(WorkflowRun.id == keyword_uuid)
  35. query = query.outerjoin(
  36. EndUser,
  37. and_(WorkflowRun.created_by == EndUser.id, WorkflowRun.created_by_role == CreatedByRole.END_USER),
  38. ).filter(or_(*keyword_conditions))
  39. if status:
  40. # join with workflow_run and filter by status
  41. query = query.filter(WorkflowRun.status == status.value)
  42. query = query.order_by(WorkflowAppLog.created_at.desc())
  43. pagination = db.paginate(query, page=args["page"], per_page=args["limit"], error_out=False)
  44. return pagination
  45. @staticmethod
  46. def _safe_parse_uuid(value: str):
  47. # fast check
  48. if len(value) < 32:
  49. return None
  50. try:
  51. return uuid.UUID(value)
  52. except ValueError:
  53. return None