Class: FiberStream::Sink

Inherits:
Object
Defined in:
lib/fiber_stream/sink.rb

Constructor Details

#initialize(&run) ⇒ Sink

Returns a new instance of Sink.



# File 'lib/fiber_stream/sink.rb', line 113

def initialize(&run)
  @run = run
end

Class Method Details

.build(&run) ⇒ Object

:nodoc:



# File 'lib/fiber_stream/sink.rb', line 109

def self.build(&run) # :nodoc:
  new(&run)
end

.count ⇒ Object

Creates a sink that counts all stream elements.

The sink consumes upstream until normal completion and returns the number of elements observed. It does not store consumed elements.



# File 'lib/fiber_stream/sink.rb', line 36

def self.count
  new do |stream|
    count = 0

    Pull.each_value(stream) do
      count += 1
    end

    count
  end
end

.first ⇒ Object

Creates a sink that returns the first stream element.

The sink pulls at most one element. It returns `nil` when upstream completes before producing a value.



# File 'lib/fiber_stream/sink.rb', line 25

def self.first
  new do |stream|
    value = stream.next
    Pull.done?(value) ? nil : value
  end
end
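
The "at most one element" behavior can be illustrated without FiberStream itself. The following is only a sketch: `DONE`, `CountingStream`, and `first_of` are hypothetical names, and the sentinel object merely plays the role of whatever `Pull.done?` detects in the listing above.

```ruby
# Hypothetical completion marker standing in for whatever Pull.done? detects.
DONE = Object.new.freeze

# Hypothetical pull-based stream that records how often it is pulled.
class CountingStream
  attr_reader :pulls

  def initialize(items)
    @items = items.dup
    @pulls = 0
  end

  # Returns the next element, or DONE once the items are exhausted.
  def next
    @pulls += 1
    @items.empty? ? DONE : @items.shift
  end
end

# Mirrors the body of the block passed to `new` in Sink.first.
def first_of(stream)
  value = stream.next
  value.equal?(DONE) ? nil : value
end

stream = CountingStream.new([:a, :b, :c])
result = first_of(stream)                        # pulls exactly once
empty_result = first_of(CountingStream.new([]))  # upstream completes immediately
```

One consequence of this shape is that the return value alone cannot distinguish an empty stream from one whose first element is `nil`.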

.fold(initial, &block) ⇒ Object

Creates a sink that folds all stream elements into an accumulator.

The sink consumes upstream until normal completion. It returns the final accumulator, or the initial accumulator when upstream is empty. Exceptions raised by the block fail the stream and are re-raised from `Source#run_with`. FiberStream assigns the initial accumulator directly; it does not duplicate or freeze that object.

Raises:

  • (ArgumentError) when no block is given


# File 'lib/fiber_stream/sink.rb', line 55

def self.fold(initial, &block)
  raise ArgumentError, "missing block" unless block

  new do |stream|
    accumulator = initial

    Pull.each_value(stream) do |value|
      accumulator = block.call(accumulator, value)
    end

    accumulator
  end
end
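
Because the initial accumulator is assigned directly, a block that mutates the accumulator also mutates the object the caller passed in. The sketch below illustrates this with a hypothetical `fold_values` helper that repeats the fold loop over a plain Array instead of a FiberStream stream; it is not the library's code.

```ruby
# Hypothetical stand-in for the fold loop; a plain Array replaces the stream.
def fold_values(values, initial)
  accumulator = initial # assigned directly, no dup or freeze
  values.each do |value|
    accumulator = yield(accumulator, value)
  end
  accumulator
end

# A mutating block changes the caller's object in place.
shared = []
mutated = fold_values([1, 2, 3], shared) { |acc, v| acc << v }

# A non-mutating block leaves the seed untouched and builds new arrays.
fresh_seed = []
rebuilt = fold_values([1, 2, 3], fresh_seed) { |acc, v| acc + [v] }

total = fold_values([1, 2, 3], 0) { |sum, v| sum + v }
empty = fold_values([], :seed) { |acc, _v| acc }
```

If the same sink is meant to be run more than once, a block that returns a new accumulator each time, or a fresh initial object per sink, avoids carrying state from one run into the next.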

.foreach(&block) ⇒ Object

Creates a sink that runs a block for each stream element.

The sink consumes upstream until normal completion, calls the block once per element in input order, and returns the number of elements whose block completed successfully. Exceptions raised by the block fail the stream and are re-raised from `Source#run_with`.

Raises:

  • (ArgumentError) when no block is given


# File 'lib/fiber_stream/sink.rb', line 75

def self.foreach(&block)
  raise ArgumentError, "missing block" unless block

  new do |stream|
    count = 0

    Pull.each_value(stream) do |value|
      block.call(value)
      count += 1
    end

    count
  end
end
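
The counter is incremented only after the block returns, so an element whose block raises is never counted. The sketch below shows that ordering with a hypothetical `foreach_values` helper running over a plain Array; in FiberStream itself the exception would surface from `Source#run_with` and no count would be returned.

```ruby
# Hypothetical stand-in for the foreach loop; a plain Array replaces the stream.
def foreach_values(values)
  count = 0
  values.each do |value|
    yield value # an exception here skips the increment below
    count += 1
  end
  count
end

seen = []
completed = foreach_values(%w[a b c]) { |v| seen << v }

# The block fails on the second element, so only the first is processed.
processed = []
error = begin
  foreach_values([1, 2, 3]) do |v|
    raise ArgumentError, "bad element: #{v}" if v == 2
    processed << v
  end
  nil
rescue ArgumentError => e
  e
end
```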

.io(io, close: false, flush: false) ⇒ Object

Creates a sink that writes String chunks to an IO-like object.

The sink consumes upstream until normal completion and returns the number of chunks successfully written. It requires a scheduler-backed non-blocking fiber before write, flush, or normal close operations. The IO object is closed only when `close: true` is passed, and flushed on normal completion only when `flush: true` is passed.

Raises:

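  • (TypeError) when io does not respond to write, when close or flush is not true or false, or when io lacks the close or flush method that was requested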


# File 'lib/fiber_stream/sink.rb', line 97

def self.io(io, close: false, flush: false)
  raise TypeError, "io must respond to write" unless io.respond_to?(:write)
  raise TypeError, "close must be true or false" unless [true, false].include?(close)
  raise TypeError, "flush must be true or false" unless [true, false].include?(flush)
  raise TypeError, "io must respond to close" if close && !io.respond_to?(:close)
  raise TypeError, "io must respond to flush" if flush && !io.respond_to?(:flush)

  new do |stream|
    IOSink.new(io, close, flush).run(stream)
  end
end
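
The argument checks run when the sink is created, before any stream is pulled. The sketch below copies the guards from the listing above into a hypothetical `check_io_sink_args` helper so they can be exercised on their own; `WriteOnly` and `rejection` are also hypothetical. It covers validation only, not the writing itself, which needs a scheduler-backed non-blocking fiber.

```ruby
require "stringio"

# Hypothetical helper that repeats the guards from Sink.io.
def check_io_sink_args(io, close: false, flush: false)
  raise TypeError, "io must respond to write" unless io.respond_to?(:write)
  raise TypeError, "close must be true or false" unless [true, false].include?(close)
  raise TypeError, "flush must be true or false" unless [true, false].include?(flush)
  raise TypeError, "io must respond to close" if close && !io.respond_to?(:close)
  raise TypeError, "io must respond to flush" if flush && !io.respond_to?(:flush)
  true
end

# Hypothetical writer that can write but cannot be closed or flushed.
class WriteOnly
  def write(chunk)
    chunk.bytesize
  end
end

# Returns the TypeError message for a rejected call, or nil if it was accepted.
def rejection
  yield
  nil
rescue TypeError => e
  e.message
end

accepted = check_io_sink_args(StringIO.new, close: true, flush: true)
no_write = rejection { check_io_sink_args(Object.new) }
truthy_flag = rejection { check_io_sink_args(StringIO.new, close: 1) }
no_close = rejection { check_io_sink_args(WriteOnly.new, close: true) }
```

Note that the flags are compared against `true` and `false` exactly, so truthy values such as `1` and falsy values such as `nil` are rejected rather than coerced.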

.to_a ⇒ Object

Creates a sink that collects all stream elements into an Array.

The sink consumes upstream until normal completion and returns the collected array as the stream materialized value.



# File 'lib/fiber_stream/sink.rb', line 9

def self.to_a
  new do |stream|
    values = []

    Pull.each_value(stream) do |value|
      values << value
    end

    values
  end
end

Instance Method Details

#run_stream(stream) ⇒ Object

:nodoc:



# File 'lib/fiber_stream/sink.rb', line 119

def run_stream(stream) # :nodoc:
  @run.call(stream)
end
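
Taken together, the constructor and `run_stream` show that a sink is a stored block that is later called with a pull-based stream, and that the block's return value becomes the sink's result. The sketch below reproduces that shape with hypothetical stand-ins (`MiniSink`, `ArrayStream`, and the `DONE` marker); it does not use FiberStream and makes no claim about how the library represents completion.

```ruby
# Hypothetical completion marker; FiberStream's real marker is not shown here.
DONE = Object.new.freeze

# Hypothetical pull stream over an Array.
class ArrayStream
  def initialize(items)
    @items = items.dup
  end

  # Returns the next element, or DONE once the items are exhausted.
  def next
    @items.empty? ? DONE : @items.shift
  end
end

# Stand-in with the same shape as Sink: store a block, call it with a stream.
class MiniSink
  def initialize(&run)
    @run = run
  end

  def run_stream(stream)
    @run.call(stream)
  end
end

# A custom sink that keeps only the last element it saw.
last = MiniSink.new do |stream|
  latest = nil
  loop do
    value = stream.next
    break if value.equal?(DONE)
    latest = value
  end
  latest
end

last_value = last.run_stream(ArrayStream.new([1, 2, 3]))
last_of_empty = last.run_stream(ArrayStream.new([]))
```

Since the block holds no state outside its local variables, the same sink object can be run against several streams independently.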