API Reference
Complete API reference for SPANE components
API Reference
WorkflowEngine
The main orchestrator that manages workflow lifecycle and execution.
Constructor
constructor(
registry: NodeRegistry,
stateStore: IExecutionStateStore,
redisConnection: Redis,
metricsCollector?: MetricsCollector,
circuitBreakerRegistry?: CircuitBreakerRegistry,
cacheOptions?: WorkflowCacheOptions,
payloadManager?: PayloadManager,
engineConfig?: Partial<EngineConfig>
)Methods
registerWorkflow
Register a workflow definition (with optional DB persistence).
async registerWorkflow(
workflow: WorkflowDefinition,
changeNotes?: string
): Promise<void>Example:
await engine.registerWorkflow(workflow, 'Added email notification step');enqueueWorkflow
Enqueue a workflow execution with optional priority and delay.
async enqueueWorkflow(
workflowId: string,
initialData?: any,
parentExecutionId?: string,
depth?: number,
parentJobId?: string,
options?: {
priority?: number;
delay?: number;
jobId?: string;
}
): Promise<string>Returns: Execution ID
Example:
const executionId = await engine.enqueueWorkflow(
'my-workflow',
{ userId: 123 },
undefined,
0,
undefined,
{ priority: 8, delay: 5000 }
);enqueueNode
Enqueue a single node execution.
async enqueueNode(
executionId: string,
workflowId: string,
nodeId: string,
inputData?: any,
parentJobId?: string,
options?: {
priority?: number;
delay?: number;
jobId?: string;
}
): Promise<string>Returns: Job ID
triggerWebhook
Trigger workflows via webhook path.
async triggerWebhook(
path: string,
method: string,
data: any
): Promise<string[]>Returns: Array of execution IDs
getWorkflow
Get workflow definition (lazy loads from database if not cached).
async getWorkflow(workflowId: string): Promise<WorkflowDefinition | undefined>getAllWorkflowsFromDatabase
Get all workflows from database with pagination.
async getAllWorkflowsFromDatabase(
activeOnly?: boolean,
limit?: number,
offset?: number
): Promise<WorkflowDefinition[]>startWorkers
Start worker processes.
startWorkers(concurrency?: number): voidcancelWorkflow
Cancel a running workflow.
async cancelWorkflow(executionId: string): Promise<void>pauseWorkflow
Pause a running workflow.
async pauseWorkflow(executionId: string): Promise<void>resumeWorkflow
Resume a paused workflow.
async resumeWorkflow(executionId: string): Promise<void>replayWorkflow
Replay a past execution with the same initial data.
async replayWorkflow(executionId: string): Promise<string>Returns: New execution ID
scheduleWorkflow
Schedule a workflow to execute at a specific time.
async scheduleWorkflow(
workflowId: string,
initialData: any,
executeAt: Date
): Promise<string>Returns: Execution ID
getDLQItems
Get items from Dead Letter Queue.
async getDLQItems(
start?: number,
end?: number
): Promise<DLQItem[]>retryDLQItem
Retry a specific DLQ item.
async retryDLQItem(dlqJobId: string): Promise<boolean>getQueueStats
Get queue statistics.
async getQueueStats(): Promise<{
waiting: number;
active: number;
completed: number;
failed: number;
delayed: number;
paused: number;
}>getEventStream
Get the EventStreamManager for real-time event streaming.
getEventStream(): EventStreamManagergetConfig
Get the current engine configuration.
getConfig(): EngineConfigclose
Graceful shutdown of the engine.
async close(): Promise<void>NodeRegistry
Stores and manages node executors.
Constructor
constructor()Methods
register
Register a node executor with optional circuit breaker and rate limiting options.
register(
type: string,
executor: INodeExecutor,
options?: {
failureThreshold?: number;
successThreshold?: number;
timeout?: number;
max?: number;
duration?: number;
}
): voidExample:
registry.register('http', new HttpExecutor(), {
failureThreshold: 5,
successThreshold: 2,
timeout: 60000
});
registry.register('api-call', new ApiExecutor(), {
max: 100,
duration: 60000
});get
Get a node executor by type.
get(type: string): INodeExecutor | undefinedIExecutionStateStore
Interface for execution state storage implementations.
Methods
createExecution
Create a new execution record.
createExecution(
workflowId: string,
parentExecutionId?: string,
depth?: number,
initialData?: any
): Promise<string>Returns: Execution ID
getExecution
Get execution state.
getExecution(executionId: string): Promise<ExecutionState | null>getNodeResults
Get results for specific nodes.
getNodeResults(
executionId: string,
nodeIds: string[]
): Promise<Record<string, ExecutionResult>>getPendingNodeCount
Get count of pending nodes for an execution.
getPendingNodeCount(
executionId: string,
totalNodes: number
): Promise<number>updateNodeResult
Update result for a specific node.
updateNodeResult(
executionId: string,
nodeId: string,
result: ExecutionResult
): Promise<void>cacheNodeResult
Cache a node result (for faster lookup).
cacheNodeResult(
executionId: string,
nodeId: string,
result: ExecutionResult
): Promise<void>setExecutionStatus
Update execution status.
setExecutionStatus(
executionId: string,
status: ExecutionState['status']
): Promise<void>saveWorkflow
Save workflow definition (with versioning).
saveWorkflow(
workflow: WorkflowDefinition,
changeNotes?: string,
createdBy?: string
): Promise<number>Returns: Version ID
getWorkflow
Get workflow definition (optionally specific version).
getWorkflow(
workflowId: string,
version?: number
): Promise<WorkflowDefinition | null>listWorkflows
List workflows with pagination.
listWorkflows(
activeOnly?: boolean,
limit?: number,
offset?: number
): Promise<WorkflowDefinition[]>deactivateWorkflow
Deactivate a workflow.
deactivateWorkflow(workflowId: string): Promise<void>listExecutions
List executions with optional filtering.
listExecutions?(
workflowId?: string,
limit?: number,
offset?: number
): Promise<Array<{
executionId: string;
workflowId: string;
status: string;
startedAt: Date;
completedAt?: Date;
}>>getChildExecutions
Get child executions (for sub-workflows).
getChildExecutions?(
executionId: string
): Promise<ExecutionState[]>getParentExecution
Get parent execution (for sub-workflows).
getParentExecution?(
executionId: string
): Promise<ExecutionState | null>EventStreamManager
Provides real-time event streaming via Redis Pub/Sub.
Methods
subscribe
Subscribe to all events.
async subscribe(
eventPattern: string,
callback: (event: any) => void
): Promise<EventSubscription>Returns: Subscription object
subscribeToExecution
Subscribe to events for a specific execution.
async subscribeToExecution(
executionId: string,
callback: (event: any) => void
): Promise<EventSubscription>Type Definitions
WorkflowDefinition
interface WorkflowDefinition {
id: string;
name: string;
entryNodeId: string;
nodes: NodeDefinition[];
triggers?: WorkflowTrigger[];
maxConcurrency?: number;
concurrencyLockTTL?: number;
priority?: number;
delay?: number;
jobId?: string;
}NodeDefinition
interface NodeDefinition {
id: string;
type: string;
config: Record<string, any>;
inputs: string[];
outputs: string[];
}ExecutionResult
interface ExecutionResult {
success: boolean;
data?: any;
error?: string;
nextNodes?: string[];
skipped?: boolean;
}ExecutionContext
interface ExecutionContext {
workflowId: string;
executionId: string;
nodeId: string;
inputData: any;
nodeConfig?: Record<string, any>;
previousResults: Record<string, ExecutionResult>;
allNodeResults?: Record<string, ExecutionResult>;
parentExecutionId?: string;
depth: number;
rateLimit?: (duration: number) => Promise<Error>;
}ExecutionState
interface ExecutionState {
executionId: string;
workflowId: string;
status: 'running' | 'completed' | 'failed' | 'cancelled' | 'paused';
nodeResults: Record<string, ExecutionResult>;
startedAt: Date;
completedAt?: Date;
parentExecutionId?: string;
depth: number;
initialData?: any;
metadata?: {
parentNodeId?: string;
parentWorkflowId?: string;
[key: string]: any;
};
}EngineConfig
interface EngineConfig {
useFlowProducerForSubWorkflows?: boolean;
useNativeRateLimiting?: boolean;
useJobSchedulers?: boolean;
useWorkerThreads?: boolean;
useSimplifiedEventStream?: boolean;
workerConcurrency?: number;
rateLimiter?: {
max: number;
duration: number;
};
processorFile?: string;
}DLQItem
interface DLQItem {
jobId: string;
executionId: string;
workflowId: string;
nodeId: string;
nodeType: string;
inputData: any;
error: string;
stacktrace: string;
attemptsMade: number;
failedAt: Date;
}