Workflow Definitions

Define and structure workflows in SPANE

Workflows in SPANE are defined as Directed Acyclic Graphs (DAGs) where nodes represent processing steps and edges represent data dependencies.

Basic Workflow Structure

import type { WorkflowDefinition } from 'spane';

const workflow: WorkflowDefinition = {
  id: 'my-workflow',
  name: 'My Workflow',
  entryNodeId: 'start',
  nodes: [
    {
      id: 'start',
      type: 'transform',
      config: {},
      inputs: [],
      outputs: ['process']
    },
    {
      id: 'process',
      type: 'http',
      config: { url: 'https://api.example.com' },
      inputs: ['start'],
      outputs: ['end']
    },
    {
      id: 'end',
      type: 'email',
      config: { to: 'user@example.com' },
      inputs: ['process'],
      outputs: []
    }
  ]
};
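
Before registering a definition like the one above, it can help to check that its node references resolve. The following is a minimal sketch, not part of SPANE: `findStructuralProblems`, `NodeLike`, and `WorkflowLike` are hypothetical names, and the local types are trimmed stand-ins for the real `WorkflowDefinition`.

```typescript
// Local stand-ins for the SPANE types, trimmed to the fields used here.
interface NodeLike {
  id: string;
  inputs: string[];
  outputs: string[];
}

interface WorkflowLike {
  id: string;
  entryNodeId: string;
  nodes: NodeLike[];
}

// Hypothetical helper (not a SPANE API): lists structural problems in a definition.
function findStructuralProblems(workflow: WorkflowLike): string[] {
  const problems: string[] = [];
  const ids = new Set<string>();

  // Node IDs must be unique within the workflow.
  for (const node of workflow.nodes) {
    if (ids.has(node.id)) problems.push(`duplicate node id: ${node.id}`);
    ids.add(node.id);
  }

  // The entry node must exist.
  if (!ids.has(workflow.entryNodeId)) {
    problems.push(`entryNodeId not found: ${workflow.entryNodeId}`);
  }

  // Every input and output must point at a node in the same workflow.
  for (const node of workflow.nodes) {
    for (const ref of [...node.inputs, ...node.outputs]) {
      if (!ids.has(ref)) problems.push(`${node.id} references unknown node: ${ref}`);
    }
  }
  return problems;
}
```

An empty result means the references are consistent; it says nothing about whether the node types are registered.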

Workflow Properties

Core Properties

| Property | Type | Required | Description |
| --- | --- | --- | --- |
| id | string | Yes | Unique identifier for the workflow |
| name | string | Yes | Human-readable name |
| entryNodeId | string | Yes | ID of the starting node |
| nodes | NodeDefinition[] | Yes | Array of node definitions |

Optional Properties

| Property | Type | Default | Description |
| --- | --- | --- | --- |
| triggers | WorkflowTrigger[] | undefined | Webhook or schedule triggers |
| maxConcurrency | number | undefined | Maximum number of concurrent workflow executions |
| concurrencyLockTTL | number | undefined | TTL for the concurrency lock (seconds) |
| priority | number | 5 | Job priority (1-10, higher = more important) |
| delay | number | 0 | Delay before execution (milliseconds) |
| jobId | string | undefined | Custom job ID for deduplication |
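
The defaults and ranges in the table can be expressed as a small resolver. This is only an illustrative sketch: `JobOptions` and `resolveJobOptions` are hypothetical names, and rejecting out-of-range values with an error is an assumption, since the page does not say how SPANE handles them.

```typescript
// Local stand-in for the scheduling-related optional properties.
interface JobOptions {
  priority?: number; // 1-10, higher = more important
  delay?: number;    // milliseconds
  jobId?: string;    // custom ID for deduplication
}

// Hypothetical helper: applies the documented defaults (priority 5, delay 0)
// and rejects values outside the documented ranges (assumed behavior).
function resolveJobOptions(
  options: JobOptions
): { priority: number; delay: number; jobId: string | undefined } {
  const priority = options.priority ?? 5;
  const delay = options.delay ?? 0;
  if (priority < 1 || priority > 10) {
    throw new RangeError(`priority must be between 1 and 10, got ${priority}`);
  }
  if (delay < 0) {
    throw new RangeError(`delay must be >= 0 ms, got ${delay}`);
  }
  return { priority, delay, jobId: options.jobId };
}
```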

Node Definition

Each node in the workflow is defined with these properties:

interface NodeDefinition {
  id: string;           // Unique node ID within the workflow
  type: string;         // Node executor type (registered in NodeRegistry)
  config: Record<string, any>;  // Node-specific configuration
  inputs: string[];     // IDs of parent nodes
  outputs: string[];    // IDs of child nodes
}

Node Example

{
  id: 'fetch-data',
  type: 'http',
  config: {
    url: 'https://api.example.com/users',
    method: 'GET',
    headers: {
      'Authorization': 'Bearer token123'
    }
  },
  inputs: ['authenticate'],
  outputs: ['transform', 'validate']
}
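
In every example on this page, each edge appears twice: once in the parent's outputs and once in the child's inputs. Assuming the two lists are expected to mirror each other, a check like the following can catch an edge declared on only one end. `findAsymmetricEdges` and `NodeLike` are hypothetical names, not SPANE APIs.

```typescript
// Local stand-in for NodeDefinition, trimmed to the fields used here.
interface NodeLike {
  id: string;
  inputs: string[];
  outputs: string[];
}

// Hypothetical check: reports edges that are declared on one end only.
function findAsymmetricEdges(nodes: NodeLike[]): string[] {
  const byId = new Map<string, NodeLike>();
  for (const node of nodes) byId.set(node.id, node);

  const problems: string[] = [];
  for (const node of nodes) {
    // Each child listed in outputs should list this node in its inputs.
    for (const childId of node.outputs) {
      const child = byId.get(childId);
      if (child && !child.inputs.includes(node.id)) {
        problems.push(`${node.id} -> ${childId} is missing from ${childId}.inputs`);
      }
    }
    // Each parent listed in inputs should list this node in its outputs.
    for (const parentId of node.inputs) {
      const parent = byId.get(parentId);
      if (parent && !parent.outputs.includes(node.id)) {
        problems.push(`${parentId} -> ${node.id} is missing from ${parentId}.outputs`);
      }
    }
  }
  return problems;
}
```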

Workflow Patterns

Linear Workflow

Simple sequential execution:

const linearWorkflow: WorkflowDefinition = {
  id: 'linear-workflow',
  name: 'Linear Workflow',
  entryNodeId: 'step1',
  nodes: [
    {
      id: 'step1',
      type: 'transform',
      config: {},
      inputs: [],
      outputs: ['step2']
    },
    {
      id: 'step2',
      type: 'transform',
      config: {},
      inputs: ['step1'],
      outputs: ['step3']
    },
    {
      id: 'step3',
      type: 'email',
      config: { to: 'admin@example.com' },
      inputs: ['step2'],
      outputs: []
    }
  ]
};
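
For longer chains, writing inputs and outputs by hand is repetitive. A small helper can derive them from the step order. This is a sketch under the assumption that a linear workflow needs nothing beyond the fields shown above; `chain`, `Step`, and `NodeLike` are hypothetical names.

```typescript
// Local stand-in for NodeDefinition.
interface NodeLike {
  id: string;
  type: string;
  config: Record<string, unknown>;
  inputs: string[];
  outputs: string[];
}

// A step is a node without its wiring.
interface Step {
  id: string;
  type: string;
  config?: Record<string, unknown>;
}

// Hypothetical helper: wires steps in order so each node's only parent is the
// previous step and its only child is the next one.
function chain(steps: Step[]): NodeLike[] {
  const ids = steps.map((step) => step.id);
  return steps.map((step, i) => ({
    id: step.id,
    type: step.type,
    config: step.config ?? {},
    inputs: ids.slice(Math.max(i - 1, 0), i), // empty for the first step
    outputs: ids.slice(i + 1, i + 2),         // empty for the last step
  }));
}

// Produces the same wiring as the three-node linear workflow shown earlier.
const linearNodes = chain([
  { id: 'step1', type: 'transform' },
  { id: 'step2', type: 'transform' },
  { id: 'step3', type: 'email', config: { to: 'admin@example.com' } },
]);
```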

Parallel Workflow

Execute multiple branches concurrently:

const parallelWorkflow: WorkflowDefinition = {
  id: 'parallel-workflow',
  name: 'Parallel Workflow',
  entryNodeId: 'split',
  nodes: [
    {
      id: 'split',
      type: 'transform',
      config: {},
      inputs: [],
      outputs: ['branch1', 'branch2', 'branch3']
    },
    {
      id: 'branch1',
      type: 'http',
      config: { url: 'https://api1.example.com' },
      inputs: ['split'],
      outputs: ['merge']
    },
    {
      id: 'branch2',
      type: 'http',
      config: { url: 'https://api2.example.com' },
      inputs: ['split'],
      outputs: ['merge']
    },
    {
      id: 'branch3',
      type: 'http',
      config: { url: 'https://api3.example.com' },
      inputs: ['split'],
      outputs: ['merge']
    },
    {
      id: 'merge',
      type: 'transform',
      config: {},
      inputs: ['branch1', 'branch2', 'branch3'],
      outputs: []
    }
  ]
};
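
To see which nodes in a definition are independent of each other, the graph can be grouped into levels with Kahn's topological sort. The sketch below only illustrates the dependency structure; it makes no claim about how SPANE actually schedules nodes. `executionLevels` and `NodeLike` are hypothetical names.

```typescript
// Local stand-in for NodeDefinition, trimmed to the fields used here.
interface NodeLike {
  id: string;
  inputs: string[];
  outputs: string[];
}

// Groups node IDs into levels using Kahn's algorithm. Nodes in the same level
// do not depend on each other. Throws if the graph contains a cycle.
function executionLevels(nodes: NodeLike[]): string[][] {
  const pendingParents = new Map<string, number>();
  const children = new Map<string, string[]>();
  for (const node of nodes) {
    pendingParents.set(node.id, node.inputs.length);
    children.set(node.id, node.outputs);
  }

  const levels: string[][] = [];
  let ready = nodes.filter((n) => n.inputs.length === 0).map((n) => n.id);
  let visited = 0;

  while (ready.length > 0) {
    levels.push(ready);
    visited += ready.length;
    const next: string[] = [];
    for (const id of ready) {
      for (const childId of children.get(id) ?? []) {
        const left = (pendingParents.get(childId) ?? 0) - 1;
        pendingParents.set(childId, left);
        if (left === 0) next.push(childId); // all parents done
      }
    }
    ready = next;
  }

  if (visited !== nodes.length) throw new Error('workflow contains a cycle');
  return levels;
}
```

For the parallel workflow above, this yields three levels: the split node, the three branches, and the merge node.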

Conditional Branching

Control flow based on execution results:

const conditionalWorkflow: WorkflowDefinition = {
  id: 'conditional-workflow',
  name: 'Conditional Workflow',
  entryNodeId: 'evaluate',
  nodes: [
    {
      id: 'evaluate',
      type: 'router',
      config: {},
      inputs: [],
      outputs: ['process-a', 'process-b']  // Candidate branches; nextNodes in the router's result picks which one runs
    },
    {
      id: 'process-a',
      type: 'http',
      config: { url: 'https://api-a.example.com' },
      inputs: ['evaluate'],
      outputs: ['end']
    },
    {
      id: 'process-b',
      type: 'http',
      config: { url: 'https://api-b.example.com' },
      inputs: ['evaluate'],
      outputs: ['end']
    },
    {
      id: 'end',
      type: 'email',
      config: {},
      inputs: ['process-a', 'process-b'],
      outputs: []
    }
  ]
};

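// In your router executor (these types are assumed to be exported from 'spane', like WorkflowDefinition):
import type { ExecutionContext, ExecutionResult, INodeExecutor } from 'spane';
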
class RouterExecutor implements INodeExecutor {
  async execute(context: ExecutionContext): Promise<ExecutionResult> {
    const { condition } = context.inputData;

    if (condition === 'a') {
      return {
        success: true,
        data: { route: 'a' },
        nextNodes: ['process-a']  // Only execute branch A
      };
    } else {
      return {
        success: true,
        data: { route: 'b' },
        nextNodes: ['process-b']  // Only execute branch B
      };
    }
  }
}
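
The effect of nextNodes on branch selection can be pictured as a filter over the node's declared outputs. The sketch below is one plausible reading, not SPANE's implementation. It assumes that a result without nextNodes runs every declared child, that a failed node runs none, and that nextNodes may only name declared outputs. `selectChildren` and `ResultLike` are hypothetical names.

```typescript
// Local stand-in for ExecutionResult, trimmed to the fields used here.
interface ResultLike {
  success: boolean;
  nextNodes?: string[];
}

// Hypothetical helper: decides which children run after a node finishes.
function selectChildren(outputs: string[], result: ResultLike): string[] {
  if (!result.success) return [];                     // assumption: failures stop the branch
  if (result.nextNodes === undefined) return outputs; // assumption: no nextNodes means all children

  // Assumption: nextNodes may only name children declared in outputs.
  const unknown = result.nextNodes.filter((id) => !outputs.includes(id));
  if (unknown.length > 0) {
    throw new Error(`nextNodes not declared in outputs: ${unknown.join(', ')}`);
  }
  return result.nextNodes;
}
```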

Fan-out/Fan-in

One-to-many and many-to-one patterns:

const fanOutFanInWorkflow: WorkflowDefinition = {
  id: 'fanout-fanin',
  name: 'Fan-out/Fan-in Pattern',
  entryNodeId: 'fetch-list',
  nodes: [
    {
      id: 'fetch-list',
      type: 'http',
      config: { url: 'https://api.example.com/items' },
      inputs: [],
      outputs: ['process-items']
    },
    {
      id: 'process-items',
      type: 'parallel-processor',
      config: { batchSize: 10 },
      inputs: ['fetch-list'],
      outputs: ['aggregate']
    },
    {
      id: 'aggregate',
      type: 'transform',
      config: {},
      inputs: ['process-items'],
      outputs: []
    }
  ]
};
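
The page does not show what a parallel-processor executor does internally. As a rough illustration of the fan-out/fan-in idea behind a batchSize setting, the following sketch splits a list into batches, processes the items of each batch concurrently, and collects the results in input order. `toBatches` and `processInBatches` are hypothetical helpers, not SPANE APIs.

```typescript
// Splits items into consecutive batches of at most batchSize elements.
function toBatches<T>(items: T[], batchSize: number): T[][] {
  if (!Number.isInteger(batchSize) || batchSize < 1) {
    throw new RangeError(`batchSize must be a positive integer, got ${batchSize}`);
  }
  const batches: T[][] = [];
  for (let i = 0; i < items.length; i += batchSize) {
    batches.push(items.slice(i, i + batchSize));
  }
  return batches;
}

// Fan-out: items within a batch run concurrently, batches run one after another.
// Fan-in: results are collected into a single array in input order.
async function processInBatches<T, R>(
  items: T[],
  batchSize: number,
  worker: (item: T) => Promise<R>
): Promise<R[]> {
  const results: R[] = [];
  for (const batch of toBatches(items, batchSize)) {
    const batchResults = await Promise.all(batch.map((item) => worker(item)));
    results.push(...batchResults);
  }
  return results;
}
```

Running batches sequentially caps the number of in-flight calls at batchSize, which is the usual reason to batch in the first place.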

Built-in Node Types

HTTP Node

Make HTTP requests:

{
  id: 'http-call',
  type: 'http',
  config: {
    url: 'https://api.example.com/data',
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': 'Bearer token'
    }
  },
  inputs: [],
  outputs: ['process-response']
}

Transform Node

Transform data using JavaScript functions or JSONata expressions:

{
  id: 'transform-data',
  type: 'transform',
  config: {
    // Using a JSONata expression
    expression: '$$.payload ~> | $ | {"result": $} |',
    // OR the name of a transform function registered separately
    transformFn: 'dataTransform'
  },
  inputs: ['fetch'],
  outputs: ['save']
}

Delay Node

Pause workflow execution for a specified duration:

{
  id: 'wait-5-minutes',
  type: 'delay',
  config: {
    duration: 300000,          // 5 minutes in milliseconds (highest priority)
    // OR
    durationSeconds: 300,       // 300 seconds
    // OR
    durationMinutes: 5         // 5 minutes
  },
  inputs: ['start'],
  outputs: ['continue']
}
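
When more than one duration field is set, the comment above says duration takes the highest priority. A resolver along these lines makes the precedence explicit. It is a sketch only: the order between durationSeconds and durationMinutes, and the error when none is set, are assumptions. `resolveDelayMs` and `DelayConfig` are hypothetical names.

```typescript
// Local stand-in for the delay node's config.
interface DelayConfig {
  duration?: number;        // milliseconds
  durationSeconds?: number;
  durationMinutes?: number;
}

// Hypothetical helper: converts a delay config to milliseconds.
// `duration` wins if present (documented); seconds before minutes is assumed.
function resolveDelayMs(config: DelayConfig): number {
  if (config.duration !== undefined) return config.duration;
  if (config.durationSeconds !== undefined) return config.durationSeconds * 1000;
  if (config.durationMinutes !== undefined) return config.durationMinutes * 60 * 1000;
  throw new Error('delay config needs duration, durationSeconds, or durationMinutes');
}
```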

Sub-workflow Node

Call another workflow as a reusable component:

{
  id: 'send-email',
  type: 'sub-workflow',
  config: {
    workflowId: 'email-sender-workflow',
    inputMapping: {
      'recipient': 'userEmail',
      'subject': 'emailSubject'
    },
    outputMapping: {
      'emailId': 'messageId'
    },
    continueOnFail: false
  },
  inputs: ['prepare-email'],
  outputs: ['log-result']
}
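
The page does not spell out which side of each inputMapping and outputMapping entry is the source and which is the target. The sketch below assumes each entry reads as targetKey: sourceKey, so 'recipient': 'userEmail' would copy the parent workflow's userEmail into the sub-workflow's recipient. Treat that direction as an assumption to verify; `applyMapping` is a hypothetical helper.

```typescript
type Mapping = Record<string, string>;

// Hypothetical helper: builds a new object by copying source[sourceKey]
// to result[targetKey] for each targetKey: sourceKey entry (assumed direction).
// Keys missing from the source are skipped.
function applyMapping(
  source: Record<string, unknown>,
  mapping: Mapping
): Record<string, unknown> {
  const mapped: Record<string, unknown> = {};
  for (const [targetKey, sourceKey] of Object.entries(mapping)) {
    if (sourceKey in source) mapped[targetKey] = source[sourceKey];
  }
  return mapped;
}

// Under the assumed direction, this yields { recipient: ..., subject: ... }.
const subWorkflowInput = applyMapping(
  { userEmail: 'user@example.com', emailSubject: 'Welcome', unrelated: true },
  { recipient: 'userEmail', subject: 'emailSubject' }
);
```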

Data Passing

Entry Nodes

Entry nodes receive the initial workflow data:

const executionId = await engine.enqueueWorkflow('my-workflow', {
  userId: 123,
  action: 'signup'
});

// Entry node context.inputData contains:
// { userId: 123, action: 'signup' }

Single Parent

Nodes with one parent receive the parent's output directly:

// Parent node output: { result: 'data' }
// Child node context.inputData: { result: 'data' }

Multiple Parents

Nodes with multiple parents receive an object keyed by parent node ID, with each parent's output as the value:

// Parent A output: { dataA: 1 }
// Parent B output: { dataB: 2 }
// Child node context.inputData: {
//   'parent-a': { dataA: 1 },
//   'parent-b': { dataB: 2 }
// }
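
The three cases above (entry node, single parent, multiple parents) can be summarized in one function. This is a minimal sketch of the rule as described on this page, not SPANE's implementation. `buildInputData` and its parameters are hypothetical names, and returning an empty object for a parent with no recorded output is an assumption.

```typescript
type NodeOutput = Record<string, unknown>;

// Hypothetical helper: computes a node's inputData from its parents' outputs.
function buildInputData(
  parentIds: string[],
  outputs: Record<string, NodeOutput>,
  initialData: NodeOutput
): NodeOutput {
  const [first, ...rest] = parentIds;

  // Entry node: no parents, so it receives the initial workflow data.
  if (first === undefined) return initialData;

  // Single parent: the parent's output is passed through directly.
  if (rest.length === 0) return outputs[first] ?? {};

  // Multiple parents: one entry per parent, keyed by parent node ID.
  const keyed: NodeOutput = {};
  for (const id of parentIds) keyed[id] = outputs[id] ?? {};
  return keyed;
}
```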

Accessing All Results

Use previousResults to read the results of a node's direct parents, and allNodeResults to read the result of any ancestor node:

class MyExecutor implements INodeExecutor {
  async execute(context: ExecutionContext): Promise<ExecutionResult> {
    // Access all direct parent results
    const parentResults = context.previousResults;

    // Access any ancestor result
    const allResults = context.allNodeResults;
    const initialData = allResults['start-node']?.data;

    return { success: true, data: {} };
  }
}

Registering Workflows

const changeNotes = 'Initial version';  // Example change note
await engine.registerWorkflow(workflow, changeNotes);

// With version tracking
await engine.registerWorkflow(workflow, 'Added new email notification step');
