FlowOrganizer

Run a list of callables in sequence, threading a context hash through each step. A small, allocation-conscious "functional interactor" / Railway-style pipeline.

FlowOrganizer.call(
  list: [
    ->(email:)            { [:ok, normalized_email: email.downcase.strip] },
    ->(normalized_email:) { [:ok, user: User.find_by(email: normalized_email)] },
    ->(user:)             { user ? [:ok] : [:error, code: 'user.not_found'] },
    ->(user:)             { [:ok, token: Auth.issue_token(user)] },
  ],
  ctx: { email: ' Alice@Example.com ' },
)
# => [:ok, { email: ..., normalized_email: ..., user: ..., token: ... }]

Why

The pattern is known under many names: Pipes and Filters, Pipeline, Railway-Oriented Programming, functional interactor.

The core idea: a list of small operations passes a shared context forward, any one of them can stop the chain by returning an error, and the caller gets back a single [status, ctx] tuple.

Comparable Ruby gems include interactor, light-service, and dry-transaction.

FlowOrganizer trades their OOP approach and custom DSL for a functional one: any callable works, whether a lambda, a Method object, or any object that responds to #call. Steps do not each need their own class.

Installation

Requires Ruby 3.2+.

# Gemfile
gem 'flow_organizer'

# then, in a shell
bundle install
# or install the gem directly
gem install flow_organizer

The callable contract

Every callable must return one of:

Return                     Meaning
[:ok]                      Success, no context update.
[:ok, { key: value }]      Success, merge the hash into the context.
[:halt]                    Stop the chain. The caller still sees a final :ok.
[:halt, { key: value }]    Same as above, with a final context update.
[:error]                   Stop the chain. Caller sees :error.
[:error, errors_payload]   As above, with a normalized errors: array in ctx.
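
To make the three statuses concrete, here is a minimal sketch of a runner that honours the contract above. It is not FlowOrganizer's implementation: run_steps is a hypothetical name, the steps use `**` so the whole context can be splatted in, and error-payload normalization is left out.

```ruby
# Hypothetical mini-runner illustrating :ok / :halt / :error.
# Not FlowOrganizer's code; error payloads are merged as-is here.
def run_steps(steps, ctx)
  steps.each do |step|
    status, payload = step.call(**ctx)
    ctx = ctx.merge(payload) if payload.is_a?(Hash)

    case status
    when :ok    then next                  # continue with the next step
    when :halt  then return [:ok, ctx]     # stop early, still a success
    when :error then return [:error, ctx]  # stop early, report failure
    else raise ArgumentError, "unexpected status: #{status.inspect}"
    end
  end

  [:ok, ctx]
end

steps = [
  ->(n:, **)       { [:ok, { doubled: n * 2 }] },
  ->(doubled:, **) { doubled > 10 ? [:halt, { capped: true }] : [:ok] },
  ->(doubled:, **) { [:ok, { label: "value=#{doubled}" }] },
]

run_steps(steps, { n: 3 }) # => [:ok, { n: 3, doubled: 6, label: "value=6" }]
run_steps(steps, { n: 9 }) # => [:ok, { n: 9, doubled: 18, capped: true }]
```

In the second call the last step never runs, yet the caller still receives :ok, which is the difference between :halt and :error.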

For :error, the payload can be a string, a hash with :title/:detail/:code, an array of either, or already a { errors: [...] } hash. FlowOrganizer normalizes them all to the canonical { errors: [{ title: ... }, ...] } shape.
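
The normalization rules above could be sketched roughly as follows. normalize_errors is a hypothetical helper, not part of the gem's documented API, and two details are assumptions: a bare string becomes the :title, and a hash keeps only its :title, :detail, and :code keys.

```ruby
# Hypothetical helper; the name and the exact mapping are assumptions.
def normalize_errors(payload)
  case payload
  when String
    { errors: [{ title: payload }] }             # assumed: string becomes the title
  when Array
    { errors: payload.flat_map { |item| normalize_errors(item)[:errors] } }
  when Hash
    if payload.key?(:errors)
      normalize_errors(payload[:errors])         # already wrapped: normalize the inner list
    else
      { errors: [payload.slice(:title, :detail, :code)] }
    end
  else
    raise ArgumentError, "unsupported error payload: #{payload.inspect}"
  end
end

normalize_errors('Invalid email')
# => { errors: [{ title: "Invalid email" }] }
normalize_errors({ code: 'user.not_found' })
# => { errors: [{ code: "user.not_found" }] }
normalize_errors(['Too short', { title: 'Taken', code: 'email.taken' }])
# => { errors: [{ title: "Too short" }, { title: "Taken", code: "email.taken" }] }
```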

Callables only receive the context keys their kwargs declare:

->(value:)            { ... }     # gets { value: ... }
->(value:, other:)    { ... }     # gets { value: ..., other: ... }
->(**)                { ... }     # gets the full ctx
->()                  { ... }     # gets nothing

Positional arguments are not supported. Keyword-only signatures are what let FlowOrganizer cheaply slice the context for each step.
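
One way such slicing can be done is with Ruby's built-in Proc#parameters and Method#parameters, which report each keyword a callable declares. The sketch below is an illustration under that assumption; slice_ctx_for is a hypothetical name, and the gem's actual mechanism may differ.

```ruby
# Hypothetical helper: pick only the ctx keys a callable's keywords ask for.
def slice_ctx_for(callable, ctx)
  # Procs and Methods expose #parameters; other objects are inspected via #call.
  params = callable.respond_to?(:parameters) ? callable.parameters : callable.method(:call).parameters

  # A `**` parameter (type :keyrest) means the callable wants the full ctx.
  return ctx if params.any? { |type, _| type == :keyrest }

  wanted = params.filter_map { |type, name| name if %i[keyreq key].include?(type) }
  ctx.slice(*wanted)
end

ctx = { value: 1, other: 2, extra: 3 }

slice_ctx_for(->(value:) {}, ctx)         # => { value: 1 }
slice_ctx_for(->(value:, other:) {}, ctx) # => { value: 1, other: 2 }
slice_ctx_for(->(**) {}, ctx)             # => { value: 1, other: 2, extra: 3 }
slice_ctx_for(-> {}, ctx)                 # => {}

step = ->(value:, other: 0) { [:ok, { sum: value + other }] }
step.call(**slice_ctx_for(step, ctx))     # => [:ok, { sum: 3 }]
```

Because a lambda raises ArgumentError on unknown keywords, splatting the full context into `->(value:)` would fail; slicing first is what makes narrow signatures usable.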

Dual-track execution

Pass list_error: to run a compensation track if the success track fails:

FlowOrganizer.call(
  list:       [book_flight, charge_card, send_confirmation],
  list_error: [refund_card, send_apology],
  ctx:        { user: user, flight: flight },
)

If charge_card returns [:error], execution jumps to refund_card with the accumulated context.
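
The jump between tracks can be pictured with the self-contained sketch below. It does not use the gem: run_track and run_dual_track are hypothetical names, the lambda bodies are invented for illustration, and it assumes the overall result stays :error once the compensation track has run, which the text above does not state.

```ruby
# Hypothetical sketch of dual-track execution; not FlowOrganizer's implementation.
# Runs steps in order and stops at the first :halt or :error.
def run_track(steps, ctx)
  steps.each do |step|
    status, payload = step.call(**ctx)
    ctx = ctx.merge(payload) if payload.is_a?(Hash)
    return [:ok, ctx] if status == :halt
    return [:error, ctx] if status == :error
  end
  [:ok, ctx]
end

def run_dual_track(list:, list_error:, ctx:)
  status, ctx = run_track(list, ctx)
  return [status, ctx] if status == :ok

  # Assumption: the compensation track's own status is ignored and the
  # overall result stays :error.
  _, ctx = run_track(list_error, ctx)
  [:error, ctx]
end

book_flight       = ->(flight:, **) { [:ok, { booking: "BK-#{flight}" }] }
charge_card       = ->(**)          { [:error, { reason: 'card declined' }] }
send_confirmation = ->(**)          { [:ok, { confirmed: true }] }
refund_card       = ->(**)          { [:ok, { refunded: true }] }
send_apology      = ->(reason:, **) { [:ok, { apology: "Sorry: #{reason}" }] }

run_dual_track(
  list:       [book_flight, charge_card, send_confirmation],
  list_error: [refund_card, send_apology],
  ctx:        { flight: 'LX318' },
)
# => [:error, { flight: "LX318", booking: "BK-LX318", reason: "card declined",
#               refunded: true, apology: "Sorry: card declined" }]
```

send_confirmation never runs, and send_apology can read reason: because the error track receives everything accumulated up to the failure.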

Resolvers

Anything responding to #call works directly. For more structured lists, two extras ship in the box.

Exception handling

A callable that raises is caught and turned into [:error, { error: <exception> }]. Two knobs change the default:

# Re-raise instead of catching.
FlowOrganizer.call(list: [...], ctx: { ... }, raise_exception: true)

# Forward caught exceptions to your error reporter.
FlowOrganizer.exception_reporter = ->(exception:) { Sentry.capture_exception(exception) }
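
The behaviour of both knobs can be approximated by a small wrapper around each step call. This is a sketch only: call_step and its keyword names are hypothetical, and it assumes that StandardError is what gets rescued and that the reporter is skipped when the exception is re-raised.

```ruby
# Hypothetical wrapper; names are illustrative, not FlowOrganizer's internals.
def call_step(step, ctx, raise_exception: false, reporter: nil)
  step.call(**ctx)
rescue StandardError => e
  raise if raise_exception          # re-raise the original exception untouched

  reporter&.call(exception: e)      # same keyword the reporter lambda above expects
  [:error, { error: e }]
end

reported = []
reporter = ->(exception:) { reported << exception.message }
boom     = ->(**) { raise ArgumentError, 'boom' }

status, payload = call_step(boom, {}, reporter: reporter)
status                # => :error
payload[:error].class # => ArgumentError
reported              # => ["boom"]

# With raise_exception: true the ArgumentError propagates to the caller:
# call_step(boom, {}, raise_exception: true)
```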

Performance

The included benchmark compares plain chained lambda calls against FlowOrganizer.call, using a 5-step pipeline over 20 000 iterations:

                               total        per-call   ratio   vs baseline
  plain chained calls           16.05 ms     0.80 µs   1.00x   (baseline)
  flow_organizer (cached)       78.42 ms     3.92 µs   4.87x   +388%
  flow_organizer (uncached)    158.92 ms     7.95 µs   9.90x   +890%

Run it locally:

ruby benchmark/flow_organizer_overhead.rb

The parameter-introspection cache is on by default and warms automatically. Call FlowOrganizer::Context.clear_parameter_cache if you reload code in development and want to drop stale Method objects.
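
For intuition about what such a cache stores, here is a minimal sketch that memoizes the declared keyword names per callable. ParameterCache is a hypothetical class; the internals of FlowOrganizer::Context are not shown in this README and may differ.

```ruby
# Hypothetical cache; FlowOrganizer::Context's real internals may differ.
class ParameterCache
  def initialize
    # Key on object identity so two equal-looking lambdas get separate entries.
    @store = {}.compare_by_identity
  end

  # Returns the keyword names a callable declares, introspecting only once.
  def keyword_names(callable)
    @store[callable] ||= begin
      target = callable.respond_to?(:parameters) ? callable : callable.method(:call)
      target.parameters.filter_map { |type, name| name if %i[keyreq key].include?(type) }.freeze
    end
  end

  # Drops every entry, e.g. after code reloading replaced the callables.
  def clear
    @store.clear
  end

  def size
    @store.size
  end
end

cache = ParameterCache.new
step  = ->(value:, other: 1) {}

cache.keyword_names(step) # => [:value, :other]  (introspected)
cache.keyword_names(step) # => [:value, :other]  (served from the cache)
cache.clear
cache.size                # => 0
```

A cache like this holds a reference to every callable it has seen, which is why stale entries linger after a reload until it is cleared.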

License

MIT. See LICENSE.