Class: FiberStream::Source

Inherits:
Object
Defined in:
lib/fiber_stream/source.rb

Constructor Details

#initialize(source_factory, flows = []) ⇒ Source

Returns a new instance of Source.



# File 'lib/fiber_stream/source.rb', line 74

def initialize(source_factory, flows = [])
  @source_factory = source_factory
  @flows = flows
end

Class Method Details

.each(enumerable) ⇒ Object

Creates a source definition from an Enumerable.

The enumerable is not consumed until values are pulled by `run_with`. Each materialization creates an Enumerator with `enumerable.to_enum(:each)`; FiberStream does not snapshot values or guarantee replayability for one-shot enumerables.



# File 'lib/fiber_stream/source.rb', line 14

def self.each(enumerable)
  new(-> { Pull.each(enumerable) })
end
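The replayability caveat can be seen in plain Ruby. In this sketch `OneShot` and `materialize` are hypothetical helpers, not FiberStream API: `materialize` stands in for one materialization that calls `to_enum(:each)` as described above, and `OneShot` is an Enumerable whose traversal drains its own buffer.

```ruby
# Hypothetical one-shot Enumerable: traversal drains its buffer, like a
# generator backed by external state.
class OneShot
  include Enumerable

  def initialize(values)
    @values = values.dup
  end

  def remaining
    @values.size
  end

  def each
    return to_enum(:each) unless block_given?

    yield(@values.shift) until @values.empty?
    self
  end
end

# Stand-in for one materialization: builds an Enumerator, consumes nothing.
def materialize(enumerable)
  enumerable.to_enum(:each)
end

once = OneShot.new([1, 2, 3])
pending = materialize(once)
untouched = once.remaining          # => 3, nothing has been pulled yet
first_run = pending.to_a            # => [1, 2, 3]
second_run = materialize(once).to_a # => [], the values were not snapshotted
```

An Array passed to the same helper yields its full contents on every materialization, because `Array#each` does not consume the receiver.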

.io(io, chunk_size: 16 * 1024, close: false) ⇒ Object

Creates a source definition from an IO-like object.

The IO object is not read until values are pulled by `run_with`. Each materialization reads from the same IO object's current position; this source does not snapshot, reopen, or guarantee replayability. The IO is closed only when `close: true` is passed. `chunk_size` is the maximum byte count passed to `readpartial` for one downstream pull; very large values may cause the IO implementation to attempt large allocations.

Raises:

  • (TypeError)
  • (ArgumentError)


# File 'lib/fiber_stream/source.rb', line 26

def self.io(io, chunk_size: 16 * 1024, close: false)
  raise TypeError, "io must respond to readpartial" unless io.respond_to?(:readpartial)
  raise TypeError, "chunk_size must be an Integer" unless chunk_size.is_a?(Integer)
  raise ArgumentError, "chunk_size must be positive" unless chunk_size.positive?
  raise TypeError, "close must be true or false" unless [true, false].include?(close)
  raise TypeError, "io must respond to close" if close && !io.respond_to?(:close)

  new(-> { Pull.io(io, chunk_size, close) })
end
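A minimal model of what one materialization does with `chunk_size` and `close:`. The helper `read_chunks` is hypothetical and is not `Pull.io`; it uses `StringIO`, which implements `readpartial`, to show that each pull returns at most `chunk_size` bytes and that a second materialization of the same IO resumes from its current position.

```ruby
require "stringio"

# Simplified model of one IO-source materialization (not FiberStream's
# Pull.io): pull chunks of at most chunk_size bytes with readpartial until
# EOF, and close the IO afterwards only when close: true.
def read_chunks(io, chunk_size, close: false)
  chunks = []
  begin
    loop { chunks << io.readpartial(chunk_size) }
  rescue EOFError
    # readpartial signals exhaustion with EOFError.
  end
  chunks
ensure
  io.close if close
end

io = StringIO.new("abcdefghij")
first = read_chunks(io, 4)               # => ["abcd", "efgh", "ij"]
second = read_chunks(io, 4, close: true) # => [], the position is already at EOF
```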

.ractor_merge_ports(ports, ack_transfer: :copy, cancel: true) ⇒ Object

Creates a backpressure-aware source definition from multiple Ractor port pairs.

Each pair must be a Hash with `:port` and `:ack_port`. The source sends at most one outstanding `RactorPort::Ack` to each active producer and emits producer values in coordinator-observed ready order. Producer work is isolated in Ractors, so demanding this source does not require a `Fiber.scheduler`. Failure metadata is producer-provided and should be sanitized before crossing trust boundaries.

Raises:

  • (TypeError)


# File 'lib/fiber_stream/source.rb', line 65

def self.ractor_merge_ports(ports, ack_transfer: :copy, cancel: true)
  pairs = normalize_ractor_merge_port_pairs(ports)

  Flow.__send__(:validate_ractor_transfer_policy!, :ack_transfer, ack_transfer)
  raise TypeError, "cancel must be true or false" unless [true, false].include?(cancel)

  new(-> { Pull.ractor_merge_ports(pairs, ack_transfer, cancel) })
end
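The "one outstanding ack per producer, ready-order output" rule can be modeled with threads. This sketch swaps in `Thread::Queue` for the Ractor ports and a `DONE` sentinel for completion messages; `start_producer` and `merge_all` are hypothetical names. For simplicity the coordinator re-arms a producer as soon as it receives that producer's value, rather than waiting for downstream demand.

```ruby
# Queue-based model of ack-driven fan-in. Thread::Queue stands in for Ractor
# ports and DONE for a completion message; this is not FiberStream's code.
DONE = Object.new.freeze

# Producer: send exactly one message per ack received on its own ack queue.
def start_producer(id, values, inbox, acks)
  Thread.new do
    values.each do |value|
      acks.pop
      inbox.push([id, value])
    end
    acks.pop
    inbox.push([id, DONE])
  end
end

# Coordinator: at most one outstanding ack per active producer; values are
# emitted in the order they arrive (ready order), not in input order.
def merge_all(ack_queues, inbox)
  active = ack_queues.keys
  active.each { |id| ack_queues[id].push(:ack) }
  out = []
  until active.empty?
    id, value = inbox.pop
    if value.equal?(DONE)
      active.delete(id)
    else
      out << [id, value]
      ack_queues[id].push(:ack) # re-arm only the producer that just sent
    end
  end
  out
end

inbox = Queue.new
acks = { a: Queue.new, b: Queue.new }
threads = [
  start_producer(:a, [1, 2, 3], inbox, acks[:a]),
  start_producer(:b, [10, 20], inbox, acks[:b])
]
merged = merge_all(acks, inbox)
threads.each(&:join)
```

Interleaving between `:a` and `:b` varies from run to run, but each producer's own values stay in order, matching the per-producer guarantee described above.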

.ractor_port(port, ack_port:, ack_transfer: :copy, cancel: true) ⇒ Object

Creates a backpressure-aware source definition from Ractor ports.

`port` is the data/control port received by FiberStream. `ack_port` is a producer-owned port that receives `RactorPort::Ack` and `RactorPort::Cancel` control messages. The producer must wait for an ack before sending each `RactorPort::Element`, `RactorPort::Complete`, or `RactorPort::Failure` message. Failure metadata is producer-provided and should be sanitized before crossing trust boundaries.

Raises:

  • (TypeError)


# File 'lib/fiber_stream/source.rb', line 44

def self.ractor_port(port, ack_port:, ack_transfer: :copy, cancel: true)
  raise TypeError, "port must respond to receive" unless port.respond_to?(:receive)
  unless ack_port.respond_to?(:send) && ack_port.method(:send).owner != Kernel
    raise TypeError, "ack_port must provide Ractor-style send"
  end

  Flow.__send__(:validate_ractor_transfer_policy!, :ack_transfer, ack_transfer)
  raise TypeError, "cancel must be true or false" unless [true, false].include?(cancel)

  new(-> { Pull.ractor_port(port, ack_port, ack_transfer, cancel) })
end
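The producer-side contract (wait for an ack before every message, stop on cancel) can be sketched without Ractors. Here `Thread::Queue` replaces both ports, and `Element`, `Complete`, `:ack`, and `:cancel` are hypothetical stand-ins for the `RactorPort::*` message classes; the sketch shows the ordering rule, not FiberStream's message format.

```ruby
# Queue-based model of the ack-before-send rule. Element/Complete and the
# :ack/:cancel symbols are hypothetical stand-ins for RactorPort messages.
Element = Struct.new(:value)
Complete = Class.new

# Producer: block for a control message before every send; stop on :cancel.
def start_producer(values, data, acks)
  Thread.new do
    pending = values.dup
    loop do
      break if acks.pop == :cancel
      if pending.empty?
        data.push(Complete.new)
        break
      end
      data.push(Element.new(pending.shift))
    end
  end
end

# Consumer: one ack per pull keeps at most one message in flight; sending
# :cancel models downstream completing early.
def pull(data, acks, limit: nil)
  received = []
  loop do
    if limit && received.size == limit
      acks.push(:cancel)
      break
    end
    acks.push(:ack)
    message = data.pop
    break if message.is_a?(Complete)
    received << message.value
  end
  received
end

data, acks = Queue.new, Queue.new
producer = start_producer([1, 2, 3], data, acks)
all = pull(data, acks) # => [1, 2, 3]
producer.join

data2, acks2 = Queue.new, Queue.new
producer2 = start_producer([1, 2, 3], data2, acks2)
partial = pull(data2, acks2, limit: 2) # => [1, 2]
producer2.join
```

Because the producer never sends unprompted, cancelling after two elements leaves nothing stranded in the data queue.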

Instance Method Details

#async ⇒ Object

Returns a new source definition with an asynchronous boundary.

This is a convenience wrapper around `via(FiberStream::Flow.async)` and preserves the same scheduler requirement and cancellation behavior.



# File 'lib/fiber_stream/source.rb', line 231

def async
  via(Flow.async)
end

#buffer(count) ⇒ Object

Returns a new source definition with a bounded asynchronous buffer.

This is a convenience wrapper around `via(FiberStream::Flow.buffer(count))` and preserves the same validation, scheduler requirement, and cancellation behavior.



# File 'lib/fiber_stream/source.rb', line 240

def buffer(count)
  via(Flow.buffer(count))
end

#concat(source) ⇒ Object

Returns a new source definition that emits all elements of this source, then all elements of `source`.

Construction is lazy. The appended source is not materialized or pulled until downstream demand observes completion from this source. Flows attached before concat stay scoped to their source; flows attached after concat apply to the combined output.

Raises:

  • (TypeError)


# File 'lib/fiber_stream/source.rb', line 95

def concat(source)
  raise TypeError, "expected FiberStream::Source" unless source.is_a?(Source)

  self.class.__send__(
    :new,
    -> { Pull.concat(materializer, source.__send__(:materializer)) }
  )
end
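The laziness of the appended side can be modeled with plain Enumerators. In this sketch a "source" is a hypothetical lambda that returns a fresh Enumerator, and `concat` is an illustrative helper rather than `Pull.concat`; the log records when each side is materialized.

```ruby
# Simplified model: a source is a lambda returning a fresh Enumerator.
# The second factory is called only after the first source completes.
def concat(first_factory, second_factory)
  lambda do
    Enumerator.new do |y|
      first_factory.call.each { |v| y << v }
      second_factory.call.each { |v| y << v }
    end
  end
end

log = []
first = -> { log << :first; [1, 2].each }
second = -> { log << :second; [3].each }

stream = concat(first, second).call
a = stream.next
b = stream.next
log_after_two = log.dup # => [:first], the second side is not materialized yet
c = stream.next         # first side completed, so the second side starts
```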

#drop(count) ⇒ Object

Returns a new source definition that drops the first `count` elements.

This is a convenience wrapper around `via(FiberStream::Flow.drop(count))` and preserves the same validation and pull-driven backpressure behavior.



# File 'lib/fiber_stream/source.rb', line 194

def drop(count)
  via(Flow.drop(count))
end

#drop_while(&block) ⇒ Object

Returns a new source definition that drops leading elements while `block` is truthy.

This is a convenience wrapper around `via(FiberStream::Flow.drop_while { … })` and preserves the same predicate truthiness, prefix-dropping, and pass-through behavior.



# File 'lib/fiber_stream/source.rb', line 223

def drop_while(&block)
  via(Flow.drop_while(&block))
end

#grouped(count) ⇒ Object

Returns a new source definition that groups adjacent elements into arrays.

This is a convenience wrapper around `via(FiberStream::Flow.grouped(count))` and preserves the same validation, ordering, final partial group, and pull-driven backpressure behavior.



# File 'lib/fiber_stream/source.rb', line 203

def grouped(count)
  via(Flow.grouped(count))
end
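The ordering and final-partial-group behavior resembles `Enumerable#each_slice`. This pull-driven sketch uses a hypothetical `grouped` helper; its positive-Integer check is an assumed validation rule, since the exact rules of `Flow.grouped` are not spelled out here.

```ruby
# Pull-driven grouping sketch: adjacent elements are collected into arrays
# of `count`, and a shorter final group is emitted if input runs out.
# The positive-Integer check is an assumption for illustration.
def grouped(upstream, count)
  unless count.is_a?(Integer) && count.positive?
    raise ArgumentError, "count must be a positive Integer"
  end

  Enumerator.new do |y|
    group = []
    upstream.each do |value|
      group << value
      next if group.size < count
      y << group
      group = []
    end
    y << group unless group.empty? # final partial group
  end
end

groups = grouped(1..5, 2).to_a # => [[1, 2], [3, 4], [5]]
```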

#lines(chomp: true, max_length: nil) ⇒ Object

Returns a new source definition that splits String chunks into lines.

This is a convenience wrapper around `via(FiberStream::Flow.lines(chomp:, max_length:))`. With `max_length: nil`, one unterminated line can buffer without bound. Set a positive `max_length` for untrusted, network-facing, or otherwise unbounded streams.



# File 'lib/fiber_stream/source.rb', line 251

def lines(chomp: true, max_length: nil)
  via(Flow.lines(chomp: chomp, max_length: max_length))
end
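Why `max_length` matters is easiest to see in a buffering splitter. The `lines` helper below is a hypothetical sketch, not `Flow.lines`: it only bounds the unterminated remainder it is holding, and raising `ArgumentError` on overflow is an assumption, because the documented overflow behavior of FiberStream is not described here.

```ruby
# Splits String chunks into lines. Only the buffered, unterminated remainder
# is checked against max_length; raising on overflow is an assumption.
def lines(chunks, chomp: true, max_length: nil)
  Enumerator.new do |y|
    buffer = +""
    chunks.each do |chunk|
      buffer << chunk
      while (index = buffer.index("\n"))
        line = buffer.slice!(0..index)
        y << (chomp ? line.chomp : line)
      end
      if max_length && buffer.length > max_length
        raise ArgumentError, "unterminated line exceeds max_length (#{max_length})"
      end
    end
    y << buffer unless buffer.empty? # trailing line without a newline
  end
end

chomped = lines(["ab\ncd", "e\n", "f"]).to_a            # => ["ab", "cde", "f"]
raw = lines(["ab\ncd", "e\n", "f"], chomp: false).to_a # => ["ab\n", "cde\n", "f"]
```

With `max_length: nil`, a stream that never sends a newline would grow `buffer` indefinitely, which is the risk the paragraph above warns about.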

#map(&block) ⇒ Object

Returns a new source definition that maps each element with `block`.

This is a convenience wrapper around `via(FiberStream::Flow.map { … })` and has the same lazy construction, error, and backpressure behavior as the underlying flow.



# File 'lib/fiber_stream/source.rb', line 142

def map(&block)
  via(Flow.map(&block))
end

#merge(source) ⇒ Object

Returns a new source definition that emits values from this source and `source` in scheduler-observed ready order.

Construction is lazy. The merged source starts one scheduled producer fiber per input source only when downstream demand reaches the merge. Each input's own element order is preserved, but cross-input ordering is not deterministic. When demanded, the merge requires an installed `Fiber.scheduler` and must run from a non-blocking fiber.

Raises:

  • (TypeError)


# File 'lib/fiber_stream/source.rb', line 128

def merge(source)
  raise TypeError, "expected FiberStream::Source" unless source.is_a?(Source)

  self.class.__send__(
    :new,
    -> { Pull.merge(materializer, source.__send__(:materializer)) }
  )
end

#parallel_map(concurrency:, &block) ⇒ Object

Returns a new source definition that maps elements concurrently.

This is a convenience wrapper around `via(FiberStream::Flow.parallel_map(concurrency:) { … })` and preserves the same ordered delivery, scheduler requirement, validation, bounded upstream run-ahead, and cancellation behavior.



# File 'lib/fiber_stream/source.rb', line 152

def parallel_map(concurrency:, &block)
  via(Flow.parallel_map(concurrency: concurrency, &block))
end
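Ordered delivery with bounded run-ahead can be illustrated with threads. This is a swapped-in technique: the hypothetical `parallel_map` helper below uses `Thread` where FiberStream uses scheduled fibers, and its positive-Integer validation is assumed. At most `concurrency` calls are pending at once, and results are emitted in input order even when later inputs finish first.

```ruby
# Ordered map with bounded concurrency. Threads stand in for the scheduled
# fibers FiberStream uses; this is an illustration, not Flow.parallel_map.
def parallel_map(upstream, concurrency:, &block)
  unless concurrency.is_a?(Integer) && concurrency.positive?
    raise ArgumentError, "concurrency must be a positive Integer"
  end

  Enumerator.new do |y|
    in_flight = [] # threads, kept in input order
    upstream.each do |value|
      in_flight << Thread.new(value, &block)
      # Bounded run-ahead: once `concurrency` calls are pending, emit the
      # oldest result before pulling the next upstream element.
      y << in_flight.shift.value if in_flight.size >= concurrency
    end
    y << in_flight.shift.value until in_flight.empty?
  end
end

lock = Mutex.new
active = 0
peak = 0
mapped = parallel_map(1..6, concurrency: 2) do |n|
  lock.synchronize { active += 1; peak = [peak, active].max }
  sleep(0.01 * (7 - n)) # later inputs finish sooner
  lock.synchronize { active -= 1 }
  n * 10
end
results = mapped.to_a # => [10, 20, 30, 40, 50, 60]
```

`Thread#value` joins the oldest call and re-raises its exception, so a failing mapper surfaces in input order as well.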

#ractor_map(workers:, input_transfer: :copy, output_transfer: :copy, &block) ⇒ Object

Returns a new source definition that maps elements in Ractor workers.

This is a convenience wrapper around `via(FiberStream::Flow.ractor_map(workers:, input_transfer:, output_transfer:) { … })` and preserves the same shareable mapper requirement, ordered delivery, transfer policy, bounded upstream run-ahead, and cooperative worker shutdown behavior.



# File 'lib/fiber_stream/source.rb', line 162

def ractor_map(workers:, input_transfer: :copy, output_transfer: :copy, &block)
  via(
    Flow.ractor_map(
      workers: workers,
      input_transfer: input_transfer,
      output_transfer: output_transfer,
      &block
    )
  )
end

#run_with(sink) ⇒ Object

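Materializes and runs this source with `sink`.

The stream runs in the current fiber until completion or failure. The method returns the sink's materialized value and closes the materialized pull chain on success, failure, or early sink completion. If closing raises after the run has already failed with a `StandardError`, the close error is discarded and the original error propagates.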

Raises:

  • (TypeError)


# File 'lib/fiber_stream/source.rb', line 281

def run_with(sink)
  raise TypeError, "expected FiberStream::Sink" unless sink.is_a?(Sink)

  primary_error = nil

  begin
    stream = materialize

    sink.__send__(:run, stream)
  rescue StandardError => error
    primary_error = error
    raise
  ensure
    begin
      stream&.close
    rescue StandardError => close_error
      raise close_error unless primary_error
    end
  end
end
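The cleanup rule in `run_with` (always close, but never let a close failure mask the error that ended the run) can be exercised in isolation. `run_and_close` mirrors the `begin`/`rescue`/`ensure` structure above with a block in place of the sink, and `FlakyStream` is a hypothetical stream whose `close` can be made to fail.

```ruby
# Mirrors run_with's structure: the block plays the sink's role.
def run_and_close(stream)
  primary_error = nil
  begin
    yield stream
  rescue StandardError => error
    primary_error = error
    raise
  ensure
    begin
      stream&.close
    rescue StandardError => close_error
      # Surface a close failure only when nothing else went wrong.
      raise close_error unless primary_error
    end
  end
end

# Hypothetical stream whose close can be configured to fail.
class FlakyStream
  attr_reader :closed

  def initialize(fail_on_close:)
    @fail_on_close = fail_on_close
    @closed = false
  end

  def close
    @closed = true
    raise IOError, "close failed" if @fail_on_close
  end
end

ok = FlakyStream.new(fail_on_close: false)
value = run_and_close(ok) { 42 } # => 42, and ok.closed is true

masked = FlakyStream.new(fail_on_close: true)
sink_error = begin
  run_and_close(masked) { raise ArgumentError, "sink failed" }
rescue StandardError => e
  e # the ArgumentError, not the IOError from close
end

quiet = FlakyStream.new(fail_on_close: true)
close_error = begin
  run_and_close(quiet) { :done }
rescue StandardError => e
  e # the IOError, since the run itself succeeded
end
```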

#select(&block) ⇒ Object

Returns a new source definition that keeps elements matching `block`.

This is a convenience wrapper around `via(FiberStream::Flow.select { … })` and has the same truthiness and lazy construction behavior as the underlying flow.



# File 'lib/fiber_stream/source.rb', line 178

def select(&block)
  via(Flow.select(&block))
end

#split(separator, keep_separator: false, max_length: nil) ⇒ Object

Returns a new source definition that splits String chunks into frames.

This is a convenience wrapper around `via(FiberStream::Flow.split(separator, keep_separator:, max_length:))`. With `max_length: nil`, one unterminated frame can buffer without bound. Set a positive `max_length` for untrusted, network-facing, or otherwise unbounded streams.



# File 'lib/fiber_stream/source.rb', line 262

def split(separator, keep_separator: false, max_length: nil)
  via(Flow.split(separator, keep_separator: keep_separator, max_length: max_length))
end

#take(count) ⇒ Object

Returns a new source definition that emits at most `count` elements.

This is a convenience wrapper around `via(FiberStream::Flow.take(count))` and preserves the same validation and upstream close behavior.



# File 'lib/fiber_stream/source.rb', line 186

def take(count)
  via(Flow.take(count))
end

#take_while(&block) ⇒ Object

Returns a new source definition that emits leading elements while `block` is truthy.

This is a convenience wrapper around `via(FiberStream::Flow.take_while { … })` and preserves the same predicate truthiness, early completion, and upstream close behavior.



# File 'lib/fiber_stream/source.rb', line 213

def take_while(&block)
  via(Flow.take_while(&block))
end

#to(sink) ⇒ Object

Returns a runnable pipeline from this source to `sink`.

Construction is lazy. The source and sink are not materialized until `Pipeline#run` is called.

Raises:

  • (TypeError)


# File 'lib/fiber_stream/source.rb', line 270

def to(sink)
  raise TypeError, "expected FiberStream::Sink" unless sink.is_a?(Sink)

  Pipeline.__send__(:new, self, sink)
end

#via(flow) ⇒ Object

Returns a new source definition that passes this source through `flow`.

This method is lazy. It does not run the source, enumerate values, or call flow blocks.

Raises:

  • (TypeError)


# File 'lib/fiber_stream/source.rb', line 83

def via(flow)
  raise TypeError, "expected FiberStream::Flow" unless flow.is_a?(Flow)

  self.class.__send__(:new, @source_factory, @flows + [flow])
end
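Because `via` builds a new definition from `@source_factory` and `@flows + [flow]`, the receiver is never mutated and nothing runs until materialization. The `Definition` class below is a hypothetical simplified model of that shape, with flows represented as lambdas over an upstream Enumerator.

```ruby
# Simplified model of an immutable source definition: `via` copies the flow
# list into a new definition and runs nothing. Names are hypothetical.
class Definition
  attr_reader :flows

  def initialize(factory, flows = [])
    @factory = factory
    @flows = flows.dup.freeze
  end

  def via(flow)
    self.class.new(@factory, @flows + [flow])
  end

  # Materialization: build the upstream, then apply each flow in order.
  def materialize
    @flows.reduce(@factory.call) { |upstream, flow| flow.call(upstream) }
  end
end

calls = 0
base = Definition.new(-> { calls += 1; [1, 2, 3].each })
doubled = base.via(->(upstream) { upstream.map { |x| x * 2 } })
before = calls                # => 0, defining the pipeline ran nothing
result = doubled.materialize  # => [2, 4, 6]
```

`base.flows` stays empty after the call to `via`, so one base definition can safely be extended in several directions.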

#zip(source) ⇒ Object

Returns a new source definition that emits pairs from this source and `source`.

Construction is lazy. The receiver side is materialized only when downstream demand reaches the zip stage; the other side is materialized only after the receiver produces an element for a pair. The zipped source completes when either input completes.

Raises:

  • (TypeError)


# File 'lib/fiber_stream/source.rb', line 111

def zip(source)
  raise TypeError, "expected FiberStream::Source" unless source.is_a?(Source)

  self.class.__send__(
    :new,
    -> { Pull.zip(materializer, source.__send__(:materializer)) }
  )
end
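The materialization order and completion rule for `zip` can be modeled with Enumerators. The `zip` helper below is hypothetical, not `Pull.zip`: sources are lambdas returning fresh Enumerators, the other side is created only after the receiver yields its first element, and `Kernel#loop` ends the pairing when either side raises `StopIteration`.

```ruby
# Simplified model: the right side is materialized only after the left side
# produces an element, and the zip completes when either side is exhausted.
def zip(left_factory, right_factory)
  Enumerator.new do |y|
    left = left_factory.call
    right = nil
    loop do
      l = left.next # StopIteration here ends the zip (loop rescues it)
      right ||= right_factory.call
      y << [l, right.next]
    end
  end
end

log = []
left = -> { log << :left; [1, 2, 3].each }
right = -> { log << :right; %w[a b].each }
zipped = zip(left, right)
log_before = log.dup # => [], nothing materialized yet
pairs = zipped.to_a  # => [[1, "a"], [2, "b"]]

empty_log = []
empty = zip(-> { [].each }, -> { empty_log << :right; [1].each }).to_a # => []
```

With an empty receiver the other side is never materialized, which matches the ordering described above.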