ActiveSaga

A workflow engine with durable steps, automatic retries with backoff and jitter, compensations, async waits, signals, timeouts, idempotency, and pluggable persistence (ActiveRecord included).



Why ActiveSaga?

Many apps need reliable business workflows (sagas) with retries, compensations, and the ability to wait for an external event such as a webhook or a background job, all without operating a heavyweight orchestration service. ActiveSaga provides this on plain Rails and Active Job.


Features

  • Flexible step definitions: define workflow steps as methods, task classes, or inline blocks with identical semantics.
  • Async steps: start external work, persist a correlation ID, and wait for completion reported via the API.
  • Signals: pause workflows until external events or triggers resume execution.
  • Retries: fixed/exponential backoff with jitter, per-step timeouts.
  • Compensations: reverse-order undo on cancel/failure.
  • Cancellation: explicit API to stop workflows, run compensations, and mark remaining work cancelled.
  • Idempotency: workflow-level keys + step dedupe + idempotent completions.
  • Pluggable stores: strategy interface; ships with an ActiveRecord store.
  • Observability: ActiveSupport::Notifications for steps, signals, retries, timeouts.

Installation

# Gemfile
gem "active_saga"

bundle install
rails g active_saga:install
rails db:migrate

The installer creates:

  • config/initializers/active_saga.rb
  • ActiveRecord migrations
  • Optional sample workflow

Quick Start

# config/initializers/active_saga.rb
ActiveSaga.configure do |c|
  c.store      = ActiveSaga::Stores::ActiveRecord.new
  c.logger     = Rails.logger
  c.serializer = :json
  c.clock      = -> { Time.now.utc }
end

# app/workflows/checkout_flow.rb
class CheckoutFlow < ActiveSaga::Workflow
  idempotency_key { "checkout:#{ctx[:order_id]}" }
  defaults retry: { max: 5, backoff: :exponential, jitter: true, first_delay: 1.second }
  timeout 30.minutes

  step :charge_card, compensate: :refund_payment, retry: { max: 6, first_delay: 2.seconds }
  task :reserve_stock, ReserveStockTask, dedupe: true
  task :send_receipt,  fire_and_forget: true do |ctx|
    Mailers::Receipt.deliver_later(ctx[:order_id])
  end

  def charge_card
    payment = PSP.charge!(order_id: ctx[:order_id], token: ctx[:payment_token])
    ctx[:payment_id] = payment.id
  end

  def refund_payment
    PSP.refund!(ctx[:payment_id]) if ctx[:payment_id]
  end
end

# Run it
flow   = CheckoutFlow.start(order_id: 42, payment_token: "tok_123")
result = flow.await # optional: block until terminal state

# cancel if user aborts checkout
flow.cancel!(reason: "user_aborted") if user_cancelled?

Defining Workflows (3 equivalent styles)

All step forms accept the same options:
retry:, compensate:, timeout:, async:, dedupe:, if:, unless:, args:, store_result_as:, fire_and_forget:.

A) Method step

class ExampleFlow < ActiveSaga::Workflow
  step :do_work, compensate: :undo_work

  def do_work
    res = Service.call!(ctx[:input])
    ctx[:output] = res
  end

  def undo_work
    Service.undo!(ctx[:output]) if ctx[:output]
  end
end

B) Task class

class ReserveStockTask < ActiveSaga::Task
  def call(ctx)
    res = Inventory.reserve!(ctx[:order_id])
    { reservation_id: res.id } # merged into ctx
  end

  def compensate(ctx, result:)
    Inventory.release!(result[:reservation_id]) if result&.dig(:reservation_id)
  end
end

class CheckoutFlow < ActiveSaga::Workflow
  task :reserve_stock, ReserveStockTask
end

C) Inline block

class NotifyFlow < ActiveSaga::Workflow
  task :send_email do |ctx|
    Mailers::Notice.deliver_later(ctx[:user_id])
    nil
  end
end
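One way to read "identical semantics" is that all three styles normalize to the same internal step definition. The sketch below is a hypothetical mini-DSL, not ActiveSaga's implementation: MiniWorkflow, StepDef, and the merge rule (Hash results from task classes and blocks are merged into ctx, method steps mutate ctx themselves) are illustrative assumptions, and the options are stored but never acted on.

```ruby
# Hypothetical mini-DSL, not ActiveSaga's implementation: three step styles,
# one internal representation (a callable plus the shared options Hash).
StepDef = Struct.new(:name, :runner, :options)

class MiniWorkflow
  # Step definitions live on each workflow subclass.
  def self.steps
    @steps ||= []
  end

  # A) Method step: calls an instance method; it mutates ctx itself.
  def self.step(name, **options)
    runner = ->(wf) { wf.public_send(name); nil }
    steps << StepDef.new(name, runner, options)
  end

  # B) Task class (#call(ctx)) or C) inline block (receives ctx).
  def self.task(name, klass = nil, **options, &block)
    runner =
      if klass
        ->(wf) { klass.new.call(wf.ctx) }
      elsif block
        ->(wf) { block.call(wf.ctx) }
      else
        raise ArgumentError, "task #{name} needs a class or a block"
      end
    steps << StepDef.new(name, runner, options)
  end

  attr_reader :ctx

  def initialize(ctx)
    @ctx = ctx
  end

  # Runs steps in order; Hash results from tasks/blocks are merged into ctx.
  # Options (retry:, compensate:, ...) are stored but not acted on here.
  def run
    self.class.steps.each do |s|
      result = s.runner.call(self)
      ctx.merge!(result) if result.is_a?(Hash)
    end
    ctx
  end
end
```

Because every style ends up as the same StepDef, retry, compensation, and dedupe handling only has to be written once, which is the practical payoff of the equivalence.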

Async Steps (vs Sync) – Concept & Lifecycle

Sync step: runs to completion within a single worker execution. If it returns a value, the flow advances; if it raises, the retry policy applies, and the step fails once retries are exhausted.

Async step: initiates external work and returns immediately. The engine persists the step in the waiting state (with its correlation and timeout data). Later, an external caller resumes the step via complete_step! (or fail_step!). No worker thread is held while waiting.

Declaring async steps

# Per-step option
task :arrange_fulfillment, FulfillmentTask, async: true, timeout: 15.minutes

# Class-level declaration
class FulfillmentTask < ActiveSaga::Task
  async! timeout: 15.minutes
  def call(ctx)
    job = FulfillmentAPI.create_job!(order_id: ctx[:order_id]) # initiate
    { fulfillment_job_id: job.id }                              # merge into ctx
  end
end

Engine behavior when async:

  • Runs call once to initiate the work.
  • If call returns, persists the step as waiting, sets waiting_since, and computes timeout_at.
  • If call raises, treats it like a sync failure (the retry policy applies).
  • Does not advance the cursor until a completion arrives.
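The behavior above amounts to a small state machine. The following is a minimal sketch of it in plain Ruby, not ActiveSaga's internals: AsyncStepSketch and its methods are hypothetical, times are plain numeric seconds, and a failed initiation simply returns the step to pending while attempts remain (a stand-in for scheduling a retry with backoff).

```ruby
# Hypothetical sketch of the async step lifecycle; not ActiveSaga's internals.
# Times are plain numeric seconds to keep the example dependency-free.
class AsyncStepSketch
  attr_reader :state, :attempts, :waiting_since, :timeout_at, :last_error

  def initialize(max_attempts:, timeout:)
    @max_attempts = max_attempts
    @timeout = timeout
    @state = :pending
    @attempts = 0
  end

  # Runs the initiation block (the step's call) once for this attempt.
  # Returns the initiation result, to be merged into ctx, or nil on failure.
  def initiate!(now:)
    raise ArgumentError, "cannot initiate from #{@state}" unless @state == :pending

    @state = :running
    @attempts += 1
    begin
      result = yield
    rescue StandardError => e
      # Same as a sync failure: back to pending for a retry while attempts
      # remain, otherwise failed.
      @last_error = e
      @state = @attempts < @max_attempts ? :pending : :failed
      return nil
    end

    # Initiation succeeded: park the step; no worker stays busy while waiting.
    @state = :waiting
    @waiting_since = now
    @timeout_at = now + @timeout
    result
  end

  # Completion is only accepted while waiting; otherwise it is a no-op.
  def complete!
    return false unless @state == :waiting

    @state = :completed
    true
  end

  # The workflow cursor may move past this step only once it has completed.
  def cursor_may_advance?
    @state == :completed
  end
end
```

The key property is that the cursor check depends only on the persisted state, so a worker can exit right after initiation and any later runner can decide whether to advance.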

Completion/Failure/Timeout APIs

# Mark an async step as completed and advance the flow
ActiveSaga.complete_step!(execution_id, :arrange_fulfillment,
  payload: { tracking: "XYZ" }, idempotency_key: params[:event_id])

# Explicit failure path while waiting
ActiveSaga.fail_step!(execution_id, :arrange_fulfillment,
  error_class: "RemoteError", message: "Timeout from vendor", details: { ... },
  idempotency_key: params[:event_id])

# Optional: extend waiting timeout
ActiveSaga.extend_timeout!(execution_id, :arrange_fulfillment, by: 10.minutes)

# Optional: heartbeat for monitoring
ActiveSaga.heartbeat!(execution_id, :arrange_fulfillment)

Payload storage:

  • Initiation return (from call) is merged into ctx immediately.
  • Completion payload is stored under ctx[step_name] (or store_result_as: key).
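A minimal sketch of those two storage rules, treating ctx as a plain Hash with symbol keys. merge_initiation and store_completion are hypothetical helper names used only for illustration; the README does not say how the engine implements this.

```ruby
# Hypothetical helpers illustrating where async step data lands in ctx.
# Not ActiveSaga's API; ctx is modeled as a plain Hash with symbol keys.

# Initiation: a Hash returned by call is merged into ctx right away.
def merge_initiation(ctx, init_result)
  init_result.is_a?(Hash) ? ctx.merge(init_result) : ctx
end

# Completion: the payload is stored under the step name,
# or under the store_result_as: key when one is given.
def store_completion(ctx, step_name, payload, store_result_as: nil)
  ctx.merge((store_result_as || step_name).to_sym => payload)
end
```

For example, after initiating :arrange_fulfillment with { fulfillment_job_id: "job-1" } and completing it with { tracking: "XYZ" }, ctx holds both ctx[:fulfillment_job_id] and ctx[:arrange_fulfillment][:tracking], which matches how the ExportFlow example later reads ctx[:request_export][:download_url].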

Timeouts, retries, and re-initiation

  • timeout: caps the waiting time. On timeout:
    • If a retry policy is configured, the step is re-initiated (call runs again) after a backoff delay with jitter.
    • Otherwise the step is marked failed and the flow's failure/compensation policy applies.
  • Re-initiation must be idempotent: use business idempotency keys (e.g., execution_id:step_name) for external calls.
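A sketch of the timeout decision, assuming max counts total initiation attempts (the README does not pin this down) and that the delay comes from the step's backoff policy. handle_timeout, TimeoutOutcome, and external_idempotency_key are hypothetical helpers; the key builder follows the execution_id:step_name convention suggested above.

```ruby
# Hypothetical sketch of the timeout decision; not ActiveSaga's code.
TimeoutOutcome = Struct.new(:action, :retry_at, keyword_init: true)

# attempts:  initiations made so far
# max:       retry limit from the step's retry policy (nil means no retries)
# delay_for: callable returning the backoff delay (seconds) for an attempt
def handle_timeout(now:, attempts:, max:, delay_for:)
  if max && attempts < max
    TimeoutOutcome.new(action: :reinitiate, retry_at: now + delay_for.call(attempts))
  else
    TimeoutOutcome.new(action: :fail, retry_at: nil)
  end
end

# A stable business key makes re-initiation safe: the external system sees
# the same key on every attempt and can deduplicate the request.
def external_idempotency_key(execution_id, step_name)
  "#{execution_id}:#{step_name}"
end
```

Passing the same key on every re-initiation means a vendor that already created the job on the first attempt can return it instead of creating a second one.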

Security, idempotency & race-safety

  • complete_step! / fail_step! are idempotent (accept idempotency_key).
  • Optionally verify a correlation_id created during initiation before accepting completion.
  • Use DB transactional updates and row locks so only one runner transitions a step.
  • If a step isn’t waiting, completion/failure is a safe no-op.
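These rules can be modeled in memory as follows. StepRowSketch is hypothetical, not ActiveSaga's store: a Mutex stands in for the database row lock, and a Set stands in for the unique index on completion_idempotency_key. It returns :applied, :duplicate, or :noop so the three outcomes are visible.

```ruby
require "set"

# Hypothetical in-memory model of an async step row; not ActiveSaga's store.
# The Mutex stands in for a database row lock (e.g. SELECT ... FOR UPDATE).
class StepRowSketch
  attr_reader :state, :payload

  def initialize(state: :waiting)
    @state = state
    @payload = nil
    @seen_keys = Set.new
    @lock = Mutex.new
  end

  # Returns :applied, :duplicate, or :noop.
  def complete!(payload:, idempotency_key:)
    @lock.synchronize do
      # Same key seen before: duplicate delivery, ignore it.
      return :duplicate if @seen_keys.include?(idempotency_key)
      # Not waiting (already completed, failed, cancelled): safe no-op.
      return :noop unless @state == :waiting

      @seen_keys << idempotency_key
      @state = :completed
      @payload = payload
      :applied
    end
  end
end
```

Even when ten webhook deliveries race, exactly one transition is applied; the lock serializes the check-then-set, which is what the transactional update plus row lock provides in the real store.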

Sync vs Async comparison

| Aspect | Sync step | Async step |
|---|---|---|
| Worker time | Runs to completion in one job | Initiates work and returns immediately; resumed later via API |
| State progression | pending → running → completed/failed | pending → running → waiting → completed/failed/timed_out |
| Cursor advancement | Immediately after success | After complete_step! |
| Failure while waiting | N/A | fail_step! or timeout triggers retry/fail |
| Timeouts | Caps execution time | Caps waiting time; can extend_timeout! or heartbeat! |
| Idempotency | Step-level dedupe | Idempotent completions with idempotency_key; re-initiation safe with backoff |

Signals & Waits

Pause the workflow until a signal arrives.

class ApprovalFlow < ActiveSaga::Workflow
  task :prepare, PrepareTask
  wait_for_signal :approval, as: :approval
  task :finalize, FinalizeTask, args: ->(ctx) { [ctx[:approval]] }
end

# Controller/webhook sending a signal:
ActiveSaga.signal!(params[:execution_id], :approval,
  payload: { approved_by: current_user.id, decision: "approved" })

Signals are persisted events, consumed once, and idempotent.
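A minimal in-memory sketch of "persisted, consumed once, and idempotent". SignalInboxSketch is hypothetical, and the optional key: parameter used for deduplication is an assumption: the signal! call above shows no idempotency key, so how ActiveSaga actually deduplicates signals is not specified here.

```ruby
# Hypothetical in-memory signal inbox; not ActiveSaga's implementation.
# Models signals as stored events that a waiting step consumes exactly once.
class SignalInboxSketch
  Event = Struct.new(:name, :payload, :consumed_at)

  def initialize
    @events = []
    @seen = {}
  end

  # Recording is idempotent when the sender supplies a stable key
  # (the key: parameter is an assumption made for this sketch).
  def signal!(name, payload:, key: nil)
    return false if key && @seen[key]

    @seen[key] = true if key
    @events << Event.new(name, payload, nil)
    true
  end

  # A waiting step takes the first unconsumed event with the given name
  # and marks it consumed so it can never be delivered again.
  def consume(name, now:)
    event = @events.find { |e| e.name == name && e.consumed_at.nil? }
    return nil unless event

    event.consumed_at = now
    event.payload
  end
end
```

Persisting the event before anyone consumes it means a signal that arrives early (before the workflow reaches wait_for_signal) is not lost; the consumed_at marker is what makes delivery once-only.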


Cancellation

Stop a running workflow (for example, when the user backs out).

# Cancel via execution id (runs compensations, cancels remaining steps)
ActiveSaga.cancel!(execution.id, reason: "user_request")

# Or on the execution instance
execution.cancel!(reason: "user_request")

Cancellation:

  • Runs compensations for completed steps in reverse order.
  • Marks pending/running/waiting steps as cancelled and clears scheduled timeouts.
  • Transitions the execution to the terminal cancelled state (cancelled_at timestamp set).
  • Emits active_saga.execution.cancelled for observability.

Repeated calls are safe (idempotent).
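The cancellation rules can be sketched as below. ExecutionSketch and StepSketch are hypothetical, not ActiveSaga's code; the sketch assumes the step list is in completion order, and it omits clearing scheduled timeouts, the cancelled_at timestamp, and the notification event.

```ruby
# Hypothetical sketch of the cancellation rules; not ActiveSaga's code.
StepSketch = Struct.new(:name, :state, :compensation)

class ExecutionSketch
  attr_reader :state, :log

  def initialize(steps)
    @steps = steps
    @state = :running
    @log = []
  end

  def cancel!(reason:)
    return false if @state == :cancelled # repeated calls are safe no-ops

    # 1. Compensate completed steps, most recent first.
    @steps.select { |s| s.state == :completed }.reverse_each do |s|
      next unless s.compensation

      s.compensation.call
      s.state = :compensated
      @log << "compensated #{s.name}"
    end

    # 2. Cancel whatever has not finished yet.
    @steps.each do |s|
      s.state = :cancelled if %i[pending running waiting].include?(s.state)
    end

    # 3. Move the execution itself to its terminal state.
    @state = :cancelled
    @log << "cancelled: #{reason}"
    true
  end
end
```

Compensating in reverse order matters because later steps often depend on earlier ones: stock reserved for a charged order is released before the charge is refunded.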


Retries, Backoff & Jitter

Specify per step or via defaults:

defaults retry: { max: 5, backoff: :exponential, first_delay: 1.second, jitter: true }
step :slow_call, retry: { max: 10, backoff: :fixed, delay: 5.seconds }

  • backoff: :fixed or :exponential
  • first_delay: initial wait before the first retry
  • delay: wait between retries (for :fixed)
  • jitter: true randomizes delays to reduce thundering-herd effects
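The exact delay formula is not documented here, so the following is a sketch under stated assumptions: exponential backoff doubles from first_delay (first_delay * 2**(n - 1) for retry n), fixed backoff uses delay for every retry, and jitter is "full jitter", a uniform random delay in [0, computed delay). retry_delay is a hypothetical helper, not ActiveSaga's API.

```ruby
# Hypothetical retry-delay calculator; ActiveSaga's real formula may differ.
# Assumptions: exponential = first_delay * 2**(retry_number - 1);
# fixed = delay for every retry; jitter = "full jitter" in [0, base).
def retry_delay(retry_number, backoff:, first_delay: 1.0, delay: nil, jitter: false, rng: Random.new)
  base =
    case backoff
    when :fixed then (delay || first_delay).to_f
    when :exponential then first_delay.to_f * (2**(retry_number - 1))
    else raise ArgumentError, "unknown backoff: #{backoff.inspect}"
    end
  # Random#rand with no argument returns a Float in [0, 1).
  jitter ? rng.rand * base : base
end
```

Under these assumptions, the defaults from the Quick Start (first_delay: 1.second, backoff: :exponential) give un-jittered waits of 1, 2, 4, 8, and 16 seconds for retries 1 through 5. Jitter spreads those waits out so that many executions failing at the same moment do not all retry at the same moment.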

Idempotency & Deduplication

  • Workflow level: idempotency_key { "checkout:#{ctx[:order_id]}" } ensures only one logical execution per business key.
  • Step level: dedupe: true skips re-executing a step that already completed successfully.
  • Async completion: complete_step!(..., idempotency_key:) prevents duplicate completions.
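The first two levels can be illustrated with a toy registry. IdempotencySketch is hypothetical; in the ActiveRecord store the workflow-level guarantee comes from the unique index on the execution's idempotency_key, and a Hash keyed by that value stands in for it here.

```ruby
# Hypothetical registry showing workflow-level and step-level dedupe.
# A Hash stands in for the executions table and its unique idempotency_key index.
class IdempotencySketch
  def initialize
    @executions = {} # idempotency_key => execution Hash
  end

  # Workflow level: the same business key always yields the same execution.
  def start(key, ctx)
    @executions[key] ||= { id: @executions.size + 1, ctx: ctx, completed_steps: [] }
  end

  # Step level (dedupe: true): skip a step that already completed.
  def run_step(execution, name)
    return :skipped if execution[:completed_steps].include?(name)

    yield
    execution[:completed_steps] << name
    :ran
  end
end
```

A double-clicked "Place order" button thus maps to one execution for checkout:42, and a re-run of that execution does not reserve stock twice.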

Configuration

ActiveSaga.configure do |c|
  c.store      = ActiveSaga::Stores::ActiveRecord.new
  c.logger     = Rails.logger
  c.serializer = :json            # or a custom object responding to dump/load
  c.clock      = -> { Time.now.utc }
end
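A custom serializer only needs to respond to dump and load, per the comment above. The sketch assumes the contract is "dump returns a String, load takes that String back"; SymbolKeyJSONSerializer is a hypothetical name, built on Ruby's standard json library.

```ruby
require "json"

# Hypothetical custom serializer for c.serializer. The README only says the
# object must respond to dump/load; String in/out is an assumption here.
class SymbolKeyJSONSerializer
  def dump(obj)
    JSON.generate(obj)
  end

  # symbolize_names: true restores symbol keys so ctx[:order_id] keeps working.
  def load(str)
    JSON.parse(str, symbolize_names: true)
  end
end
```

It would be wired in with c.serializer = SymbolKeyJSONSerializer.new. Symbolizing keys keeps ctx[:order_id]-style access working after a round trip; JSON still turns symbol values and Time objects into strings, which is one more reason to keep ctx to IDs and small payloads.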

Persistence (Adapters)

ActiveSaga persists workflow state through a pluggable Store strategy.

ActiveRecord store (ships with the gem):

  • Tables (prefix as_):
    • as_executions: workflow metadata (workflow_class, state, ctx jsonb, cursor_step, idempotency_key, cancelled_at, timestamps)
    • as_steps: per-step status (pending|running|waiting|completed|failed|timed_out|compensating|compensated|cancelled), attempts, backoff data, init_result, completion_payload, timeout_at, waiting_since, heartbeat_at, correlation_id, completion_idempotency_key
    • as_events: signals etc. (name, payload, consumed_at)
    • (Optional) as_timers: scheduled timeouts/backoffs (or embed in steps)
  • Indexing: unique index on as_executions.idempotency_key; partial unique index on as_steps (execution_id, step_name, completion_idempotency_key) WHERE completion_idempotency_key IS NOT NULL; GIN indexes on JSONB payload columns if needed.
  • Concurrency: transactional updates + row locks / SKIP LOCKED.

You can implement other stores (e.g., Redis) by following the ActiveSaga::Store interface.
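The "row locks / SKIP LOCKED" strategy can be modeled in memory to show why workers do not block each other. RowTableSketch and DueRow are hypothetical: locked_by plays the role of a row lock, release plays the role of committing the transaction, and the Mutex stands in for the database's own atomicity. A real store would do this in SQL.

```ruby
# Hypothetical model of SKIP LOCKED claiming; not ActiveSaga's store.
DueRow = Struct.new(:id, :due_at, :locked_by)

class RowTableSketch
  def initialize(rows)
    @rows = rows
    @guard = Mutex.new # stands in for the database's atomic row locking
  end

  # Roughly analogous to SQL's FOR UPDATE SKIP LOCKED: take the first due row
  # that no other worker holds, instead of waiting for a held row.
  def claim_next(worker, now:)
    @guard.synchronize do
      row = @rows.find { |r| r.due_at <= now && r.locked_by.nil? }
      row.locked_by = worker if row
      row
    end
  end

  # Releasing the lock corresponds to committing the transaction.
  def release(row)
    @guard.synchronize { row.locked_by = nil }
  end
end
```

Skipping held rows instead of waiting on them lets several runner jobs drain due steps in parallel, while each row is still transitioned by only one worker at a time.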


Active Job Integration

  • A single internal job (e.g., ActiveSaga::Jobs::RunnerJob) receives execution/step IDs and performs the next transition safely.
  • No dependency on specific backends (Sidekiq, Solid Queue, etc.); any Active Job adapter works.
  • Timers/backoffs are scheduled via set(wait_until: ...).

Observability (Events & Logging)

Subscribe to these events with ActiveSupport::Notifications. Events include:

  • active_saga.step.started
  • active_saga.step.completed
  • active_saga.step.failed
  • active_saga.step.waiting (async initiation successful)
  • active_saga.step.completed_async (after complete_step!)
  • active_saga.step.failed_async (after fail_step!)
  • active_saga.step.timeout
  • active_saga.retry.scheduled
  • active_saga.signal.received
  • active_saga.execution.cancelled

Each event carries identifiers (execution_id, workflow, step), timings, attempts, and error details (when applicable).


Generators

rails g active_saga:install     # initializer + migrations + sample
rails g active_saga:workflow CheckoutFlow

The workflow generator scaffolds a class under app/workflows/.


Testing

  • RSpec helpers (recommended):
    • Start a flow and assert the final state:

flow = MyFlow.start(foo: 1)
expect(flow.await(timeout: 10.seconds).state).to eq("completed")

    • Simulate async completion:

ActiveSaga.complete_step!(flow.id, :export, payload: { url: "..." }, idempotency_key: "evt-1")

  • Cover:
    • retries/backoff behavior,
    • dedupe (no double execution),
    • signals,
    • timeouts (including re-initiation),
    • compensation reverse order,
    • idempotent completions,
    • crash-safety (re-enqueue after failure mid-step).

Examples

Signal workflow

class SignalWorkflow < ActiveSaga::Workflow
  task :prepare, PrepareTask
  wait_for_signal :approval, as: :approval
  task :finalize, FinalizeTask, args: ->(ctx) { [ctx[:approval]] }
end

Async export + notify

class ExportFlow < ActiveSaga::Workflow
  task :request_export, ExporterTask, async: true, timeout: 30.minutes
  task :notify_user do |ctx|
    Mailers::ExportReady.deliver_later(ctx[:user_id], ctx[:request_export][:download_url])
  end
end

class ExporterTask < ActiveSaga::Task
  async! timeout: 30.minutes
  def call(ctx)
    req = ExportAPI.create!(user_id: ctx[:user_id], request_id: "#{ctx[:execution_id]}:request_export")
    { export_job_id: req.id, correlation: req.token }
  end
end

Webhook:

def export_ready
  ActiveSaga.complete_step!(params[:exec_id], :request_export,
    payload: { download_url: params[:url] }, idempotency_key: params[:event_id])
  head :ok
end

Design Notes

  • Prefer Task classes for reusable or complex steps and for dependency injection; use method or block steps for quick wiring.
  • Use business idempotency keys in external systems (execution_id:step_name) for safe re-init.
  • Keep ctx small (IDs and small payloads). Store large blobs elsewhere and pass references.

License

MIT © You and contributors.