FiberStream

FiberStream is an early-stage Ruby library for linear, pull-based stream processing with backpressure.

Build a lazy Source, transform it with Flow stages, and materialize it with a Sink.

Quick Start

require "fiber_stream"

result =
  FiberStream::Source.each([1, 2, 3, 4])
    .map { |number| number * 2 }
    .select(&:even?)
    .take(2)
    .run_with(FiberStream::Sink.to_a)

result # => [2, 4]

Status

FiberStream currently supports linear pipelines only.

Implemented capabilities:

  • in-memory, IO, and backpressure-aware Ractor port sources
  • mapping, filtering, limiting, line splitting, buffering, async boundaries, ordered parallel mapping, and ordered Ractor-backed mapping
  • array, first-element, fold, and IO sinks
  • reusable flow composition and runnable pipelines
  • foreground and scheduler-backed background pipeline execution
  • public RBS signatures

Not yet implemented:

  • graph DSLs

Core Concepts

Sources

A Source is a lazy stream definition: nothing is consumed until it is run with a sink.

source = FiberStream::Source.each([1, 2, 3])

source.run_with(FiberStream::Sink.to_a) # => [1, 2, 3]

IO sources read chunks on demand and require a scheduler-backed non-blocking fiber:

require "async"
require "fiber_stream"

chunks =
  Async do
    File.open("input.txt", "rb") do |file|
      FiberStream::Source.io(file)
        .run_with(FiberStream::Sink.to_a)
    end
  end.wait
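As a rough illustration of what "read chunks on demand" means, the sketch below models the idea with a plain Ruby Enumerator over a StringIO. It is not FiberStream's implementation and involves no scheduler. The helper name chunked_reader is hypothetical, and only the chunk_size keyword is borrowed from the Source.io signature.

```ruby
require "stringio"

# Hypothetical helper, not part of FiberStream: reads one chunk per pull.
def chunked_reader(io, chunk_size: 4)
  Enumerator.new do |yielder|
    # IO#read(length) returns nil at end of input, which ends the loop.
    while (chunk = io.read(chunk_size))
      yielder << chunk
    end
  end
end

io = StringIO.new("abcdefghij")
reader = chunked_reader(io, chunk_size: 4)

reader.next # => "abcd"
io.pos      # => 4, nothing beyond the first chunk has been read yet
reader.next # => "efgh"
reader.next # => "ij"
```

The read position only advances when the consumer asks for the next chunk, which is the property a pull-based IO source needs.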

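Ractor port sources connect a producer Ractor through an explicit ack handshake. The producer creates its own acknowledgment port and hands it to the consumer, then waits on it: for each RactorPort::Ack it sends exactly one typed message (RactorPort::Element or RactorPort::Complete) to the FiberStream data port, and on RactorPort::Cancel it stops: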

data_port = Ractor::Port.new
setup_port = Ractor::Port.new

producer =
  Ractor.new(data_port, setup_port) do |outbox, setup|
    ack_port = Ractor::Port.new
    setup.send(ack_port)

    values = [1, 2, 3].to_enum

    loop do
      case ack_port.receive
      in FiberStream::RactorPort::Ack
        begin
          outbox.send(FiberStream::RactorPort::Element.new(values.next))
        rescue StopIteration
          outbox.send(FiberStream::RactorPort::Complete.new)
          break
        end
      in FiberStream::RactorPort::Cancel
        break
      end
    end
  end

ack_port = setup_port.receive

FiberStream::Source.ractor_port(data_port, ack_port: ack_port)
  .run_with(FiberStream::Sink.to_a)
# => [1, 2, 3]

producer.value

Flows

Flows transform a stream lazily. Convenience methods on Source delegate to the matching FiberStream::Flow constructors.

result =
  FiberStream::Source.each(["a\nb", "\nc"])
    .lines
    .select { |line| line != "b" }
    .map(&:upcase)
    .run_with(FiberStream::Sink.to_a)

result # => ["A", "C"]
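The result above relies on lines being reassembled across chunk boundaries: "a\nb" and "\nc" arrive as separate chunks, yet "b" comes out as a single line. A minimal plain-Ruby sketch of that kind of framing follows, assuming newline-delimited input and chomped output. split_lines is a hypothetical helper and says nothing about how FiberStream's lines stage is actually implemented.

```ruby
# Hypothetical helper, not part of FiberStream: re-frames arbitrary string
# chunks into chomped lines, carrying partial lines over between chunks.
def split_lines(chunks)
  Enumerator.new do |yielder|
    pending = +""
    chunks.each do |chunk|
      pending << chunk
      # Emit every complete line that is currently buffered.
      while (newline_at = pending.index("\n"))
        yielder << pending.slice!(0..newline_at).chomp
      end
    end
    # Whatever is left has no trailing newline; emit it as the last line.
    yielder << pending unless pending.empty?
  end
end

split_lines(["a\nb", "\nc"]).to_a # => ["a", "b", "c"]
```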

Reusable flows can be composed with Flow#via:

normalize =
  FiberStream::Flow.map(&:strip)
    .via(FiberStream::Flow.select { |line| !line.empty? })

FiberStream::Source.each([" a ", "", " b "])
  .via(normalize)
  .run_with(FiberStream::Sink.to_a)
# => ["a", "b"]
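One way to picture reusable flows is as functions from a lazy stream to a lazy stream, so that composing two flows is ordinary function composition. The sketch below models this with plain Ruby lambdas, Enumerator::Lazy, and Proc#>>. The constant names are illustrative, and this is an analogy rather than FiberStream's Flow implementation.

```ruby
# Each stage maps a lazy enumerator to another lazy enumerator.
STRIP_STAGE = ->(stream) { stream.map(&:strip) }
DROP_EMPTY = ->(stream) { stream.reject(&:empty?) }

# Proc#>> runs the left stage first, then feeds its result to the right one.
NORMALIZE_MODEL = STRIP_STAGE >> DROP_EMPTY

NORMALIZE_MODEL.call([" a ", "", " b "].lazy).to_a # => ["a", "b"]
```

Because the composed value is itself just another stage, it can be reused across pipelines or composed further, which is the property Flow#via provides.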

Sinks

A Sink consumes the stream and returns a materialized value.

FiberStream::Source.each([1, 2, 3])
  .run_with(FiberStream::Sink.fold(0) { |sum, value| sum + value })
# => 6

Pipelines

Source#to(sink) creates a reusable runnable pipeline.

pipeline =
  FiberStream::Source.each([1, 2, 3])
    .map { |number| number * 2 }
    .to(FiberStream::Sink.to_a)

pipeline.run # => [2, 4, 6]

Pipeline#run_async starts a pipeline in a scheduler-backed background fiber and returns a handle:

require "async"
require "fiber_stream"

result =
  Async do
    running =
      FiberStream::Source.each([1, 2, 3])
        .map { |number| number * 2 }
        .to(FiberStream::Sink.to_a)
        .run_async

    # Foreground scheduler-managed work can continue here.

    running.wait
  end.wait

result # => [2, 4, 6]

The handle supports wait, cancel, done?, and cancel_requested?.

Backpressure

The initial runtime is pull-based. A sink asks for one element, each flow pulls only what it needs from upstream, and the source advances only when downstream demands a value.
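To make the demand-driven order concrete, here is a small sketch that uses Ruby's built-in Enumerator::Lazy rather than FiberStream itself. It logs when the source produces a value and when the downstream stage sees it. traced_pull is a hypothetical name, and the log illustrates pull-based evaluation in general, not FiberStream's internals.

```ruby
# Hypothetical helper, not part of FiberStream: runs a lazy pipeline over
# `values`, keeps the first `limit` results, and logs each step.
def traced_pull(values, limit)
  log = []
  source = Enumerator.new do |yielder|
    values.each do |value|
      log << "produce #{value}"
      yielder << value
    end
  end

  mapped = source.lazy.map do |value|
    log << "map #{value}"
    value * 2
  end

  [mapped.first(limit), log]
end

result, log = traced_pull([1, 2, 3, 4], 2)
result # => [2, 4]
log    # => ["produce 1", "map 1", "produce 2", "map 2"]
```

Each element travels the whole pipeline before the next one is produced, and elements 3 and 4 are never produced because nothing downstream asks for them.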

Sink.first demonstrates sink-side early completion:

first =
  FiberStream::Source.each([1, 2, 3])
    .run_with(FiberStream::Sink.first)

first # => 1

Flow.take demonstrates flow-side early completion and closes upstream after the requested number of elements:

limited =
  FiberStream::Source.each([1, 2, 3])
    .take(2)
    .run_with(FiberStream::Sink.to_a)

limited # => [1, 2]
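The "closes upstream" part can be pictured with a plain Ruby Enumerator whose block has an ensure clause: when the consumer stops early, the producer's cleanup still runs. This is only an analogy under that assumption; take_and_close is a hypothetical helper, and FiberStream's own close protocol may differ.

```ruby
# Hypothetical helper, not part of FiberStream: takes `count` elements and
# records whether the upstream cleanup ran.
def take_and_close(values, count)
  events = []
  upstream = Enumerator.new do |yielder|
    begin
      values.each do |value|
        events << [:produced, value]
        yielder << value
      end
    ensure
      # Runs on normal completion and when the consumer stops early.
      events << :upstream_closed
    end
  end

  [upstream.first(count), events]
end

taken, events = take_and_close([1, 2, 3], 2)
taken  # => [1, 2]
events # => [[:produced, 1], [:produced, 2], :upstream_closed]
```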

Flow.buffer(count) allows bounded prefetch. Flow.async, Flow.buffer, Flow.parallel_map, Source.io, Sink.io, and Pipeline#run_async require an installed Fiber.scheduler and a non-blocking current fiber at the point where they are demanded or, for Pipeline#run_async, started. FiberStream does not install a scheduler and does not depend on Async at runtime, so the caller must provide one; the examples in this README use Async.
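Bounded prefetch means the producer may run ahead of the consumer, but only by a fixed number of elements. The sketch below models that with a thread and a SizedQueue from the Ruby standard library. This is a deliberately different mechanism from FiberStream, which relies on fibers and a Fiber.scheduler, and prefetch is a hypothetical helper name.

```ruby
# Hypothetical helper, not part of FiberStream: a producer thread may run
# ahead of the consumer by at most `count` queued elements.
def prefetch(enumerable, count)
  Enumerator.new do |yielder|
    queue = SizedQueue.new(count)
    done = Object.new # unique end-of-stream marker

    producer = Thread.new do
      enumerable.each { |element| queue.push(element) } # blocks when full
      queue.push(done)
    end

    begin
      loop do
        element = queue.pop
        break if element.equal?(done)

        yielder << element
      end
    ensure
      # Stop a producer that is still blocked after an early exit.
      producer.kill
    end
  end
end

prefetch(1..5, 2).to_a # => [1, 2, 3, 4, 5]
```

Order is preserved, and a slow consumer stalls the producer once the queue is full instead of letting it buffer without limit.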

API Surface

Sources:

  • FiberStream::Source.each(enumerable)
  • FiberStream::Source.io(io, chunk_size: 16 * 1024, close: false)
  • FiberStream::Source.ractor_port(port, ack_port:, ack_transfer: :copy, cancel: true)

Source convenience methods:

  • Source#via(flow)
  • Source#map { |element| ... }
  • Source#parallel_map(concurrency:) { |element| ... }
  • Source#ractor_map(workers:, input_transfer: :copy, output_transfer: :copy) { |element| ... }
  • Source#select { |element| ... }
  • Source#take(count)
  • Source#async
  • Source#buffer(count)
  • Source#lines(chomp: true, max_length: nil)
  • Source#to(sink)
  • Source#run_with(sink)

Flows:

  • FiberStream::Flow.map { |element| ... }
  • FiberStream::Flow.parallel_map(concurrency:) { |element| ... }
  • FiberStream::Flow.ractor_map(workers:, input_transfer: :copy, output_transfer: :copy) { |element| ... }
  • FiberStream::Flow.select { |element| ... }
  • FiberStream::Flow.take(count)
  • FiberStream::Flow.async
  • FiberStream::Flow.buffer(count)
  • FiberStream::Flow.lines(chomp: true, max_length: nil)
  • Flow#via(flow)
  • Flow#to(sink)

Sinks:

  • FiberStream::Sink.to_a
  • FiberStream::Sink.first
  • FiberStream::Sink.fold(initial) { |accumulator, element| ... }
  • FiberStream::Sink.io(io, close: false, flush: false)

Pipelines:

  • FiberStream::Pipeline#run
  • FiberStream::Pipeline#run_async
  • FiberStream::RunningPipeline#wait
  • FiberStream::RunningPipeline#cancel
  • FiberStream::RunningPipeline#done?
  • FiberStream::RunningPipeline#cancel_requested?

Examples

Runnable examples live under examples/.

bundle exec ruby examples/basic_pipeline.rb
bundle exec ruby examples/composable_pipeline.rb
bundle exec ruby examples/line_processing.rb
bundle exec ruby examples/file_copy.rb
bundle exec ruby examples/backpressure_buffer.rb
bundle exec ruby examples/background_execution.rb
bundle exec ruby examples/ractor_map_hashing.rb
bundle exec ruby examples/ractor_port_source.rb
bundle exec ruby examples/async_http_requests.rb

examples/backpressure_buffer.rb prints timestamped producer and consumer events so the difference between direct demand and bounded prefetch is visible.

examples/ractor_map_hashing.rb demonstrates ordered Ractor-backed hashing with a shareable mapper proc and input_transfer: :move.

examples/ractor_port_source.rb demonstrates a producer Ractor that waits for RactorPort::Ack before sending each RactorPort::Element.

examples/async_http_requests.rb starts a local HTTP server and uses parallel_map to overlap the waits of independent HTTP requests.

Benchmark scripts live under benchmarks/.

bundle exec ruby benchmarks/stream_transform.rb
bundle exec ruby benchmarks/latency_overlap.rb
bundle exec ruby benchmarks/heavy_cpu_map.rb

Development

This project targets Ruby 4.x. The repository currently pins Ruby 4.0.3 in mise.toml.

Install dependencies:

bundle install

Run the test suite:

bundle exec rake test

Run RBS validation:

bundle exec rbs validate

Run RuboCop:

bundle exec rubocop

Run all default checks:

bundle exec rake

Build the gem:

bundle exec gem build fiber_stream.gemspec

Release uses RubyGems Trusted Publishing from the Release GitHub Actions workflow. Configure a pending trusted publisher for the fiber_stream gem with workflow filename release.yml and environment release, then publish by pushing a version tag such as v0.1.0.

Documentation

Design and planning documents live under docs/:

  • docs/product-specs/
  • docs/design-docs/
  • docs/exec-plans/
  • docs/references/