feat(plugin-workflow): execution life cycle with branch and join

This commit is contained in:
mytharcher 2022-01-26 02:27:23 +08:00
parent 1cce3bf164
commit 6018013195
14 changed files with 394 additions and 176 deletions

View File

@ -2,11 +2,11 @@ import { Application } from '@nocobase/server';
import Database, { Model, ModelCtor } from '@nocobase/database';
import { getApp } from '.';
import { WorkflowModel } from '../models/Workflow';
import { EXECUTION_STATUS, JOB_STATUS } from '../constants';
import { EXECUTION_STATUS, JOB_STATUS, LINK_TYPE } from '../constants';
jest.setTimeout(300000);
describe('workflow', () => {
describe('execution', () => {
let app: Application;
let db: Database;
let PostModel: ModelCtor<Model>;
@ -85,12 +85,14 @@ describe('workflow', () => {
type: 'echo'
});
await workflow.createNode({
const n2 = await workflow.createNode({
title: 'echo 2',
type: 'echo',
upstream_id: n1.id
});
await n1.setDownstream(n2);
const post = await PostModel.create({ title: 't1' });
const [execution] = await workflow.getExecutions();
@ -104,7 +106,6 @@ describe('workflow', () => {
expect(result).toMatchObject({ data: JSON.parse(JSON.stringify(post.toJSON())) });
});
// TODO: or should throw error?
it('execute resolved workflow', async () => {
const workflow = await WorkflowModel.create({
title: 'simple workflow',
@ -115,15 +116,20 @@ describe('workflow', () => {
}
});
await workflow.createNode({
title: 'echo',
type: 'echo'
});
const post = await PostModel.create({ title: 't1' });
const [execution] = await workflow.getExecutions();
expect(execution.status).toEqual(EXECUTION_STATUS.RESOLVED);
await execution.exec(123);
expect(execution.start()).rejects.toThrow();
expect(execution.status).toEqual(EXECUTION_STATUS.RESOLVED);
const jobs = await execution.getJobs();
expect(jobs.length).toEqual(0);
expect(jobs.length).toEqual(1);
});
});
@ -143,12 +149,14 @@ describe('workflow', () => {
type: 'prompt',
});
await workflow.createNode({
const n2 = await workflow.createNode({
title: 'echo',
type: 'echo',
upstream_id: n1.id
});
await n1.setDownstream(n2);
const post = await PostModel.create({ title: 't1' });
const [execution] = await workflow.getExecutions();
@ -157,10 +165,11 @@ describe('workflow', () => {
expect(pending.status).toEqual(JOB_STATUS.PENDING);
expect(pending.result).toEqual(null);
await execution.exec(123, null, {});
pending.set('result', 123);
await execution.resume(pending);
expect(execution.status).toEqual(EXECUTION_STATUS.RESOLVED);
const jobs = await execution.getJobs();
const jobs = await execution.getJobs({ order: [['id', 'ASC']] });
expect(jobs.length).toEqual(2);
expect(jobs[0].status).toEqual(JOB_STATUS.RESOLVED);
expect(jobs[0].result).toEqual(123);
@ -186,17 +195,17 @@ describe('workflow', () => {
// no config means always true
});
await workflow.createNode({
const n2 = await workflow.createNode({
title: 'true to echo',
type: 'echo',
when: true,
linkType: LINK_TYPE.ON_TRUE,
upstream_id: n1.id
});
await workflow.createNode({
title: 'false to echo',
type: 'echo',
when: false,
linkType: LINK_TYPE.ON_FALSE,
upstream_id: n1.id
});
@ -205,9 +214,55 @@ describe('workflow', () => {
const [execution] = await workflow.getExecutions();
expect(execution.status).toEqual(EXECUTION_STATUS.RESOLVED);
const jobs = await execution.getJobs();
const jobs = await execution.getJobs({ order: [['id', 'ASC']] });
expect(jobs.length).toEqual(2);
expect(jobs[0].node_id).toEqual(n1.id);
expect(jobs[1].node_id).toEqual(n2.id);
expect(jobs[1].result).toEqual(true);
});
it('suspend downstream in condition branch, then go on', async () => {
const workflow = await WorkflowModel.create({
title: 'condition workflow',
enabled: true,
type: 'afterCreate',
config: {
collection: 'posts'
}
});
const n1 = await workflow.createNode({
title: 'condition',
type: 'condition',
// no config means always true
});
const n2 = await workflow.createNode({
title: 'manual',
type: 'prompt',
linkType: LINK_TYPE.ON_TRUE,
upstream_id: n1.id
});
const n3 = await workflow.createNode({
title: 'echo input value',
type: 'echo',
upstream_id: n1.id
});
await n1.setDownstream(n3);
const post = await PostModel.create({ title: 't1' });
const [execution] = await workflow.getExecutions();
expect(execution.status).toEqual(EXECUTION_STATUS.STARTED);
const [pending] = await execution.getJobs({ node_id: n2.id });
pending.set('result', 123);
await execution.resume(pending);
const jobs = await execution.getJobs();
expect(jobs.length).toEqual(3);
});
});
});

View File

@ -2,12 +2,30 @@ import path from 'path';
import { MockServer, mockServer } from '@nocobase/test';
import plugin from '../server';
import { InstructionResult, registerInstruction } from '../instructions';
import { JOB_STATUS } from '../constants';
export async function getApp(options = {}): Promise<MockServer> {
const app = mockServer(options);
app.plugin(plugin);
// for test only
registerInstruction('echo', {
run(this, { result }, execution) {
return {
status: JOB_STATUS.RESOLVED,
result
};
}
});
registerInstruction('error', {
run(this, input, execution) {
throw new Error('definite error');
}
});
await app.load();
app.db.import({

View File

@ -2,7 +2,7 @@ import { Application } from '@nocobase/server';
import Database, { Model, ModelCtor } from '@nocobase/database';
import { getApp } from '..';
import { WorkflowModel } from '../../models/Workflow';
import { EXECUTION_STATUS, JOB_STATUS } from '../../constants';
import { EXECUTION_STATUS, JOB_STATUS, LINK_TYPE } from '../../constants';
@ -22,6 +22,10 @@ describe('workflow > instructions > condition', () => {
afterEach(() => db.close());
describe('config.rejectOnFalse', () => {
});
describe('single calculation', () => {
it('calculation to true downstream', async () => {
const workflow = await WorkflowModel.create({
@ -36,24 +40,26 @@ describe('workflow > instructions > condition', () => {
const n1 = await workflow.createNode({
title: 'condition',
type: 'condition',
// (1 === 1): true
config: {
calculator: 'equal',
operands: [{ value: 1 }, { value: 1 }]
// (1 === 1): true
calculation: {
calculator: 'equal',
operands: [{ value: 1 }, { value: 1 }]
}
}
});
await workflow.createNode({
const n2 = await workflow.createNode({
title: 'true to echo',
type: 'echo',
when: true,
linkType: LINK_TYPE.ON_TRUE,
upstream_id: n1.id
});
await workflow.createNode({
const n3 = await workflow.createNode({
title: 'false to echo',
type: 'echo',
when: false,
linkType: LINK_TYPE.ON_FALSE,
upstream_id: n1.id
});
@ -80,24 +86,26 @@ describe('workflow > instructions > condition', () => {
const n1 = await workflow.createNode({
title: 'condition',
type: 'condition',
// (0 === 1): false
config: {
calculator: 'equal',
operands: [{ value: 0 }, { value: 1 }]
// (0 === 1): false
calculation: {
calculator: 'equal',
operands: [{ value: 0 }, { value: 1 }]
}
}
});
await workflow.createNode({
title: 'true to echo',
type: 'echo',
when: true,
linkType: LINK_TYPE.ON_TRUE,
upstream_id: n1.id
});
await workflow.createNode({
title: 'false to echo',
type: 'echo',
when: false,
linkType: LINK_TYPE.ON_FALSE,
upstream_id: n1.id
});
@ -111,4 +119,8 @@ describe('workflow > instructions > condition', () => {
expect(jobs[1].result).toEqual(false);
});
});
describe('group calculation', () => {
});
});

View File

@ -1,4 +1,5 @@
import { TableOptions } from '@nocobase/database';
import { LINK_TYPE } from '../constants';
export default {
name: 'flow_nodes',
@ -28,21 +29,46 @@ export default {
type: 'belongsTo',
target: 'flow_nodes'
},
// only works when upstream node is condition type.
{
interface: 'linkTo',
name: 'branches',
type: 'hasMany',
target: 'flow_nodes',
sourceKey: 'id',
foreignKey: 'upstream_id',
},
// only works when upstream node is branching type, like condition and parallel.
// put here because the design of flow-links model is not really necessary for now.
// or it should be put into flow-links model.
// if keeps 1:n relactionship, cannot support cycle flow.
{
name: 'when',
type: 'boolean',
// defaultValue: null
interface: 'select',
name: 'linkType',
type: 'smallint',
title: 'Link Type',
dataSource: [
{ label: 'Default', value: LINK_TYPE.DEFAULT },
{ label: 'Branched, on true', value: LINK_TYPE.ON_TRUE },
{ label: 'Branched, on false', value: LINK_TYPE.ON_FALSE },
{ label: 'Branched, no limit', value: LINK_TYPE.NO_LIMIT }
]
},
// for reasons:
// 1. redirect type node to solve cycle flow.
// 2. recognize as true next node after branches.
{
interface: 'linkTo',
name: 'downstream',
type: 'belongsTo',
target: 'flow_nodes'
},
{
interface: 'select',
type: 'string',
name: 'type',
title: '类型',
// TODO: data for test only now
dataSource: [
{ label: '无处理', value: 'echo' },
{ label: '数据处理', value: 'data' },
{ label: '数据查询', value: 'query' },
{ label: '等待人工输入', value: 'prompt' },
@ -53,7 +79,8 @@ export default {
interface: 'json',
type: 'jsonb',
name: 'config',
title: '配置'
title: '配置',
defaultValue: {}
}
]
} as TableOptions;

View File

@ -1,11 +1,20 @@
export const EXECUTION_STATUS = {
STARTED: 0,
RESOLVED: 1,
REJECTED: -1
REJECTED: -1,
CANCELLED: -2
};
export const JOB_STATUS = {
PENDING: 0,
RESOLVED: 1,
REJECTED: -1
REJECTED: -1,
CANCELLED: -2
};
export const LINK_TYPE = {
DEFAULT: null,
ON_TRUE: 1,
ON_FALSE: 0,
NO_LIMIT: -1
};

View File

@ -15,8 +15,10 @@
// }
// }
import { getValue, Operand } from "./getter";
import { getCalculator } from "./calculators";
import Sequelize = require('sequelize');
import { getValue, Operand } from "../utils/getter";
import { getCalculator } from "../utils/calculators";
import { JOB_STATUS } from "../constants";
type BaseCalculation = {
not?: boolean;
@ -63,10 +65,55 @@ function calculate(config, input, execution) {
export default {
manual: false,
async run(this, input, execution) {
async run(this, prevJob, execution) {
// TODO(optimize): loading of jobs could be reduced and turned into incrementally in execution
const jobs = await execution.getJobs();
return calculate(this.config as Calculation, input, execution);
// const jobs = await execution.getJobs();
const { calculation } = this.config || {};
const result = calculate(calculation, prevJob, execution);
if (!result && this.config.rejectOnFalse) {
return {
status: JOB_STATUS.REJECTED,
result
};
}
const job = {
status: JOB_STATUS.RESOLVED,
result,
// TODO(optimize): try unify the building of job
node_id: this.id,
upstream_id: prevJob instanceof Sequelize.Model ? prevJob.get('id') : null
};
const branchNode = execution.nodes
.find(item => item.upstream === this && item.linkType === Number(result));
if (!branchNode) {
return job;
}
const savedJob = await execution.saveJob(job);
// return execution.exec(branchNode, savedJob);
const tailJob = await execution.exec(branchNode, savedJob);
if (tailJob.status === JOB_STATUS.PENDING) {
savedJob.set('status', JOB_STATUS.PENDING);
return savedJob;
}
return tailJob;
},
async resume(this, branchJob, execution) {
if (branchJob.status === JOB_STATUS.RESOLVED) {
const job = execution.findBranchParentJob(branchJob, this);
job.set('status', JOB_STATUS.RESOLVED);
return job;
}
// pass control to upper scope by ending current scope
return execution.end(this, branchJob);
}
}

View File

@ -1,6 +0,0 @@
export default {
manual: false,
run(this, input, context) {
return input;
}
};

View File

@ -3,23 +3,32 @@
import { ModelCtor, Model } from "@nocobase/database";
import { ExecutionModel } from "../models/Execution";
import echo from './echo';
import prompt from './prompt';
import condition from './condition';
// import parallel from './parallel';
export interface Job {
status: number;
result: unknown;
[key: string]: unknown;
}
export type InstructionResult = Job | Promise<Job>;
// what should a instruction do?
// - base on input and context, do any calculations or system call (io), and produce a result or pending.
// what should input to be?
// - just use previously output result for convenience?
// what should context to be?
// - could be the workflow execution object (containing context data)
export type Instruction = {
manual: boolean;
export interface Instruction {
run(
this: ModelCtor<Model>,
// what should input to be?
// - just use previously output result for convenience?
input: any,
// what should context to be?
// - could be the workflow execution object (containing context data)
execution: ModelCtor<ExecutionModel>
): any
): InstructionResult;
// for start node in main flow (or branch) to resume when manual sub branch triggered
resume?(): InstructionResult
}
const registery = new Map<string, Instruction>();
@ -28,10 +37,10 @@ export function getInstruction(key: string): Instruction {
return registery.get(key);
}
export function registerInstruction(key: string, fn: Instruction) {
registery.set(key, fn);
export function registerInstruction(key: string, instruction: any) {
registery.set(key, instruction);
}
registerInstruction('echo', echo);
registerInstruction('prompt', prompt);
registerInstruction('condition', condition);
// registerInstruction('parallel', parallel);

View File

@ -1,6 +1,14 @@
import { JOB_STATUS } from "../constants";
export default {
manual: true,
run(this, input, context) {
return input;
run(this, input, execution) {
return {
status: JOB_STATUS.PENDING
};
},
resume(this, job, execution) {
job.set('status', JOB_STATUS.RESOLVED);
return job;
}
}

View File

@ -1,140 +1,175 @@
import { Model } from '@nocobase/database';
import Sequelize from 'sequelize';
import { Model, ModelCtor } from '@nocobase/database';
import { EXECUTION_STATUS, JOB_STATUS } from '../constants';
import { getInstruction } from '../instructions';
export class ExecutionModel extends Model {
async exec(input, previousJob = null, options = {}) {
// check execution status for quick out
if (this.get('status') !== EXECUTION_STATUS.STARTED) {
return;
}
nodes: Array<any> = [];
nodesMap = new Map();
jobsMap = new Map();
let lastJob = previousJob || await this.getLastJob(options);
const node = await this.getNextNode(lastJob);
// if not found any node
if (!node) {
// set execution as resolved
await this.update({
status: EXECUTION_STATUS.RESOLVED
});
// make dual linked nodes list then cache
makeNodes(nodes = []) {
this.nodes = nodes;
return;
}
nodes.forEach(node => {
this.nodesMap.set(node.id, node);
});
// got node.id and node.type
// find node instruction by type from registered node types in memory (program defined)
const instruction = getInstruction(node.type);
let result = null;
let status = JOB_STATUS.PENDING;
// check if manual or node is on current job
if (!instruction.manual || (lastJob && lastJob.node_id === node.id)) {
// execute instruction of next node and get status
try {
result = await instruction.run.call(node, input ?? lastJob?.result, this);
status = JOB_STATUS.RESOLVED;
} catch(err) {
result = err;
status = JOB_STATUS.REJECTED;
nodes.forEach(node => {
if (node.upstream_id) {
node.upstream = this.nodesMap.get(node.upstream_id);
}
}
// manually exec pending job
if (lastJob && lastJob.node_id === node.id) {
if (lastJob.status !== JOB_STATUS.PENDING) {
// not allow to retry resolved or rejected job for now
// TODO: based on retry config
return;
if (node.downstream_id) {
node.downstream = this.nodesMap.get(node.downstream_id);
}
// RUN instruction
// should update the record based on input
lastJob.update({
status,
result
});
} else {
// RUN instruction
lastJob = await this.createJob({
status,
node_id: node.id,
upstream_id: lastJob ? lastJob.id : null,
// TODO: how to presentation error?
result
});
}
switch(status) {
case JOB_STATUS.PENDING:
case JOB_STATUS.REJECTED:
// TODO: should handle rejected when configured
return;
default:
// should return chained promise to run any nodes as many as possible,
// till end (pending/rejected/no more)
return this.exec(result, lastJob, options);
}
});
}
async getLastJob(options) {
makeJobs(jobs: Array<ModelCtor<Model>>) {
jobs.forEach(job => {
this.jobsMap.set(job.id, job);
});
}
async prepare() {
if (this.status !== EXECUTION_STATUS.STARTED) {
throw new Error(`execution was ended with status ${this.status}`);
}
if (!this.workflow) {
this.workflow = await this.getWorkflow();
}
const nodes = await this.workflow.getNodes();
this.makeNodes(nodes);
const jobs = await this.getJobs();
if (!jobs.length) {
return null;
}
// find last job, last means no any other jobs set upstream to
const lastJobIds = new Set(jobs.map(item => item.id));
jobs.forEach(item => {
if (item.upstream_id) {
lastJobIds.delete(item.upstream_id);
}
});
// TODO(feature):
// if has multiple jobs? which one or some should be run next?
// if has determined flowNodeId, run that one.
// else not supported for now (multiple race pendings)
const [jobId] = Array.from(lastJobIds);
return jobs.find(item => item.id === jobId) || null;
this.makeJobs(jobs);
}
async getNextNode(lastJob) {
if (!this.get('workflow')) {
// cache workflow
this.setDataValue('workflow', await this.getWorkflow());
async start(options) {
await this.prepare();
if (!this.nodes.length) {
return this.exit(null);
}
const workflow = this.get('workflow');
const head = this.nodes.find(item => !item.upstream);
return this.exec(head, { result: this.context });
}
// if has not any job, means initial execution
if (!lastJob) {
// find first node for this workflow
// first one is the one has no upstream
const [firstNode = null] = await workflow.getNodes({
where: {
upstream_id: null
}
async resume(job, options) {
await this.prepare();
const node = this.nodesMap.get(job.node_id);
return this.recall(node, job);
}
private async run(instruction, node, prevJob) {
let job;
try {
// call instruction to get result and status
job = await instruction.call(node, prevJob, this);
} catch (err) {
console.error(err);
// for uncaught error, set to rejected
job = {
result: err,
status: JOB_STATUS.REJECTED
};
}
let savedJob;
if (job instanceof Sequelize.Model) {
savedJob = await job.save();
} else {
const upstream_id = prevJob instanceof Sequelize.Model ? prevJob.get('id') : null;
savedJob = await this.saveJob({
node_id: node.id,
upstream_id,
...job
});
// put firstNode as next node to be execute
return firstNode;
}
const lastNode = await lastJob.getNode();
if (lastJob.status === JOB_STATUS.PENDING) {
return lastNode;
if (savedJob.get('status') === JOB_STATUS.RESOLVED && node.downstream) {
// run next node
return this.exec(node.downstream, savedJob);
}
const [nextNode = null] = await workflow.getNodes({
where: {
upstream_id: lastJob.node_id,
// TODO: need better design
...(lastNode.type === 'condition' ? {
when: lastJob.result
} : {})
}
// all nodes in scope have been executed
return this.end(node, savedJob);
}
async exec(node, input?) {
const { run } = getInstruction(node.type);
return this.run(run, node, input);
}
// parent node should take over the control
end(node, job) {
const parentNode = this.findBranchParentNode(node);
// no parent, means on main flow
if (parentNode) {
return this.recall(parentNode, job);
}
// really done for all nodes
// * should mark execution as done with last job status
return this.exit(job);
}
async recall(node, job) {
const { resume } = getInstruction(node.type);
if (!resume) {
return Promise.reject(new Error('`resume` should be implemented because the node made branch'));
}
return this.run(resume, node, job);
}
async exit(job) {
const executionStatusMap = {
[JOB_STATUS.PENDING]: EXECUTION_STATUS.STARTED,
[JOB_STATUS.RESOLVED]: EXECUTION_STATUS.RESOLVED,
[JOB_STATUS.REJECTED]: EXECUTION_STATUS.REJECTED,
[JOB_STATUS.CANCELLED]: EXECUTION_STATUS.CANCELLED,
};
const status = job ? executionStatusMap[job.status] : EXECUTION_STATUS.RESOLVED;
await this.update({ status });
return job;
}
// TODO(optimize)
async saveJob(payload) {
const JobModel = this.database.getModel('jobs');
const [result] = await JobModel.upsert({
...payload,
execution_id: this.id
});
return nextNode;
this.jobsMap.set(result.id, result);
return result;
}
findBranchParentNode(node): any {
for (let n = node; n; n = n.upstream) {
if (n.linkType !== null) {
return n.upstream;
}
}
return null;
}
findBranchParentJob(job, node) {
for (let j = job; j; j = this.jobsMap.get(j.upstream_id)) {
if (j.node_id === node.id) {
return j;
}
}
return null;
}
}

View File

@ -43,7 +43,9 @@ export class WorkflowModel extends Model {
status: EXECUTION_STATUS.STARTED
});
execution.setDataValue('workflow', this);
await execution.exec(context, null, options);
execution.workflow = this;
await execution.start(null, null, options);
return execution;
}
}

View File

@ -1,7 +1,9 @@
import { ModelCtor } from '@nocobase/database';
import { WorkflowModel } from '../models/Workflow';
import * as dataChangeTriggers from './data-change';
export interface ITrigger {
(config: any): void
(this: ModelCtor<WorkflowModel>, config: any): void
}
const triggers = new Map<string, ITrigger>();

View File

@ -1,4 +1,4 @@
type Calculator = (...args: any[]) => boolean;
type Calculator = (...args: any[]) => any;
const calculators = new Map<string, Calculator>();

View File

@ -1,7 +1,7 @@
import { get } from 'lodash';
import { ModelCtor } from '@nocobase/database';
import { ExecutionModel } from '../../models/Execution';
import { ExecutionModel } from '../models/Execution';
export type OperandType = 'context' | 'input' | 'job';