feat(plugin-workflow): add assignees config for prompt instruction (#690)

This commit is contained in:
Junyi 2022-07-27 18:23:16 +08:00 committed by GitHub
parent 12f658391a
commit a73c37c927
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 1063 additions and 41 deletions

View File

@ -8,6 +8,8 @@ import initActions from './actions';
import initTriggers, { Trigger } from './triggers';
import initInstructions, { Instruction } from './instructions';
import Processor from './Processor';
import calculators from './calculators';
import extensions from './extensions';
import WorkflowModel from './models/Workflow';
import ExecutionModel from './models/Execution';
import { EXECUTION_STATUS } from './constants';
@ -17,6 +19,8 @@ import { EXECUTION_STATUS } from './constants';
export default class WorkflowPlugin extends Plugin {
instructions: Registry<Instruction> = new Registry();
triggers: Registry<Trigger> = new Registry();
calculators = calculators;
extensions = extensions;
onBeforeSave = async (instance: WorkflowModel, options) => {
const Model = <typeof WorkflowModel>instance.constructor;
@ -80,6 +84,8 @@ export default class WorkflowPlugin extends Plugin {
db.on('workflows.afterSave', (model: WorkflowModel) => this.toggle(model));
db.on('workflows.afterDestroy', (model: WorkflowModel) => this.toggle(model, false));
this.app.on('afterLoadAll', async () => this.extensions.reduce((promise, extend) => promise.then(() => extend(this)), Promise.resolve()));
// [Life Cycle]:
// * load all workflows in db
// * add all hooks for enabled workflows
@ -122,10 +128,10 @@ export default class WorkflowPlugin extends Plugin {
}
}
async trigger(workflow, context: Object, options: Transactionable = {}) {
async trigger(workflow: WorkflowModel, context: Object, options: Transactionable = {}): Promise<ExecutionModel | null> {
// `null` means not to trigger
if (context === null) {
return;
return null;
}
let transaction = null;
@ -145,7 +151,7 @@ export default class WorkflowPlugin extends Plugin {
if (existed) {
console.warn(`workflow ${workflow.id} has already been triggered in same execution (${transaction.id}), and newly triggering will be skipped.`);
return;
return null;
}
}

View File

@ -23,7 +23,7 @@ export default class Processor {
[JOB_STATUS.PENDING]: EXECUTION_STATUS.STARTED,
[JOB_STATUS.RESOLVED]: EXECUTION_STATUS.RESOLVED,
[JOB_STATUS.REJECTED]: EXECUTION_STATUS.REJECTED,
[JOB_STATUS.CANCELLED]: EXECUTION_STATUS.CANCELLED,
[JOB_STATUS.CANCELED]: EXECUTION_STATUS.CANCELED,
};
transaction: Transaction;
@ -168,8 +168,6 @@ export default class Processor {
}
let savedJob;
// TODO(optimize): many checking of resuming or new could be improved
// could be implemented separately in exec() / resume()
if (job instanceof Model) {
savedJob = (await job.save({ transaction: this.transaction })) as unknown as JobModel;
} else {

View File

@ -0,0 +1,573 @@
import { MockServer } from '@nocobase/test';
import Database from '@nocobase/database';
import UserPlugin from '@nocobase/plugin-users';
import { getApp, sleep } from '..';
import { EXECUTION_STATUS, JOB_STATUS } from '../../constants';
describe('workflow > instructions > prompt', () => {
describe('base', () => {
let app: MockServer;
let agent;
let db: Database;
let PostRepo;
let WorkflowModel;
let workflow;
beforeEach(async () => {
app = await getApp();
agent = app.agent();
db = app.db;
WorkflowModel = db.getCollection('workflows').model;
PostRepo = db.getCollection('posts').repository;
workflow = await WorkflowModel.create({
enabled: true,
type: 'collection',
config: {
mode: 1,
collection: 'posts'
}
});
});
afterEach(() => db.close());
it('resume to resolve', async () => {
const n1 = await workflow.createNode({
type: 'prompt',
config: {
actions: {
[JOB_STATUS.RESOLVED]: 'submit'
}
}
});
const post = await PostRepo.create({ values: { title: 't1' } });
const [pending] = await workflow.getExecutions();
expect(pending.status).toBe(EXECUTION_STATUS.STARTED);
const [j1] = await pending.getJobs();
expect(j1.status).toBe(JOB_STATUS.PENDING);
const { status } = await agent.resource('jobs').submit({
filterByTk: j1.id,
values: {
status: JOB_STATUS.RESOLVED,
result: { a: 1 }
}
});
expect(status).toBe(202);
// NOTE: wait for no await execution
await sleep(500);
const [resolved] = await workflow.getExecutions();
expect(resolved.status).toBe(EXECUTION_STATUS.RESOLVED);
const [j2] = await resolved.getJobs();
expect(j2.status).toBe(JOB_STATUS.RESOLVED);
expect(j2.result).toEqual({ a: 1 });
});
});
describe('assignees', () => {
let app: MockServer;
let agent;
let userAgents;
let db: Database;
let PostRepo;
let WorkflowModel;
let workflow;
let UserModel;
let users;
let UserJobModel;
beforeEach(async () => {
app = await getApp({
plugins: [
'@nocobase/plugin-users'
]
});
agent = app.agent();
db = app.db;
WorkflowModel = db.getCollection('workflows').model;
PostRepo = db.getCollection('posts').repository;
UserModel = db.getCollection('users').model;
UserJobModel = db.getModel('users_jobs');
users = await UserModel.bulkCreate([
{ id: 1, nickname: 'a' },
{ id: 2, nickname: 'b' }
]);
const userPlugin = app.getPlugin('@nocobase/plugin-users') as UserPlugin;
userAgents = users.map((user) => app.agent().auth(userPlugin.jwtService.sign({
userId: user.id,
}), { type: 'bearer' }));
workflow = await WorkflowModel.create({
enabled: true,
type: 'collection',
config: {
mode: 1,
collection: 'posts'
}
});
});
afterEach(() => db.close());
describe('mode: 0 (single record)', () => {
it('the only user assigned could submit', async () => {
const n1 = await workflow.createNode({
type: 'prompt',
config: {
assignees: [users[0].id]
}
});
const post = await PostRepo.create({ values: { title: 't1' } });
const [pending] = await workflow.getExecutions();
expect(pending.status).toBe(EXECUTION_STATUS.STARTED);
const [j1] = await pending.getJobs();
expect(j1.status).toBe(JOB_STATUS.PENDING);
const usersJobs = await UserJobModel.findAll();
expect(usersJobs.length).toBe(1);
expect(usersJobs[0].status).toBe(JOB_STATUS.PENDING);
expect(usersJobs[0].userId).toBe(users[0].id);
expect(usersJobs[0].jobId).toBe(j1.id);
const res1 = await agent.resource('jobs').submit({
filterByTk: j1.id
});
expect(res1.status).toBe(401);
const res2 = await userAgents[1].resource('jobs').submit({
filterByTk: j1.id,
values: {
status: JOB_STATUS.RESOLVED
}
});
expect(res2.status).toBe(404);
const res3 = await userAgents[0].resource('jobs').submit({
filterByTk: j1.id,
values: {
status: JOB_STATUS.RESOLVED,
result: { a: 1 }
}
});
expect(res3.status).toBe(202);
await sleep(500);
const [j2] = await pending.getJobs();
expect(j2.status).toBe(JOB_STATUS.RESOLVED);
expect(j2.result).toEqual({ a: 1 });
const usersJobsAfter = await UserJobModel.findAll();
expect(usersJobsAfter.length).toBe(1);
expect(usersJobsAfter[0].status).toBe(JOB_STATUS.RESOLVED);
expect(usersJobsAfter[0].result).toEqual({ a: 1 });
const res4 = await userAgents[0].resource('jobs').submit({
filterByTk: j1.id,
values: {
status: JOB_STATUS.RESOLVED,
result: { a: 2 }
}
});
expect(res4.status).toBe(400);
});
it('any user assigned could submit', async () => {
const n1 = await workflow.createNode({
type: 'prompt',
config: {
assignees: [users[0].id, users[1].id]
}
});
const post = await PostRepo.create({ values: { title: 't1' } });
const [pending] = await workflow.getExecutions();
expect(pending.status).toBe(EXECUTION_STATUS.STARTED);
const [j1] = await pending.getJobs();
expect(j1.status).toBe(JOB_STATUS.PENDING);
const res1 = await userAgents[1].resource('jobs').submit({
filterByTk: j1.id,
values: {
status: JOB_STATUS.RESOLVED,
result: { a: 1 }
}
});
expect(res1.status).toBe(202);
await sleep(500);
const [j2] = await pending.getJobs();
expect(j2.status).toBe(JOB_STATUS.RESOLVED);
expect(j2.result).toEqual({ a: 1 });
const res2 = await userAgents[0].resource('jobs').submit({
filterByTk: j1.id,
values: {
status: JOB_STATUS.RESOLVED,
result: { a: 2 }
}
});
expect(res2.status).toBe(400);
});
it('also could submit to users_jobs api', async () => {
const n1 = await workflow.createNode({
type: 'prompt',
config: {
assignees: [users[0].id]
}
});
const post = await PostRepo.create({ values: { title: 't1' } });
const UserJobModel = db.getModel('users_jobs');
const usersJobs = await UserJobModel.findAll();
expect(usersJobs.length).toBe(1);
expect(usersJobs[0].get('status')).toBe(JOB_STATUS.PENDING);
expect(usersJobs[0].get('userId')).toBe(users[0].id);
const res = await userAgents[0].resource('users_jobs').submit({
filterByTk: usersJobs[0].get('id'),
values: {
status: JOB_STATUS.RESOLVED,
result: { a: 1 }
}
});
expect(res.status).toBe(202);
await sleep(500);
const [execution] = await workflow.getExecutions();
expect(execution.status).toBe(EXECUTION_STATUS.RESOLVED);
const [job] = await execution.getJobs();
expect(job.status).toBe(JOB_STATUS.RESOLVED);
expect(job.result).toEqual({ a: 1 });
});
});
describe('mode: 1 (multiple record, all)', () => {
it('all resolved', async () => {
const n1 = await workflow.createNode({
type: 'prompt',
config: {
assignees: [users[0].id, users[1].id],
mode: 1
}
});
const post = await PostRepo.create({ values: { title: 't1' } });
const UserJobModel = db.getModel('users_jobs');
const pendingJobs = await UserJobModel.findAll({
order: [[ 'userId', 'ASC' ]]
});
expect(pendingJobs.length).toBe(2);
const res1 = await userAgents[0].resource('users_jobs').submit({
filterByTk: pendingJobs[0].get('id'),
values: {
status: JOB_STATUS.RESOLVED,
result: { a: 1 }
}
});
expect(res1.status).toBe(202);
await sleep(500);
const [e1] = await workflow.getExecutions();
expect(e1.status).toBe(EXECUTION_STATUS.STARTED);
const [j1] = await e1.getJobs();
expect(j1.status).toBe(JOB_STATUS.PENDING);
expect(j1.result).toBe(0.5);
const usersJobs1 = await UserJobModel.findAll({
order: [[ 'userId', 'ASC' ]]
});
expect(usersJobs1.length).toBe(2);
const res2 = await userAgents[1].resource('users_jobs').submit({
filterByTk: pendingJobs[1].get('id'),
values: {
status: JOB_STATUS.RESOLVED,
result: { a: 2 }
}
});
expect(res2.status).toBe(202);
await sleep(500);
const [e2] = await workflow.getExecutions();
expect(e2.status).toBe(EXECUTION_STATUS.RESOLVED);
const [j2] = await e2.getJobs();
expect(j2.status).toBe(JOB_STATUS.RESOLVED);
expect(j2.result).toBe(1);
});
it('first rejected', async () => {
const n1 = await workflow.createNode({
type: 'prompt',
config: {
assignees: [users[0].id, users[1].id],
mode: 1
}
});
const post = await PostRepo.create({ values: { title: 't1' } });
const UserJobModel = db.getModel('users_jobs');
const pendingJobs = await UserJobModel.findAll({
order: [[ 'userId', 'ASC' ]]
});
expect(pendingJobs.length).toBe(2);
const res1 = await userAgents[0].resource('users_jobs').submit({
filterByTk: pendingJobs[0].get('id'),
values: {
status: JOB_STATUS.REJECTED
}
});
expect(res1.status).toBe(202);
await sleep(500);
const [e1] = await workflow.getExecutions();
expect(e1.status).toBe(EXECUTION_STATUS.REJECTED);
const [j1] = await e1.getJobs();
expect(j1.status).toBe(JOB_STATUS.REJECTED);
expect(j1.result).toBe(0.5);
const usersJobs1 = await UserJobModel.findAll({
order: [[ 'userId', 'ASC' ]]
});
expect(usersJobs1.length).toBe(2);
const res2 = await userAgents[1].resource('users_jobs').submit({
filterByTk: pendingJobs[1].get('id'),
values: {
status: JOB_STATUS.REJECTED,
result: { a: 2 }
}
});
expect(res2.status).toBe(400);
});
it('last rejected', async () => {
const n1 = await workflow.createNode({
type: 'prompt',
config: {
assignees: [users[0].id, users[1].id],
mode: 1
}
});
const post = await PostRepo.create({ values: { title: 't1' } });
const UserJobModel = db.getModel('users_jobs');
const pendingJobs = await UserJobModel.findAll({
order: [[ 'userId', 'ASC' ]]
});
expect(pendingJobs.length).toBe(2);
const res1 = await userAgents[0].resource('users_jobs').submit({
filterByTk: pendingJobs[0].get('id'),
values: {
status: JOB_STATUS.RESOLVED
}
});
expect(res1.status).toBe(202);
await sleep(500);
const [e1] = await workflow.getExecutions();
expect(e1.status).toBe(EXECUTION_STATUS.STARTED);
const [j1] = await e1.getJobs();
expect(j1.status).toBe(JOB_STATUS.PENDING);
expect(j1.result).toBe(0.5);
const usersJobs1 = await UserJobModel.findAll({
order: [[ 'userId', 'ASC' ]]
});
expect(usersJobs1.length).toBe(2);
const res2 = await userAgents[1].resource('users_jobs').submit({
filterByTk: pendingJobs[1].get('id'),
values: {
status: JOB_STATUS.REJECTED,
result: { a: 2 }
}
});
expect(res2.status).toBe(202);
await sleep(500);
const [e2] = await workflow.getExecutions();
expect(e2.status).toBe(EXECUTION_STATUS.REJECTED);
const [j2] = await e2.getJobs();
expect(j2.status).toBe(JOB_STATUS.REJECTED);
expect(j2.result).toBe(1);
});
});
describe('mode: -1 (multiple record, any)', () => {
it('first resolved', async () => {
const n1 = await workflow.createNode({
type: 'prompt',
config: {
assignees: [users[0].id, users[1].id],
mode: -1
}
});
const post = await PostRepo.create({ values: { title: 't1' } });
const UserJobModel = db.getModel('users_jobs');
const pendingJobs = await UserJobModel.findAll({
order: [[ 'userId', 'ASC' ]]
});
expect(pendingJobs.length).toBe(2);
const res1 = await userAgents[0].resource('users_jobs').submit({
filterByTk: pendingJobs[0].get('id'),
values: {
status: JOB_STATUS.RESOLVED
}
});
expect(res1.status).toBe(202);
await sleep(500);
const [e1] = await workflow.getExecutions();
expect(e1.status).toBe(EXECUTION_STATUS.RESOLVED);
const [j1] = await e1.getJobs();
expect(j1.status).toBe(JOB_STATUS.RESOLVED);
expect(j1.result).toBe(0.5);
const res2 = await userAgents[1].resource('users_jobs').submit({
filterByTk: pendingJobs[1].get('id'),
values: {
status: JOB_STATUS.REJECTED
}
});
expect(res2.status).toBe(400);
});
it('any resolved', async () => {
const n1 = await workflow.createNode({
type: 'prompt',
config: {
assignees: [users[0].id, users[1].id],
mode: -1
}
});
const post = await PostRepo.create({ values: { title: 't1' } });
const UserJobModel = db.getModel('users_jobs');
const pendingJobs = await UserJobModel.findAll({
order: [[ 'userId', 'ASC' ]]
});
expect(pendingJobs.length).toBe(2);
const res1 = await userAgents[0].resource('users_jobs').submit({
filterByTk: pendingJobs[0].get('id'),
values: {
status: JOB_STATUS.REJECTED
}
});
expect(res1.status).toBe(202);
await sleep(500);
const [e1] = await workflow.getExecutions();
expect(e1.status).toBe(EXECUTION_STATUS.STARTED);
const [j1] = await e1.getJobs();
expect(j1.status).toBe(JOB_STATUS.PENDING);
expect(j1.result).toBe(0.5);
const res2 = await userAgents[1].resource('users_jobs').submit({
filterByTk: pendingJobs[1].get('id'),
values: {
status: JOB_STATUS.RESOLVED
}
});
expect(res2.status).toBe(202);
await sleep(500);
const [e2] = await workflow.getExecutions();
expect(e2.status).toBe(EXECUTION_STATUS.RESOLVED);
const [j2] = await e2.getJobs();
expect(j2.status).toBe(JOB_STATUS.RESOLVED);
expect(j2.result).toBe(1);
});
it('all rejected', async () => {
const n1 = await workflow.createNode({
type: 'prompt',
config: {
assignees: [users[0].id, users[1].id],
mode: -1
}
});
const post = await PostRepo.create({ values: { title: 't1' } });
const UserJobModel = db.getModel('users_jobs');
const pendingJobs = await UserJobModel.findAll({
order: [[ 'userId', 'ASC' ]]
});
expect(pendingJobs.length).toBe(2);
const res1 = await userAgents[0].resource('users_jobs').submit({
filterByTk: pendingJobs[0].get('id'),
values: {
status: JOB_STATUS.REJECTED
}
});
expect(res1.status).toBe(202);
await sleep(500);
const [e1] = await workflow.getExecutions();
expect(e1.status).toBe(EXECUTION_STATUS.STARTED);
const [j1] = await e1.getJobs();
expect(j1.status).toBe(JOB_STATUS.PENDING);
expect(j1.result).toBe(0.5);
const res2 = await userAgents[1].resource('users_jobs').submit({
filterByTk: pendingJobs[1].get('id'),
values: {
status: JOB_STATUS.REJECTED
}
});
expect(res2.status).toBe(202);
await sleep(500);
const [e2] = await workflow.getExecutions();
expect(e2.status).toBe(EXECUTION_STATUS.REJECTED);
const [j2] = await e2.getJobs();
expect(j2.status).toBe(JOB_STATUS.REJECTED);
expect(j2.result).toBe(1);
});
});
describe('mode: (0,1) (multiple record, all to percent)', () => {
});
describe('mode: (-1,0) (multiple record, any to percent)', () => {
});
});
});

View File

@ -1,5 +1,6 @@
import * as workflows from './workflows';
import * as nodes from './nodes';
import * as jobs from './jobs';
function make(name, mod) {
return Object.keys(mod).reduce((result, key) => ({
@ -17,6 +18,7 @@ export default function({ app }) {
}),
...make('flow_nodes', {
update: nodes.update
})
}),
...make('jobs', jobs)
});
}

View File

@ -0,0 +1,28 @@
import { Context } from '@nocobase/actions';
import { JOB_STATUS } from '../constants';
export async function submit(context: Context, next) {
const { values } = context.action.params;
const { body: instance } = context;
// NOTE: validate status
if (instance.status !== JOB_STATUS.PENDING) {
return context.throw(400);
}
// NOTE: validate assignee
instance.set({
status: values.status,
result: values.result
});
context.status = 202;
await next();
const plugin = context.app.pm.get('@nocobase/plugin-workflow');
const processor = plugin.createProcessor(instance.execution);
// NOTE: resume the process and no `await` for quick returning
processor.resume(instance);
}

View File

@ -33,7 +33,7 @@ function migrateConfig(config, oldToNew) {
case 'string':
return value
.replace(
/(\{\{\$jobsMapByNodeId\.)(\d+)/,
/(\{\{\$jobsMapByNodeId\.)([\w-]+)/,
(_, prefix, id) => `${prefix}${oldToNew.get(Number.parseInt(id, 10)).id}`
);
default:

View File

@ -1,4 +1,4 @@
import { get as getWithPath } from 'lodash';
import { toNumber, get as getWithPath } from 'lodash';
import { Registry } from "@nocobase/utils";
import JobModel from '../models/Job';
@ -141,23 +141,23 @@ calculators.register('<=', lte);
function add(...args) {
return args.reduce((sum, a) => sum + a, 0);
return args.reduce((sum, a) => sum + toNumber(a), 0);
}
function minus(a, b) {
return a - b;
return toNumber(a) - toNumber(b);
}
function multiple(...args) {
return args.reduce((result, a) => result * a, 1);
return args.reduce((result, a) => result * toNumber(a), 1);
}
function divide(a, b) {
return a / b;
return toNumber(a) / toNumber(b);
}
function mod(a, b) {
return a % b;
return toNumber(a) % toNumber(b);
}
calculators.register('add', add);

View File

@ -2,14 +2,14 @@ export const EXECUTION_STATUS = {
STARTED: 0,
RESOLVED: 1,
REJECTED: -1,
CANCELLED: -2
CANCELED: -2
};
export const JOB_STATUS = {
PENDING: 0,
RESOLVED: 1,
REJECTED: -1,
CANCELLED: -2
CANCELED: -2
};
export const BRANCH_INDEX = {

View File

@ -0,0 +1,58 @@
import { Context, utils } from '@nocobase/actions';
import { EXECUTION_STATUS, JOB_STATUS } from '../../constants';
export async function submit(context: Context, next) {
const repository = utils.getRepositoryFromParams(context);
const { filterByTk, values } = context.action.params;
const { currentUser } = context.state;
if (!currentUser) {
return context.throw(401);
}
const instance = await repository.findOne({
filterByTk,
// filter: {
// userId: currentUser?.id
// },
appends: ['job', 'node', 'execution'],
context
});
const { actions, assignees } = instance.node.config;
// NOTE: validate status
if (instance.status !== JOB_STATUS.PENDING
|| instance.job.status !== JOB_STATUS.PENDING
|| instance.execution.status !== EXECUTION_STATUS.STARTED
|| (actions && !actions[values.status])
) {
context.throw(400);
}
if (!assignees.includes(currentUser.id)
|| instance.userId !== currentUser.id
) {
return context.throw(404);
}
// NOTE: validate assignee
await instance.update({
status: values.status,
result: values.result
});
context.body = instance;
context.status = 202;
await next();
instance.job.latestUserJob = instance;
const plugin = context.app.pm.get('@nocobase/plugin-workflow');
const processor = plugin.createProcessor(instance.execution);
// NOTE: resume the process and no `await` for quick returning
processor.resume(instance.job);
}

View File

@ -0,0 +1,18 @@
import { extend } from '@nocobase/database';
export default extend({
name: 'jobs',
fields: [
{
type: 'belongsToMany',
name: 'users',
through: 'users_jobs',
},
{
type: 'hasMany',
name: 'usersJobs',
target: 'users_jobs',
foreignKey: 'jobId'
}
]
});

View File

@ -0,0 +1,17 @@
import { extend } from '@nocobase/database';
export default extend({
name: 'users',
fields: [
{
type: 'belongsToMany',
name: 'jobs',
through: 'users_jobs',
},
{
type: 'hasMany',
name: 'usersJobs',
target: 'users_jobs'
}
]
});

View File

@ -0,0 +1,42 @@
import { CollectionOptions } from '@nocobase/database';
export default {
name: 'users_jobs',
fields: [
{
type: 'integer',
name: 'id',
primaryKey: true,
autoIncrement: true
},
{
type: 'belongsTo',
name: 'job'
},
{
type: 'belongsTo',
name: 'user'
},
{
type: 'belongsTo',
name: 'execution'
},
{
type: 'belongsTo',
name: 'node',
target: 'flow_nodes'
},
{
type: 'belongsTo',
name: 'workflow'
},
{
type: 'integer',
name: 'status'
},
{
type: 'jsonb',
name: 'result'
}
]
} as CollectionOptions;

View File

@ -0,0 +1,210 @@
import path from 'path';
import { requireModule } from '@nocobase/utils';
import { Context } from '@nocobase/actions';
import Plugin from '../../Plugin';
import Prompt, { PromptConfig } from '../../instructions/prompt';
import { submit } from './actions';
import { JOB_STATUS } from '../../constants';
interface AssignedPromptConfig extends PromptConfig {
assignees?: number[];
mode?: number;
}
// NOTE: for single record mode (mode: 0/null)
async function middleware(context: Context, next) {
const { body: job, state, action } = context;
const { assignees, mode } = job.node.config as AssignedPromptConfig;
// NOTE: skip to no user implementation
if (!assignees) {
return next();
}
if (!state.currentUser) {
return context.throw(401);
}
if (!assignees.includes(state.currentUser.id)) {
return context.throw(404);
}
// NOTE: multiple record mode could not use jobs:submit action
// should use users_jobs:submit/:id instead
if (mode) {
return context.throw(400);
}
await next();
const data = {
userId: context.state.currentUser.id,
jobId: job.id,
nodeId: job.nodeId,
executionId: job.executionId,
workflowId: job.execution.workflowId,
status: job.status,
result: job.result
};
// NOTE: update users job after main job is done
const UserJobModel = context.db.getModel('users_jobs');
let userJob = await UserJobModel.findOne({
where: {
userId: context.state.currentUser.id,
jobId: job.id,
}
});
if (userJob) {
await userJob.update(data);
} else {
userJob = await UserJobModel.create(data);
}
}
async function run(node, prevJob, processor) {
const { assignees, mode } = node.config as AssignedPromptConfig;
if (!assignees) {
const { plugin } = processor.options;
const origin = plugin.instructions.get('prompt') as Prompt;
return origin.constructor.prototype.run.call(this, node, prevJob, processor);
}
const job = await processor.saveJob({
status: JOB_STATUS.PENDING,
result: mode ? [] : null,
nodeId: node.id,
upstreamId: prevJob?.id ?? null
});
// NOTE: batch create users jobs
const UserJobModel = processor.options.plugin.db.getModel('users_jobs');
await UserJobModel.bulkCreate(assignees.map(userId => ({
userId,
jobId: job.id,
nodeId: node.id,
executionId: job.executionId,
workflowId: node.workflowId,
status: JOB_STATUS.PENDING
})), {
transaction: processor.transaction
});
return job;
}
const PROMPT_ASSIGNED_MODE = {
SINGLE: Symbol('single'),
ALL: Symbol('all'),
ANY: Symbol('any'),
ALL_PERCENTAGE: Symbol('all percentage'),
ANY_PERCENTAGE: Symbol('any percentage')
};
const Modes = {
[PROMPT_ASSIGNED_MODE.SINGLE]: {
getStatus(distribution, assignees) {
const done = distribution.find(item => item.status !== JOB_STATUS.PENDING && item.count > 0);
return done ? done.status : null
}
},
[PROMPT_ASSIGNED_MODE.ALL]: {
getStatus(distribution, assignees) {
const resolved = distribution.find(item => item.status === JOB_STATUS.RESOLVED);
if (resolved && resolved.count === assignees.length) {
return JOB_STATUS.RESOLVED;
}
// NOTE: `rejected` or `canceled`
const failed = distribution.find(item => item.status < JOB_STATUS.PENDING);
if (failed && failed.count) {
return failed.status;
}
return null;
}
},
[PROMPT_ASSIGNED_MODE.ANY]: {
getStatus(distribution, assignees) {
const resolved = distribution.find(item => item.status === JOB_STATUS.RESOLVED);
if (resolved && resolved.count) {
return JOB_STATUS.RESOLVED;
}
const failedCount = distribution.reduce((count, item) => item.status < JOB_STATUS.PENDING ? count + item.count : count, 0);
// NOTE: all failures are considered as rejected for now
if (failedCount === assignees.length) {
return JOB_STATUS.REJECTED;
}
return null;
}
}
};
function getMode(mode) {
switch (true) {
case mode === 1:
return Modes[PROMPT_ASSIGNED_MODE.ALL];
case mode === -1:
return Modes[PROMPT_ASSIGNED_MODE.ANY];
case mode > 0:
return Modes[PROMPT_ASSIGNED_MODE.ALL_PERCENTAGE];
case mode < 0:
return Modes[PROMPT_ASSIGNED_MODE.ANY_PERCENTAGE];
default:
return Modes[PROMPT_ASSIGNED_MODE.SINGLE];
}
}
async function resume(node, job, processor) {
// NOTE: check all users jobs related if all done then continue as parallel
const { assignees, mode } = node.config as AssignedPromptConfig;
if (!assignees) {
const { plugin } = processor.options;
const origin = plugin.instructions.get('prompt') as Prompt;
return origin.constructor.prototype.resume.call(this, node, job, processor);
}
const UserJobModel = processor.options.plugin.db.getModel('users_jobs');
const distribution = await UserJobModel.count({
where: {
jobId: job.id
},
group: ['status']
});
const submitted = distribution.reduce((count, item) => item.status !== JOB_STATUS.PENDING ? count + item.count : count, 0);
const result = mode ? (submitted || 0) / assignees.length : job.latestUserJob?.result ?? job.result;
job.set({
status: getMode(mode).getStatus(distribution, assignees) ?? JOB_STATUS.PENDING,
result
});
return job;
}
export default async function(plugin: Plugin) {
const instruction = plugin.instructions.get('prompt') as Prompt;
instruction.extend({
run,
resume
});
instruction.use(middleware);
// TODO(bug): through table should be load first because primary
// await plugin.db.import({
// directory: path.join(__dirname, './collections')
// });
plugin.db.collection(requireModule(path.join(__dirname, './collections/users_jobs')));
plugin.db.collection(requireModule(path.join(__dirname, './collections/users')));
plugin.db.collection(requireModule(path.join(__dirname, './collections/jobs')));
plugin.app.actions({
'users_jobs:submit': submit
});
}

View File

@ -0,0 +1,5 @@
import assignees from './assignees';
export default [
assignees
];

View File

@ -3,6 +3,7 @@ import { EXECUTION_STATUS, JOB_STATUS } from "../constants";
import ExecutionModel from '../models/Execution';
import JobModel from '../models/Job';
import Processor from '../Processor';
import { Instruction } from '.';
type ValueOf<T> = T[keyof T];
@ -11,12 +12,12 @@ interface DelayConfig {
duration: number;
}
export default class {
export default class implements Instruction {
timers: Map<number, NodeJS.Timeout> = new Map();
constructor(protected plugin: Plugin) {
plugin.app.on('beforeStart', () => this.load());
plugin.app.on('beforeStop', () => this.unload())
plugin.app.on('beforeStop', () => this.unload());
}
async load() {

View File

@ -7,15 +7,6 @@ import FlowNodeModel from '../models/FlowNode';
import Plugin from '..';
import Processor from '../Processor';
import prompt from './prompt';
import calculation from './calculation';
import condition from './condition';
import parallel from './parallel';
import query from './query';
import create from './create';
import update from './update';
import destroy from './destroy';
export type Job = {
status: number;
result?: unknown;
@ -51,18 +42,23 @@ export default function<T extends Instruction>(
) {
const { instructions } = plugin;
instructions.register('prompt', prompt);
instructions.register('calculation', calculation);
instructions.register('condition', condition);
instructions.register('parallel', parallel);
instructions.register('query', query);
instructions.register('create', create);
instructions.register('update', update);
instructions.register('destroy', destroy);
const natives = [
'calculation',
'condition',
'parallel',
'delay',
'prompt',
'query',
'create',
'update',
'destroy'
].reduce((result, key) => Object.assign(result, { [key]: key }), {});
instructions.register('delay', new (requireModule(path.join(__dirname, 'delay')))(plugin));
for (const [name, value] of Object.entries({ ...more, ...natives })) {
const instruction = typeof value === 'string'
? requireModule(path.isAbsolute(value) ? value : path.join(__dirname, value))
: value;
for (const [name, instruction] of Object.entries(more)) {
instructions.register(name, typeof instruction === 'function' ? new instruction(plugin) : instruction);
}
}

View File

@ -1,14 +1,82 @@
import { JOB_STATUS } from "../constants";
import compose from 'koa-compose';
import { Context, utils } from '@nocobase/actions';
import Plugin from '..';
import { JOB_STATUS } from "../constants";
import { Instruction } from '.';
export interface PromptConfig {
fields: [];
actions;
}
async function loadJob(context: Context, next) {
const { filterByTk, values } = context.action.params;
if (!context.body) {
const jobRepo = utils.getRepositoryFromParams(context);
const job = await jobRepo.findOne({
filterByTk,
appends: ['node', 'execution'],
context
});
if (!filterByTk || !job) {
return context.throw(404);
}
// cache
context.body = job;
}
const { type, config } = context.body.node;
if (type === 'prompt'
&& config.actions
&& !config.actions[values.status]) {
return context.throw(400);
}
await next();
}
export default class implements Instruction {
middlewares = [];
constructor(protected plugin: Plugin) {
plugin.app.resourcer.use(this.middleware);
}
middleware = async (context: Context, next) => {
const { actionName, resourceName } = context.action;
if (actionName === 'submit'
&& resourceName === 'jobs'
&& this.middlewares.length
) {
return compose([loadJob, ...this.middlewares])(context, next);
}
await next();
};
use(middleware) {
this.middlewares.push(middleware);
}
export default {
run(node, input, processor) {
return {
status: JOB_STATUS.PENDING
};
},
}
resume(node, job, processor) {
if (!node.config.actions) {
job.set('status', JOB_STATUS.RESOLVED);
}
return job;
}
extend(options: Instruction) {
Object.assign(this, options);
}
};