Class: FiberStream::Source
- Inherits:
-
Object
- Object
- FiberStream::Source
- Defined in:
- lib/fiber_stream/source.rb
Class Method Summary collapse
-
.each(enumerable) ⇒ Object
Creates a source definition from an Enumerable.
-
.io(io, chunk_size: 16 * 1024, close: false) ⇒ Object
Creates a source definition from an IO-like object.
-
.ractor_port(port, ack_port:, ack_transfer: :copy, cancel: true) ⇒ Object
Creates a backpressure-aware source definition from Ractor ports.
Instance Method Summary collapse
-
#async ⇒ Object
Returns a new source definition with an asynchronous boundary.
-
#buffer(count) ⇒ Object
Returns a new source definition with a bounded asynchronous buffer.
-
#initialize(source_factory, flows = []) ⇒ Source
constructor
A new instance of Source.
-
#lines(chomp: true, max_length: nil) ⇒ Object
Returns a new source definition that splits String chunks into lines.
-
#map(&block) ⇒ Object
Returns a new source definition that maps each element with ‘block`.
-
#parallel_map(concurrency:, &block) ⇒ Object
Returns a new source definition that maps elements concurrently.
-
#ractor_map(workers:, input_transfer: :copy, output_transfer: :copy, &block) ⇒ Object
Returns a new source definition that maps elements in Ractor workers.
-
#run_with(sink) ⇒ Object
Materializes and runs this source with ‘sink`.
-
#select(&block) ⇒ Object
Returns a new source definition that keeps elements matching ‘block`.
-
#take(count) ⇒ Object
Returns a new source definition that emits at most ‘count` elements.
-
#to(sink) ⇒ Object
Returns a runnable pipeline from this source to ‘sink`.
-
#via(flow) ⇒ Object
Returns a new source definition that passes this source through ‘flow`.
Constructor Details
#initialize(source_factory, flows = []) ⇒ Source
Returns a new instance of Source.
50 51 52 53 |
# File 'lib/fiber_stream/source.rb', line 50 def initialize(source_factory, flows = []) @source_factory = source_factory @flows = flows end |
Class Method Details
.each(enumerable) ⇒ Object
Creates a source definition from an Enumerable.
The enumerable is not consumed until values are pulled by ‘run_with`. Each materialization creates an Enumerator with `enumerable.to_enum(:each)`; FiberStream does not snapshot values or guarantee replayability for one-shot enumerables.
11 12 13 |
# File 'lib/fiber_stream/source.rb', line 11 def self.each(enumerable) new(-> { Pull.each(enumerable) }) end |
.io(io, chunk_size: 16 * 1024, close: false) ⇒ Object
Creates a source definition from an IO-like object.
The IO object is not read until values are pulled by ‘run_with`. Each materialization reads from the same IO object’s current position; this source does not snapshot, reopen, or guarantee replayability. The IO is closed only when ‘close: true` is passed.
21 22 23 24 25 26 27 28 29 |
# File 'lib/fiber_stream/source.rb', line 21 def self.io(io, chunk_size: 16 * 1024, close: false) raise TypeError, "io must respond to readpartial" unless io.respond_to?(:readpartial) raise TypeError, "chunk_size must be an Integer" unless chunk_size.is_a?(Integer) raise ArgumentError, "chunk_size must be positive" unless chunk_size.positive? raise TypeError, "close must be true or false" unless [true, false].include?(close) raise TypeError, "io must respond to close" if close && !io.respond_to?(:close) new(-> { Pull.io(io, chunk_size, close) }) end |
.ractor_port(port, ack_port:, ack_transfer: :copy, cancel: true) ⇒ Object
Creates a backpressure-aware source definition from Ractor ports.
‘port` is the data/control port received by FiberStream. `ack_port` is a producer-owned port that receives `RactorPort::Ack` and `RactorPort::Cancel` control messages. The producer must wait for an ack before sending each `RactorPort::Element`, `RactorPort::Complete`, or `RactorPort::Failure` message.
38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/fiber_stream/source.rb', line 38 def self.ractor_port(port, ack_port:, ack_transfer: :copy, cancel: true) raise TypeError, "port must respond to receive" unless port.respond_to?(:receive) unless ack_port.respond_to?(:send) && ack_port.method(:send).owner != Kernel raise TypeError, "ack_port must provide Ractor-style send" end Flow.__send__(:validate_ractor_transfer_policy!, :ack_transfer, ack_transfer) raise TypeError, "cancel must be true or false" unless [true, false].include?(cancel) new(-> { Pull.ractor_port(port, ack_port, ack_transfer, cancel) }) end |
Instance Method Details
#async ⇒ Object
Returns a new source definition with an asynchronous boundary.
This is a convenience wrapper around ‘via(FiberStream::Flow.async)` and preserves the same scheduler requirement and cancellation behavior.
122 123 124 |
# File 'lib/fiber_stream/source.rb', line 122 def async via(Flow.async) end |
#buffer(count) ⇒ Object
Returns a new source definition with a bounded asynchronous buffer.
This is a convenience wrapper around ‘via(FiberStream::Flow.buffer(count))` and preserves the same validation, scheduler requirement, and cancellation behavior.
131 132 133 |
# File 'lib/fiber_stream/source.rb', line 131 def buffer(count) via(Flow.buffer(count)) end |
#lines(chomp: true, max_length: nil) ⇒ Object
Returns a new source definition that splits String chunks into lines.
This is a convenience wrapper around ‘via(FiberStream::Flow.lines(chomp:, max_length:))`.
139 140 141 |
# File 'lib/fiber_stream/source.rb', line 139 def lines(chomp: true, max_length: nil) via(Flow.lines(chomp: chomp, max_length: max_length)) end |
#map(&block) ⇒ Object
Returns a new source definition that maps each element with ‘block`.
This is a convenience wrapper around ‘via(FiberStream::Flow.map { … })` and has the same lazy construction, error, and backpressure behavior as the underlying flow.
70 71 72 |
# File 'lib/fiber_stream/source.rb', line 70 def map(&block) via(Flow.map(&block)) end |
#parallel_map(concurrency:, &block) ⇒ Object
Returns a new source definition that maps elements concurrently.
This is a convenience wrapper around ‘via(FiberStream::Flow.parallel_map(concurrency:) { … })` and preserves the same ordered delivery, scheduler requirement, validation, bounded upstream run-ahead, and cancellation behavior.
80 81 82 |
# File 'lib/fiber_stream/source.rb', line 80 def parallel_map(concurrency:, &block) via(Flow.parallel_map(concurrency: concurrency, &block)) end |
#ractor_map(workers:, input_transfer: :copy, output_transfer: :copy, &block) ⇒ Object
Returns a new source definition that maps elements in Ractor workers.
This is a convenience wrapper around ‘via(FiberStream::Flow.ractor_map(workers:) { … })` and preserves the same shareable mapper requirement, ordered delivery, transfer policy, bounded upstream run-ahead, and cooperative worker shutdown behavior.
90 91 92 93 94 95 96 97 98 99 |
# File 'lib/fiber_stream/source.rb', line 90 def ractor_map(workers:, input_transfer: :copy, output_transfer: :copy, &block) via( Flow.ractor_map( workers: workers, input_transfer: input_transfer, output_transfer: output_transfer, &block ) ) end |
#run_with(sink) ⇒ Object
Materializes and runs this source with ‘sink`.
The stream runs in the current fiber until completion or failure. The method returns the sink’s materialized value and closes the materialized pull chain on success, failure, or early sink completion.
158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 |
# File 'lib/fiber_stream/source.rb', line 158 def run_with(sink) raise TypeError, "expected FiberStream::Sink" unless sink.is_a?(Sink) primary_error = nil begin stream = @source_factory.call @flows.each do |flow| stream = flow.__send__(:attach, stream) end sink.__send__(:run, stream) rescue StandardError => error primary_error = error raise ensure begin stream&.close rescue StandardError => close_error raise close_error unless primary_error end end end |
#select(&block) ⇒ Object
Returns a new source definition that keeps elements matching ‘block`.
This is a convenience wrapper around ‘via(FiberStream::Flow.select { … })` and has the same truthiness and lazy construction behavior as the underlying flow.
106 107 108 |
# File 'lib/fiber_stream/source.rb', line 106 def select(&block) via(Flow.select(&block)) end |
#take(count) ⇒ Object
Returns a new source definition that emits at most ‘count` elements.
This is a convenience wrapper around ‘via(FiberStream::Flow.take(count))` and preserves the same validation and upstream close behavior.
114 115 116 |
# File 'lib/fiber_stream/source.rb', line 114 def take(count) via(Flow.take(count)) end |
#to(sink) ⇒ Object
Returns a runnable pipeline from this source to ‘sink`.
Construction is lazy. The source and sink are not materialized until ‘Pipeline#run` is called.
147 148 149 150 151 |
# File 'lib/fiber_stream/source.rb', line 147 def to(sink) raise TypeError, "expected FiberStream::Sink" unless sink.is_a?(Sink) Pipeline.__send__(:new, self, sink) end |
#via(flow) ⇒ Object
Returns a new source definition that passes this source through ‘flow`.
This method is lazy. It does not run the source, enumerate values, or call flow blocks.
59 60 61 62 63 |
# File 'lib/fiber_stream/source.rb', line 59 def via(flow) raise TypeError, "expected FiberStream::Flow" unless flow.is_a?(Flow) self.class.__send__(:new, @source_factory, @flows + [flow]) end |