# Error Handling & DLQ

Handle failures, retry jobs, and manage the Dead Letter Queue.
SPANE provides comprehensive error handling through standardized error classes, retry policies, Dead Letter Queue (DLQ), and workflow control operations.
## Error Classes

SPANE defines a hierarchy of error classes for consistent error handling.

### Base `WorkflowError`

All SPANE errors extend `WorkflowError`:
```ts
import { WorkflowError, isWorkflowError } from '@manyeya/spane';

interface WorkflowError extends Error {
  code: WorkflowErrorCode;
  executionId?: string;
  nodeId?: string;
  workflowId?: string;
  timestamp: Date;
  originalCause?: Error;
}

// Check if an error is a WorkflowError
if (isWorkflowError(error)) {
  console.error('Error code:', error.code);
  console.error('Execution ID:', error.executionId);
}
```

### Error Codes
```ts
import { WorkflowErrorCode } from '@manyeya/spane';

enum WorkflowErrorCode {
  // Workflow errors
  WORKFLOW_NOT_FOUND = 'WORKFLOW_NOT_FOUND',
  WORKFLOW_VALIDATION_FAILED = 'WORKFLOW_VALIDATION_FAILED',
  WORKFLOW_CYCLE_DETECTED = 'WORKFLOW_CYCLE_DETECTED',

  // Node errors
  NODE_NOT_FOUND = 'NODE_NOT_FOUND',
  NODE_NOT_REGISTERED = 'NODE_NOT_REGISTERED',
  NODE_EXECUTION_FAILED = 'NODE_EXECUTION_FAILED',
  NODE_SKIPPED = 'NODE_SKIPPED',
  NODE_TIMEOUT = 'NODE_TIMEOUT',

  // Execution errors
  EXECUTION_NOT_FOUND = 'EXECUTION_NOT_FOUND',
  EXECUTION_TIMEOUT = 'EXECUTION_TIMEOUT',
  EXECUTION_CANCELLED = 'EXECUTION_CANCELLED',
  EXECUTION_PAUSED = 'EXECUTION_PAUSED',
  EXECUTION_MAX_DEPTH_EXCEEDED = 'EXECUTION_MAX_DEPTH_EXCEEDED',

  // Configuration errors
  INVALID_CONFIG = 'INVALID_CONFIG',
  INVALID_PRIORITY = 'INVALID_PRIORITY',
  INVALID_DELAY = 'INVALID_DELAY',

  // State errors
  STATE_CORRUPTION = 'STATE_CORRUPTION',
  STATE_PERSISTENCE_FAILED = 'STATE_PERSISTENCE_FAILED',

  // Queue errors
  QUEUE_ERROR = 'QUEUE_ERROR',
  WORKER_ERROR = 'WORKER_ERROR',

  // Sub-workflow errors
  SUBWORKFLOW_FAILED = 'SUBWORKFLOW_FAILED',
  SUBWORKFLOW_NOT_FOUND = 'SUBWORKFLOW_NOT_FOUND',

  // Rate limiting
  RATE_LIMIT_EXCEEDED = 'RATE_LIMIT_EXCEEDED',

  // Circuit breaker
  CIRCUIT_BREAKER_OPEN = 'CIRCUIT_BREAKER_OPEN',

  // Generic
  UNKNOWN_ERROR = 'UNKNOWN_ERROR',
}
```

### Specialized Error Classes
```ts
import {
  WorkflowNotFoundError,
  WorkflowValidationError,
  NodeExecutionError,
  NodeNotRegisteredError,
  ExecutionTimeoutError,
  MaxDepthExceededError,
  RateLimitError,
  CircuitBreakerOpenError,
  StatePersistenceError
} from '@manyeya/spane';
```

### Using Error Classes
```ts
import { WorkflowNotFoundError, NodeExecutionError } from '@manyeya/spane';

// In your code
throw new WorkflowNotFoundError('my-workflow');

throw new NodeExecutionError(
  'node-1',
  'exec-123',
  'API timeout after 30s',
  new Error('Request timeout')
);
```

## Error Utility Functions
### Check Retryable Errors
```ts
import { isRetryableError } from '@manyeya/spane';

try {
  await someOperation();
} catch (error) {
  if (isRetryableError(error)) {
    // Error will be retried by BullMQ. Retryable codes include:
    // RATE_LIMIT_EXCEEDED, CIRCUIT_BREAKER_OPEN, NODE_TIMEOUT,
    // QUEUE_ERROR, WORKER_ERROR
    console.log('Will retry:', error.message);
  } else {
    // Non-retryable error
    console.error('Fatal error:', error.message);
  }
}
```

### Check DLQ Eligibility
```ts
import { shouldMoveToDLQ } from '@manyeya/spane';

// Determine whether an error should go to the DLQ
if (shouldMoveToDLQ(error, attemptsMade, maxAttempts)) {
  // The error goes to the DLQ because either:
  // 1. Max attempts were exceeded, or
  // 2. The error is non-retryable (VALIDATION_FAILED, CYCLE_DETECTED, etc.)
  console.log('Moving to DLQ');
}
```

### Get User-Friendly Messages
```ts
import { getUserMessage } from '@manyeya/spane';

try {
  await engine.enqueueWorkflow('my-workflow', data);
} catch (error) {
  const userMessage = getUserMessage(error);
  // Returns sanitized messages such as:
  // - "The specified workflow could not be found."
  // - "The workflow definition is invalid."
  // - "Too many requests. Please try again later."
  alert(userMessage);
}
```

## Retry Policies
Configure retry behavior at the node level:
### Retry Configuration
```ts
{
  id: 'unstable-api',
  type: 'http',
  config: {
    url: 'https://unstable-api.example.com',
  },
  retryPolicy: {
    maxAttempts: 5,        // Total attempts (initial + retries)
    backoff: {
      type: 'exponential', // or 'fixed'
      delay: 1000          // Initial delay in ms
    },
    continueOnFail: false  // Stop workflow if all retries fail
  },
  inputs: [],
  outputs: []
}
```

### Retry Policy Options
| Option | Type | Default | Description |
|---|---|---|---|
| `maxAttempts` | `number` | `3` | Total number of attempts (initial + retries) |
| `backoff.type` | `'fixed' \| 'exponential'` | `'exponential'` | Backoff strategy |
| `backoff.delay` | `number` | `1000` | Initial delay in milliseconds |
| `continueOnFail` | `boolean` | `false` | Continue workflow if all retries fail |
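The two backoff strategies can be sketched as a small helper. `retryDelay` below is a hypothetical function mirroring the BullMQ-style schedules shown in the following sections; it is not part of SPANE's API.

```ts
// Hypothetical helper: delay (in ms) before a given attempt.
// Attempt 1 always runs immediately; exponential backoff doubles
// the base delay on each subsequent retry.
type BackoffConfig = { type: 'fixed' | 'exponential'; delay: number };

function retryDelay(backoff: BackoffConfig, attempt: number): number {
  if (attempt <= 1) return 0; // first attempt is immediate
  return backoff.type === 'fixed'
    ? backoff.delay
    : backoff.delay * 2 ** (attempt - 2); // doubles per retry
}

console.log(retryDelay({ type: 'fixed', delay: 2000 }, 3));       // 2000
console.log(retryDelay({ type: 'exponential', delay: 1000 }, 5)); // 8000
```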
### Fixed Backoff
Constant delay between retries:
```ts
{
  retryPolicy: {
    maxAttempts: 5,
    backoff: {
      type: 'fixed',
      delay: 2000 // 2-second delay between attempts
    }
  }
}

// Retry schedule:
// Attempt 1: 0ms (immediate)
// Attempt 2: 2000ms
// Attempt 3: 2000ms
// Attempt 4: 2000ms
// Attempt 5: 2000ms
```

### Exponential Backoff
Delay increases exponentially:
```ts
{
  retryPolicy: {
    maxAttempts: 5,
    backoff: {
      type: 'exponential',
      delay: 1000 // 1-second base delay
    }
  }
}

// Retry schedule:
// Attempt 1: 0ms (immediate)
// Attempt 2: 1000ms
// Attempt 3: 2000ms
// Attempt 4: 4000ms
// Attempt 5: 8000ms
```

### Continue on Fail
Allow workflow to continue even after all retries fail:
```ts
{
  id: 'optional-enrichment',
  type: 'http',
  config: {
    url: 'https://enrichment-api.example.com',
  },
  retryPolicy: {
    maxAttempts: 3,
    continueOnFail: true // Don't stop the workflow
  },
  inputs: ['data'],
  outputs: ['save']
}
```

In the next node, check whether the enrichment failed:
```ts
class SaveExecutor implements INodeExecutor {
  async execute(context: ExecutionContext): Promise<ExecutionResult> {
    const { data, enrichment } = context.previousResults;

    if (enrichment.success === false) {
      // Handle failed enrichment
      console.warn('Enrichment failed, proceeding with base data:', data.data);
      return { success: true, data: data.data };
    }

    // Use enriched data
    return { success: true, data: { ...data.data, ...enrichment.data } };
  }
}
```

## Dead Letter Queue (DLQ)
The DLQ stores jobs that have exhausted all retry attempts.
### What Goes to the DLQ?

Jobs are sent to the DLQ when:

- All retry attempts are exhausted
- The job fails with `continueOnFail: false`
- The job crashes repeatedly
- The error is non-retryable (validation errors, cycle detected, etc.)
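These routing rules can be sketched as follows. The `goesToDLQ` helper and the error-code set are illustrative only, not SPANE's actual `shouldMoveToDLQ` implementation.

```ts
// Illustrative sketch of the DLQ routing rules listed above.
// Non-retryable codes go straight to the DLQ; everything else
// goes only once retries are exhausted.
const NON_RETRYABLE = new Set([
  'WORKFLOW_VALIDATION_FAILED',
  'WORKFLOW_CYCLE_DETECTED',
  'NODE_NOT_REGISTERED',
  'INVALID_CONFIG',
]);

function goesToDLQ(
  errorCode: string,
  attemptsMade: number,
  maxAttempts: number
): boolean {
  if (NON_RETRYABLE.has(errorCode)) return true; // never retried
  return attemptsMade >= maxAttempts;            // retries exhausted
}

console.log(goesToDLQ('NODE_TIMEOUT', 3, 5));            // false (will retry)
console.log(goesToDLQ('WORKFLOW_CYCLE_DETECTED', 1, 5)); // true
```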
### Accessing the DLQ
```ts
// Get DLQ items
const items = await engine.getDLQItems(0, 10); // start, end

for (const item of items) {
  console.log('Job ID:', item.jobId);
  console.log('Execution ID:', item.executionId);
  console.log('Workflow ID:', item.workflowId);
  console.log('Node ID:', item.nodeId);
  console.log('Attempts:', item.attemptsMade);
  console.log('Failed At:', item.failedAt);
  console.log('Error:', item.stacktrace);
}
```

### Retry from DLQ
```ts
// Retry a specific DLQ item
const success = await engine.retryDLQItem(dlqJobId);

if (success) {
  console.log('Job successfully retried');
} else {
  console.log('Failed to retry job');
}
```

### Retry Multiple Items
```ts
// Retry all DLQ items
const items = await engine.getDLQItems(0, -1); // Get all

let successCount = 0;
for (const item of items) {
  const success = await engine.retryDLQItem(item.jobId);
  if (success) successCount++;
}

console.log(`Retried ${successCount}/${items.length} jobs`);
```

## Workflow Control
### Pause Workflow
Pause a running workflow:
```ts
await engine.pauseExecution(executionId);
```

What happens when paused:
- Workflow status changes to 'paused'
- Waiting jobs moved to delayed state (24 hours)
- Active jobs continue to completion
- No new jobs are enqueued
### Resume Workflow
Resume a paused workflow:
```ts
await engine.resumeExecution(executionId);
```

What happens when resumed:
- Workflow status changes to 'running'
- Delayed jobs promoted back to waiting
- Execution continues from where it left off
### Cancel Workflow
Cancel a workflow completely:
```ts
await engine.cancelExecution(executionId);
```

What happens when cancelled:
- Workflow status changes to 'cancelled'
- All pending jobs removed from queue
- Active jobs continue to completion
- No new jobs are enqueued
## Error Handling Patterns

### Graceful Degradation
```ts
class GracefulExecutor implements INodeExecutor {
  async execute(context: ExecutionContext): Promise<ExecutionResult> {
    try {
      // Try primary service
      return await this.callPrimaryService(context);
    } catch (primaryError) {
      console.warn('Primary service failed:', primaryError);
      try {
        // Try fallback service
        return await this.callFallbackService(context);
      } catch (fallbackError) {
        // Return cached data or a default
        return {
          success: true,
          data: this.getCachedData(context),
        };
      }
    }
  }
}
```

### Circuit Breaker + Retry
Combine circuit breaker with retries:
```ts
{
  id: 'protected-api',
  type: 'http',
  config: {
    url: 'https://api.example.com/endpoint',
  },
  retryPolicy: {
    maxAttempts: 5,
    backoff: { type: 'exponential', delay: 1000 }
  },
  circuitBreaker: {
    enabled: true,
    failureThreshold: 10,
    successThreshold: 2,
    timeout: 60000
  }
}
```

## Error Monitoring
### Track Failed Executions
```ts
const execution = await stateStore.getExecution(executionId);

if (execution.status === 'failed') {
  console.error('Workflow failed:', execution);

  // Find failed nodes
  for (const [nodeId, result] of Object.entries(execution.nodeResults)) {
    if (result.success === false) {
      console.error(`Node ${nodeId} failed:`, result.error);
    }
  }
}
```

## Complete Error Handling Example
```ts
import {
  WorkflowEngine,
  isRetryableError,
  shouldMoveToDLQ,
  getUserMessage
} from '@manyeya/spane';

// Configure engine
const engine = new WorkflowEngine(registry, stateStore, redis);

// Register workflow with error handling
const workflow: WorkflowDefinition = {
  id: 'robust-workflow',
  name: 'Robust Workflow with Error Handling',
  nodes: [
    {
      id: 'step1',
      type: 'http',
      config: {
        url: 'https://api1.example.com',
      },
      retryPolicy: {
        maxAttempts: 5,
        backoff: { type: 'exponential', delay: 1000 }
      },
      inputs: [],
      outputs: ['step2']
    },
    {
      id: 'step2',
      type: 'http',
      config: {
        url: 'https://api2.example.com',
      },
      retryPolicy: {
        maxAttempts: 3,
        continueOnFail: true // Optional step
      },
      inputs: ['step1'],
      outputs: ['step3']
    }
  ]
};

// Execute and monitor
const executionId = await engine.enqueueWorkflow('robust-workflow', data);

// Monitor execution
const execution = await stateStore.getExecution(executionId);
if (execution.status === 'failed') {
  for (const [nodeId, result] of Object.entries(execution.nodeResults)) {
    if (result.success === false) {
      console.error(`Node ${nodeId} failed:`, result.error);
      // Check if the stored error is retryable
      if (isRetryableError(new Error(result.error || ''))) {
        console.log('This error is retryable');
      }
    }
  }
}
```

## Best Practices
- Set Appropriate Retry Limits: 3-5 retries for most cases
- Use Exponential Backoff: Avoid retry storms
- Implement Fallbacks: Have alternative data sources
- Monitor DLQ: Regularly review and retry failed jobs
- Use Circuit Breakers: Protect external dependencies
- Handle Transient Errors: Differentiate retryable vs non-retryable errors
- Set Timeouts: Prevent indefinite hanging
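Several of these practices can be combined in a generic retry wrapper. The `withRetry` helper below is a sketch, not a SPANE API: it caps attempts, applies exponential backoff with full jitter to avoid retry storms, and accepts a caller-supplied retryable-error predicate (for example, SPANE's `isRetryableError`).

```ts
// Generic retry wrapper: capped attempts, exponential backoff with
// full jitter, and a predicate to skip retries on fatal errors.
async function withRetry<T>(
  fn: () => Promise<T>,
  opts: {
    maxAttempts?: number;
    baseDelayMs?: number;
    isRetryable?: (err: unknown) => boolean;
  } = {}
): Promise<T> {
  const { maxAttempts = 3, baseDelayMs = 1000, isRetryable = () => true } = opts;
  let lastError: unknown;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await fn();
    } catch (err) {
      lastError = err;
      // Give up immediately on fatal errors or when attempts are exhausted
      if (attempt === maxAttempts || !isRetryable(err)) throw err;
      // Full jitter: random delay in [0, baseDelay * 2^(attempt - 1))
      const delay = Math.random() * baseDelayMs * 2 ** (attempt - 1);
      await new Promise((resolve) => setTimeout(resolve, delay));
    }
  }
  throw lastError;
}
```

A call like `withRetry(() => fetchData(), { maxAttempts: 5, isRetryable: isRetryableError })` retries transient failures up to five times while letting validation-style errors fail fast.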