skorfmann/durable-workflow
DurableWorkflow
A durable workflow library for Rails built on top of Active Job Continuations. Provides Inngest/Temporal-like capabilities with a Rails-native feel.
Features
- Memoized Steps (
run) - Execute code once, replay from cache - Durable Sleep (
sleep) - Pause workflows across process restarts - Event Waiting (
wait_for_event) - Pause until external events arrive - Audit Trail - Full history of workflow execution
- Pluggable Backends - Memory, Redis, or ActiveRecord storage
Installation
Add to your Gemfile:
gem 'durable_workflow', path: 'path/to/durable_workflow'Configure in an initializer:
# config/initializers/durable_workflow.rb
DurableWorkflow.configure do |config|
# Backend for step results (journal)
config.journal_backend = :redis # :memory, :redis, or :active_record
# Backend for event subscriptions
config.event_bus_backend = :redis
# Redis connection (if using Redis backends)
config.redis = Redis.new(url: ENV['REDIS_URL'])
# Default timeout for wait_for_event
config.default_event_timeout = 1.hour
# Queue for wake-up jobs
config.wake_up_queue = :default
config.event_trigger_queue = :default
endQuick Start
class OnboardingWorkflow < ApplicationJob
include DurableWorkflow::Workflow
def perform(user_id:)
workflow do |ctx|
# Memoized - only executes once, replays from cache
user = ctx.run(:fetch_user) { User.find(user_id) }
# Memoized side effect - email sent exactly once
ctx.run(:send_welcome) { UserMailer.welcome(user).deliver_now }
# Durable sleep - survives restarts
ctx.sleep(:wait_24h, 24.hours)
# Wait for external event (or timeout)
begin
event = ctx.wait_for_event(:confirmation, "user.confirmed",
timeout: 7.days,
filter: { user_id: user_id }
)
ctx.run(:activate) { user.activate! }
rescue DurableWorkflow::EventTimeoutError
ctx.run(:send_reminder) { UserMailer.reminder(user).deliver_now }
end
end
end
endTrigger the workflow:
OnboardingWorkflow.perform_later(user_id: 123)Publish events to trigger waiting workflows:
DurableWorkflow.publish("user.confirmed", { user_id: 123 })Core Concepts
Memoized Steps (run)
Steps executed with run are cached in the journal. On replay, the cached result is returned without re-executing the block.
# First execution: calls the block, stores result
user = ctx.run(:fetch_user) { User.find(user_id) }
# Replay: returns cached result immediately
user = ctx.run(:fetch_user) { User.find(user_id) }Important: Ensure your step blocks are idempotent for external calls. Use idempotency keys:
ctx.run(:charge_payment) do
PaymentService.charge(
amount: order.total,
idempotency_key: "order-#{order.id}-charge"
)
endDurable Sleep (sleep)
Sleep pauses the workflow for a duration, surviving process restarts:
ctx.sleep(:wait_before_reminder, 24.hours)
# Workflow pauses here, resumes after 24 hours
ctx.run(:send_reminder) { ... }Under the hood:
- Workflow records sleep start time in journal
- Schedules
WakeUpJobto run after duration - Raises
Interruptto pause workflow WakeUpJobmarks sleep complete and re-enqueues workflow- Workflow resumes, skips completed sleep step
Event Waiting (wait_for_event)
Wait for external events with optional timeout:
event = ctx.wait_for_event(
:payment_completed, # Step name
"payment.success", # Event type to wait for
timeout: 1.hour, # Optional timeout
filter: { order_id: 123 } # Optional filter
)Publish events from anywhere:
DurableWorkflow.publish("payment.success", {
order_id: 123,
amount: 99.99,
transaction_id: "txn_abc"
})Handle timeouts:
begin
event = ctx.wait_for_event(:confirmation, "user.confirmed", timeout: 7.days)
# Event received
rescue DurableWorkflow::EventTimeoutError
# Timeout occurred
endCombining with AJ Continuations
For fine-grained cursor tracking within a step, combine with raw AJ Continuations:
class ImportWorkflow < ApplicationJob
include DurableWorkflow::Workflow
def perform(import_id:)
workflow do |ctx|
import = ctx.run(:fetch) { Import.find(import_id) }
# Use AJ Continuations step for cursor-based iteration
step :process_records do |step|
import.records.find_each(start: step.cursor) do |record|
record.process!
step.advance!(from: record.id)
end
end
ctx.run(:finalize) { import.complete! }
end
end
endBackends
Memory (Testing/Development)
config.journal_backend = :memory
config.event_bus_backend = :memoryData is lost on restart. Good for testing.
Redis (Production)
config.journal_backend = :redis
config.event_bus_backend = :redis
config.redis = Redis.new(url: ENV['REDIS_URL'])ActiveRecord (Production)
config.journal_backend = :active_record
config.event_bus_backend = :active_recordRequires migrations:
# Journal entries
create_table :durable_workflow_journal_entries do |t|
t.string :workflow_id, null: false
t.string :step_name, null: false
t.json :result
t.json :metadata, default: {}
t.datetime :executed_at, null: false
t.timestamps
t.index [:workflow_id, :step_name], unique: true
t.index :workflow_id
end
# Event subscriptions
create_table :durable_workflow_subscriptions do |t|
t.string :workflow_id, null: false
t.string :step_name, null: false
t.string :event_type, null: false
t.json :filter, default: {}
t.datetime :timeout_at, null: false
t.string :workflow_class
t.timestamps
t.index [:workflow_id, :step_name], unique: true
t.index :event_type
t.index :timeout_at
end
# Pending events
create_table :durable_workflow_pending_events do |t|
t.string :workflow_id, null: false
t.string :step_name, null: false
t.json :payload, null: false
t.timestamps
t.index [:workflow_id, :step_name], unique: true
endTesting
Include the test helper:
class MyWorkflowTest < ActiveSupport::TestCase
include DurableWorkflow::TestHelper
setup do
reset_durable_workflow!
end
test "workflow sends welcome email" do
perform_workflow(OnboardingWorkflow, user_id: user.id)
assert_step_executed(:send_welcome)
end
test "workflow waits for confirmation" do
perform_workflow(OnboardingWorkflow, user_id: user.id)
assert_waiting_for_event("user.confirmed")
end
test "workflow completes with event" do
perform_workflow_to_completion(
OnboardingWorkflow,
user_id: user.id,
events: { "user.confirmed" => { user_id: user.id } }
)
assert_step_executed(:activate)
end
test "workflow handles timeout" do
perform_workflow_to_completion(
OnboardingWorkflow,
user_id: user.id,
events: {} # No confirmation event
)
assert_step_executed(:send_reminder)
end
endAudit Trail
Get the execution history:
job = OnboardingWorkflow.perform_now(user_id: 123)
job.audit_trail.each do |entry|
puts "#{entry.step_name}: #{entry.result} at #{entry.executed_at}"
endCleanup
Clean up workflow data after completion:
job.cleanup_workflow!Or clean up old workflows:
# In a scheduled job
DurableWorkflow.journal.entries_older_than(30.days).each do |entry|
DurableWorkflow.journal.clear(entry.workflow_id)
endComparison to Inngest/Restate
| Feature | DurableWorkflow | Inngest | Restate |
|---|---|---|---|
| Step memoization | ✅ | ✅ | ✅ |
| Durable sleep | ✅ | ✅ | ✅ |
| Event triggers | ✅ | ✅ | ✅ |
| Fan-out/parallel | ❌ | ✅ | ✅ |
| Exactly-once | ❌ (at-least-once) | ✅ | ✅ |
| Journal replay | ❌ | ✅ | ✅ |
| Rails-native | ✅ | ❌ | ❌ |
Architecture
┌─────────────────────────────────────────────────────────────┐
│ Your Workflow Job │
│ ┌─────────────────────────────────────────────────────────┐│
│ │ workflow do |ctx| ││
│ │ ctx.run(:step1) { ... } # Memoized ││
│ │ ctx.sleep(:wait, 1.hour) # Durable ││
│ │ ctx.wait_for_event(...) # External trigger ││
│ │ end ││
│ └─────────────────────────────────────────────────────────┘│
└──────────────────────────┬──────────────────────────────────┘
│
┌─────────────────┼─────────────────┐
│ │ │
▼ ▼ ▼
┌─────────────────┐ ┌─────────────┐ ┌─────────────────┐
│ Journal │ │ EventBus │ │ AJ Continuations │
│ (step results) │ │ (pub/sub) │ │ (interrupts) │
└────────┬────────┘ └──────┬──────┘ └─────────────────┘
│ │
▼ ▼
┌─────────────────────────────────────────┐
│ Storage Backend (Redis/ActiveRecord) │
└─────────────────────────────────────────┘
License
MIT