Node Executors
Create custom node executors to process workflow steps
Node Executors
Node executors are the core building blocks of SPANE workflows. Each executor implements the INodeExecutor interface and defines how a specific node type processes data.
Creating a Node Executor
Basic Executor
All node executors must implement the INodeExecutor interface:
import type { INodeExecutor, ExecutionContext, ExecutionResult } from 'spane';
export class MyExecutor implements INodeExecutor {
/**
 * Template entry point: reads two options from the node configuration, passes
 * them with the node's input to `processData`, and wraps the outcome in an
 * ExecutionResult. Anything thrown along the way is reported as
 * `{ success: false, error }` instead of propagating.
 */
async execute(context: ExecutionContext): Promise<ExecutionResult> {
  try {
    // Step 1: configuration is optional, so fall back to an empty object.
    const settings: Record<string, any> = context.nodeConfig || {};
    const firstOption = settings.option1;
    const secondOption = settings.option2;

    // Steps 2 and 3: feed this node's input through the processing hook.
    const output = await this.processData(context.inputData, firstOption, secondOption);

    // Step 4: report success together with the processed payload.
    return { success: true, data: output };
  } catch (error) {
    let message = 'Unknown error';
    if (error instanceof Error) {
      message = error.message;
    }
    return { success: false, error: message };
  }
}
/**
 * Placeholder for the node's real work. As written it hands the input back
 * unchanged and ignores both options.
 */
private async processData(data: any, opt1?: string, opt2?: string) {
  // Replace this pass-through with your processing logic.
  const processed = data;
  return processed;
}
}
Execution Context
The ExecutionContext provides all the information your executor needs:
// Everything an executor receives for a single node run. The notes on
// individual fields restate how the examples in this guide use them.
interface ExecutionContext {
workflowId: string; // Current workflow ID
executionId: string; // Unique execution ID
nodeId: string; // Current node ID
// Entry nodes receive the initial workflow data, a node with one parent
// receives that parent's output, and a node with several parents receives a
// merged object keyed by parent id.
inputData: any; // Input data for this node
// Optional, which is why the examples read it as `context.nodeConfig || {}`.
nodeConfig?: Record<string, any>; // Node configuration
// Indexed by parent node id, e.g. previousResults['parent-a'].
previousResults: Record<string, ExecutionResult>; // Parent node results
// Indexed by node id; optional, so check for presence before indexing.
allNodeResults?: Record<string, ExecutionResult>; // All ancestor results
parentExecutionId?: string; // Parent execution ID (for sub-workflows)
depth: number; // Nesting depth
// Resolves to an Error that the calling executor is expected to throw.
// NOTE(review): the ApiCaller example passes the duration in milliseconds — confirm the unit.
rateLimit?: (duration: number) => Promise<Error>; // Rate limiter
}Accessing Input Data
class DataProcessor implements INodeExecutor {
/**
 * Shows where an executor finds its inputs. The values are read for
 * illustration only; the returned result is a fixed marker object.
 */
async execute(context: ExecutionContext): Promise<ExecutionResult> {
  // What inputData holds depends on where the node sits in the graph:
  //   - entry node: the initial workflow data
  //   - single parent: that parent's output
  //   - multiple parents: a merged object, { 'parent-id': output }
  const { inputData: data } = context;

  // Per-node settings; missing configuration is treated as an empty object.
  const nodeSettings: Record<string, any> = context.nodeConfig || {};
  const timeout = nodeSettings.timeout;
  const retries = nodeSettings.retries;

  const outcome: ExecutionResult = { success: true, data: { processed: true } };
  return outcome;
}
}
Accessing Previous Results
class Aggregator implements INodeExecutor {
/**
 * Merges the outputs of the direct parents 'parent-a' and 'parent-b' into a
 * single object. A parent without a recorded result contributes `undefined`.
 */
async execute(context: ExecutionContext): Promise<ExecutionResult> {
  // Direct parents are looked up by their node id.
  const { 'parent-a': parentAResult, 'parent-b': parentBResult } = context.previousResults;

  // Results of any ancestor (not only direct parents) are reachable through the
  // optional allNodeResults map.
  if (context.allNodeResults) {
    const startNodeResult = context.allNodeResults['start-node'];
  }

  // Combine the two parent payloads into one result.
  return {
    success: true,
    data: {
      fromA: parentAResult?.data,
      fromB: parentBResult?.data
    }
  };
}
}
Using Rate Limiting
class ApiCaller implements INodeExecutor {
/**
 * Fetches `nodeConfig.url` and returns the parsed JSON body.
 *
 * On HTTP 429 the wait time is taken from the Retry-After header and passed,
 * in milliseconds, to `context.rateLimit`; the Error it resolves to is thrown.
 * When no rate limiter is available, or the response has any other non-2xx
 * status, a failed result is returned instead of treating the error body as
 * data.
 *
 * @returns `{ success: true, data }` for a 2xx response, otherwise
 *   `{ success: false, error }`.
 * @throws the Error resolved by `context.rateLimit` when the API answers 429.
 *   Errors raised by `fetch` or by JSON parsing also propagate to the caller.
 */
async execute(context: ExecutionContext): Promise<ExecutionResult> {
  const { url } = context.nodeConfig || {};
  if (!url) {
    return { success: false, error: 'URL is required' };
  }

  const response = await fetch(url);

  if (response.status === 429) {
    // Retry-After is either a number of seconds or an HTTP-date. Fall back to
    // 60 seconds when the header is missing or cannot be interpreted.
    const header = response.headers.get('retry-after');
    const seconds = Number.parseInt(header ?? '', 10);
    let retryAfter = 60 * 1000;
    if (Number.isFinite(seconds) && seconds >= 0) {
      retryAfter = seconds * 1000;
    } else if (header) {
      const untilDate = Date.parse(header) - Date.now();
      if (Number.isFinite(untilDate) && untilDate > 0) {
        retryAfter = untilDate;
      }
    }

    // Trigger rate limiting and throw the Error it hands back.
    if (context.rateLimit) {
      throw await context.rateLimit(retryAfter);
    }

    // Without a rate limiter, still refuse to report the 429 as a success.
    return {
      success: false,
      error: `HTTP 429: rate limited, retry after ${retryAfter}ms`
    };
  }

  if (!response.ok) {
    return {
      success: false,
      error: `HTTP ${response.status}: ${response.statusText}`
    };
  }

  const data = await response.json();
  return { success: true, data };
}
}
Common Executor Patterns
HTTP Executor
export class HttpExecutor implements INodeExecutor {
/**
 * Performs the HTTP request described by the node configuration and returns
 * the parsed JSON response body.
 *
 * Recognised `nodeConfig` keys: `url` (required), `method` (default 'GET'),
 * `headers` (merged over a JSON Content-Type), `body` (JSON-encoded; never sent
 * with GET or HEAD) and `timeout` in milliseconds (default 30000).
 *
 * @returns `{ success: true, data }` for a 2xx response; otherwise
 *   `{ success: false, error }` for a missing URL, a non-2xx status, a timeout,
 *   or any error raised while sending the request or reading the body.
 */
async execute(context: ExecutionContext): Promise<ExecutionResult> {
  const {
    url,
    method = 'GET',
    headers = {},
    body,
    timeout = 30000
  } = context.nodeConfig || {};

  if (!url) {
    return {
      success: false,
      error: 'URL is required'
    };
  }

  // fetch rejects GET and HEAD requests that carry a body, and it treats the
  // method name case-insensitively, so compare against the upper-cased verb.
  const verb = String(method).toUpperCase();
  const sendsBody = verb !== 'GET' && verb !== 'HEAD' && Boolean(body);

  const controller = new AbortController();
  const timeoutId = setTimeout(() => controller.abort(), timeout);

  try {
    const response = await fetch(url, {
      method,
      headers: {
        'Content-Type': 'application/json',
        ...headers
      },
      body: sendsBody ? JSON.stringify(body) : undefined,
      signal: controller.signal
    });

    if (!response.ok) {
      return {
        success: false,
        error: `HTTP ${response.status}: ${response.statusText}`
      };
    }

    // The timer is still armed here, so a stalled body download is aborted too.
    const data = await response.json();
    return { success: true, data };
  } catch (error) {
    // An aborted signal means our own timer fired; say so explicitly.
    if (controller.signal.aborted) {
      return {
        success: false,
        error: `Request timed out after ${timeout}ms`
      };
    }
    return {
      success: false,
      error: error instanceof Error ? error.message : 'Unknown error'
    };
  } finally {
    // Release the timer on every path, including when fetch itself throws.
    clearTimeout(timeoutId);
  }
}
}
Database Executor
export class DatabaseExecutor implements INodeExecutor {
constructor(private pool: any) {}
/**
 * Runs `nodeConfig.query`, with `nodeConfig.params` as its parameters, through
 * the injected pool and returns the `rows` of the result.
 *
 * @returns `{ success: true, data: rows }`, or `{ success: false, error }` when
 *   no query is configured or the pool call fails.
 */
async execute(context: ExecutionContext): Promise<ExecutionResult> {
  const settings: Record<string, any> = context.nodeConfig || {};
  const query = settings.query;
  const params = settings.params;

  // Nothing to run without a query.
  if (!query) {
    return { success: false, error: 'Query is required' };
  }

  try {
    const queryResult = await this.pool.query(query, params);
    return { success: true, data: queryResult.rows };
  } catch (error) {
    let message = 'Database error';
    if (error instanceof Error) {
      message = error.message;
    }
    return { success: false, error: message };
  }
}
}
Email Executor
import { Resend } from 'resend';
export class EmailExecutor implements INodeExecutor {
constructor(private emailService: Resend) {}
/**
 * Sends one email through the injected Resend client using the node
 * configuration (`to`, `subject`, `from`, plus `html`/`template` and `text`).
 * `template` is used as the HTML body only when `html` is not set.
 *
 * The Resend client resolves with `{ data, error }` rather than throwing on
 * API failures, so the `error` field is checked before `data` is read.
 *
 * @returns `{ success: true, data: { messageId } }` when the send is accepted,
 *   otherwise `{ success: false, error }`.
 */
async execute(context: ExecutionContext): Promise<ExecutionResult> {
  const {
    to,
    subject,
    from,
    template,
    html,
    text
  } = context.nodeConfig || {};

  if (!to || !subject || !from) {
    return {
      success: false,
      error: 'to, subject, and from are required'
    };
  }

  try {
    const { data, error } = await this.emailService.emails.send({
      from,
      to,
      subject,
      html: html || template,
      text
    });

    // A rejected send comes back as `error` with `data` set to null.
    if (error || !data) {
      return {
        success: false,
        error: error?.message ?? 'Email error'
      };
    }

    return { success: true, data: { messageId: data.id } };
  } catch (error) {
    // Still guard against unexpected throws such as network failures.
    return {
      success: false,
      error: error instanceof Error ? error.message : 'Email error'
    };
  }
}
}
Transform Executor (JSONata)
import * as jsonata from 'jsonata';
export class TransformExecutor implements INodeExecutor {
/**
 * Transforms the node's input in one of three ways, checked in this order:
 *   1. `nodeConfig.expression` — evaluated as a JSONata expression;
 *   2. `nodeConfig.transformFn` — the name of a built-in transform function;
 *   3. neither — the input is passed through unchanged.
 *
 * @returns `{ success: true, data }` with the transformed value, or
 *   `{ success: false, error }` when the named transform does not exist or the
 *   expression/transform throws.
 */
async execute(context: ExecutionContext): Promise<ExecutionResult> {
  const { expression, transformFn } = context.nodeConfig || {};

  try {
    if (expression) {
      // Using JSONata expression
      const compiled = jsonata(expression);
      const result = await compiled.evaluate(context.inputData);
      return { success: true, data: result };
    }

    if (transformFn) {
      // Using custom transform function (registered separately)
      const transforms: Record<string, (data: any) => any> = {
        dataTransform: (data: any) => ({
          ...data,
          transformed: true,
          timestamp: new Date().toISOString()
        })
      };

      // Own-property check so names like 'toString' or 'constructor' do not
      // resolve to functions inherited from Object.prototype.
      if (!Object.prototype.hasOwnProperty.call(transforms, transformFn)) {
        return {
          success: false,
          error: `Transform function '${transformFn}' not found`
        };
      }

      const result = transforms[transformFn](context.inputData);
      return { success: true, data: result };
    }

    // Default: just pass through data
    return { success: true, data: context.inputData };
  } catch (error) {
    // Accept both Error instances and plain objects that carry a `message`,
    // since not every library throws real Error objects.
    let message = 'Transform error';
    if (error instanceof Error) {
      message = error.message;
    } else if (
      typeof error === 'object' &&
      error !== null &&
      typeof (error as { message?: unknown }).message === 'string'
    ) {
      message = (error as { message: string }).message;
    }
    return { success: false, error: message };
  }
}
}
Router Executor (Conditional Branching)
export class RouterExecutor implements INodeExecutor {
/**
 * Picks the next node by matching a value from the input against the labels
 * in `nodeConfig.cases`.
 *
 * The value is `inputData[condition]`, or the input itself when that field is
 * null or undefined. Case labels are object keys and therefore strings, so
 * numbers and booleans are matched by their string form (`1` matches the label
 * '1'). Objects and null/undefined never match a label and fall through to
 * `cases.default`.
 *
 * @returns on a match, `{ success: true, data: { routedTo }, nextNodes }` with
 *   only the chosen branch; `{ success: false, error }` when nothing matches
 *   and no default case is configured.
 */
async execute(context: ExecutionContext): Promise<ExecutionResult> {
  const {
    condition,
    cases = {}
  } = context.nodeConfig || {};

  // Optional chaining keeps a null/undefined input from throwing here.
  const input = context.inputData;
  const value = input?.[condition] ?? input;

  // Find matching case ('default' is reserved for the fallback below).
  const label =
    value == null || typeof value === 'object' ? undefined : String(value);
  if (
    label !== undefined &&
    label !== 'default' &&
    Object.prototype.hasOwnProperty.call(cases, label)
  ) {
    const targetNode = cases[label];
    return {
      success: true,
      data: { routedTo: targetNode },
      nextNodes: [targetNode as string] // Only execute this branch
    };
  }

  // Default case
  if (cases.default) {
    return {
      success: true,
      data: { routedTo: cases.default },
      nextNodes: [cases.default as string]
    };
  }

  // Render objects as JSON so the message is not just "[object Object]".
  let shown: string;
  if (typeof value === 'object' && value !== null) {
    try {
      shown = JSON.stringify(value);
    } catch {
      shown = String(value);
    }
  } else {
    shown = String(value);
  }

  return {
    success: false,
    error: `No matching case for value: ${shown}`
  };
}
}
Delay Executor
export class DelayExecutor implements INodeExecutor {
/**
 * Validates the configured delay and passes the input through unchanged.
 *
 * The delay is taken from the first truthy value among `duration`
 * (milliseconds), `durationSeconds` and `durationMinutes`, and is converted to
 * milliseconds. Values that are not finite positive numbers are rejected.
 *
 * NOTE(review): this method only validates the duration; nothing in it waits.
 * Presumably the engine schedules the actual pause — confirm.
 *
 * @returns `{ success: true, data: inputData }` for a valid delay, otherwise
 *   `{ success: false, error: 'Invalid delay duration' }`.
 */
async execute(context: ExecutionContext): Promise<ExecutionResult> {
  const { duration, durationSeconds, durationMinutes } = context.nodeConfig || {};

  let delayMs = 0;

  // Priority: duration > durationSeconds > durationMinutes
  if (duration) {
    delayMs = Number(duration);
  } else if (durationSeconds) {
    delayMs = Number(durationSeconds) * 1000;
  } else if (durationMinutes) {
    delayMs = Number(durationMinutes) * 60 * 1000;
  }

  // Number() turns non-numeric input such as 'abc' into NaN, which a plain
  // `<= 0` comparison would let through; require a finite positive value.
  if (!Number.isFinite(delayMs) || delayMs <= 0) {
    return {
      success: false,
      error: 'Invalid delay duration'
    };
  }

  // Pass data through with delay
  return {
    success: true,
    data: context.inputData
  };
}
}
Registering Executors
Basic Registration
import { NodeRegistry } from 'spane';
// Create the registry, then register one executor instance per node type.
// NOTE(review): the first argument appears to be the node `type` string used in
// workflow definitions (e.g. type: 'http') — confirm.
const registry = new NodeRegistry();
registry.register('http', new HttpExecutor());
// `emailService` is not defined in this snippet; create the Resend client that
// EmailExecutor's constructor expects before this line.
registry.register('email', new EmailExecutor(emailService));
registry.register('transform', new TransformExecutor());
With Circuit Breaker
import { CircuitBreakerRegistry } from 'spane/utils/circuit-breaker';
// NOTE(review): `registry` is not declared in this snippet; it is presumably the
// NodeRegistry created under Basic Registration — confirm.
const circuitBreakerRegistry = new CircuitBreakerRegistry();
// Register executor with circuit breaker protection
// NOTE(review): register() is not given circuitBreakerRegistry here; the two are
// only brought together when both are passed to the engine — confirm how the
// options below reach the breaker.
registry.register('http', new HttpExecutor(), {
failureThreshold: 5,
successThreshold: 2,
timeout: 60000
});
// Pass to engine
const engine = new WorkflowEngine(
registry,
stateStore,
redis,
undefined,
circuitBreakerRegistry
);
With Rate Limiting
// Per-node-type rate limiting
registry.register('api-call', new ApiCaller(), {
max: 100, // 100 requests
duration: 60000 // per minute
});
Error Handling
Retry Policies
Define retry behavior in node configuration:
{
id: 'unstable-api',
type: 'http',
config: {
url: 'https://unstable-api.example.com',
retryPolicy: {
maxAttempts: 5,
backoff: {
type: 'exponential',
delay: 1000
},
continueOnFail: false
}
},
inputs: ['start'],
outputs: ['process']
}
Continue on Fail
Allow the workflow to continue even if a node fails:
{
id: 'optional-enrichment',
type: 'http',
config: {
url: 'https://enrichment.example.com',
retryPolicy: {
maxAttempts: 3,
continueOnFail: true // Don't stop workflow on failure
}
},
inputs: ['data'],
outputs: ['save']
}
Best Practices
- Keep executors focused: follow the single responsibility principle
- Handle all errors: always return an explicit success or error result
- Use timeouts: prevent requests from hanging indefinitely
- Log important events: include workflowId, executionId, and nodeId from the context for correlation
- Validate inputs: check required configuration before doing any work
- Make executors idempotent: they must be safe to retry multiple times
- Use circuit breakers: protect external dependencies