GitHunt
SK

skorfmann/durable-workflow

DurableWorkflow

A durable workflow library for Rails built on top of Active Job Continuations. Provides Inngest/Temporal-like capabilities with a Rails-native feel.

Features

  • Memoized Steps (run) - Execute code once, replay from cache
  • Durable Sleep (sleep) - Pause workflows across process restarts
  • Event Waiting (wait_for_event) - Pause until external events arrive
  • Audit Trail - Full history of workflow execution
  • Pluggable Backends - Memory, Redis, or ActiveRecord storage

Installation

Add to your Gemfile:

gem 'durable_workflow', path: 'path/to/durable_workflow'

Configure in an initializer:

# config/initializers/durable_workflow.rb
DurableWorkflow.configure do |config|
  # Backend for step results (journal)
  config.journal_backend = :redis  # :memory, :redis, or :active_record

  # Backend for event subscriptions
  config.event_bus_backend = :redis

  # Redis connection (if using Redis backends)
  config.redis = Redis.new(url: ENV['REDIS_URL'])

  # Default timeout for wait_for_event
  config.default_event_timeout = 1.hour

  # Queue for wake-up jobs
  config.wake_up_queue = :default
  config.event_trigger_queue = :default
end

Quick Start

class OnboardingWorkflow < ApplicationJob
  include DurableWorkflow::Workflow

  def perform(user_id:)
    workflow do |ctx|
      # Memoized - only executes once, replays from cache
      user = ctx.run(:fetch_user) { User.find(user_id) }

      # Memoized side effect - email sent exactly once
      ctx.run(:send_welcome) { UserMailer.welcome(user).deliver_now }

      # Durable sleep - survives restarts
      ctx.sleep(:wait_24h, 24.hours)

      # Wait for external event (or timeout)
      begin
        event = ctx.wait_for_event(:confirmation, "user.confirmed",
          timeout: 7.days,
          filter: { user_id: user_id }
        )
        ctx.run(:activate) { user.activate! }
      rescue DurableWorkflow::EventTimeoutError
        ctx.run(:send_reminder) { UserMailer.reminder(user).deliver_now }
      end
    end
  end
end

Trigger the workflow:

OnboardingWorkflow.perform_later(user_id: 123)

Publish events to trigger waiting workflows:

DurableWorkflow.publish("user.confirmed", { user_id: 123 })

Core Concepts

Memoized Steps (run)

Steps executed with run are cached in the journal. On replay, the cached result is returned without re-executing the block.

# First execution: calls the block, stores result
user = ctx.run(:fetch_user) { User.find(user_id) }

# Replay: returns cached result immediately
user = ctx.run(:fetch_user) { User.find(user_id) }

Important: Ensure your step blocks are idempotent for external calls. Use idempotency keys:

ctx.run(:charge_payment) do
  PaymentService.charge(
    amount: order.total,
    idempotency_key: "order-#{order.id}-charge"
  )
end

Durable Sleep (sleep)

Sleep pauses the workflow for a duration, surviving process restarts:

ctx.sleep(:wait_before_reminder, 24.hours)
# Workflow pauses here, resumes after 24 hours
ctx.run(:send_reminder) { ... }

Under the hood:

  1. Workflow records sleep start time in journal
  2. Schedules WakeUpJob to run after duration
  3. Raises Interrupt to pause workflow
  4. WakeUpJob marks sleep complete and re-enqueues workflow
  5. Workflow resumes, skips completed sleep step

Event Waiting (wait_for_event)

Wait for external events with optional timeout:

event = ctx.wait_for_event(
  :payment_completed,      # Step name
  "payment.success",       # Event type to wait for
  timeout: 1.hour,         # Optional timeout
  filter: { order_id: 123 } # Optional filter
)

Publish events from anywhere:

DurableWorkflow.publish("payment.success", {
  order_id: 123,
  amount: 99.99,
  transaction_id: "txn_abc"
})

Handle timeouts:

begin
  event = ctx.wait_for_event(:confirmation, "user.confirmed", timeout: 7.days)
  # Event received
rescue DurableWorkflow::EventTimeoutError
  # Timeout occurred
end

Combining with AJ Continuations

For fine-grained cursor tracking within a step, combine with raw AJ Continuations:

class ImportWorkflow < ApplicationJob
  include DurableWorkflow::Workflow

  def perform(import_id:)
    workflow do |ctx|
      import = ctx.run(:fetch) { Import.find(import_id) }

      # Use AJ Continuations step for cursor-based iteration
      step :process_records do |step|
        import.records.find_each(start: step.cursor) do |record|
          record.process!
          step.advance!(from: record.id)
        end
      end

      ctx.run(:finalize) { import.complete! }
    end
  end
end

Backends

Memory (Testing/Development)

config.journal_backend = :memory
config.event_bus_backend = :memory

Data is lost on restart. Good for testing.

Redis (Production)

config.journal_backend = :redis
config.event_bus_backend = :redis
config.redis = Redis.new(url: ENV['REDIS_URL'])

ActiveRecord (Production)

config.journal_backend = :active_record
config.event_bus_backend = :active_record

Requires migrations:

# Journal entries
create_table :durable_workflow_journal_entries do |t|
  t.string :workflow_id, null: false
  t.string :step_name, null: false
  t.json :result
  t.json :metadata, default: {}
  t.datetime :executed_at, null: false
  t.timestamps

  t.index [:workflow_id, :step_name], unique: true
  t.index :workflow_id
end

# Event subscriptions
create_table :durable_workflow_subscriptions do |t|
  t.string :workflow_id, null: false
  t.string :step_name, null: false
  t.string :event_type, null: false
  t.json :filter, default: {}
  t.datetime :timeout_at, null: false
  t.string :workflow_class
  t.timestamps

  t.index [:workflow_id, :step_name], unique: true
  t.index :event_type
  t.index :timeout_at
end

# Pending events
create_table :durable_workflow_pending_events do |t|
  t.string :workflow_id, null: false
  t.string :step_name, null: false
  t.json :payload, null: false
  t.timestamps

  t.index [:workflow_id, :step_name], unique: true
end

Testing

Include the test helper:

class MyWorkflowTest < ActiveSupport::TestCase
  include DurableWorkflow::TestHelper

  setup do
    reset_durable_workflow!
  end

  test "workflow sends welcome email" do
    perform_workflow(OnboardingWorkflow, user_id: user.id)
    assert_step_executed(:send_welcome)
  end

  test "workflow waits for confirmation" do
    perform_workflow(OnboardingWorkflow, user_id: user.id)
    assert_waiting_for_event("user.confirmed")
  end

  test "workflow completes with event" do
    perform_workflow_to_completion(
      OnboardingWorkflow,
      user_id: user.id,
      events: { "user.confirmed" => { user_id: user.id } }
    )
    assert_step_executed(:activate)
  end

  test "workflow handles timeout" do
    perform_workflow_to_completion(
      OnboardingWorkflow,
      user_id: user.id,
      events: {}  # No confirmation event
    )
    assert_step_executed(:send_reminder)
  end
end

Audit Trail

Get the execution history:

job = OnboardingWorkflow.perform_now(user_id: 123)
job.audit_trail.each do |entry|
  puts "#{entry.step_name}: #{entry.result} at #{entry.executed_at}"
end

Cleanup

Clean up workflow data after completion:

job.cleanup_workflow!

Or clean up old workflows:

# In a scheduled job
DurableWorkflow.journal.entries_older_than(30.days).each do |entry|
  DurableWorkflow.journal.clear(entry.workflow_id)
end

Comparison to Inngest/Restate

Feature DurableWorkflow Inngest Restate
Step memoization
Durable sleep
Event triggers
Fan-out/parallel
Exactly-once ❌ (at-least-once)
Journal replay
Rails-native

Architecture

┌─────────────────────────────────────────────────────────────┐
│                     Your Workflow Job                        │
│  ┌─────────────────────────────────────────────────────────┐│
│  │  workflow do |ctx|                                      ││
│  │    ctx.run(:step1) { ... }     # Memoized               ││
│  │    ctx.sleep(:wait, 1.hour)    # Durable                ││
│  │    ctx.wait_for_event(...)     # External trigger       ││
│  │  end                                                    ││
│  └─────────────────────────────────────────────────────────┘│
└──────────────────────────┬──────────────────────────────────┘
                           │
         ┌─────────────────┼─────────────────┐
         │                 │                 │
         ▼                 ▼                 ▼
┌─────────────────┐ ┌─────────────┐ ┌─────────────────┐
│     Journal     │ │  EventBus   │ │ AJ Continuations │
│  (step results) │ │ (pub/sub)   │ │   (interrupts)   │
└────────┬────────┘ └──────┬──────┘ └─────────────────┘
         │                 │
         ▼                 ▼
┌─────────────────────────────────────────┐
│   Storage Backend (Redis/ActiveRecord)   │
└─────────────────────────────────────────┘

License

MIT