FlowOrganizer
Run a list of callables in sequence, threading a context hash through each step. A small, allocation-conscious "functional interactor" / Railway-style pipeline.
FlowOrganizer.call(
  list: [
    ->(email:) { [:ok, normalized_email: email.downcase.strip] },
    ->(normalized_email:) { [:ok, user: User.find_by(email: normalized_email)] },
    ->(user:) { user ? [:ok] : [:error, code: 'user.not_found'] },
    ->(user:) { [:ok, token: Auth.issue_token(user)] },
  ],
  ctx: { email: ' Alice@Example.com ' },
)
# => [:ok, { email: ..., normalized_email: ..., user: ..., token: ... }]
Why
The pattern is known under many names: Pipes and Filters, Pipeline, Railway-Oriented Programming, functional interactor.
The core idea: a list of small operations passes a shared context forward, any one of them can stop the chain by returning an error, and the caller gets back a single [status, ctx] tuple.
Comparable Ruby gems include interactor, light-service, and dry-transaction.
FlowOrganizer trades their object-oriented approach and custom DSLs for a functional one: any callable works (a lambda, a Method object, or any object that responds to #call). Steps do not each need their own class.
Installation
Requires Ruby 3.2+.
# Gemfile
gem 'flow_organizer'
bundle install
# or
gem install flow_organizer
The callable contract
Every callable must return one of:
| Return | Meaning |
|---|---|
| [:ok] | Success, no context update. |
| [:ok, { key: value }] | Success, merge the hash into the context. |
| [:halt] | Stop the chain. The caller still sees a final :ok. |
| [:halt, { key: value }] | Same as above, with a final context update. |
| [:error] | Stop the chain. Caller sees :error. |
| [:error, errors_payload] | As above, with a normalized errors: array in ctx. |
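The three statuses in the table can be illustrated with a small runner. This is a minimal sketch, not FlowOrganizer's implementation: `run_steps` is a hypothetical helper, its steps take the whole context as one positional argument to keep the example short, and error payloads are merged as-is rather than normalized.

```ruby
# Hypothetical stand-in for the runner; not FlowOrganizer's actual code.
# Each step receives the whole context and returns [status] or [status, hash].
def run_steps(steps, ctx)
  steps.each do |step|
    status, update = step.call(ctx)
    ctx = ctx.merge(update) if update.is_a?(Hash)

    case status
    when :ok    then next                 # continue with the next step
    when :halt  then return [:ok, ctx]    # stop early, still a success
    when :error then return [:error, ctx] # stop early, report failure
    else raise ArgumentError, "unexpected status: #{status.inspect}"
    end
  end
  [:ok, ctx]
end

steps = [
  ->(ctx) { [:ok, { visits: ctx[:visits] + 1 }] },
  ->(ctx) { ctx[:visits] > 1 ? [:halt, { cached: true }] : [:ok] },
  ->(_ctx) { [:ok, { rendered: true }] }, # skipped when the step above halts
]

halted = run_steps(steps, { visits: 1 })
fresh  = run_steps(steps, { visits: 0 })
```

In this sketch `halted` ends with status :ok and never reaches the third step, while `fresh` runs all three.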
For :error, the payload can be a string, a hash with :title/:detail/:code, an array of either, or already a { errors: [...] } hash.
FlowOrganizer normalizes them all to the canonical { errors: [{ title: ... }, ...] } shape.
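One way such a normalization could look is sketched below. `normalize_errors` is a hypothetical helper, not part of the gem's API, and two behaviours are assumptions: a bare string becomes the :title, and a hash without an :errors key is kept as a single error entry.

```ruby
# Hypothetical helper, not FlowOrganizer's API.
# Assumption: a bare string is used as the :title of the error entry.
def normalize_errors(payload)
  case payload
  when String
    { errors: [{ title: payload }] }
  when Array
    # Normalize each item, then flatten into a single errors list.
    { errors: payload.flat_map { |item| normalize_errors(item)[:errors] } }
  when Hash
    if payload.key?(:errors)
      normalize_errors(payload[:errors]) # already wrapped: normalize the inner list
    else
      { errors: [payload] }              # assumption: keep :title/:detail/:code as given
    end
  else
    raise ArgumentError, "unsupported error payload: #{payload.inspect}"
  end
end

from_string = normalize_errors('Email is invalid')
from_mixed  = normalize_errors(['Too short', { title: 'Taken', code: 'email.taken' }])
```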
Callables only receive the context keys their kwargs declare:
->(value:) { ... } # gets { value: ... }
->(value:, other:) { ... } # gets { value: ..., other: ... }
->(**) { ... } # gets the full ctx
->() { ... } # gets nothing
Positional arguments are not supported. Restricting steps to keyword arguments is what lets FlowOrganizer cheaply slice the context for each step.
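Ruby exposes a callable's declared keywords through `parameters`, which is enough to sketch the slicing. `slice_for` and `invoke` are hypothetical helpers written for illustration; the gem's internals may differ.

```ruby
# Hypothetical helpers illustrating keyword-based slicing; not the gem's code.
def slice_for(callable, ctx)
  # Procs and Method objects respond to #parameters; other objects expose it via #call.
  params = callable.respond_to?(:parameters) ? callable.parameters : callable.method(:call).parameters
  kinds = params.map(&:first)

  if (kinds & %i[req opt rest]).any?
    raise ArgumentError, 'positional arguments are not supported'
  end
  return ctx if kinds.include?(:keyrest) # ->(**) receives the full ctx

  names = params.filter_map { |kind, name| name if %i[keyreq key].include?(kind) }
  ctx.slice(*names)
end

def invoke(callable, ctx)
  callable.call(**slice_for(callable, ctx))
end

ctx = { value: 1, other: 2, extra: 3 }
only_value = invoke(->(value:) { value }, ctx)
all_keys   = invoke(->(**kwargs) { kwargs.keys }, ctx)
no_args    = invoke(->() { :nothing }, ctx)
```

Because `Hash#slice` only copies the requested keys, a step never sees context entries it did not ask for.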
Dual-track execution
Pass list_error: to run a compensation track if the success track fails:
FlowOrganizer.call(
list: [book_flight, charge_card, send_confirmation],
list_error: [refund_card, send_apology],
ctx: { user: user, flight: flight },
)
If charge_card returns [:error], execution jumps to refund_card with the accumulated context.
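The jump between tracks can be sketched with two small methods. Everything here is a stand-in: `run_track` and `run_dual_track` are hypothetical, the steps take the whole context positionally, and the sketch assumes the overall status stays :error after the compensation track runs.

```ruby
# Hypothetical stand-ins, not FlowOrganizer's implementation.
def run_track(steps, ctx)
  steps.each do |step|
    status, update = step.call(ctx)
    ctx = ctx.merge(update || {})
    return [status, ctx] unless status == :ok
  end
  [:ok, ctx]
end

def run_dual_track(list:, list_error:, ctx:)
  status, ctx = run_track(list, ctx)
  return [status, ctx] unless status == :error

  _, ctx = run_track(list_error, ctx) # compensation steps see the accumulated ctx
  [:error, ctx]                       # assumption: the overall result stays :error
end

book_flight       = ->(_ctx) { [:ok, { booking_id: 'B1' }] }
charge_card       = ->(_ctx) { [:error, { failed_step: :charge_card }] }
send_confirmation = ->(_ctx) { [:ok, { confirmed: true }] }
refund_card       = ->(ctx)  { [:ok, { refunded_booking: ctx[:booking_id] }] }
send_apology      = ->(_ctx) { [:ok, { apology_sent: true }] }

status, final_ctx = run_dual_track(
  list: [book_flight, charge_card, send_confirmation],
  list_error: [refund_card, send_apology],
  ctx: { user: 'alice' },
)
```

In this sketch `send_confirmation` never runs, and `refund_card` can read the `booking_id` written by `book_flight` before the failure.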
Resolvers
Anything responding to #call works directly. For more structured lists, two extras ship in the box.
Exception handling
By default, an exception raised inside a callable is rescued and turned into [:error, { error: <exception> }].
Two knobs change the default:
# Re-raise instead of catching.
FlowOrganizer.call(list: [...], ctx: { ... }, raise_exception: true)
# Forward caught exceptions to your error reporter.
FlowOrganizer.exception_reporter = ->(exception:) { Sentry.capture_exception(exception) }
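The rescue-and-report behaviour described above might be wrapped around each step roughly like this. `safe_call` is a hypothetical helper; its `raise_exception:` and `reporter:` arguments mirror the two knobs, and skipping the reporter when re-raising is an assumption rather than documented behaviour.

```ruby
# Hypothetical wrapper, not the gem's code.
def safe_call(step, ctx, raise_exception: false, reporter: nil)
  step.call(**ctx)
rescue StandardError => e
  raise if raise_exception     # re-raise the original exception untouched
  reporter&.call(exception: e) # forward to the error reporter, if one is set
  [:error, { error: e }]
end

divide = ->(n:) { [:ok, { result: 10 / n }] }

reported = []
reporter = ->(exception:) { reported << exception.class }

status, payload = safe_call(divide, { n: 0 }, reporter: reporter)
```

Rescuing StandardError rather than Exception leaves signals and interpreter-level errors alone, which is the usual Ruby convention.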
Performance
The included benchmark compares plain chained lambda calls against FlowOrganizer.call
with a 5-step pipeline, 20 000 iterations:
| Variant | Total | Per call | Ratio | vs baseline |
|---|---|---|---|---|
| plain chained calls | 16.05 ms | 0.80 µs | 1.00x | (baseline) |
| flow_organizer (cached) | 78.42 ms | 3.92 µs | 4.87x | +388% |
| flow_organizer (uncached) | 158.92 ms | 7.95 µs | 9.90x | +890% |
Run it locally:
ruby benchmark/flow_organizer_overhead.rb
The parameter-introspection cache is on by default and warms automatically. Call FlowOrganizer::Context.clear_parameter_cache if you reload code in development and want to drop stale Method objects.
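To show why such a cache helps and why it can go stale, here is a minimal memoization sketch. `ParameterCache` is a hypothetical module, not `FlowOrganizer::Context`; it assumes the cache is keyed by the callable itself, which is why reloaded code would leave old Proc or Method objects behind until the cache is cleared.

```ruby
# Hypothetical cache, not FlowOrganizer::Context itself.
module ParameterCache
  @store = {}

  # Introspect a callable's keyword names once, then reuse the result.
  def self.keyword_names(callable)
    @store[callable] ||= callable.parameters.filter_map do |kind, name|
      name if %i[keyreq key].include?(kind)
    end
  end

  # Drop every cached entry, e.g. after code has been reloaded.
  def self.clear
    @store.clear
  end

  def self.size
    @store.size
  end
end

step = ->(value:, other: 1) { [:ok, { sum: value + other }] }

first_lookup  = ParameterCache.keyword_names(step) # introspects and stores
second_lookup = ParameterCache.keyword_names(step) # served from the cache
```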
License
MIT. See LICENSE.