workflow.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324
  1. import json
  2. import logging
  3. from flask import abort, request
  4. from flask_restful import Resource, marshal_with, reqparse
  5. from werkzeug.exceptions import InternalServerError, NotFound
  6. import services
  7. from controllers.console import api
  8. from controllers.console.app.error import ConversationCompletedError, DraftWorkflowNotExist
  9. from controllers.console.app.wraps import get_app_model
  10. from controllers.console.setup import setup_required
  11. from controllers.console.wraps import account_initialization_required
  12. from core.app.apps.base_app_queue_manager import AppQueueManager
  13. from core.app.entities.app_invoke_entities import InvokeFrom
  14. from fields.workflow_fields import workflow_fields
  15. from fields.workflow_run_fields import workflow_run_node_execution_fields
  16. from libs import helper
  17. from libs.helper import TimestampField, uuid_value
  18. from libs.login import current_user, login_required
  19. from models.model import App, AppMode
  20. from services.app_generate_service import AppGenerateService
  21. from services.workflow_service import WorkflowService
  22. logger = logging.getLogger(__name__)
  23. class DraftWorkflowApi(Resource):
  24. @setup_required
  25. @login_required
  26. @account_initialization_required
  27. @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
  28. @marshal_with(workflow_fields)
  29. def get(self, app_model: App):
  30. """
  31. Get draft workflow
  32. """
  33. # fetch draft workflow by app_model
  34. workflow_service = WorkflowService()
  35. workflow = workflow_service.get_draft_workflow(app_model=app_model)
  36. if not workflow:
  37. raise DraftWorkflowNotExist()
  38. # return workflow, if not found, return None (initiate graph by frontend)
  39. return workflow
  40. @setup_required
  41. @login_required
  42. @account_initialization_required
  43. @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
  44. def post(self, app_model: App):
  45. """
  46. Sync draft workflow
  47. """
  48. content_type = request.headers.get('Content-Type')
  49. if 'application/json' in content_type:
  50. parser = reqparse.RequestParser()
  51. parser.add_argument('graph', type=dict, required=True, nullable=False, location='json')
  52. parser.add_argument('features', type=dict, required=True, nullable=False, location='json')
  53. args = parser.parse_args()
  54. elif 'text/plain' in content_type:
  55. try:
  56. data = json.loads(request.data.decode('utf-8'))
  57. if 'graph' not in data or 'features' not in data:
  58. raise ValueError('graph or features not found in data')
  59. if not isinstance(data.get('graph'), dict) or not isinstance(data.get('features'), dict):
  60. raise ValueError('graph or features is not a dict')
  61. args = {
  62. 'graph': data.get('graph'),
  63. 'features': data.get('features')
  64. }
  65. except json.JSONDecodeError:
  66. return {'message': 'Invalid JSON data'}, 400
  67. else:
  68. abort(415)
  69. workflow_service = WorkflowService()
  70. workflow = workflow_service.sync_draft_workflow(
  71. app_model=app_model,
  72. graph=args.get('graph'),
  73. features=args.get('features'),
  74. account=current_user
  75. )
  76. return {
  77. "result": "success",
  78. "updated_at": TimestampField().format(workflow.updated_at or workflow.created_at)
  79. }
  80. class AdvancedChatDraftWorkflowRunApi(Resource):
  81. @setup_required
  82. @login_required
  83. @account_initialization_required
  84. @get_app_model(mode=[AppMode.ADVANCED_CHAT])
  85. def post(self, app_model: App):
  86. """
  87. Run draft workflow
  88. """
  89. parser = reqparse.RequestParser()
  90. parser.add_argument('inputs', type=dict, location='json')
  91. parser.add_argument('query', type=str, required=True, location='json', default='')
  92. parser.add_argument('files', type=list, location='json')
  93. parser.add_argument('conversation_id', type=uuid_value, location='json')
  94. args = parser.parse_args()
  95. try:
  96. response = AppGenerateService.generate(
  97. app_model=app_model,
  98. user=current_user,
  99. args=args,
  100. invoke_from=InvokeFrom.DEBUGGER,
  101. streaming=True
  102. )
  103. return helper.compact_generate_response(response)
  104. except services.errors.conversation.ConversationNotExistsError:
  105. raise NotFound("Conversation Not Exists.")
  106. except services.errors.conversation.ConversationCompletedError:
  107. raise ConversationCompletedError()
  108. except ValueError as e:
  109. raise e
  110. except Exception as e:
  111. logging.exception("internal server error.")
  112. raise InternalServerError()
  113. class DraftWorkflowRunApi(Resource):
  114. @setup_required
  115. @login_required
  116. @account_initialization_required
  117. @get_app_model(mode=[AppMode.WORKFLOW])
  118. def post(self, app_model: App):
  119. """
  120. Run draft workflow
  121. """
  122. parser = reqparse.RequestParser()
  123. parser.add_argument('inputs', type=dict, required=True, nullable=False, location='json')
  124. parser.add_argument('files', type=list, required=False, location='json')
  125. args = parser.parse_args()
  126. try:
  127. response = AppGenerateService.generate(
  128. app_model=app_model,
  129. user=current_user,
  130. args=args,
  131. invoke_from=InvokeFrom.DEBUGGER,
  132. streaming=True
  133. )
  134. return helper.compact_generate_response(response)
  135. except ValueError as e:
  136. raise e
  137. except Exception as e:
  138. logging.exception("internal server error.")
  139. raise InternalServerError()
  140. class WorkflowTaskStopApi(Resource):
  141. @setup_required
  142. @login_required
  143. @account_initialization_required
  144. @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
  145. def post(self, app_model: App, task_id: str):
  146. """
  147. Stop workflow task
  148. """
  149. AppQueueManager.set_stop_flag(task_id, InvokeFrom.DEBUGGER, current_user.id)
  150. return {
  151. "result": "success"
  152. }
  153. class DraftWorkflowNodeRunApi(Resource):
  154. @setup_required
  155. @login_required
  156. @account_initialization_required
  157. @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
  158. @marshal_with(workflow_run_node_execution_fields)
  159. def post(self, app_model: App, node_id: str):
  160. """
  161. Run draft workflow node
  162. """
  163. parser = reqparse.RequestParser()
  164. parser.add_argument('inputs', type=dict, required=True, nullable=False, location='json')
  165. args = parser.parse_args()
  166. workflow_service = WorkflowService()
  167. workflow_node_execution = workflow_service.run_draft_workflow_node(
  168. app_model=app_model,
  169. node_id=node_id,
  170. user_inputs=args.get('inputs'),
  171. account=current_user
  172. )
  173. return workflow_node_execution
  174. class PublishedWorkflowApi(Resource):
  175. @setup_required
  176. @login_required
  177. @account_initialization_required
  178. @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
  179. @marshal_with(workflow_fields)
  180. def get(self, app_model: App):
  181. """
  182. Get published workflow
  183. """
  184. # fetch published workflow by app_model
  185. workflow_service = WorkflowService()
  186. workflow = workflow_service.get_published_workflow(app_model=app_model)
  187. # return workflow, if not found, return None
  188. return workflow
  189. @setup_required
  190. @login_required
  191. @account_initialization_required
  192. @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
  193. def post(self, app_model: App):
  194. """
  195. Publish workflow
  196. """
  197. workflow_service = WorkflowService()
  198. workflow = workflow_service.publish_workflow(app_model=app_model, account=current_user)
  199. return {
  200. "result": "success",
  201. "created_at": TimestampField().format(workflow.created_at)
  202. }
  203. class DefaultBlockConfigsApi(Resource):
  204. @setup_required
  205. @login_required
  206. @account_initialization_required
  207. @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
  208. def get(self, app_model: App):
  209. """
  210. Get default block config
  211. """
  212. # Get default block configs
  213. workflow_service = WorkflowService()
  214. return workflow_service.get_default_block_configs()
  215. class DefaultBlockConfigApi(Resource):
  216. @setup_required
  217. @login_required
  218. @account_initialization_required
  219. @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
  220. def get(self, app_model: App, block_type: str):
  221. """
  222. Get default block config
  223. """
  224. parser = reqparse.RequestParser()
  225. parser.add_argument('q', type=str, location='args')
  226. args = parser.parse_args()
  227. filters = None
  228. if args.get('q'):
  229. try:
  230. filters = json.loads(args.get('q'))
  231. except json.JSONDecodeError:
  232. raise ValueError('Invalid filters')
  233. # Get default block configs
  234. workflow_service = WorkflowService()
  235. return workflow_service.get_default_block_config(
  236. node_type=block_type,
  237. filters=filters
  238. )
  239. class ConvertToWorkflowApi(Resource):
  240. @setup_required
  241. @login_required
  242. @account_initialization_required
  243. @get_app_model(mode=[AppMode.CHAT, AppMode.COMPLETION])
  244. def post(self, app_model: App):
  245. """
  246. Convert basic mode of chatbot app to workflow mode
  247. Convert expert mode of chatbot app to workflow mode
  248. Convert Completion App to Workflow App
  249. """
  250. if request.data:
  251. parser = reqparse.RequestParser()
  252. parser.add_argument('name', type=str, required=False, nullable=True, location='json')
  253. parser.add_argument('icon', type=str, required=False, nullable=True, location='json')
  254. parser.add_argument('icon_background', type=str, required=False, nullable=True, location='json')
  255. args = parser.parse_args()
  256. else:
  257. args = {}
  258. # convert to workflow mode
  259. workflow_service = WorkflowService()
  260. new_app_model = workflow_service.convert_to_workflow(
  261. app_model=app_model,
  262. account=current_user,
  263. args=args
  264. )
  265. # return app id
  266. return {
  267. 'new_app_id': new_app_model.id,
  268. }
  269. api.add_resource(DraftWorkflowApi, '/apps/<uuid:app_id>/workflows/draft')
  270. api.add_resource(AdvancedChatDraftWorkflowRunApi, '/apps/<uuid:app_id>/advanced-chat/workflows/draft/run')
  271. api.add_resource(DraftWorkflowRunApi, '/apps/<uuid:app_id>/workflows/draft/run')
  272. api.add_resource(WorkflowTaskStopApi, '/apps/<uuid:app_id>/workflow-runs/tasks/<string:task_id>/stop')
  273. api.add_resource(DraftWorkflowNodeRunApi, '/apps/<uuid:app_id>/workflows/draft/nodes/<string:node_id>/run')
  274. api.add_resource(PublishedWorkflowApi, '/apps/<uuid:app_id>/workflows/publish')
  275. api.add_resource(DefaultBlockConfigsApi, '/apps/<uuid:app_id>/workflows/default-workflow-block-configs')
  276. api.add_resource(DefaultBlockConfigApi, '/apps/<uuid:app_id>/workflows/default-workflow-block-configs'
  277. '/<string:block_type>')
  278. api.add_resource(ConvertToWorkflowApi, '/apps/<uuid:app_id>/convert-to-workflow')