Class: FiberStream::Source

Inherits:
Object
  • Object
show all
Defined in:
lib/fiber_stream/source.rb

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(source_factory, flows = []) ⇒ Source

Returns a new instance of Source.



50
51
52
53
# File 'lib/fiber_stream/source.rb', line 50

def initialize(source_factory, flows = [])
  @source_factory = source_factory
  @flows = flows
end

Class Method Details

.each(enumerable) ⇒ Object

Creates a source definition from an Enumerable.

The enumerable is not consumed until values are pulled by ‘run_with`. Each materialization creates an Enumerator with `enumerable.to_enum(:each)`; FiberStream does not snapshot values or guarantee replayability for one-shot enumerables.



11
12
13
# File 'lib/fiber_stream/source.rb', line 11

def self.each(enumerable)
  new(-> { Pull.each(enumerable) })
end

.io(io, chunk_size: 16 * 1024, close: false) ⇒ Object

Creates a source definition from an IO-like object.

The IO object is not read until values are pulled by ‘run_with`. Each materialization reads from the same IO object’s current position; this source does not snapshot, reopen, or guarantee replayability. The IO is closed only when ‘close: true` is passed.

Raises:

  • (TypeError)


21
22
23
24
25
26
27
28
29
# File 'lib/fiber_stream/source.rb', line 21

def self.io(io, chunk_size: 16 * 1024, close: false)
  raise TypeError, "io must respond to readpartial" unless io.respond_to?(:readpartial)
  raise TypeError, "chunk_size must be an Integer" unless chunk_size.is_a?(Integer)
  raise ArgumentError, "chunk_size must be positive" unless chunk_size.positive?
  raise TypeError, "close must be true or false" unless [true, false].include?(close)
  raise TypeError, "io must respond to close" if close && !io.respond_to?(:close)

  new(-> { Pull.io(io, chunk_size, close) })
end

.ractor_port(port, ack_port:, ack_transfer: :copy, cancel: true) ⇒ Object

Creates a backpressure-aware source definition from Ractor ports.

‘port` is the data/control port received by FiberStream. `ack_port` is a producer-owned port that receives `RactorPort::Ack` and `RactorPort::Cancel` control messages. The producer must wait for an ack before sending each `RactorPort::Element`, `RactorPort::Complete`, or `RactorPort::Failure` message.

Raises:

  • (TypeError)


38
39
40
41
42
43
44
45
46
47
48
# File 'lib/fiber_stream/source.rb', line 38

def self.ractor_port(port, ack_port:, ack_transfer: :copy, cancel: true)
  raise TypeError, "port must respond to receive" unless port.respond_to?(:receive)
  unless ack_port.respond_to?(:send) && ack_port.method(:send).owner != Kernel
    raise TypeError, "ack_port must provide Ractor-style send"
  end

  Flow.__send__(:validate_ractor_transfer_policy!, :ack_transfer, ack_transfer)
  raise TypeError, "cancel must be true or false" unless [true, false].include?(cancel)

  new(-> { Pull.ractor_port(port, ack_port, ack_transfer, cancel) })
end

Instance Method Details

#asyncObject

Returns a new source definition with an asynchronous boundary.

This is a convenience wrapper around ‘via(FiberStream::Flow.async)` and preserves the same scheduler requirement and cancellation behavior.



181
182
183
# File 'lib/fiber_stream/source.rb', line 181

def async
  via(Flow.async)
end

#buffer(count) ⇒ Object

Returns a new source definition with a bounded asynchronous buffer.

This is a convenience wrapper around ‘via(FiberStream::Flow.buffer(count))` and preserves the same validation, scheduler requirement, and cancellation behavior.



190
191
192
# File 'lib/fiber_stream/source.rb', line 190

def buffer(count)
  via(Flow.buffer(count))
end

#concat(source) ⇒ Object

Returns a new source definition that emits this source, then ‘source`.

Construction is lazy. The appended source is not materialized or pulled until downstream demand observes completion from this source. Flows attached before concat stay scoped to their source; flows attached after concat apply to the combined output.

Raises:

  • (TypeError)


71
72
73
74
75
76
77
78
# File 'lib/fiber_stream/source.rb', line 71

def concat(source)
  raise TypeError, "expected FiberStream::Source" unless source.is_a?(Source)

  self.class.__send__(
    :new,
    -> { Pull.concat(materializer, source.__send__(:materializer)) }
  )
end

#drop(count) ⇒ Object

Returns a new source definition that drops the first ‘count` elements.

This is a convenience wrapper around ‘via(FiberStream::Flow.drop(count))` and preserves the same validation and pull-driven backpressure behavior.



153
154
155
# File 'lib/fiber_stream/source.rb', line 153

def drop(count)
  via(Flow.drop(count))
end

#drop_while(&block) ⇒ Object

Returns a new source definition that drops leading elements while ‘block` is truthy.

This is a convenience wrapper around ‘via(FiberStream::Flow.drop_while { … })` and preserves the same predicate truthiness, prefix-dropping, and pass-through behavior.



173
174
175
# File 'lib/fiber_stream/source.rb', line 173

def drop_while(&block)
  via(Flow.drop_while(&block))
end

#lines(chomp: true, max_length: nil) ⇒ Object

Returns a new source definition that splits String chunks into lines.

This is a convenience wrapper around ‘via(FiberStream::Flow.lines(chomp:, max_length:))`.



198
199
200
# File 'lib/fiber_stream/source.rb', line 198

def lines(chomp: true, max_length: nil)
  via(Flow.lines(chomp: chomp, max_length: max_length))
end

#map(&block) ⇒ Object

Returns a new source definition that maps each element with ‘block`.

This is a convenience wrapper around ‘via(FiberStream::Flow.map { … })` and has the same lazy construction, error, and backpressure behavior as the underlying flow.



101
102
103
# File 'lib/fiber_stream/source.rb', line 101

def map(&block)
  via(Flow.map(&block))
end

#parallel_map(concurrency:, &block) ⇒ Object

Returns a new source definition that maps elements concurrently.

This is a convenience wrapper around ‘via(FiberStream::Flow.parallel_map(concurrency:) { … })` and preserves the same ordered delivery, scheduler requirement, validation, bounded upstream run-ahead, and cancellation behavior.



111
112
113
# File 'lib/fiber_stream/source.rb', line 111

def parallel_map(concurrency:, &block)
  via(Flow.parallel_map(concurrency: concurrency, &block))
end

#ractor_map(workers:, input_transfer: :copy, output_transfer: :copy, &block) ⇒ Object

Returns a new source definition that maps elements in Ractor workers.

This is a convenience wrapper around ‘via(FiberStream::Flow.ractor_map(workers:) { … })` and preserves the same shareable mapper requirement, ordered delivery, transfer policy, bounded upstream run-ahead, and cooperative worker shutdown behavior.



121
122
123
124
125
126
127
128
129
130
# File 'lib/fiber_stream/source.rb', line 121

def ractor_map(workers:, input_transfer: :copy, output_transfer: :copy, &block)
  via(
    Flow.ractor_map(
      workers: workers,
      input_transfer: input_transfer,
      output_transfer: output_transfer,
      &block
    )
  )
end

#run_with(sink) ⇒ Object

Materializes and runs this source with ‘sink`.

The stream runs in the current fiber until completion or failure. The method returns the sink’s materialized value and closes the materialized pull chain on success, failure, or early sink completion.

Raises:

  • (TypeError)


217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
# File 'lib/fiber_stream/source.rb', line 217

def run_with(sink)
  raise TypeError, "expected FiberStream::Sink" unless sink.is_a?(Sink)

  primary_error = nil

  begin
    stream = materialize

    sink.__send__(:run, stream)
  rescue StandardError => error
    primary_error = error
    raise
  ensure
    begin
      stream&.close
    rescue StandardError => close_error
      raise close_error unless primary_error
    end
  end
end

#select(&block) ⇒ Object

Returns a new source definition that keeps elements matching ‘block`.

This is a convenience wrapper around ‘via(FiberStream::Flow.select { … })` and has the same truthiness and lazy construction behavior as the underlying flow.



137
138
139
# File 'lib/fiber_stream/source.rb', line 137

def select(&block)
  via(Flow.select(&block))
end

#take(count) ⇒ Object

Returns a new source definition that emits at most ‘count` elements.

This is a convenience wrapper around ‘via(FiberStream::Flow.take(count))` and preserves the same validation and upstream close behavior.



145
146
147
# File 'lib/fiber_stream/source.rb', line 145

def take(count)
  via(Flow.take(count))
end

#take_while(&block) ⇒ Object

Returns a new source definition that emits leading elements while ‘block` is truthy.

This is a convenience wrapper around ‘via(FiberStream::Flow.take_while { … })` and preserves the same predicate truthiness, early completion, and upstream close behavior.



163
164
165
# File 'lib/fiber_stream/source.rb', line 163

def take_while(&block)
  via(Flow.take_while(&block))
end

#to(sink) ⇒ Object

Returns a runnable pipeline from this source to ‘sink`.

Construction is lazy. The source and sink are not materialized until ‘Pipeline#run` is called.

Raises:

  • (TypeError)


206
207
208
209
210
# File 'lib/fiber_stream/source.rb', line 206

def to(sink)
  raise TypeError, "expected FiberStream::Sink" unless sink.is_a?(Sink)

  Pipeline.__send__(:new, self, sink)
end

#via(flow) ⇒ Object

Returns a new source definition that passes this source through ‘flow`.

This method is lazy. It does not run the source, enumerate values, or call flow blocks.

Raises:

  • (TypeError)


59
60
61
62
63
# File 'lib/fiber_stream/source.rb', line 59

def via(flow)
  raise TypeError, "expected FiberStream::Flow" unless flow.is_a?(Flow)

  self.class.__send__(:new, @source_factory, @flows + [flow])
end

#zip(source) ⇒ Object

Returns a new source definition that emits pairs from this source and ‘source`.

Construction is lazy. The receiver side is materialized only when downstream demand reaches the zip stage; the other side is materialized only after the receiver produces an element for a pair. The zipped source completes when either input completes.

Raises:

  • (TypeError)


87
88
89
90
91
92
93
94
# File 'lib/fiber_stream/source.rb', line 87

def zip(source)
  raise TypeError, "expected FiberStream::Source" unless source.is_a?(Source)

  self.class.__send__(
    :new,
    -> { Pull.zip(materializer, source.__send__(:materializer)) }
  )
end