Class: FiberStream::Sink

Inherits:
Object
Defined in:
lib/fiber_stream/sink.rb


Constructor Details

#initialize(&run) ⇒ Sink

Returns a new instance of Sink.



# File 'lib/fiber_stream/sink.rb', line 97

def initialize(&run)
  @run = run
end

Class Method Details

.build(&run) ⇒ Object

:nodoc:



# File 'lib/fiber_stream/sink.rb', line 93

def self.build(&run) # :nodoc:
  new(&run)
end

.first ⇒ Object

Creates a sink that returns the first stream element.

The sink pulls at most one element. It returns `nil` when upstream completes before producing a value.



# File 'lib/fiber_stream/sink.rb', line 25

def self.first
  new do |stream|
    value = stream.next
    Pull.done?(value) ? nil : value
  end
end
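
The at-most-one-pull behavior can be sketched without the library. `DemoStream` and `DEMO_DONE` below are hypothetical stand-ins for the upstream handle and for whatever completion marker `Pull.done?` recognizes; only the shape of `first_sketch` follows the listing above.

```ruby
# Hypothetical stand-ins: DEMO_DONE plays the role of the completion marker
# that Pull.done? detects, and DemoStream plays the role of the upstream handle.
DEMO_DONE = Object.new.freeze

class DemoStream
  attr_reader :pulls

  def initialize(values)
    @values = values.dup
    @pulls = 0
  end

  # Returns the next element, or DEMO_DONE once the values are exhausted.
  def next
    @pulls += 1
    @values.empty? ? DEMO_DONE : @values.shift
  end
end

# Same shape as the block passed to `new` in Sink.first.
def first_sketch(stream)
  value = stream.next
  value.equal?(DEMO_DONE) ? nil : value
end

full = DemoStream.new([10, 20, 30])
first_sketch(full)                # returns 10
full.pulls                        # 1: the remaining elements were never requested

first_sketch(DemoStream.new([]))  # returns nil, upstream completed first
```

Because `nil` is also the result for an empty upstream, a stream whose first element is itself `nil` (if streams may carry `nil`) cannot be told apart from an empty one with this sink.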

.fold(initial, &block) ⇒ Object

Creates a sink that folds all stream elements into an accumulator.

The sink consumes upstream until normal completion. It returns the final accumulator, or the initial accumulator when upstream is empty. Exceptions raised by the block fail the stream and are re-raised from `Source#run_with`. FiberStream assigns the initial accumulator directly; it does not duplicate or freeze that object.

Raises:

  • (ArgumentError) if no block is given


# File 'lib/fiber_stream/sink.rb', line 39

def self.fold(initial, &block)
  raise ArgumentError, "missing block" unless block

  new do |stream|
    accumulator = initial

    Pull.each_value(stream) do |value|
      accumulator = block.call(accumulator, value)
    end

    accumulator
  end
end
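
The note about the initial accumulator being assigned directly has a practical consequence that a small sketch makes visible. `fold_sketch` is a hypothetical stand-in that iterates a plain collection in place of `Pull.each_value(stream)`; the accumulator handling follows the listing above.

```ruby
# Hypothetical stand-in for the fold sink body: `values.each` replaces
# Pull.each_value(stream), everything else follows the listing above.
def fold_sketch(values, initial)
  accumulator = initial
  values.each { |value| accumulator = yield(accumulator, value) }
  accumulator
end

# A mutating block changes the caller's object, because `initial` is used as-is.
seed = []
result = fold_sketch([1, 2, 3], seed) { |acc, v| acc << v }
result.equal?(seed)  # true: the result is the very object passed in
seed                 # [1, 2, 3]

# A non-mutating block works even with a frozen seed and leaves it untouched.
frozen_seed = [].freeze
fold_sketch([1, 2, 3], frozen_seed) { |acc, v| acc + [v] }  # returns [1, 2, 3]

# An empty upstream returns the initial accumulator unchanged.
fold_sketch([], 0) { |acc, v| acc + v }  # returns 0
```

By the same reasoning, a sink built with a mutable initial value and a mutating block would carry state from one run into the next if the same sink instance were reused; building the next accumulator without mutation (`acc + [v]`) avoids that.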

.foreach(&block) ⇒ Object

Creates a sink that runs a block for each stream element.

The sink consumes upstream until normal completion, calls the block once per element in input order, and returns the number of elements whose block completed successfully. Exceptions raised by the block fail the stream and are re-raised from `Source#run_with`.

Raises:

  • (ArgumentError) if no block is given


# File 'lib/fiber_stream/sink.rb', line 59

def self.foreach(&block)
  raise ArgumentError, "missing block" unless block

  new do |stream|
    count = 0

    Pull.each_value(stream) do |value|
      block.call(value)
      count += 1
    end

    count
  end
end
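
The counting rule and the effect of a raising block can be sketched as follows. `foreach_sketch` is a hypothetical stand-in that iterates a plain collection in place of `Pull.each_value(stream)`; it shows only how the exception leaves the loop, not how FiberStream fails the stream or re-raises from `Source#run_with`.

```ruby
# Hypothetical stand-in for the foreach sink body: `values.each` replaces
# Pull.each_value(stream); the counting logic follows the listing above.
def foreach_sketch(values)
  count = 0
  values.each do |value|
    yield value
    count += 1   # reached only when the block returned normally
  end
  count
end

seen = []
foreach_sketch(%w[a b c]) { |v| seen << v }  # returns 3
seen                                          # ["a", "b", "c"]

# A raising block aborts the loop, so no count is returned for that run.
processed = []
begin
  foreach_sketch([1, 2, 3]) do |v|
    raise ArgumentError, "bad element" if v == 3
    processed << v
  end
rescue ArgumentError => e
  e.message   # "bad element"
end
processed     # [1, 2]: side effects of earlier elements remain
```

Since the count is only available on normal completion, a caller that needs to know how far processing got before a failure has to track that inside the block, as `processed` does here.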

.io(io, close: false, flush: false) ⇒ Object

Creates a sink that writes String chunks to an IO-like object.

The sink consumes upstream until normal completion and returns the number of chunks successfully written. Before any write, flush, or normal close operation, it requires a scheduler-backed non-blocking fiber. The IO object is closed only when `close: true` is passed, and it is flushed on normal completion only when `flush: true` is passed.

Raises:

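  • (TypeError) if `io` does not respond to `write`, if `close` or `flush` is not `true` or `false`, or if `io` lacks the `close` or `flush` method that the corresponding option requires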


# File 'lib/fiber_stream/sink.rb', line 81

def self.io(io, close: false, flush: false)
  raise TypeError, "io must respond to write" unless io.respond_to?(:write)
  raise TypeError, "close must be true or false" unless [true, false].include?(close)
  raise TypeError, "flush must be true or false" unless [true, false].include?(flush)
  raise TypeError, "io must respond to close" if close && !io.respond_to?(:close)
  raise TypeError, "io must respond to flush" if flush && !io.respond_to?(:flush)

  new do |stream|
    IOSink.new(io, close, flush).run(stream)
  end
end
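
The argument checks run when the sink is built, before any stream exists, so they can be explored in isolation. The sketch below repeats the five checks from the listing in a hypothetical helper, `io_sink_args_ok?`; `io_sink_error` and `WriteOnly` are likewise illustrative names, and nothing here touches `IOSink` or the scheduler requirement.

```ruby
require "stringio"

# Hypothetical helper that repeats the argument checks from Sink.io so they
# can be exercised without the library. It returns true when all checks pass.
def io_sink_args_ok?(io, close: false, flush: false)
  raise TypeError, "io must respond to write" unless io.respond_to?(:write)
  raise TypeError, "close must be true or false" unless [true, false].include?(close)
  raise TypeError, "flush must be true or false" unless [true, false].include?(flush)
  raise TypeError, "io must respond to close" if close && !io.respond_to?(:close)
  raise TypeError, "io must respond to flush" if flush && !io.respond_to?(:flush)
  true
end

# Returns the TypeError message for a set of arguments, or nil if none is raised.
def io_sink_error(io, **options)
  io_sink_args_ok?(io, **options)
  nil
rescue TypeError => e
  e.message
end

# Write-only object: accepted by default, rejected once close or flush is requested.
class WriteOnly
  def write(chunk)
    chunk.bytesize
  end
end

io_sink_error(StringIO.new, close: true, flush: true)  # nil: StringIO has all three methods
io_sink_error(Object.new)                              # "io must respond to write"
io_sink_error(StringIO.new, close: nil)                # "close must be true or false"
io_sink_error(StringIO.new, flush: 1)                  # "flush must be true or false"
io_sink_error(WriteOnly.new, close: true)              # "io must respond to close"
io_sink_error(WriteOnly.new)                           # nil
```

The options are checked against the literal values `true` and `false`, so truthy values such as `1` and falsy values such as `nil` are rejected rather than coerced.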

.to_a ⇒ Object

Creates a sink that collects all stream elements into an Array.

The sink consumes upstream until normal completion and returns the collected array as the stream materialized value.



# File 'lib/fiber_stream/sink.rb', line 9

def self.to_a
  new do |stream|
    values = []

    Pull.each_value(stream) do |value|
      values << value
    end

    values
  end
end

Instance Method Details

#run_stream(stream) ⇒ Object

:nodoc:



# File 'lib/fiber_stream/sink.rb', line 103

def run_stream(stream) # :nodoc:
  @run.call(stream)
end
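
Taken together, the constructor and `#run_stream` show that every factory on this page is a block handed to `new` and later called with the upstream handle. The sketch below copies that two-method shape into a hypothetical `MiniSink` class to illustrate the mechanics; it is not the library class, its `count` factory is invented for the example, and the `stream` argument is an ordinary Array, not a FiberStream stream.

```ruby
# Hypothetical copy of the two methods shown on this page, for illustration only.
class MiniSink
  def initialize(&run)
    @run = run
  end

  def run_stream(stream)
    @run.call(stream)
  end

  # Invented factory in the same style as the class methods documented here.
  def self.count
    new { |stream| stream.size }
  end
end

MiniSink.count.run_stream([:a, :b, :c])                     # returns 3
MiniSink.new { |stream| stream.max }.run_stream([4, 9, 2])  # returns 9

# Without a block, @run is nil and the failure only surfaces at run time.
begin
  MiniSink.new.run_stream([])
rescue NoMethodError
  # nil does not respond to `call`
end
```

`#run_stream` is marked `:nodoc:` in the source, which suggests it is meant to be invoked by the library and not by callers; the sketch calls it directly only to make the control flow visible.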