My App

Node Executors

Create custom node executors to process workflow steps

Node Executors

Node executors are the core building blocks of SPANE workflows. Each executor implements the INodeExecutor interface and defines how a specific node type processes data.

Creating a Node Executor

Basic Executor

All node executors must implement the INodeExecutor interface:

import type { INodeExecutor, ExecutionContext, ExecutionResult } from 'spane';

export class MyExecutor implements INodeExecutor {
  async execute(context: ExecutionContext): Promise<ExecutionResult> {
    try {
      // 1. Extract configuration
      const { option1, option2 } = context.nodeConfig || {};

      // 2. Access input data
      const inputData = context.inputData;

      // 3. Process data
      const result = await this.processData(inputData, option1, option2);

      // 4. Return result
      return {
        success: true,
        data: result
      };
    } catch (error) {
      return {
        success: false,
        error: error instanceof Error ? error.message : 'Unknown error'
      };
    }
  }

  private async processData(data: any, opt1?: string, opt2?: string) {
    // Your processing logic here
    return data;
  }
}

Execution Context

The ExecutionContext provides all the information your executor needs:

interface ExecutionContext {
  workflowId: string;           // Current workflow ID
  executionId: string;          // Unique execution ID
  nodeId: string;              // Current node ID
  inputData: any;               // Input data for this node
  nodeConfig?: Record<string, any>;  // Node configuration
  previousResults: Record<string, ExecutionResult>;  // Parent node results
  allNodeResults?: Record<string, ExecutionResult>;  // All ancestor results
  parentExecutionId?: string;   // Parent execution ID (for sub-workflows)
  depth: number;               // Nesting depth
  rateLimit?: (duration: number) => Promise<Error>;  // Rate limiter
}

Accessing Input Data

class DataProcessor implements INodeExecutor {
  async execute(context: ExecutionContext): Promise<ExecutionResult> {
    // Get input data (varies by node position)
    const data = context.inputData;

    // Entry nodes: initial workflow data
    // Single parent: parent's output
    // Multiple parents: merged object { 'parent-id': output }

    // Get node configuration
    const { timeout, retries } = context.nodeConfig || {};

    return { success: true, data: { processed: true } };
  }
}

Accessing Previous Results

class Aggregator implements INodeExecutor {
  async execute(context: ExecutionContext): Promise<ExecutionResult> {
    // Access direct parent results
    const parentAResult = context.previousResults['parent-a'];
    const parentBResult = context.previousResults['parent-b'];

    // Access any ancestor result
    if (context.allNodeResults) {
      const startNodeResult = context.allNodeResults['start-node'];
    }

    // Aggregate results
    const aggregated = {
      fromA: parentAResult?.data,
      fromB: parentBResult?.data
    };

    return { success: true, data: aggregated };
  }
}

Using Rate Limiting

class ApiCaller implements INodeExecutor {
  async execute(context: ExecutionContext): Promise<ExecutionResult> {
    const { url } = context.nodeConfig || {};

    const response = await fetch(url);

    // Handle rate limiting
    if (response.status === 429) {
      const retryAfter = parseInt(
        response.headers.get('retry-after') || '60',
        10
      ) * 1000;

      // Trigger rate limiting and throw
      if (context.rateLimit) {
        const error = await context.rateLimit(retryAfter);
        throw error;
      }
    }

    const data = await response.json();
    return { success: true, data };
  }
}

Common Executor Patterns

HTTP Executor

export class HttpExecutor implements INodeExecutor {
  async execute(context: ExecutionContext): Promise<ExecutionResult> {
    const {
      url,
      method = 'GET',
      headers = {},
      body,
      timeout = 30000
    } = context.nodeConfig || {};

    if (!url) {
      return {
        success: false,
        error: 'URL is required'
      };
    }

    try {
      const controller = new AbortController();
      const timeoutId = setTimeout(() => controller.abort(), timeout);

      const response = await fetch(url, {
        method,
        headers: {
          'Content-Type': 'application/json',
          ...headers
        },
        body: method !== 'GET' && body ? JSON.stringify(body) : undefined,
        signal: controller.signal
      });

      clearTimeout(timeoutId);

      if (!response.ok) {
        return {
          success: false,
          error: `HTTP ${response.status}: ${response.statusText}`
        };
      }

      const data = await response.json();
      return { success: true, data };
    } catch (error) {
      return {
        success: false,
        error: error instanceof Error ? error.message : 'Unknown error'
      };
    }
  }
}

Database Executor

export class DatabaseExecutor implements INodeExecutor {
  constructor(private pool: any) {}

  async execute(context: ExecutionContext): Promise<ExecutionResult> {
    const { query, params } = context.nodeConfig || {};

    if (!query) {
      return {
        success: false,
        error: 'Query is required'
      };
    }

    try {
      const result = await this.pool.query(query, params);
      return { success: true, data: result.rows };
    } catch (error) {
      return {
        success: false,
        error: error instanceof Error ? error.message : 'Database error'
      };
    }
  }
}

Email Executor

import { Resend } from 'resend';

export class EmailExecutor implements INodeExecutor {
  constructor(private emailService: Resend) {}

  async execute(context: ExecutionContext): Promise<ExecutionResult> {
    const {
      to,
      subject,
      from,
      template,
      html,
      text
    } = context.nodeConfig || {};

    if (!to || !subject || !from) {
      return {
        success: false,
        error: 'to, subject, and from are required'
      };
    }

    try {
      const result = await this.emailService.emails.send({
        from,
        to,
        subject,
        html: html || template,
        text
      });

      return { success: true, data: { messageId: result.data.id } };
    } catch (error) {
      return {
        success: false,
        error: error instanceof Error ? error.message : 'Email error'
      };
    }
  }
}

Transform Executor (JSONata)

import * as jsonata from 'jsonata';

export class TransformExecutor implements INodeExecutor {
  async execute(context: ExecutionContext): Promise<ExecutionResult> {
    const { expression, transformFn } = context.nodeConfig || {};

    if (expression) {
      // Using JSONata expression
      const compiled = jsonata(expression);
      const result = await compiled.evaluate(context.inputData);
      return { success: true, data: result };
    }

    if (transformFn) {
      // Using custom transform function (registered separately)
      const transforms: Record<string, (data: any) => any> = {
        dataTransform: (data: any) => ({
          ...data,
          transformed: true,
          timestamp: new Date().toISOString()
        })
      };

      const transform = transforms[transformFn];
      if (!transform) {
        return {
          success: false,
          error: `Transform function '${transformFn}' not found`
        };
      }

      const result = transform(context.inputData);
      return { success: true, data: result };
    }

    // Default: just pass through data
    return { success: true, data: context.inputData };
  }
}

Router Executor (Conditional Branching)

export class RouterExecutor implements INodeExecutor {
  async execute(context: ExecutionContext): Promise<ExecutionResult> {
    const {
      condition,
      cases = {}
    } = context.nodeConfig || {};

    const value = context.inputData[condition] ?? context.inputData;

    // Find matching case
    for (const [caseValue, targetNode] of Object.entries(cases)) {
      if (value === caseValue) {
        return {
          success: true,
          data: { routedTo: targetNode },
          nextNodes: [targetNode as string]  // Only execute this branch
        };
      }
    }

    // Default case
    if (cases.default) {
      return {
        success: true,
        data: { routedTo: cases.default },
        nextNodes: [cases.default as string]
      };
    }

    return {
      success: false,
      error: `No matching case for value: ${value}`
    };
  }
}

Delay Executor

export class DelayExecutor implements INodeExecutor {
  async execute(context: ExecutionContext): Promise<ExecutionResult> {
    const { duration, durationSeconds, durationMinutes } = context.nodeConfig || {};

    let delayMs = 0;

    // Priority: duration > durationSeconds > durationMinutes
    if (duration) {
      delayMs = duration;
    } else if (durationSeconds) {
      delayMs = durationSeconds * 1000;
    } else if (durationMinutes) {
      delayMs = durationMinutes * 60 * 1000;
    }

    if (delayMs <= 0) {
      return {
        success: false,
        error: 'Invalid delay duration'
      };
    }

    // Pass data through with delay
    return {
      success: true,
      data: context.inputData
    };
  }
}

Registering Executors

Basic Registration

import { NodeRegistry } from 'spane';

const registry = new NodeRegistry();
registry.register('http', new HttpExecutor());
registry.register('email', new EmailExecutor(emailService));
registry.register('transform', new TransformExecutor());

With Circuit Breaker

import { CircuitBreakerRegistry } from 'spane/utils/circuit-breaker';

const circuitBreakerRegistry = new CircuitBreakerRegistry();

// Register executor with circuit breaker protection
registry.register('http', new HttpExecutor(), {
  failureThreshold: 5,
  successThreshold: 2,
  timeout: 60000
});

// Pass to engine
const engine = new WorkflowEngine(
  registry,
  stateStore,
  redis,
  undefined,
  circuitBreakerRegistry
);

With Rate Limiting

// Per-node-type rate limiting
registry.register('api-call', new ApiCaller(), {
  max: 100,      // 100 requests
  duration: 60000  // per minute
});

Error Handling

Retry Policies

Define retry behavior in node configuration:

{
  id: 'unstable-api',
  type: 'http',
  config: {
    url: 'https://unstable-api.example.com',
    retryPolicy: {
      maxAttempts: 5,
      backoff: {
        type: 'exponential',
        delay: 1000
      },
      continueOnFail: false
    }
  },
  inputs: ['start'],
  outputs: ['process']
}

Continue on Fail

Allow workflow to continue even if node fails:

{
  id: 'optional-enrichment',
  type: 'http',
  config: {
    url: 'https://enrichment.example.com',
    retryPolicy: {
      maxAttempts: 3,
      continueOnFail: true  // Don't stop workflow on failure
    }
  },
  inputs: ['data'],
  outputs: ['save']
}

Best Practices

  1. Keep executors focused: Single responsibility principle
  2. Handle all errors: Always return success/error state
  3. Use timeouts: Prevent indefinite hanging
  4. Log important events: Use context for correlation
  5. Validate inputs: Check required configuration
  6. Make idempotent: Safe to retry multiple times
  7. Use circuit breakers: Protect external dependencies

Next Steps

On this page