
Error Handling & DLQ

Handle failures, retry jobs, and manage the Dead Letter Queue

SPANE provides comprehensive error handling through standardized error classes, retry policies, a Dead Letter Queue (DLQ), and workflow control operations.

Error Classes

SPANE defines a hierarchy of error classes for consistent error handling:

Base WorkflowError

All SPANE errors extend the base WorkflowError class:

import { WorkflowError, isWorkflowError } from '@manyeya/spane';

interface WorkflowError extends Error {
  code: WorkflowErrorCode;
  executionId?: string;
  nodeId?: string;
  workflowId?: string;
  timestamp: Date;
  originalCause?: Error;
}

// Check if error is a WorkflowError
if (isWorkflowError(error)) {
  console.error('Error code:', error.code);
  console.error('Execution ID:', error.executionId);
}

Error Codes

import { WorkflowErrorCode } from '@manyeya/spane';

enum WorkflowErrorCode {
  // Workflow errors
  WORKFLOW_NOT_FOUND = 'WORKFLOW_NOT_FOUND',
  WORKFLOW_VALIDATION_FAILED = 'WORKFLOW_VALIDATION_FAILED',
  WORKFLOW_CYCLE_DETECTED = 'WORKFLOW_CYCLE_DETECTED',

  // Node errors
  NODE_NOT_FOUND = 'NODE_NOT_FOUND',
  NODE_NOT_REGISTERED = 'NODE_NOT_REGISTERED',
  NODE_EXECUTION_FAILED = 'NODE_EXECUTION_FAILED',
  NODE_SKIPPED = 'NODE_SKIPPED',
  NODE_TIMEOUT = 'NODE_TIMEOUT',

  // Execution errors
  EXECUTION_NOT_FOUND = 'EXECUTION_NOT_FOUND',
  EXECUTION_TIMEOUT = 'EXECUTION_TIMEOUT',
  EXECUTION_CANCELLED = 'EXECUTION_CANCELLED',
  EXECUTION_PAUSED = 'EXECUTION_PAUSED',
  EXECUTION_MAX_DEPTH_EXCEEDED = 'EXECUTION_MAX_DEPTH_EXCEEDED',

  // Configuration errors
  INVALID_CONFIG = 'INVALID_CONFIG',
  INVALID_PRIORITY = 'INVALID_PRIORITY',
  INVALID_DELAY = 'INVALID_DELAY',

  // State errors
  STATE_CORRUPTION = 'STATE_CORRUPTION',
  STATE_PERSISTENCE_FAILED = 'STATE_PERSISTENCE_FAILED',

  // Queue errors
  QUEUE_ERROR = 'QUEUE_ERROR',
  WORKER_ERROR = 'WORKER_ERROR',

  // Sub-workflow errors
  SUBWORKFLOW_FAILED = 'SUBWORKFLOW_FAILED',
  SUBWORKFLOW_NOT_FOUND = 'SUBWORKFLOW_NOT_FOUND',

  // Rate limiting
  RATE_LIMIT_EXCEEDED = 'RATE_LIMIT_EXCEEDED',

  // Circuit breaker
  CIRCUIT_BREAKER_OPEN = 'CIRCUIT_BREAKER_OPEN',

  // Generic
  UNKNOWN_ERROR = 'UNKNOWN_ERROR',
}
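As an illustration (not part of the SPANE API), a handler can branch on these codes. The grouping below is a hypothetical policy: the "retry" set mirrors the retryable codes documented later in this page, while the rest of the mapping is an assumption.

```typescript
// Hypothetical policy: map a WorkflowErrorCode string to a handling strategy.
// The groupings here are illustrative, not defined by SPANE itself.
type Strategy = 'retry' | 'fail' | 'alert';

function strategyFor(code: string): Strategy {
  switch (code) {
    // Transient conditions: worth retrying
    case 'RATE_LIMIT_EXCEEDED':
    case 'CIRCUIT_BREAKER_OPEN':
    case 'NODE_TIMEOUT':
    case 'QUEUE_ERROR':
    case 'WORKER_ERROR':
      return 'retry';
    // Definition problems: retrying cannot help
    case 'WORKFLOW_VALIDATION_FAILED':
    case 'WORKFLOW_CYCLE_DETECTED':
    case 'NODE_NOT_REGISTERED':
      return 'fail';
    // Everything else: surface to an operator
    default:
      return 'alert';
  }
}
```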

Specialized Error Classes

import {
  WorkflowNotFoundError,
  WorkflowValidationError,
  NodeExecutionError,
  NodeNotRegisteredError,
  ExecutionTimeoutError,
  MaxDepthExceededError,
  RateLimitError,
  CircuitBreakerOpenError,
  StatePersistenceError
} from '@manyeya/spane';

Using Error Classes

import { WorkflowNotFoundError, NodeExecutionError } from '@manyeya/spane';

// In your code
throw new WorkflowNotFoundError('my-workflow');

throw new NodeExecutionError(
  'node-1',
  'exec-123',
  'API timeout after 30s',
  new Error('Request timeout')
);

Error Utility Functions

Check Retryable Errors

import { isRetryableError } from '@manyeya/spane';

try {
  await someOperation();
} catch (error) {
  if (isRetryableError(error)) {
    // Error will be retried by BullMQ
    // These include: RATE_LIMIT_EXCEEDED, CIRCUIT_BREAKER_OPEN,
    // NODE_TIMEOUT, QUEUE_ERROR, WORKER_ERROR
    console.log('Will retry:', error.message);
  } else {
    // Non-retryable error
    console.error('Fatal error:', error.message);
  }
}

Check DLQ Eligibility

import { shouldMoveToDLQ } from '@manyeya/spane';

// Determine if error should go to DLQ
if (shouldMoveToDLQ(error, attemptsMade, maxAttempts)) {
  // Error will go to DLQ because:
  // 1. Max attempts exceeded, OR
  // 2. Error is non-retryable (VALIDATION_FAILED, CYCLE_DETECTED, etc.)
  console.log('Moving to DLQ');
}

Get User-Friendly Messages

import { getUserMessage } from '@manyeya/spane';

try {
  await engine.enqueueWorkflow('my-workflow', data);
} catch (error) {
  const userMessage = getUserMessage(error);
  // Returns sanitized messages like:
  // - "The specified workflow could not be found."
  // - "The workflow definition is invalid."
  // - "Too many requests. Please try again later."
  alert(userMessage);
}

Retry Policies

Configure retry behavior at the node level:

Retry Configuration

{
  id: 'unstable-api',
  type: 'http',
  config: {
    url: 'https://unstable-api.example.com',
  },
  retryPolicy: {
    maxAttempts: 5,           // Total attempts (initial + retries)
    backoff: {
      type: 'exponential',     // or 'fixed'
      delay: 1000            // Initial delay in ms
    },
    continueOnFail: false      // Stop workflow if all retries fail
  },
  inputs: [],
  outputs: []
}

Retry Policy Options

| Option           | Type                       | Default         | Description                           |
| ---------------- | -------------------------- | --------------- | ------------------------------------- |
| `maxAttempts`    | `number`                   | `3`             | Total number of attempts              |
| `backoff.type`   | `'fixed' \| 'exponential'` | `'exponential'` | Backoff strategy                      |
| `backoff.delay`  | `number`                   | `1000`          | Initial delay in milliseconds         |
| `continueOnFail` | `boolean`                  | `false`         | Continue workflow if all retries fail |

Fixed Backoff

Constant delay between retries:

{
  retryPolicy: {
    maxAttempts: 5,
    backoff: {
      type: 'fixed',
      delay: 2000  // 2 second delay between each attempt
    }
  }
}

// Retry schedule:
// Attempt 1: 0ms (immediate)
// Attempt 2: 2000ms
// Attempt 3: 2000ms
// Attempt 4: 2000ms
// Attempt 5: 2000ms

Exponential Backoff

Delay increases exponentially:

{
  retryPolicy: {
    maxAttempts: 5,
    backoff: {
      type: 'exponential',
      delay: 1000  // 1 second base delay
    }
  }
}

// Retry schedule:
// Attempt 1: 0ms (immediate)
// Attempt 2: 1000ms
// Attempt 3: 2000ms
// Attempt 4: 4000ms
// Attempt 5: 8000ms
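The two schedules can be reproduced with a small helper. This is a sketch of the arithmetic shown above, not SPANE's internal implementation:

```typescript
// Sketch of the retry-delay arithmetic from the schedules above.
// attempt is 1-based; the first attempt always runs immediately.
function retryDelay(
  attempt: number,
  baseDelay: number,
  type: 'fixed' | 'exponential'
): number {
  if (attempt <= 1) return 0;
  if (type === 'fixed') return baseDelay;
  // Exponential: the base delay doubles with each subsequent retry
  return baseDelay * 2 ** (attempt - 2);
}

// Matches the exponential schedule above (delay: 1000)
console.log([1, 2, 3, 4, 5].map(a => retryDelay(a, 1000, 'exponential')));
// [0, 1000, 2000, 4000, 8000]
```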

Continue on Fail

Allow workflow to continue even after all retries fail:

{
  id: 'optional-enrichment',
  type: 'http',
  config: {
    url: 'https://enrichment-api.example.com',
  },
  retryPolicy: {
    maxAttempts: 3,
    continueOnFail: true  // Don't stop workflow
  },
  inputs: ['data'],
  outputs: ['save']
}

In the next node, check if enrichment failed:

class SaveExecutor implements INodeExecutor {
  async execute(context: ExecutionContext): Promise<ExecutionResult> {
    // Previous results are keyed by node id
    const data = context.previousResults['data'];
    const enrichment = context.previousResults['optional-enrichment'];

    if (!enrichment || enrichment.success === false) {
      // Enrichment failed; fall back to the base data
      console.warn('Enrichment failed, proceeding with base data:', data.data);
      return { success: true, data: data.data };
    }

    // Merge the enriched fields into the base data
    return { success: true, data: { ...data.data, ...enrichment.data } };
  }
}

Dead Letter Queue (DLQ)

The DLQ stores jobs that have exhausted all retry attempts.

What Goes to DLQ?

Jobs are sent to the DLQ when:

  • All retry attempts are exhausted (and continueOnFail is false)
  • The job crashes repeatedly
  • The error is non-retryable (validation errors, cycle detected, etc.)
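The decision shouldMoveToDLQ makes can be approximated as follows. The set of non-retryable codes here is an assumption for illustration and may not match the library's exact internal list:

```typescript
// Illustrative approximation of the DLQ decision rule described above.
// The exact non-retryable set is internal to SPANE; this one is an assumption.
const NON_RETRYABLE = new Set([
  'WORKFLOW_VALIDATION_FAILED',
  'WORKFLOW_CYCLE_DETECTED',
  'NODE_NOT_REGISTERED',
  'INVALID_CONFIG',
]);

function goesToDLQ(code: string, attemptsMade: number, maxAttempts: number): boolean {
  // Either retries are exhausted, or the error can never succeed on retry
  return attemptsMade >= maxAttempts || NON_RETRYABLE.has(code);
}
```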

Accessing DLQ

// Get DLQ items
const items = await engine.getDLQItems(0, 10);  // start, end

for (const item of items) {
  console.log('Job ID:', item.jobId);
  console.log('Execution ID:', item.executionId);
  console.log('Workflow ID:', item.workflowId);
  console.log('Node ID:', item.nodeId);
  console.log('Attempts:', item.attemptsMade);
  console.log('Failed At:', item.failedAt);
  console.log('Error:', item.stacktrace);
}

Retry from DLQ

// Retry a specific DLQ item
const success = await engine.retryDLQItem(dlqJobId);

if (success) {
  console.log('Job successfully retried');
} else {
  console.log('Failed to retry job');
}

Retry Multiple Items

// Retry all DLQ items
const items = await engine.getDLQItems(0, -1);  // Get all
let successCount = 0;

for (const item of items) {
  const success = await engine.retryDLQItem(item.jobId);
  if (success) successCount++;
}

console.log(`Retried ${successCount}/${items.length} jobs`);

Workflow Control

Pause Workflow

Pause a running workflow:

await engine.pauseExecution(executionId);

What happens when paused:

  • Workflow status changes to 'paused'
  • Waiting jobs moved to delayed state (24 hours)
  • Active jobs continue to completion
  • No new jobs are enqueued

Resume Workflow

Resume a paused workflow:

await engine.resumeExecution(executionId);

What happens when resumed:

  • Workflow status changes to 'running'
  • Delayed jobs promoted back to waiting
  • Execution continues from where it left off

Cancel Workflow

Cancel a workflow completely:

await engine.cancelExecution(executionId);

What happens when cancelled:

  • Workflow status changes to 'cancelled'
  • All pending jobs removed from queue
  • Active jobs continue to completion
  • No new jobs are enqueued
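These controls compose into simple operational policies. The sketch below cancels an execution that has run past a deadline; the ControlApi interface is a hypothetical slice of the engine methods shown above, and the overall helper is an illustration, not a SPANE API:

```typescript
// Hypothetical slice of the engine's control surface, modeled on the
// cancelExecution call shown above.
interface ControlApi {
  cancelExecution(executionId: string): Promise<void>;
}

// Cancel an execution if it has been running longer than maxRuntimeMs.
// Returns true if a cancellation was issued.
async function cancelIfOverdue(
  engine: ControlApi,
  executionId: string,
  startedAt: Date,
  maxRuntimeMs: number,
  now: Date = new Date()
): Promise<boolean> {
  if (now.getTime() - startedAt.getTime() <= maxRuntimeMs) return false;
  await engine.cancelExecution(executionId);
  return true;
}
```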

Error Handling Patterns

Graceful Degradation

class GracefulExecutor implements INodeExecutor {
  async execute(context: ExecutionContext): Promise<ExecutionResult> {
    try {
      // Try primary service
      return await this.callPrimaryService(context);
    } catch (primaryError) {
      console.warn('Primary service failed:', primaryError);

      try {
        // Try fallback service
        return await this.callFallbackService(context);
      } catch (fallbackError) {
        // Return cached data or default
        return {
          success: true,
          data: this.getCachedData(context),
        };
      }
    }
  }
}

Circuit Breaker + Retry

Combine circuit breaker with retries:

{
  id: 'protected-api',
  type: 'http',
  config: {
    url: 'https://api.example.com/endpoint',
  },
  retryPolicy: {
    maxAttempts: 5,
    backoff: { type: 'exponential', delay: 1000 }
  },
  circuitBreaker: {
    enabled: true,
    failureThreshold: 10,
    successThreshold: 2,
    timeout: 60000
  }
}

Error Monitoring

Track Failed Executions

const execution = await stateStore.getExecution(executionId);

if (execution.status === 'failed') {
  console.error('Workflow failed:', execution);

  // Find failed nodes
  for (const [nodeId, result] of Object.entries(execution.nodeResults)) {
    if (result.success === false) {
      console.error(`Node ${nodeId} failed:`, result.error);
    }
  }
}

Complete Error Handling Example

import {
  WorkflowEngine,
  isRetryableError,
  shouldMoveToDLQ,
  getUserMessage
} from '@manyeya/spane';

// Configure engine
const engine = new WorkflowEngine(registry, stateStore, redis);

// Register workflow with error handling
const workflow: WorkflowDefinition = {
  id: 'robust-workflow',
  name: 'Robust Workflow with Error Handling',
  nodes: [
    {
      id: 'step1',
      type: 'http',
      config: {
        url: 'https://api1.example.com',
      },
      retryPolicy: {
        maxAttempts: 5,
        backoff: { type: 'exponential', delay: 1000 }
      },
      inputs: [],
      outputs: ['step2']
    },
    {
      id: 'step2',
      type: 'http',
      config: {
        url: 'https://api2.example.com',
      },
      retryPolicy: {
        maxAttempts: 3,
        continueOnFail: true  // Optional step
      },
      inputs: ['step1'],
      outputs: ['step3']
    }
  ]
};

// Execute and monitor
const executionId = await engine.enqueueWorkflow('robust-workflow', data);

// Monitor execution
const execution = await stateStore.getExecution(executionId);

if (execution.status === 'failed') {
  for (const [nodeId, result] of Object.entries(execution.nodeResults)) {
    if (result.success === false) {
      console.error(`Node ${nodeId} failed:`, result.error);

      // Check if retryable
      if (isRetryableError(new Error(result.error || ''))) {
        console.log('This error is retryable');
      }
    }
  }
}

Best Practices

  1. Set Appropriate Retry Limits: 3-5 retries for most cases
  2. Use Exponential Backoff: Avoid retry storms
  3. Implement Fallbacks: Have alternative data sources
  4. Monitor DLQ: Regularly review and retry failed jobs
  5. Use Circuit Breakers: Protect external dependencies
  6. Handle Transient Errors: Differentiate retryable vs non-retryable errors
  7. Set Timeouts: Prevent indefinite hanging
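For point 4, a periodic sweep is one way to keep the DLQ small. The DLQApi interface below is a hypothetical slice of the getDLQItems/retryDLQItem methods shown earlier, so the helper can be exercised against any engine (or a mock):

```typescript
// Hypothetical slice of the engine's DLQ methods, as shown earlier.
interface DLQApi {
  getDLQItems(start: number, end: number): Promise<Array<{ jobId: string }>>;
  retryDLQItem(jobId: string): Promise<boolean>;
}

// Retry every DLQ item once; returns how many were re-enqueued.
async function sweepDLQ(engine: DLQApi): Promise<number> {
  const items = await engine.getDLQItems(0, -1); // -1 fetches all, as above
  let retried = 0;
  for (const item of items) {
    if (await engine.retryDLQItem(item.jobId)) retried++;
  }
  return retried;
}
```

Run this on a schedule (e.g. a cron job) and alert when the retried/total ratio stays low, which usually signals a persistent failure rather than a transient one.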
