workflow.py 12 KB


  1. import json
  2. import logging
  3. from flask import abort, request
  4. from flask_restful import Resource, marshal_with, reqparse
  5. from werkzeug.exceptions import InternalServerError, NotFound
  6. import services
  7. from controllers.console import api
  8. from controllers.console.app.error import ConversationCompletedError, DraftWorkflowNotExist, DraftWorkflowNotSync
  9. from controllers.console.app.wraps import get_app_model
  10. from controllers.console.setup import setup_required
  11. from controllers.console.wraps import account_initialization_required
  12. from core.app.apps.base_app_queue_manager import AppQueueManager
  13. from core.app.entities.app_invoke_entities import InvokeFrom
  14. from fields.workflow_fields import workflow_fields
  15. from fields.workflow_run_fields import workflow_run_node_execution_fields
  16. from libs import helper
  17. from libs.helper import TimestampField, uuid_value
  18. from libs.login import current_user, login_required
  19. from models.model import App, AppMode
  20. from services.app_generate_service import AppGenerateService
  21. from services.errors.app import WorkflowHashNotEqualError
  22. from services.workflow_service import WorkflowService
  23. logger = logging.getLogger(__name__)
  24. class DraftWorkflowApi(Resource):
  25. @setup_required
  26. @login_required
  27. @account_initialization_required
  28. @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
  29. @marshal_with(workflow_fields)
  30. def get(self, app_model: App):
  31. """
  32. Get draft workflow
  33. """
  34. # fetch draft workflow by app_model
  35. workflow_service = WorkflowService()
  36. workflow = workflow_service.get_draft_workflow(app_model=app_model)
  37. if not workflow:
  38. raise DraftWorkflowNotExist()
  39. # return workflow, if not found, return None (initiate graph by frontend)
  40. return workflow
  41. @setup_required
  42. @login_required
  43. @account_initialization_required
  44. @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
  45. def post(self, app_model: App):
  46. """
  47. Sync draft workflow
  48. """
  49. content_type = request.headers.get('Content-Type')
  50. if 'application/json' in content_type:
  51. parser = reqparse.RequestParser()
  52. parser.add_argument('graph', type=dict, required=True, nullable=False, location='json')
  53. parser.add_argument('features', type=dict, required=True, nullable=False, location='json')
  54. parser.add_argument('hash', type=str, required=False, location='json')
  55. args = parser.parse_args()
  56. elif 'text/plain' in content_type:
  57. try:
  58. data = json.loads(request.data.decode('utf-8'))
  59. if 'graph' not in data or 'features' not in data:
  60. raise ValueError('graph or features not found in data')
  61. if not isinstance(data.get('graph'), dict) or not isinstance(data.get('features'), dict):
  62. raise ValueError('graph or features is not a dict')
  63. args = {
  64. 'graph': data.get('graph'),
  65. 'features': data.get('features'),
  66. 'hash': data.get('hash')
  67. }
  68. except json.JSONDecodeError:
  69. return {'message': 'Invalid JSON data'}, 400
  70. else:
  71. abort(415)
  72. workflow_service = WorkflowService()
  73. try:
  74. workflow = workflow_service.sync_draft_workflow(
  75. app_model=app_model,
  76. graph=args.get('graph'),
  77. features=args.get('features'),
  78. unique_hash=args.get('hash'),
  79. account=current_user
  80. )
  81. except WorkflowHashNotEqualError:
  82. raise DraftWorkflowNotSync()
  83. return {
  84. "result": "success",
  85. "hash": workflow.unique_hash,
  86. "updated_at": TimestampField().format(workflow.updated_at or workflow.created_at)
  87. }
  88. class AdvancedChatDraftWorkflowRunApi(Resource):
  89. @setup_required
  90. @login_required
  91. @account_initialization_required
  92. @get_app_model(mode=[AppMode.ADVANCED_CHAT])
  93. def post(self, app_model: App):
  94. """
  95. Run draft workflow
  96. """
  97. parser = reqparse.RequestParser()
  98. parser.add_argument('inputs', type=dict, location='json')
  99. parser.add_argument('query', type=str, required=True, location='json', default='')
  100. parser.add_argument('files', type=list, location='json')
  101. parser.add_argument('conversation_id', type=uuid_value, location='json')
  102. args = parser.parse_args()
  103. try:
  104. response = AppGenerateService.generate(
  105. app_model=app_model,
  106. user=current_user,
  107. args=args,
  108. invoke_from=InvokeFrom.DEBUGGER,
  109. streaming=True
  110. )
  111. return helper.compact_generate_response(response)
  112. except services.errors.conversation.ConversationNotExistsError:
  113. raise NotFound("Conversation Not Exists.")
  114. except services.errors.conversation.ConversationCompletedError:
  115. raise ConversationCompletedError()
  116. except ValueError as e:
  117. raise e
  118. except Exception as e:
  119. logging.exception("internal server error.")
  120. raise InternalServerError()
  121. class DraftWorkflowRunApi(Resource):
  122. @setup_required
  123. @login_required
  124. @account_initialization_required
  125. @get_app_model(mode=[AppMode.WORKFLOW])
  126. def post(self, app_model: App):
  127. """
  128. Run draft workflow
  129. """
  130. parser = reqparse.RequestParser()
  131. parser.add_argument('inputs', type=dict, required=True, nullable=False, location='json')
  132. parser.add_argument('files', type=list, required=False, location='json')
  133. args = parser.parse_args()
  134. try:
  135. response = AppGenerateService.generate(
  136. app_model=app_model,
  137. user=current_user,
  138. args=args,
  139. invoke_from=InvokeFrom.DEBUGGER,
  140. streaming=True
  141. )
  142. return helper.compact_generate_response(response)
  143. except ValueError as e:
  144. raise e
  145. except Exception as e:
  146. logging.exception("internal server error.")
  147. raise InternalServerError()
  148. class WorkflowTaskStopApi(Resource):
  149. @setup_required
  150. @login_required
  151. @account_initialization_required
  152. @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
  153. def post(self, app_model: App, task_id: str):
  154. """
  155. Stop workflow task
  156. """
  157. AppQueueManager.set_stop_flag(task_id, InvokeFrom.DEBUGGER, current_user.id)
  158. return {
  159. "result": "success"
  160. }
  161. class DraftWorkflowNodeRunApi(Resource):
  162. @setup_required
  163. @login_required
  164. @account_initialization_required
  165. @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
  166. @marshal_with(workflow_run_node_execution_fields)
  167. def post(self, app_model: App, node_id: str):
  168. """
  169. Run draft workflow node
  170. """
  171. parser = reqparse.RequestParser()
  172. parser.add_argument('inputs', type=dict, required=True, nullable=False, location='json')
  173. args = parser.parse_args()
  174. workflow_service = WorkflowService()
  175. workflow_node_execution = workflow_service.run_draft_workflow_node(
  176. app_model=app_model,
  177. node_id=node_id,
  178. user_inputs=args.get('inputs'),
  179. account=current_user
  180. )
  181. return workflow_node_execution
  182. class PublishedWorkflowApi(Resource):
  183. @setup_required
  184. @login_required
  185. @account_initialization_required
  186. @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
  187. @marshal_with(workflow_fields)
  188. def get(self, app_model: App):
  189. """
  190. Get published workflow
  191. """
  192. # fetch published workflow by app_model
  193. workflow_service = WorkflowService()
  194. workflow = workflow_service.get_published_workflow(app_model=app_model)
  195. # return workflow, if not found, return None
  196. return workflow
  197. @setup_required
  198. @login_required
  199. @account_initialization_required
  200. @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
  201. def post(self, app_model: App):
  202. """
  203. Publish workflow
  204. """
  205. workflow_service = WorkflowService()
  206. workflow = workflow_service.publish_workflow(app_model=app_model, account=current_user)
  207. return {
  208. "result": "success",
  209. "created_at": TimestampField().format(workflow.created_at)
  210. }
  211. class DefaultBlockConfigsApi(Resource):
  212. @setup_required
  213. @login_required
  214. @account_initialization_required
  215. @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
  216. def get(self, app_model: App):
  217. """
  218. Get default block config
  219. """
  220. # Get default block configs
  221. workflow_service = WorkflowService()
  222. return workflow_service.get_default_block_configs()
  223. class DefaultBlockConfigApi(Resource):
  224. @setup_required
  225. @login_required
  226. @account_initialization_required
  227. @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
  228. def get(self, app_model: App, block_type: str):
  229. """
  230. Get default block config
  231. """
  232. parser = reqparse.RequestParser()
  233. parser.add_argument('q', type=str, location='args')
  234. args = parser.parse_args()
  235. filters = None
  236. if args.get('q'):
  237. try:
  238. filters = json.loads(args.get('q'))
  239. except json.JSONDecodeError:
  240. raise ValueError('Invalid filters')
  241. # Get default block configs
  242. workflow_service = WorkflowService()
  243. return workflow_service.get_default_block_config(
  244. node_type=block_type,
  245. filters=filters
  246. )
  247. class ConvertToWorkflowApi(Resource):
  248. @setup_required
  249. @login_required
  250. @account_initialization_required
  251. @get_app_model(mode=[AppMode.CHAT, AppMode.COMPLETION])
  252. def post(self, app_model: App):
  253. """
  254. Convert basic mode of chatbot app to workflow mode
  255. Convert expert mode of chatbot app to workflow mode
  256. Convert Completion App to Workflow App
  257. """
  258. if request.data:
  259. parser = reqparse.RequestParser()
  260. parser.add_argument('name', type=str, required=False, nullable=True, location='json')
  261. parser.add_argument('icon', type=str, required=False, nullable=True, location='json')
  262. parser.add_argument('icon_background', type=str, required=False, nullable=True, location='json')
  263. args = parser.parse_args()
  264. else:
  265. args = {}
  266. # convert to workflow mode
  267. workflow_service = WorkflowService()
  268. new_app_model = workflow_service.convert_to_workflow(
  269. app_model=app_model,
  270. account=current_user,
  271. args=args
  272. )
  273. # return app id
  274. return {
  275. 'new_app_id': new_app_model.id,
  276. }
  277. api.add_resource(DraftWorkflowApi, '/apps/<uuid:app_id>/workflows/draft')
  278. api.add_resource(AdvancedChatDraftWorkflowRunApi, '/apps/<uuid:app_id>/advanced-chat/workflows/draft/run')
  279. api.add_resource(DraftWorkflowRunApi, '/apps/<uuid:app_id>/workflows/draft/run')
  280. api.add_resource(WorkflowTaskStopApi, '/apps/<uuid:app_id>/workflow-runs/tasks/<string:task_id>/stop')
  281. api.add_resource(DraftWorkflowNodeRunApi, '/apps/<uuid:app_id>/workflows/draft/nodes/<string:node_id>/run')
  282. api.add_resource(PublishedWorkflowApi, '/apps/<uuid:app_id>/workflows/publish')
  283. api.add_resource(DefaultBlockConfigsApi, '/apps/<uuid:app_id>/workflows/default-workflow-block-configs')
  284. api.add_resource(DefaultBlockConfigApi, '/apps/<uuid:app_id>/workflows/default-workflow-block-configs'
  285. '/<string:block_type>')
  286. api.add_resource(ConvertToWorkflowApi, '/apps/<uuid:app_id>/convert-to-workflow')