FiberStream

FiberStream is a Ruby library for linear stream processing with pull-based backpressure.

It builds lazy Source definitions, transforms values with Flow stages, and materializes results with Sink objects.

Quick Start

Please see the project documentation for more details.

require "fiber_stream"

result =
  FiberStream::Source.each([1, 2, 3, 4])
    .map { |number| number * 2 }
    .select(&:even?)
    .take(2)
    .run_with(FiberStream::Sink.to_a)

result # => [2, 4]

Status

FiberStream currently supports linear pipelines only.

Implemented capabilities:

  • in-memory, IO, FiberStream-owned Ractor producer, backpressure-aware Ractor port, and Ractor port merge sources
  • lazy source concatenation, zipping, and scheduler-backed merging
  • mapping, filtering, limiting, predicate-based limiting and dropping, fixed-prefix dropping, fixed-size grouping, line splitting, buffering, async boundaries, ordered and unordered parallel mapping, and ordered Ractor-backed mapping
  • array, first-element, fold, foreach, and IO sinks
  • reusable flow composition and runnable pipelines
  • foreground and scheduler-backed background pipeline execution
  • public RBS signatures

FiberStream intentionally keeps the public model linear: one source, an ordered chain of flows, and one sink.

Core Concepts

Sources

A Source is a lazy stream definition. It is not consumed until the source is run with a sink.

source = FiberStream::Source.each([1, 2, 3])

source.run_with(FiberStream::Sink.to_a) # => [1, 2, 3]
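
The same "definition first, consumption later" idea can be seen with a plain Ruby lazy enumerator. This is only an analogy built on the standard library, not FiberStream's implementation; build_lazy_source and the log array are hypothetical names used for illustration.

```ruby
# Plain-Ruby analogy: a lazy enumerator is only a description of work
# until something forces it.
def build_lazy_source(log)
  Enumerator.new do |yielder|
    [1, 2, 3].each do |value|
      log << value # records each element that is actually produced
      yielder << value
    end
  end.lazy
end

log = []
source = build_lazy_source(log)
log         # => [] (defining the source produced nothing)
source.to_a # => [1, 2, 3]
log         # => [1, 2, 3]
```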

Sources can be concatenated without materializing the appended source until the first source completes:

result =
  FiberStream::Source.each([1, 2])
    .concat(FiberStream::Source.each([3, 4]))
    .run_with(FiberStream::Sink.to_a)

result # => [1, 2, 3, 4]
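
As a rough standard-library analogy (not FiberStream code), Enumerator#+ chains two enumerators and does not start the second one until the first is exhausted. The tracked helper and the started array are hypothetical and exist only to make that visible.

```ruby
# Wraps values in an enumerator that records when iteration actually starts.
def tracked(values, name, started)
  Enumerator.new do |yielder|
    started << name
    values.each { |value| yielder << value }
  end
end

def chained_source(started)
  tracked([1, 2], :first, started) + tracked([3, 4], :second, started)
end

started = []
chained_source(started).first(1) # => [1]
started                          # => [:first]

started = []
chained_source(started).to_a     # => [1, 2, 3, 4]
started                          # => [:first, :second]
```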

Sources can also be zipped element-by-element. The zipped source emits pairs and completes when either input source completes:

result =
  FiberStream::Source.each([1, 2, 3])
    .zip(FiberStream::Source.each(["a", "b"]))
    .run_with(FiberStream::Sink.to_a)

result # => [[1, "a"], [2, "b"]]
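
Note that this differs from Ruby's built-in Array#zip, which pads the shorter side with nil. A minimal plain-Ruby sketch of "stop when either side ends", assuming external enumerators as the pull mechanism (zip_until_either_ends is a hypothetical helper, not part of FiberStream):

```ruby
def zip_until_either_ends(left, right)
  Enumerator.new do |yielder|
    left_iter = left.to_enum
    right_iter = right.to_enum
    # Kernel#loop ends quietly when either iterator raises StopIteration.
    loop do
      # The left element is pulled first; the right side is pulled only
      # after the left side has produced a value for the pair.
      yielder << [left_iter.next, right_iter.next]
    end
  end
end

zip_until_either_ends([1, 2, 3], ["a", "b"]).to_a # => [[1, "a"], [2, "b"]]
```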

IO sources read chunks on demand and require a scheduler-backed non-blocking fiber. The chunk_size option (16 * 1024 by default) is the maximum byte count passed to readpartial for one downstream pull. Very large values may cause the IO implementation to attempt large allocations, so choose a bounded value appropriate for the workload:

require "async"
require "fiber_stream"

chunks =
  Async do
    File.open("input.txt", "rb") do |file|
      FiberStream::Source.io(file)
        .run_with(FiberStream::Sink.to_a)
    end
  end.wait
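
To illustrate what "one readpartial call per pull" means, here is a standard-library sketch that reads a StringIO in bounded chunks. It runs without a scheduler and is not how Source.io is implemented; chunk_enumerator is a hypothetical helper.

```ruby
require "stringio"

def chunk_enumerator(io, chunk_size)
  Enumerator.new do |yielder|
    loop do
      chunk =
        begin
          io.readpartial(chunk_size) # returns at most chunk_size bytes
        rescue EOFError
          break # end of input: stop producing chunks
        end
      yielder << chunk
    end
  end
end

chunk_enumerator(StringIO.new("abcdefgh"), 3).to_a # => ["abc", "def", "gh"]
```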

Owned Ractor producer sources run producer blocks in FiberStream-managed Ractors. The producer block receives a RactorProducer context and emits one value per downstream demand:

PRODUCE_VALUES =
  Ractor.shareable_proc do |producer, values|
    values.each do |value|
      break unless producer.emit(value)
    end
  end

FiberStream::Source.ractor_producer([1, 2, 3], &PRODUCE_VALUES)
  .run_with(FiberStream::Sink.to_a)
# => [1, 2, 3]

Multiple owned producer Ractors can be merged directly without a scheduler-backed Source#merge. Each producer still receives at most one outstanding ack:

PRODUCE_TAGGED_VALUES =
  Ractor.shareable_proc do |producer, tag, values|
    values.each do |value|
      break unless producer.emit([tag, value])
    end
  end

source =
  FiberStream::Source.ractor_merge_producers do |group|
    group.producer(:a, [1, 2], &PRODUCE_TAGGED_VALUES)
    group.producer(:b, [3, 4], &PRODUCE_TAGGED_VALUES)
  end

source.run_with(FiberStream::Sink.to_a)
# Example result: [[:a, 1], [:b, 3], [:a, 2], [:b, 4]]
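
The "at most one outstanding ack" rule can be pictured with a thread and two queues. This sketch deliberately uses Thread and Queue instead of Ractors and does not reflect FiberStream's internal protocol; AckGated and ack_gated_source are hypothetical names.

```ruby
AckGated = Struct.new(:pull, :cancel, :produced)

def ack_gated_source(values)
  acks = Queue.new
  elements = Queue.new
  produced = []

  producer = Thread.new do
    values.each do |value|
      break unless acks.pop # block until one ack arrives; false cancels
      produced << value
      elements << value
    end
    elements.close # signals completion to the consumer
  end

  # Grant exactly one unit of demand, then wait for the matching element.
  # Returns nil once the producer has finished (so nil values are not supported).
  pull = lambda do
    acks << true
    elements.pop
  end

  cancel = lambda do
    acks << false
    producer.join
  end

  AckGated.new(pull, cancel, produced)
end

source = ack_gated_source([1, 2, 3])
source.pull.call # => 1
source.produced  # => [1] (the producer cannot run ahead without another ack)
source.cancel.call
```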

Use the lower-level Source.ractor_port and Source.ractor_merge_ports APIs when producer Ractors are owned outside FiberStream or need custom lifecycle handling. RactorPort::Failure cause metadata is producer-provided and is surfaced on RactorPortSourceError; redact sensitive details before sending failures across trust boundaries.

Streaming HTTP response bodies that implement #each, such as async-http response bodies, can be used with Source.each without buffering the full body first. Use the HTTP client's block form or an explicit ensure close because Source.each does not own the response body:

require "async"
require "async/http/internet/instance"
require "fiber_stream"

url = "https://raw.githubusercontent.com/elastic/examples/master/" \
  "Common%20Data%20Formats/nginx_logs/nginx_logs"

status_counts = Hash.new(0)

processed =
  Sync do
    Async::HTTP::Internet.get(url) do |response|
      raise "unexpected status #{response.status}" unless response.status == 200

      FiberStream::Source.each(response.body)
        .lines(max_length: 16 * 1024)
        .map { |line| line.split.fetch(8, nil) }
        .select { |status| status&.match?(/\A\d{3}\z/) }
        .run_with(
          FiberStream::Sink.foreach do |status|
            status_counts[status] += 1
          end
        )
    end
  end

Flows

Flows transform a stream lazily. Convenience methods on Source delegate to the matching FiberStream::Flow constructors.

result =
  FiberStream::Source.each(["a\nb", "\nc"])
    .lines
    .select { |line| line != "b" }
    .map(&:upcase)
    .run_with(FiberStream::Sink.to_a)

result # => ["A", "C"]

Reusable flows can be composed with Flow#via:

normalize =
  FiberStream::Flow.map(&:strip)
    .via(FiberStream::Flow.select { |line| !line.empty? })

FiberStream::Source.each([" a ", "", " b "])
  .via(normalize)
  .run_with(FiberStream::Sink.to_a)
# => ["a", "b"]
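
For comparison, the same "build a reusable stage, then attach it to a stream" shape can be written with plain lambdas and Proc#>> over a lazy enumerator. This is a standard-library analogy only; STRIP, REJECT_EMPTY, and NORMALIZE are hypothetical names.

```ruby
# Each stage takes a lazy enumerator and returns a new lazy enumerator.
STRIP = ->(lines) { lines.map(&:strip) }
REJECT_EMPTY = ->(lines) { lines.reject(&:empty?) }

# Proc#>> composes left to right: STRIP runs first, then REJECT_EMPTY.
NORMALIZE = STRIP >> REJECT_EMPTY

NORMALIZE.call([" a ", "", " b "].lazy).to_a # => ["a", "b"]
```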

Use parallel_map for ordered scheduler-backed mapping when each element waits on non-blocking IO. It preserves input order while allowing up to concurrency mapping operations to be in flight:

require "async"
require "fiber_stream"

def fetch_profile(user_id)
  # Example: perform scheduler-aware HTTP, database, or socket IO here.
  sleep 0.05
  { id: user_id, name: "user-#{user_id}" }
end

profiles =
  Sync do
    FiberStream::Source.each([1, 2, 3, 4])
      .parallel_map(concurrency: 4) { |user_id| fetch_profile(user_id) }
      .run_with(FiberStream::Sink.to_a)
  end

profiles.map { |profile| profile.fetch(:id) } # => [1, 2, 3, 4]
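
The ordering guarantee can be sketched with a bounded FIFO window of jobs. The version below uses threads rather than scheduler-backed fibers, so it is an analogy for the behavior described above and not FiberStream's implementation; ordered_parallel_map is a hypothetical helper.

```ruby
def ordered_parallel_map(enumerable, concurrency:, &mapper)
  Enumerator.new do |yielder|
    in_flight = []
    enumerable.each do |element|
      # Once the window is full, wait for the oldest job before starting another.
      yielder << in_flight.shift.value if in_flight.size >= concurrency
      in_flight << Thread.new { mapper.call(element) }
    end
    # Drain the remaining jobs, still oldest first, so input order is kept.
    in_flight.each { |thread| yielder << thread.value }
  end
end

# The slowest job is first, yet results come back in input order.
ordered_parallel_map([3, 1, 2], concurrency: 2) do |number|
  sleep(number * 0.01)
  number * 10
end.to_a # => [30, 10, 20]
```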

Use parallel_unordered_map when every result can be handled independently and lower head-of-line blocking matters more than input order. It still limits in-flight mapping work to concurrency, but emits values as mapping jobs finish:

require "async"
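require "fiber_stream"

# Hypothetical helper for illustration; replace with scheduler-aware IO.
def fetch_path(path)
  sleep(path == "/slow" ? 0.05 : 0.01)
  "response for #{path}"
end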

responses =
  Sync do
    FiberStream::Source.each(["/a", "/slow", "/b"])
      .parallel_unordered_map(concurrency: 3) { |path| fetch_path(path) }
      .run_with(FiberStream::Sink.to_a)
  end

# Results are in completion order, not necessarily input order.
responses
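
A thread-based sketch of completion-order emission with a bounded number of in-flight jobs follows. It is an analogy under the assumption that threads stand in for scheduler fibers; unordered_parallel_map is a hypothetical helper, and error handling is omitted (a mapper that raises would leave the consumer waiting).

```ruby
def unordered_parallel_map(enumerable, concurrency:, &mapper)
  Enumerator.new do |yielder|
    results = Queue.new
    in_flight = 0
    enumerable.each do |element|
      # When the limit is reached, emit whichever job finishes next.
      if in_flight >= concurrency
        yielder << results.pop
        in_flight -= 1
      end
      Thread.new { results << mapper.call(element) }
      in_flight += 1
    end
    # Emit the remaining results as they complete.
    in_flight.times { yielder << results.pop }
  end
end

finished =
  unordered_parallel_map(["/a", "/slow", "/b"], concurrency: 3) do |path|
    sleep(path == "/slow" ? 0.05 : 0.0)
    path.upcase
  end.to_a

# Every input is mapped exactly once; the order depends on completion time.
finished.sort # => ["/A", "/B", "/SLOW"]
```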

Use ractor_map for ordered CPU-bound mapping in Ractor workers. The mapper must be shareable, usually by creating it with Ractor.shareable_proc.

require "digest"
require "fiber_stream"

records = [
  { name: "alpha.bin", payload: +"A" * 200_000 },
  { name: "bravo.bin", payload: +"B" * 120_000 }
]

HASH_RECORD =
  Ractor.shareable_proc do |record|
    payload = record.fetch(:payload)

    {
      name: record.fetch(:name),
      bytes: payload.bytesize,
      sha256: Digest::SHA256.hexdigest(payload)
    }
  end

digests =
  FiberStream::Source.each(records)
    .ractor_map(workers: 2, input_transfer: :move, &HASH_RECORD)
    .run_with(FiberStream::Sink.to_a)

ractor_map preserves input order, limits pulled-but-unemitted work to workers, and does not require Fiber.scheduler. Use input_transfer: :move or output_transfer: :move only when the moved object will not be reused by the sender.

Sinks

A Sink consumes the stream and returns a materialized value.

FiberStream::Source.each([1, 2, 3])
  .run_with(FiberStream::Sink.fold(0) { |sum, value| sum + value })
# => 6

Use Sink.foreach when the terminal operation is a side effect and the stream values should not be accumulated. In this example the materialized value is the number of elements consumed:

count =
  FiberStream::Source.each(["a", "b", "c"])
    .run_with(FiberStream::Sink.foreach { |value| puts value })

count # => 3

Pipelines

Source#to(sink) creates a reusable runnable pipeline.

pipeline =
  FiberStream::Source.each([1, 2, 3])
    .map { |number| number * 2 }
    .to(FiberStream::Sink.to_a)

pipeline.run # => [2, 4, 6]

Pipeline#run_async starts a pipeline in a scheduler-backed background fiber and returns a handle:

require "async"
require "fiber_stream"

result =
  Async do
    running =
      FiberStream::Source.each([1, 2, 3])
        .map { |number| number * 2 }
        .to(FiberStream::Sink.to_a)
        .run_async

    # Foreground scheduler-managed work can continue here.

    running.wait
  end.wait

result # => [2, 4, 6]

The handle supports wait, cancel, done?, and cancel_requested?.
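
To make the shape of such a handle concrete, here is a thread-based stand-in with the same four method names. Everything about it is an assumption for illustration: SketchHandle and double_all are hypothetical, cancellation is modeled as a cooperative flag, and RunningPipeline itself is scheduler-backed and may behave differently.

```ruby
class SketchHandle
  def initialize(&work)
    @cancel_requested = false
    @thread = Thread.new { work.call(self) }
  end

  # Blocks until the work finishes and returns its result.
  def wait
    @thread.value
  end

  def done?
    !@thread.alive?
  end

  # Only records the request; the work must check the flag and stop itself.
  def cancel
    @cancel_requested = true
    self
  end

  def cancel_requested?
    @cancel_requested
  end
end

def double_all(values)
  SketchHandle.new do |handle|
    values.each_with_object([]) do |number, doubled|
      break doubled if handle.cancel_requested?

      doubled << number * 2
    end
  end
end

handle = double_all([1, 2, 3])
handle.wait  # => [2, 4, 6]
handle.done? # => true
```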

Backpressure

The initial runtime is pull-based. A sink asks for one element, each flow pulls only what it needs from upstream, and the source advances only when downstream demands a value.

Sink.first demonstrates sink-side early completion:

first =
  FiberStream::Source.each([1, 2, 3])
    .run_with(FiberStream::Sink.first)

first # => 1
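
The pull-based model can be sketched by hand with lambdas, where calling a stage means "give me one element". This is a simplified illustration, not FiberStream's runtime; STREAM_DONE and the pull_* helpers are hypothetical names.

```ruby
STREAM_DONE = Object.new.freeze

# Source stage: hands out one element per pull and records what was pulled.
def pull_source(values, pulled)
  iterator = values.each
  lambda do
    value = iterator.next
    pulled << value
    value
  rescue StopIteration
    STREAM_DONE
  end
end

# Flow stage: pulls exactly one upstream element per downstream pull.
def pull_map(upstream, &mapper)
  lambda do
    value = upstream.call
    value.equal?(STREAM_DONE) ? STREAM_DONE : mapper.call(value)
  end
end

# Flow stage: keeps pulling upstream until an element passes the predicate.
def pull_select(upstream, &predicate)
  lambda do
    loop do
      value = upstream.call
      break value if value.equal?(STREAM_DONE) || predicate.call(value)
    end
  end
end

# Sink: asks for a single element and then stops pulling.
def first_sink(upstream)
  value = upstream.call
  value.equal?(STREAM_DONE) ? nil : value
end

pulled = []
source = pull_source([1, 2, 3, 4], pulled)
tripled = pull_map(source) { |number| number * 3 }
evens = pull_select(tripled, &:even?)

first_sink(evens) # => 6
pulled            # => [1, 2] (elements 3 and 4 were never requested)
```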

Flow.take demonstrates flow-side early completion and closes upstream after the requested number of elements:

limited =
  FiberStream::Source.each([1, 2, 3])
    .take(2)
    .run_with(FiberStream::Sink.to_a)

limited # => [1, 2]

Flow.drop skips a fixed prefix and then passes later elements through:

tail =
  FiberStream::Source.each([1, 2, 3, 4])
    .drop(2)
    .run_with(FiberStream::Sink.to_a)

tail # => [3, 4]

Flow.grouped batches adjacent elements into arrays and emits the final partial group:

batches =
  FiberStream::Source.each([1, 2, 3, 4, 5])
    .grouped(2)
    .run_with(FiberStream::Sink.to_a)

batches # => [[1, 2], [3, 4], [5]]

Flow.scan emits the updated accumulator for each upstream element:

running_totals =
  FiberStream::Source.each([1, 2, 3, 4])
    .scan(0) { |sum, number| sum + number }
    .run_with(FiberStream::Sink.to_a)

running_totals # => [1, 3, 6, 10]
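
Ruby's Enumerable has no built-in scan, so a plain-Ruby equivalent of the behavior above may help when reasoning about it. The lazy_scan helper is hypothetical and is not how Flow.scan is implemented.

```ruby
def lazy_scan(enumerable, initial, &step)
  Enumerator.new do |yielder|
    accumulator = initial # reset on every fresh iteration
    enumerable.each do |element|
      accumulator = step.call(accumulator, element)
      yielder << accumulator # emit the updated accumulator, not the initial value
    end
  end.lazy
end

totals = lazy_scan([1, 2, 3, 4], 0) { |sum, number| sum + number }
totals.first(2) # => [1, 3]
totals.to_a     # => [1, 3, 6, 10]
```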

Flow.take_while emits the leading prefix while a predicate is truthy, then closes upstream at the first false or nil result:

prefix =
  FiberStream::Source.each([1, 2, 3, 1])
    .take_while { |number| number < 3 }
    .run_with(FiberStream::Sink.to_a)

prefix # => [1, 2]

Flow.drop_while skips the leading prefix while a predicate is truthy, then passes the first false or nil result and all later elements through:

tail =
  FiberStream::Source.each([1, 2, 3, 1])
    .drop_while { |number| number < 3 }
    .run_with(FiberStream::Sink.to_a)

tail # => [3, 1]

Source#concat preserves pull-driven demand across source boundaries. The appended source is not materialized while the first source can still satisfy downstream demand:

first =
  FiberStream::Source.each([1])
    .concat(FiberStream::Source.each([2]))
    .run_with(FiberStream::Sink.first)

first # => 1

Source#zip keeps input source materialization behind downstream demand. The other source is not materialized until the receiver has produced an element for a pair:

first =
  FiberStream::Source.each([1])
    .zip(FiberStream::Source.each([2]))
    .run_with(FiberStream::Sink.first)

first # => [1, 2]

Source#merge emits values from either input source in scheduler-observed ready order while preserving each input's own order:

merged =
  Sync do
    FiberStream::Source.each([1, 2])
      .merge(FiberStream::Source.each(["a", "b"]))
      .run_with(FiberStream::Sink.to_a)
  end

# Example result: [1, "a", 2, "b"]

merge does not make scheduler-unaware blocking source work non-blocking, and it does not provide CPU parallelism. Use producer Ractors with Source.ractor_producer or Source.ractor_merge_producers when producer work needs true isolation.

Flow.buffer(count) allows bounded prefetch. Flow.async, Flow.buffer, Flow.parallel_map, Flow.parallel_unordered_map, Source.io, Source#merge, Sink.io, and Pipeline#run_async require an installed Fiber.scheduler and a non-blocking current fiber when demanded or started. FiberStream does not install a scheduler and does not depend on Async at runtime.
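
Bounded prefetch can be pictured with a SizedQueue: a producer runs ahead of the consumer, but only until the queue is full. The sketch below uses a thread where FiberStream uses scheduler-backed fibers, so treat it as an analogy; prefetching and counting_source are hypothetical helpers, and producer errors are not handled.

```ruby
def prefetching(enumerable, count)
  Enumerator.new do |yielder|
    queue = SizedQueue.new(count)
    producer = Thread.new do
      # `<<` blocks while `count` elements are already waiting.
      enumerable.each { |element| queue << [element] }
      queue.close
    end

    begin
      # Elements are wrapped in arrays so nil and false values survive;
      # pop returns nil only once the queue is closed and drained.
      while (wrapped = queue.pop)
        yielder << wrapped.first
      end
    ensure
      producer.kill # stop prefetching if the consumer finishes early
    end
  end
end

# Records every element the upstream was asked to produce.
def counting_source(range, produced)
  Enumerator.new do |yielder|
    range.each do |number|
      produced << number
      yielder << number
    end
  end
end

produced = []
prefetching(counting_source(1..100, produced), 2).first(1) # => [1]
# One consumed, two buffered, and at most one more waiting to enter the queue.
produced.size <= 4 # => true
```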

API Surface

Sources:

  • FiberStream::Source.each(enumerable)
  • FiberStream::Source.io(io, chunk_size: 16 * 1024, close: false)
  • FiberStream::Source.ractor_producer(*args, transfer: :copy, ack_transfer: :copy) { |producer, *args| ... }
  • FiberStream::Source.ractor_merge_producers(transfer: :copy, ack_transfer: :copy) { |group| ... }
  • FiberStream::Source.ractor_port(port, ack_port:, ack_transfer: :copy, cancel: true)
  • FiberStream::Source.ractor_merge_ports(ports, ack_transfer: :copy, cancel: true)

Source convenience methods:

  • Source#via(flow)
  • Source#concat(source)
  • Source#zip(source)
  • Source#merge(source)
  • Source#map { |element| ... }
  • Source#parallel_map(concurrency:) { |element| ... }
  • Source#parallel_unordered_map(concurrency:) { |element| ... }
  • Source#ractor_map(workers:, input_transfer: :copy, output_transfer: :copy) { |element| ... }
  • Source#select { |element| ... }
  • Source#take(count)
  • Source#drop(count)
  • Source#grouped(count)
  • Source#scan(initial) { |accumulator, element| ... }
  • Source#take_while { |element| ... }
  • Source#drop_while { |element| ... }
  • Source#async
  • Source#buffer(count)
  • Source#lines(chomp: true, max_length: nil)
  • Source#split(separator, keep_separator: false, max_length: nil)
  • Source#to(sink)
  • Source#run_with(sink)

Flows:

  • FiberStream::Flow.map { |element| ... }
  • FiberStream::Flow.parallel_map(concurrency:) { |element| ... }
  • FiberStream::Flow.parallel_unordered_map(concurrency:) { |element| ... }
  • FiberStream::Flow.ractor_map(workers:, input_transfer: :copy, output_transfer: :copy) { |element| ... }
  • FiberStream::Flow.select { |element| ... }
  • FiberStream::Flow.take(count)
  • FiberStream::Flow.drop(count)
  • FiberStream::Flow.grouped(count)
  • FiberStream::Flow.scan(initial) { |accumulator, element| ... }
  • FiberStream::Flow.take_while { |element| ... }
  • FiberStream::Flow.drop_while { |element| ... }
  • FiberStream::Flow.async
  • FiberStream::Flow.buffer(count)
  • FiberStream::Flow.lines(chomp: true, max_length: nil)
  • FiberStream::Flow.split(separator, keep_separator: false, max_length: nil)
  • Flow#via(flow)
  • Flow#to(sink)

lines and split default to max_length: nil, which allows one unterminated line or frame to buffer without bound. Set a positive max_length for untrusted, network-facing, or otherwise unbounded inputs.
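
The reason for a limit is easiest to see in a hand-written splitter: without one, the buffer grows for as long as no newline arrives. The sketch below is plain Ruby, not FiberStream's lines implementation; bounded_lines and LineTooLong are hypothetical, and both raising on overflow and measuring the limit in bytes are assumptions made for illustration.

```ruby
class LineTooLong < StandardError; end

def bounded_lines(chunks, max_length:)
  Enumerator.new do |yielder|
    buffer = +""
    chunks.each do |chunk|
      buffer << chunk
      # Emit every complete line currently in the buffer.
      while (newline = buffer.index("\n"))
        line = buffer.slice!(0..newline).chomp
        raise LineTooLong, "line exceeds #{max_length} bytes" if line.bytesize > max_length

        yielder << line
      end
      # An unterminated line may not grow past the limit either.
      raise LineTooLong, "pending line exceeds #{max_length} bytes" if buffer.bytesize > max_length
    end
    yielder << buffer unless buffer.empty?
  end
end

bounded_lines(["a\nb", "\nc"], max_length: 16).to_a # => ["a", "b", "c"]

begin
  bounded_lines(["x" * 32], max_length: 16).to_a
rescue LineTooLong => error
  error.message # => "pending line exceeds 16 bytes"
end
```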

Sinks:

  • FiberStream::Sink.to_a
  • FiberStream::Sink.first
  • FiberStream::Sink.fold(initial) { |accumulator, element| ... }
  • FiberStream::Sink.foreach { |element| ... }
  • FiberStream::Sink.io(io, close: false, flush: false)

Pipelines:

  • FiberStream::Pipeline#run
  • FiberStream::Pipeline#run_async
  • FiberStream::RunningPipeline#wait
  • FiberStream::RunningPipeline#cancel
  • FiberStream::RunningPipeline#done?
  • FiberStream::RunningPipeline#cancel_requested?

Examples

Runnable examples live under examples/.

bundle exec ruby examples/basic_pipeline.rb
bundle exec ruby examples/composable_pipeline.rb
bundle exec ruby examples/line_processing.rb
bundle exec ruby examples/file_copy.rb
bundle exec ruby examples/backpressure_buffer.rb
bundle exec ruby examples/background_execution.rb
bundle exec ruby examples/ractor_map_hashing.rb
bundle exec ruby examples/ractor_producer_sources.rb
bundle exec ruby examples/ractor_port_source.rb
bundle exec ruby examples/ractor_merge_ports_and_map.rb
bundle exec ruby examples/async_http_requests.rb
bundle exec ruby examples/async_http_streaming_body.rb

examples/backpressure_buffer.rb prints timestamped producer and consumer events so the difference between direct demand and bounded prefetch is visible.

examples/ractor_map_hashing.rb demonstrates ordered Ractor-backed hashing with a shareable mapper proc and input_transfer: :move.

examples/ractor_producer_sources.rb demonstrates high-level owned producer Ractors with Source.ractor_producer and Source.ractor_merge_producers.

examples/ractor_port_source.rb demonstrates a producer Ractor that waits for RactorPort::Ack before sending each RactorPort::Element.

examples/ractor_merge_ports_and_map.rb demonstrates CPU-bound producer Ractors merged with Source.ractor_merge_ports, followed by CPU-bound verification in ractor_map workers.

examples/async_http_requests.rb starts a local HTTP server and shows FiberStream overlapping independent HTTP request waits with parallel_map.

examples/async_http_streaming_body.rb streams a public nginx access log with async-http, feeds the response body through Source.each(response.body), and aggregates lines without storing the full body.

Benchmark scripts live under benchmarks/.

bundle exec ruby benchmarks/stream_transform.rb
bundle exec ruby benchmarks/latency_overlap.rb
bundle exec ruby benchmarks/async_io_fanout.rb
bundle exec ruby benchmarks/heavy_cpu_map.rb

Development

This project targets Ruby 4.x. The repository currently pins Ruby 4.0.5 in mise.toml.

Install dependencies:

bundle install

Run the test suite:

bundle exec rake test

Run RBS validation:

bundle exec rbs validate

Run RuboCop:

bundle exec rubocop

Run all default checks:

bundle exec rake

Build the gem:

bundle exec gem build fiber_stream.gemspec

Release uses RubyGems Trusted Publishing from the Release GitHub Actions workflow. Configure a pending trusted publisher for the fiber_stream gem with workflow filename release.yml and environment release, then publish by pushing a version tag such as v0.1.0.

Documentation

Design and planning documents live under docs/:

  • docs/product-specs/
  • docs/design-docs/
  • docs/exec-plans/
  • docs/references/