Class: FiberStream::Source
- Inherits:
-
Object
- Object
- FiberStream::Source
- Defined in:
- lib/fiber_stream/source.rb
Class Method Summary collapse
-
.each(enumerable) ⇒ Object
Creates a source definition from an Enumerable.
-
.io(io, chunk_size: 16 * 1024, close: false) ⇒ Object
Creates a source definition from an IO-like object.
-
.ractor_port(port, ack_port:, ack_transfer: :copy, cancel: true) ⇒ Object
Creates a backpressure-aware source definition from Ractor ports.
Instance Method Summary collapse
-
#async ⇒ Object
Returns a new source definition with an asynchronous boundary.
-
#buffer(count) ⇒ Object
Returns a new source definition with a bounded asynchronous buffer.
-
#concat(source) ⇒ Object
Returns a new source definition that emits this source, then ‘source`.
-
#drop(count) ⇒ Object
Returns a new source definition that drops the first ‘count` elements.
-
#drop_while(&block) ⇒ Object
Returns a new source definition that drops leading elements while ‘block` is truthy.
-
#initialize(source_factory, flows = []) ⇒ Source
constructor
A new instance of Source.
-
#lines(chomp: true, max_length: nil) ⇒ Object
Returns a new source definition that splits String chunks into lines.
-
#map(&block) ⇒ Object
Returns a new source definition that maps each element with ‘block`.
-
#parallel_map(concurrency:, &block) ⇒ Object
Returns a new source definition that maps elements concurrently.
-
#ractor_map(workers:, input_transfer: :copy, output_transfer: :copy, &block) ⇒ Object
Returns a new source definition that maps elements in Ractor workers.
-
#run_with(sink) ⇒ Object
Materializes and runs this source with ‘sink`.
-
#select(&block) ⇒ Object
Returns a new source definition that keeps elements matching ‘block`.
-
#take(count) ⇒ Object
Returns a new source definition that emits at most ‘count` elements.
-
#take_while(&block) ⇒ Object
Returns a new source definition that emits leading elements while ‘block` is truthy.
-
#to(sink) ⇒ Object
Returns a runnable pipeline from this source to ‘sink`.
-
#via(flow) ⇒ Object
Returns a new source definition that passes this source through ‘flow`.
-
#zip(source) ⇒ Object
Returns a new source definition that emits pairs from this source and ‘source`.
Constructor Details
#initialize(source_factory, flows = []) ⇒ Source
Returns a new instance of Source.
50 51 52 53 |
# File 'lib/fiber_stream/source.rb', line 50 def initialize(source_factory, flows = []) @source_factory = source_factory @flows = flows end |
Class Method Details
.each(enumerable) ⇒ Object
Creates a source definition from an Enumerable.
The enumerable is not consumed until values are pulled by ‘run_with`. Each materialization creates an Enumerator with `enumerable.to_enum(:each)`; FiberStream does not snapshot values or guarantee replayability for one-shot enumerables.
11 12 13 |
# File 'lib/fiber_stream/source.rb', line 11 def self.each(enumerable) new(-> { Pull.each(enumerable) }) end |
.io(io, chunk_size: 16 * 1024, close: false) ⇒ Object
Creates a source definition from an IO-like object.
The IO object is not read until values are pulled by ‘run_with`. Each materialization reads from the same IO object’s current position; this source does not snapshot, reopen, or guarantee replayability. The IO is closed only when ‘close: true` is passed.
21 22 23 24 25 26 27 28 29 |
# File 'lib/fiber_stream/source.rb', line 21 def self.io(io, chunk_size: 16 * 1024, close: false) raise TypeError, "io must respond to readpartial" unless io.respond_to?(:readpartial) raise TypeError, "chunk_size must be an Integer" unless chunk_size.is_a?(Integer) raise ArgumentError, "chunk_size must be positive" unless chunk_size.positive? raise TypeError, "close must be true or false" unless [true, false].include?(close) raise TypeError, "io must respond to close" if close && !io.respond_to?(:close) new(-> { Pull.io(io, chunk_size, close) }) end |
.ractor_port(port, ack_port:, ack_transfer: :copy, cancel: true) ⇒ Object
Creates a backpressure-aware source definition from Ractor ports.
‘port` is the data/control port received by FiberStream. `ack_port` is a producer-owned port that receives `RactorPort::Ack` and `RactorPort::Cancel` control messages. The producer must wait for an ack before sending each `RactorPort::Element`, `RactorPort::Complete`, or `RactorPort::Failure` message.
38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/fiber_stream/source.rb', line 38 def self.ractor_port(port, ack_port:, ack_transfer: :copy, cancel: true) raise TypeError, "port must respond to receive" unless port.respond_to?(:receive) unless ack_port.respond_to?(:send) && ack_port.method(:send).owner != Kernel raise TypeError, "ack_port must provide Ractor-style send" end Flow.__send__(:validate_ractor_transfer_policy!, :ack_transfer, ack_transfer) raise TypeError, "cancel must be true or false" unless [true, false].include?(cancel) new(-> { Pull.ractor_port(port, ack_port, ack_transfer, cancel) }) end |
Instance Method Details
#async ⇒ Object
Returns a new source definition with an asynchronous boundary.
This is a convenience wrapper around ‘via(FiberStream::Flow.async)` and preserves the same scheduler requirement and cancellation behavior.
181 182 183 |
# File 'lib/fiber_stream/source.rb', line 181 def async via(Flow.async) end |
#buffer(count) ⇒ Object
Returns a new source definition with a bounded asynchronous buffer.
This is a convenience wrapper around ‘via(FiberStream::Flow.buffer(count))` and preserves the same validation, scheduler requirement, and cancellation behavior.
190 191 192 |
# File 'lib/fiber_stream/source.rb', line 190 def buffer(count) via(Flow.buffer(count)) end |
#concat(source) ⇒ Object
Returns a new source definition that emits this source, then ‘source`.
Construction is lazy. The appended source is not materialized or pulled until downstream demand observes completion from this source. Flows attached before concat stay scoped to their source; flows attached after concat apply to the combined output.
71 72 73 74 75 76 77 78 |
# File 'lib/fiber_stream/source.rb', line 71 def concat(source) raise TypeError, "expected FiberStream::Source" unless source.is_a?(Source) self.class.__send__( :new, -> { Pull.concat(materializer, source.__send__(:materializer)) } ) end |
#drop(count) ⇒ Object
Returns a new source definition that drops the first ‘count` elements.
This is a convenience wrapper around ‘via(FiberStream::Flow.drop(count))` and preserves the same validation and pull-driven backpressure behavior.
153 154 155 |
# File 'lib/fiber_stream/source.rb', line 153 def drop(count) via(Flow.drop(count)) end |
#drop_while(&block) ⇒ Object
Returns a new source definition that drops leading elements while ‘block` is truthy.
This is a convenience wrapper around ‘via(FiberStream::Flow.drop_while { … })` and preserves the same predicate truthiness, prefix-dropping, and pass-through behavior.
173 174 175 |
# File 'lib/fiber_stream/source.rb', line 173 def drop_while(&block) via(Flow.drop_while(&block)) end |
#lines(chomp: true, max_length: nil) ⇒ Object
Returns a new source definition that splits String chunks into lines.
This is a convenience wrapper around ‘via(FiberStream::Flow.lines(chomp:, max_length:))`.
198 199 200 |
# File 'lib/fiber_stream/source.rb', line 198 def lines(chomp: true, max_length: nil) via(Flow.lines(chomp: chomp, max_length: max_length)) end |
#map(&block) ⇒ Object
Returns a new source definition that maps each element with ‘block`.
This is a convenience wrapper around ‘via(FiberStream::Flow.map { … })` and has the same lazy construction, error, and backpressure behavior as the underlying flow.
101 102 103 |
# File 'lib/fiber_stream/source.rb', line 101 def map(&block) via(Flow.map(&block)) end |
#parallel_map(concurrency:, &block) ⇒ Object
Returns a new source definition that maps elements concurrently.
This is a convenience wrapper around ‘via(FiberStream::Flow.parallel_map(concurrency:) { … })` and preserves the same ordered delivery, scheduler requirement, validation, bounded upstream run-ahead, and cancellation behavior.
111 112 113 |
# File 'lib/fiber_stream/source.rb', line 111 def parallel_map(concurrency:, &block) via(Flow.parallel_map(concurrency: concurrency, &block)) end |
#ractor_map(workers:, input_transfer: :copy, output_transfer: :copy, &block) ⇒ Object
Returns a new source definition that maps elements in Ractor workers.
This is a convenience wrapper around ‘via(FiberStream::Flow.ractor_map(workers:) { … })` and preserves the same shareable mapper requirement, ordered delivery, transfer policy, bounded upstream run-ahead, and cooperative worker shutdown behavior.
121 122 123 124 125 126 127 128 129 130 |
# File 'lib/fiber_stream/source.rb', line 121 def ractor_map(workers:, input_transfer: :copy, output_transfer: :copy, &block) via( Flow.ractor_map( workers: workers, input_transfer: input_transfer, output_transfer: output_transfer, &block ) ) end |
#run_with(sink) ⇒ Object
Materializes and runs this source with ‘sink`.
The stream runs in the current fiber until completion or failure. The method returns the sink’s materialized value and closes the materialized pull chain on success, failure, or early sink completion.
217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 |
# File 'lib/fiber_stream/source.rb', line 217 def run_with(sink) raise TypeError, "expected FiberStream::Sink" unless sink.is_a?(Sink) primary_error = nil begin stream = materialize sink.__send__(:run, stream) rescue StandardError => error primary_error = error raise ensure begin stream&.close rescue StandardError => close_error raise close_error unless primary_error end end end |
#select(&block) ⇒ Object
Returns a new source definition that keeps elements matching ‘block`.
This is a convenience wrapper around ‘via(FiberStream::Flow.select { … })` and has the same truthiness and lazy construction behavior as the underlying flow.
137 138 139 |
# File 'lib/fiber_stream/source.rb', line 137 def select(&block) via(Flow.select(&block)) end |
#take(count) ⇒ Object
Returns a new source definition that emits at most ‘count` elements.
This is a convenience wrapper around ‘via(FiberStream::Flow.take(count))` and preserves the same validation and upstream close behavior.
145 146 147 |
# File 'lib/fiber_stream/source.rb', line 145 def take(count) via(Flow.take(count)) end |
#take_while(&block) ⇒ Object
Returns a new source definition that emits leading elements while ‘block` is truthy.
This is a convenience wrapper around ‘via(FiberStream::Flow.take_while { … })` and preserves the same predicate truthiness, early completion, and upstream close behavior.
163 164 165 |
# File 'lib/fiber_stream/source.rb', line 163 def take_while(&block) via(Flow.take_while(&block)) end |
#to(sink) ⇒ Object
Returns a runnable pipeline from this source to ‘sink`.
Construction is lazy. The source and sink are not materialized until ‘Pipeline#run` is called.
206 207 208 209 210 |
# File 'lib/fiber_stream/source.rb', line 206 def to(sink) raise TypeError, "expected FiberStream::Sink" unless sink.is_a?(Sink) Pipeline.__send__(:new, self, sink) end |
#via(flow) ⇒ Object
Returns a new source definition that passes this source through ‘flow`.
This method is lazy. It does not run the source, enumerate values, or call flow blocks.
59 60 61 62 63 |
# File 'lib/fiber_stream/source.rb', line 59 def via(flow) raise TypeError, "expected FiberStream::Flow" unless flow.is_a?(Flow) self.class.__send__(:new, @source_factory, @flows + [flow]) end |
#zip(source) ⇒ Object
Returns a new source definition that emits pairs from this source and ‘source`.
Construction is lazy. The receiver side is materialized only when downstream demand reaches the zip stage; the other side is materialized only after the receiver produces an element for a pair. The zipped source completes when either input completes.
87 88 89 90 91 92 93 94 |
# File 'lib/fiber_stream/source.rb', line 87 def zip(source) raise TypeError, "expected FiberStream::Source" unless source.is_a?(Source) self.class.__send__( :new, -> { Pull.zip(materializer, source.__send__(:materializer)) } ) end |