mirror of
https://gitee.com/nocobase/nocobase.git
synced 2024-12-03 04:38:15 +08:00
feat: server mvp for configurable workflow with nodes
This commit is contained in:
parent
d0b6efaaf5
commit
1cce3bf164
7
packages/plugin-workflow/.npmignore
Normal file
7
packages/plugin-workflow/.npmignore
Normal file
@ -0,0 +1,7 @@
|
||||
node_modules
|
||||
*.log
|
||||
docs
|
||||
__tests__
|
||||
tsconfig.json
|
||||
src
|
||||
.fatherrc.ts
|
16
packages/plugin-workflow/package.json
Normal file
16
packages/plugin-workflow/package.json
Normal file
@ -0,0 +1,16 @@
|
||||
{
|
||||
"name": "@nocobase/plugin-workflow",
|
||||
"version": "0.5.0-alpha.37",
|
||||
"main": "lib/index.js",
|
||||
"private": true,
|
||||
"license": "MIT",
|
||||
"dependencies": {
|
||||
"@nocobase/server": "^0.5.0-alpha.37",
|
||||
"json-templates": "^4.1.0",
|
||||
"node-schedule": "^2.0.0"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@types/node-schedule": "^1.3.1"
|
||||
},
|
||||
"gitHead": "f0b335ac30f29f25c95d7d137655fa64d8d67f1e"
|
||||
}
|
15
packages/plugin-workflow/src/__tests__/collections/posts.ts
Normal file
15
packages/plugin-workflow/src/__tests__/collections/posts.ts
Normal file
@ -0,0 +1,15 @@
|
||||
import { TableOptions } from '@nocobase/database';
|
||||
|
||||
export default {
|
||||
name: 'posts',
|
||||
fields: [
|
||||
{
|
||||
type: 'string',
|
||||
name: 'title',
|
||||
},
|
||||
{
|
||||
type: 'boolean',
|
||||
name: 'published',
|
||||
}
|
||||
]
|
||||
} as TableOptions;
|
@ -0,0 +1,15 @@
|
||||
import { TableOptions } from '@nocobase/database';
|
||||
|
||||
export default {
|
||||
name: 'targets',
|
||||
fields: [
|
||||
{
|
||||
type: 'string',
|
||||
name: 'col1',
|
||||
},
|
||||
{
|
||||
type: 'string',
|
||||
name: 'col2',
|
||||
}
|
||||
],
|
||||
} as TableOptions;
|
26
packages/plugin-workflow/src/__tests__/index.ts
Normal file
26
packages/plugin-workflow/src/__tests__/index.ts
Normal file
@ -0,0 +1,26 @@
|
||||
import path from 'path';
|
||||
import { MockServer, mockServer } from '@nocobase/test';
|
||||
|
||||
import plugin from '../server';
|
||||
|
||||
export async function getApp(options = {}): Promise<MockServer> {
|
||||
const app = mockServer(options);
|
||||
|
||||
app.plugin(plugin);
|
||||
|
||||
await app.load();
|
||||
|
||||
app.db.import({
|
||||
directory: path.resolve(__dirname, './collections')
|
||||
});
|
||||
|
||||
try {
|
||||
await app.db.sync();
|
||||
} catch (error) {
|
||||
console.error(error);
|
||||
}
|
||||
// TODO: need a better life cycle event than manually trigger
|
||||
await app.emitAsync('beforeStart');
|
||||
|
||||
return app;
|
||||
}
|
@ -0,0 +1,114 @@
|
||||
import { Application } from '@nocobase/server';
|
||||
import Database, { Model, ModelCtor } from '@nocobase/database';
|
||||
import { getApp } from '..';
|
||||
import { WorkflowModel } from '../../models/Workflow';
|
||||
import { EXECUTION_STATUS, JOB_STATUS } from '../../constants';
|
||||
|
||||
|
||||
|
||||
describe('workflow > instructions > condition', () => {
|
||||
let app: Application;
|
||||
let db: Database;
|
||||
let PostModel: ModelCtor<Model>;
|
||||
let WorkflowModel: ModelCtor<WorkflowModel>;
|
||||
|
||||
beforeEach(async () => {
|
||||
app = await getApp();
|
||||
|
||||
db = app.db;
|
||||
WorkflowModel = db.getModel('workflows') as any;
|
||||
PostModel = db.getModel('posts');
|
||||
});
|
||||
|
||||
afterEach(() => db.close());
|
||||
|
||||
describe('single calculation', () => {
|
||||
it('calculation to true downstream', async () => {
|
||||
const workflow = await WorkflowModel.create({
|
||||
title: 'condition workflow',
|
||||
enabled: true,
|
||||
type: 'afterCreate',
|
||||
config: {
|
||||
collection: 'posts'
|
||||
}
|
||||
});
|
||||
|
||||
const n1 = await workflow.createNode({
|
||||
title: 'condition',
|
||||
type: 'condition',
|
||||
// (1 === 1): true
|
||||
config: {
|
||||
calculator: 'equal',
|
||||
operands: [{ value: 1 }, { value: 1 }]
|
||||
}
|
||||
});
|
||||
|
||||
await workflow.createNode({
|
||||
title: 'true to echo',
|
||||
type: 'echo',
|
||||
when: true,
|
||||
upstream_id: n1.id
|
||||
});
|
||||
|
||||
await workflow.createNode({
|
||||
title: 'false to echo',
|
||||
type: 'echo',
|
||||
when: false,
|
||||
upstream_id: n1.id
|
||||
});
|
||||
|
||||
const post = await PostModel.create({ title: 't1' });
|
||||
|
||||
const [execution] = await workflow.getExecutions();
|
||||
expect(execution.status).toEqual(EXECUTION_STATUS.RESOLVED);
|
||||
|
||||
const jobs = await execution.getJobs();
|
||||
expect(jobs.length).toEqual(2);
|
||||
expect(jobs[1].result).toEqual(true);
|
||||
});
|
||||
|
||||
it('calculation to false downstream', async () => {
|
||||
const workflow = await WorkflowModel.create({
|
||||
title: 'condition workflow',
|
||||
enabled: true,
|
||||
type: 'afterCreate',
|
||||
config: {
|
||||
collection: 'posts'
|
||||
}
|
||||
});
|
||||
|
||||
const n1 = await workflow.createNode({
|
||||
title: 'condition',
|
||||
type: 'condition',
|
||||
// (0 === 1): false
|
||||
config: {
|
||||
calculator: 'equal',
|
||||
operands: [{ value: 0 }, { value: 1 }]
|
||||
}
|
||||
});
|
||||
|
||||
await workflow.createNode({
|
||||
title: 'true to echo',
|
||||
type: 'echo',
|
||||
when: true,
|
||||
upstream_id: n1.id
|
||||
});
|
||||
|
||||
await workflow.createNode({
|
||||
title: 'false to echo',
|
||||
type: 'echo',
|
||||
when: false,
|
||||
upstream_id: n1.id
|
||||
});
|
||||
|
||||
const post = await PostModel.create({ title: 't1' });
|
||||
|
||||
const [execution] = await workflow.getExecutions();
|
||||
expect(execution.status).toEqual(EXECUTION_STATUS.RESOLVED);
|
||||
|
||||
const jobs = await execution.getJobs();
|
||||
expect(jobs.length).toEqual(2);
|
||||
expect(jobs[1].result).toEqual(false);
|
||||
});
|
||||
});
|
||||
});
|
213
packages/plugin-workflow/src/__tests__/workflow.test.ts
Normal file
213
packages/plugin-workflow/src/__tests__/workflow.test.ts
Normal file
@ -0,0 +1,213 @@
|
||||
import { Application } from '@nocobase/server';
|
||||
import Database, { Model, ModelCtor } from '@nocobase/database';
|
||||
import { getApp } from '.';
|
||||
import { WorkflowModel } from '../models/Workflow';
|
||||
import { EXECUTION_STATUS, JOB_STATUS } from '../constants';
|
||||
|
||||
jest.setTimeout(300000);
|
||||
|
||||
describe('workflow', () => {
|
||||
let app: Application;
|
||||
let db: Database;
|
||||
let PostModel: ModelCtor<Model>;
|
||||
// let Target: ModelCtor<Model>;
|
||||
let WorkflowModel: ModelCtor<WorkflowModel>;
|
||||
|
||||
beforeEach(async () => {
|
||||
app = await getApp();
|
||||
|
||||
db = app.db;
|
||||
WorkflowModel = db.getModel('workflows') as any;
|
||||
PostModel = db.getModel('posts');
|
||||
// Target = db.getModel('targets');
|
||||
});
|
||||
|
||||
afterEach(() => db.close());
|
||||
|
||||
describe('base', () => {
|
||||
it('empty workflow without any nodes', async () => {
|
||||
const workflow = await WorkflowModel.create({
|
||||
title: 'empty workflow',
|
||||
enabled: true,
|
||||
type: 'afterCreate',
|
||||
config: {
|
||||
collection: 'posts'
|
||||
}
|
||||
});
|
||||
|
||||
const post = await PostModel.create({ title: 't1' });
|
||||
|
||||
const [execution] = await workflow.getExecutions();
|
||||
expect(execution.context.data.title).toEqual(post.title);
|
||||
expect(execution.status).toEqual(EXECUTION_STATUS.RESOLVED);
|
||||
});
|
||||
|
||||
it('workflow with single simple node', async () => {
|
||||
const workflow = await WorkflowModel.create({
|
||||
title: 'simple workflow',
|
||||
enabled: true,
|
||||
type: 'afterCreate',
|
||||
config: {
|
||||
collection: 'posts'
|
||||
}
|
||||
});
|
||||
|
||||
await workflow.createNode({
|
||||
title: 'echo',
|
||||
type: 'echo'
|
||||
});
|
||||
|
||||
const post = await PostModel.create({ title: 't1' });
|
||||
|
||||
const [execution] = await workflow.getExecutions();
|
||||
expect(execution.context.data.title).toEqual(post.title);
|
||||
expect(execution.status).toEqual(EXECUTION_STATUS.RESOLVED);
|
||||
|
||||
const jobs = await execution.getJobs();
|
||||
expect(jobs.length).toEqual(1);
|
||||
const { status, result } = jobs[0].get();
|
||||
expect(status).toEqual(JOB_STATUS.RESOLVED);
|
||||
expect(result).toMatchObject({ data: JSON.parse(JSON.stringify(post.toJSON())) });
|
||||
});
|
||||
|
||||
it('workflow with multiple simple nodes', async () => {
|
||||
const workflow = await WorkflowModel.create({
|
||||
title: 'simple workflow',
|
||||
enabled: true,
|
||||
type: 'afterCreate',
|
||||
config: {
|
||||
collection: 'posts'
|
||||
}
|
||||
});
|
||||
|
||||
const n1 = await workflow.createNode({
|
||||
title: 'echo 1',
|
||||
type: 'echo'
|
||||
});
|
||||
|
||||
await workflow.createNode({
|
||||
title: 'echo 2',
|
||||
type: 'echo',
|
||||
upstream_id: n1.id
|
||||
});
|
||||
|
||||
const post = await PostModel.create({ title: 't1' });
|
||||
|
||||
const [execution] = await workflow.getExecutions();
|
||||
expect(execution.context.data.title).toEqual(post.title);
|
||||
expect(execution.status).toEqual(EXECUTION_STATUS.RESOLVED);
|
||||
|
||||
const jobs = await execution.getJobs();
|
||||
expect(jobs.length).toEqual(2);
|
||||
const { status, result } = jobs[1].get();
|
||||
expect(status).toEqual(JOB_STATUS.RESOLVED);
|
||||
expect(result).toMatchObject({ data: JSON.parse(JSON.stringify(post.toJSON())) });
|
||||
});
|
||||
|
||||
// TODO: or should throw error?
|
||||
it('execute resolved workflow', async () => {
|
||||
const workflow = await WorkflowModel.create({
|
||||
title: 'simple workflow',
|
||||
enabled: true,
|
||||
type: 'afterCreate',
|
||||
config: {
|
||||
collection: 'posts'
|
||||
}
|
||||
});
|
||||
|
||||
const post = await PostModel.create({ title: 't1' });
|
||||
|
||||
const [execution] = await workflow.getExecutions();
|
||||
expect(execution.status).toEqual(EXECUTION_STATUS.RESOLVED);
|
||||
|
||||
await execution.exec(123);
|
||||
expect(execution.status).toEqual(EXECUTION_STATUS.RESOLVED);
|
||||
const jobs = await execution.getJobs();
|
||||
expect(jobs.length).toEqual(0);
|
||||
});
|
||||
});
|
||||
|
||||
describe('manual nodes', () => {
|
||||
it('manual node should pause execution, and could be manually resume', async () => {
|
||||
const workflow = await WorkflowModel.create({
|
||||
title: 'manual workflow',
|
||||
enabled: true,
|
||||
type: 'afterCreate',
|
||||
config: {
|
||||
collection: 'posts'
|
||||
}
|
||||
});
|
||||
|
||||
const n1 = await workflow.createNode({
|
||||
title: 'prompt',
|
||||
type: 'prompt',
|
||||
});
|
||||
|
||||
await workflow.createNode({
|
||||
title: 'echo',
|
||||
type: 'echo',
|
||||
upstream_id: n1.id
|
||||
});
|
||||
|
||||
const post = await PostModel.create({ title: 't1' });
|
||||
|
||||
const [execution] = await workflow.getExecutions();
|
||||
expect(execution.status).toEqual(EXECUTION_STATUS.STARTED);
|
||||
const [pending] = await execution.getJobs();
|
||||
expect(pending.status).toEqual(JOB_STATUS.PENDING);
|
||||
expect(pending.result).toEqual(null);
|
||||
|
||||
await execution.exec(123, null, {});
|
||||
expect(execution.status).toEqual(EXECUTION_STATUS.RESOLVED);
|
||||
|
||||
const jobs = await execution.getJobs();
|
||||
expect(jobs.length).toEqual(2);
|
||||
expect(jobs[0].status).toEqual(JOB_STATUS.RESOLVED);
|
||||
expect(jobs[0].result).toEqual(123);
|
||||
expect(jobs[1].status).toEqual(JOB_STATUS.RESOLVED);
|
||||
expect(jobs[1].result).toEqual(123);
|
||||
});
|
||||
});
|
||||
|
||||
describe('condition node', () => {
|
||||
it('condition node link to different downstreams', async () => {
|
||||
const workflow = await WorkflowModel.create({
|
||||
title: 'condition workflow',
|
||||
enabled: true,
|
||||
type: 'afterCreate',
|
||||
config: {
|
||||
collection: 'posts'
|
||||
}
|
||||
});
|
||||
|
||||
const n1 = await workflow.createNode({
|
||||
title: 'condition',
|
||||
type: 'condition',
|
||||
// no config means always true
|
||||
});
|
||||
|
||||
await workflow.createNode({
|
||||
title: 'true to echo',
|
||||
type: 'echo',
|
||||
when: true,
|
||||
upstream_id: n1.id
|
||||
});
|
||||
|
||||
await workflow.createNode({
|
||||
title: 'false to echo',
|
||||
type: 'echo',
|
||||
when: false,
|
||||
upstream_id: n1.id
|
||||
});
|
||||
|
||||
const post = await PostModel.create({ title: 't1' });
|
||||
|
||||
const [execution] = await workflow.getExecutions();
|
||||
expect(execution.status).toEqual(EXECUTION_STATUS.RESOLVED);
|
||||
|
||||
const jobs = await execution.getJobs();
|
||||
expect(jobs.length).toEqual(2);
|
||||
expect(jobs[1].result).toEqual(true);
|
||||
});
|
||||
});
|
||||
});
|
33
packages/plugin-workflow/src/collections/executions.ts
Normal file
33
packages/plugin-workflow/src/collections/executions.ts
Normal file
@ -0,0 +1,33 @@
|
||||
import { TableOptions } from '@nocobase/database';
|
||||
|
||||
export default {
|
||||
name: 'executions',
|
||||
model: 'ExecutionModel',
|
||||
title: '执行流程',
|
||||
fields: [
|
||||
{
|
||||
interface: 'linkTo',
|
||||
type: 'belongsTo',
|
||||
name: 'workflow',
|
||||
title: '所属工作流'
|
||||
},
|
||||
{
|
||||
interface: 'linkTo',
|
||||
type: 'hasMany',
|
||||
name: 'jobs',
|
||||
title: '流程记录'
|
||||
},
|
||||
{
|
||||
interface: 'json',
|
||||
type: 'jsonb',
|
||||
name: 'context',
|
||||
title: '上下文数据'
|
||||
},
|
||||
{
|
||||
interface: 'select',
|
||||
type: 'integer',
|
||||
name: 'status',
|
||||
title: '状态'
|
||||
}
|
||||
]
|
||||
} as TableOptions;
|
59
packages/plugin-workflow/src/collections/flow_nodes.ts
Normal file
59
packages/plugin-workflow/src/collections/flow_nodes.ts
Normal file
@ -0,0 +1,59 @@
|
||||
import { TableOptions } from '@nocobase/database';
|
||||
|
||||
export default {
|
||||
name: 'flow_nodes',
|
||||
// model: 'FlowNodeModel',
|
||||
title: 'Workflow Nodes',
|
||||
fields: [
|
||||
{
|
||||
interface: 'string',
|
||||
type: 'string',
|
||||
name: 'title',
|
||||
title: '名称',
|
||||
component: {
|
||||
showInTable: true,
|
||||
showInDetail: true,
|
||||
showInForm: true,
|
||||
},
|
||||
},
|
||||
// which workflow belongs to
|
||||
{
|
||||
interface: 'linkTo',
|
||||
name: 'workflow',
|
||||
type: 'belongsTo',
|
||||
},
|
||||
{
|
||||
interface: 'linkTo',
|
||||
name: 'upstream',
|
||||
type: 'belongsTo',
|
||||
target: 'flow_nodes'
|
||||
},
|
||||
// only works when upstream node is condition type.
|
||||
// put here because the design of flow-links model is not really necessary for now.
|
||||
// or it should be put into flow-links model.
|
||||
{
|
||||
name: 'when',
|
||||
type: 'boolean',
|
||||
// defaultValue: null
|
||||
},
|
||||
{
|
||||
interface: 'select',
|
||||
type: 'string',
|
||||
name: 'type',
|
||||
title: '类型',
|
||||
dataSource: [
|
||||
{ label: '无处理', value: 'echo' },
|
||||
{ label: '数据处理', value: 'data' },
|
||||
{ label: '数据查询', value: 'query' },
|
||||
{ label: '等待人工输入', value: 'prompt' },
|
||||
{ label: '条件判断', value: 'condition' },
|
||||
]
|
||||
},
|
||||
{
|
||||
interface: 'json',
|
||||
type: 'jsonb',
|
||||
name: 'config',
|
||||
title: '配置'
|
||||
}
|
||||
]
|
||||
} as TableOptions;
|
48
packages/plugin-workflow/src/collections/jobs.ts
Normal file
48
packages/plugin-workflow/src/collections/jobs.ts
Normal file
@ -0,0 +1,48 @@
|
||||
import { TableOptions } from '@nocobase/database';
|
||||
|
||||
export default {
|
||||
name: 'jobs',
|
||||
title: '流程记录',
|
||||
fields: [
|
||||
{
|
||||
interface: 'linkTo',
|
||||
type: 'belongsTo',
|
||||
name: 'execution',
|
||||
title: '所属流程'
|
||||
},
|
||||
{
|
||||
interface: 'linkTo',
|
||||
type: 'belongsTo',
|
||||
name: 'node',
|
||||
target: 'flow_nodes',
|
||||
title: '所属节点'
|
||||
},
|
||||
{
|
||||
interface: 'linkTo',
|
||||
type: 'belongsTo',
|
||||
name: 'upstream',
|
||||
target: 'jobs',
|
||||
title: '上游记录'
|
||||
},
|
||||
// pending / resolved / rejected
|
||||
{
|
||||
interface: 'status',
|
||||
type: 'integer',
|
||||
name: 'status',
|
||||
title: '处理状态'
|
||||
},
|
||||
{
|
||||
interface: 'json',
|
||||
type: 'jsonb',
|
||||
name: 'result',
|
||||
title: '处理结果'
|
||||
},
|
||||
// TODO: possibly need node snapshot in case if node has been changed
|
||||
// {
|
||||
// interface: 'json',
|
||||
// type: 'jsonb',
|
||||
// name: 'nodeSnapshot',
|
||||
// title: 'node snapshot'
|
||||
// }
|
||||
]
|
||||
} as TableOptions;
|
56
packages/plugin-workflow/src/collections/workflows.ts
Normal file
56
packages/plugin-workflow/src/collections/workflows.ts
Normal file
@ -0,0 +1,56 @@
|
||||
import { TableOptions } from '@nocobase/database';
|
||||
|
||||
export default {
|
||||
name: 'workflows',
|
||||
model: 'WorkflowModel',
|
||||
title: '自动化',
|
||||
fields: [
|
||||
{
|
||||
interface: 'string',
|
||||
type: 'string',
|
||||
name: 'title',
|
||||
title: '自动化名称',
|
||||
required: true
|
||||
},
|
||||
{
|
||||
interface: 'boolean',
|
||||
type: 'boolean',
|
||||
name: 'enabled',
|
||||
title: '启用'
|
||||
},
|
||||
{
|
||||
interface: 'textarea',
|
||||
type: 'text',
|
||||
name: 'description',
|
||||
title: '描述'
|
||||
},
|
||||
{
|
||||
interface: 'select',
|
||||
type: 'string',
|
||||
title: '触发方式',
|
||||
name: 'type',
|
||||
required: true
|
||||
},
|
||||
{
|
||||
interface: 'json',
|
||||
type: 'jsonb',
|
||||
title: '触发配置',
|
||||
name: 'config',
|
||||
required: true
|
||||
},
|
||||
{
|
||||
interface: 'linkTo',
|
||||
type: 'hasMany',
|
||||
name: 'nodes',
|
||||
target: 'flow_nodes',
|
||||
title: '流程节点'
|
||||
},
|
||||
{
|
||||
interface: 'linkTo',
|
||||
type: 'hasMany',
|
||||
name: 'executions',
|
||||
target: 'executions',
|
||||
title: '触发执行'
|
||||
}
|
||||
]
|
||||
} as TableOptions;
|
11
packages/plugin-workflow/src/constants.ts
Normal file
11
packages/plugin-workflow/src/constants.ts
Normal file
@ -0,0 +1,11 @@
|
||||
export const EXECUTION_STATUS = {
|
||||
STARTED: 0,
|
||||
RESOLVED: 1,
|
||||
REJECTED: -1
|
||||
};
|
||||
|
||||
export const JOB_STATUS = {
|
||||
PENDING: 0,
|
||||
RESOLVED: 1,
|
||||
REJECTED: -1
|
||||
};
|
@ -0,0 +1,52 @@
|
||||
type Calculator = (...args: any[]) => boolean;
|
||||
|
||||
const calculators = new Map<string, Calculator>();
|
||||
|
||||
export function getCalculator(type: string): Calculator {
|
||||
return calculators.get(type);
|
||||
}
|
||||
|
||||
export function registerCalculator(type: string, fn: Calculator) {
|
||||
calculators.set(type, fn);
|
||||
}
|
||||
|
||||
export function registerCalculators(calculators) {
|
||||
Object.keys(calculators).forEach(key => {
|
||||
registerCalculator(key, calculators[key]);
|
||||
});
|
||||
}
|
||||
|
||||
function equal(a, b) {
|
||||
return a === b;
|
||||
}
|
||||
|
||||
function gt(a, b) {
|
||||
return a > b;
|
||||
}
|
||||
|
||||
function gte(a, b) {
|
||||
return a >= b;
|
||||
}
|
||||
|
||||
function lt(a, b) {
|
||||
return a < b;
|
||||
}
|
||||
|
||||
function lte(a, b) {
|
||||
return a <= b;
|
||||
}
|
||||
|
||||
// TODO: add more common calculators
|
||||
|
||||
registerCalculators({
|
||||
equal,
|
||||
gt,
|
||||
gte,
|
||||
lt,
|
||||
lte,
|
||||
'===': equal,
|
||||
'>': gt,
|
||||
'>=': gte,
|
||||
'<': lt,
|
||||
'<=': lte
|
||||
});
|
@ -0,0 +1,56 @@
|
||||
import { get } from 'lodash';
|
||||
|
||||
import { ModelCtor } from '@nocobase/database';
|
||||
import { ExecutionModel } from '../../models/Execution';
|
||||
|
||||
export type OperandType = 'context' | 'input' | 'job';
|
||||
|
||||
export type ObjectGetterOptions = {
|
||||
path?: string
|
||||
};
|
||||
|
||||
export type JobGetterOptions = ObjectGetterOptions & {
|
||||
id: number
|
||||
};
|
||||
|
||||
export type ConstantOperand = {
|
||||
type?: 'constant';
|
||||
value: any
|
||||
};
|
||||
|
||||
export type ContextOperand = {
|
||||
type: 'context';
|
||||
options: ObjectGetterOptions;
|
||||
};
|
||||
|
||||
export type InputOperand = {
|
||||
type: 'input';
|
||||
options: ObjectGetterOptions;
|
||||
};
|
||||
|
||||
export type JobOperand = {
|
||||
type: 'job';
|
||||
options: JobGetterOptions;
|
||||
};
|
||||
|
||||
export type Operand = ContextOperand | InputOperand | JobOperand | ConstantOperand;
|
||||
|
||||
// TODO: other instructions may also use this method, could be moved to utils.
|
||||
export function getValue(operand: Operand, input: any, execution: ModelCtor<ExecutionModel>) {
|
||||
switch (operand.type) {
|
||||
// from execution context
|
||||
case 'context':
|
||||
return get(execution, operand.options.path);
|
||||
// from input from last job or manual
|
||||
case 'input':
|
||||
return get(input, operand.options.path);
|
||||
// from job in execution
|
||||
case 'job':
|
||||
// assume jobs have been fetched from execution before
|
||||
const job = execution.jobs.find(item => item.id === operand.options.id);
|
||||
return get(job, operand.options.path);
|
||||
// constant
|
||||
default:
|
||||
return operand.value;
|
||||
}
|
||||
}
|
72
packages/plugin-workflow/src/instructions/condition/index.ts
Normal file
72
packages/plugin-workflow/src/instructions/condition/index.ts
Normal file
@ -0,0 +1,72 @@
|
||||
// config: {
|
||||
// not: false,
|
||||
// group: {
|
||||
// type: 'and',
|
||||
// calculations: [
|
||||
// {
|
||||
// calculator: 'time.equal',
|
||||
// operands: [{ type: 'context', options: { path: 'time' } }, { type: 'fn', options: { name: 'newDate', args: [] } }]
|
||||
// },
|
||||
// {
|
||||
// calculator: 'value.equal',
|
||||
// operands: [{ type: 'job.result', options: { id: 213, path: '' } }, { type: 'constant', value: { a: 1 } }]
|
||||
// }
|
||||
// ]
|
||||
// }
|
||||
// }
|
||||
|
||||
import { getValue, Operand } from "./getter";
|
||||
import { getCalculator } from "./calculators";
|
||||
|
||||
type BaseCalculation = {
|
||||
not?: boolean;
|
||||
};
|
||||
|
||||
type SingleCalculation = BaseCalculation & {
|
||||
calculation: string;
|
||||
operands?: Operand[];
|
||||
};
|
||||
|
||||
type GroupCalculationOptions = {
|
||||
type: 'and' | 'or';
|
||||
calculations: Calculation[]
|
||||
};
|
||||
|
||||
type GroupCalculation = BaseCalculation & {
|
||||
group: GroupCalculationOptions
|
||||
};
|
||||
|
||||
// TODO(type)
|
||||
type Calculation = SingleCalculation | GroupCalculation;
|
||||
|
||||
function calculate(config, input, execution) {
|
||||
if (!config) {
|
||||
return true;
|
||||
}
|
||||
|
||||
const { not, group } = config;
|
||||
let result;
|
||||
if (group) {
|
||||
const method = group.type === 'and' ? 'every' : 'some';
|
||||
result = group.calculations[method](calculation => calculate(calculation, input, execution));
|
||||
} else {
|
||||
const args = config.operands.map(operand => getValue(operand, input, execution));
|
||||
const fn = getCalculator(config.calculator);
|
||||
if (!fn) {
|
||||
throw new Error(`no calculator function registered for "${config.calculator}"`);
|
||||
}
|
||||
result = fn(...args);
|
||||
}
|
||||
|
||||
return not ? !result : result;
|
||||
}
|
||||
|
||||
|
||||
export default {
|
||||
manual: false,
|
||||
async run(this, input, execution) {
|
||||
// TODO(optimize): loading of jobs could be reduced and turned into incrementally in execution
|
||||
const jobs = await execution.getJobs();
|
||||
return calculate(this.config as Calculation, input, execution);
|
||||
}
|
||||
}
|
6
packages/plugin-workflow/src/instructions/echo.ts
Normal file
6
packages/plugin-workflow/src/instructions/echo.ts
Normal file
@ -0,0 +1,6 @@
|
||||
export default {
|
||||
manual: false,
|
||||
run(this, input, context) {
|
||||
return input;
|
||||
}
|
||||
};
|
37
packages/plugin-workflow/src/instructions/index.ts
Normal file
37
packages/plugin-workflow/src/instructions/index.ts
Normal file
@ -0,0 +1,37 @@
|
||||
// something like template for type of nodes
|
||||
|
||||
import { ModelCtor, Model } from "@nocobase/database";
|
||||
import { ExecutionModel } from "../models/Execution";
|
||||
|
||||
import echo from './echo';
|
||||
import prompt from './prompt';
|
||||
import condition from './condition';
|
||||
|
||||
// what should a instruction do?
|
||||
// - base on input and context, do any calculations or system call (io), and produce a result or pending.
|
||||
// what should input to be?
|
||||
// - just use previously output result for convenience?
|
||||
// what should context to be?
|
||||
// - could be the workflow execution object (containing context data)
|
||||
export type Instruction = {
|
||||
manual: boolean;
|
||||
run(
|
||||
this: ModelCtor<Model>,
|
||||
input: any,
|
||||
execution: ModelCtor<ExecutionModel>
|
||||
): any
|
||||
}
|
||||
|
||||
const registery = new Map<string, Instruction>();
|
||||
|
||||
export function getInstruction(key: string): Instruction {
|
||||
return registery.get(key);
|
||||
}
|
||||
|
||||
export function registerInstruction(key: string, fn: Instruction) {
|
||||
registery.set(key, fn);
|
||||
}
|
||||
|
||||
registerInstruction('echo', echo);
|
||||
registerInstruction('prompt', prompt);
|
||||
registerInstruction('condition', condition);
|
6
packages/plugin-workflow/src/instructions/prompt.ts
Normal file
6
packages/plugin-workflow/src/instructions/prompt.ts
Normal file
@ -0,0 +1,6 @@
|
||||
export default {
|
||||
manual: true,
|
||||
run(this, input, context) {
|
||||
return input;
|
||||
}
|
||||
}
|
140
packages/plugin-workflow/src/models/Execution.ts
Normal file
140
packages/plugin-workflow/src/models/Execution.ts
Normal file
@ -0,0 +1,140 @@
|
||||
import { Model } from '@nocobase/database';
|
||||
|
||||
import { EXECUTION_STATUS, JOB_STATUS } from '../constants';
|
||||
import { getInstruction } from '../instructions';
|
||||
|
||||
export class ExecutionModel extends Model {
|
||||
async exec(input, previousJob = null, options = {}) {
|
||||
// check execution status for quick out
|
||||
if (this.get('status') !== EXECUTION_STATUS.STARTED) {
|
||||
return;
|
||||
}
|
||||
|
||||
let lastJob = previousJob || await this.getLastJob(options);
|
||||
const node = await this.getNextNode(lastJob);
|
||||
// if not found any node
|
||||
if (!node) {
|
||||
// set execution as resolved
|
||||
await this.update({
|
||||
status: EXECUTION_STATUS.RESOLVED
|
||||
});
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
// got node.id and node.type
|
||||
// find node instruction by type from registered node types in memory (program defined)
|
||||
const instruction = getInstruction(node.type);
|
||||
|
||||
let result = null;
|
||||
let status = JOB_STATUS.PENDING;
|
||||
// check if manual or node is on current job
|
||||
if (!instruction.manual || (lastJob && lastJob.node_id === node.id)) {
|
||||
// execute instruction of next node and get status
|
||||
try {
|
||||
result = await instruction.run.call(node, input ?? lastJob?.result, this);
|
||||
status = JOB_STATUS.RESOLVED;
|
||||
} catch(err) {
|
||||
result = err;
|
||||
status = JOB_STATUS.REJECTED;
|
||||
}
|
||||
}
|
||||
|
||||
// manually exec pending job
|
||||
if (lastJob && lastJob.node_id === node.id) {
|
||||
if (lastJob.status !== JOB_STATUS.PENDING) {
|
||||
// not allow to retry resolved or rejected job for now
|
||||
// TODO: based on retry config
|
||||
return;
|
||||
}
|
||||
// RUN instruction
|
||||
// should update the record based on input
|
||||
lastJob.update({
|
||||
status,
|
||||
result
|
||||
});
|
||||
} else {
|
||||
// RUN instruction
|
||||
lastJob = await this.createJob({
|
||||
status,
|
||||
node_id: node.id,
|
||||
upstream_id: lastJob ? lastJob.id : null,
|
||||
// TODO: how to presentation error?
|
||||
result
|
||||
});
|
||||
}
|
||||
|
||||
switch(status) {
|
||||
case JOB_STATUS.PENDING:
|
||||
case JOB_STATUS.REJECTED:
|
||||
// TODO: should handle rejected when configured
|
||||
return;
|
||||
default:
|
||||
// should return chained promise to run any nodes as many as possible,
|
||||
// till end (pending/rejected/no more)
|
||||
return this.exec(result, lastJob, options);
|
||||
}
|
||||
}
|
||||
|
||||
async getLastJob(options) {
|
||||
const jobs = await this.getJobs();
|
||||
|
||||
if (!jobs.length) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// find last job, last means no any other jobs set upstream to
|
||||
const lastJobIds = new Set(jobs.map(item => item.id));
|
||||
jobs.forEach(item => {
|
||||
if (item.upstream_id) {
|
||||
lastJobIds.delete(item.upstream_id);
|
||||
}
|
||||
});
|
||||
// TODO(feature):
|
||||
// if has multiple jobs? which one or some should be run next?
|
||||
// if has determined flowNodeId, run that one.
|
||||
// else not supported for now (multiple race pendings)
|
||||
const [jobId] = Array.from(lastJobIds);
|
||||
return jobs.find(item => item.id === jobId) || null;
|
||||
}
|
||||
|
||||
async getNextNode(lastJob) {
|
||||
if (!this.get('workflow')) {
|
||||
// cache workflow
|
||||
this.setDataValue('workflow', await this.getWorkflow());
|
||||
}
|
||||
const workflow = this.get('workflow');
|
||||
|
||||
// if has not any job, means initial execution
|
||||
if (!lastJob) {
|
||||
// find first node for this workflow
|
||||
// first one is the one has no upstream
|
||||
const [firstNode = null] = await workflow.getNodes({
|
||||
where: {
|
||||
upstream_id: null
|
||||
}
|
||||
});
|
||||
|
||||
// put firstNode as next node to be execute
|
||||
return firstNode;
|
||||
}
|
||||
|
||||
const lastNode = await lastJob.getNode();
|
||||
|
||||
if (lastJob.status === JOB_STATUS.PENDING) {
|
||||
return lastNode;
|
||||
}
|
||||
|
||||
const [nextNode = null] = await workflow.getNodes({
|
||||
where: {
|
||||
upstream_id: lastJob.node_id,
|
||||
// TODO: need better design
|
||||
...(lastNode.type === 'condition' ? {
|
||||
when: lastJob.result
|
||||
} : {})
|
||||
}
|
||||
});
|
||||
|
||||
return nextNode;
|
||||
}
|
||||
}
|
49
packages/plugin-workflow/src/models/Workflow.ts
Normal file
49
packages/plugin-workflow/src/models/Workflow.ts
Normal file
@ -0,0 +1,49 @@
|
||||
import { Model } from '@nocobase/database';
|
||||
|
||||
import { get as getTrigger } from '../triggers';
|
||||
import { EXECUTION_STATUS } from '../constants';
|
||||
|
||||
export class WorkflowModel extends Model {
|
||||
static async mount() {
|
||||
const workflows = await this.findAll({
|
||||
where: { enabled: true }
|
||||
});
|
||||
|
||||
workflows.forEach(workflow => {
|
||||
workflow.mount();
|
||||
});
|
||||
|
||||
this.addHook('afterCreate', (model: WorkflowModel) => model.mount());
|
||||
// TODO: afterUpdate, afterDestroy
|
||||
}
|
||||
|
||||
async mount() {
|
||||
if (!this.get('enabled')) {
|
||||
return;
|
||||
}
|
||||
const type = this.get('type');
|
||||
const config = this.get('config');
|
||||
const trigger = getTrigger(type);
|
||||
trigger.call(this, config, this.start.bind(this));
|
||||
}
|
||||
|
||||
// TODO
|
||||
async unmount() {
|
||||
|
||||
}
|
||||
|
||||
async start(context: Object, options) {
|
||||
// `null` means not to trigger
|
||||
if (context === null) {
|
||||
return;
|
||||
}
|
||||
|
||||
const execution = await this.createExecution({
|
||||
context,
|
||||
status: EXECUTION_STATUS.STARTED
|
||||
});
|
||||
execution.setDataValue('workflow', this);
|
||||
await execution.exec(context, null, options);
|
||||
return execution;
|
||||
}
|
||||
}
|
61
packages/plugin-workflow/src/server.ts
Normal file
61
packages/plugin-workflow/src/server.ts
Normal file
@ -0,0 +1,61 @@
|
||||
import path from 'path';
|
||||
|
||||
import { registerModels } from '@nocobase/database';
|
||||
|
||||
import { WorkflowModel } from './models/Workflow';
|
||||
import { ExecutionModel } from './models/Execution';
|
||||
|
||||
export default {
|
||||
name: 'workflow',
|
||||
async load(options = {}) {
|
||||
const { db } = this.app;
|
||||
|
||||
registerModels({
|
||||
WorkflowModel,
|
||||
ExecutionModel,
|
||||
});
|
||||
|
||||
db.import({
|
||||
directory: path.resolve(__dirname, 'collections'),
|
||||
});
|
||||
|
||||
// [Life Cycle]:
|
||||
// * load all workflows in db
|
||||
// * add all hooks for enabled workflows
|
||||
// * add hooks for create/update[enabled]/delete workflow to add/remove specific hooks
|
||||
this.app.on('beforeStart', async () => {
|
||||
const Workflow = db.getModel('workflows');
|
||||
await Workflow.mount();
|
||||
})
|
||||
|
||||
// [Life Cycle]: initialize all necessary seed data
|
||||
this.app.on('db.init', async () => {
|
||||
|
||||
});
|
||||
|
||||
// const [Automation, AutomationJob] = database.getModels(['automations', 'automations_jobs']);
|
||||
|
||||
// Automation.addHook('afterCreate', async (model: AutomationModel) => {
|
||||
// model.get('enabled') && await model.loadJobs();
|
||||
// });
|
||||
|
||||
// Automation.addHook('afterUpdate', async (model: AutomationModel) => {
|
||||
// if (!model.changed('enabled' as any)) {
|
||||
// return;
|
||||
// }
|
||||
// model.get('enabled') ? await model.loadJobs() : await model.cancelJobs();
|
||||
// });
|
||||
|
||||
// Automation.addHook('beforeDestroy', async (model: AutomationModel) => {
|
||||
// await model.cancelJobs();
|
||||
// });
|
||||
|
||||
// AutomationJob.addHook('afterCreate', async (model: AutomationJobModel) => {
|
||||
// await model.bootstrap();
|
||||
// });
|
||||
|
||||
// AutomationJob.addHook('beforeDestroy', async (model: AutomationJobModel) => {
|
||||
// await model.cancel();
|
||||
// });
|
||||
}
|
||||
}
|
10
packages/plugin-workflow/src/triggers/data-change.ts
Normal file
10
packages/plugin-workflow/src/triggers/data-change.ts
Normal file
@ -0,0 +1,10 @@
|
||||
export interface IDataChangeTriggerConfig {
|
||||
collection: string;
|
||||
// TODO: ICondition
|
||||
filter: any;
|
||||
}
|
||||
|
||||
export function afterCreate(config: IDataChangeTriggerConfig, callback: Function) {
|
||||
const Model = this.database.getModel(config.collection);
|
||||
Model.addHook('afterCreate', `workflow-${this.get('id')}`, (data: typeof Model, options) => callback({ data }, options));
|
||||
}
|
21
packages/plugin-workflow/src/triggers/index.ts
Normal file
21
packages/plugin-workflow/src/triggers/index.ts
Normal file
@ -0,0 +1,21 @@
|
||||
import * as dataChangeTriggers from './data-change';
|
||||
|
||||
export interface ITrigger {
|
||||
(config: any): void
|
||||
}
|
||||
|
||||
const triggers = new Map<string, ITrigger>();
|
||||
|
||||
export function register(type: string, trigger: ITrigger): void {
|
||||
triggers.set(type, trigger);
|
||||
}
|
||||
|
||||
export function get(type: string): ITrigger | undefined {
|
||||
return triggers.get(type);
|
||||
}
|
||||
|
||||
for (const key in dataChangeTriggers) {
|
||||
if (dataChangeTriggers.hasOwnProperty(key)) {
|
||||
register(key, dataChangeTriggers[key]);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user