refactor(plugin-workflow): add transaction check from data source (#3857)

This commit is contained in:
Junyi 2024-03-29 09:09:12 +08:00 committed by GitHub
parent 29af40f504
commit 1f59d4bed2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 35 additions and 17 deletions

View File

@ -35,6 +35,7 @@ export default class PluginWorkflowServer extends Plugin {
instructions: Registry<InstructionInterface> = new Registry();
triggers: Registry<Trigger> = new Registry();
functions: Registry<CustomFunction> = new Registry();
enabledCache: Map<number, WorkflowModel> = new Map();
private ready = false;
private executing: Promise<void> | null = null;
@ -72,7 +73,7 @@ export default class PluginWorkflowServer extends Plugin {
return trigger.sync ?? workflow.sync;
}
onBeforeSave = async (instance: WorkflowModel, options) => {
private onBeforeSave = async (instance: WorkflowModel, options) => {
const Model = <typeof WorkflowModel>instance.constructor;
if (instance.enabled) {
@ -273,8 +274,10 @@ export default class PluginWorkflowServer extends Plugin {
trigger.off({ ...workflow.get(), ...prev });
}
trigger.on(workflow);
this.enabledCache.set(workflow.id, workflow);
} else {
trigger.off(workflow);
this.enabledCache.delete(workflow.id);
}
}
@ -304,9 +307,7 @@ export default class PluginWorkflowServer extends Plugin {
this.eventsCount = this.events.length;
logger.info(`new event triggered, now events: ${this.events.length}`);
logger.debug(`event data:`, {
data: context,
});
logger.debug(`event data:`, { context });
if (this.events.length > 1) {
return;
@ -488,23 +489,39 @@ export default class PluginWorkflowServer extends Plugin {
if (execution.status === EXECUTION_STATUS.QUEUEING) {
await execution.update({ status: EXECUTION_STATUS.STARTED }, { transaction: options.transaction });
}
const logger = this.getLogger(execution.workflowId);
const processor = this.createProcessor(execution, options);
this.getLogger(execution.workflowId).info(`execution (${execution.id}) ${job ? 'resuming' : 'starting'}...`);
logger.info(`execution (${execution.id}) ${job ? 'resuming' : 'starting'}...`);
// this.emit('beforeProcess', processor);
try {
await (job ? processor.resume(job) : processor.start());
this.getLogger(execution.workflowId).info(
`execution (${execution.id}) finished with status: ${execution.status}`,
);
logger.info(`execution (${execution.id}) finished with status: ${execution.status}`, { execution });
if (execution.status && execution.workflow.options?.deleteExecutionOnStatus?.includes(execution.status)) {
await execution.destroy();
}
} catch (err) {
this.getLogger(execution.workflowId).error(`execution (${execution.id}) error: ${err.message}`, err);
logger.error(`execution (${execution.id}) error: ${err.message}`, err);
}
// this.emit('afterProcess', processor);
return processor;
}
useDataSourceTransaction(dataSourceName = 'main', transaction, create = false) {
// @ts-ignore
const { db } = this.app.dataSourceManager.dataSources.get(dataSourceName).collectionManager;
if (!db) {
return;
}
if (db.sequelize === transaction?.sequelize) {
return transaction;
}
if (create) {
return db.sequelize.transaction();
}
}
}

View File

@ -12,7 +12,7 @@ export default {
name: 'workflow',
},
{
type: 'uid',
type: 'string',
name: 'key',
},
{

View File

@ -15,13 +15,14 @@ export class CreateInstruction extends Instruction {
.get(dataSourceName)
.collectionManager.getCollection(collectionName);
const options = processor.getParsedValue(params, node.id);
const transaction = this.workflow.useDataSourceTransaction(dataSourceName, processor.transaction);
const created = await repository.create({
...options,
context: {
stack: Array.from(new Set((processor.execution.context.stack ?? []).concat(processor.execution.id))),
},
transaction: processor.transaction,
transaction,
});
let result = created;
@ -34,7 +35,7 @@ export class CreateInstruction extends Instruction {
result = await repository.findOne({
filterByTk: created[model.primaryKeyAttribute],
appends: Array.from(includeFields),
transaction: processor.transaction,
transaction,
});
}

View File

@ -20,7 +20,7 @@ export class DestroyInstruction extends Instruction {
context: {
stack: Array.from(new Set((processor.execution.context.stack ?? []).concat(processor.execution.id))),
},
transaction: processor.transaction,
transaction: this.workflow.useDataSourceTransaction(dataSourceName, processor.transaction),
});
return {

View File

@ -38,7 +38,7 @@ export class QueryInstruction extends Instruction {
.filter((item) => item.field)
.map((item) => `${item.direction?.toLowerCase() === 'desc' ? '-' : ''}${item.field}`),
appends,
transaction: processor.transaction,
transaction: this.workflow.useDataSourceTransaction(dataSourceName, processor.transaction),
});
if (failOnEmpty && (multiple ? !result.length : !result)) {

View File

@ -20,7 +20,7 @@ export class UpdateInstruction extends Instruction {
context: {
stack: Array.from(new Set((processor.execution.context.stack ?? []).concat(processor.execution.id))),
},
transaction: processor.transaction,
transaction: this.workflow.useDataSourceTransaction(dataSourceName, processor.transaction),
});
return {

View File

@ -96,7 +96,7 @@ async function handler(this: CollectionTrigger, workflow: WorkflowModel, data: M
workflow,
{ data: json, stack: context?.stack },
{
transaction,
transaction: this.workflow.useDataSourceTransaction(dataSourceName, transaction),
},
);
} else {