# Workflow Definitions

Define and structure workflow definitions in SPANE.
Workflows in SPANE are defined as Directed Acyclic Graphs (DAGs), where nodes represent processing steps and edges represent data dependencies.
## Basic Workflow Structure
```typescript
import type { WorkflowDefinition } from 'spane';

const workflow: WorkflowDefinition = {
  id: 'my-workflow',
  name: 'My Workflow',
  entryNodeId: 'start',
  nodes: [
    {
      id: 'start',
      type: 'transform',
      config: {},
      inputs: [],
      outputs: ['process']
    },
    {
      id: 'process',
      type: 'http',
      config: { url: 'https://api.example.com' },
      inputs: ['start'],
      outputs: ['end']
    },
    {
      id: 'end',
      type: 'email',
      config: { to: 'user@example.com' },
      inputs: ['process'],
      outputs: []
    }
  ]
};
```

## Workflow Properties
### Core Properties
| Property | Type | Required | Description |
|---|---|---|---|
| `id` | `string` | Yes | Unique identifier for the workflow |
| `name` | `string` | Yes | Human-readable name |
| `entryNodeId` | `string` | Yes | ID of the starting node |
| `nodes` | `NodeDefinition[]` | Yes | Array of node definitions |
### Optional Properties
| Property | Type | Default | Description |
|---|---|---|---|
| `triggers` | `WorkflowTrigger[]` | `undefined` | Webhook or schedule triggers |
| `maxConcurrency` | `number` | `undefined` | Max concurrent workflow executions |
| `concurrencyLockTTL` | `number` | `undefined` | TTL for the concurrency lock (seconds) |
| `priority` | `number` | `5` | Job priority (1-10, higher = more important) |
| `delay` | `number` | `0` | Delay before execution (milliseconds) |
| `jobId` | `string` | `undefined` | Custom job ID for deduplication |
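These optional fields sit at the top level of the definition, alongside `id`, `name`, `entryNodeId`, and `nodes`. A minimal sketch using the primitive options (the `spane` type import is omitted so the snippet stands alone, and all values are illustrative):

```typescript
// Hypothetical workflow exercising the optional top-level properties.
const throttledWorkflow = {
  id: 'nightly-report',
  name: 'Nightly Report',
  entryNodeId: 'start',
  nodes: [
    { id: 'start', type: 'transform', config: {}, inputs: [], outputs: [] }
  ],
  maxConcurrency: 1,       // at most one execution runs at a time
  concurrencyLockTTL: 600, // concurrency lock expires after 10 minutes (seconds)
  priority: 8,             // higher than the default of 5
  delay: 5000,             // wait 5 seconds before execution (milliseconds)
  jobId: 'nightly-report-job' // custom job ID for deduplication
};
```

`triggers` is left out here because its `WorkflowTrigger` shape is not shown on this page.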
## Node Definition
Each node in the workflow is defined with these properties:
```typescript
interface NodeDefinition {
  id: string;                  // Unique node ID within the workflow
  type: string;                // Node executor type (registered in NodeRegistry)
  config: Record<string, any>; // Node-specific configuration
  inputs: string[];            // IDs of parent nodes
  outputs: string[];           // IDs of child nodes
}
```

### Node Example
```typescript
{
  id: 'fetch-data',
  type: 'http',
  config: {
    url: 'https://api.example.com/users',
    method: 'GET',
    headers: {
      'Authorization': 'Bearer token123'
    }
  },
  inputs: ['authenticate'],
  outputs: ['transform', 'validate']
}
```

## Workflow Patterns
### Linear Workflow
Simple sequential execution:
```typescript
const linearWorkflow: WorkflowDefinition = {
  id: 'linear-workflow',
  name: 'Linear Workflow',
  entryNodeId: 'step1',
  nodes: [
    {
      id: 'step1',
      type: 'transform',
      config: {},
      inputs: [],
      outputs: ['step2']
    },
    {
      id: 'step2',
      type: 'transform',
      config: {},
      inputs: ['step1'],
      outputs: ['step3']
    },
    {
      id: 'step3',
      type: 'email',
      config: { to: 'admin@example.com' },
      inputs: ['step2'],
      outputs: []
    }
  ]
};
```

### Parallel Workflow
Execute multiple branches concurrently:
```typescript
const parallelWorkflow: WorkflowDefinition = {
  id: 'parallel-workflow',
  name: 'Parallel Workflow',
  entryNodeId: 'split',
  nodes: [
    {
      id: 'split',
      type: 'transform',
      config: {},
      inputs: [],
      outputs: ['branch1', 'branch2', 'branch3']
    },
    {
      id: 'branch1',
      type: 'http',
      config: { url: 'https://api1.example.com' },
      inputs: ['split'],
      outputs: ['merge']
    },
    {
      id: 'branch2',
      type: 'http',
      config: { url: 'https://api2.example.com' },
      inputs: ['split'],
      outputs: ['merge']
    },
    {
      id: 'branch3',
      type: 'http',
      config: { url: 'https://api3.example.com' },
      inputs: ['split'],
      outputs: ['merge']
    },
    {
      id: 'merge',
      type: 'transform',
      config: {},
      inputs: ['branch1', 'branch2', 'branch3'],
      outputs: []
    }
  ]
};
```

### Conditional Branching
Control flow based on execution results:
```typescript
const conditionalWorkflow: WorkflowDefinition = {
  id: 'conditional-workflow',
  name: 'Conditional Workflow',
  entryNodeId: 'evaluate',
  nodes: [
    {
      id: 'evaluate',
      type: 'router',
      config: {},
      inputs: [],
      outputs: ['process-a', 'process-b'] // Will use nextNodes in result
    },
    {
      id: 'process-a',
      type: 'http',
      config: { url: 'https://api-a.example.com' },
      inputs: ['evaluate'],
      outputs: ['end']
    },
    {
      id: 'process-b',
      type: 'http',
      config: { url: 'https://api-b.example.com' },
      inputs: ['evaluate'],
      outputs: ['end']
    },
    {
      id: 'end',
      type: 'email',
      config: {},
      inputs: ['process-a', 'process-b'],
      outputs: []
    }
  ]
};
```

In your router executor, return `nextNodes` to select which branch runs:

```typescript
import type { INodeExecutor, ExecutionContext, ExecutionResult } from 'spane';

class RouterExecutor implements INodeExecutor {
  async execute(context: ExecutionContext): Promise<ExecutionResult> {
    const { condition } = context.inputData;

    if (condition === 'a') {
      return {
        success: true,
        data: { route: 'a' },
        nextNodes: ['process-a'] // Only execute branch A
      };
    } else {
      return {
        success: true,
        data: { route: 'b' },
        nextNodes: ['process-b'] // Only execute branch B
      };
    }
  }
}
```

### Fan-out/Fan-in
One-to-many and many-to-one patterns:
```typescript
const fanOutFanInWorkflow: WorkflowDefinition = {
  id: 'fanout-fanin',
  name: 'Fan-out/Fan-in Pattern',
  entryNodeId: 'fetch-list',
  nodes: [
    {
      id: 'fetch-list',
      type: 'http',
      config: { url: 'https://api.example.com/items' },
      inputs: [],
      outputs: ['process-items']
    },
    {
      id: 'process-items',
      type: 'parallel-processor',
      config: { batchSize: 10 },
      inputs: ['fetch-list'],
      outputs: ['aggregate']
    },
    {
      id: 'aggregate',
      type: 'transform',
      config: {},
      inputs: ['process-items'],
      outputs: []
    }
  ]
};
```

## Built-in Node Types
### HTTP Node
Make HTTP requests:
```typescript
{
  id: 'http-call',
  type: 'http',
  config: {
    url: 'https://api.example.com/data',
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': 'Bearer token'
    }
  },
  inputs: [],
  outputs: ['process-response']
}
```

### Transform Node
Transform data using JavaScript functions or JSONata expressions:
```typescript
{
  id: 'transform-data',
  type: 'transform',
  config: {
    // Using a JSONata expression
    expression: '$$.payload ~> | $ | {"result": $} |',
    // Or using an inline function (registered separately)
    transformFn: 'dataTransform'
  },
  inputs: ['fetch'],
  outputs: ['save']
}
```

### Delay Node
Pause workflow execution for a specified duration:
```typescript
{
  id: 'wait-5-minutes',
  type: 'delay',
  config: {
    duration: 300000,     // 5 minutes in milliseconds (takes precedence if several are set)
    // OR
    durationSeconds: 300, // 300 seconds
    // OR
    durationMinutes: 5    // 5 minutes
  },
  inputs: ['start'],
  outputs: ['continue']
}
```

### Sub-workflow Node
Call another workflow as a reusable component:
```typescript
{
  id: 'send-email',
  type: 'sub-workflow',
  config: {
    workflowId: 'email-sender-workflow',
    inputMapping: {
      'recipient': 'userEmail',
      'subject': 'emailSubject'
    },
    outputMapping: {
      'emailId': 'messageId'
    },
    continueOnFail: false
  },
  inputs: ['prepare-email'],
  outputs: ['log-result']
}
```

## Data Passing
### Entry Nodes
Entry nodes receive the initial workflow data:
```typescript
const executionId = await engine.enqueueWorkflow('my-workflow', {
  userId: 123,
  action: 'signup'
});

// Entry node context.inputData contains:
// { userId: 123, action: 'signup' }
```

### Single Parent
Nodes with one parent receive the parent's output directly:
```typescript
// Parent node output: { result: 'data' }
// Child node context.inputData: { result: 'data' }
```

### Multiple Parents
Nodes with multiple parents receive a merged object:
```typescript
// Parent A output: { dataA: 1 }
// Parent B output: { dataB: 2 }

// Child node context.inputData:
// {
//   'parent-a': { dataA: 1 },
//   'parent-b': { dataB: 2 }
// }
```

### Accessing All Results
Use `previousResults` for the outputs of direct parents, or `allNodeResults` to access any ancestor node's output:
```typescript
import type { INodeExecutor, ExecutionContext, ExecutionResult } from 'spane';

class MyExecutor implements INodeExecutor {
  async execute(context: ExecutionContext): Promise<ExecutionResult> {
    // Access all direct parent results
    const parentResults = context.previousResults;

    // Access any ancestor result
    const allResults = context.allNodeResults;
    const initialData = allResults['start-node']?.data;

    return { success: true, data: {} };
  }
}
```

## Registering Workflows
```typescript
await engine.registerWorkflow(workflow, changeNotes);

// With version tracking
await engine.registerWorkflow(workflow, 'Added new email notification step');
```
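Because `inputs` and `outputs` describe the same edges from both ends, it can be worth sanity-checking a definition before registering it. A minimal standalone checker sketch (illustrative only, not a SPANE API; SPANE may perform its own validation):

```typescript
// Shape of the per-node fields the check needs (a subset of NodeDefinition).
interface NodeDef {
  id: string;
  inputs: string[];
  outputs: string[];
}

// Returns a list of inconsistencies: unknown entry node, edges to
// undefined nodes, and outputs not mirrored in the child's inputs.
function findEdgeErrors(entryNodeId: string, nodes: NodeDef[]): string[] {
  const errors: string[] = [];
  const byId = new Map(nodes.map((n): [string, NodeDef] => [n.id, n]));

  if (!byId.has(entryNodeId)) {
    errors.push(`entry node '${entryNodeId}' is not defined`);
  }
  for (const node of nodes) {
    for (const childId of node.outputs) {
      const child = byId.get(childId);
      if (!child) {
        errors.push(`'${node.id}' outputs to unknown node '${childId}'`);
      } else if (!child.inputs.includes(node.id)) {
        errors.push(`'${childId}' is missing '${node.id}' in its inputs`);
      }
    }
  }
  return errors;
}
```

Running this over a definition before `registerWorkflow` surfaces mismatched edges early, rather than at execution time.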