Class: FiberStream::Source

Inherits:
Object
Defined in:
lib/fiber_stream/source.rb

Constructor Details

#initialize(source_factory, flows = []) ⇒ Source

Returns a new instance of Source.



# File 'lib/fiber_stream/source.rb', line 118

def initialize(source_factory, flows = [])
  @source_factory = source_factory
  @flows = flows
end

Class Method Details

.build(source_factory, flows = []) ⇒ Object

:nodoc:



# File 'lib/fiber_stream/source.rb', line 114

def self.build(source_factory, flows = []) # :nodoc:
  new(source_factory, flows)
end

.each(enumerable) ⇒ Object

Creates a source definition from an Enumerable.

The enumerable is not consumed until values are pulled by `run_with`. Each materialization creates an Enumerator with `enumerable.to_enum(:each)`; FiberStream does not snapshot values or guarantee replayability for one-shot enumerables.



# File 'lib/fiber_stream/source.rb', line 14

def self.each(enumerable)
  new(-> { Pull.each(enumerable) })
end
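
The replayability caveat can be seen without FiberStream at all. The sketch below is plain Ruby: `OneShot` is a hypothetical class invented for the illustration, and only the `to_enum(:each)` call is taken from the description above.

```ruby
# Hypothetical one-shot enumerable: #each drains an internal array,
# so a second enumeration finds nothing left.
class OneShot
  include Enumerable

  def initialize(values)
    @values = values.dup
  end

  def each
    yield(@values.shift) until @values.empty?
  end
end

array = [1, 2, 3]
one_shot = OneShot.new(array)

# An Array replays on every enumeration.
array_first = array.to_enum(:each).to_a
array_second = array.to_enum(:each).to_a

# The one-shot object is exhausted by the first enumeration.
one_shot_first = one_shot.to_enum(:each).to_a
one_shot_second = one_shot.to_enum(:each).to_a
```

A source built from an object like `OneShot` would presumably behave the same way on a second materialization, since no snapshot is taken.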

.io(io, chunk_size: 16 * 1024, close: false) ⇒ Object

Creates a source definition from an IO-like object.

The IO object is not read until values are pulled by `run_with`. Each materialization reads from the same IO object’s current position; this source does not snapshot, reopen, or guarantee replayability. The IO is closed only when `close: true` is passed. `chunk_size` is the maximum byte count passed to `readpartial` for one downstream pull; very large values may cause the IO implementation to attempt large allocations.

Raises:

  • (TypeError)
  • (ArgumentError)


# File 'lib/fiber_stream/source.rb', line 26

def self.io(io, chunk_size: 16 * 1024, close: false)
  raise TypeError, "io must respond to readpartial" unless io.respond_to?(:readpartial)
  raise TypeError, "chunk_size must be an Integer" unless chunk_size.is_a?(Integer)
  raise ArgumentError, "chunk_size must be positive" unless chunk_size.positive?
  raise TypeError, "close must be true or false" unless [true, false].include?(close)
  raise TypeError, "io must respond to close" if close && !io.respond_to?(:close)

  new(-> { Pull.io(io, chunk_size, close) })
end
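
To make the `chunk_size` and shared-position behavior concrete, here is a minimal sketch that uses only the standard library. `read_chunks` is a hypothetical helper standing in for repeated downstream pulls; it is not part of FiberStream.

```ruby
require "stringio"

# Hypothetical helper: pull chunks with readpartial until EOF,
# roughly what repeated downstream demand would do.
def read_chunks(io, chunk_size)
  chunks = []
  begin
    loop { chunks << io.readpartial(chunk_size) }
  rescue EOFError
    # End of stream reached; fall through and return what was read.
  end
  chunks
end

io = StringIO.new("abcdefghij")

# Each readpartial call returns at most chunk_size bytes.
first_pass = read_chunks(io, 4)

# A second pass starts from the IO's current position, which is now EOF.
second_pass = read_chunks(io, 4)
```

The empty second pass mirrors the note above: reading the same IO again continues from wherever the previous read stopped.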

.ractor_merge_ports(ports, ack_transfer: :copy, cancel: true) ⇒ Object

Creates a backpressure-aware source definition from multiple Ractor port pairs.

Each pair must be a Hash with `:port` and `:ack_port`. The source sends at most one outstanding `RactorPort::Ack` to each active producer and emits producer values in coordinator-observed ready order. Producer work is isolated in Ractors, so demanding this source does not require a `Fiber.scheduler`. Failure metadata is producer-provided and should be sanitized before crossing trust boundaries.

Raises:

  • (TypeError)


# File 'lib/fiber_stream/source.rb', line 65

def self.ractor_merge_ports(ports, ack_transfer: :copy, cancel: true)
  pairs = normalize_ractor_merge_port_pairs(ports)

  Internal::RactorTransferPolicy.validate!(:ack_transfer, ack_transfer)
  raise TypeError, "cancel must be true or false" unless [true, false].include?(cancel)

  new(-> { Pull.ractor_merge_ports(pairs, ack_transfer, cancel) })
end

.ractor_merge_producers(transfer: :copy, ack_transfer: :copy, &block) ⇒ Object

Creates a source backed by multiple FiberStream-owned producer ractors.

The registration block runs at construction to collect producer definitions, but producer ractors and ports are started lazily on first downstream demand. At least two producers must be registered; otherwise an ArgumentError is raised. Outputs are merged with the same ready-order semantics as `Source.ractor_merge_ports`.

Raises:

  • (ArgumentError)


# File 'lib/fiber_stream/source.rb', line 100

def self.ractor_merge_producers(transfer: :copy, ack_transfer: :copy, &block)
  raise ArgumentError, "missing block" unless block

  Internal::RactorTransferPolicy.validate!(:transfer, transfer)
  Internal::RactorTransferPolicy.validate!(:ack_transfer, ack_transfer)

  group = RactorProducerGroup.new(transfer)
  block.call(group)
  definitions = group.definitions
  raise ArgumentError, "ractor_merge_producers requires at least two producers" if definitions.size < 2

  new(-> { Pull.ractor_merge_producers(definitions, ack_transfer) })
end

.ractor_port(port, ack_port:, ack_transfer: :copy, cancel: true) ⇒ Object

Creates a backpressure-aware source definition from Ractor ports.

`port` is the data/control port received by FiberStream. `ack_port` is a producer-owned port that receives `RactorPort::Ack` and `RactorPort::Cancel` control messages. The producer must wait for an ack before sending each `RactorPort::Element`, `RactorPort::Complete`, or `RactorPort::Failure` message. Failure metadata is producer-provided and should be sanitized before crossing trust boundaries.

Raises:

  • (TypeError)


# File 'lib/fiber_stream/source.rb', line 44

def self.ractor_port(port, ack_port:, ack_transfer: :copy, cancel: true)
  raise TypeError, "port must respond to receive" unless port.respond_to?(:receive)
  unless ack_port.respond_to?(:send) && ack_port.method(:send).owner != Kernel
    raise TypeError, "ack_port must provide Ractor-style send"
  end

  Internal::RactorTransferPolicy.validate!(:ack_transfer, ack_transfer)
  raise TypeError, "cancel must be true or false" unless [true, false].include?(cancel)

  new(-> { Pull.ractor_port(port, ack_port, ack_transfer, cancel) })
end
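
The one-ack-per-message handshake described above can be sketched with ordinary threads. Everything in this example is an assumption made for illustration: `Thread::Queue` objects stand in for the Ractor ports, and the symbols `:ack`, `:cancel`, `:element`, and `:complete` stand in for the `RactorPort` message classes.

```ruby
# Stand-ins for the two ports (plain thread-safe queues, not Ractor ports).
data_port = Thread::Queue.new
ack_port = Thread::Queue.new

producer = Thread.new do
  values = [1, 2, 3]
  loop do
    # Wait for demand before sending anything, including completion.
    signal = ack_port.pop
    break if signal == :cancel

    if values.empty?
      data_port << [:complete]
      break
    end
    data_port << [:element, values.shift]
  end
end

received = []
loop do
  ack_port << :ack               # at most one outstanding ack
  kind, value = data_port.pop
  break if kind == :complete
  received << value
end
producer.join
```

Because the consumer only sends the next ack after it has taken the previous message, the producer can never run more than one message ahead.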

.ractor_producer(*args, transfer: :copy, ack_transfer: :copy, &block) ⇒ Object

Creates a source backed by one FiberStream-owned producer ractor.

The producer ractor is started lazily on first downstream demand. The shareable block receives a `RactorProducer` context and the provided arguments. Calls to the context preserve one-outstanding-ack backpressure, and cleanup always requests cooperative cancellation.

Raises:

  • (ArgumentError)
  • (TypeError)


# File 'lib/fiber_stream/source.rb', line 80

def self.ractor_producer(*args, transfer: :copy, ack_transfer: :copy, &block)
  raise ArgumentError, "missing block" unless block

  Internal::RactorTransferPolicy.validate!(:transfer, transfer)
  Internal::RactorTransferPolicy.validate!(:ack_transfer, ack_transfer)
  raise TypeError, "block must be shareable" unless Ractor.shareable?(block)

  group = RactorProducerGroup.new(transfer)
  group.producer(*args, &block)
  definitions = group.definitions

  new(-> { Pull.ractor_producer(definitions, ack_transfer) })
end

Instance Method Details

#async ⇒ Object

Returns a new source definition with an asynchronous boundary.

This is a convenience wrapper around `via(FiberStream::Flow.async)` and preserves the same scheduler requirement and cancellation behavior.



# File 'lib/fiber_stream/source.rb', line 287

def async
  via(Flow.async)
end

#buffer(count) ⇒ Object

Returns a new source definition with a bounded asynchronous buffer.

This is a convenience wrapper around `via(FiberStream::Flow.buffer(count))` and preserves the same validation, scheduler requirement, and cancellation behavior.



# File 'lib/fiber_stream/source.rb', line 296

def buffer(count)
  via(Flow.buffer(count))
end

#concat(source) ⇒ Object

Returns a new source definition that emits this source, then `source`.

Construction is lazy. The appended source is not materialized or pulled until downstream demand observes completion from this source. Flows attached before concat stay scoped to their source; flows attached after concat apply to the combined output.

Raises:

  • (TypeError)


# File 'lib/fiber_stream/source.rb', line 139

def concat(source)
  raise TypeError, "expected FiberStream::Source" unless source.is_a?(Source)

  self.class.build(-> { Pull.concat(to_pull_materializer, source.to_pull_materializer) })
end
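
The laziness of the appended side can be illustrated with a plain `Enumerator`. This is a sketch of the idea only, not the `Pull.concat` implementation; `lazy_concat` and the two factory lambdas are hypothetical names.

```ruby
# Hypothetical stand-in for concat: the second factory is called
# only after the first side has been fully enumerated.
def lazy_concat(first_factory, second_factory)
  Enumerator.new do |yielder|
    first_factory.call.each { |value| yielder << value }
    second_factory.call.each { |value| yielder << value }
  end
end

log = []
first = -> { log << :first; [1, 2] }
second = -> { log << :second; [3] }

combined = lazy_concat(first, second)
log_before = log.dup            # nothing is materialized at construction

head = combined.first(2)        # demand stops inside the first side
log_after_head = log.dup

all_values = combined.to_a      # full demand reaches the second side
```

Taking only two elements never touches the second factory, which matches the description of the appended source staying unmaterialized until the first completes.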

#drop(count) ⇒ Object

Returns a new source definition that drops the first `count` elements.

This is a convenience wrapper around `via(FiberStream::Flow.drop(count))` and preserves the same validation and pull-driven backpressure behavior.



# File 'lib/fiber_stream/source.rb', line 241

def drop(count)
  via(Flow.drop(count))
end

#drop_while(&block) ⇒ Object

Returns a new source definition that drops leading elements while `block` is truthy.

This is a convenience wrapper around `via(FiberStream::Flow.drop_while { … })` and preserves the same predicate truthiness, prefix-dropping, and pass-through behavior.



# File 'lib/fiber_stream/source.rb', line 279

def drop_while(&block)
  via(Flow.drop_while(&block))
end

#grouped(count) ⇒ Object

Returns a new source definition that groups adjacent elements into arrays.

This is a convenience wrapper around `via(FiberStream::Flow.grouped(count))` and preserves the same validation, ordering, final partial group, and pull-driven backpressure behavior.



# File 'lib/fiber_stream/source.rb', line 250

def grouped(count)
  via(Flow.grouped(count))
end

#lines(chomp: true, max_length: nil) ⇒ Object

Returns a new source definition that splits String chunks into lines.

This is a convenience wrapper around `via(FiberStream::Flow.lines(chomp:, max_length:))`. With `max_length: nil`, one unterminated line can buffer without bound. Set a positive `max_length` for untrusted, network-facing, or otherwise unbounded streams.



# File 'lib/fiber_stream/source.rb', line 307

def lines(chomp: true, max_length: nil)
  via(Flow.lines(chomp: chomp, max_length: max_length))
end
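
Why an unterminated line can grow without bound is easier to see in a small stand-alone splitter. The sketch below is not FiberStream's implementation: `split_lines` is a hypothetical helper, and both the `ArgumentError` raised on overflow and the emission of a trailing unterminated line are assumptions made for the example.

```ruby
# Hypothetical chunk-to-line splitter with an optional buffer bound.
def split_lines(chunks, chomp: true, max_length: nil)
  buffer = +""
  lines = []
  chunks.each do |chunk|
    buffer << chunk
    # Emit every complete line currently in the buffer.
    while (index = buffer.index("\n"))
      line = buffer.slice!(0..index)
      lines << (chomp ? line.chomp : line)
    end
    # Whatever remains is an unterminated line; bound it if asked to.
    if max_length && buffer.bytesize > max_length
      raise ArgumentError, "line exceeds max_length"
    end
  end
  lines << buffer unless buffer.empty?
  lines
end

lines = split_lines(["ab\ncd", "e\nf"])

overflow =
  begin
    split_lines(["aaaaaaaa"], max_length: 4)
    :no_error
  rescue ArgumentError
    :rejected
  end
```

Without `max_length`, the buffer in this sketch keeps growing for as long as chunks arrive with no newline, which is the risk the description warns about.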

#map(&block) ⇒ Object

Returns a new source definition that maps each element with `block`.

This is a convenience wrapper around `via(FiberStream::Flow.map { … })` and has the same lazy construction, error, and backpressure behavior as the underlying flow.



# File 'lib/fiber_stream/source.rb', line 177

def map(&block)
  via(Flow.map(&block))
end

#merge(source) ⇒ Object

Returns a new source definition that emits values from this source and `source` in scheduler-observed ready order.

Construction is lazy. The merged source starts one scheduled producer fiber per input source only when downstream demand reaches the merge. Each input’s own element order is preserved, but cross-input ordering is not deterministic. Demanding the merged source requires an installed `Fiber.scheduler` and must happen from a non-blocking fiber.

Raises:

  • (TypeError)


# File 'lib/fiber_stream/source.rb', line 166

def merge(source)
  raise TypeError, "expected FiberStream::Source" unless source.is_a?(Source)

  self.class.build(-> { Pull.merge(to_pull_materializer, source.to_pull_materializer) })
end

#parallel_map(concurrency:, &block) ⇒ Object

Returns a new source definition that maps elements concurrently.

This is a convenience wrapper around `via(FiberStream::Flow.parallel_map(concurrency:) { … })` and preserves the same ordered delivery, scheduler requirement, validation, bounded upstream run-ahead, and cancellation behavior.



# File 'lib/fiber_stream/source.rb', line 187

def parallel_map(concurrency:, &block)
  via(Flow.parallel_map(concurrency: concurrency, &block))
end

#parallel_unordered_map(concurrency:, &block) ⇒ Object

Returns a new source definition that maps elements concurrently and emits mapped values in completion order.

This is a convenience wrapper around `via(FiberStream::Flow.parallel_unordered_map(concurrency:) { … })`. The operation preserves the same scheduler requirement, validation, bounded upstream run-ahead, and cancellation behavior while making no input-order guarantee.



# File 'lib/fiber_stream/source.rb', line 199

def parallel_unordered_map(concurrency:, &block)
  via(Flow.parallel_unordered_map(concurrency: concurrency, &block))
end

#ractor_map(workers:, input_transfer: :copy, output_transfer: :copy, &block) ⇒ Object

Returns a new source definition that maps elements in Ractor workers.

This is a convenience wrapper around `via(FiberStream::Flow.ractor_map(workers:) { … })` and preserves the same shareable mapper requirement, ordered delivery, transfer policy, bounded upstream run-ahead, and cooperative worker shutdown behavior.



# File 'lib/fiber_stream/source.rb', line 209

def ractor_map(workers:, input_transfer: :copy, output_transfer: :copy, &block)
  via(
    Flow.ractor_map(
      workers: workers,
      input_transfer: input_transfer,
      output_transfer: output_transfer,
      &block
    )
  )
end

#run_with(sink) ⇒ Object

Materializes and runs this source with `sink`.

The stream runs in the current fiber until completion or failure. The method returns the sink’s materialized value and closes the materialized pull chain on success, failure, or early sink completion.

Raises:

  • (TypeError)


# File 'lib/fiber_stream/source.rb', line 337

def run_with(sink)
  raise TypeError, "expected FiberStream::Sink" unless sink.is_a?(Sink)

  primary_error = nil

  begin
    stream = materialize

    sink.run_stream(stream)
  rescue StandardError => error
    primary_error = error
    raise
  ensure
    begin
      stream&.close
    rescue StandardError => close_error
      raise close_error unless primary_error
    end
  end
end
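
The `ensure` block in the listing above gives the run error priority over a close error. The following stand-alone sketch reproduces that shape with hypothetical names (`FakeStream`, `run_and_close`, `outcome`) so the three outcomes can be observed without FiberStream.

```ruby
# Hypothetical stream whose #close can be made to fail.
class FakeStream
  def initialize(fail_on_close)
    @fail_on_close = fail_on_close
  end

  def close
    raise IOError, "close failed" if @fail_on_close
  end
end

# Same control flow as the listing: always close, but only surface
# a close error when the body itself did not fail.
def run_and_close(stream)
  primary_error = nil
  begin
    yield
  rescue StandardError => error
    primary_error = error
    raise
  ensure
    begin
      stream.close
    rescue StandardError => close_error
      raise close_error unless primary_error
    end
  end
end

# Helper that reports either the result or the class of the raised error.
def outcome(stream, &body)
  run_and_close(stream, &body)
rescue StandardError => error
  error.class
end

clean = outcome(FakeStream.new(false)) { :done }
close_only = outcome(FakeStream.new(true)) { :done }
both = outcome(FakeStream.new(true)) { raise ArgumentError, "body failed" }
```

When both the body and `close` fail, the caller sees the body's error; the close error is dropped so that it does not mask the original cause.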

#scan(initial, &block) ⇒ Object

Returns a new source definition that emits running accumulators.

This is a convenience wrapper around `via(FiberStream::Flow.scan(initial) { … })` and preserves the same reducer order, lazy construction, and pull-driven backpressure behavior.



# File 'lib/fiber_stream/source.rb', line 259

def scan(initial, &block)
  via(Flow.scan(initial, &block))
end

#select(&block) ⇒ Object

Returns a new source definition that keeps elements matching `block`.

This is a convenience wrapper around `via(FiberStream::Flow.select { … })` and has the same truthiness and lazy construction behavior as the underlying flow.



# File 'lib/fiber_stream/source.rb', line 225

def select(&block)
  via(Flow.select(&block))
end

#split(separator, keep_separator: false, max_length: nil) ⇒ Object

Returns a new source definition that splits String chunks into frames.

This is a convenience wrapper around `via(FiberStream::Flow.split(separator, keep_separator:, max_length:))`. With `max_length: nil`, one unterminated frame can buffer without bound. Set a positive `max_length` for untrusted, network-facing, or otherwise unbounded streams.



# File 'lib/fiber_stream/source.rb', line 318

def split(separator, keep_separator: false, max_length: nil)
  via(Flow.split(separator, keep_separator: keep_separator, max_length: max_length))
end

#take(count) ⇒ Object

Returns a new source definition that emits at most `count` elements.

This is a convenience wrapper around `via(FiberStream::Flow.take(count))` and preserves the same validation and upstream close behavior.



# File 'lib/fiber_stream/source.rb', line 233

def take(count)
  via(Flow.take(count))
end

#take_while(&block) ⇒ Object

Returns a new source definition that emits leading elements while `block` is truthy.

This is a convenience wrapper around `via(FiberStream::Flow.take_while { … })` and preserves the same predicate truthiness, early completion, and upstream close behavior.



# File 'lib/fiber_stream/source.rb', line 269

def take_while(&block)
  via(Flow.take_while(&block))
end

#to(sink) ⇒ Object

Returns a runnable pipeline from this source to `sink`.

Construction is lazy. The source and sink are not materialized until `Pipeline#run` is called.

Raises:

  • (TypeError)


# File 'lib/fiber_stream/source.rb', line 326

def to(sink)
  raise TypeError, "expected FiberStream::Sink" unless sink.is_a?(Sink)

  Pipeline.build(self, sink)
end

#to_pull_materializer ⇒ Object

:nodoc:



# File 'lib/fiber_stream/source.rb', line 360

def to_pull_materializer # :nodoc:
  method(:materialize)
end

#via(flow) ⇒ Object

Returns a new source definition that passes this source through `flow`.

This method is lazy. It does not run the source, enumerate values, or call flow blocks.

Raises:

  • (TypeError)


# File 'lib/fiber_stream/source.rb', line 127

def via(flow)
  raise TypeError, "expected FiberStream::Flow" unless flow.is_a?(Flow)

  self.class.build(@source_factory, @flows + [flow])
end
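
Because `via` builds a new definition from `@flows + [flow]`, the receiver is left unchanged and nothing runs. The sketch below shows that pattern in isolation; `MiniSource` is a hypothetical reduction of the class, and the symbols stand in for real flow objects.

```ruby
# Hypothetical reduction of the definition object: a factory plus
# an ordered list of flows, extended by copying rather than mutating.
class MiniSource
  attr_reader :flows

  def initialize(factory, flows = [])
    @factory = factory
    @flows = flows
  end

  def via(flow)
    self.class.new(@factory, @flows + [flow])
  end
end

factory_calls = 0
base = MiniSource.new(-> { factory_calls += 1 })

doubled = base.via(:double)
doubled_then_filtered = doubled.via(:filter)
```

Each call returns a separate definition, so `base` can be reused to build other pipelines, and the factory is never invoked while definitions are being composed.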

#zip(source) ⇒ Object

Returns a new source definition that emits pairs from this source and `source`.

Construction is lazy. The receiver side is materialized only when downstream demand reaches the zip stage; the other side is materialized only after the receiver produces an element for a pair. The zipped source completes when either input completes.

Raises:

  • (TypeError)


# File 'lib/fiber_stream/source.rb', line 152

def zip(source)
  raise TypeError, "expected FiberStream::Source" unless source.is_a?(Source)

  self.class.build(-> { Pull.zip(to_pull_materializer, source.to_pull_materializer) })
end
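
The materialization order and completion rule for zip can be approximated with external enumerators. This is an illustrative sketch, not the `Pull.zip` implementation; `lazy_zip` and the factory lambdas are hypothetical.

```ruby
# Hypothetical stand-in for zip: the right side is materialized only
# after the left side has produced an element, and the result ends
# as soon as either side is exhausted.
def lazy_zip(left_factory, right_factory)
  Enumerator.new do |yielder|
    left = left_factory.call.to_enum
    right = nil
    # Kernel#loop ends quietly when either #next raises StopIteration.
    loop do
      left_value = left.next
      right ||= right_factory.call.to_enum
      yielder << [left_value, right.next]
    end
  end
end

pairs = lazy_zip(-> { [1, 2, 3] }, -> { [:a, :b] }).to_a

log = []
empty_left = lazy_zip(
  -> { log << :left; [] },
  -> { log << :right; [:a] }
).to_a
```

With an empty left side, the right factory is never called, which mirrors the rule that the other side is materialized only once the receiver has produced an element.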