My App

Engine Configuration

Configure WorkflowEngine behavior and feature flags

Engine Configuration

The WorkflowEngine supports various configuration options to fine-tune behavior, enable advanced features, and optimize performance.

Basic Configuration

Pass configuration options when creating the engine:

import { WorkflowEngine } from '@manyeya/spane';
import type { EngineConfig } from '@manyeya/spane';

const engineConfig: EngineConfig = {
  useFlowProducerForSubWorkflows: true,
  useNativeRateLimiting: true,
  workerConcurrency: 10,
  rateLimiter: {
    max: 100,
    duration: 1000,  // 100 jobs per second
  },
};

const engine = new WorkflowEngine(
  registry,
  stateStore,
  redis,
  engineConfig
);

Configuration Options

OptionTypeDefaultDescription
useFlowProducerForSubWorkflowsbooleanfalseUse BullMQ FlowProducer for sub-workflow execution with native parent-child job dependencies
useNativeRateLimitingbooleanfalseUse BullMQ's native Worker rate limiting instead of custom Redis INCR/EXPIRE implementation
useJobSchedulersbooleantrueUse upsertJobScheduler for schedule management (always enabled in v2.0)
workerConcurrencynumber5Number of jobs to process in parallel per worker
rateLimiterobjectundefinedGlobal rate limiter for node execution worker

Feature Flags

All feature flags default to false (except useJobSchedulers) to maintain backward compatibility. Enable features incrementally as needed:

Minimal Config (All Defaults)

const engine = new WorkflowEngine(registry, stateStore, redis);

Enable Specific Features

const engine = new WorkflowEngine(
  registry,
  stateStore,
  redis,
  {
    useFlowProducerForSubWorkflows: true,  // Better sub-workflow handling
    useNativeRateLimiting: true,           // BullMQ native rate limiting
    workerConcurrency: 10,                  // Higher parallelism
  }
);

Sub-Workflows (FlowProducer)

Enable native BullMQ parent-child job dependencies for sub-workflow execution:

const engineConfig: EngineConfig = {
  useFlowProducerForSubWorkflows: true,
};

// Benefits:
// - Automatic dependency management
// - Better reliability
// - Simpler code (no checkpoint/resume pattern)
// - Native result aggregation

const engine = new WorkflowEngine(
  registry,
  stateStore,
  redis,
  engineConfig
);

How it works:

  1. Creates BullMQ flow with aggregator as parent and sub-workflow nodes as children
  2. BullMQ automatically manages job dependencies
  3. Aggregator collects results using getChildrenValues()
  4. Parent workflow notified when sub-workflow completes

Rate Limiting

Per-Node-Type Rate Limiting

Configure rate limits when registering node executors:

registry.register('api-call', new ApiExecutor(), {
  max: 100,      // 100 requests
  duration: 60000  // per minute
});

Global Worker Rate Limiting

Enable BullMQ's native rate limiting for all node execution:

const engineConfig: EngineConfig = {
  useNativeRateLimiting: true,
  rateLimiter: {
    max: 50,       // Maximum jobs to process
    duration: 1000, // Within this time window (ms)
  },
};

// This limits the worker to processing 50 jobs per second
// across all node types

Native vs Custom Rate Limiting:

  • Native (BullMQ): Built into BullMQ, simpler, less Redis operations
  • Custom (Redis INCR/EXPIRE): More control, but higher Redis load

Worker Concurrency

Control how many jobs a worker processes simultaneously:

const engineConfig: EngineConfig = {
  workerConcurrency: 10,  // Process 10 jobs in parallel
};

// Can also be set when starting workers
engine.startWorkers(20);  // Override config value

Guidelines:

  • I/O-bound nodes (HTTP, database): Higher concurrency (10-20)
  • CPU-intensive nodes: Lower concurrency (1-5)
  • Mixed workloads: Moderate concurrency (5-10)

Constants

SPANE defines configurable constants that you can import and use:

import {
  MAX_SUBWORKFLOW_DEPTH,
  DEFAULT_WORKFLOW_CACHE_SIZE,
  DEFAULT_WORKFLOW_CACHE_TTL_MS,
  DEFAULT_REMOVE_ON_COMPLETE_COUNT,
  DEFAULT_REMOVE_ON_FAIL_COUNT,
  DEFAULT_RETRY_DELAY_MS,
  DEFAULT_WORKER_CONCURRENCY,
  DEFAULT_LOCK_TIMEOUT_MS,
  DEFAULT_DELAY_DURATION_MS,
  DEFAULT_RATE_LIMIT_MAX,
  DEFAULT_RATE_LIMIT_DURATION_MS
} from '@manyeya/spane/engine/constants';
ConstantValueDescription
MAX_SUBWORKFLOW_DEPTH10Maximum sub-workflow nesting level
DEFAULT_WORKFLOW_CACHE_SIZE500Max workflows in memory cache
DEFAULT_WORKFLOW_CACHE_TTL_MS3600000Cache TTL (1 hour)
DEFAULT_REMOVE_ON_COMPLETE_COUNT100Completed jobs to keep
DEFAULT_REMOVE_ON_FAIL_COUNT5000Failed jobs to keep
DEFAULT_RETRY_DELAY_MS1000Default retry delay
DEFAULT_WORKER_CONCURRENCY5Default worker concurrency
DEFAULT_LOCK_TIMEOUT_MS30000Distributed lock timeout
DEFAULT_DELAY_DURATION_MS5000Default delay node duration
DEFAULT_RATE_LIMIT_MAX100Default rate limit max
DEFAULT_RATE_LIMIT_DURATION_MS60000Default rate limit window

Accessing Configuration

Retrieve current engine configuration at runtime:

const config = engine.getConfig();

console.log('Worker concurrency:', config.workerConcurrency);
console.log('Using FlowProducer:', config.useFlowProducerForSubWorkflows);
console.log('Native rate limiting:', config.useNativeRateLimiting);

Complete Example

import { Redis } from 'ioredis';
import { WorkflowEngine, NodeRegistry } from '@manyeya/spane';
import { InMemoryExecutionStore } from '@manyeya/spane';
import type { EngineConfig } from '@manyeya/spane';

// Configuration
const engineConfig: EngineConfig = {
  useFlowProducerForSubWorkflows: true,
  useNativeRateLimiting: true,
  workerConcurrency: 10,
  rateLimiter: {
    max: 100,
    duration: 1000,
  },
};

// Initialize components
const redis = new Redis();
const registry = new NodeRegistry();
const stateStore = new InMemoryExecutionStore();

// Create engine
const engine = new WorkflowEngine(
  registry,
  stateStore,
  redis,
  engineConfig
);

// Register workflow and start
await engine.registerWorkflow(workflow);
engine.startWorkers();

console.log('Engine config:', engine.getConfig());

Performance Tuning

High Throughput

const engineConfig: EngineConfig = {
  workerConcurrency: 20,
  useNativeRateLimiting: true,
};

Low Latency

const engineConfig: EngineConfig = {
  workerConcurrency: 5,
  useFlowProducerForSubWorkflows: true,  // Faster sub-workflow execution
};

Resource Constrained

const engineConfig: EngineConfig = {
  workerConcurrency: 2,
};

Next Steps

On this page