Advanced Features
Rate limiting, delays, job prioritization, and more
SPANE provides advanced features for fine-tuning workflow behavior and performance.
Rate Limiting
Per-Node-Type Rate Limiting
Configure rate limits when registering node executors:
const registry = new NodeRegistry();
// Limit API calls to 100 requests per minute
registry.register('api-call', new ApiExecutor(), {
max: 100, // Max requests
duration: 60000 // Time window in milliseconds
});
// Limit database operations to 50 per second
registry.register('database-query', new DatabaseExecutor(), {
max: 50,
duration: 1000
});
Global Worker Rate Limiting
Enable BullMQ's native rate limiting for all node executions:
const engineConfig: EngineConfig = {
useNativeRateLimiting: true,
rateLimiter: {
max: 100, // Maximum jobs to process
duration: 1000 // Within this time window (ms)
}
};
const engine = new WorkflowEngine(
registry,
stateStore,
redis,
undefined, undefined, undefined, undefined,
engineConfig
);
Manual Rate Limiting
Trigger rate limiting from within executors:
class RateLimitedExecutor implements INodeExecutor {
async execute(context: ExecutionContext): Promise<ExecutionResult> {
const url = 'https://api.example.com/data'; // Placeholder endpoint
const response = await fetch(url);
// Handle rate limiting response
if (response.status === 429) {
const retryAfter = parseInt(
response.headers.get('retry-after') || '60',
10
) * 1000;
// Trigger rate limiting
if (context.rateLimit) {
const error = await context.rateLimit(retryAfter);
throw error; // Must throw for rate limit to work
}
}
const data = await response.json();
return { success: true, data };
}
}
Delay Nodes
Delay nodes pause workflow execution for a specified duration.
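Whatever format the duration is given in, a delay node's config has to resolve to a single wait time in milliseconds. The following is a minimal sketch of that resolution, assuming the `duration`, `durationSeconds`, and `durationMinutes` fields and the precedence described under Duration Options. The `DelayConfig` interface and `resolveDelayMs` helper are hypothetical names used for illustration, not part of SPANE.

```typescript
// Hypothetical shape of a delay node's config. The field names come from the
// Duration Options examples; the helper itself is not part of SPANE.
interface DelayConfig {
  duration?: number;        // milliseconds
  durationSeconds?: number; // seconds
  durationMinutes?: number; // minutes
}

// Resolve the config to a wait time in milliseconds, checking the fields in
// the documented order: duration, then durationSeconds, then durationMinutes.
function resolveDelayMs(config: DelayConfig): number {
  let ms: number | undefined;
  if (config.duration !== undefined) {
    ms = config.duration;
  } else if (config.durationSeconds !== undefined) {
    ms = config.durationSeconds * 1000;
  } else if (config.durationMinutes !== undefined) {
    ms = config.durationMinutes * 60 * 1000;
  }
  if (ms === undefined || !Number.isFinite(ms) || ms < 0) {
    throw new Error('Delay config must set a non-negative duration');
  }
  return ms;
}

console.log(resolveDelayMs({ durationMinutes: 5 }));                  // 300000
console.log(resolveDelayMs({ duration: 5000, durationMinutes: 10 })); // 5000
```

Rejecting an empty or negative config up front is a design choice of this sketch; how SPANE itself treats such configs is not stated here.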
Basic Delay
{
id: 'wait-5-minutes',
type: 'delay',
config: {
duration: 300000 // 5 minutes in milliseconds
},
inputs: ['start'],
outputs: ['continue']
}
Duration Options
A delay node accepts its duration in any of the following formats. If more than one is set, the first one found is used, in the order shown:
{
id: 'delay-example',
type: 'delay',
config: {
// Option 1: Milliseconds (highest priority)
duration: 300000,
// Option 2: Seconds
durationSeconds: 300,
// Option 3: Minutes
durationMinutes: 5
}
}
Use Cases
Retry After Delay
{
id: 'retry-after-failure',
type: 'delay',
config: {
durationMinutes: 10 // Wait 10 minutes before retry
},
inputs: ['failed'],
outputs: ['retry']
}
Scheduled Processing
{
id: 'wait-for-business-hours',
type: 'delay',
config: {
duration: 86400000 // Wait 24 hours
},
inputs: ['after-hours'],
outputs: ['process']
}
Throttling
{
id: 'throttle-batch',
type: 'delay',
config: {
duration: 5000 // 5 second delay between batches
},
inputs: ['batch-1'],
outputs: ['batch-2']
}
Job Prioritization
Set job priorities to ensure critical workflows execute first.
Workflow-Level Priority
const workflow: WorkflowDefinition = {
id: 'critical-workflow',
name: 'Critical Workflow',
priority: 10, // Highest priority (1-10, higher = more important)
entryNodeId: 'start',
nodes: [...]
};
Execution-Level Priority
const executionId = await engine.enqueueWorkflow(
'my-workflow',
{ data: 'here' },
undefined, // parentExecutionId
0, // depth
undefined, // parentJobId
{ priority: 8 } // Options
);
Priority Guidelines
| Priority | Use Case |
|---|---|
| 1-3 | Background tasks, non-critical jobs |
| 4-6 | Standard operations, normal workflows |
| 7-8 | Important workflows, business critical |
| 9-10 | Emergency workflows, time-sensitive |
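To keep priorities consistent across a codebase, it can help to name the tiers once instead of scattering raw numbers. The sketch below is one way to do that, using the 1-10 range from the table above. The tier names, the values chosen within each range, and the `toPriority` helper are hypothetical and not part of SPANE.

```typescript
// Hypothetical tier names; each value sits inside the matching range
// from the priority guidelines table.
const PRIORITY_TIERS = {
  background: 2,
  standard: 5,
  important: 8,
  emergency: 10,
} as const;

type PriorityTier = keyof typeof PRIORITY_TIERS;

// Clamp an arbitrary number into the documented 1-10 range.
function clampPriority(value: number): number {
  if (!Number.isFinite(value)) {
    throw new Error('Priority must be a finite number');
  }
  return Math.min(10, Math.max(1, Math.round(value)));
}

// Accept either a tier name or a raw number and return a valid priority.
function toPriority(input: PriorityTier | number): number {
  return typeof input === 'number' ? clampPriority(input) : PRIORITY_TIERS[input];
}

console.log(toPriority('emergency')); // 10
console.log(toPriority(42));          // 10 (clamped)
console.log(toPriority(0));           // 1 (clamped)
```

The result could then be passed as the `priority` option when enqueueing, for example `{ priority: toPriority('important') }`.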
Example: Urgent vs Standard
// Urgent notification
const urgentExecution = await engine.enqueueWorkflow(
'send-alert',
{ message: 'Critical system failure' },
undefined, 0, undefined,
{ priority: 10 } // Highest priority
);
// Standard report
const reportExecution = await engine.enqueueWorkflow(
'generate-report',
{ type: 'daily' },
undefined, 0, undefined,
{ priority: 3 } // Low priority
);
Delayed Execution
Schedule workflows to execute at specific times.
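Scheduling for a specific time generally comes down to a delay, in milliseconds, measured from now. The sketch below shows that arithmetic together with two guards: one for invalid dates and one for times that are not in the future. The `delayUntil` helper is hypothetical, and whether `scheduleWorkflow` performs the same checks internally is an assumption.

```typescript
// Hypothetical helper, not part of SPANE: convert a target time into a delay
// in milliseconds, rejecting invalid dates and times that are not in the future.
function delayUntil(executeAt: Date, now: number = Date.now()): number {
  const target = executeAt.getTime();
  if (Number.isNaN(target)) {
    throw new Error('executeAt is not a valid date');
  }
  const delayMs = target - now;
  if (delayMs <= 0) {
    throw new Error('Cannot schedule workflow in the past');
  }
  return delayMs;
}

// Passing `now` explicitly keeps the example deterministic.
const referenceNow = Date.parse('2024-12-31T23:00:00Z');
console.log(delayUntil(new Date('2024-12-31T23:59:59Z'), referenceNow)); // 3599000
```

Taking `now` as a parameter also makes the check easy to unit test without mocking the clock.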
Schedule for Later
const executeAt = new Date('2024-12-31T23:59:59Z');
const executionId = await engine.scheduleWorkflow(
'new-year-notification',
{ message: 'Happy New Year!' },
executeAt
);
Schedule from Current Time
// Execute 1 hour from now
const oneHourFromNow = new Date(Date.now() + (60 * 60 * 1000));
const executionId = await engine.scheduleWorkflow(
'hourly-reminder',
{ message: 'Time to check in' },
oneHourFromNow
);
Validate Schedule Time
const scheduleTime = new Date('2024-01-01T00:00:00Z');
// Check if time is in future
if (scheduleTime.getTime() <= Date.now()) {
throw new Error('Cannot schedule workflow in the past');
}
const executionId = await engine.scheduleWorkflow(
'scheduled-workflow',
{ message: 'Scheduled run' }, // Example payload
scheduleTime
);
Payload Management
Large Payload Handling
SPANE automatically offloads large payloads to PostgreSQL, following the Claim Check pattern:
// Configure payload manager (optional)
const payloadManager = new PayloadManager(stateStore);
const engine = new WorkflowEngine(
registry,
stateStore,
redis,
undefined, // metricsCollector
undefined, // circuitBreakerRegistry
undefined, // cacheOptions
payloadManager
);
Payloads larger than 50 KB are offloaded automatically.
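The Claim Check pattern replaces a large payload with a small reference and stores the payload itself elsewhere. The sketch below illustrates the idea with an in-memory `Map` standing in for PostgreSQL. It is not SPANE's implementation: the helper names, the reference shape, and the reading of the 50 KB threshold as 50 * 1024 bytes of serialized JSON are all assumptions.

```typescript
// Assumed threshold: the documented 50 KB limit, read here as 50 * 1024 bytes.
const OFFLOAD_THRESHOLD_BYTES = 50 * 1024;

// A small reference that travels through the queue instead of the payload.
interface ClaimCheck {
  claimCheck: string; // key of the offloaded payload
}

// In-memory stand-in for the PostgreSQL-backed store.
const payloadStore = new Map<string, string>();

// Size of the JSON-serialized payload in UTF-8 bytes.
function serializedSize(json: string): number {
  return new TextEncoder().encode(json).length;
}

// Return the payload unchanged when small, or store it and return a reference.
function offloadIfLarge(key: string, payload: unknown): unknown {
  const json = JSON.stringify(payload);
  if (serializedSize(json) <= OFFLOAD_THRESHOLD_BYTES) {
    return payload;
  }
  payloadStore.set(key, json);
  return { claimCheck: key };
}

function isClaimCheck(value: unknown): value is ClaimCheck {
  return typeof value === 'object' && value !== null &&
    typeof (value as ClaimCheck).claimCheck === 'string';
}

// Resolve a reference back to the original payload; inline payloads pass through.
function loadPayload(value: unknown): unknown {
  if (!isClaimCheck(value)) return value;
  const json = payloadStore.get(value.claimCheck);
  if (json === undefined) throw new Error(`Missing payload: ${value.claimCheck}`);
  return JSON.parse(json);
}

const smallResult = offloadIfLarge('exec-1:initialData', { userId: 1 });
const largeResult = offloadIfLarge('exec-2:initialData', { blob: 'x'.repeat(60 * 1024) });
console.log(isClaimCheck(smallResult)); // false
console.log(isClaimCheck(largeResult)); // true
```

A production version would need a reference shape that cannot collide with ordinary payload fields; the plain `claimCheck` key is used here only to keep the sketch short.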
Manual Payload Offloading
await payloadManager.offloadIfNeeded(
executionId,
'initialData',
largePayload
);
Loading Offloaded Payloads
Offloaded payloads are loaded automatically during execution. Access them through the context:
class MyExecutor implements INodeExecutor {
async execute(context: ExecutionContext): Promise<ExecutionResult> {
const data = context.inputData; // Automatically loaded
return { success: true, data };
}
}
Job Deduplication
Prevent duplicate workflow executions using custom job IDs:
const executionId = await engine.enqueueWorkflow(
'my-workflow',
{ orderId: 123 },
undefined, 0, undefined,
{
jobId: 'order-processing-123', // Custom job ID
priority: 5
}
);
If the same jobId is used again, BullMQ prevents a duplicate execution.
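Deduplication only works if the same logical job always produces the same ID. The sketch below shows one way to derive deterministic IDs from business keys, along with an in-memory stand-in that mimics a queue ignoring a repeated ID. Both `buildJobId` and `DedupQueue` are hypothetical illustrations and do not call BullMQ or SPANE.

```typescript
// Build a deterministic job ID from a prefix and business keys. Restricting
// the result to letters, digits, '-' and '_' is a conservative choice here.
function buildJobId(prefix: string, ...parts: Array<string | number>): string {
  return [prefix, ...parts]
    .map((part) => String(part).trim().replace(/[^A-Za-z0-9_-]+/g, '_'))
    .join('-');
}

// In-memory stand-in for a queue that ignores a second add with a known ID.
class DedupQueue {
  private readonly seen = new Set<string>();

  // Returns true if the job was accepted, false if it was a duplicate.
  add(jobId: string): boolean {
    if (this.seen.has(jobId)) return false;
    this.seen.add(jobId);
    return true;
  }
}

const dedupQueue = new DedupQueue();
const firstJobId = buildJobId('order', 'ORD-12345');
console.log(firstJobId);                                       // order-ORD-12345
console.log(dedupQueue.add(firstJobId));                       // true
console.log(dedupQueue.add(buildJobId('order', 'ORD-12345'))); // false
console.log(buildJobId('report', '2024:01:15'));               // report-2024_01_15
```

This sketch remembers every ID forever. A real queue typically only knows about the jobs it still retains, so how long an ID blocks duplicates depends on the queue's job retention settings.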
Use Cases
Idempotent Order Processing
const orderId = 'ORD-12345';
const executionId = await engine.enqueueWorkflow(
'process-order',
{ orderId },
undefined, 0, undefined,
{
jobId: `order-${orderId}`, // Unique per order
priority: 7 // High priority
}
);
Daily Report
const today = new Date().toISOString().split('T')[0];
const executionId = await engine.enqueueWorkflow(
'daily-report',
{ date: today },
undefined, 0, undefined,
{
jobId: `report-${today}`, // One per day
priority: 3
}
);
Bulk Operations
Bulk Workflow Enqueue
Enqueue multiple workflows in a single call:
const workflows = [
{
workflowId: 'send-welcome',
initialData: { userId: 1 },
options: { priority: 5 }
},
{
workflowId: 'send-welcome',
initialData: { userId: 2 },
options: { priority: 5 }
},
{
workflowId: 'send-welcome',
initialData: { userId: 3 },
options: { priority: 5 }
}
];
const executionIds = await engine.enqueueBulkWorkflows(workflows);
console.log(`Started ${executionIds.length} workflows`);
Bulk Pause/Resume
// Pause multiple workflows
await engine.pauseBulkWorkflows(['exec-1', 'exec-2', 'exec-3']);
// Resume multiple workflows
await engine.resumeBulkWorkflows(['exec-1', 'exec-2', 'exec-3']);
Bulk Cancel
// Cancel multiple workflows
await engine.cancelBulkWorkflows(['exec-1', 'exec-2', 'exec-3']);
Workflow Concurrency
Limit Concurrent Executions
Limit how many executions of a workflow can run simultaneously:
const workflow: WorkflowDefinition = {
id: 'limited-workflow',
name: 'Limited Concurrent Workflow',
maxConcurrency: 5, // Max 5 executions at once
concurrencyLockTTL: 3600, // Lock expires after 1 hour
entryNodeId: 'start',
nodes: [...]
};
When the maximum concurrency is reached, additional executions are queued until a slot frees up.
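The queueing behavior can be pictured as a gate that admits up to `maxConcurrency` executions and holds the rest in arrival order. The class below is an in-memory sketch of that idea only. `ConcurrencyGate` is a hypothetical name, it is not how SPANE implements the limit, and lock expiry (`concurrencyLockTTL`) is deliberately left out.

```typescript
// In-memory sketch of a per-workflow concurrency gate.
class ConcurrencyGate {
  private readonly running = new Set<string>();
  private readonly pending: string[] = [];
  private readonly maxConcurrency: number;

  constructor(maxConcurrency: number) {
    this.maxConcurrency = maxConcurrency;
  }

  // Try to start an execution. Returns true if it runs now, false if queued.
  start(executionId: string): boolean {
    if (this.running.size < this.maxConcurrency) {
      this.running.add(executionId);
      return true;
    }
    this.pending.push(executionId);
    return false;
  }

  // Mark an execution finished and promote the oldest queued one, if any.
  // Returns the promoted execution ID, or undefined if nothing was promoted.
  finish(executionId: string): string | undefined {
    if (!this.running.delete(executionId)) return undefined;
    const next = this.pending.shift();
    if (next !== undefined) this.running.add(next);
    return next;
  }

  get activeCount(): number { return this.running.size; }
  get queuedCount(): number { return this.pending.length; }
}

const gate = new ConcurrencyGate(2);
console.log(gate.start('exec-1'));               // true
console.log(gate.start('exec-2'));               // true
console.log(gate.start('exec-3'));               // false (queued)
console.log(gate.finish('exec-1'));              // exec-3
console.log(gate.activeCount, gate.queuedCount); // 2 0
```

In a distributed setup the running set would have to live in shared storage, which is where a lock TTL becomes necessary: it frees a slot if a worker dies without calling the equivalent of `finish`.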
Use Cases
API Rate Limits
const workflow: WorkflowDefinition = {
id: 'external-api-calls',
name: 'External API Calls',
maxConcurrency: 10, // API allows 10 concurrent requests
concurrencyLockTTL: 300, // 5 minute lock
nodes: [...]
};
Resource Constraints
const workflow: WorkflowDefinition = {
id: 'heavy-processing',
name: 'Heavy Processing',
maxConcurrency: 2, // Only 2 at a time due to resource limits
concurrencyLockTTL: 1800, // 30 minute lock
nodes: [...]
};
Queue Statistics
Monitor queue health and performance:
const stats = await engine.getQueueStats();
console.log('Waiting jobs:', stats.waiting);
console.log('Active jobs:', stats.active);
console.log('Completed jobs:', stats.completed);
console.log('Failed jobs:', stats.failed);
console.log('Delayed jobs:', stats.delayed);
console.log('Paused jobs:', stats.paused);
Dashboard Example
import { useState, useEffect } from 'react';
function QueueDashboard() {
const [stats, setStats] = useState({
waiting: 0,
active: 0,
completed: 0,
failed: 0,
delayed: 0
});
useEffect(() => {
const interval = setInterval(async () => {
const queueStats = await engine.getQueueStats();
setStats(queueStats);
}, 5000); // Update every 5 seconds
return () => clearInterval(interval);
}, []);
return (
<div>
<h2>Queue Statistics</h2>
<div>Waiting: {stats.waiting}</div>
<div>Active: {stats.active}</div>
<div>Completed: {stats.completed}</div>
<div>Failed: {stats.failed}</div>
<div>Delayed: {stats.delayed}</div>
</div>
);
}
Replay Executions
Re-run a workflow with the same initial data:
const newExecutionId = await engine.replayWorkflow(originalExecutionId);
console.log('Replayed execution:', newExecutionId);
Use Cases
- Debugging: Re-run failed workflows to test fixes
- Recovery: Re-process data after fixing bugs
- Testing: Verify workflow changes with production data
Job Status Check
Check the status of individual jobs:
const status = await engine.getJobStatus(jobId);
if (status.exists) {
console.log('Job status:', status.status);
// Possible values: 'waiting', 'active', 'completed', 'failed', 'delayed'
}
Complete Example
import { WorkflowEngine } from 'spane/engine/workflow-engine';
import type { EngineConfig } from 'spane/engine/config';
const engineConfig: EngineConfig = {
useNativeRateLimiting: true,
useFlowProducerForSubWorkflows: true,
workerConcurrency: 10,
rateLimiter: {
max: 100,
duration: 1000
}
};
const engine = new WorkflowEngine(
registry,
stateStore,
redis,
undefined, undefined, undefined, undefined,
engineConfig
);
// High priority workflow with concurrency limits
const urgentWorkflow: WorkflowDefinition = {
id: 'urgent-processing',
name: 'Urgent Processing',
priority: 10,
maxConcurrency: 5,
concurrencyLockTTL: 600,
entryNodeId: 'start',
nodes: [
{
id: 'start',
type: 'http',
config: {
url: 'https://api.example.com/urgent',
retryPolicy: {
maxAttempts: 5,
backoff: { type: 'exponential', delay: 1000 }
}
},
inputs: [],
outputs: ['delay']
},
{
id: 'delay',
type: 'delay',
config: {
duration: 5000 // Wait 5 seconds
},
inputs: ['start'],
outputs: ['complete']
},
{
id: 'complete',
type: 'email',
config: { to: 'admin@example.com' },
inputs: ['delay'],
outputs: []
}
]
};
await engine.registerWorkflow(urgentWorkflow);
// Execute with high priority
const executionId = await engine.enqueueWorkflow(
'urgent-processing',
{ urgent: true },
undefined, 0, undefined,
{ priority: 10 }
);
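The `retryPolicy` on the `start` node above combines `maxAttempts: 5` with exponential backoff starting at 1000 ms. As a rough guide to what that means in wall-clock time, the sketch below computes the wait before each retry. It assumes that `maxAttempts` includes the first attempt and that the delay doubles on every retry; the `'fixed'` type and the `retryDelays` helper are also assumptions, and SPANE's actual schedule may differ.

```typescript
// Assumed policy shape, mirroring the retryPolicy in the example above.
interface RetryPolicy {
  maxAttempts: number;
  backoff: { type: 'fixed' | 'exponential'; delay: number };
}

// Delay in milliseconds before each retry. Assumes maxAttempts includes the
// first attempt, so there are maxAttempts - 1 retries, and that exponential
// backoff doubles the base delay on every retry.
function retryDelays(policy: RetryPolicy): number[] {
  const retries = Math.max(0, policy.maxAttempts - 1);
  return Array.from({ length: retries }, (_, i) =>
    policy.backoff.type === 'exponential'
      ? policy.backoff.delay * 2 ** i
      : policy.backoff.delay
  );
}

const delays = retryDelays({
  maxAttempts: 5,
  backoff: { type: 'exponential', delay: 1000 },
});
console.log(delays);                                // [ 1000, 2000, 4000, 8000 ]
console.log(delays.reduce((sum, d) => sum + d, 0)); // 15000
```

Under these assumptions the node would spend up to 15 seconds waiting between attempts before the failure is final, which is worth weighing against the 600-second `concurrencyLockTTL`.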