From 6018013195ade5eadb816a6ad8cd589e9997f58b Mon Sep 17 00:00:00 2001 From: mytharcher Date: Wed, 26 Jan 2022 02:27:23 +0800 Subject: [PATCH] feat(plugin-workflow): execution life cycle with branch and join --- .../{workflow.test.ts => execution.test.ts} | 81 +++++- .../plugin-workflow/src/__tests__/index.ts | 18 ++ .../__tests__/instructions/condition.test.ts | 38 ++- .../src/collections/flow_nodes.ts | 39 ++- packages/plugin-workflow/src/constants.ts | 13 +- .../{condition/index.ts => condition.ts} | 59 +++- .../plugin-workflow/src/instructions/echo.ts | 6 - .../plugin-workflow/src/instructions/index.ts | 31 ++- .../src/instructions/prompt.ts | 14 +- .../plugin-workflow/src/models/Execution.ts | 259 ++++++++++-------- .../plugin-workflow/src/models/Workflow.ts | 4 +- .../plugin-workflow/src/triggers/index.ts | 4 +- .../condition => utils}/calculators.ts | 2 +- .../condition => utils}/getter.ts | 2 +- 14 files changed, 394 insertions(+), 176 deletions(-) rename packages/plugin-workflow/src/__tests__/{workflow.test.ts => execution.test.ts} (74%) rename packages/plugin-workflow/src/instructions/{condition/index.ts => condition.ts} (50%) delete mode 100644 packages/plugin-workflow/src/instructions/echo.ts rename packages/plugin-workflow/src/{instructions/condition => utils}/calculators.ts (94%) rename packages/plugin-workflow/src/{instructions/condition => utils}/getter.ts (95%) diff --git a/packages/plugin-workflow/src/__tests__/workflow.test.ts b/packages/plugin-workflow/src/__tests__/execution.test.ts similarity index 74% rename from packages/plugin-workflow/src/__tests__/workflow.test.ts rename to packages/plugin-workflow/src/__tests__/execution.test.ts index a3a27a2c9..1d9e57ff5 100644 --- a/packages/plugin-workflow/src/__tests__/workflow.test.ts +++ b/packages/plugin-workflow/src/__tests__/execution.test.ts @@ -2,11 +2,11 @@ import { Application } from '@nocobase/server'; import Database, { Model, ModelCtor } from '@nocobase/database'; import { getApp } from '.'; import { WorkflowModel } from '../models/Workflow'; -import { EXECUTION_STATUS, JOB_STATUS } from '../constants'; +import { EXECUTION_STATUS, JOB_STATUS, LINK_TYPE } from '../constants'; jest.setTimeout(300000); -describe('workflow', () => { +describe('execution', () => { let app: Application; let db: Database; let PostModel: ModelCtor; @@ -85,11 +85,13 @@ describe('workflow', () => { type: 'echo' }); - await workflow.createNode({ + const n2 = await workflow.createNode({ title: 'echo 2', type: 'echo', upstream_id: n1.id }); + + await n1.setDownstream(n2); const post = await PostModel.create({ title: 't1' }); @@ -104,7 +106,6 @@ describe('workflow', () => { expect(result).toMatchObject({ data: JSON.parse(JSON.stringify(post.toJSON())) }); }); - // TODO: or should throw error? it('execute resolved workflow', async () => { const workflow = await WorkflowModel.create({ title: 'simple workflow', @@ -115,15 +116,20 @@ describe('workflow', () => { } }); + await workflow.createNode({ + title: 'echo', + type: 'echo' + }); + const post = await PostModel.create({ title: 't1' }); const [execution] = await workflow.getExecutions(); expect(execution.status).toEqual(EXECUTION_STATUS.RESOLVED); - await execution.exec(123); + expect(execution.start()).rejects.toThrow(); expect(execution.status).toEqual(EXECUTION_STATUS.RESOLVED); const jobs = await execution.getJobs(); - expect(jobs.length).toEqual(0); + expect(jobs.length).toEqual(1); }); }); @@ -143,12 +149,14 @@ describe('workflow', () => { type: 'prompt', }); - await workflow.createNode({ + const n2 = await workflow.createNode({ title: 'echo', type: 'echo', upstream_id: n1.id }); + await n1.setDownstream(n2); + const post = await PostModel.create({ title: 't1' }); const [execution] = await workflow.getExecutions(); @@ -157,10 +165,11 @@ describe('workflow', () => { expect(pending.status).toEqual(JOB_STATUS.PENDING); expect(pending.result).toEqual(null); - await execution.exec(123, null, {}); + pending.set('result', 123); + await execution.resume(pending); expect(execution.status).toEqual(EXECUTION_STATUS.RESOLVED); - const jobs = await execution.getJobs(); + const jobs = await execution.getJobs({ order: [['id', 'ASC']] }); expect(jobs.length).toEqual(2); expect(jobs[0].status).toEqual(JOB_STATUS.RESOLVED); expect(jobs[0].result).toEqual(123); @@ -186,17 +195,17 @@ describe('workflow', () => { // no config means always true }); - await workflow.createNode({ + const n2 = await workflow.createNode({ title: 'true to echo', type: 'echo', - when: true, + linkType: LINK_TYPE.ON_TRUE, upstream_id: n1.id }); await workflow.createNode({ title: 'false to echo', type: 'echo', - when: false, + linkType: LINK_TYPE.ON_FALSE, upstream_id: n1.id }); @@ -205,9 +214,55 @@ describe('workflow', () => { const [execution] = await workflow.getExecutions(); expect(execution.status).toEqual(EXECUTION_STATUS.RESOLVED); - const jobs = await execution.getJobs(); + const jobs = await execution.getJobs({ order: [['id', 'ASC']] }); expect(jobs.length).toEqual(2); + expect(jobs[0].node_id).toEqual(n1.id); + expect(jobs[1].node_id).toEqual(n2.id); expect(jobs[1].result).toEqual(true); }); + + it('suspend downstream in condition branch, then go on', async () => { + const workflow = await WorkflowModel.create({ + title: 'condition workflow', + enabled: true, + type: 'afterCreate', + config: { + collection: 'posts' + } + }); + + const n1 = await workflow.createNode({ + title: 'condition', + type: 'condition', + // no config means always true + }); + + const n2 = await workflow.createNode({ + title: 'manual', + type: 'prompt', + linkType: LINK_TYPE.ON_TRUE, + upstream_id: n1.id + }); + + const n3 = await workflow.createNode({ + title: 'echo input value', + type: 'echo', + upstream_id: n1.id + }); + + await n1.setDownstream(n3); + + const post = await PostModel.create({ title: 't1' }); + + const [execution] = await workflow.getExecutions(); + expect(execution.status).toEqual(EXECUTION_STATUS.STARTED); + + const [pending] = await execution.getJobs({ node_id: n2.id }); + pending.set('result', 123); + await execution.resume(pending); + + const jobs = await execution.getJobs(); + expect(jobs.length).toEqual(3); + }); }); }); diff --git a/packages/plugin-workflow/src/__tests__/index.ts b/packages/plugin-workflow/src/__tests__/index.ts index 9aa44c18e..15e40b047 100644 --- a/packages/plugin-workflow/src/__tests__/index.ts +++ b/packages/plugin-workflow/src/__tests__/index.ts @@ -2,12 +2,30 @@ import path from 'path'; import { MockServer, mockServer } from '@nocobase/test'; import plugin from '../server'; +import { InstructionResult, registerInstruction } from '../instructions'; +import { JOB_STATUS } from '../constants'; export async function getApp(options = {}): Promise { const app = mockServer(options); app.plugin(plugin); + // for test only + registerInstruction('echo', { + run(this, { result }, execution) { + return { + status: JOB_STATUS.RESOLVED, + result + }; + } + }); + + registerInstruction('error', { + run(this, input, execution) { + throw new Error('definite error'); + } + }); + await app.load(); app.db.import({ diff --git a/packages/plugin-workflow/src/__tests__/instructions/condition.test.ts b/packages/plugin-workflow/src/__tests__/instructions/condition.test.ts index d91afdba0..1581bfb12 100644 --- a/packages/plugin-workflow/src/__tests__/instructions/condition.test.ts +++ b/packages/plugin-workflow/src/__tests__/instructions/condition.test.ts @@ -2,7 +2,7 @@ import { Application } from '@nocobase/server'; import Database, { Model, ModelCtor } from '@nocobase/database'; import { getApp } from '..'; import { WorkflowModel } from '../../models/Workflow'; -import { EXECUTION_STATUS, JOB_STATUS } from '../../constants'; +import { EXECUTION_STATUS, JOB_STATUS, LINK_TYPE } from '../../constants'; @@ -22,6 +22,10 @@ describe('workflow > instructions > condition', () => { afterEach(() => db.close()); + describe('config.rejectOnFalse', () => { + + }); + describe('single calculation', () => { it('calculation to true downstream', async () => { const workflow = await WorkflowModel.create({ @@ -36,24 +40,26 @@ describe('workflow > instructions > condition', () => { const n1 = await workflow.createNode({ title: 'condition', type: 'condition', - // (1 === 1): true config: { - calculator: 'equal', - operands: [{ value: 1 }, { value: 1 }] + // (1 === 1): true + calculation: { + calculator: 'equal', + operands: [{ value: 1 }, { value: 1 }] + } } }); - await workflow.createNode({ + const n2 = await workflow.createNode({ title: 'true to echo', type: 'echo', - when: true, + linkType: LINK_TYPE.ON_TRUE, upstream_id: n1.id }); - await workflow.createNode({ + const n3 = await workflow.createNode({ title: 'false to echo', type: 'echo', - when: false, + linkType: LINK_TYPE.ON_FALSE, upstream_id: n1.id }); @@ -80,24 +86,26 @@ describe('workflow > instructions > condition', () => { const n1 = await workflow.createNode({ title: 'condition', type: 'condition', - // (0 === 1): false config: { - calculator: 'equal', - operands: [{ value: 0 }, { value: 1 }] + // (0 === 1): false + calculation: { + calculator: 'equal', + operands: [{ value: 0 }, { value: 1 }] + } } }); await workflow.createNode({ title: 'true to echo', type: 'echo', - when: true, + linkType: LINK_TYPE.ON_TRUE, upstream_id: n1.id }); await workflow.createNode({ title: 'false to echo', type: 'echo', - when: false, + linkType: LINK_TYPE.ON_FALSE, upstream_id: n1.id }); @@ -111,4 +119,8 @@ describe('workflow > instructions > condition', () => { expect(jobs[1].result).toEqual(false); }); }); + + describe('group calculation', () => { + + }); }); diff --git a/packages/plugin-workflow/src/collections/flow_nodes.ts b/packages/plugin-workflow/src/collections/flow_nodes.ts index b6e1a057a..8ebf1f766 100644 --- a/packages/plugin-workflow/src/collections/flow_nodes.ts +++ b/packages/plugin-workflow/src/collections/flow_nodes.ts @@ -1,4 +1,5 @@ import { TableOptions } from '@nocobase/database'; +import { LINK_TYPE } from '../constants'; export default { name: 'flow_nodes', @@ -28,21 +29,46 @@ export default { type: 'belongsTo', target: 'flow_nodes' }, - // only works when upstream node is condition type. + { + interface: 'linkTo', + name: 'branches', + type: 'hasMany', + target: 'flow_nodes', + sourceKey: 'id', + foreignKey: 'upstream_id', + }, + // only works when upstream node is branching type, like condition and parallel. // put here because the design of flow-links model is not really necessary for now. // or it should be put into flow-links model. + // if keeps 1:n relactionship, cannot support cycle flow. { - name: 'when', - type: 'boolean', - // defaultValue: null + interface: 'select', + name: 'linkType', + type: 'smallint', + title: 'Link Type', + dataSource: [ + { label: 'Default', value: LINK_TYPE.DEFAULT }, + { label: 'Branched, on true', value: LINK_TYPE.ON_TRUE }, + { label: 'Branched, on false', value: LINK_TYPE.ON_FALSE }, + { label: 'Branched, no limit', value: LINK_TYPE.NO_LIMIT } + ] + }, + // for reasons: + // 1. redirect type node to solve cycle flow. + // 2. recognize as true next node after branches. + { + interface: 'linkTo', + name: 'downstream', + type: 'belongsTo', + target: 'flow_nodes' }, { interface: 'select', type: 'string', name: 'type', title: '类型', + // TODO: data for test only now dataSource: [ - { label: '无处理', value: 'echo' }, { label: '数据处理', value: 'data' }, { label: '数据查询', value: 'query' }, { label: '等待人工输入', value: 'prompt' }, @@ -53,7 +79,8 @@ export default { interface: 'json', type: 'jsonb', name: 'config', - title: '配置' + title: '配置', + defaultValue: {} } ] } as TableOptions; diff --git a/packages/plugin-workflow/src/constants.ts b/packages/plugin-workflow/src/constants.ts index 4f140c0ea..ecc2ed6fc 100644 --- a/packages/plugin-workflow/src/constants.ts +++ b/packages/plugin-workflow/src/constants.ts @@ -1,11 +1,20 @@ export const EXECUTION_STATUS = { STARTED: 0, RESOLVED: 1, - REJECTED: -1 + REJECTED: -1, + CANCELLED: -2 }; export const JOB_STATUS = { PENDING: 0, RESOLVED: 1, - REJECTED: -1 + REJECTED: -1, + CANCELLED: -2 +}; + +export const LINK_TYPE = { + DEFAULT: null, + ON_TRUE: 1, + ON_FALSE: 0, + NO_LIMIT: -1 }; diff --git a/packages/plugin-workflow/src/instructions/condition/index.ts b/packages/plugin-workflow/src/instructions/condition.ts similarity index 50% rename from packages/plugin-workflow/src/instructions/condition/index.ts rename to packages/plugin-workflow/src/instructions/condition.ts index f2b0be198..832ded4f3 100644 --- a/packages/plugin-workflow/src/instructions/condition/index.ts +++ b/packages/plugin-workflow/src/instructions/condition.ts @@ -15,8 +15,10 @@ // } // } -import { getValue, Operand } from "./getter"; -import { getCalculator } from "./calculators"; +import Sequelize = require('sequelize'); +import { getValue, Operand } from "../utils/getter"; +import { getCalculator } from "../utils/calculators"; +import { JOB_STATUS } from "../constants"; type BaseCalculation = { not?: boolean; @@ -63,10 +65,55 @@ function calculate(config, input, execution) { export default { - manual: false, - async run(this, input, execution) { + async run(this, prevJob, execution) { // TODO(optimize): loading of jobs could be reduced and turned into incrementally in execution - const jobs = await execution.getJobs(); - return calculate(this.config as Calculation, input, execution); + // const jobs = await execution.getJobs(); + const { calculation } = this.config || {}; + const result = calculate(calculation, prevJob, execution); + + if (!result && this.config.rejectOnFalse) { + return { + status: JOB_STATUS.REJECTED, + result + }; + } + + const job = { + status: JOB_STATUS.RESOLVED, + result, + // TODO(optimize): try unify the building of job + node_id: this.id, + upstream_id: prevJob instanceof Sequelize.Model ? prevJob.get('id') : null + }; + + const branchNode = execution.nodes + .find(item => item.upstream === this && item.linkType === Number(result)); + + if (!branchNode) { + return job; + } + + const savedJob = await execution.saveJob(job); + + // return execution.exec(branchNode, savedJob); + const tailJob = await execution.exec(branchNode, savedJob); + + if (tailJob.status === JOB_STATUS.PENDING) { + savedJob.set('status', JOB_STATUS.PENDING); + return savedJob; + } + + return tailJob; + }, + + async resume(this, branchJob, execution) { + if (branchJob.status === JOB_STATUS.RESOLVED) { + const job = execution.findBranchParentJob(branchJob, this); + job.set('status', JOB_STATUS.RESOLVED); + return job; + } + + // pass control to upper scope by ending current scope + return execution.end(this, branchJob); } } diff --git a/packages/plugin-workflow/src/instructions/echo.ts b/packages/plugin-workflow/src/instructions/echo.ts deleted file mode 100644 index bd307bce3..000000000 --- a/packages/plugin-workflow/src/instructions/echo.ts +++ /dev/null @@ -1,6 +0,0 @@ -export default { - manual: false, - run(this, input, context) { - return input; - } -}; diff --git a/packages/plugin-workflow/src/instructions/index.ts b/packages/plugin-workflow/src/instructions/index.ts index 53df2583c..1c9722a54 100644 --- a/packages/plugin-workflow/src/instructions/index.ts +++ b/packages/plugin-workflow/src/instructions/index.ts @@ -3,23 +3,32 @@ import { ModelCtor, Model } from "@nocobase/database"; import { ExecutionModel } from "../models/Execution"; -import echo from './echo'; import prompt from './prompt'; import condition from './condition'; +// import parallel from './parallel'; + +export interface Job { + status: number; + result: unknown; + [key: string]: unknown; +} + +export type InstructionResult = Job | Promise; // what should a instruction do? // - base on input and context, do any calculations or system call (io), and produce a result or pending. -// what should input to be? -// - just use previously output result for convenience? -// what should context to be? -// - could be the workflow execution object (containing context data) -export type Instruction = { - manual: boolean; +export interface Instruction { run( this: ModelCtor, + // what should input to be? + // - just use previously output result for convenience? input: any, + // what should context to be? + // - could be the workflow execution object (containing context data) execution: ModelCtor - ): any + ): InstructionResult; + // for start node in main flow (or branch) to resume when manual sub branch triggered + resume?(): InstructionResult } const registery = new Map(); @@ -28,10 +37,10 @@ export function getInstruction(key: string): Instruction { return registery.get(key); } -export function registerInstruction(key: string, fn: Instruction) { - registery.set(key, fn); +export function registerInstruction(key: string, instruction: any) { + registery.set(key, instruction); } -registerInstruction('echo', echo); registerInstruction('prompt', prompt); registerInstruction('condition', condition); +// registerInstruction('parallel', parallel); diff --git a/packages/plugin-workflow/src/instructions/prompt.ts b/packages/plugin-workflow/src/instructions/prompt.ts index 2d3114675..ec3455eb4 100644 --- a/packages/plugin-workflow/src/instructions/prompt.ts +++ b/packages/plugin-workflow/src/instructions/prompt.ts @@ -1,6 +1,14 @@ +import { JOB_STATUS } from "../constants"; + export default { - manual: true, - run(this, input, context) { - return input; + run(this, input, execution) { + return { + status: JOB_STATUS.PENDING + }; + }, + + resume(this, job, execution) { + job.set('status', JOB_STATUS.RESOLVED); + return job; } } diff --git a/packages/plugin-workflow/src/models/Execution.ts b/packages/plugin-workflow/src/models/Execution.ts index 0a0f5fc89..78ee0f29a 100644 --- a/packages/plugin-workflow/src/models/Execution.ts +++ b/packages/plugin-workflow/src/models/Execution.ts @@ -1,140 +1,175 @@ -import { Model } from '@nocobase/database'; +import Sequelize from 'sequelize'; +import { Model, ModelCtor } from '@nocobase/database'; import { EXECUTION_STATUS, JOB_STATUS } from '../constants'; import { getInstruction } from '../instructions'; export class ExecutionModel extends Model { - async exec(input, previousJob = null, options = {}) { - // check execution status for quick out - if (this.get('status') !== EXECUTION_STATUS.STARTED) { - return; - } + nodes: Array = []; + nodesMap = new Map(); + jobsMap = new Map(); - let lastJob = previousJob || await this.getLastJob(options); - const node = await this.getNextNode(lastJob); - // if not found any node - if (!node) { - // set execution as resolved - await this.update({ - status: EXECUTION_STATUS.RESOLVED - }); + // make dual linked nodes list then cache + makeNodes(nodes = []) { + this.nodes = nodes; - return; - } + nodes.forEach(node => { + this.nodesMap.set(node.id, node); + }); - // got node.id and node.type - // find node instruction by type from registered node types in memory (program defined) - const instruction = getInstruction(node.type); - - let result = null; - let status = JOB_STATUS.PENDING; - // check if manual or node is on current job - if (!instruction.manual || (lastJob && lastJob.node_id === node.id)) { - // execute instruction of next node and get status - try { - result = await instruction.run.call(node, input ?? lastJob?.result, this); - status = JOB_STATUS.RESOLVED; - } catch(err) { - result = err; - status = JOB_STATUS.REJECTED; + nodes.forEach(node => { + if (node.upstream_id) { + node.upstream = this.nodesMap.get(node.upstream_id); } - } - // manually exec pending job - if (lastJob && lastJob.node_id === node.id) { - if (lastJob.status !== JOB_STATUS.PENDING) { - // not allow to retry resolved or rejected job for now - // TODO: based on retry config - return; + if (node.downstream_id) { + node.downstream = this.nodesMap.get(node.downstream_id); } - // RUN instruction - // should update the record based on input - lastJob.update({ - status, - result - }); - } else { - // RUN instruction - lastJob = await this.createJob({ - status, - node_id: node.id, - upstream_id: lastJob ? lastJob.id : null, - // TODO: how to presentation error? - result - }); - } - - switch(status) { - case JOB_STATUS.PENDING: - case JOB_STATUS.REJECTED: - // TODO: should handle rejected when configured - return; - default: - // should return chained promise to run any nodes as many as possible, - // till end (pending/rejected/no more) - return this.exec(result, lastJob, options); - } + }); } - async getLastJob(options) { + makeJobs(jobs: Array>) { + jobs.forEach(job => { + this.jobsMap.set(job.id, job); + }); + } + + async prepare() { + if (this.status !== EXECUTION_STATUS.STARTED) { + throw new Error(`execution was ended with status ${this.status}`); + } + + if (!this.workflow) { + this.workflow = await this.getWorkflow(); + } + + const nodes = await this.workflow.getNodes(); + + this.makeNodes(nodes); + const jobs = await this.getJobs(); - - if (!jobs.length) { - return null; - } - // find last job, last means no any other jobs set upstream to - const lastJobIds = new Set(jobs.map(item => item.id)); - jobs.forEach(item => { - if (item.upstream_id) { - lastJobIds.delete(item.upstream_id); - } - }); - // TODO(feature): - // if has multiple jobs? which one or some should be run next? - // if has determined flowNodeId, run that one. - // else not supported for now (multiple race pendings) - const [jobId] = Array.from(lastJobIds); - return jobs.find(item => item.id === jobId) || null; + this.makeJobs(jobs); } - async getNextNode(lastJob) { - if (!this.get('workflow')) { - // cache workflow - this.setDataValue('workflow', await this.getWorkflow()); + async start(options) { + await this.prepare(); + if (!this.nodes.length) { + return this.exit(null); } - const workflow = this.get('workflow'); + const head = this.nodes.find(item => !item.upstream); + return this.exec(head, { result: this.context }); + } - // if has not any job, means initial execution - if (!lastJob) { - // find first node for this workflow - // first one is the one has no upstream - const [firstNode = null] = await workflow.getNodes({ - where: { - upstream_id: null - } + async resume(job, options) { + await this.prepare(); + const node = this.nodesMap.get(job.node_id); + return this.recall(node, job); + } + + private async run(instruction, node, prevJob) { + let job; + try { + // call instruction to get result and status + job = await instruction.call(node, prevJob, this); + } catch (err) { + console.error(err); + // for uncaught error, set to rejected + job = { + result: err, + status: JOB_STATUS.REJECTED + }; + } + + let savedJob; + if (job instanceof Sequelize.Model) { + savedJob = await job.save(); + } else { + const upstream_id = prevJob instanceof Sequelize.Model ? prevJob.get('id') : null; + savedJob = await this.saveJob({ + node_id: node.id, + upstream_id, + ...job }); - - // put firstNode as next node to be execute - return firstNode; } - const lastNode = await lastJob.getNode(); - - if (lastJob.status === JOB_STATUS.PENDING) { - return lastNode; + if (savedJob.get('status') === JOB_STATUS.RESOLVED && node.downstream) { + // run next node + return this.exec(node.downstream, savedJob); } - const [nextNode = null] = await workflow.getNodes({ - where: { - upstream_id: lastJob.node_id, - // TODO: need better design - ...(lastNode.type === 'condition' ? { - when: lastJob.result - } : {}) - } + // all nodes in scope have been executed + return this.end(node, savedJob); + } + + async exec(node, input?) { + const { run } = getInstruction(node.type); + + return this.run(run, node, input); + } + + // parent node should take over the control + end(node, job) { + const parentNode = this.findBranchParentNode(node); + // no parent, means on main flow + if (parentNode) { + return this.recall(parentNode, job); + } + + // really done for all nodes + // * should mark execution as done with last job status + return this.exit(job); + } + + async recall(node, job) { + const { resume } = getInstruction(node.type); + if (!resume) { + return Promise.reject(new Error('`resume` should be implemented because the node made branch')); + } + + return this.run(resume, node, job); + } + + async exit(job) { + const executionStatusMap = { + [JOB_STATUS.PENDING]: EXECUTION_STATUS.STARTED, + [JOB_STATUS.RESOLVED]: EXECUTION_STATUS.RESOLVED, + [JOB_STATUS.REJECTED]: EXECUTION_STATUS.REJECTED, + [JOB_STATUS.CANCELLED]: EXECUTION_STATUS.CANCELLED, + }; + const status = job ? executionStatusMap[job.status] : EXECUTION_STATUS.RESOLVED; + await this.update({ status }); + return job; + } + + // TODO(optimize) + async saveJob(payload) { + const JobModel = this.database.getModel('jobs'); + const [result] = await JobModel.upsert({ + ...payload, + execution_id: this.id }); - return nextNode; + this.jobsMap.set(result.id, result); + + return result; + } + + findBranchParentNode(node): any { + for (let n = node; n; n = n.upstream) { + if (n.linkType !== null) { + return n.upstream; + } + } + return null; + } + + findBranchParentJob(job, node) { + for (let j = job; j; j = this.jobsMap.get(j.upstream_id)) { + if (j.node_id === node.id) { + return j; + } + } + return null; } } diff --git a/packages/plugin-workflow/src/models/Workflow.ts b/packages/plugin-workflow/src/models/Workflow.ts index 79319d02d..ebc7aef4e 100644 --- a/packages/plugin-workflow/src/models/Workflow.ts +++ b/packages/plugin-workflow/src/models/Workflow.ts @@ -43,7 +43,9 @@ export class WorkflowModel extends Model { status: EXECUTION_STATUS.STARTED }); execution.setDataValue('workflow', this); - await execution.exec(context, null, options); + execution.workflow = this; + + await execution.start(null, null, options); return execution; } } diff --git a/packages/plugin-workflow/src/triggers/index.ts b/packages/plugin-workflow/src/triggers/index.ts index 8b7da7e64..99751e308 100644 --- a/packages/plugin-workflow/src/triggers/index.ts +++ b/packages/plugin-workflow/src/triggers/index.ts @@ -1,7 +1,9 @@ +import { ModelCtor } from '@nocobase/database'; +import { WorkflowModel } from '../models/Workflow'; import * as dataChangeTriggers from './data-change'; export interface ITrigger { - (config: any): void + (this: ModelCtor, config: any): void } const triggers = new Map(); diff --git a/packages/plugin-workflow/src/instructions/condition/calculators.ts b/packages/plugin-workflow/src/utils/calculators.ts similarity index 94% rename from packages/plugin-workflow/src/instructions/condition/calculators.ts rename to packages/plugin-workflow/src/utils/calculators.ts index d15d14a8f..33cfd16fe 100644 --- a/packages/plugin-workflow/src/instructions/condition/calculators.ts +++ b/packages/plugin-workflow/src/utils/calculators.ts @@ -1,4 +1,4 @@ -type Calculator = (...args: any[]) => boolean; +type Calculator = (...args: any[]) => any; const calculators = new Map(); diff --git a/packages/plugin-workflow/src/instructions/condition/getter.ts b/packages/plugin-workflow/src/utils/getter.ts similarity index 95% rename from packages/plugin-workflow/src/instructions/condition/getter.ts rename to packages/plugin-workflow/src/utils/getter.ts index 11969f3ef..77a442291 100644 --- a/packages/plugin-workflow/src/instructions/condition/getter.ts +++ b/packages/plugin-workflow/src/utils/getter.ts @@ -1,7 +1,7 @@ import { get } from 'lodash'; import { ModelCtor } from '@nocobase/database'; -import { ExecutionModel } from '../../models/Execution'; +import { ExecutionModel } from '../models/Execution'; export type OperandType = 'context' | 'input' | 'job';