My App

API Reference

Complete API reference for SPANE components

API Reference

WorkflowEngine

The main orchestrator that manages workflow lifecycle and execution.

Constructor

constructor(
  registry: NodeRegistry,
  stateStore: IExecutionStateStore,
  redisConnection: Redis,
  metricsCollector?: MetricsCollector,
  circuitBreakerRegistry?: CircuitBreakerRegistry,
  cacheOptions?: WorkflowCacheOptions,
  payloadManager?: PayloadManager,
  engineConfig?: Partial<EngineConfig>
)

Methods

registerWorkflow

Register a workflow definition (with optional DB persistence).

async registerWorkflow(
  workflow: WorkflowDefinition,
  changeNotes?: string
): Promise<void>

Example:

await engine.registerWorkflow(workflow, 'Added email notification step');

enqueueWorkflow

Enqueue a workflow execution with optional priority and delay.

async enqueueWorkflow(
  workflowId: string,
  initialData?: any,
  parentExecutionId?: string,
  depth?: number,
  parentJobId?: string,
  options?: {
    priority?: number;
    delay?: number;
    jobId?: string;
  }
): Promise<string>

Returns: Execution ID

Example:

const executionId = await engine.enqueueWorkflow(
  'my-workflow',
  { userId: 123 },
  undefined,
  0,
  undefined,
  { priority: 8, delay: 5000 }
);

enqueueNode

Enqueue a single node execution.

async enqueueNode(
  executionId: string,
  workflowId: string,
  nodeId: string,
  inputData?: any,
  parentJobId?: string,
  options?: {
    priority?: number;
    delay?: number;
    jobId?: string;
  }
): Promise<string>

Returns: Job ID


triggerWebhook

Trigger workflows via webhook path.

async triggerWebhook(
  path: string,
  method: string,
  data: any
): Promise<string[]>

Returns: Array of execution IDs


getWorkflow

Get a workflow definition (lazy-loaded from the database if it is not already cached).

async getWorkflow(workflowId: string): Promise<WorkflowDefinition | undefined>

getAllWorkflowsFromDatabase

Get all workflows from database with pagination.

async getAllWorkflowsFromDatabase(
  activeOnly?: boolean,
  limit?: number,
  offset?: number
): Promise<WorkflowDefinition[]>

startWorkers

Start worker processes.

startWorkers(concurrency?: number): void

cancelWorkflow

Cancel a running workflow.

async cancelWorkflow(executionId: string): Promise<void>

pauseWorkflow

Pause a running workflow.

async pauseWorkflow(executionId: string): Promise<void>

resumeWorkflow

Resume a paused workflow.

async resumeWorkflow(executionId: string): Promise<void>

replayWorkflow

Replay a past execution with the same initial data.

async replayWorkflow(executionId: string): Promise<string>

Returns: New execution ID


scheduleWorkflow

Schedule a workflow to execute at a specific time.

async scheduleWorkflow(
  workflowId: string,
  initialData: any,
  executeAt: Date
): Promise<string>

Returns: Execution ID


getDLQItems

Get items from the Dead Letter Queue (DLQ).

async getDLQItems(
  start?: number,
  end?: number
): Promise<DLQItem[]>

retryDLQItem

Retry a specific DLQ item.

async retryDLQItem(dlqJobId: string): Promise<boolean>

getQueueStats

Get queue statistics.

async getQueueStats(): Promise<{
  waiting: number;
  active: number;
  completed: number;
  failed: number;
  delayed: number;
  paused: number;
}>

getEventStream

Get the EventStreamManager for real-time event streaming.

getEventStream(): EventStreamManager

getConfig

Get the current engine configuration.

getConfig(): EngineConfig

close

Gracefully shut down the engine.

async close(): Promise<void>

NodeRegistry

Stores and manages node executors.

Constructor

constructor()

Methods

register

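Register a node executor with optional circuit-breaker and rate-limiting options. The options form two groups, used separately in the examples below: failureThreshold, successThreshold, and timeout (circuit breaker), and max and duration (rate limiting).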

register(
  type: string,
  executor: INodeExecutor,
  options?: {
    failureThreshold?: number;
    successThreshold?: number;
    timeout?: number;
    max?: number;
    duration?: number;
  }
): void

Example:

registry.register('http', new HttpExecutor(), {
  failureThreshold: 5,
  successThreshold: 2,
  timeout: 60000
});

registry.register('api-call', new ApiExecutor(), {
  max: 100,
  duration: 60000
});

get

Get a node executor by type.

get(type: string): INodeExecutor | undefined

IExecutionStateStore

Interface for execution state storage implementations.

Methods

createExecution

Create a new execution record.

createExecution(
  workflowId: string,
  parentExecutionId?: string,
  depth?: number,
  initialData?: any
): Promise<string>

Returns: Execution ID


getExecution

Get execution state.

getExecution(executionId: string): Promise<ExecutionState | null>

getNodeResults

Get results for specific nodes.

getNodeResults(
  executionId: string,
  nodeIds: string[]
): Promise<Record<string, ExecutionResult>>

getPendingNodeCount

Get count of pending nodes for an execution.

getPendingNodeCount(
  executionId: string,
  totalNodes: number
): Promise<number>

updateNodeResult

Update result for a specific node.

updateNodeResult(
  executionId: string,
  nodeId: string,
  result: ExecutionResult
): Promise<void>

cacheNodeResult

Cache a node result (for faster lookup).

cacheNodeResult(
  executionId: string,
  nodeId: string,
  result: ExecutionResult
): Promise<void>

setExecutionStatus

Update execution status.

setExecutionStatus(
  executionId: string,
  status: ExecutionState['status']
): Promise<void>

saveWorkflow

Save workflow definition (with versioning).

saveWorkflow(
  workflow: WorkflowDefinition,
  changeNotes?: string,
  createdBy?: string
): Promise<number>

Returns: Version ID


getWorkflow

Get workflow definition (optionally specific version).

getWorkflow(
  workflowId: string,
  version?: number
): Promise<WorkflowDefinition | null>

listWorkflows

List workflows with pagination.

listWorkflows(
  activeOnly?: boolean,
  limit?: number,
  offset?: number
): Promise<WorkflowDefinition[]>

deactivateWorkflow

Deactivate a workflow.

deactivateWorkflow(workflowId: string): Promise<void>

listExecutions

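List executions, optionally filtered by workflow ID, with pagination. This method is optional on the interface; implementations may omit it.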

listExecutions?(
  workflowId?: string,
  limit?: number,
  offset?: number
): Promise<Array<{
  executionId: string;
  workflowId: string;
  status: string;
  startedAt: Date;
  completedAt?: Date;
}>>

getChildExecutions

Get child executions (for sub-workflows). This method is optional on the interface; implementations may omit it.

getChildExecutions?(
  executionId: string
): Promise<ExecutionState[]>

getParentExecution

Get parent execution (for sub-workflows). This method is optional on the interface; implementations may omit it.

getParentExecution?(
  executionId: string
): Promise<ExecutionState | null>

EventStreamManager

Provides real-time event streaming via Redis Pub/Sub.

Methods

subscribe

Subscribe to events that match the given event pattern.

async subscribe(
  eventPattern: string,
  callback: (event: any) => void
): Promise<EventSubscription>

Returns: Subscription object


subscribeToExecution

Subscribe to events for a specific execution.

async subscribeToExecution(
  executionId: string,
  callback: (event: any) => void
): Promise<EventSubscription>

Type Definitions

WorkflowDefinition

interface WorkflowDefinition {
  id: string;
  name: string;
  entryNodeId: string;
  nodes: NodeDefinition[];
  triggers?: WorkflowTrigger[];
  maxConcurrency?: number;
  concurrencyLockTTL?: number;
  priority?: number;
  delay?: number;
  jobId?: string;
}

NodeDefinition

interface NodeDefinition {
  id: string;
  type: string;
  config: Record<string, any>;
  inputs: string[];
  outputs: string[];
}

ExecutionResult

interface ExecutionResult {
  success: boolean;
  data?: any;
  error?: string;
  nextNodes?: string[];
  skipped?: boolean;
}

ExecutionContext

interface ExecutionContext {
  workflowId: string;
  executionId: string;
  nodeId: string;
  inputData: any;
  nodeConfig?: Record<string, any>;
  previousResults: Record<string, ExecutionResult>;
  allNodeResults?: Record<string, ExecutionResult>;
  parentExecutionId?: string;
  depth: number;
  rateLimit?: (duration: number) => Promise<Error>;
}

ExecutionState

interface ExecutionState {
  executionId: string;
  workflowId: string;
  status: 'running' | 'completed' | 'failed' | 'cancelled' | 'paused';
  nodeResults: Record<string, ExecutionResult>;
  startedAt: Date;
  completedAt?: Date;
  parentExecutionId?: string;
  depth: number;
  initialData?: any;
  metadata?: {
    parentNodeId?: string;
    parentWorkflowId?: string;
    [key: string]: any;
  };
}

EngineConfig

interface EngineConfig {
  useFlowProducerForSubWorkflows?: boolean;
  useNativeRateLimiting?: boolean;
  useJobSchedulers?: boolean;
  useWorkerThreads?: boolean;
  useSimplifiedEventStream?: boolean;
  workerConcurrency?: number;
  rateLimiter?: {
    max: number;
    duration: number;
  };
  processorFile?: string;
}

DLQItem

interface DLQItem {
  jobId: string;
  executionId: string;
  workflowId: string;
  nodeId: string;
  nodeType: string;
  inputData: any;
  error: string;
  stacktrace: string;
  attemptsMade: number;
  failedAt: Date;
}

Next Steps

On this page