ActiveSaga
A workflow engine with durable steps, automatic retries with backoff and jitter, compensations, async waits, signals, timeouts, idempotency, and pluggable persistence (ActiveRecord included).
Table of Contents
- Why ActiveSaga?
- Features
- Installation
- Quick Start
- Defining Workflows (3 equivalent styles)
- Async Steps (vs Sync) – Concept & Lifecycle
- Signals & Waits
- Cancellation
- Retries, Backoff & Jitter
- Idempotency & Deduplication
- Configuration
- Persistence (Adapters)
- Active Job Integration
- Observability (Events & Logging)
- Generators
- Testing
- Examples
- Design Notes
- License
Why ActiveSaga?
Most apps need reliable business workflows (sagas) with retries, compensations and “wait for an external thing (webhook/job)”—without operating a heavyweight orchestration service. ActiveSaga gives you that on plain Rails + Active Job.
Features
- Flexible step definitions: define workflow steps as methods, task classes, or inline blocks with identical semantics.
- Async steps: start external work, persist correlation, wait for completion via API.
- Signals: pause workflows until external events or triggers resume execution.
- Retries: fixed/exponential backoff with jitter, per-step timeouts.
- Compensations: reverse-order undo on cancel/failure.
- Cancellation: explicit API to stop workflows, run compensations, and mark remaining work cancelled.
- Idempotency: workflow-level keys + step dedupe + idempotent completions.
- Pluggable stores: strategy interface; ships with ActiveRecord.
- Observability: ActiveSupport::Notifications events for steps, signals, retries, timeouts.
Installation
# Gemfile
gem "active_saga"
bundle install
rails g active_saga:install
rails db:migrate
The installer creates:
- config/initializers/active_saga.rb
- ActiveRecord migrations
- Optional sample workflow
Quick Start
# config/initializers/active_saga.rb
ActiveSaga.configure do |c|
c.store = ActiveSaga::Stores::ActiveRecord.new
c.logger = Rails.logger
c.serializer = :json
c.clock = -> { Time.now.utc }
end
# app/workflows/checkout_flow.rb
class CheckoutFlow < ActiveSaga::Workflow
idempotency_key { "checkout:#{ctx[:order_id]}" }
defaults retry: { max: 5, backoff: :exponential, jitter: true, first_delay: 1.second }
timeout 30.minutes
step :charge_card, compensate: :refund_payment, retry: { max: 6, first_delay: 2.seconds }
task :reserve_stock, ReserveStockTask, dedupe: true
task :send_receipt, fire_and_forget: true do |ctx|
Mailers::Receipt.deliver_later(ctx[:order_id])
end
def charge_card
payment = PSP.charge!(order_id: ctx[:order_id], token: ctx[:payment_token])
ctx[:payment_id] = payment.id
end
def refund_payment
PSP.refund!(ctx[:payment_id]) if ctx[:payment_id]
end
end
# Run it
flow = CheckoutFlow.start(order_id: 42, payment_token: "tok_123")
result = flow.await # optional: block until terminal state
# cancel if user aborts checkout
flow.cancel!(reason: "user_aborted") if user_cancelled?
Defining Workflows (3 equivalent styles)
All step forms accept the same options:
retry:, compensate:, timeout:, async:, dedupe:, if:, unless:, args:, store_result_as:, fire_and_forget:.
A) Method step
class ExampleFlow < ActiveSaga::Workflow
step :do_work, compensate: :undo_work
def do_work
res = Service.call!(ctx[:input])
ctx[:output] = res
end
def undo_work
Service.undo!(ctx[:output]) if ctx[:output]
end
end
B) Task class
class ReserveStockTask < ActiveSaga::Task
def call(ctx)
res = Inventory.reserve!(ctx[:order_id])
{ reservation_id: res.id } # merged into ctx
end
def compensate(ctx, result:)
Inventory.release!(result[:reservation_id]) if result&.dig(:reservation_id)
end
end
class CheckoutFlow < ActiveSaga::Workflow
task :reserve_stock, ReserveStockTask
end
C) Inline block
class NotifyFlow < ActiveSaga::Workflow
task :send_email do |ctx|
Mailers::Notice.deliver_later(ctx[:user_id])
nil
end
end
Async Steps (vs Sync) – Concept & Lifecycle
Sync step: runs to completion within one worker execution. Returns value → advance; raises → retry/fail.
Async step: initiates external work and returns immediately. The engine persists the step in waiting state (with correlation/timeout). Later, an external caller resumes the step via complete_step! (or fail_step!). No worker thread is held while waiting.
Declaring async steps
# Per-step option
task :arrange_fulfillment, FulfillmentTask, async: true, timeout: 15.minutes
# Class-level declaration
class FulfillmentTask < ActiveSaga::Task
async! timeout: 15.minutes
def call(ctx)
job = FulfillmentAPI.create_job!(order_id: ctx[:order_id]) # initiate
{ fulfillment_job_id: job.id } # merge into ctx
end
end
Engine behavior when async:
- Runs call once to initiate work.
- If call returns, persist step → waiting, set waiting_since, compute timeout_at.
- If call raises, treat like sync failure (retry policy).
- Does not advance the cursor until a completion arrives.
Completion/Failure/Timeout APIs
# Mark an async step as completed and advance the flow
ActiveSaga.complete_step!(execution_id, :arrange_fulfillment,
payload: { tracking: "XYZ" }, idempotency_key: params[:event_id])
# Explicit failure path while waiting
ActiveSaga.fail_step!(execution_id, :arrange_fulfillment,
error_class: "RemoteError", message: "Timeout from vendor", details: { ... },
idempotency_key: params[:event_id])
# Optional: extend waiting timeout
ActiveSaga.extend_timeout!(execution_id, :arrange_fulfillment, by: 10.minutes)
# Optional: heartbeat for monitoring
ActiveSaga.heartbeat!(execution_id, :arrange_fulfillment)
Payload storage:
- Initiation return (from call) is merged into ctx immediately.
- Completion payload is stored under ctx[step_name] (or store_result_as: key).
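The two storage rules above can be pictured with plain hashes. This is a minimal sketch, not the gem's code: merge_initiation and store_completion are hypothetical helper names, and ctx is assumed to behave like an ordinary Ruby Hash.

```ruby
# Illustrative helpers only; the method names are hypothetical.

# The value returned by `call` is merged into the top level of ctx.
def merge_initiation(ctx, result)
  result.is_a?(Hash) ? ctx.merge(result) : ctx
end

# The completion payload is nested under the step name,
# or under the store_result_as: key when that option is given.
def store_completion(ctx, step_name, payload, store_result_as: nil)
  key = store_result_as || step_name
  ctx.merge({ key => payload })
end

ctx = { order_id: 42 }
ctx = merge_initiation(ctx, { fulfillment_job_id: "job_1" })
ctx = store_completion(ctx, :arrange_fulfillment, { tracking: "XYZ" })
# ctx[:fulfillment_job_id] now holds the initiation result, and
# ctx[:arrange_fulfillment][:tracking] holds the completion payload.
```

Nesting the completion payload under its own key keeps it from overwriting values that earlier steps merged into ctx.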
Timeouts, retries, and re-initiation
- timeout: caps waiting time. On timeout:
  - If retry configured → re-initiate the step (re-run call) with backoff+jitter.
  - Else mark step failed; apply flow failure/compensation policy.
- Re-initiation must be idempotent: use business idempotency keys (e.g., execution_id:step_name) for external calls.
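The timeout rule above amounts to a small decision. The sketch below is an assumption about how it could look, not ActiveSaga's internals: AsyncStep, handle_timeout, and the attempts/max_retries fields are hypothetical names, and the backoff delay itself is left out.

```ruby
# Hypothetical decision helper; not ActiveSaga's internals.
AsyncStep = Struct.new(:name, :attempts, :max_retries, :state)

def handle_timeout(step, execution_id:)
  if step.attempts < step.max_retries
    step.attempts += 1
    step.state = :pending # `call` runs again once the backoff delay has passed
    # Stable key: the same value on every re-initiation, so the external
    # system can recognise and ignore a repeated request.
    { action: :reinitiate, request_id: "#{execution_id}:#{step.name}" }
  else
    step.state = :failed
    { action: :fail }
  end
end

step = AsyncStep.new(:arrange_fulfillment, 0, 1, :waiting)
handle_timeout(step, execution_id: 7) # first timeout: re-initiate
handle_timeout(step, execution_id: 7) # retries exhausted: fail
```

Because the request key is derived only from the execution id and step name, a vendor that deduplicates on it sees every re-initiation as the same request.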
Security, idempotency & race-safety
- complete_step!/fail_step! are idempotent (accept idempotency_key).
- Optionally verify a correlation_id created during initiation before accepting completion.
- Use DB transactional updates and row locks so only one runner transitions a step.
- If a step isn’t waiting, completion/failure is a safe no-op.
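The "only one runner transitions a step" and "safe no-op" rules can be sketched in memory. This is an illustration under stated assumptions: WaitingStep and complete_step are hypothetical names, and a Mutex stands in for the database transaction and row lock that a real store would use.

```ruby
# Hypothetical in-memory stand-in for a step row; not ActiveSaga's real store.
WaitingStep = Struct.new(:state, :payload, :completion_key)

# In a real store the check-and-update would run inside a transaction
# holding a row lock; a Mutex plays that role here.
STEP_LOCK = Mutex.new

def complete_step(step, payload:, idempotency_key:)
  STEP_LOCK.synchronize do
    # A late or duplicate webhook finds the step no longer waiting: do nothing.
    return :noop unless step.state == :waiting

    step.state = :completed
    step.payload = payload
    step.completion_key = idempotency_key
    :completed
  end
end

step = WaitingStep.new(:waiting, nil, nil)
complete_step(step, payload: { tracking: "XYZ" }, idempotency_key: "evt-1") # applies the payload
complete_step(step, payload: { tracking: "ABC" }, idempotency_key: "evt-1") # ignored
```

The state check and the update happen under the same lock, so two concurrent deliveries of the same event cannot both advance the step.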
Sync vs Async comparison
| Aspect | Sync step | Async step |
|---|---|---|
| Worker time | Runs to completion in one job | Initiates work, returns immediately; later resumed via API |
| State progression | pending → running → completed/failed | pending → running → waiting → (completed/failed/timed_out) |
| Cursor advancement | Immediately after success | After complete_step! |
| Failure while waiting | N/A | fail_step! or timeout triggers retry/fail |
| Timeouts | Caps execution time | Caps waiting time; can extend_timeout! or heartbeat! |
| Idempotency | Step-level dedupe | Idempotent completions with idempotency_key; re-init safe with backoff |
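The state progressions in the table can be written down as lookup tables. The sketch below is illustrative: the constants and transition_allowed? are hypothetical names, and the running → failed edge for async steps follows the earlier rule that a raising call is treated like a sync failure.

```ruby
# Transitions taken from the comparison table; the validator is illustrative.
SYNC_TRANSITIONS = {
  pending: %i[running],
  running: %i[completed failed],
}.freeze

ASYNC_TRANSITIONS = {
  pending: %i[running],
  running: %i[waiting failed], # failed here means `call` raised during initiation
  waiting: %i[completed failed timed_out],
}.freeze

def transition_allowed?(from, to, async:)
  table = async ? ASYNC_TRANSITIONS : SYNC_TRANSITIONS
  table.fetch(from, []).include?(to)
end

transition_allowed?(:running, :waiting, async: true)  # allowed: async steps park in waiting
transition_allowed?(:running, :waiting, async: false) # rejected: sync steps never wait
```

Terminal states have no entry in either table, so any transition out of them is rejected.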
Signals & Waits
Pause the workflow until a signal arrives.
class ApprovalFlow < ActiveSaga::Workflow
task :prepare, PrepareTask
wait_for_signal :approval, as: :approval
task :finalize, FinalizeTask, args: ->(ctx) { [ctx[:approval]] }
end
# Controller/webhook sending a signal:
ActiveSaga.signal!(params[:execution_id], :approval,
payload: { approved_by: current_user.id, decision: "approved" })
Signals are persisted events, consumed once, and idempotent.
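"Persisted and consumed once" can be pictured with a small in-memory log. This is a sketch only: SignalLog is a hypothetical class, its fields mirror the name, payload, and consumed_at columns of as_events described under Persistence, and it does not show deduplication of repeated sends.

```ruby
# Hypothetical in-memory event log; an illustration, not the gem's store.
class SignalLog
  Event = Struct.new(:name, :payload, :consumed_at)

  def initialize
    @events = []
  end

  # Persist the signal, even if the workflow has not reached the wait yet.
  def record(name, payload)
    @events << Event.new(name, payload, nil)
  end

  # Hand out the oldest unconsumed signal with this name exactly once.
  def consume(name, now: Time.now.utc)
    event = @events.find { |e| e.name == name && e.consumed_at.nil? }
    return nil unless event

    event.consumed_at = now
    event.payload
  end
end

log = SignalLog.new
log.record(:approval, { decision: "approved" })
log.consume(:approval) # returns the payload
log.consume(:approval) # returns nil: the signal was already consumed
```

Recording the signal before it is consumed means a signal that arrives early is not lost; the workflow picks it up when it reaches the wait.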
Cancellation
Stop a running workflow (for example, when the user backs out).
# Cancel via execution id (runs compensations, cancels remaining steps)
ActiveSaga.cancel!(execution.id, reason: "user_request")
# Or on the execution instance
execution.cancel!(reason: "user_request")
Cancellation:
- Runs compensations for completed steps in reverse order.
- Marks pending/running/waiting steps as cancelled and clears scheduled timeouts.
- Transitions the execution to the terminal cancelled state (cancelled_at timestamp set).
- Emits active_saga.execution.cancelled for observability.
Repeated calls are safe (idempotent).
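The cancellation rules can be sketched over an in-memory list of steps. Everything here is an assumption made for illustration: Step, cancel_steps!, and the :compensated state name are hypothetical, and compensations are modelled as plain lambdas.

```ruby
# Hypothetical in-memory model; the names are illustrative, not ActiveSaga's API.
Step = Struct.new(:name, :state, :compensate)

def cancel_steps!(steps)
  compensated = []
  # Undo completed work, newest first.
  steps.select { |s| s.state == :completed }.reverse_each do |step|
    step.compensate&.call
    step.state = :compensated
    compensated << step.name
  end
  # Anything that has not finished is marked cancelled.
  steps.each do |step|
    step.state = :cancelled if %i[pending running waiting].include?(step.state)
  end
  compensated
end

undone = []
steps = [
  Step.new(:charge_card, :completed, -> { undone << :refund_payment }),
  Step.new(:reserve_stock, :completed, -> { undone << :release_stock }),
  Step.new(:arrange_fulfillment, :waiting, nil),
]
cancel_steps!(steps) # compensates reserve_stock first, then charge_card
cancel_steps!(steps) # second call finds nothing left to undo
```

Because each step changes state as it is compensated, a repeated call has no completed or unfinished steps left to act on, which is what makes it safe.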
Retries, Backoff & Jitter
Specify per step or via defaults:
defaults retry: { max: 5, backoff: :exponential, first_delay: 1.second, jitter: true }
step :slow_call, retry: { max: 10, backoff: :fixed, delay: 5.seconds }
- backoff: :fixed or :exponential
- first_delay: initial wait before first retry
- delay: wait between retries (for :fixed backoff)
- jitter: true to randomize and reduce thundering herd
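To make the options concrete, here is one way a retry delay could be computed from them. It is a sketch under assumptions, not the gem's algorithm: retry_delay is a hypothetical helper, the exponential case is assumed to double on each attempt, jitter is modelled as "full jitter" (a uniform pick below the base delay), and delays are plain seconds rather than ActiveSupport durations.

```ruby
# Hypothetical helper: not ActiveSaga's actual implementation.
# Returns the wait in seconds before retry number `attempt` (1-based).
def retry_delay(attempt, backoff: :exponential, first_delay: 1.0, delay: nil,
                jitter: false, rng: Random.new)
  base =
    case backoff
    when :fixed       then delay || first_delay
    when :exponential then first_delay * (2**(attempt - 1)) # assumed doubling
    else raise ArgumentError, "unknown backoff: #{backoff.inspect}"
    end

  # Full jitter: a random wait in [0, base), so that many failing
  # executions do not all retry at the same instant.
  jitter ? rng.rand * base : base
end

retry_delay(1, first_delay: 1.0)                # 1.0
retry_delay(4, first_delay: 1.0)                # 8.0
retry_delay(3, backoff: :fixed, delay: 5.0)     # 5.0
retry_delay(4, first_delay: 1.0, jitter: true)  # somewhere in [0, 8.0)
```

Passing the random number generator in as rng: keeps the function deterministic in tests.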
Idempotency & Deduplication
- Workflow level: idempotency_key { "checkout:#{ctx[:order_id]}" } ensures only one logical execution per business key.
- Step level: dedupe: true skips re-executing a step that already completed successfully.
- Async completion: complete_step!(..., idempotency_key:) prevents duplicate completions.
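The workflow-level rule can be sketched with a hash keyed by the business key. This is an illustration only: ExecutionRegistry is a hypothetical class, and returning the existing execution on a repeated start is an assumption about behavior, since the text only states that one logical execution exists per key.

```ruby
# Hypothetical registry; a real store would rely on a unique index
# on idempotency_key instead of an in-memory hash.
class ExecutionRegistry
  Execution = Struct.new(:id, :idempotency_key, :ctx)

  def initialize
    @by_key = {}
    @next_id = 0
  end

  # Returns the existing execution for the key, or creates a new one.
  def start(idempotency_key:, ctx:)
    @by_key[idempotency_key] ||= Execution.new(@next_id += 1, idempotency_key, ctx)
  end
end

registry = ExecutionRegistry.new
first  = registry.start(idempotency_key: "checkout:42", ctx: { order_id: 42 })
second = registry.start(idempotency_key: "checkout:42", ctx: { order_id: 42 })
first.equal?(second) # true: the second start found the first execution
```

A double-submitted checkout form would therefore charge the card once, because both requests resolve to the same execution.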
Configuration
ActiveSaga.configure do |c|
c.store = ActiveSaga::Stores::ActiveRecord.new
c.logger = Rails.logger
c.serializer = :json # or a custom object responding to dump/load
c.clock = -> { Time.now.utc }
end
Persistence (Adapters)
ActiveSaga uses a Store strategy.
ActiveRecord store (ships with gem):
- Tables (prefix as_):
  - as_executions: workflow metadata (workflow_class, state, ctx jsonb, cursor_step, idempotency_key, cancelled_at, timestamps)
  - as_steps: per-step status (pending|running|waiting|completed|failed|timed_out|compensating|compensated), attempts, backoff data, init_result, completion_payload, timeout_at, waiting_since, heartbeat_at, correlation_id, completion_idempotency_key
  - as_events: signals etc. (name, payload, consumed_at)
  - (Optional) as_timers: scheduled timeouts/backoffs (or embed in steps)
- Indexing: unique on idempotency_key; partial unique on (execution_id, step_name, completion_idempotency_key) WHERE completion_idempotency_key IS NOT NULL; JSONB GIN on payloads if needed.
- Concurrency: transactional updates + row locks / SKIP LOCKED.
You can implement other stores (e.g., Redis) by following the ActiveSaga::Store interface.
Active Job Integration
- A single internal job (e.g., ActiveSaga::Jobs::RunnerJob) receives execution/step IDs and performs the next transition safely.
- No dependency on specific backends (Sidekiq, Solid Queue, etc.); any Active Job adapter works.
- Timers/backoffs are scheduled via set(wait_until: ...).
Observability (Events & Logging)
Subscribe with ActiveSupport::Notifications.
Events include:
- active_saga.step.started
- active_saga.step.completed
- active_saga.step.failed
- active_saga.step.waiting (async initiation successful)
- active_saga.step.completed_async (after complete_step!)
- active_saga.step.failed_async (after fail_step!)
- active_saga.step.timeout
- active_saga.retry.scheduled
- active_saga.signal.received
- active_saga.execution.cancelled
Each event carries identifiers (execution_id, workflow, step), timings, attempts, and error details (when applicable).
Generators
rails g active_saga:install # initializer + migrations + sample
rails g active_saga:workflow CheckoutFlow
The workflow generator scaffolds a class under app/workflows/.
Testing
- RSpec helpers (recommended):
  - Start a flow and assert final state:
    flow = MyFlow.start(foo: 1)
    expect(flow.await(timeout: 10.seconds).state).to eq("completed")
  - Simulate async completion:
    ActiveSaga.complete_step!(flow.id, :export, payload: { url: "..." }, idempotency_key: "evt-1")
- Cover:
  - retries/backoff behavior,
  - dedupe (no double execution),
  - signals,
  - timeouts (including re-initiation),
  - compensation reverse order,
  - idempotent completions,
  - crash-safety (re-enqueue after failure mid-step).
Examples
Signal workflow
class SignalWorkflow < ActiveSaga::Workflow
task :prepare, PrepareTask
wait_for_signal :approval, as: :approval
task :finalize, FinalizeTask, args: ->(ctx) { [ctx[:approval]] }
end
Async export + notify
class ExportFlow < ActiveSaga::Workflow
task :request_export, ExporterTask, async: true, timeout: 30.minutes
task :notify_user do |ctx|
Mailers::ExportReady.deliver_later(ctx[:user_id], ctx[:request_export][:download_url])
end
end
class ExporterTask < ActiveSaga::Task
async! timeout: 30.minutes
def call(ctx)
req = ExportAPI.create!(user_id: ctx[:user_id], request_id: "#{ctx[:execution_id]}:request_export")
{ export_job_id: req.id, correlation: req.token }
end
end
Webhook:
def export_ready
ActiveSaga.complete_step!(params[:exec_id], :request_export,
payload: { download_url: params[:url] }, idempotency_key: params[:event_id])
head :ok
end
Design Notes
- Prefer Task classes for reusable/complex steps and DI; use method or block steps for quick wiring.
- Use business idempotency keys in external systems (execution_id:step_name) for safe re-init.
- Keep ctx small (IDs and small payloads). Store large blobs elsewhere and pass references.
License
MIT © You and contributors.