Class: FiberStream::Source
- Inherits: Object
- Defined in: lib/fiber_stream/source.rb
Class Method Summary
- .build(source_factory, flows = []) ⇒ Object
  :nodoc:
- .each(enumerable) ⇒ Object
  Creates a source definition from an Enumerable.
- .io(io, chunk_size: 16 * 1024, close: false) ⇒ Object
  Creates a source definition from an IO-like object.
- .ractor_merge_ports(ports, ack_transfer: :copy, cancel: true) ⇒ Object
  Creates a backpressure-aware source definition from multiple Ractor port pairs.
- .ractor_merge_producers(transfer: :copy, ack_transfer: :copy, &block) ⇒ Object
  Creates a source backed by multiple FiberStream-owned producer ractors.
- .ractor_port(port, ack_port:, ack_transfer: :copy, cancel: true) ⇒ Object
  Creates a backpressure-aware source definition from Ractor ports.
- .ractor_producer(*args, transfer: :copy, ack_transfer: :copy, &block) ⇒ Object
  Creates a source backed by one FiberStream-owned producer ractor.
Instance Method Summary
- #async ⇒ Object
  Returns a new source definition with an asynchronous boundary.
- #buffer(count) ⇒ Object
  Returns a new source definition with a bounded asynchronous buffer.
- #concat(source) ⇒ Object
  Returns a new source definition that emits this source, then `source`.
- #drop(count) ⇒ Object
  Returns a new source definition that drops the first `count` elements.
- #drop_while(&block) ⇒ Object
  Returns a new source definition that drops leading elements while `block` is truthy.
- #grouped(count) ⇒ Object
  Returns a new source definition that groups adjacent elements into arrays.
- #initialize(source_factory, flows = []) ⇒ Source (constructor)
  A new instance of Source.
- #lines(chomp: true, max_length: nil) ⇒ Object
  Returns a new source definition that splits String chunks into lines.
- #map(&block) ⇒ Object
  Returns a new source definition that maps each element with `block`.
- #merge(source) ⇒ Object
  Returns a new source definition that emits values from this source and `source` in scheduler-observed ready order.
- #parallel_map(concurrency:, &block) ⇒ Object
  Returns a new source definition that maps elements concurrently.
- #parallel_unordered_map(concurrency:, &block) ⇒ Object
  Returns a new source definition that maps elements concurrently and emits mapped values in completion order.
- #ractor_map(workers:, input_transfer: :copy, output_transfer: :copy, &block) ⇒ Object
  Returns a new source definition that maps elements in Ractor workers.
- #run_with(sink) ⇒ Object
  Materializes and runs this source with `sink`.
- #scan(initial, &block) ⇒ Object
  Returns a new source definition that emits running accumulators.
- #select(&block) ⇒ Object
  Returns a new source definition that keeps elements for which `block` is truthy.
- #split(separator, keep_separator: false, max_length: nil) ⇒ Object
  Returns a new source definition that splits String chunks into frames.
- #take(count) ⇒ Object
  Returns a new source definition that emits at most `count` elements.
- #take_while(&block) ⇒ Object
  Returns a new source definition that emits leading elements while `block` is truthy.
- #to(sink) ⇒ Object
  Returns a runnable pipeline from this source to `sink`.
- #to_pull_materializer ⇒ Object
  :nodoc:
- #via(flow) ⇒ Object
  Returns a new source definition that passes this source through `flow`.
- #zip(source) ⇒ Object
  Returns a new source definition that emits pairs from this source and `source`.
Constructor Details
#initialize(source_factory, flows = []) ⇒ Source
Returns a new instance of Source.
    # File 'lib/fiber_stream/source.rb', line 118
    def initialize(source_factory, flows = [])
      @source_factory = source_factory
      @flows = flows
    end
Class Method Details
.build(source_factory, flows = []) ⇒ Object
:nodoc:
    # File 'lib/fiber_stream/source.rb', line 114
    def self.build(source_factory, flows = []) # :nodoc:
      new(source_factory, flows)
    end
.each(enumerable) ⇒ Object
Creates a source definition from an Enumerable.
The enumerable is not consumed until values are pulled by `run_with`. Each materialization creates a fresh Enumerator with `enumerable.to_enum(:each)`; FiberStream neither snapshots values nor guarantees replayability for one-shot enumerables.
    # File 'lib/fiber_stream/source.rb', line 14
    def self.each(enumerable)
      new(-> { Pull.each(enumerable) })
    end
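The replayability caveat can be reproduced in plain Ruby. This is a sketch, not FiberStream code: the hypothetical `materialize_once` helper stands in for one materialization (it calls `to_enum(:each)` as described above and drains the result), and `OneShot` is a hypothetical Enumerable that empties itself as it is iterated.

```ruby
# Hypothetical one-shot Enumerable: iterating drains the backing array,
# so a second pass sees nothing (much like reading a pipe twice).
class OneShot
  include Enumerable

  def initialize(items)
    @items = items.dup
  end

  def each
    yield @items.shift until @items.empty?
  end
end

# Stand-in for one materialization: a fresh Enumerator over `each`,
# pulled element by element until StopIteration ends `loop`.
def materialize_once(enumerable)
  enumerator = enumerable.to_enum(:each)
  values = []
  loop { values << enumerator.next }
  values
end

array = [1, 2, 3]
one_shot = OneShot.new([1, 2, 3])

p materialize_once(array)    # => [1, 2, 3]
p materialize_once(array)    # => [1, 2, 3] (an Array replays)
p materialize_once(one_shot) # => [1, 2, 3]
p materialize_once(one_shot) # => [] (already drained)
```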
.io(io, chunk_size: 16 * 1024, close: false) ⇒ Object
Creates a source definition from an IO-like object.
The IO object is not read until values are pulled by `run_with`. Each materialization reads from the same IO object’s current position; this source does not snapshot, reopen, or guarantee replayability. The IO is closed only when `close: true` is passed. `chunk_size` is the maximum byte count passed to `readpartial` for one downstream pull; very large values may cause the IO implementation to attempt large allocations.
    # File 'lib/fiber_stream/source.rb', line 26
    def self.io(io, chunk_size: 16 * 1024, close: false)
      raise TypeError, "io must respond to readpartial" unless io.respond_to?(:readpartial)
      raise TypeError, "chunk_size must be an Integer" unless chunk_size.is_a?(Integer)
      raise ArgumentError, "chunk_size must be positive" unless chunk_size.positive?
      raise TypeError, "close must be true or false" unless [true, false].include?(close)
      raise TypeError, "io must respond to close" if close && !io.respond_to?(:close)

      new(-> { Pull.io(io, chunk_size, close) })
    end
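A plain-Ruby approximation of the read loop described above, assuming one `readpartial(chunk_size)` call per downstream pull and `EOFError` treated as normal completion. `StringIO` stands in for the IO, and `io_chunks` is a hypothetical helper, not FiberStream's `Pull.io`. The second pull over the same object shows why a second materialization is not a replay.

```ruby
require "stringio"

# Hypothetical model of an IO-backed pull source: one readpartial call per
# pull, EOFError ends the stream, and the IO is closed only when close: true.
def io_chunks(io, chunk_size: 16 * 1024, close: false)
  Enumerator.new do |out|
    begin
      # `loop` only rescues StopIteration, so EOFError reaches the rescue below.
      loop { out << io.readpartial(chunk_size) }
    rescue EOFError
      # End of input: complete normally.
    ensure
      io.close if close
    end
  end
end

io = StringIO.new("abcdefghij")
p io_chunks(io, chunk_size: 4).to_a # => ["abcd", "efgh", "ij"]
p io_chunks(io, chunk_size: 4).to_a # => [] (same object, already at EOF)
p io.closed?                        # => false
```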
.ractor_merge_ports(ports, ack_transfer: :copy, cancel: true) ⇒ Object
Creates a backpressure-aware source definition from multiple Ractor port pairs.
Each pair must be a Hash with `:port` and `:ack_port` keys. The source keeps at most one outstanding `RactorPort::Ack` per active producer and emits producer values in coordinator-observed ready order. Producer work is isolated in Ractors, so demanding this source does not require a `Fiber.scheduler`. Failure metadata is producer-provided and should be sanitized before crossing trust boundaries.
    # File 'lib/fiber_stream/source.rb', line 65
    def self.ractor_merge_ports(ports, ack_transfer: :copy, cancel: true)
      pairs = normalize_ractor_merge_port_pairs(ports)

      Internal::RactorTransferPolicy.validate!(:ack_transfer, ack_transfer)
      raise TypeError, "cancel must be true or false" unless [true, false].include?(cancel)

      new(-> { Pull.ractor_merge_ports(pairs, ack_transfer, cancel) })
    end
.ractor_merge_producers(transfer: :copy, ack_transfer: :copy, &block) ⇒ Object
Creates a source backed by multiple FiberStream-owned producer ractors.
The registration block runs at construction to collect producer definitions, but producer ractors and ports are started lazily on first downstream demand. Outputs are merged with the same ready-order semantics as `Source.ractor_merge_ports`.
    # File 'lib/fiber_stream/source.rb', line 100
    def self.ractor_merge_producers(transfer: :copy, ack_transfer: :copy, &block)
      raise ArgumentError, "missing block" unless block
      Internal::RactorTransferPolicy.validate!(:transfer, transfer)
      Internal::RactorTransferPolicy.validate!(:ack_transfer, ack_transfer)

      group = RactorProducerGroup.new(transfer)
      block.call(group)
      definitions = group.definitions
      raise ArgumentError, "ractor_merge_producers requires at least two producers" if definitions.size < 2

      new(-> { Pull.ractor_merge_producers(definitions, ack_transfer) })
    end
.ractor_port(port, ack_port:, ack_transfer: :copy, cancel: true) ⇒ Object
Creates a backpressure-aware source definition from Ractor ports.
`port` is the data/control port received by FiberStream. `ack_port` is a producer-owned port that receives `RactorPort::Ack` and `RactorPort::Cancel` control messages. The producer must wait for an ack before sending each `RactorPort::Element`, `RactorPort::Complete`, or `RactorPort::Failure` message. Failure metadata is producer-provided and should be sanitized before crossing trust boundaries.
    # File 'lib/fiber_stream/source.rb', line 44
    def self.ractor_port(port, ack_port:, ack_transfer: :copy, cancel: true)
      raise TypeError, "port must respond to receive" unless port.respond_to?(:receive)
      unless ack_port.respond_to?(:send) && ack_port.method(:send).owner != Kernel
        raise TypeError, "ack_port must provide Ractor-style send"
      end
      Internal::RactorTransferPolicy.validate!(:ack_transfer, ack_transfer)
      raise TypeError, "cancel must be true or false" unless [true, false].include?(cancel)

      new(-> { Pull.ractor_port(port, ack_port, ack_transfer, cancel) })
    end
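The one-outstanding-ack rule can be modeled without Ractors. In this sketch `Thread::Queue` objects stand in for `port` and `ack_port`, and the symbols `:ack` and `:cancel` and the arrays `[:element, value]` and `[:complete]` are hypothetical stand-ins for the `RactorPort::Ack`, `RactorPort::Cancel`, `RactorPort::Element`, and `RactorPort::Complete` messages. It illustrates the protocol shape only; it is not FiberStream's implementation and would not pass `ractor_port`'s `ack_port` check.

```ruby
# Producer: never sends a message without first receiving an ack.
def start_producer(values, port, ack_port)
  Thread.new do
    pending = values.dup
    loop do
      break if ack_port.pop == :cancel # cooperative cancellation
      if pending.empty?
        port << [:complete] # completion also waits for an ack
        break
      end
      port << [:element, pending.shift]
    end
  end
end

# Consumer: one ack per pull, so at most one message is ever in flight.
def pull_all(port, ack_port, limit: nil)
  received = []
  loop do
    if limit && received.size >= limit
      ack_port << :cancel # stop early, like a downstream `take`
      break
    end
    ack_port << :ack
    kind, value = port.pop
    break if kind == :complete
    received << value
  end
  received
end

port = Thread::Queue.new
ack_port = Thread::Queue.new
producer = start_producer([1, 2, 3], port, ack_port)
p pull_all(port, ack_port) # => [1, 2, 3]
producer.join
```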
.ractor_producer(*args, transfer: :copy, ack_transfer: :copy, &block) ⇒ Object
Creates a source backed by one FiberStream-owned producer ractor.
The producer ractor is started lazily on first downstream demand. The shareable block receives a `RactorProducer` context and the provided arguments. Calls to the context preserve one-outstanding-ack backpressure, and cleanup always requests cooperative cancellation.
    # File 'lib/fiber_stream/source.rb', line 80
    def self.ractor_producer(*args, transfer: :copy, ack_transfer: :copy, &block)
      raise ArgumentError, "missing block" unless block
      Internal::RactorTransferPolicy.validate!(:transfer, transfer)
      Internal::RactorTransferPolicy.validate!(:ack_transfer, ack_transfer)
      raise TypeError, "block must be shareable" unless Ractor.shareable?(block)

      group = RactorProducerGroup.new(transfer)
      group.producer(*args, &block)
      definitions = group.definitions

      new(-> { Pull.ractor_producer(definitions, ack_transfer) })
    end
Instance Method Details
#async ⇒ Object
Returns a new source definition with an asynchronous boundary.
This is a convenience wrapper around `via(FiberStream::Flow.async)` and preserves the same scheduler requirement and cancellation behavior.
    # File 'lib/fiber_stream/source.rb', line 287
    def async
      via(Flow.async)
    end
#buffer(count) ⇒ Object
Returns a new source definition with a bounded asynchronous buffer.
This is a convenience wrapper around `via(FiberStream::Flow.buffer(count))` and preserves the same validation, scheduler requirement, and cancellation behavior.
    # File 'lib/fiber_stream/source.rb', line 296
    def buffer(count)
      via(Flow.buffer(count))
    end
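`Flow.buffer(count)` itself is not shown on this page. As a rough analogy only, a bounded buffer between a producer and a consumer can be sketched with `Thread::SizedQueue`, assuming `count` acts as the queue capacity. FiberStream's buffer is fiber-based and needs a `Fiber.scheduler`; this thread-based stand-in does not, and the `buffered` helper is hypothetical.

```ruby
# Hypothetical bounded buffer: a producer thread fills a SizedQueue of
# capacity `count`, so the queue never holds more than `count` elements.
# Single-use; early consumer exit and producer errors are not handled.
def buffered(enumerable, count)
  raise ArgumentError, "count must be positive" unless count.positive?

  queue = Thread::SizedQueue.new(count)
  done = Object.new # unique end-of-stream marker

  Enumerator.new do |out|
    # The producer starts only when iteration begins.
    producer = Thread.new do
      enumerable.each { |value| queue.push(value) } # blocks while full
      queue.push(done)
    end

    loop do
      value = queue.pop
      break if value.equal?(done)
      out << value
    end
    producer.join
  end
end

p buffered(1..5, 2).to_a # => [1, 2, 3, 4, 5]
```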
#concat(source) ⇒ Object
Returns a new source definition that emits this source, then `source`.
Construction is lazy. The appended source is not materialized or pulled until downstream demand observes completion from this source. Flows attached before concat stay scoped to their source; flows attached after concat apply to the combined output.
    # File 'lib/fiber_stream/source.rb', line 139
    def concat(source)
      raise TypeError, "expected FiberStream::Source" unless source.is_a?(Source)

      self.class.build(-> { Pull.concat(to_pull_materializer, source.to_pull_materializer) })
    end
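The laziness guarantee, that the appended source is not materialized until the first one completes, can be illustrated with Ruby lambdas acting as source factories. `lazy_concat` is a hypothetical helper written for this sketch, not `Pull.concat`.

```ruby
# Each argument is a factory (a lambda returning an Enumerator), so nothing
# is created until the factory is called.
def lazy_concat(first_factory, second_factory)
  Enumerator.new do |out|
    first_factory.call.each { |value| out << value }
    # Reached only after the first source completes.
    second_factory.call.each { |value| out << value }
  end
end

log = []
first = -> { log << :first; [1, 2].each }
second = -> { log << :second; [3].each }

combined = lazy_concat(first, second)
p log               # => [] (construction materializes nothing)
p combined.first(2) # => [1, 2]
p log               # => [:first] (second never materialized)
p combined.to_a     # => [1, 2, 3]
```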
#drop(count) ⇒ Object
Returns a new source definition that drops the first `count` elements.
This is a convenience wrapper around `via(FiberStream::Flow.drop(count))` and preserves the same validation and pull-driven backpressure behavior.
    # File 'lib/fiber_stream/source.rb', line 241
    def drop(count)
      via(Flow.drop(count))
    end
#drop_while(&block) ⇒ Object
Returns a new source definition that drops leading elements while `block` is truthy.
This is a convenience wrapper around `via(FiberStream::Flow.drop_while { … })` and preserves the same predicate truthiness, prefix-dropping, and pass-through behavior.
    # File 'lib/fiber_stream/source.rb', line 279
    def drop_while(&block)
      via(Flow.drop_while(&block))
    end
#grouped(count) ⇒ Object
Returns a new source definition that groups adjacent elements into arrays.
This is a convenience wrapper around `via(FiberStream::Flow.grouped(count))` and preserves the same validation, ordering, final partial group, and pull-driven backpressure behavior.
    # File 'lib/fiber_stream/source.rb', line 250
    def grouped(count)
      via(Flow.grouped(count))
    end
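Ruby's own `Enumerator::Lazy#each_slice` has the same adjacent-grouping shape, including a shorter final group, which makes a compact analogy for the behavior described above. This is plain Ruby, not FiberStream code.

```ruby
# Adjacent elements are grouped in order; the final group may be partial.
groups = (1..7).lazy.each_slice(3)

p groups.first(2) # => [[1, 2, 3], [4, 5, 6]] (pulls only what it needs)
p groups.to_a     # => [[1, 2, 3], [4, 5, 6], [7]]
```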
#lines(chomp: true, max_length: nil) ⇒ Object
Returns a new source definition that splits String chunks into lines.
This is a convenience wrapper around `via(FiberStream::Flow.lines(chomp:, max_length:))`. With `max_length: nil`, one unterminated line can buffer without bound. Set a positive `max_length` for untrusted, network-facing, or otherwise unbounded streams.
    # File 'lib/fiber_stream/source.rb', line 307
    def lines(chomp: true, max_length: nil)
      via(Flow.lines(chomp: chomp, max_length: max_length))
    end
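The unbounded-buffer risk comes from lines that span chunk boundaries: the splitter must hold the unterminated tail until a newline arrives. The sketch below reassembles lines from String chunks and enforces a limit. The helper name `split_lines`, splitting only on `"\n"`, and raising `ArgumentError` when `max_length` is exceeded are assumptions for illustration; this page does not say how `Flow.lines` reports an oversized line.

```ruby
# Hypothetical line splitter over arbitrary String chunks. `buffer` holds
# the current unterminated line; max_length bounds both emitted lines and
# that buffered tail.
def split_lines(chunks, chomp: true, max_length: nil)
  too_long = ->(text) { max_length && text.length > max_length }

  Enumerator.new do |out|
    buffer = +""
    chunks.each do |chunk|
      buffer << chunk
      while (index = buffer.index("\n"))
        line = buffer.slice!(0..index) # includes the "\n"
        raise ArgumentError, "line exceeds max_length" if too_long.(line.chomp)
        out << (chomp ? line.chomp : line)
      end
      # Without this check the unterminated tail could grow without limit.
      raise ArgumentError, "line exceeds max_length" if too_long.(buffer)
    end
    out << buffer unless buffer.empty? # final unterminated line
  end
end

p split_lines(["ab", "c\nde", "f\n", "g"]).to_a # => ["abc", "def", "g"]
```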
#map(&block) ⇒ Object
Returns a new source definition that maps each element with `block`.
This is a convenience wrapper around `via(FiberStream::Flow.map { … })` and has the same lazy construction, error, and backpressure behavior as the underlying flow.
    # File 'lib/fiber_stream/source.rb', line 177
    def map(&block)
      via(Flow.map(&block))
    end
#merge(source) ⇒ Object
Returns a new source definition that emits values from this source and `source` in scheduler-observed ready order.
Construction is lazy. The merged source starts one scheduled producer fiber per input source only when downstream demand reaches the merge. Each input’s own element order is preserved, but ordering across inputs is not deterministic. When demanded, the merge requires an installed `Fiber.scheduler` and must be pulled from a non-blocking fiber.
    # File 'lib/fiber_stream/source.rb', line 166
    def merge(source)
      raise TypeError, "expected FiberStream::Source" unless source.is_a?(Source)

      self.class.build(-> { Pull.merge(to_pull_materializer, source.to_pull_materializer) })
    end
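As a thread-based analogy for ready-order merging (FiberStream uses scheduled fibers, not threads), the sketch below starts one producer per input when iteration begins and forwards values through a shared queue. Per-input order is preserved, while interleaving across inputs depends on timing. Unlike the demand-driven merge described above, this queue is unbounded. `merge_ready` is a hypothetical helper.

```ruby
# One producer thread per input pushes into a shared queue; the consumer
# emits whatever is ready first.
def merge_ready(*inputs)
  Enumerator.new do |out|
    queue = Thread::Queue.new
    producers = inputs.map do |input|
      Thread.new do
        input.each { |value| queue << [:value, value] }
        queue << [:done]
      end
    end

    remaining = inputs.size
    while remaining.positive?
      kind, value = queue.pop
      if kind == :done
        remaining -= 1
      else
        out << value
      end
    end
    producers.each(&:join)
  end
end

merged = merge_ready([1, 2, 3], %w[a b c]).to_a
p merged.grep(Integer) # => [1, 2, 3] (per-input order is preserved)
p merged.grep(String)  # => ["a", "b", "c"]
```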
#parallel_map(concurrency:, &block) ⇒ Object
Returns a new source definition that maps elements concurrently.
This is a convenience wrapper around `via(FiberStream::Flow.parallel_map(concurrency:) { … })` and preserves the same ordered delivery, scheduler requirement, validation, bounded upstream run-ahead, and cancellation behavior.
    # File 'lib/fiber_stream/source.rb', line 187
    def parallel_map(concurrency:, &block)
      via(Flow.parallel_map(concurrency: concurrency, &block))
    end
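Ordered delivery with bounded run-ahead can be sketched with a sliding window of threads: at most `concurrency` mappings are in flight, upstream is pulled only when a slot frees, and the oldest result is emitted first. Threads stand in for FiberStream's scheduled fibers, and `ordered_parallel_map` is a hypothetical helper, not the library's implementation.

```ruby
# Keeps at most `concurrency` threads in flight, oldest first. Waiting on the
# oldest result preserves input order even when later elements finish sooner.
def ordered_parallel_map(enumerable, concurrency:, &block)
  raise ArgumentError, "concurrency must be positive" unless concurrency.positive?

  Enumerator.new do |out|
    in_flight = []
    enumerable.each do |value|
      # Window full: emit the oldest result before pulling more upstream.
      out << in_flight.shift.value if in_flight.size == concurrency
      in_flight << Thread.new(value, &block)
    end
    in_flight.each { |thread| out << thread.value }
  end
end

delays = { 1 => 0.03, 2 => 0.01, 3 => 0.02 }
result = ordered_parallel_map([1, 2, 3], concurrency: 2) do |n|
  sleep(delays[n])
  n * 10
end
p result.to_a # => [10, 20, 30]
```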
#parallel_unordered_map(concurrency:, &block) ⇒ Object
Returns a new source definition that maps elements concurrently and emits mapped values in completion order.
This is a convenience wrapper around `via(FiberStream::Flow.parallel_unordered_map(concurrency:) { … })`. The operation preserves the same scheduler requirement, validation, bounded upstream run-ahead, and cancellation behavior while making no input-order guarantee.
    # File 'lib/fiber_stream/source.rb', line 199
    def parallel_unordered_map(concurrency:, &block)
      via(Flow.parallel_unordered_map(concurrency: concurrency, &block))
    end
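For contrast with the ordered variant, the sketch below emits each result as soon as it completes while still keeping at most `concurrency` mappings in flight. `unordered_parallel_map` is hypothetical, threads stand in for FiberStream's scheduled fibers, and error propagation from a failing mapper is omitted.

```ruby
# Results arrive on a queue in completion order; a new input is pulled only
# after a result frees a slot, which bounds upstream run-ahead.
def unordered_parallel_map(enumerable, concurrency:, &block)
  raise ArgumentError, "concurrency must be positive" unless concurrency.positive?

  Enumerator.new do |out|
    results = Thread::Queue.new
    in_flight = 0
    enumerable.each do |value|
      if in_flight == concurrency
        out << results.pop # whichever mapping finished first
        in_flight -= 1
      end
      Thread.new(value) { |v| results << block.call(v) }
      in_flight += 1
    end
    in_flight.times { out << results.pop }
  end
end

result = unordered_parallel_map([3, 1, 2], concurrency: 3) do |n|
  sleep(n * 0.02)
  n
end
p result.to_a.sort # => [1, 2, 3] (arrival order itself depends on timing)
```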
#ractor_map(workers:, input_transfer: :copy, output_transfer: :copy, &block) ⇒ Object
Returns a new source definition that maps elements in Ractor workers.
This is a convenience wrapper around `via(FiberStream::Flow.ractor_map(workers:, input_transfer:, output_transfer:) { … })` and preserves the same shareable mapper requirement, ordered delivery, transfer policy, bounded upstream run-ahead, and cooperative worker shutdown behavior.
    # File 'lib/fiber_stream/source.rb', line 209
    def ractor_map(workers:, input_transfer: :copy, output_transfer: :copy, &block)
      via(
        Flow.ractor_map(
          workers: workers,
          input_transfer: input_transfer,
          output_transfer: output_transfer,
          &block
        )
      )
    end
#run_with(sink) ⇒ Object
Materializes and runs this source with `sink`.
The stream runs in the current fiber until completion or failure. The method returns the sink’s materialized value and closes the materialized pull chain on success, failure, or early sink completion.
    # File 'lib/fiber_stream/source.rb', line 337
    def run_with(sink)
      raise TypeError, "expected FiberStream::Sink" unless sink.is_a?(Sink)

      primary_error = nil
      begin
        stream = materialize
        sink.run_stream(stream)
      rescue StandardError => error
        primary_error = error
        raise
      ensure
        begin
          stream&.close
        rescue StandardError => close_error
          raise close_error unless primary_error
        end
      end
    end
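The `ensure` block above gives the primary error precedence: an exception from `close` is re-raised only when the run itself did not fail with a `StandardError`. The stand-alone sketch below reproduces that rule with a hypothetical `FakeStream` whose run and close can each be made to fail, so the behavior can be checked without FiberStream.

```ruby
# Hypothetical stream whose work and close can each be made to fail.
FakeStream = Struct.new(:fail_run, :fail_close) do
  def run
    raise "run failed" if fail_run
    :result
  end

  def close
    raise IOError, "close failed" if fail_close
  end
end

# Same shape as Source#run_with: a close error never masks the primary error.
def run_and_close(stream)
  primary_error = nil
  begin
    stream.run
  rescue StandardError => error
    primary_error = error
    raise
  ensure
    begin
      stream.close
    rescue StandardError => close_error
      raise close_error unless primary_error
    end
  end
end

p run_and_close(FakeStream.new(false, false)) # => :result
begin
  run_and_close(FakeStream.new(true, true))
rescue StandardError => e
  p e.message # => "run failed" (the close error is suppressed)
end
```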
#scan(initial, &block) ⇒ Object
Returns a new source definition that emits running accumulators.
This is a convenience wrapper around `via(FiberStream::Flow.scan(initial) { … })` and preserves the same reducer order, lazy construction, and pull-driven backpressure behavior.
    # File 'lib/fiber_stream/source.rb', line 259
    def scan(initial, &block)
      via(Flow.scan(initial, &block))
    end
#select(&block) ⇒ Object
Returns a new source definition that keeps elements for which `block` is truthy.
This is a convenience wrapper around `via(FiberStream::Flow.select { … })` and has the same truthiness and lazy construction behavior as the underlying flow.
    # File 'lib/fiber_stream/source.rb', line 225
    def select(&block)
      via(Flow.select(&block))
    end
#split(separator, keep_separator: false, max_length: nil) ⇒ Object
Returns a new source definition that splits String chunks into frames.
This is a convenience wrapper around `via(FiberStream::Flow.split(separator, keep_separator:, max_length:))`. With `max_length: nil`, one unterminated frame can buffer without bound. Set a positive `max_length` for untrusted, network-facing, or otherwise unbounded streams.
    # File 'lib/fiber_stream/source.rb', line 318
    def split(separator, keep_separator: false, max_length: nil)
      via(Flow.split(separator, keep_separator: keep_separator, max_length: max_length))
    end
#take(count) ⇒ Object
Returns a new source definition that emits at most `count` elements.
This is a convenience wrapper around `via(FiberStream::Flow.take(count))` and preserves the same validation and upstream close behavior.
    # File 'lib/fiber_stream/source.rb', line 233
    def take(count)
      via(Flow.take(count))
    end
#take_while(&block) ⇒ Object
Returns a new source definition that emits leading elements while `block` is truthy.
This is a convenience wrapper around `via(FiberStream::Flow.take_while { … })` and preserves the same predicate truthiness, early completion, and upstream close behavior.
    # File 'lib/fiber_stream/source.rb', line 269
    def take_while(&block)
      via(Flow.take_while(&block))
    end
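Ruby's `Enumerator::Lazy` offers `take_while` and its counterpart `drop_while` with the same prefix semantics, which makes a compact analogy (plain Ruby, not FiberStream code). The `pulled` log shows that `take_while` stops pulling upstream after the first falsy predicate result, and that `drop_while` drops only the leading run.

```ruby
pulled = []
upstream = (1..).lazy.map { |n| pulled << n; n }

p upstream.take_while { |n| n < 4 }.to_a # => [1, 2, 3]
p pulled                                 # => [1, 2, 3, 4] (4 ends the prefix)

p [1, 2, 5, 1].lazy.drop_while { |n| n < 3 }.to_a # => [5, 1]
```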
#to(sink) ⇒ Object
Returns a runnable pipeline from this source to `sink`.
Construction is lazy. The source and sink are not materialized until `Pipeline#run` is called.
    # File 'lib/fiber_stream/source.rb', line 326
    def to(sink)
      raise TypeError, "expected FiberStream::Sink" unless sink.is_a?(Sink)

      Pipeline.build(self, sink)
    end
#to_pull_materializer ⇒ Object
:nodoc:
    # File 'lib/fiber_stream/source.rb', line 360
    def to_pull_materializer # :nodoc:
      method(:materialize)
    end
#via(flow) ⇒ Object
Returns a new source definition that passes this source through `flow`.
This method is lazy. It does not run the source, enumerate values, or call flow blocks.
    # File 'lib/fiber_stream/source.rb', line 127
    def via(flow)
      raise TypeError, "expected FiberStream::Flow" unless flow.is_a?(Flow)

      self.class.build(@source_factory, @flows + [flow])
    end
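The shape of `via` (append the flow to a copied list and build a new definition, leaving the receiver untouched) can be modeled with a small Struct. `SourceDef` and the use of plain lambdas as flows are hypothetical simplifications; FiberStream's `Flow` objects are not shown on this page.

```ruby
# Hypothetical immutable source definition: a factory plus an ordered flow list.
SourceDef = Struct.new(:factory, :flows) do
  def via(flow)
    # `flows + [flow]` builds a new array; the receiver is not mutated.
    self.class.new(factory, flows + [flow]).freeze
  end

  # Materialize: call the factory, then apply each flow in order.
  def run
    flows.reduce(factory.call) { |stream, flow| flow.call(stream) }
  end
end

base = SourceDef.new(-> { (1..5).lazy }, []).freeze
evens = base.via(->(s) { s.select(&:even?) })
doubled = evens.via(->(s) { s.map { |n| n * 2 } })

p base.flows.size  # => 0 (unchanged by via)
p evens.run.to_a   # => [2, 4]
p doubled.run.to_a # => [4, 8]
```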
#zip(source) ⇒ Object
Returns a new source definition that emits pairs from this source and `source`.
Construction is lazy. The receiver side is materialized only when downstream demand reaches the zip stage; the other side is materialized only after the receiver produces an element for a pair. The zipped source completes when either input completes.
    # File 'lib/fiber_stream/source.rb', line 152
    def zip(source)
      raise TypeError, "expected FiberStream::Source" unless source.is_a?(Source)

      self.class.build(-> { Pull.zip(to_pull_materializer, source.to_pull_materializer) })
    end
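The materialization order described above can be checked with a small external-iteration model: the other side's factory is invoked only after the receiver yields its first element, and iteration ends as soon as either side is exhausted. `lazy_zip` and the factory lambdas are hypothetical, not `Pull.zip`.

```ruby
# Hypothetical model of the zip materialization order. Each argument is a
# factory returning an external Enumerator.
def lazy_zip(left_factory, right_factory)
  Enumerator.new do |out|
    left = left_factory.call
    right = nil
    loop do
      a = left.next                # StopIteration here ends the zip
      right ||= right_factory.call # created only after the first left value
      b = right.next               # ...or here, when the right side ends
      out << [a, b]
    end
  end
end

log = []
letters = -> { log << :right; %w[a b].each }

p lazy_zip(-> { [].each }, letters).to_a        # => []
p log                                           # => [] (right side never materialized)
p lazy_zip(-> { [1, 2, 3].each }, letters).to_a # => [[1, "a"], [2, "b"]]
p log                                           # => [:right]
```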