Class: FiberStream::Source
- Inherits: Object
  - Object
  - FiberStream::Source
- Defined in: lib/fiber_stream/source.rb
Class Method Summary
- .each(enumerable) ⇒ Object
  Creates a source definition from an Enumerable.
- .io(io, chunk_size: 16 * 1024, close: false) ⇒ Object
  Creates a source definition from an IO-like object.
- .ractor_merge_ports(ports, ack_transfer: :copy, cancel: true) ⇒ Object
  Creates a backpressure-aware source definition from multiple Ractor port pairs.
- .ractor_port(port, ack_port:, ack_transfer: :copy, cancel: true) ⇒ Object
  Creates a backpressure-aware source definition from Ractor ports.
Instance Method Summary
- #async ⇒ Object
  Returns a new source definition with an asynchronous boundary.
- #buffer(count) ⇒ Object
  Returns a new source definition with a bounded asynchronous buffer.
- #concat(source) ⇒ Object
  Returns a new source definition that emits this source, then `source`.
- #drop(count) ⇒ Object
  Returns a new source definition that drops the first `count` elements.
- #drop_while(&block) ⇒ Object
  Returns a new source definition that drops leading elements while `block` is truthy.
- #grouped(count) ⇒ Object
  Returns a new source definition that groups adjacent elements into arrays.
- #initialize(source_factory, flows = []) ⇒ Source (constructor)
  A new instance of Source.
- #lines(chomp: true, max_length: nil) ⇒ Object
  Returns a new source definition that splits String chunks into lines.
- #map(&block) ⇒ Object
  Returns a new source definition that maps each element with `block`.
- #merge(source) ⇒ Object
  Returns a new source definition that emits values from this source and `source` in scheduler-observed ready order.
- #parallel_map(concurrency:, &block) ⇒ Object
  Returns a new source definition that maps elements concurrently.
- #ractor_map(workers:, input_transfer: :copy, output_transfer: :copy, &block) ⇒ Object
  Returns a new source definition that maps elements in Ractor workers.
- #run_with(sink) ⇒ Object
  Materializes and runs this source with `sink`.
- #select(&block) ⇒ Object
  Returns a new source definition that keeps elements matching `block`.
- #split(separator, keep_separator: false, max_length: nil) ⇒ Object
  Returns a new source definition that splits String chunks into frames.
- #take(count) ⇒ Object
  Returns a new source definition that emits at most `count` elements.
- #take_while(&block) ⇒ Object
  Returns a new source definition that emits leading elements while `block` is truthy.
- #to(sink) ⇒ Object
  Returns a runnable pipeline from this source to `sink`.
- #via(flow) ⇒ Object
  Returns a new source definition that passes this source through `flow`.
- #zip(source) ⇒ Object
  Returns a new source definition that emits pairs from this source and `source`.
Constructor Details
#initialize(source_factory, flows = []) ⇒ Source
Returns a new instance of Source.
    # File 'lib/fiber_stream/source.rb', line 74
    def initialize(source_factory, flows = [])
      @source_factory = source_factory
      @flows = flows
    end
Class Method Details
.each(enumerable) ⇒ Object
Creates a source definition from an Enumerable.
The enumerable is not consumed until values are pulled by `run_with`. Each materialization creates an Enumerator with `enumerable.to_enum(:each)`; FiberStream does not snapshot values or guarantee replayability for one-shot enumerables.

    # File 'lib/fiber_stream/source.rb', line 14
    def self.each(enumerable)
      new(-> { Pull.each(enumerable) })
    end
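The replay caveat can be illustrated without FiberStream itself. The following is a plain-Ruby sketch: `OneShot` is a hypothetical class invented for this illustration, and the only detail taken from the description above is that each materialization calls `to_enum(:each)` on the same object.

```ruby
# Hypothetical one-shot enumerable: every call to #each drains the same
# internal array, so a second enumeration finds nothing left.
class OneShot
  include Enumerable

  def initialize(items)
    @items = items.dup
  end

  def each
    return to_enum(:each) unless block_given?

    yield @items.shift until @items.empty?
  end
end

replayable = [1, 2, 3]
one_shot = OneShot.new([1, 2, 3])

# Two "materializations" of each enumerable, mirroring to_enum(:each).
first_array_run = replayable.to_enum(:each).to_a
second_array_run = replayable.to_enum(:each).to_a
first_one_shot_run = one_shot.to_enum(:each).to_a
second_one_shot_run = one_shot.to_enum(:each).to_a
```

Here the Array yields the same values on both runs, while the one-shot object yields its values once and is empty afterwards, which is the behavior a caller should expect when reusing a source built on such an enumerable.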
.io(io, chunk_size: 16 * 1024, close: false) ⇒ Object
Creates a source definition from an IO-like object.
The IO object is not read until values are pulled by `run_with`. Each materialization reads from the same IO object’s current position; this source does not snapshot, reopen, or guarantee replayability. The IO is closed only when `close: true` is passed. `chunk_size` is the maximum byte count passed to `readpartial` for one downstream pull; very large values may cause the IO implementation to attempt large allocations.

    # File 'lib/fiber_stream/source.rb', line 26
    def self.io(io, chunk_size: 16 * 1024, close: false)
      raise TypeError, "io must respond to readpartial" unless io.respond_to?(:readpartial)
      raise TypeError, "chunk_size must be an Integer" unless chunk_size.is_a?(Integer)
      raise ArgumentError, "chunk_size must be positive" unless chunk_size.positive?
      raise TypeError, "close must be true or false" unless [true, false].include?(close)
      raise TypeError, "io must respond to close" if close && !io.respond_to?(:close)

      new(-> { Pull.io(io, chunk_size, close) })
    end
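As a rough illustration of the chunking and shared-position behavior described above, the sketch below reads a `StringIO` with `readpartial` directly. It does not use FiberStream; `read_chunks` is a hypothetical helper written for this example, and the way `Pull.io` actually loops is not shown on this page.

```ruby
require "stringio"

# Hypothetical helper: pull chunks of at most chunk_size bytes until the
# IO reports end of stream (readpartial raises EOFError at EOF).
def read_chunks(io, chunk_size)
  chunks = []
  begin
    loop { chunks << io.readpartial(chunk_size) }
  rescue EOFError
    # End of stream reached; fall through and return what was read.
  end
  chunks
end

io = StringIO.new("abcdefghij")

# The first pass consumes the IO in 4-byte chunks.
first_pass = read_chunks(io, 4)

# A second pass starts from the current position, which is already at EOF.
second_pass = read_chunks(io, 4)
```

The second pass returns nothing because the IO was not rewound, which matches the note that the source neither snapshots nor reopens the object.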
.ractor_merge_ports(ports, ack_transfer: :copy, cancel: true) ⇒ Object
Creates a backpressure-aware source definition from multiple Ractor port pairs.
Each pair must be a Hash with `:port` and `:ack_port`. The source sends at most one outstanding `RactorPort::Ack` to each active producer and emits producer values in coordinator-observed ready order. Producer work is isolated in Ractors, so demanding this source does not require a `Fiber.scheduler`. Failure metadata is producer-provided and should be sanitized before crossing trust boundaries.

    # File 'lib/fiber_stream/source.rb', line 65
    def self.ractor_merge_ports(ports, ack_transfer: :copy, cancel: true)
      pairs = normalize_ractor_merge_port_pairs(ports)
      Flow.__send__(:validate_ractor_transfer_policy!, :ack_transfer, ack_transfer)
      raise TypeError, "cancel must be true or false" unless [true, false].include?(cancel)

      new(-> { Pull.ractor_merge_ports(pairs, ack_transfer, cancel) })
    end
.ractor_port(port, ack_port:, ack_transfer: :copy, cancel: true) ⇒ Object
Creates a backpressure-aware source definition from Ractor ports.
`port` is the data/control port received by FiberStream. `ack_port` is a producer-owned port that receives `RactorPort::Ack` and `RactorPort::Cancel` control messages. The producer must wait for an ack before sending each `RactorPort::Element`, `RactorPort::Complete`, or `RactorPort::Failure` message. Failure metadata is producer-provided and should be sanitized before crossing trust boundaries.

    # File 'lib/fiber_stream/source.rb', line 44
    def self.ractor_port(port, ack_port:, ack_transfer: :copy, cancel: true)
      raise TypeError, "port must respond to receive" unless port.respond_to?(:receive)

      unless ack_port.respond_to?(:send) && ack_port.method(:send).owner != Kernel
        raise TypeError, "ack_port must provide Ractor-style send"
      end
      Flow.__send__(:validate_ractor_transfer_policy!, :ack_transfer, ack_transfer)
      raise TypeError, "cancel must be true or false" unless [true, false].include?(cancel)

      new(-> { Pull.ractor_port(port, ack_port, ack_transfer, cancel) })
    end
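The ack-before-send rule is easier to follow as a small model. The sketch below is not Ractor code and does not use FiberStream: two `Queue` objects and a thread stand in for the ports and the producer Ractor, and `:ack`, `:cancel`, `:element`, and `:complete` are placeholder symbols rather than the real `RactorPort` message objects, whose constructors are not shown on this page.

```ruby
# Stand-ins for the two ports (assumption: plain thread-safe queues).
data_port = Queue.new # plays the role of `port`
ack_port = Queue.new  # plays the role of `ack_port`

# Producer side: wait for one ack before sending each message,
# including the final completion message.
producer = Thread.new do
  values = [10, 20, 30]
  loop do
    signal = ack_port.pop
    break if signal == :cancel

    if values.empty?
      data_port << [:complete]
      break
    end
    data_port << [:element, values.shift]
  end
end

# Consumer side: issue one ack per unit of downstream demand, so at most
# one message is ever outstanding.
received = []
loop do
  ack_port << :ack
  kind, value = data_port.pop
  break if kind == :complete

  received << value
end
producer.join
```

Because the producer blocks on the ack queue, it can never run ahead of the consumer by more than one message, which is the backpressure property the description refers to.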
Instance Method Details
#async ⇒ Object
Returns a new source definition with an asynchronous boundary.
This is a convenience wrapper around `via(FiberStream::Flow.async)` and preserves the same scheduler requirement and cancellation behavior.

    # File 'lib/fiber_stream/source.rb', line 231
    def async
      via(Flow.async)
    end
#buffer(count) ⇒ Object
Returns a new source definition with a bounded asynchronous buffer.
This is a convenience wrapper around `via(FiberStream::Flow.buffer(count))` and preserves the same validation, scheduler requirement, and cancellation behavior.

    # File 'lib/fiber_stream/source.rb', line 240
    def buffer(count)
      via(Flow.buffer(count))
    end
#concat(source) ⇒ Object
Returns a new source definition that emits this source, then `source`.
Construction is lazy. The appended source is not materialized or pulled until downstream demand observes completion from this source. Flows attached before concat stay scoped to their source; flows attached after concat apply to the combined output.

    # File 'lib/fiber_stream/source.rb', line 95
    def concat(source)
      raise TypeError, "expected FiberStream::Source" unless source.is_a?(Source)

      self.class.__send__(
        :new,
        -> { Pull.concat(materializer, source.__send__(:materializer)) }
      )
    end
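The deferred materialization of the appended source can be modeled with plain Ruby enumerators. This is only a sketch of the idea: the two lambdas play the role of materializers, the `log` array is an illustrative device, and nothing here reflects how `Pull.concat` is actually implemented.

```ruby
log = []

# Each lambda records when it is "materialized" and returns an enumerator.
first = -> { log << :first_materialized; [1, 2].each }
second = -> { log << :second_materialized; [3].each }

# Emit everything from the first source, then materialize the second.
combined = Enumerator.new do |yielder|
  first.call.each { |value| yielder << value }
  second.call.each { |value| yielder << value }
end

first_value = combined.next
log_after_first_pull = log.dup

remaining = [combined.next, combined.next]
log_after_all_pulls = log.dup
```

After the first pull only the first lambda has run; the second is called only once demand moves past the end of the first source.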
#drop(count) ⇒ Object
Returns a new source definition that drops the first `count` elements.
This is a convenience wrapper around `via(FiberStream::Flow.drop(count))` and preserves the same validation and pull-driven backpressure behavior.

    # File 'lib/fiber_stream/source.rb', line 194
    def drop(count)
      via(Flow.drop(count))
    end
#drop_while(&block) ⇒ Object
Returns a new source definition that drops leading elements while `block` is truthy.
This is a convenience wrapper around `via(FiberStream::Flow.drop_while { … })` and preserves the same predicate truthiness, prefix-dropping, and pass-through behavior.

    # File 'lib/fiber_stream/source.rb', line 223
    def drop_while(&block)
      via(Flow.drop_while(&block))
    end
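"Prefix-dropping" and "pass-through" can be read by analogy with Ruby's own `Enumerable#drop_while`. The sketch below uses only the standard library and assumes the flow follows the same semantics, which the wording above suggests but this page does not demonstrate.

```ruby
values = [1, 2, 5, 1, 7]

# Elements are dropped only while the predicate stays truthy. Once it is
# falsy for the first time (at 5), every later element passes through,
# including the later 1 that would have matched the predicate.
after_drop = values.drop_while { |n| n < 3 }

# The lazy form works on unbounded input and is driven by demand.
lazy_prefix = (1..Float::INFINITY).lazy.drop_while { |n| n < 3 }.first(2)
```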
#grouped(count) ⇒ Object
Returns a new source definition that groups adjacent elements into arrays.
This is a convenience wrapper around `via(FiberStream::Flow.grouped(count))` and preserves the same validation, ordering, final partial group, and pull-driven backpressure behavior.

    # File 'lib/fiber_stream/source.rb', line 203
    def grouped(count)
      via(Flow.grouped(count))
    end
#lines(chomp: true, max_length: nil) ⇒ Object
Returns a new source definition that splits String chunks into lines.
This is a convenience wrapper around `via(FiberStream::Flow.lines(chomp:, max_length:))`. With `max_length: nil`, one unterminated line can buffer without bound. Set a positive `max_length` for untrusted, network-facing, or otherwise unbounded streams.

    # File 'lib/fiber_stream/source.rb', line 251
    def lines(chomp: true, max_length: nil)
      via(Flow.lines(chomp: chomp, max_length: max_length))
    end
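To see why an unterminated line is a memory risk, consider a minimal chunk-to-line splitter. This is an eager, plain-Ruby sketch, not FiberStream's implementation: `split_lines` is a hypothetical helper, and raising a `RuntimeError` on overflow is an assumption made for the example, since the error the library raises is not documented on this page.

```ruby
# Hypothetical helper: reassemble lines from arbitrary String chunks.
# Bytes after the last newline stay in `buffer` until more input arrives.
def split_lines(chunks, max_length: nil)
  lines = []
  buffer = +""
  chunks.each do |chunk|
    buffer << chunk
    # Emit every complete line currently in the buffer, without the newline.
    while (index = buffer.index("\n"))
      lines << buffer.slice!(0..index).chomp
    end
    # Without a limit, a stream that never sends "\n" grows the buffer forever.
    if max_length && buffer.bytesize > max_length
      raise "pending line exceeds #{max_length} bytes"
    end
  end
  lines << buffer unless buffer.empty?
  lines
end

lines = split_lines(["ab\ncd", "e\nf"])

overflow_detected =
  begin
    split_lines(["aaaa", "aaaa"], max_length: 6)
    false
  rescue RuntimeError
    true
  end
```

The chunk boundaries do not line up with the newlines, so `"cd"` and `"e"` are joined into one line, and the limited call fails as soon as the pending bytes exceed the configured length.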
#map(&block) ⇒ Object
Returns a new source definition that maps each element with `block`.
This is a convenience wrapper around `via(FiberStream::Flow.map { … })` and has the same lazy construction, error, and backpressure behavior as the underlying flow.

    # File 'lib/fiber_stream/source.rb', line 142
    def map(&block)
      via(Flow.map(&block))
    end
#merge(source) ⇒ Object
Returns a new source definition that emits values from this source and `source` in scheduler-observed ready order.
Construction is lazy. The merged source starts one scheduled producer fiber per input source only when downstream demand reaches the merge. Each input’s own element order is preserved, but cross-input ordering is not deterministic. When demanded, the merge requires an installed `Fiber.scheduler` and must be pulled from a non-blocking fiber.

    # File 'lib/fiber_stream/source.rb', line 128
    def merge(source)
      raise TypeError, "expected FiberStream::Source" unless source.is_a?(Source)

      self.class.__send__(
        :new,
        -> { Pull.merge(materializer, source.__send__(:materializer)) }
      )
    end
#parallel_map(concurrency:, &block) ⇒ Object
Returns a new source definition that maps elements concurrently.
This is a convenience wrapper around `via(FiberStream::Flow.parallel_map(concurrency:) { … })` and preserves the same ordered delivery, scheduler requirement, validation, bounded upstream run-ahead, and cancellation behavior.

    # File 'lib/fiber_stream/source.rb', line 152
    def parallel_map(concurrency:, &block)
      via(Flow.parallel_map(concurrency: concurrency, &block))
    end
#ractor_map(workers:, input_transfer: :copy, output_transfer: :copy, &block) ⇒ Object
Returns a new source definition that maps elements in Ractor workers.
This is a convenience wrapper around `via(FiberStream::Flow.ractor_map(workers:) { … })` and preserves the same shareable mapper requirement, ordered delivery, transfer policy, bounded upstream run-ahead, and cooperative worker shutdown behavior.

    # File 'lib/fiber_stream/source.rb', line 162
    def ractor_map(workers:, input_transfer: :copy, output_transfer: :copy, &block)
      via(
        Flow.ractor_map(
          workers: workers,
          input_transfer: input_transfer,
          output_transfer: output_transfer,
          &block
        )
      )
    end
#run_with(sink) ⇒ Object
Materializes and runs this source with `sink`.
The stream runs in the current fiber until completion or failure. The method returns the sink’s materialized value and closes the materialized pull chain on success, failure, or early sink completion.

    # File 'lib/fiber_stream/source.rb', line 281
    def run_with(sink)
      raise TypeError, "expected FiberStream::Sink" unless sink.is_a?(Sink)

      primary_error = nil
      begin
        stream = materialize
        sink.__send__(:run, stream)
      rescue StandardError => error
        primary_error = error
        raise
      ensure
        begin
          stream&.close
        rescue StandardError => close_error
          raise close_error unless primary_error
        end
      end
    end
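The error handling in `run_with` follows a pattern worth isolating: the stream is always closed, and an error raised by `close` is reported only when no earlier error is already propagating. The sketch below reproduces that pattern in plain Ruby; `FakeStream` and `run_and_close` are hypothetical names used only for this illustration.

```ruby
# Hypothetical stream whose #close can be made to fail.
class FakeStream
  attr_reader :closed

  def initialize(fail_on_close: false)
    @fail_on_close = fail_on_close
    @closed = false
  end

  def close
    @closed = true
    raise IOError, "close failed" if @fail_on_close
  end
end

# Same shape as the rescue/ensure logic in run_with: always close, and
# never let a close error mask the primary error.
def run_and_close(stream)
  primary_error = nil
  begin
    yield stream
  rescue StandardError => error
    primary_error = error
    raise
  ensure
    begin
      stream.close
    rescue StandardError => close_error
      raise close_error unless primary_error
    end
  end
end

# Reports which error class (if any) escapes the given block.
def raised_class
  yield
  nil
rescue StandardError => error
  error.class
end

healthy = FakeStream.new
success_value = run_and_close(healthy) { :done }

both_fail = raised_class do
  run_and_close(FakeStream.new(fail_on_close: true)) { raise ArgumentError, "primary" }
end

close_only_fails = raised_class do
  run_and_close(FakeStream.new(fail_on_close: true)) { :done }
end
```

When both the body and `close` fail, the caller sees the body's error; when only `close` fails, that error surfaces instead of being swallowed.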
#select(&block) ⇒ Object
Returns a new source definition that keeps elements matching `block`.
This is a convenience wrapper around `via(FiberStream::Flow.select { … })` and has the same truthiness and lazy construction behavior as the underlying flow.

    # File 'lib/fiber_stream/source.rb', line 178
    def select(&block)
      via(Flow.select(&block))
    end
#split(separator, keep_separator: false, max_length: nil) ⇒ Object
Returns a new source definition that splits String chunks into frames.
This is a convenience wrapper around `via(FiberStream::Flow.split(separator, keep_separator:, max_length:))`. With `max_length: nil`, one unterminated frame can buffer without bound. Set a positive `max_length` for untrusted, network-facing, or otherwise unbounded streams.

    # File 'lib/fiber_stream/source.rb', line 262
    def split(separator, keep_separator: false, max_length: nil)
      via(Flow.split(separator, keep_separator: keep_separator, max_length: max_length))
    end
#take(count) ⇒ Object
Returns a new source definition that emits at most `count` elements.
This is a convenience wrapper around `via(FiberStream::Flow.take(count))` and preserves the same validation and upstream close behavior.

    # File 'lib/fiber_stream/source.rb', line 186
    def take(count)
      via(Flow.take(count))
    end
#take_while(&block) ⇒ Object
Returns a new source definition that emits leading elements while `block` is truthy.
This is a convenience wrapper around `via(FiberStream::Flow.take_while { … })` and preserves the same predicate truthiness, early completion, and upstream close behavior.

    # File 'lib/fiber_stream/source.rb', line 213
    def take_while(&block)
      via(Flow.take_while(&block))
    end
#to(sink) ⇒ Object
Returns a runnable pipeline from this source to `sink`.
Construction is lazy. The source and sink are not materialized until `Pipeline#run` is called.

    # File 'lib/fiber_stream/source.rb', line 270
    def to(sink)
      raise TypeError, "expected FiberStream::Sink" unless sink.is_a?(Sink)

      Pipeline.__send__(:new, self, sink)
    end
#via(flow) ⇒ Object
Returns a new source definition that passes this source through `flow`.
This method is lazy. It does not run the source, enumerate values, or call flow blocks.

    # File 'lib/fiber_stream/source.rb', line 83
    def via(flow)
      raise TypeError, "expected FiberStream::Flow" unless flow.is_a?(Flow)

      self.class.__send__(:new, @source_factory, @flows + [flow])
    end
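Because `via` builds a new object from `@flows + [flow]`, the receiver is left untouched and nothing runs until materialization. The following plain-Ruby sketch models that design; `Definition` is a hypothetical stand-in for `Source`, flows are modeled as lambdas over lazy enumerators, and `materialize` here is a guess at the general shape rather than the library's private method.

```ruby
# Hypothetical stand-in for a source definition.
class Definition
  attr_reader :flows

  def initialize(factory, flows = [])
    @factory = factory
    @flows = flows
  end

  # Array#+ returns a new array, so the receiver's flow list is unchanged.
  def via(flow)
    self.class.new(@factory, @flows + [flow])
  end

  # Only here is the factory called and each flow applied in order.
  def materialize
    @flows.reduce(@factory.call) { |stream, flow| flow.call(stream) }
  end
end

factory_calls = 0
base = Definition.new(-> { factory_calls += 1; [1, 2, 3].lazy })
doubled = base.via(->(stream) { stream.map { |n| n * 2 } })

calls_before_run = factory_calls
result = doubled.materialize.to_a
```

Building `doubled` neither calls the factory nor alters `base`, so a definition can be shared and extended in several directions without one branch affecting another.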
#zip(source) ⇒ Object
Returns a new source definition that emits pairs from this source and `source`.
Construction is lazy. The receiver side is materialized only when downstream demand reaches the zip stage; the other side is materialized only after the receiver produces an element for a pair. The zipped source completes when either input completes.

    # File 'lib/fiber_stream/source.rb', line 111
    def zip(source)
      raise TypeError, "expected FiberStream::Source" unless source.is_a?(Source)

      self.class.__send__(
        :new,
        -> { Pull.zip(materializer, source.__send__(:materializer)) }
      )
    end
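The pull order and the shortest-input completion rule can be sketched with external enumerators. This is a plain-Ruby model, not the `Pull.zip` implementation: `zip_pull` is a hypothetical helper, and `StopIteration` plays the role of upstream completion.

```ruby
# Hypothetical helper: pull from the left side first, then the right, and
# stop as soon as either side is exhausted (Kernel#loop rescues the
# StopIteration raised by Enumerator#next).
def zip_pull(left, right)
  Enumerator.new do |yielder|
    loop do
      left_value = left.next
      right_value = right.next
      yielder << [left_value, right_value]
    end
  end
end

# The right side is shorter, so the third left element is never paired.
short_right = zip_pull([1, 2, 3].each, [:a, :b].each).to_a

# The left side is shorter; the right side is pulled only once.
right = [:a, :b, :c].each
short_left = zip_pull([1].each, right).to_a
next_unpulled_right = right.next
```

In the second case the right enumerator still has `:b` waiting, which shows that the other side is asked for a value only after the receiver has produced one.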