FiberStream
FiberStream is an early-stage Ruby library for linear, pull-based stream processing with backpressure.
Build a lazy Source, transform it with Flow stages, and materialize it with
a Sink.
Quick Start
require "fiber_stream"
result =
  FiberStream::Source.each([1, 2, 3, 4])
    .map { |number| number * 2 }
    .select(&:even?)
    .take(2)
    .run_with(FiberStream::Sink.to_a)
result # => [2, 4]
Status
FiberStream currently supports linear pipelines only.
Implemented capabilities:
- in-memory, IO, and backpressure-aware Ractor port sources
- mapping, filtering, limiting, line splitting, buffering, async boundaries, ordered parallel mapping, and ordered Ractor-backed mapping
- array, first-element, fold, and IO sinks
- reusable flow composition and runnable pipelines
- foreground and scheduler-backed background pipeline execution
- public RBS signatures
Not yet implemented:
- graph DSLs
Core Concepts
Sources
A Source is a lazy stream definition. It is not consumed until the source is
run with a sink.
source = FiberStream::Source.each([1, 2, 3])
source.run_with(FiberStream::Sink.to_a) # => [1, 2, 3]
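The laziness described above can be illustrated without FiberStream. The following is a minimal sketch that uses Ruby's built-in Enumerator::Lazy as a stand-in for a Source; it is an analogy, not FiberStream's implementation, and the `pulled` log is a hypothetical name introduced only for this example.

```ruby
# Analogy only: Enumerator::Lazy stands in for a lazy Source definition.
pulled = [] # hypothetical log of elements the "source" actually produced

definition = [1, 2, 3].lazy.map do |number|
  pulled << number
  number * 2
end

pulled_before_run = pulled.dup # defining the stream consumed nothing
result = definition.first(2)   # materializing is what drives the stream
```

After the last line, `pulled` holds only the elements that were needed to produce `result`.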
IO sources read chunks on demand and require a scheduler-backed non-blocking fiber:
require "async"
require "fiber_stream"
chunks =
  Async do
    File.open("input.txt", "rb") do |file|
      FiberStream::Source.io(file)
        .run_with(FiberStream::Sink.to_a)
    end
  end.wait
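Reading chunks on demand can be pictured with plain Ruby. This sketch assumes nothing about FiberStream internals: `chunk_source` is a hypothetical helper built on Enumerator and StringIO, and it ignores the scheduler requirement entirely. It only shows that each demand reads one chunk and leaves the rest of the input untouched.

```ruby
require "stringio"

# Hypothetical helper, not FiberStream's API: yields one chunk per demand.
def chunk_source(io, chunk_size: 16 * 1024)
  Enumerator.new do |yielder|
    # IO#read(length) returns nil at end of file, which ends the loop.
    while (chunk = io.read(chunk_size))
      yielder << chunk
    end
  end
end

io = StringIO.new("abcdefghij")
chunks = chunk_source(io, chunk_size: 4)

first_chunk = chunks.next      # reads only the first 4 bytes
position_after_first = io.pos  # the rest of the input is still unread
remaining = [chunks.next, chunks.next]
```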
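Ractor port sources connect a producer Ractor with an explicit ack handshake.
The producer creates its own acknowledgment port and hands it to the consumer.
For each RactorPort::Ack it receives, it sends exactly one typed message
(RactorPort::Element or RactorPort::Complete) to the FiberStream data port;
RactorPort::Cancel tells it to stop: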
data_port = Ractor::Port.new
setup_port = Ractor::Port.new
producer =
  Ractor.new(data_port, setup_port) do |outbox, setup|
    ack_port = Ractor::Port.new
    setup.send(ack_port)
    values = [1, 2, 3].to_enum
    loop do
      case ack_port.receive
      in FiberStream::RactorPort::Ack
        begin
          outbox.send(FiberStream::RactorPort::Element.new(values.next))
        rescue StopIteration
          outbox.send(FiberStream::RactorPort::Complete.new)
          break
        end
      in FiberStream::RactorPort::Cancel
        break
      end
    end
  end
ack_port = setup_port.receive
FiberStream::Source.ractor_port(data_port, ack_port: ack_port)
  .run_with(FiberStream::Sink.to_a)
# => [1, 2, 3]
producer.value
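The one-ack-per-message rhythm of the handshake above can be tried without Ractors. This sketch is an analogy under stated assumptions: Thread::Queue stands in for Ractor::Port, and plain symbols stand in for the RactorPort message classes. It is not how FiberStream is implemented.

```ruby
# Analogy only: queues replace ports, symbols replace RactorPort messages.
data_queue = Thread::Queue.new
ack_queue = Thread::Queue.new

producer = Thread.new do
  values = [1, 2, 3].each
  loop do
    case ack_queue.pop
    when :ack
      begin
        data_queue << [:element, values.next]
      rescue StopIteration
        data_queue << [:complete]
        break
      end
    when :cancel
      break
    end
  end
end

received = []
loop do
  ack_queue << :ack # one ack grants exactly one message
  kind, value = data_queue.pop
  break if kind == :complete
  received << value
end
producer.join
```

Because the producer blocks on the ack queue between messages, it can never run ahead of the consumer by more than one message.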
Flows
Flows transform a stream lazily. Convenience methods on Source delegate to
the matching FiberStream::Flow constructors.
result =
  FiberStream::Source.each(["a\nb", "\nc"])
    .lines
    .select { |line| line != "b" }
    .map(&:upcase)
    .run_with(FiberStream::Sink.to_a)
result # => ["A", "C"]
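The example above splits lines that straddle chunk boundaries. One way to picture that behavior is a splitter that carries the unfinished tail of each chunk into the next one. The `split_lines` helper below is hypothetical and always chomps; FiberStream's own lines stage may work differently and also accepts options such as max_length that this sketch ignores.

```ruby
# Hypothetical helper: carries the partial line from one chunk to the next.
def split_lines(chunks)
  Enumerator.new do |yielder|
    buffer = +""
    chunks.each do |chunk|
      buffer << chunk
      # Emit every complete line currently in the buffer.
      while (index = buffer.index("\n"))
        yielder << buffer.slice!(0..index).chomp
      end
    end
    # A trailing line without a newline is still emitted at end of input.
    yielder << buffer unless buffer.empty?
  end
end

lines = split_lines(["a\nb", "\nc"]).to_a
```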
Reusable flows can be composed with Flow#via:
normalize =
  FiberStream::Flow.map(&:strip)
    .via(FiberStream::Flow.select { |line| !line.empty? })
FiberStream::Source.each([" a ", "", " b "])
  .via(normalize)
  .run_with(FiberStream::Sink.to_a)
# => ["a", "b"]
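Composing reusable flows is similar in spirit to composing functions. As a rough analogy only, the sketch below models each flow as a lambda from a lazy stream to a lazy stream and joins them with Proc#>>; the lambda names are hypothetical and nothing here is FiberStream's API.

```ruby
# Analogy only: each "flow" is a lambda from a lazy stream to a lazy stream.
strip = ->(stream) { stream.map(&:strip) }
drop_blank = ->(stream) { stream.reject(&:empty?) }

# Proc#>> composes left to right, much like chaining stages one after another.
normalize = strip >> drop_blank

result = normalize.call([" a ", "", " b "].lazy).to_a
reused = normalize.call(["x "].lazy).to_a # the composed stage is reusable
```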
Sinks
A Sink consumes the stream and returns a materialized value.
FiberStream::Source.each([1, 2, 3])
  .run_with(FiberStream::Sink.fold(0) { |sum, value| sum + value })
# => 6
Pipelines
Source#to(sink) creates a reusable runnable pipeline.
pipeline =
  FiberStream::Source.each([1, 2, 3])
    .map { |number| number * 2 }
    .to(FiberStream::Sink.to_a)
pipeline.run # => [2, 4, 6]
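A reusable pipeline can be thought of as a description that is rebuilt on every run, so earlier runs leave no state behind. The `PipelineSketch` struct below is a hypothetical illustration of that idea using lazy enumerators; it is not FiberStream's Pipeline class.

```ruby
# Hypothetical model: a pipeline is a source, a list of stages, and a sink.
PipelineSketch = Struct.new(:source, :stages, :sink) do
  def run
    # Rebuild the lazy chain from scratch on every run.
    stream = stages.reduce(source.lazy) { |upstream, stage| stage.call(upstream) }
    sink.call(stream)
  end
end

pipeline = PipelineSketch.new(
  [1, 2, 3],
  [->(stream) { stream.map { |number| number * 2 } }],
  ->(stream) { stream.to_a }
)

first_run = pipeline.run
second_run = pipeline.run
```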
Pipeline#run_async starts a pipeline in a scheduler-backed background fiber
and returns a handle:
require "async"
require "fiber_stream"
result =
  Async do
    running =
      FiberStream::Source.each([1, 2, 3])
        .map { |number| number * 2 }
        .to(FiberStream::Sink.to_a)
        .run_async
    # Foreground scheduler-managed work can continue here.
    running.wait
  end.wait
result # => [2, 4, 6]
The handle supports wait, cancel, done?, and cancel_requested?.
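To make the shape of such a handle concrete, here is a minimal sketch with the same four method names. It is a hypothetical stand-in: a Thread replaces the scheduler-backed fiber, cancellation is a cooperative flag checked between elements, and what FiberStream's own wait returns or raises after a cancel is not specified here.

```ruby
# Hypothetical stand-in: a Thread replaces the scheduler-backed fiber.
class RunningSketch
  def initialize(elements, &step)
    @cancel_requested = false
    @thread = Thread.new do
      output = []
      elements.each do |element|
        break if @cancel_requested # cooperative check between elements
        output << step.call(element)
      end
      output
    end
  end

  # Blocks until the background work finishes, then returns its result.
  def wait
    @thread.value
  end

  def cancel
    @cancel_requested = true
  end

  def done?
    !@thread.alive?
  end

  def cancel_requested?
    @cancel_requested
  end
end

running = RunningSketch.new([1, 2, 3]) { |number| number * 2 }
result = running.wait
```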
Backpressure
The initial runtime is pull-based. A sink asks for one element, each flow pulls only what it needs from upstream, and the source advances only when downstream demands a value.
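The demand chain described above can be traced with a few lambdas. This is a sketch of the pull model in general, not FiberStream's runtime: `source`, `double`, the `done` sentinel, and the `trace` log are all hypothetical names introduced for the illustration.

```ruby
trace = []               # hypothetical log of events, in order
done = Object.new.freeze # sentinel meaning "upstream is exhausted"

items = [1, 2, 3].each
source = lambda do
  value = begin
    items.next
  rescue StopIteration
    done
  end
  trace << [:source, value] unless value.equal?(done)
  value
end

# A flow stage pulls from upstream only when it is asked for a value.
double = lambda do
  value = source.call
  value.equal?(done) ? done : value * 2
end

# The sink drives everything by asking for one element at a time.
collected = []
loop do
  value = double.call
  break if value.equal?(done)
  trace << [:sink, value]
  collected << value
end
```

The trace alternates between source and sink events, which shows that the source never advances before the sink has asked for the next value.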
Sink.first demonstrates sink-side early completion:
first =
  FiberStream::Source.each([1, 2, 3])
    .run_with(FiberStream::Sink.first)
first # => 1
Flow.take demonstrates flow-side early completion and closes upstream after
the requested number of elements:
limited =
  FiberStream::Source.each([1, 2, 3])
    .take(2)
    .run_with(FiberStream::Sink.to_a)
limited # => [1, 2]
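Closing upstream after the limit is reached can be sketched as a stage that stops pulling and then releases its upstream. `CountingUpstream` and `take_sketch` are hypothetical names; the close hook exists only so the example can observe the shutdown, and FiberStream's actual stage protocol may differ.

```ruby
# Hypothetical upstream with counters so the sketch can observe its behavior.
class CountingUpstream
  attr_reader :pulled, :closed

  def initialize(elements)
    @elements = elements.each
    @pulled = 0
    @closed = false
  end

  def pull
    @pulled += 1
    @elements.next # raises StopIteration when exhausted
  end

  def close
    @closed = true
  end
end

# Pulls at most `count` elements, then always closes upstream.
def take_sketch(upstream, count)
  taken = []
  begin
    taken << upstream.pull while taken.size < count
  rescue StopIteration
    # Upstream ran out before the limit was reached.
  ensure
    upstream.close
  end
  taken
end

upstream = CountingUpstream.new([1, 2, 3])
limited = take_sketch(upstream, 2)
```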
Flow.buffer(count) allows bounded prefetch. Flow.async, Flow.buffer,
Flow.parallel_map, Source.io, Sink.io, and Pipeline#run_async require an
installed Fiber.scheduler and a non-blocking current fiber when demanded or
started. FiberStream does not install a scheduler and does not depend on Async
at runtime.
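The two preconditions named above can be checked with core Ruby alone. The `scheduler_ready?` helper below is hypothetical; FiberStream's real check and the error it raises may differ. Fiber.scheduler and Fiber.blocking? are core methods available since Ruby 3.0.

```ruby
# Hypothetical guard, not FiberStream's API.
def scheduler_ready?
  # Fiber.scheduler is nil when the current thread has no scheduler installed.
  # Fiber.blocking? is truthy when the current fiber is a blocking fiber.
  !Fiber.scheduler.nil? && !Fiber.blocking?
end

# In a plain script with no scheduler installed, the guard reports false.
ready_at_top_level = scheduler_ready?
```

Inside a block run by a scheduler-providing library such as Async, the same check would be expected to return true.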
API Surface
Sources:
- FiberStream::Source.each(enumerable)
- FiberStream::Source.io(io, chunk_size: 16 * 1024, close: false)
- FiberStream::Source.ractor_port(port, ack_port:, ack_transfer: :copy, cancel: true)
Source convenience methods:
- Source#via(flow)
- Source#map { |element| ... }
- Source#parallel_map(concurrency:) { |element| ... }
- Source#ractor_map(workers:, input_transfer: :copy, output_transfer: :copy) { |element| ... }
- Source#select { |element| ... }
- Source#take(count)
- Source#async
- Source#buffer(count)
- Source#lines(chomp: true, max_length: nil)
- Source#to(sink)
- Source#run_with(sink)
Flows:
- FiberStream::Flow.map { |element| ... }
- FiberStream::Flow.parallel_map(concurrency:) { |element| ... }
- FiberStream::Flow.ractor_map(workers:, input_transfer: :copy, output_transfer: :copy) { |element| ... }
- FiberStream::Flow.select { |element| ... }
- FiberStream::Flow.take(count)
- FiberStream::Flow.async
- FiberStream::Flow.buffer(count)
- FiberStream::Flow.lines(chomp: true, max_length: nil)
- Flow#via(flow)
- Flow#to(sink)
Sinks:
- FiberStream::Sink.to_a
- FiberStream::Sink.first
- FiberStream::Sink.fold(initial) { |accumulator, element| ... }
- FiberStream::Sink.io(io, close: false, flush: false)
Pipelines:
- FiberStream::Pipeline#run
- FiberStream::Pipeline#run_async
- FiberStream::RunningPipeline#wait
- FiberStream::RunningPipeline#cancel
- FiberStream::RunningPipeline#done?
- FiberStream::RunningPipeline#cancel_requested?
Examples
Runnable examples live under examples/.
bundle exec ruby examples/basic_pipeline.rb
bundle exec ruby examples/composable_pipeline.rb
bundle exec ruby examples/line_processing.rb
bundle exec ruby examples/file_copy.rb
bundle exec ruby examples/backpressure_buffer.rb
bundle exec ruby examples/background_execution.rb
bundle exec ruby examples/ractor_map_hashing.rb
bundle exec ruby examples/ractor_port_source.rb
bundle exec ruby examples/async_http_requests.rb
examples/backpressure_buffer.rb prints timestamped producer and consumer
events so the difference between direct demand and bounded prefetch is visible.
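Bounded prefetch in general can be pictured with a fixed-capacity queue between a producer and a consumer. The sketch below is an analogy, not that example script and not FiberStream's buffer stage: a Thread and Thread::SizedQueue stand in for scheduler-managed fibers, and `produced` is a hypothetical log.

```ruby
capacity = 2
buffer = Thread::SizedQueue.new(capacity)
produced = [] # hypothetical log of what the producer has emitted so far

producer = Thread.new do
  (1..5).each do |number|
    produced << number
    buffer << number # blocks while the buffer already holds `capacity` items
  end
  buffer << :done
end

# Wait until the producer is parked on a full buffer, then snapshot progress.
# It can be at most `capacity` buffered items plus one waiting to be pushed.
Thread.pass until producer.status == "sleep"
prefetched = produced.dup

consumed = []
while (value = buffer.pop) != :done
  consumed << value
end
producer.join
```

With direct demand the producer would advance one element per request; with the bounded queue it runs ahead, but only up to the configured capacity.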
examples/ractor_map_hashing.rb demonstrates ordered Ractor-backed hashing
with a shareable mapper proc and input_transfer: :move.
examples/ractor_port_source.rb demonstrates a producer Ractor that waits for
RactorPort::Ack before sending each RactorPort::Element.
examples/async_http_requests.rb starts a local HTTP server and shows
FiberStream overlapping independent HTTP request waits with parallel_map.
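Overlapping independent waits while keeping results in input order can be sketched with threads. `ordered_parallel_map_sketch` is a hypothetical helper that works in fixed batches, which is simpler than a sliding window and is not a description of how FiberStream's parallel_map is implemented.

```ruby
# Hypothetical helper: threads stand in for scheduler-managed fibers.
def ordered_parallel_map_sketch(elements, concurrency:, &block)
  elements.each_slice(concurrency).flat_map do |batch|
    # Start the whole batch, then collect values in input order.
    batch.map { |element| Thread.new { block.call(element) } }.map(&:value)
  end
end

results = ordered_parallel_map_sketch([3, 1, 2], concurrency: 2) do |number|
  sleep(number * 0.01) # simulated wait; later inputs may finish first
  number * 10
end
```

Even though the second element finishes before the first, the output keeps the input order because values are collected thread by thread.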
Benchmark scripts live under benchmarks/.
bundle exec ruby benchmarks/stream_transform.rb
bundle exec ruby benchmarks/latency_overlap.rb
bundle exec ruby benchmarks/heavy_cpu_map.rb
Development
This project targets Ruby 4.x. The repository currently pins Ruby 4.0.3 in
mise.toml.
Install dependencies:
bundle install
Run the test suite:
bundle exec rake test
Run RBS validation:
bundle exec rbs validate
Run RuboCop:
bundle exec rubocop
Run all default checks:
bundle exec rake
Build the gem:
bundle exec gem build fiber_stream.gemspec
Release uses RubyGems Trusted Publishing from the Release GitHub Actions
workflow. Configure a pending trusted publisher for the fiber_stream gem with
workflow filename release.yml and environment release, then publish by
pushing a version tag such as v0.1.0.
Documentation
Design and planning documents live under docs/:
- docs/product-specs/
- docs/design-docs/
- docs/exec-plans/
- docs/references/