use-workflow-run.ts 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535
  1. import { useCallback } from 'react'
  2. import {
  3. useReactFlow,
  4. useStoreApi,
  5. } from 'reactflow'
  6. import produce from 'immer'
  7. import { v4 as uuidV4 } from 'uuid'
  8. import { usePathname } from 'next/navigation'
  9. import { useWorkflowStore } from '../store'
  10. import { useNodesSyncDraft } from '../hooks'
  11. import {
  12. NodeRunningStatus,
  13. WorkflowRunningStatus,
  14. } from '../types'
  15. import { useWorkflowUpdate } from './use-workflow-interactions'
  16. import { useStore as useAppStore } from '@/app/components/app/store'
  17. import type { IOtherOptions } from '@/service/base'
  18. import { ssePost } from '@/service/base'
  19. import {
  20. fetchPublishedWorkflow,
  21. stopWorkflowRun,
  22. } from '@/service/workflow'
  23. import { useFeaturesStore } from '@/app/components/base/features/hooks'
  24. import { AudioPlayerManager } from '@/app/components/base/audio-btn/audio.player.manager'
  25. export const useWorkflowRun = () => {
  26. const store = useStoreApi()
  27. const workflowStore = useWorkflowStore()
  28. const reactflow = useReactFlow()
  29. const featuresStore = useFeaturesStore()
  30. const { doSyncWorkflowDraft } = useNodesSyncDraft()
  31. const { handleUpdateWorkflowCanvas } = useWorkflowUpdate()
  32. const pathname = usePathname()
  /**
   * Snapshot the current canvas (nodes, edges, viewport) together with the
   * feature settings into the workflow store, then trigger a draft sync.
   * Does nothing when a backup is already stored.
   */
  const handleBackupDraft = useCallback(() => {
    const flowState = store.getState()
    const readNodes = flowState.getNodes
    const readViewport = reactflow.getViewport
    const {
      backupDraft: existingBackup,
      setBackupDraft: saveBackup,
    } = workflowStore.getState()
    const currentFeatures = featuresStore!.getState().features

    // Keep the first snapshot: an existing backup is never overwritten.
    if (existingBackup)
      return

    saveBackup({
      nodes: readNodes(),
      edges: flowState.edges,
      viewport: readViewport(),
      features: currentFeatures,
    })
    doSyncWorkflowDraft()
  }, [reactflow, workflowStore, store, featuresStore, doSyncWorkflowDraft])
  /**
   * Restore the canvas and the feature settings from the backup held in the
   * workflow store, then clear that backup.
   * Does nothing when no backup exists.
   */
  const handleLoadBackupDraft = useCallback(() => {
    const {
      backupDraft: savedDraft,
      setBackupDraft: replaceBackup,
    } = workflowStore.getState()

    if (!savedDraft)
      return

    handleUpdateWorkflowCanvas({
      nodes: savedDraft.nodes,
      edges: savedDraft.edges,
      viewport: savedDraft.viewport,
    })
    featuresStore!.setState({ features: savedDraft.features })
    // The backup has been consumed; drop it so a later backup can be taken.
    replaceBackup(undefined)
  }, [handleUpdateWorkflowCanvas, workflowStore, featuresStore])
  /**
   * Start a draft run of the current workflow and mirror the streamed run
   * events into the workflow store and onto the canvas.
   *
   * Before the request is sent, node selection and previous running statuses
   * are cleared, the draft is synced, and the running data is reset to a fresh
   * `Running` state. The request goes through `ssePost`; every event updates
   * the tracing data and/or the node and edge highlighting, and is then
   * forwarded to the handler of the same name in `callback`, if one was given.
   *
   * @param params - Request body of the run. `params.token` / `params.appId`
   *   also select the text-to-audio endpoint handed to the audio player.
   * @param callback - Optional extra `ssePost` options. The handlers named in
   *   the destructuring below are wrapped; every other option is passed through
   *   and, because it is spread last, replaces a built-in handler of the same
   *   name (e.g. `onTextChunk`).
   */
  const handleRun = useCallback(async (
    params: any,
    callback?: IOtherOptions,
  ) => {
    const {
      getNodes,
      setNodes,
    } = store.getState()
    // Clear selection and statuses left over from a previous run.
    const newNodes = produce(getNodes(), (draft) => {
      draft.forEach((node) => {
        node.data.selected = false
        node.data._runningStatus = undefined
      })
    })
    setNodes(newNodes)
    await doSyncWorkflowDraft()

    const {
      onWorkflowStarted,
      onWorkflowFinished,
      onNodeStarted,
      onNodeFinished,
      onIterationStart,
      onIterationNext,
      onIterationFinish,
      onError,
      ...restCallback
    } = callback || {}
    workflowStore.setState({ historyWorkflowData: undefined })
    const appDetail = useAppStore.getState().appDetail

    // The container size is captured once, at run start. When the element is
    // missing the run still proceeds; only viewport centering is skipped.
    const workflowContainer = document.getElementById('workflow-container')
    const containerSize = workflowContainer
      ? { width: workflowContainer.clientWidth, height: workflowContainer.clientHeight }
      : undefined

    let url = ''
    if (appDetail?.mode === 'advanced-chat')
      url = `/apps/${appDetail.id}/advanced-chat/workflows/draft/run`
    if (appDetail?.mode === 'workflow')
      url = `/apps/${appDetail.id}/workflows/draft/run`

    // Run-scoped state shared by the event handlers below.
    let prevNodeId = '' // id of the most recently finished top-level node
    let isInIteration = false // true between onIterationStart and onIterationFinish
    let iterationLength = 0 // iterator_length reported by the current iteration

    const {
      setWorkflowRunningData,
    } = workflowStore.getState()
    setWorkflowRunningData({
      result: {
        status: WorkflowRunningStatus.Running,
      },
      tracing: [],
      resultText: '',
    })

    let ttsUrl = ''
    let ttsIsPublic = false
    if (params.token) {
      ttsUrl = '/text-to-audio'
      ttsIsPublic = true
    }
    else if (params.appId) {
      if (pathname.includes('explore/installed'))
        ttsUrl = `/installed-apps/${params.appId}/text-to-audio`
      else
        ttsUrl = `/apps/${params.appId}/text-to-audio`
    }
    const player = AudioPlayerManager.getInstance().getAudioPlayer(ttsUrl, ttsIsPublic, uuidV4(), 'none', 'none', (_: any): any => {})

    // Highlights a node that just started: centers the viewport on it (top-level
    // nodes only), flags it as running and marks the edge coming from the
    // previously finished node as traversed. `iteratorLength` is stored on the
    // node when given (iteration nodes).
    const markNodeRunning = (nodeId: string, iteratorLength?: number) => {
      const {
        getNodes,
        setNodes,
        edges,
        setEdges,
        transform,
      } = store.getState()
      const nodes = getNodes()
      const currentNodeIndex = nodes.findIndex(node => node.id === nodeId)
      // The event may refer to a node that is not on the canvas; nothing to highlight then.
      if (currentNodeIndex < 0)
        return

      const currentNode = nodes[currentNodeIndex]
      const position = currentNode.position
      const zoom = transform[2]
      if (!currentNode.parentId && containerSize) {
        const {
          setViewport,
        } = reactflow
        // NOTE(review): the 400px horizontal offset presumably accounts for a side panel
        // overlapping the canvas — confirm. Unmeasured nodes are treated as zero-sized.
        setViewport({
          x: (containerSize.width - 400 - (currentNode.width ?? 0) * zoom) / 2 - position.x * zoom,
          y: (containerSize.height - (currentNode.height ?? 0) * zoom) / 2 - position.y * zoom,
          zoom,
        })
      }
      const runningNodes = produce(nodes, (draft) => {
        draft[currentNodeIndex].data._runningStatus = NodeRunningStatus.Running
        if (iteratorLength !== undefined)
          draft[currentNodeIndex].data._iterationLength = iteratorLength
      })
      setNodes(runningNodes)
      const newEdges = produce(edges, (draft) => {
        const edge = draft.find(edge => edge.target === nodeId && edge.source === prevNodeId)
        if (edge)
          edge.data = { ...edge.data, _runned: true } as any
      })
      setEdges(newEdges)
    }

    ssePost(
      url,
      {
        body: params,
      },
      {
        onWorkflowStarted: (params) => {
          const { task_id, data } = params
          const {
            workflowRunningData,
            setWorkflowRunningData,
          } = workflowStore.getState()
          const {
            edges,
            setEdges,
          } = store.getState()
          setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
            draft.task_id = task_id
            draft.result = {
              ...draft.result,
              ...data,
              status: WorkflowRunningStatus.Running,
            }
          }))
          // A new run starts with no traversed edges.
          const newEdges = produce(edges, (draft) => {
            draft.forEach((edge) => {
              edge.data = {
                ...edge.data,
                _runned: false,
              }
            })
          })
          setEdges(newEdges)
          onWorkflowStarted?.(params)
        },
        onWorkflowFinished: (params) => {
          const { data } = params
          const {
            workflowRunningData,
            setWorkflowRunningData,
          } = workflowStore.getState()
          // A run whose outputs consist of exactly one string is shown in the result tab.
          const outputKeys = data.outputs ? Object.keys(data.outputs) : []
          const isStringOutput = outputKeys.length === 1 && typeof data.outputs[outputKeys[0]] === 'string'
          setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
            draft.result = {
              ...draft.result,
              ...data,
            } as any
            if (isStringOutput) {
              draft.resultTabActive = true
              draft.resultText = data.outputs[outputKeys[0]]
            }
          }))
          prevNodeId = ''
          onWorkflowFinished?.(params)
        },
        onError: (params) => {
          const {
            workflowRunningData,
            setWorkflowRunningData,
          } = workflowStore.getState()
          setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
            draft.result = {
              ...draft.result,
              status: WorkflowRunningStatus.Failed,
            }
          }))
          onError?.(params)
        },
        onNodeStarted: (params) => {
          const { data } = params
          const {
            workflowRunningData,
            setWorkflowRunningData,
          } = workflowStore.getState()
          if (isInIteration) {
            // Nodes inside an iteration are recorded under the current round of the
            // iteration entry (the last tracing item); the canvas is left untouched.
            setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
              const tracing = draft.tracing!
              const details = tracing[tracing.length - 1]?.details
              if (!details?.length)
                return
              details[details.length - 1].push({
                ...data,
                status: NodeRunningStatus.Running,
              } as any)
            }))
          }
          else {
            setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
              draft.tracing!.push({
                ...data,
                status: NodeRunningStatus.Running,
              } as any)
            }))
            markNodeRunning(data.node_id)
          }
          onNodeStarted?.(params)
        },
        onNodeFinished: (params) => {
          const { data } = params
          const {
            workflowRunningData,
            setWorkflowRunningData,
          } = workflowStore.getState()
          const {
            getNodes,
            setNodes,
          } = store.getState()
          if (isInIteration) {
            // Merge the result into the last node recorded for the current round.
            setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
              const tracing = draft.tracing!
              const details = tracing[tracing.length - 1]?.details
              if (!details?.length)
                return
              const currIteration = details[details.length - 1]
              // Without a recorded start there is no entry to update (and index -1 must not be written).
              if (!currIteration.length)
                return
              const lastIndex = currIteration.length - 1
              currIteration[lastIndex] = {
                ...currIteration[lastIndex],
                ...data,
                status: NodeRunningStatus.Succeeded,
              } as any
            }))
          }
          else {
            const nodes = getNodes()
            setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
              const tracing = draft.tracing
              if (!tracing)
                return
              const currentIndex = tracing.findIndex(trace => trace.node_id === data.node_id)
              if (currentIndex > -1) {
                // Replace the entry with the final data, keeping only its existing `extras`.
                tracing[currentIndex] = {
                  ...(tracing[currentIndex].extras
                    ? { extras: tracing[currentIndex].extras }
                    : {}),
                  ...data,
                } as any
              }
            }))
            const newNodes = produce(nodes, (draft) => {
              const currentNode = draft.find(node => node.id === data.node_id)
              if (currentNode)
                currentNode.data._runningStatus = data.status as any
            })
            setNodes(newNodes)
            prevNodeId = data.node_id
          }
          onNodeFinished?.(params)
        },
        onIterationStart: (params) => {
          const { data } = params
          const {
            workflowRunningData,
            setWorkflowRunningData,
          } = workflowStore.getState()
          // The iteration gets its own tracing entry; `details` collects one list of nodes per round.
          setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
            draft.tracing!.push({
              ...data,
              status: NodeRunningStatus.Running,
              details: [],
            } as any)
          }))
          isInIteration = true
          iterationLength = data.metadata.iterator_length
          markNodeRunning(data.node_id, data.metadata.iterator_length)
          onIterationStart?.(params)
        },
        onIterationNext: (params) => {
          const {
            workflowRunningData,
            setWorkflowRunningData,
          } = workflowStore.getState()
          const { data } = params
          const {
            getNodes,
            setNodes,
          } = store.getState()
          setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
            const tracing = draft.tracing!
            const details = tracing[tracing.length - 1]?.details
            // Open a new round, but never more rounds than the iterator has items.
            if (!details || details.length >= iterationLength)
              return
            details.push([])
          }))
          const nodes = getNodes()
          const newNodes = produce(nodes, (draft) => {
            const currentNode = draft.find(node => node.id === data.node_id)
            if (currentNode)
              currentNode.data._iterationIndex = data.index > 0 ? data.index : 1
          })
          setNodes(newNodes)
          onIterationNext?.(params)
        },
        onIterationFinish: (params) => {
          const { data } = params
          const {
            workflowRunningData,
            setWorkflowRunningData,
          } = workflowStore.getState()
          const {
            getNodes,
            setNodes,
          } = store.getState()
          const nodes = getNodes()
          setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
            const tracing = draft.tracing!
            if (!tracing.length)
              return
            tracing[tracing.length - 1] = {
              ...tracing[tracing.length - 1],
              ...data,
              status: NodeRunningStatus.Succeeded,
            } as any
          }))
          isInIteration = false
          const newNodes = produce(nodes, (draft) => {
            const currentNode = draft.find(node => node.id === data.node_id)
            if (currentNode)
              currentNode.data._runningStatus = data.status
          })
          setNodes(newNodes)
          prevNodeId = data.node_id
          onIterationFinish?.(params)
        },
        onTextChunk: (params) => {
          const { data: { text } } = params
          const {
            workflowRunningData,
            setWorkflowRunningData,
          } = workflowStore.getState()
          setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
            draft.resultTabActive = true
            draft.resultText += text
          }))
        },
        onTextReplace: (params) => {
          const { data: { text } } = params
          const {
            workflowRunningData,
            setWorkflowRunningData,
          } = workflowStore.getState()
          setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
            draft.resultText = text
          }))
        },
        onTTSChunk: (messageId: string, audio: string) => {
          if (!audio)
            return
          player.playAudioWithAudio(audio, true)
          AudioPlayerManager.getInstance().resetMsgId(messageId)
        },
        onTTSEnd: (_messageId: string, audio: string) => {
          player.playAudioWithAudio(audio, false)
        },
        ...restCallback,
      },
    )
    // `pathname` is read when choosing the text-to-audio endpoint, so it must be a dependency.
  }, [store, reactflow, workflowStore, doSyncWorkflowDraft, pathname])
  /**
   * Ask the backend to stop a running workflow task of the currently loaded app.
   * Does nothing when no app is loaded or `taskId` is empty, since no valid
   * stop endpoint can be built in either case.
   *
   * @param taskId - Id of the task to stop.
   */
  const handleStopRun = useCallback((taskId: string) => {
    const appId = useAppStore.getState().appDetail?.id
    // Avoid requesting `/apps/undefined/...` or `.../tasks//stop`.
    if (!appId || !taskId)
      return
    stopWorkflowRun(`/apps/${appId}/workflow-runs/tasks/${taskId}/stop`)
  }, [])
  /**
   * Replace the canvas and the feature settings with those of the app's
   * published workflow, and record its `created_at` as the published time.
   * Does nothing when no app is loaded or the fetch yields no workflow.
   */
  const handleRestoreFromPublishedWorkflow = useCallback(async () => {
    const appDetail = useAppStore.getState().appDetail
    // Without a loaded app the request would target `/apps/undefined/...`.
    if (!appDetail)
      return

    const publishedWorkflow = await fetchPublishedWorkflow(`/apps/${appDetail.id}/workflows/publish`)
    if (!publishedWorkflow)
      return

    const { nodes, edges, viewport } = publishedWorkflow.graph
    handleUpdateWorkflowCanvas({
      nodes,
      edges,
      viewport: viewport!,
    })
    featuresStore?.setState({ features: publishedWorkflow.features })
    workflowStore.getState().setPublishedAt(publishedWorkflow.created_at)
  }, [featuresStore, handleUpdateWorkflowCanvas, workflowStore])
  476. return {
  477. handleBackupDraft,
  478. handleLoadBackupDraft,
  479. handleRun,
  480. handleStopRun,
  481. handleRestoreFromPublishedWorkflow,
  482. }
  483. }