Advanced Features

Rate limiting, delays, job prioritization, and more

SPANE provides advanced features for fine-tuning workflow behavior and performance.

Rate Limiting

Per-Node-Type Rate Limiting

Configure rate limits when registering node executors:

const registry = new NodeRegistry();

// Limit API calls to 100 requests per minute
registry.register('api-call', new ApiExecutor(), {
  max: 100,      // Max requests
  duration: 60000  // Time window in milliseconds
});

// Limit database operations to 50 per second
registry.register('database-query', new DatabaseExecutor(), {
  max: 50,
  duration: 1000
});

Global Worker Rate Limiting

Enable BullMQ's native rate limiting for all node executions:

const engineConfig: EngineConfig = {
  useNativeRateLimiting: true,
  rateLimiter: {
    max: 100,      // Maximum jobs to process
    duration: 1000  // Within this time window (ms)
  }
};

const engine = new WorkflowEngine(
  registry,
  stateStore,
  redis,
  undefined, undefined, undefined, undefined,
  engineConfig
);
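The max/duration contract can be illustrated with a minimal fixed-window sketch (hypothetical code, not SPANE's implementation — BullMQ's real limiter is Redis-backed and shared across workers):

```typescript
// Sketch of the max/duration semantics: at most `max` jobs may start within
// any window of `duration` milliseconds.
class FixedWindowLimiter {
  private windowStart = 0;
  private count = 0;

  constructor(private max: number, private duration: number) {}

  // Returns true if a job may run now, false if the window's limit is exhausted.
  tryConsume(now: number): boolean {
    if (now - this.windowStart >= this.duration) {
      this.windowStart = now;  // start a fresh window
      this.count = 0;
    }
    if (this.count >= this.max) return false;
    this.count++;
    return true;
  }
}
```

With `max: 2, duration: 1000`, a third job inside the same second is held back until the next window opens.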

Manual Rate Limiting

Trigger rate limiting from within executors:

class RateLimitedExecutor implements INodeExecutor {
  async execute(context: ExecutionContext): Promise<ExecutionResult> {
    const response = await fetch(url);  // url comes from the node's config

    // Handle rate limiting response
    if (response.status === 429) {
      const retryAfter = parseInt(
        response.headers.get('retry-after') || '60',
        10
      ) * 1000;

      // Trigger rate limiting
      if (context.rateLimit) {
        const error = await context.rateLimit(retryAfter);
        throw error;  // Must throw for rate limit to work
      }
    }

    const data = await response.json();
    return { success: true, data };
  }
}

Delay Nodes

Delay nodes pause workflow execution for a specified duration.

Basic Delay

{
  id: 'wait-5-minutes',
  type: 'delay',
  config: {
    duration: 300000  // 5 minutes in milliseconds
  },
  inputs: ['start'],
  outputs: ['continue']
}

Duration Options

Delay nodes support multiple duration formats; the first one found is used, in the order shown:

{
  id: 'delay-example',
  type: 'delay',
  config: {
    // Option 1: Milliseconds (highest priority)
    duration: 300000,

    // Option 2: Seconds
    durationSeconds: 300,

    // Option 3: Minutes
    durationMinutes: 5
  }
}
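The precedence order above can be sketched as a small resolver (a hypothetical helper for illustration; SPANE's internal resolution may differ in detail, and the `0` fallback is an assumption):

```typescript
// Resolution order described above: duration (ms) takes precedence,
// then durationSeconds, then durationMinutes.
interface DelayConfig {
  duration?: number;         // milliseconds
  durationSeconds?: number;
  durationMinutes?: number;
}

function resolveDelayMs(config: DelayConfig): number {
  if (config.duration !== undefined) return config.duration;
  if (config.durationSeconds !== undefined) return config.durationSeconds * 1000;
  if (config.durationMinutes !== undefined) return config.durationMinutes * 60000;
  return 0;  // no delay configured (assumed fallback)
}
```

All three configs in the example resolve to the same 300000ms delay.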

Use Cases

Retry After Delay

{
  id: 'retry-after-failure',
  type: 'delay',
  config: {
    durationMinutes: 10  // Wait 10 minutes before retry
  },
  inputs: ['failed'],
  outputs: ['retry']
}

Scheduled Processing

{
  id: 'wait-for-business-hours',
  type: 'delay',
  config: {
    duration: 86400000  // Wait 24 hours
  },
  inputs: ['after-hours'],
  outputs: ['process']
}

Throttling

{
  id: 'throttle-batch',
  type: 'delay',
  config: {
    duration: 5000  // 5 second delay between batches
  },
  inputs: ['batch-1'],
  outputs: ['batch-2']
}

Job Prioritization

Set job priorities to ensure critical workflows execute first.

Workflow-Level Priority

const workflow: WorkflowDefinition = {
  id: 'critical-workflow',
  name: 'Critical Workflow',
  priority: 10,  // Highest priority (1-10, higher = more important)
  entryNodeId: 'start',
  nodes: [...]
};

Execution-Level Priority

const executionId = await engine.enqueueWorkflow(
  'my-workflow',
  { data: 'here' },
  undefined,       // parentExecutionId
  0,               // depth
  undefined,       // parentJobId
  { priority: 8 }  // options
);

Priority Guidelines

Priority   Use Case
1-3        Background tasks, non-critical jobs
4-6        Standard operations, normal workflows
7-8        Important workflows, business-critical
9-10       Emergency workflows, time-sensitive
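Under SPANE's convention (higher number = more important), the scheduler effectively dequeues the highest priority number first. A minimal sketch, assuming jobs are plain objects with a `priority` field:

```typescript
// Pick the next job to run: the highest priority number wins.
// Ties keep insertion order here (Array.prototype.sort is stable in modern JS).
function nextJob<T extends { priority: number }>(queue: T[]): T | undefined {
  return [...queue].sort((a, b) => b.priority - a.priority)[0];
}
```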

Example: Urgent vs Standard

// Urgent notification
const urgentExecution = await engine.enqueueWorkflow(
  'send-alert',
  { message: 'Critical system failure' },
  undefined, 0, undefined,
  { priority: 10 }  // Highest priority
);

// Standard report
const reportExecution = await engine.enqueueWorkflow(
  'generate-report',
  { type: 'daily' },
  undefined, 0, undefined,
  { priority: 3 }  // Low priority
);

Delayed Execution

Schedule workflows to execute at specific times.

Schedule for Later

const executeAt = new Date('2024-12-31T23:59:59Z');

const executionId = await engine.scheduleWorkflow(
  'new-year-notification',
  { message: 'Happy New Year!' },
  executeAt
);

Schedule from Current Time

// Execute 1 hour from now
const oneHourFromNow = new Date(Date.now() + (60 * 60 * 1000));

const executionId = await engine.scheduleWorkflow(
  'hourly-reminder',
  { message: 'Time to check in' },
  oneHourFromNow
);

Validate Schedule Time

const scheduleTime = new Date('2024-01-01T00:00:00Z');

// Check if time is in future
if (scheduleTime.getTime() <= Date.now()) {
  throw new Error('Cannot schedule workflow in the past');
}

const executionId = await engine.scheduleWorkflow(
  'scheduled-workflow',
  data,
  scheduleTime
);

Payload Management

Large Payload Handling

SPANE automatically offloads large payloads to PostgreSQL (Claim Check Pattern):

// Configure payload manager (optional)
const payloadManager = new PayloadManager(stateStore);

const engine = new WorkflowEngine(
  registry,
  stateStore,
  redis,
  undefined,  // metricsCollector
  undefined,  // circuitBreakerRegistry
  undefined,  // cacheOptions
  payloadManager
);

Payloads larger than 50KB are automatically offloaded.
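The claim-check decision can be sketched as follows (an illustrative sketch only: the in-memory Map stands in for PostgreSQL, and the reference format is an assumption):

```typescript
// Payloads whose serialized size exceeds the threshold are stored externally
// and replaced with a small reference (the "claim check").
const OFFLOAD_THRESHOLD_BYTES = 50 * 1024;  // 50KB, per the docs above
const externalStore = new Map<string, string>();  // stand-in for PostgreSQL

type MaybeOffloaded<T> = T | { __claimCheck: string };

function offloadIfNeeded<T>(
  executionId: string,
  key: string,
  payload: T
): MaybeOffloaded<T> {
  const serialized = JSON.stringify(payload);
  if (serialized.length <= OFFLOAD_THRESHOLD_BYTES) {
    return payload;  // small enough to stay inline
  }
  const ref = `${executionId}:${key}`;
  externalStore.set(ref, serialized);  // persist the large payload
  return { __claimCheck: ref };        // only the reference travels in Redis
}
```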

Manual Payload Offloading

await payloadManager.offloadIfNeeded(
  executionId,
  'initialData',
  largePayload
);

Loading Offloaded Payloads

Payloads are automatically loaded during execution. Access via context:

class MyExecutor implements INodeExecutor {
  async execute(context: ExecutionContext): Promise<ExecutionResult> {
    const data = context.inputData;  // Automatically loaded
    return { success: true, data };
  }
}

Job Deduplication

Prevent duplicate workflow executions using custom job IDs:

const executionId = await engine.enqueueWorkflow(
  'my-workflow',
  { orderId: 123 },
  undefined, 0, undefined,
  {
    jobId: `order-processing-123`,  // Custom job ID
    priority: 5
  }
);

If the same jobId is used again while the original job still exists in the queue, BullMQ skips the duplicate.
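The effect can be sketched with an in-memory stand-in for the queue (illustrative only; BullMQ performs this check in Redis):

```typescript
// Sketch of jobId deduplication: a second add with the same jobId is
// ignored while the first job is still present in the queue.
const enqueued = new Map<string, unknown>();  // stand-in for the Redis queue

// Returns true if the job was enqueued, false if it was deduplicated.
function addJob(jobId: string, data: unknown): boolean {
  if (enqueued.has(jobId)) return false;  // duplicate jobId: skipped
  enqueued.set(jobId, data);
  return true;
}
```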

Use Cases

Idempotent Order Processing

const orderId = 'ORD-12345';

const executionId = await engine.enqueueWorkflow(
  'process-order',
  { orderId },
  undefined, 0, undefined,
  {
    jobId: `order-${orderId}`,  // Unique per order
    priority: 7  // High priority
  }
);

Daily Report

const today = new Date().toISOString().split('T')[0];

const executionId = await engine.enqueueWorkflow(
  'daily-report',
  { date: today },
  undefined, 0, undefined,
  {
    jobId: `report-${today}`,  // One per day
    priority: 3
  }
);

Bulk Operations

Bulk Workflow Enqueue

Execute multiple workflows efficiently:

const workflows = [
  {
    workflowId: 'send-welcome',
    initialData: { userId: 1 },
    options: { priority: 5 }
  },
  {
    workflowId: 'send-welcome',
    initialData: { userId: 2 },
    options: { priority: 5 }
  },
  {
    workflowId: 'send-welcome',
    initialData: { userId: 3 },
    options: { priority: 5 }
  }
];

const executionIds = await engine.enqueueBulkWorkflows(workflows);
console.log(`Started ${executionIds.length} workflows`);

Bulk Pause/Resume

// Pause multiple workflows
await engine.pauseBulkWorkflows(['exec-1', 'exec-2', 'exec-3']);

// Resume multiple workflows
await engine.resumeBulkWorkflows(['exec-1', 'exec-2', 'exec-3']);

Bulk Cancel

// Cancel multiple workflows
await engine.cancelBulkWorkflows(['exec-1', 'exec-2', 'exec-3']);

Workflow Concurrency

Limit Concurrent Executions

Limit how many executions of a workflow can run simultaneously:

const workflow: WorkflowDefinition = {
  id: 'limited-workflow',
  name: 'Limited Concurrent Workflow',
  maxConcurrency: 5,              // Max 5 executions at once
  concurrencyLockTTL: 3600,       // Lock expires after 1 hour
  entryNodeId: 'start',
  nodes: [...]
};

When max concurrency is reached, additional executions are queued until slots free up.
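The slot-and-TTL mechanics can be sketched in memory (SPANE uses Redis-backed locks in practice; this hypothetical version only illustrates the contract of maxConcurrency and concurrencyLockTTL):

```typescript
// Per-workflow concurrency slots with TTL, as described above.
class ConcurrencyLimiter {
  private active = new Map<string, number>();  // slotId -> expiry timestamp (ms)

  constructor(private maxConcurrency: number, private lockTTLSeconds: number) {}

  // Try to claim a slot; returns false when the workflow is at capacity.
  tryAcquire(slotId: string, now = Date.now()): boolean {
    // Expired locks no longer count toward the limit.
    for (const [id, expiry] of this.active) {
      if (expiry <= now) this.active.delete(id);
    }
    if (this.active.size >= this.maxConcurrency) return false;
    this.active.set(slotId, now + this.lockTTLSeconds * 1000);
    return true;
  }

  release(slotId: string): void {
    this.active.delete(slotId);
  }
}
```

The TTL is a safety valve: if an execution dies without releasing its slot, the lock expires and the slot becomes available again.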

Use Cases

API Rate Limits

const workflow: WorkflowDefinition = {
  id: 'external-api-calls',
  name: 'External API Calls',
  maxConcurrency: 10,  // API allows 10 concurrent requests
  concurrencyLockTTL: 300,  // 5 minute lock
  nodes: [...]
};

Resource Constraints

const workflow: WorkflowDefinition = {
  id: 'heavy-processing',
  name: 'Heavy Processing',
  maxConcurrency: 2,  // Only 2 at a time due to resource limits
  concurrencyLockTTL: 1800,  // 30 minute lock
  nodes: [...]
};

Queue Statistics

Monitor queue health and performance:

const stats = await engine.getQueueStats();

console.log('Waiting jobs:', stats.waiting);
console.log('Active jobs:', stats.active);
console.log('Completed jobs:', stats.completed);
console.log('Failed jobs:', stats.failed);
console.log('Delayed jobs:', stats.delayed);
console.log('Paused jobs:', stats.paused);

Dashboard Example

import { useState, useEffect } from 'react';

function QueueDashboard() {
  const [stats, setStats] = useState({
    waiting: 0,
    active: 0,
    completed: 0,
    failed: 0,
    delayed: 0
  });

  useEffect(() => {
    // In a real app, fetch these stats from a backend endpoint;
    // the engine itself should not run in the browser.
    const interval = setInterval(async () => {
      const queueStats = await engine.getQueueStats();
      setStats(queueStats);
    }, 5000);  // Update every 5 seconds

    return () => clearInterval(interval);
  }, []);

  return (
    <div>
      <h2>Queue Statistics</h2>
      <div>Waiting: {stats.waiting}</div>
      <div>Active: {stats.active}</div>
      <div>Completed: {stats.completed}</div>
      <div>Failed: {stats.failed}</div>
      <div>Delayed: {stats.delayed}</div>
    </div>
  );
}

Replay Executions

Re-run a workflow with the same initial data:

const newExecutionId = await engine.replayWorkflow(originalExecutionId);

console.log('Replayed execution:', newExecutionId);

Use Cases

  • Debugging: Re-run failed workflows to test fixes
  • Recovery: Re-process data after fixing bugs
  • Testing: Verify workflow changes with production data

Job Status Check

Check the status of individual jobs:

const status = await engine.getJobStatus(jobId);

if (status.exists) {
  console.log('Job status:', status.status);
  // Possible values: 'waiting', 'active', 'completed', 'failed', 'delayed'
}

Complete Example

import { WorkflowEngine } from 'spane/engine/workflow-engine';
import type { EngineConfig } from 'spane/engine/config';

const engineConfig: EngineConfig = {
  useNativeRateLimiting: true,
  useFlowProducerForSubWorkflows: true,
  workerConcurrency: 10,
  rateLimiter: {
    max: 100,
    duration: 1000
  }
};

const engine = new WorkflowEngine(
  registry,
  stateStore,
  redis,
  undefined, undefined, undefined, undefined,
  engineConfig
);

// High priority workflow with concurrency limits
const urgentWorkflow: WorkflowDefinition = {
  id: 'urgent-processing',
  name: 'Urgent Processing',
  priority: 10,
  maxConcurrency: 5,
  concurrencyLockTTL: 600,
  entryNodeId: 'start',
  nodes: [
    {
      id: 'start',
      type: 'http',
      config: {
        url: 'https://api.example.com/urgent',
        retryPolicy: {
          maxAttempts: 5,
          backoff: { type: 'exponential', delay: 1000 }
        }
      },
      inputs: [],
      outputs: ['delay']
    },
    {
      id: 'delay',
      type: 'delay',
      config: {
        duration: 5000  // Wait 5 seconds
      },
      inputs: ['start'],
      outputs: ['complete']
    },
    {
      id: 'complete',
      type: 'email',
      config: { to: 'admin@example.com' },
      inputs: ['delay'],
      outputs: []
    }
  ]
};

await engine.registerWorkflow(urgentWorkflow);

// Execute with high priority
const executionId = await engine.enqueueWorkflow(
  'urgent-processing',
  { urgent: true },
  undefined, 0, undefined,
  { priority: 10 }
);
