My App

Examples & Recipes

Practical examples and common workflow patterns

Examples & Recipes

Collection of practical examples and common workflow patterns for SPANE.

Basic Examples

Simple HTTP Workflow

Fetch data from an API and save it to a database:

/**
 * Two-node pipeline: the `fetch` HTTP node feeds the `save` database node.
 * `fetch` carries a retry policy (maxAttempts 3, exponential backoff, delay 1000).
 */
const simpleWorkflow: WorkflowDefinition = {
  id: 'fetch-and-save',
  name: 'Fetch and Save',
  entryNodeId: 'fetch',
  nodes: [
    // Entry node: requests the data from the remote API.
    {
      id: 'fetch',
      type: 'http',
      config: {
        url: 'https://api.example.com/data',
        retryPolicy: { maxAttempts: 3, backoff: { type: 'exponential', delay: 1000 } },
      },
      inputs: [],
      outputs: ['save'],
    },
    // Terminal node: runs the parameterized INSERT; receives from `fetch`.
    {
      id: 'save',
      type: 'database',
      config: { query: 'INSERT INTO data (json) VALUES ($1)' },
      inputs: ['fetch'],
      outputs: [],
    },
  ],
};

Email Notification

Send a welcome email after user signup:

/**
 * Triggered by a POST webhook on the `user-signup` path: sends the welcome
 * email, then hands off to a database node that inserts a log row.
 */
const welcomeEmailWorkflow: WorkflowDefinition = {
  id: 'welcome-email',
  name: 'Welcome Email',
  entryNodeId: 'send-email',
  triggers: [
    {
      type: 'webhook',
      config: {
        path: 'user-signup',
        method: 'POST'
      }
    }
  ],
  nodes: [
    {
      id: 'send-email',
      type: 'email',
      config: {
        from: 'welcome@example.com',
        // The node previously had no recipient at all.
        // NOTE(review): assumes the signup payload exposes the address as
        // `email` — confirm against the webhook body and adjust the path.
        to: '$.email',
        template: 'welcome',
        subject: 'Welcome to Our Platform!'
      },
      inputs: [],
      outputs: ['log']
    },
    {
      id: 'log',
      type: 'database',
      config: {
        query: 'INSERT INTO logs (message) VALUES ($1)'
      },
      inputs: ['send-email'],
      outputs: []
    }
  ]
};

// HTTP entry point for the signup webhook: forwards the request body to the
// engine's webhook trigger ('user-signup', POST) and returns what it resolves to.
app.post('/api/webhooks/user-signup', async (request) => {
  const startedExecutions = await engine.triggerWebhook('user-signup', 'POST', request.body);
  return { executionIds: startedExecutions };
});

Common Patterns

Parallel API Calls

Call multiple APIs in parallel and aggregate results:

/**
 * Fan-out / fan-in graph: `split` hands off to three sibling HTTP nodes, and
 * `aggregate` lists all three of them as its inputs.
 */
const parallelApiWorkflow: WorkflowDefinition = {
  id: 'parallel-apis',
  name: 'Parallel API Calls',
  entryNodeId: 'split',
  nodes: [
    // Fan-out point; it has an empty config.
    { id: 'split', type: 'transform', config: {}, inputs: [], outputs: ['api-1', 'api-2', 'api-3'] },

    // One HTTP node per upstream API, each fed by `split` and feeding `aggregate`.
    { id: 'api-1', type: 'http', config: { url: 'https://api1.example.com' }, inputs: ['split'], outputs: ['aggregate'] },
    { id: 'api-2', type: 'http', config: { url: 'https://api2.example.com' }, inputs: ['split'], outputs: ['aggregate'] },
    { id: 'api-3', type: 'http', config: { url: 'https://api3.example.com' }, inputs: ['split'], outputs: ['aggregate'] },

    // Fan-in point: applies the transform expression to the collected results.
    {
      id: 'aggregate',
      type: 'transform',
      config: { expression: '$$.($ ~> | $ | { "api_" & $position(): $} |)' },
      inputs: ['api-1', 'api-2', 'api-3'],
      outputs: [],
    },
  ],
};

Data Enrichment Pipeline

Fetch data and enrich from multiple sources:

/**
 * Loads a user row, calls three enrichment APIs, merges everything and writes
 * the result back.
 *
 * Each enrichment node sets `continueOnFail: true` in its retry policy, marking
 * the enrichment as optional.
 */
const enrichmentWorkflow: WorkflowDefinition = {
  id: 'data-enrichment',
  name: 'Data Enrichment Pipeline',
  entryNodeId: 'fetch-user',
  nodes: [
    {
      id: 'fetch-user',
      type: 'database',
      config: {
        query: 'SELECT * FROM users WHERE id = $1'
      },
      inputs: [],
      // `merge` declares `fetch-user` as one of its inputs, so the edge must
      // also be declared here; previously only the three enrich nodes were listed.
      outputs: ['enrich-profile', 'enrich-orders', 'enrich-preferences', 'merge']
    },
    {
      id: 'enrich-profile',
      type: 'http',
      config: {
        url: 'https://profile-api.example.com/users/$.userId',
        retryPolicy: {
          maxAttempts: 3,
          continueOnFail: true  // Optional enrichment
        }
      },
      inputs: ['fetch-user'],
      outputs: ['merge']
    },
    {
      id: 'enrich-orders',
      type: 'http',
      config: {
        url: 'https://orders-api.example.com/users/$.userId',
        retryPolicy: {
          maxAttempts: 3,
          continueOnFail: true
        }
      },
      inputs: ['fetch-user'],
      outputs: ['merge']
    },
    {
      id: 'enrich-preferences',
      type: 'http',
      config: {
        url: 'https://preferences-api.example.com/users/$.userId',
        retryPolicy: {
          maxAttempts: 3,
          continueOnFail: true
        }
      },
      inputs: ['fetch-user'],
      outputs: ['merge']
    },
    {
      // Join point: receives the user row plus the three enrichment results.
      id: 'merge',
      type: 'transform',
      config: {},
      inputs: ['fetch-user', 'enrich-profile', 'enrich-orders', 'enrich-preferences'],
      outputs: ['save']
    },
    {
      id: 'save',
      type: 'database',
      config: {
        query: 'UPDATE users SET enriched_data = $1 WHERE id = $2'
      },
      inputs: ['merge'],
      outputs: []
    }
  ]
};

Conditional Branching

Route workflow based on data:

/**
 * Routes an order to one of three handlers based on its `orderType`, then
 * sends an email from the shared `complete` node.
 */
const routingWorkflow: WorkflowDefinition = {
  id: 'conditional-routing',
  name: 'Conditional Routing',
  entryNodeId: 'evaluate',
  nodes: [
    {
      id: 'evaluate',
      type: 'router',
      config: {
        condition: 'orderType',  // Field to check
        // Value-to-node map. The router executor picks its target from
        // `cases`; without it every execution ends in "No route for orderType".
        cases: {
          digital: 'digital',
          physical: 'physical',
          service: 'service'
        }
      },
      inputs: [],
      outputs: ['digital', 'physical', 'service']
    },
    {
      id: 'digital',
      type: 'transform',
      config: {
        action: 'send-digital-download'
      },
      inputs: ['evaluate'],
      outputs: ['complete']
    },
    {
      id: 'physical',
      type: 'transform',
      config: {
        action: 'create-shipping-label'
      },
      inputs: ['evaluate'],
      outputs: ['complete']
    },
    {
      id: 'service',
      type: 'transform',
      config: {
        action: 'schedule-appointment'
      },
      inputs: ['evaluate'],
      outputs: ['complete']
    },
    {
      id: 'complete',
      type: 'email',
      config: { to: '$.customerEmail' },
      inputs: ['digital', 'physical', 'service'],
      outputs: []
    }
  ]
};

// Router executor
/**
 * Routes execution to a single downstream node chosen by one field of the
 * input data.
 *
 * Node config read by this executor:
 * - `cases`: map from field value to target node id; the `default` key is the
 *   fallback route.
 * - `condition`: name of the input field to inspect. Falls back to `orderType`
 *   when it is absent or not a non-empty string.
 */
class RouterExecutor implements INodeExecutor {
  /**
   * @param context Execution context; `inputData` and `nodeConfig` may be absent.
   * @returns On a match, `success: true` with the chosen node id in both
   *   `data.routedTo` and `nextNodes`; otherwise `success: false` with an
   *   error message naming the inspected field and its value.
   */
  async execute(context: ExecutionContext): Promise<ExecutionResult> {
    const { cases = {}, condition } = context.nodeConfig || {};
    const field = typeof condition === 'string' && condition !== '' ? condition : 'orderType';

    // Missing input data is treated as "no value" instead of throwing.
    const input = context.inputData;
    const value = input == null ? undefined : input[field];

    // Only own keys count as routes, so values such as "constructor" or
    // "toString" cannot resolve to members inherited from Object.prototype.
    const lookup = (key: string) =>
      cases != null && Object.prototype.hasOwnProperty.call(cases, key) ? cases[key] : undefined;

    const targetNode = (value == null ? undefined : lookup(String(value))) || lookup('default');

    if (!targetNode) {
      return {
        success: false,
        error: `No route for ${field}: ${value}`
      };
    }

    return {
      success: true,
      data: { routedTo: targetNode },
      nextNodes: [targetNode]
    };
  }
}

Retry with Backoff

Retry failed operations with exponential backoff:

/**
 * Calls an unreliable HTTP endpoint under a retry policy and a circuit
 * breaker, then hands off to a database update.
 */
const retryWorkflow: WorkflowDefinition = {
  id: 'retry-example',
  name: 'Retry with Backoff',
  entryNodeId: 'unstable-operation',
  nodes: [
    // Entry node: the flaky call, configured with retries and a circuit breaker.
    {
      id: 'unstable-operation',
      type: 'http',
      config: {
        url: 'https://unstable-api.example.com/endpoint',
        retryPolicy: { maxAttempts: 5, backoff: { type: 'exponential', delay: 1000 } },
        circuitBreaker: { failureThreshold: 10, successThreshold: 2, timeout: 60000 },
      },
      inputs: [],
      outputs: ['success-handler'],
    },
    // Downstream node: runs the status update.
    {
      id: 'success-handler',
      type: 'database',
      config: { query: 'UPDATE operations SET status = $1' },
      inputs: ['unstable-operation'],
      outputs: [],
    },
  ],
};

Integration Examples

Stripe Payment Processing

Process payments with Stripe and handle webhooks:

/**
 * Triggered by a POST webhook on the `payment` path: creates a Stripe charge,
 * then fans out to an order update and a receipt email.
 */
const stripePaymentWorkflow: WorkflowDefinition = {
  id: 'stripe-payment',
  name: 'Stripe Payment',
  entryNodeId: 'create-charge',
  triggers: [
    {
      type: 'webhook',
      config: { path: 'payment', method: 'POST' }
    }
  ],
  nodes: [
    {
      id: 'create-charge',
      type: 'http',
      config: {
        url: 'https://api.stripe.com/v1/charges',
        method: 'POST',
        headers: {
          // Read the secret key from the environment instead of committing it
          // to source.
          'Authorization': `Bearer ${process.env.STRIPE_SECRET_KEY}`
        },
        // NOTE(review): this POST is retried up to 3 attempts. Consider sending
        // a per-payment `Idempotency-Key` header so a retry cannot create a
        // second charge — confirm which payload field can serve as the key.
        retryPolicy: {
          maxAttempts: 3,
          backoff: { type: 'exponential', delay: 1000 }
        }
      },
      inputs: [],
      outputs: ['update-order', 'send-receipt']
    },
    {
      id: 'update-order',
      type: 'database',
      config: {
        query: 'UPDATE orders SET payment_status = $1, stripe_id = $2 WHERE id = $3'
      },
      inputs: ['create-charge'],
      outputs: []
    },
    {
      id: 'send-receipt',
      type: 'email',
      config: {
        to: '$.customerEmail',
        template: 'payment-receipt'
      },
      inputs: ['create-charge'],
      outputs: []
    }
  ]
};

AWS S3 File Upload

Upload files to S3 with retry and circuit breaker:

/**
 * Three-step upload: a transform node, an HTTP PUT against S3 configured with
 * a retry policy and a circuit breaker, and a database insert.
 */
const s3UploadWorkflow: WorkflowDefinition = {
  id: 's3-upload',
  name: 'AWS S3 File Upload',
  entryNodeId: 'prepare-upload',
  nodes: [
    // Entry node; it has an empty config.
    { id: 'prepare-upload', type: 'transform', config: {}, inputs: [], outputs: ['upload'] },

    // The PUT request; the URL and Content-Type carry `$.`-prefixed placeholders.
    {
      id: 'upload',
      type: 'http',
      config: {
        url: 'https://s3.amazonaws.com/$.bucket/$.key',
        method: 'PUT',
        headers: {
          'Content-Type': '$.contentType',
          'x-amz-server-side-encryption': 'AES256',
        },
        retryPolicy: { maxAttempts: 5, backoff: { type: 'exponential', delay: 2000 } },
        circuitBreaker: { failureThreshold: 10, successThreshold: 2, timeout: 30000 },
      },
      inputs: ['prepare-upload'],
      outputs: ['log-success'],
    },

    // Terminal node: inserts a row into `uploads`.
    {
      id: 'log-success',
      type: 'database',
      config: { query: 'INSERT INTO uploads (file_id, url, status) VALUES ($1, $2, $3)' },
      inputs: ['upload'],
      outputs: [],
    },
  ],
};

Slack Notification

Send notifications to Slack:

/**
 * Node executor that POSTs a JSON message to a Slack webhook URL.
 *
 * Reads `webhookUrl`, `message` and `channel` from the node config. The
 * request body is `{ channel, text: message }` with the node's input data
 * spread on top, so input fields of the same name take precedence.
 */
class SlackExecutor implements INodeExecutor {
  /**
   * @param context Execution context supplying `nodeConfig` and `inputData`.
   * @returns `success: false` when no webhook URL is configured or the
   *   response is not OK; otherwise `success: true` with `data.sent` set.
   */
  async execute(context: ExecutionContext): Promise<ExecutionResult> {
    const { webhookUrl: hookUrl, message: text, channel } = context.nodeConfig || {};

    // Nothing can be sent without a destination.
    if (!hookUrl) {
      return { success: false, error: 'Webhook URL is required' };
    }

    const payload = { channel, text, ...context.inputData };
    const reply = await fetch(hookUrl, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify(payload)
    });

    return reply.ok
      ? { success: true, data: { sent: true } }
      : { success: false, error: `Slack API error: ${reply.status}` };
  }
}

/**
 * Single-node workflow: one `slack` node posting a fixed message to `#alerts`.
 * The webhook URL is read from the `SLACK_WEBHOOK_URL` environment variable.
 */
const slackWorkflow: WorkflowDefinition = {
  id: 'slack-notification',
  name: 'Slack Notification',
  entryNodeId: 'notify',
  nodes: [
    // Entry and terminal node in one: it has no inputs and no outputs.
    {
      id: 'notify',
      type: 'slack',
      config: {
        webhookUrl: process.env.SLACK_WEBHOOK_URL,
        channel: '#alerts',
        message: 'Workflow completed successfully!',
      },
      inputs: [],
      outputs: [],
    },
  ],
};

Scheduled Workflows

Daily Report

Generate and send daily reports:

/**
 * Scheduled report: queries the current day's metrics, runs them through a
 * transform expression and hands the result to an email node.
 */
const dailyReportWorkflow: WorkflowDefinition = {
  id: 'daily-report',
  name: 'Daily Report',
  entryNodeId: 'fetch-metrics',
  triggers: [
    {
      type: 'schedule',
      // Cron: minute 0 of hour 9, every day, in the New York timezone.
      config: { cron: '0 9 * * *', timezone: 'America/New_York' },
    },
  ],
  nodes: [
    // Entry node: selects the rows dated CURRENT_DATE.
    {
      id: 'fetch-metrics',
      type: 'database',
      config: { query: 'SELECT * FROM metrics WHERE date = CURRENT_DATE' },
      inputs: [],
      outputs: ['generate-report'],
    },
    // Builds the report object from the query result.
    {
      id: 'generate-report',
      type: 'transform',
      config: { expression: '$$ ~> {"summary": $count(), "data": $}' },
      inputs: ['fetch-metrics'],
      outputs: ['send-email'],
    },
    // Terminal node: the email addressed to the reports mailbox.
    {
      id: 'send-email',
      type: 'email',
      config: { to: 'reports@example.com', subject: 'Daily Metrics Report' },
      inputs: ['generate-report'],
      outputs: [],
    },
  ],
};

Hourly Health Check

Monitor system health every hour:

/**
 * Hourly health check: probes the database, then the API and Redis, collects
 * the three results in `aggregate` and hands off to a Slack alert node.
 *
 * NOTE(review): `alert-on-failure` is wired unconditionally after `aggregate`,
 * so as written the "Health check failed!" message does not visibly depend on
 * any check failing — confirm how the engine gates this node.
 */
const healthCheckWorkflow: WorkflowDefinition = {
  id: 'health-check',
  name: 'Hourly Health Check',
  entryNodeId: 'check-database',
  triggers: [
    {
      type: 'schedule',
      config: {
        cron: '0 * * * *',  // Every hour
        timezone: 'UTC'
      }
    }
  ],
  nodes: [
    {
      id: 'check-database',
      type: 'database',
      config: {
        query: 'SELECT 1'
      },
      inputs: [],
      // `aggregate` lists this node as an input, so the edge is declared here too.
      outputs: ['check-api', 'check-redis', 'aggregate']
    },
    {
      id: 'check-api',
      type: 'http',
      config: {
        url: 'https://api.example.com/health',
        retryPolicy: {
          maxAttempts: 2,
          backoff: { type: 'fixed', delay: 5000 }
        }
      },
      // Mirrors the `check-database` -> `check-api` edge declared upstream.
      inputs: ['check-database'],
      outputs: ['aggregate']
    },
    {
      id: 'check-redis',
      type: 'transform',
      config: {
        action: 'ping-redis'
      },
      // Mirrors the `check-database` -> `check-redis` edge declared upstream.
      inputs: ['check-database'],
      outputs: ['aggregate']
    },
    {
      id: 'aggregate',
      type: 'transform',
      config: {},
      inputs: ['check-database', 'check-api', 'check-redis'],
      outputs: ['alert-on-failure']
    },
    {
      id: 'alert-on-failure',
      type: 'slack',
      config: {
        webhookUrl: process.env.SLACK_WEBHOOK_URL,
        channel: '#alerts',
        message: 'Health check failed!'
      },
      inputs: ['aggregate'],
      outputs: []
    }
  ]
};

Data Processing Pipelines

ETL Pipeline

Extract, Transform, and Load data:

/**
 * Linear extract -> transform -> load -> notify chain.
 *
 * NOTE(review): the `transform` expression and the `load` query are kept
 * exactly as written; whether the expression parses in the engine's
 * expression language, and how its output binds to `$1`/`$2`, is not visible
 * here — confirm.
 */
const etlWorkflow: WorkflowDefinition = {
  id: 'etl-pipeline',
  name: 'ETL Pipeline',
  entryNodeId: 'extract',
  nodes: [
    // Extract: HTTP export call with a retry policy.
    {
      id: 'extract',
      type: 'http',
      config: {
        url: 'https://source-api.example.com/export',
        retryPolicy: { maxAttempts: 5, backoff: { type: 'exponential', delay: 2000 } },
      },
      inputs: [],
      outputs: ['transform'],
    },
    // Transform: reshapes the extracted records via the expression.
    {
      id: 'transform',
      type: 'transform',
      config: { expression: '$ ~> | {"id": $id, "processed": $name ~> $uppercase()} |' },
      inputs: ['extract'],
      outputs: ['load'],
    },
    // Load: inserts into `processed_data`.
    {
      id: 'load',
      type: 'database',
      config: { query: 'INSERT INTO processed_data (id, name) VALUES ($1, $2)' },
      inputs: ['transform'],
      outputs: ['notify'],
    },
    // Notify: Slack message; the webhook URL comes from the environment.
    {
      id: 'notify',
      type: 'slack',
      config: {
        webhookUrl: process.env.SLACK_WEBHOOK_URL,
        message: 'ETL pipeline completed successfully',
      },
      inputs: ['load'],
      outputs: [],
    },
  ],
};

Batch Processing

Process items in batches:

/**
 * Processes one batch of pending items: selects up to 100 rows, marks each as
 * processed via a transform, writes the status back and ends on a delay node.
 */
const batchProcessingWorkflow: WorkflowDefinition = {
  id: 'batch-processing',
  name: 'Batch Processing',
  entryNodeId: 'fetch-batch',
  nodes: [
    {
      id: 'fetch-batch',
      type: 'database',
      config: {
        query: `SELECT * FROM pending_items LIMIT 100`
      },
      inputs: [],
      outputs: ['process-items']
    },
    {
      id: 'process-items',
      type: 'transform',
      config: {
        // Uses the balanced `| pattern | update |` transform form. The previous
        // expression ended in a dangling `|` with no opening delimiter.
        // NOTE(review): assumes the engine evaluates JSONata — confirm.
        expression: '$ ~> | $ | {"id": $id, "status": "processed"} |'
      },
      inputs: ['fetch-batch'],
      outputs: ['save-results']
    },
    {
      id: 'save-results',
      type: 'database',
      config: {
        query: `UPDATE pending_items SET status = $1, processed_at = NOW() WHERE id = $2`
      },
      inputs: ['process-items'],
      outputs: ['schedule-next']
    },
    {
      id: 'schedule-next',
      type: 'delay',
      config: {
        duration: 60000  // Wait 1 minute before next batch
      },
      inputs: ['save-results'],
      outputs: []
    }
  ]
};

Best Practices

  1. Start Simple: Begin with basic workflows, add complexity gradually
  2. Use Idempotency: Make operations safe to retry multiple times
  3. Handle Failures: Always have fallback or error handling
  4. Monitor Everything: Use event streaming to track executions
  5. Test Thoroughly: Test workflows in staging before production
  6. Document Workflows: Keep workflow definitions well documented
  7. Use Circuit Breakers: Protect external dependencies
  8. Set Timeouts: Prevent indefinite hanging operations

Next Steps

On this page