Class: FiberStream::Source

Inherits:
Object
Defined in:
lib/fiber_stream/source.rb

Constructor Details

#initialize(source_factory, flows = []) ⇒ Source

Returns a new instance of Source.



# File 'lib/fiber_stream/source.rb', line 50

def initialize(source_factory, flows = [])
  @source_factory = source_factory
  @flows = flows
end

Class Method Details

.each(enumerable) ⇒ Object

Creates a source definition from an Enumerable.

The enumerable is not consumed until values are pulled by `run_with`. Each materialization creates an Enumerator with `enumerable.to_enum(:each)`; FiberStream does not snapshot values or guarantee replayability for one-shot enumerables.



# File 'lib/fiber_stream/source.rb', line 11

def self.each(enumerable)
  new(-> { Pull.each(enumerable) })
end
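
The replayability caveat can be reproduced with plain Ruby, independent of FiberStream. This is a minimal sketch, assuming only that each materialization calls `to_enum(:each)` as described above; the helper name `materialize_twice` is hypothetical and not part of the library.

```ruby
require "stringio"

# Hypothetical helper: mimics two materializations by calling
# to_enum(:each) twice on the same object and draining each enumerator.
def materialize_twice(enumerable)
  first = enumerable.to_enum(:each).to_a
  second = enumerable.to_enum(:each).to_a
  [first, second]
end

# An Array restarts from the beginning on every pass.
array_runs = materialize_twice([1, 2, 3])

# A StringIO is one-shot: the first pass leaves it at EOF,
# so the second pass yields nothing.
io_runs = materialize_twice(StringIO.new("a\nb\n"))
```

If a source must be replayable, pass a collection that can be iterated repeatedly rather than a one-shot object.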

.io(io, chunk_size: 16 * 1024, close: false) ⇒ Object

Creates a source definition from an IO-like object.

The IO object is not read until values are pulled by `run_with`. Each materialization reads from the same IO object’s current position; this source does not snapshot, reopen, or guarantee replayability. The IO is closed only when `close: true` is passed.

Raises:

  • (TypeError)
  • (ArgumentError)


# File 'lib/fiber_stream/source.rb', line 21

def self.io(io, chunk_size: 16 * 1024, close: false)
  raise TypeError, "io must respond to readpartial" unless io.respond_to?(:readpartial)
  raise TypeError, "chunk_size must be an Integer" unless chunk_size.is_a?(Integer)
  raise ArgumentError, "chunk_size must be positive" unless chunk_size.positive?
  raise TypeError, "close must be true or false" unless [true, false].include?(close)
  raise TypeError, "io must respond to close" if close && !io.respond_to?(:close)

  new(-> { Pull.io(io, chunk_size, close) })
end
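
The chunked reading that `readpartial` and `chunk_size` imply can be sketched with a `StringIO`. The helper `read_chunks` below is hypothetical and is not how `Pull.io` is implemented; it only shows the `readpartial`/`EOFError` contract and why a second pass over the same IO starts from the current position.

```ruby
require "stringio"

# Hypothetical helper, not part of FiberStream: drains an IO-like object
# in chunks of at most chunk_size bytes using readpartial.
def read_chunks(io, chunk_size: 16 * 1024, close: false)
  chunks = []
  begin
    loop { chunks << io.readpartial(chunk_size) }
  rescue EOFError
    # readpartial signals end of input by raising EOFError.
  end
  chunks
ensure
  # Mirror the documented default: only close when explicitly asked to.
  io.close if close
end

io = StringIO.new("abcdefgh")
first_pass = read_chunks(io, chunk_size: 3)
second_pass = read_chunks(io, chunk_size: 3) # same IO, already at EOF
```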

.ractor_port(port, ack_port:, ack_transfer: :copy, cancel: true) ⇒ Object

Creates a backpressure-aware source definition from Ractor ports.

`port` is the data/control port received by FiberStream. `ack_port` is a producer-owned port that receives `RactorPort::Ack` and `RactorPort::Cancel` control messages. The producer must wait for an ack before sending each `RactorPort::Element`, `RactorPort::Complete`, or `RactorPort::Failure` message.

Raises:

  • (TypeError)


# File 'lib/fiber_stream/source.rb', line 38

def self.ractor_port(port, ack_port:, ack_transfer: :copy, cancel: true)
  raise TypeError, "port must respond to receive" unless port.respond_to?(:receive)
  unless ack_port.respond_to?(:send) && ack_port.method(:send).owner != Kernel
    raise TypeError, "ack_port must provide Ractor-style send"
  end

  Flow.__send__(:validate_ractor_transfer_policy!, :ack_transfer, ack_transfer)
  raise TypeError, "cancel must be true or false" unless [true, false].include?(cancel)

  new(-> { Pull.ractor_port(port, ack_port, ack_transfer, cancel) })
end
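
The ack-before-send rule described above can be sketched without Ractors. In this sketch, `Queue` objects stand in for the two ports, and the `Sketch*` message classes are hypothetical stand-ins for the `RactorPort::*` messages, whose definitions are not shown in this section. It illustrates the ordering of the handshake only, not the library's implementation.

```ruby
# Hypothetical stand-ins for RactorPort::Ack, ::Element, and ::Complete.
SketchAck = Class.new
SketchElement = Struct.new(:value)
SketchComplete = Class.new

def run_handshake(values)
  data_port = Queue.new # stands in for `port`
  ack_port = Queue.new  # stands in for `ack_port`

  producer = Thread.new do
    values.each do |value|
      ack_port.pop # wait for an ack before each element
      data_port << SketchElement.new(value)
    end
    ack_port.pop # the completion message also needs an ack
    data_port << SketchComplete.new
  end

  received = []
  loop do
    ack_port << SketchAck.new # consumer signals demand for one message
    message = data_port.pop
    break if message.is_a?(SketchComplete)

    received << message.value
  end
  producer.join
  received
end
```

Because the producer blocks on every ack, at most one message is in flight at a time, which is what makes the source backpressure-aware.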

Instance Method Details

#async ⇒ Object

Returns a new source definition with an asynchronous boundary.

This is a convenience wrapper around `via(FiberStream::Flow.async)` and preserves the same scheduler requirement and cancellation behavior.



# File 'lib/fiber_stream/source.rb', line 122

def async
  via(Flow.async)
end

#buffer(count) ⇒ Object

Returns a new source definition with a bounded asynchronous buffer.

This is a convenience wrapper around `via(FiberStream::Flow.buffer(count))` and preserves the same validation, scheduler requirement, and cancellation behavior.



# File 'lib/fiber_stream/source.rb', line 131

def buffer(count)
  via(Flow.buffer(count))
end

#lines(chomp: true, max_length: nil) ⇒ Object

Returns a new source definition that splits String chunks into lines.

This is a convenience wrapper around `via(FiberStream::Flow.lines(chomp:, max_length:))`.



# File 'lib/fiber_stream/source.rb', line 139

def lines(chomp: true, max_length: nil)
  via(Flow.lines(chomp: chomp, max_length: max_length))
end
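
Splitting String chunks into lines requires buffering, because a line can span chunk boundaries. The sketch below is a hypothetical helper, not the `Flow.lines` implementation. It omits `max_length`, whose semantics are not documented in this section, and it assumes that a trailing partial line is emitted at end of input.

```ruby
# Hypothetical helper, not FiberStream's implementation: turns an
# enumerable of String chunks into lines, buffering partial lines.
def split_lines(chunks, chomp: true)
  buffer = +""
  lines = []
  chunks.each do |chunk|
    buffer << chunk
    # Emit every complete line currently held in the buffer.
    while (index = buffer.index("\n"))
      line = buffer.slice!(0..index)
      lines << (chomp ? line.chomp : line)
    end
  end
  # Assumption: leftover text without a newline counts as a final line.
  lines << buffer unless buffer.empty?
  lines
end

lines = split_lines(["ab", "c\nde", "\nf"])
```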

#map(&block) ⇒ Object

Returns a new source definition that maps each element with `block`.

This is a convenience wrapper around `via(FiberStream::Flow.map { … })` and has the same lazy construction, error, and backpressure behavior as the underlying flow.



# File 'lib/fiber_stream/source.rb', line 70

def map(&block)
  via(Flow.map(&block))
end

#parallel_map(concurrency:, &block) ⇒ Object

Returns a new source definition that maps elements concurrently.

This is a convenience wrapper around `via(FiberStream::Flow.parallel_map(concurrency:) { … })` and preserves the same ordered delivery, scheduler requirement, validation, bounded upstream run-ahead, and cancellation behavior.



# File 'lib/fiber_stream/source.rb', line 80

def parallel_map(concurrency:, &block)
  via(Flow.parallel_map(concurrency: concurrency, &block))
end

#ractor_map(workers:, input_transfer: :copy, output_transfer: :copy, &block) ⇒ Object

Returns a new source definition that maps elements in Ractor workers.

This is a convenience wrapper around `via(FiberStream::Flow.ractor_map(workers:, input_transfer:, output_transfer:) { … })` and preserves the same shareable mapper requirement, ordered delivery, transfer policy, bounded upstream run-ahead, and cooperative worker shutdown behavior.



# File 'lib/fiber_stream/source.rb', line 90

def ractor_map(workers:, input_transfer: :copy, output_transfer: :copy, &block)
  via(
    Flow.ractor_map(
      workers: workers,
      input_transfer: input_transfer,
      output_transfer: output_transfer,
      &block
    )
  )
end

#run_with(sink) ⇒ Object

Materializes and runs this source with `sink`.

The stream runs in the current fiber until completion or failure. The method returns the sink’s materialized value and closes the materialized pull chain on success, failure, or early sink completion.

Raises:

  • (TypeError)


# File 'lib/fiber_stream/source.rb', line 158

def run_with(sink)
  raise TypeError, "expected FiberStream::Sink" unless sink.is_a?(Sink)

  primary_error = nil

  begin
    stream = @source_factory.call
    @flows.each do |flow|
      stream = flow.__send__(:attach, stream)
    end

    sink.__send__(:run, stream)
  rescue StandardError => error
    primary_error = error
    raise
  ensure
    begin
      stream&.close
    rescue StandardError => close_error
      raise close_error unless primary_error
    end
  end
end
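
The `rescue`/`ensure` shape of `run_with` has one subtle consequence: an error raised while closing the stream is only surfaced when the run itself succeeded, so it never masks the original failure. The sketch below isolates that pattern in plain Ruby; `run_and_close` and `FailingCloser` are hypothetical names used for illustration only.

```ruby
# Hypothetical helper mirroring the rescue/ensure shape of run_with:
# the resource is always closed, and an error raised by close is only
# surfaced when the body itself did not fail.
def run_and_close(resource)
  primary_error = nil
  begin
    yield resource
  rescue StandardError => error
    primary_error = error
    raise
  ensure
    begin
      resource.close
    rescue StandardError => close_error
      raise close_error unless primary_error
    end
  end
end

# Test double whose close always fails.
class FailingCloser
  def close
    raise IOError, "close failed"
  end
end
```

With a `FailingCloser`, a block that raises `ArgumentError` propagates the `ArgumentError`, while a block that returns normally propagates the `IOError` from `close`.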

#select(&block) ⇒ Object

Returns a new source definition that keeps elements matching `block`.

This is a convenience wrapper around `via(FiberStream::Flow.select { … })` and has the same truthiness and lazy construction behavior as the underlying flow.



# File 'lib/fiber_stream/source.rb', line 106

def select(&block)
  via(Flow.select(&block))
end

#take(count) ⇒ Object

Returns a new source definition that emits at most `count` elements.

This is a convenience wrapper around `via(FiberStream::Flow.take(count))` and preserves the same validation and upstream close behavior.



# File 'lib/fiber_stream/source.rb', line 114

def take(count)
  via(Flow.take(count))
end

#to(sink) ⇒ Object

Returns a runnable pipeline from this source to `sink`.

Construction is lazy. The source and sink are not materialized until `Pipeline#run` is called.

Raises:

  • (TypeError)


# File 'lib/fiber_stream/source.rb', line 147

def to(sink)
  raise TypeError, "expected FiberStream::Sink" unless sink.is_a?(Sink)

  Pipeline.__send__(:new, self, sink)
end

#via(flow) ⇒ Object

Returns a new source definition that passes this source through `flow`.

This method is lazy. It does not run the source, enumerate values, or call flow blocks.

Raises:

  • (TypeError)


# File 'lib/fiber_stream/source.rb', line 59

def via(flow)
  raise TypeError, "expected FiberStream::Flow" unless flow.is_a?(Flow)

  self.class.__send__(:new, @source_factory, @flows + [flow])
end
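
Because `via` builds a new definition with `@flows + [flow]`, the receiver is never mutated and nothing runs until materialization. The following miniature shows that pattern in isolation. `MiniSource` is a hypothetical class, and its flows are plain lambdas rather than `FiberStream::Flow` objects.

```ruby
# Hypothetical miniature of the pattern; not the FiberStream class.
class MiniSource
  attr_reader :flows

  def initialize(source_factory, flows = [])
    @source_factory = source_factory
    @flows = flows
  end

  # Returns a new definition; the receiver's flow list is left untouched.
  def via(flow)
    self.class.new(@source_factory, @flows + [flow])
  end

  # Only here is the factory called and each flow applied in order.
  def run
    @flows.reduce(@source_factory.call) { |stream, flow| flow.call(stream) }
  end
end

factory_calls = 0
base = MiniSource.new(-> { factory_calls += 1; [1, 2, 3] })
doubled = base.via(->(stream) { stream.map { |x| x * 2 } })
# At this point the factory has not been called, and base still has no flows.
```

Since each `via` call returns an independent definition, one base source can be branched into several pipelines without the branches affecting each other.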