Class: FiberStream::Source
- Inherits: Object
- Defined in: lib/fiber_stream/source.rb
Class Method Summary
- .build(source_factory, flows = []) ⇒ Object
  :nodoc:
- .each(enumerable) ⇒ Object
  Creates a source definition from an Enumerable.
- .io(io, chunk_size: 16 * 1024, close: false) ⇒ Object
  Creates a source definition from an IO-like object.
- .ractor_merge_ports(ports, ack_transfer: :copy, cancel: true) ⇒ Object
  Creates a backpressure-aware source definition from multiple Ractor port pairs.
- .ractor_merge_producers(transfer: :copy, ack_transfer: :copy, &block) ⇒ Object
  Creates a source backed by multiple FiberStream-owned producer ractors.
- .ractor_port(port, ack_port:, ack_transfer: :copy, cancel: true) ⇒ Object
  Creates a backpressure-aware source definition from Ractor ports.
- .ractor_producer(*args, transfer: :copy, ack_transfer: :copy, &block) ⇒ Object
  Creates a source backed by one FiberStream-owned producer ractor.
Instance Method Summary
- #async ⇒ Object
  Returns a new source definition with an asynchronous boundary.
- #buffer(count) ⇒ Object
  Returns a new source definition with a bounded asynchronous buffer.
- #compact ⇒ Object
  Returns a new source definition that drops nil elements.
- #concat(source) ⇒ Object
  Returns a new source definition that emits this source, then `source`.
- #drop(count) ⇒ Object
  Returns a new source definition that drops the first `count` elements.
- #drop_while(&block) ⇒ Object
  Returns a new source definition that drops leading elements while `block` is truthy.
- #filter_map(&block) ⇒ Object
  Returns a new source definition that emits truthy transformed values.
- #grouped(count) ⇒ Object
  Returns a new source definition that groups adjacent elements into arrays.
- #initialize(source_factory, flows = []) ⇒ Source (constructor)
  Returns a new instance of Source.
- #lines(chomp: true, max_length: nil) ⇒ Object
  Returns a new source definition that splits String chunks into lines.
- #map(&block) ⇒ Object
  Returns a new source definition that maps each element with `block`.
- #map_concat(&block) ⇒ Object
  Returns a new source definition that emits each mapped expansion.
- #merge(source) ⇒ Object
  Returns a new source definition that emits values from this source and `source` in scheduler-observed ready order.
- #parallel_map(concurrency:, &block) ⇒ Object
  Returns a new source definition that maps elements concurrently.
- #parallel_unordered_map(concurrency:, &block) ⇒ Object
  Returns a new source definition that maps elements concurrently and emits mapped values in completion order.
- #ractor_map(workers:, input_transfer: :copy, output_transfer: :copy, &block) ⇒ Object
  Returns a new source definition that maps elements in Ractor workers.
- #reject(&block) ⇒ Object
  Returns a new source definition that drops elements matching `block`.
- #run_with(sink) ⇒ Object
  Materializes and runs this source with `sink`.
- #scan(initial, &block) ⇒ Object
  Returns a new source definition that emits running accumulators.
- #select(&block) ⇒ Object
  Returns a new source definition that keeps elements matching `block`.
- #split(separator, keep_separator: false, max_length: nil) ⇒ Object
  Returns a new source definition that splits String chunks into frames.
- #take(count) ⇒ Object
  Returns a new source definition that emits at most `count` elements.
- #take_while(&block) ⇒ Object
  Returns a new source definition that emits leading elements while `block` is truthy.
- #throttle(**options) ⇒ Object
  Returns a new source definition that rate-limits emitted elements.
- #to(sink) ⇒ Object
  Returns a runnable pipeline from this source to `sink`.
- #to_pull_materializer ⇒ Object
  :nodoc:
- #via(flow) ⇒ Object
  Returns a new source definition that passes this source through `flow`.
- #zip(source) ⇒ Object
  Returns a new source definition that emits pairs from this source and `source`.
Constructor Details
#initialize(source_factory, flows = []) ⇒ Source
Returns a new instance of Source.

    # File 'lib/fiber_stream/source.rb', line 118
    def initialize(source_factory, flows = [])
      @source_factory = source_factory
      @flows = flows
    end
Class Method Details
.build(source_factory, flows = []) ⇒ Object
:nodoc:

    # File 'lib/fiber_stream/source.rb', line 114
    def self.build(source_factory, flows = []) # :nodoc:
      new(source_factory, flows)
    end
.each(enumerable) ⇒ Object
Creates a source definition from an Enumerable.
The enumerable is not consumed until values are pulled by `run_with`. Each materialization creates an Enumerator with `enumerable.to_enum(:each)`; FiberStream does not snapshot values or guarantee replayability for one-shot enumerables.

    # File 'lib/fiber_stream/source.rb', line 14
    def self.each(enumerable)
      new(-> { Pull.each(enumerable) })
    end
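Plain Ruby can show the caveat about one-shot enumerables. In this sketch, `OneShot` is a hypothetical Enumerable whose `each` drains an internal queue. Each call to `to_enum(:each)` stands in for one materialization: it builds a fresh Enumerator, but nothing snapshots the values.

```ruby
# Hypothetical one-shot Enumerable: #each drains an internal queue,
# so a second enumeration sees nothing.
class OneShot
  include Enumerable

  def initialize(values)
    @pending = values.dup
  end

  def each
    return to_enum(:each) unless block_given?

    yield @pending.shift until @pending.empty?
    self
  end
end

source = OneShot.new([1, 2, 3])

# Each "materialization" builds a fresh Enumerator over the same object.
first_run  = source.to_enum(:each).to_a
second_run = source.to_enum(:each).to_a

p first_run  # => [1, 2, 3]
p second_run # => []
```

If a stream must be replayable, materialize the data into an Array (or another re-enumerable collection) before passing it to `Source.each`.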
.io(io, chunk_size: 16 * 1024, close: false) ⇒ Object
Creates a source definition from an IO-like object.
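The IO object is not read until values are pulled by `run_with`. Each materialization reads from the same IO object’s current position; this source does not snapshot, reopen, or guarantee replayability. The IO is closed only when `close: true` is passed. `chunk_size` is the maximum byte count passed to `readpartial` for one downstream pull; very large values may cause the IO implementation to attempt large allocations. Construction raises TypeError unless `io` responds to `readpartial` (and to `close` when `close: true`), `chunk_size` is an Integer, and `close` is true or false. It raises ArgumentError if `chunk_size` is not positive.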

    # File 'lib/fiber_stream/source.rb', line 26
    def self.io(io, chunk_size: 16 * 1024, close: false)
      raise TypeError, "io must respond to readpartial" unless io.respond_to?(:readpartial)
      raise TypeError, "chunk_size must be an Integer" unless chunk_size.is_a?(Integer)
      raise ArgumentError, "chunk_size must be positive" unless chunk_size.positive?
      raise TypeError, "close must be true or false" unless [true, false].include?(close)
      raise TypeError, "io must respond to close" if close && !io.respond_to?(:close)

      new(-> { Pull.io(io, chunk_size, close) })
    end
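`StringIO` implements `readpartial`, so it can model the chunked-read contract. `read_chunks` is a hypothetical helper, not FiberStream's `Pull.io`. It shows three things: each chunk is at most `chunk_size` bytes, EOF ends the stream, and a second pass over the same IO starts from wherever the first pass stopped.

```ruby
require "stringio"

# Hypothetical helper mirroring the documented behavior: pull chunks of at
# most chunk_size bytes with readpartial until EOF, closing only on request.
def read_chunks(io, chunk_size:, close: false)
  chunks = []
  loop do
    chunks << io.readpartial(chunk_size)
  rescue EOFError
    break
  end
  chunks
ensure
  io.close if close
end

io = StringIO.new("abcdefghij")
p read_chunks(io, chunk_size: 4) # => ["abcd", "efgh", "ij"]
p read_chunks(io, chunk_size: 4) # => [] (same IO, already at EOF)
```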
.ractor_merge_ports(ports, ack_transfer: :copy, cancel: true) ⇒ Object
Creates a backpressure-aware source definition from multiple Ractor port pairs.
Each pair must be a Hash with `:port` and `:ack_port`. The source sends at most one outstanding `RactorPort::Ack` to each active producer and emits producer values in coordinator-observed ready order. Producer work is isolated in Ractors, so demanding this source does not require a `Fiber.scheduler`. Failure metadata is producer-provided and should be sanitized before crossing trust boundaries.

    # File 'lib/fiber_stream/source.rb', line 65
    def self.ractor_merge_ports(ports, ack_transfer: :copy, cancel: true)
      pairs = normalize_ractor_merge_port_pairs(ports)
      Internal::RactorTransferPolicy.validate!(:ack_transfer, ack_transfer)
      raise TypeError, "cancel must be true or false" unless [true, false].include?(cancel)

      new(-> { Pull.ractor_merge_ports(pairs, ack_transfer, cancel) })
    end
.ractor_merge_producers(transfer: :copy, ack_transfer: :copy, &block) ⇒ Object
Creates a source backed by multiple FiberStream-owned producer ractors.
The registration block runs at construction to collect producer definitions, but producer ractors and ports are started lazily on first downstream demand. At least two producers must be registered; otherwise construction raises ArgumentError. Outputs are merged with the same ready-order semantics as `Source.ractor_merge_ports`.

    # File 'lib/fiber_stream/source.rb', line 100
    def self.ractor_merge_producers(transfer: :copy, ack_transfer: :copy, &block)
      raise ArgumentError, "missing block" unless block

      Internal::RactorTransferPolicy.validate!(:transfer, transfer)
      Internal::RactorTransferPolicy.validate!(:ack_transfer, ack_transfer)
      group = RactorProducerGroup.new(transfer)
      block.call(group)
      definitions = group.definitions
      raise ArgumentError, "ractor_merge_producers requires at least two producers" if definitions.size < 2

      new(-> { Pull.ractor_merge_producers(definitions, ack_transfer) })
    end
.ractor_port(port, ack_port:, ack_transfer: :copy, cancel: true) ⇒ Object
Creates a backpressure-aware source definition from Ractor ports.
`port` is the data/control port received by FiberStream. `ack_port` is a producer-owned port that receives `RactorPort::Ack` and `RactorPort::Cancel` control messages. The producer must wait for an ack before sending each `RactorPort::Element`, `RactorPort::Complete`, or `RactorPort::Failure` message. Failure metadata is producer-provided and should be sanitized before crossing trust boundaries.

    # File 'lib/fiber_stream/source.rb', line 44
    def self.ractor_port(port, ack_port:, ack_transfer: :copy, cancel: true)
      raise TypeError, "port must respond to receive" unless port.respond_to?(:receive)
      unless ack_port.respond_to?(:send) && ack_port.method(:send).owner != Kernel
        raise TypeError, "ack_port must provide Ractor-style send"
      end
      Internal::RactorTransferPolicy.validate!(:ack_transfer, ack_transfer)
      raise TypeError, "cancel must be true or false" unless [true, false].include?(cancel)

      new(-> { Pull.ractor_port(port, ack_port, ack_transfer, cancel) })
    end
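The ack-before-send handshake described for `Source.ractor_port` can be modeled with two `Thread::Queue` objects standing in for Ractor ports. This is a sketch under that substitution. The symbols `:ack`, `:element`, and `:complete` are hypothetical placeholders for `RactorPort::Ack`, `RactorPort::Element`, and `RactorPort::Complete`, and cancellation and failure messages are omitted.

```ruby
# Thread::Queue stands in for Ractor ports; message symbols are placeholders.
data_port = Thread::Queue.new
ack_port  = Thread::Queue.new

producer = Thread.new do
  [10, 20, 30].each do |value|
    ack_port.pop                   # wait for demand before each element
    data_port << [:element, value]
  end
  ack_port.pop                     # completion also waits for an ack
  data_port << [:complete]
end

received = []
loop do
  ack_port << :ack                 # consumer keeps at most one ack outstanding
  kind, value = data_port.pop
  break if kind == :complete

  received << value
end
producer.join

p received # => [10, 20, 30]
```

The producer can never run ahead of demand. Every message it sends consumes exactly one ack, and the consumer issues the next ack only after receiving the previous message.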
.ractor_producer(*args, transfer: :copy, ack_transfer: :copy, &block) ⇒ Object
Creates a source backed by one FiberStream-owned producer ractor.
The producer ractor is started lazily on first downstream demand. The block must be Ractor-shareable (construction raises TypeError otherwise); it receives a `RactorProducer` context and the provided arguments. Calls to the context preserve one-outstanding-ack backpressure, and cleanup always requests cooperative cancellation.

    # File 'lib/fiber_stream/source.rb', line 80
    def self.ractor_producer(*args, transfer: :copy, ack_transfer: :copy, &block)
      raise ArgumentError, "missing block" unless block

      Internal::RactorTransferPolicy.validate!(:transfer, transfer)
      Internal::RactorTransferPolicy.validate!(:ack_transfer, ack_transfer)
      raise TypeError, "block must be shareable" unless Ractor.shareable?(block)

      group = RactorProducerGroup.new(transfer)
      group.producer(*args, &block)
      definitions = group.definitions
      new(-> { Pull.ractor_producer(definitions, ack_transfer) })
    end
Instance Method Details
#async ⇒ Object
Returns a new source definition with an asynchronous boundary.
This is a convenience wrapper around `via(FiberStream::Flow.async)` and preserves the same scheduler requirement and cancellation behavior.

    # File 'lib/fiber_stream/source.rb', line 325
    def async
      via(Flow.async)
    end
#buffer(count) ⇒ Object
Returns a new source definition with a bounded asynchronous buffer.
This is a convenience wrapper around `via(FiberStream::Flow.buffer(count))` and preserves the same validation, scheduler requirement, and cancellation behavior.

    # File 'lib/fiber_stream/source.rb', line 334
    def buffer(count)
      via(Flow.buffer(count))
    end
#compact ⇒ Object
Returns a new source definition that drops nil elements.
This is a convenience wrapper around `via(FiberStream::Flow.compact)` and preserves the same nil-only filtering, lazy construction, and backpressure behavior as the underlying flow.

    # File 'lib/fiber_stream/source.rb', line 196
    def compact
      via(Flow.compact)
    end
#concat(source) ⇒ Object
Returns a new source definition that emits this source, then `source`.
Construction is lazy. The appended source is not materialized or pulled until downstream demand observes completion from this source. Flows attached before concat stay scoped to their source; flows attached after concat apply to the combined output.

    # File 'lib/fiber_stream/source.rb', line 139
    def concat(source)
      raise TypeError, "expected FiberStream::Source" unless source.is_a?(Source)

      self.class.build(-> { Pull.concat(to_pull_materializer, source.to_pull_materializer) })
    end
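Plain Enumerators can reproduce the laziness guarantee of `#concat`. In this sketch, each hypothetical factory lambda stands in for `to_pull_materializer` and records when it is called. The second factory runs only after the first source is exhausted, so a consumer that stops early never materializes it.

```ruby
# Factories are hypothetical stand-ins for materializing each source.
log = []
first_factory  = -> { log << :first;  [1, 2, 3].each }
second_factory = -> { log << :second; [4, 5].each }

concatenated = Enumerator.new do |out|
  first_factory.call.each { |v| out << v }
  # Reached only after the first source completes.
  second_factory.call.each { |v| out << v }
end

p concatenated.first(2) # => [1, 2]
p log                   # => [:first] (second source never materialized)

log.clear
p concatenated.to_a     # => [1, 2, 3, 4, 5]
p log                   # => [:first, :second]
```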
#drop(count) ⇒ Object
Returns a new source definition that drops the first `count` elements.
This is a convenience wrapper around `via(FiberStream::Flow.drop(count))` and preserves the same validation and pull-driven backpressure behavior.

    # File 'lib/fiber_stream/source.rb', line 279
    def drop(count)
      via(Flow.drop(count))
    end
#drop_while(&block) ⇒ Object
Returns a new source definition that drops leading elements while `block` is truthy.
This is a convenience wrapper around `via(FiberStream::Flow.drop_while { … })` and preserves the same predicate truthiness, prefix-dropping, and pass-through behavior.

    # File 'lib/fiber_stream/source.rb', line 317
    def drop_while(&block)
      via(Flow.drop_while(&block))
    end
#filter_map(&block) ⇒ Object
Returns a new source definition that emits truthy transformed values.
This is a convenience wrapper around `via(FiberStream::Flow.filter_map { … })` and has the same falsey-drop, lazy construction, error, and backpressure behavior as the underlying flow.

    # File 'lib/fiber_stream/source.rb', line 187
    def filter_map(&block)
      via(Flow.filter_map(&block))
    end
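`#compact` drops only nil, while `#filter_map` drops every falsey value, so the two differ on `false`. Ruby's own `Array#compact` and `Enumerable#filter_map` show the same difference. They are used here as an analogy on a plain array, not as FiberStream calls.

```ruby
values = [1, nil, false, 2]

# nil-only filtering, as documented for #compact
p values.compact              # => [1, false, 2]

# falsey-drop, as documented for #filter_map (nil and false are both dropped)
p values.filter_map { |v| v } # => [1, 2]
```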
#grouped(count) ⇒ Object
Returns a new source definition that groups adjacent elements into arrays.
This is a convenience wrapper around `via(FiberStream::Flow.grouped(count))` and preserves the same validation, ordering, final partial group, and pull-driven backpressure behavior.

    # File 'lib/fiber_stream/source.rb', line 288
    def grouped(count)
      via(Flow.grouped(count))
    end
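As an analogy only, `Enumerable#each_slice` shows the grouping shape described for `#grouped`. Adjacent elements are chunked in order, and a shorter final group is still emitted.

```ruby
# Adjacent grouping with a final partial group (Enumerable analogy).
groups = (1..7).each_slice(3).to_a
p groups # => [[1, 2, 3], [4, 5, 6], [7]]
```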
#lines(chomp: true, max_length: nil) ⇒ Object
Returns a new source definition that splits String chunks into lines.
This is a convenience wrapper around `via(FiberStream::Flow.lines(chomp:, max_length:))`. With `max_length: nil`, one unterminated line can buffer without bound. Set a positive `max_length` for untrusted, network-facing, or otherwise unbounded streams.

    # File 'lib/fiber_stream/source.rb', line 354
    def lines(chomp: true, max_length: nil)
      via(Flow.lines(chomp: chomp, max_length: max_length))
    end
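The following sketch shows why `max_length` matters. It uses a hypothetical `split_lines` helper, not FiberStream's `Flow.lines`. Three details are assumptions of this sketch, because the documentation above does not specify them: overflow raises ArgumentError, only the pending unterminated buffer is bounded, and a trailing unterminated remainder is emitted at completion.

```ruby
# Hypothetical line splitter over String chunks (not Flow.lines).
def split_lines(chunks, chomp: true, max_length: nil)
  buffer = +""
  lines = []
  chunks.each do |chunk|
    buffer << chunk
    # Emit every complete line currently in the buffer.
    while (index = buffer.index("\n"))
      line = buffer.slice!(0..index)
      lines << (chomp ? line.chomp : line)
    end
    # Assumed overflow behavior: fail instead of buffering without bound.
    if max_length && buffer.bytesize > max_length
      raise ArgumentError, "unterminated line exceeds #{max_length} bytes"
    end
  end
  lines << buffer unless buffer.empty? # assumed: flush the remainder
  lines
end

p split_lines(["ab\ncd", "e\nf"])     # => ["ab", "cde", "f"]
p split_lines(["ab\n"], chomp: false) # => ["ab\n"]
```

A line can span chunk boundaries ("cd" + "e"), so the buffer must survive between chunks. The bound on that buffer is what protects against an unterminated input.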
#map(&block) ⇒ Object
Returns a new source definition that maps each element with `block`.
This is a convenience wrapper around `via(FiberStream::Flow.map { … })` and has the same lazy construction, error, and backpressure behavior as the underlying flow.

    # File 'lib/fiber_stream/source.rb', line 177
    def map(&block)
      via(Flow.map(&block))
    end
#map_concat(&block) ⇒ Object
Returns a new source definition that emits each mapped expansion.
This is a convenience wrapper around `via(FiberStream::Flow.map_concat { … })` and has the same one-level flattening, lazy construction, error, and backpressure behavior as the underlying flow.

    # File 'lib/fiber_stream/source.rb', line 206
    def map_concat(&block)
      via(Flow.map_concat(&block))
    end
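Ruby's `Enumerable#flat_map` flattens in the same one-level way described for `#map_concat`. It is used here as an analogy, not a FiberStream call: each block result is expanded once, and nested arrays inside it stay intact.

```ruby
# One-level flattening: the outer array from each block call is expanded,
# the inner [n * 10] arrays are kept as elements.
expanded = [1, 2].flat_map { |n| [n, [n * 10]] }
p expanded # => [1, [10], 2, [20]]
```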
#merge(source) ⇒ Object
Returns a new source definition that emits values from this source and `source` in scheduler-observed ready order.
Construction is lazy. The merged source starts one scheduled producer fiber per input source only when downstream demand reaches the merge. Each input’s own element order is preserved, but cross-input ordering is not deterministic. When demanded, the merge requires an installed `Fiber.scheduler` and must be pulled from a non-blocking fiber.

    # File 'lib/fiber_stream/source.rb', line 166
    def merge(source)
      raise TypeError, "expected FiberStream::Source" unless source.is_a?(Source)

      self.class.build(-> { Pull.merge(to_pull_materializer, source.to_pull_materializer) })
    end
#parallel_map(concurrency:, &block) ⇒ Object
Returns a new source definition that maps elements concurrently.
This is a convenience wrapper around `via(FiberStream::Flow.parallel_map(concurrency:) { … })` and preserves the same ordered delivery, scheduler requirement, validation, bounded upstream run-ahead, and cancellation behavior.

    # File 'lib/fiber_stream/source.rb', line 216
    def parallel_map(concurrency:, &block)
      via(Flow.parallel_map(concurrency: concurrency, &block))
    end
#parallel_unordered_map(concurrency:, &block) ⇒ Object
Returns a new source definition that maps elements concurrently and emits mapped values in completion order.
This is a convenience wrapper around `via(FiberStream::Flow.parallel_unordered_map(concurrency:) { … })`. The operation preserves the same scheduler requirement, validation, bounded upstream run-ahead, and cancellation behavior while making no input-order guarantee.

    # File 'lib/fiber_stream/source.rb', line 228
    def parallel_unordered_map(concurrency:, &block)
      via(Flow.parallel_unordered_map(concurrency: concurrency, &block))
    end
#ractor_map(workers:, input_transfer: :copy, output_transfer: :copy, &block) ⇒ Object
Returns a new source definition that maps elements in Ractor workers.
This is a convenience wrapper around `via(FiberStream::Flow.ractor_map(workers:, input_transfer:, output_transfer:) { … })` and preserves the same shareable mapper requirement, ordered delivery, transfer policy, bounded upstream run-ahead, and cooperative worker shutdown behavior.

    # File 'lib/fiber_stream/source.rb', line 238
    def ractor_map(workers:, input_transfer: :copy, output_transfer: :copy, &block)
      via(
        Flow.ractor_map(
          workers: workers,
          input_transfer: input_transfer,
          output_transfer: output_transfer,
          &block
        )
      )
    end
#reject(&block) ⇒ Object
Returns a new source definition that drops elements matching `block`.
This is a convenience wrapper around `via(FiberStream::Flow.reject { … })` and has the same truthiness and lazy construction behavior as the underlying flow.

    # File 'lib/fiber_stream/source.rb', line 263
    def reject(&block)
      via(Flow.reject(&block))
    end
#run_with(sink) ⇒ Object
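Materializes and runs this source with `sink`.
The stream runs in the current fiber until completion or failure. The method returns the sink’s materialized value and closes the materialized pull chain on success, failure, or early sink completion. If the run fails with a StandardError and closing also fails, the original error propagates and the close error is discarded. If only closing fails, the close error is raised.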

    # File 'lib/fiber_stream/source.rb', line 384
    def run_with(sink)
      raise TypeError, "expected FiberStream::Sink" unless sink.is_a?(Sink)

      primary_error = nil
      begin
        stream = materialize
        sink.run_stream(stream)
      rescue StandardError => error
        primary_error = error
        raise
      ensure
        begin
          stream&.close
        rescue StandardError => close_error
          raise close_error unless primary_error
        end
      end
    end
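The error-precedence rule in the `run_with` listing can be exercised without FiberStream. In this self-contained sketch, `FakeStream` and `run_with_sketch` are hypothetical stand-ins for the materialized pull chain and `Sink#run_stream`, and the cleanup logic copies the listing's structure. A sink failure wins over a close failure, and a close failure surfaces only when the run itself succeeded.

```ruby
# Hypothetical stream whose close can be made to fail.
class FakeStream
  attr_reader :closed

  def initialize(fail_on_close: false)
    @fail_on_close = fail_on_close
    @closed = false
  end

  def close
    @closed = true
    raise IOError, "close failed" if @fail_on_close
  end
end

# Same cleanup structure as Source#run_with; `run` stands in for the sink.
def run_with_sketch(stream, run)
  primary_error = nil
  begin
    run.call(stream)
  rescue StandardError => error
    primary_error = error
    raise
  ensure
    begin
      stream&.close
    rescue StandardError => close_error
      raise close_error unless primary_error # keep the original failure
    end
  end
end

stream = FakeStream.new(fail_on_close: true)
begin
  run_with_sketch(stream, ->(_s) { raise ArgumentError, "sink failed" })
rescue StandardError => e
  p e.class # => ArgumentError (the close failure is discarded)
end
p stream.closed # => true
```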
#scan(initial, &block) ⇒ Object
Returns a new source definition that emits running accumulators.
This is a convenience wrapper around `via(FiberStream::Flow.scan(initial) { … })` and preserves the same reducer order, lazy construction, and pull-driven backpressure behavior.

    # File 'lib/fiber_stream/source.rb', line 297
    def scan(initial, &block)
      via(Flow.scan(initial, &block))
    end
#select(&block) ⇒ Object
Returns a new source definition that keeps elements matching `block`.
This is a convenience wrapper around `via(FiberStream::Flow.select { … })` and has the same truthiness and lazy construction behavior as the underlying flow.

    # File 'lib/fiber_stream/source.rb', line 254
    def select(&block)
      via(Flow.select(&block))
    end
#split(separator, keep_separator: false, max_length: nil) ⇒ Object
Returns a new source definition that splits String chunks into frames.
This is a convenience wrapper around `via(FiberStream::Flow.split(separator, keep_separator:, max_length:))`. With `max_length: nil`, one unterminated frame can buffer without bound. Set a positive `max_length` for untrusted, network-facing, or otherwise unbounded streams.

    # File 'lib/fiber_stream/source.rb', line 365
    def split(separator, keep_separator: false, max_length: nil)
      via(Flow.split(separator, keep_separator: keep_separator, max_length: max_length))
    end
#take(count) ⇒ Object
Returns a new source definition that emits at most `count` elements.
This is a convenience wrapper around `via(FiberStream::Flow.take(count))` and preserves the same validation and upstream close behavior.

    # File 'lib/fiber_stream/source.rb', line 271
    def take(count)
      via(Flow.take(count))
    end
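Plain Enumerators can approximate the documented upstream-close behavior of `#take`. In this sketch, the upstream is infinite and its `ensure` clause is a hypothetical stand-in for closing the pull chain. Stopping after three elements pulls nothing past the limit and still runs the cleanup.

```ruby
closed = false
pulled = []

# Infinite upstream; `closed` stands in for the pull chain being closed.
upstream = Enumerator.new do |out|
  n = 0
  loop do
    n += 1
    pulled << n
    out << n
  end
ensure
  closed = true
end

taken = upstream.first(3)
p taken  # => [1, 2, 3]
p pulled # => [1, 2, 3] (no element is pulled past the limit)
p closed # => true
```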
#take_while(&block) ⇒ Object
Returns a new source definition that emits leading elements while `block` is truthy.
This is a convenience wrapper around `via(FiberStream::Flow.take_while { … })` and preserves the same predicate truthiness, early completion, and upstream close behavior.

    # File 'lib/fiber_stream/source.rb', line 307
    def take_while(&block)
      via(Flow.take_while(&block))
    end
#throttle(**options) ⇒ Object
Returns a new source definition that rate-limits emitted elements.
This is a convenience wrapper around `via(FiberStream::Flow.throttle(…))`. The `rate:` form creates a fresh default limiter for each materialization; pass `limiter:` to share quota state across sources or runs.

    # File 'lib/fiber_stream/source.rb', line 343
    def throttle(**)
      via(Flow.throttle(**))
    end
#to(sink) ⇒ Object
Returns a runnable pipeline from this source to `sink`.
Construction is lazy. The source and sink are not materialized until `Pipeline#run` is called.

    # File 'lib/fiber_stream/source.rb', line 373
    def to(sink)
      raise TypeError, "expected FiberStream::Sink" unless sink.is_a?(Sink)

      Pipeline.build(self, sink)
    end
#to_pull_materializer ⇒ Object
:nodoc:

    # File 'lib/fiber_stream/source.rb', line 407
    def to_pull_materializer # :nodoc:
      method(:materialize)
    end
#via(flow) ⇒ Object
Returns a new source definition that passes this source through `flow`.
This method is lazy. It does not run the source, enumerate values, or call flow blocks.

    # File 'lib/fiber_stream/source.rb', line 127
    def via(flow)
      raise TypeError, "expected FiberStream::Flow" unless flow.is_a?(Flow)

      self.class.build(@source_factory, @flows + [flow])
    end
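The `via` listing builds a new definition from `@flows + [flow]` instead of appending in place. A minimal sketch of that immutable-definition pattern follows, using a hypothetical `Definition` class rather than `FiberStream::Source`. Because the receiver is never mutated, one base definition can be the shared prefix of several pipelines.

```ruby
# Hypothetical immutable definition: each #via returns a new object.
class Definition
  attr_reader :flows

  def initialize(flows = [])
    @flows = flows.freeze
  end

  def via(flow)
    self.class.new(@flows + [flow]) # new array; receiver untouched
  end
end

base    = Definition.new
doubled = base.via(:double)
both    = doubled.via(:increment)

p base.flows    # => []
p doubled.flows # => [:double]
p both.flows    # => [:double, :increment]
```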
#zip(source) ⇒ Object
Returns a new source definition that emits pairs from this source and `source`.
Construction is lazy. The receiver side is materialized only when downstream demand reaches the zip stage; the other side is materialized only after the receiver produces an element for a pair. The zipped source completes when either input completes.

    # File 'lib/fiber_stream/source.rb', line 152
    def zip(source)
      raise TypeError, "expected FiberStream::Source" unless source.is_a?(Source)

      self.class.build(-> { Pull.zip(to_pull_materializer, source.to_pull_materializer) })
    end
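External Enumerators can sketch the materialization order documented for `#zip`. `lazy_zip` and the factory lambdas are hypothetical stand-ins, not FiberStream's `Pull.zip`. The other side is materialized only once the receiver yields its first element, and the zip ends as soon as either side raises StopIteration, which `Kernel#loop` treats as normal termination.

```ruby
# Hypothetical lazy zip over factory lambdas.
def lazy_zip(left_factory, right_factory)
  Enumerator.new do |out|
    left = left_factory.call
    right = nil
    loop do
      a = left.next                    # StopIteration here ends the zip
      right ||= right_factory.call     # materialize the other side lazily
      b = right.next                   # ... and so does StopIteration here
      out << [a, b]
    end
  end
end

log = []
left  = -> { log << :left;  [1, 2, 3].each }
right = -> { log << :right; %w[a b].each }

p lazy_zip(left, right).to_a # => [[1, "a"], [2, "b"]]
p log                        # => [:left, :right]

log.clear
p lazy_zip(-> { log << :left; [].each }, right).to_a # => []
p log                                                # => [:left]
```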