Class: FiberStream::Sink

Inherits:
Object
Defined in:
lib/fiber_stream/sink.rb

Constructor Details

#initialize(&run) ⇒ Sink

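Returns a new instance of Sink. The given block is stored as the sink's run logic; each class-method constructor below passes a block that receives the stream to consume.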



# File 'lib/fiber_stream/sink.rb', line 78

def initialize(&run)
  @run = run
end
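
As a rough illustration of how the stored block is used, the sketch below builds a counting sink from a block. `MiniSink`, its `run` method, `ArrayStream`, and the `DONE` sentinel are hypothetical stand-ins so the example runs on its own. They are not FiberStream's classes, and the library's actual entry point for running a sink (`Source#run_with`) is not modelled here.

```ruby
# Hypothetical stand-ins, not FiberStream's own classes.
module ConstructorSketch
  # Sentinel playing the role that Pull.done? detects in the library (assumption).
  DONE = Object.new.freeze

  # Minimal pull-based stream over an Array.
  class ArrayStream
    def initialize(items)
      @items = items.dup
    end

    # Returns the next element, or DONE once the items are exhausted.
    def next
      @items.empty? ? DONE : @items.shift
    end
  end

  # Mirrors Sink#initialize: the block is stored and invoked later with a stream.
  class MiniSink
    def initialize(&run)
      @run = run
    end

    # Assumed runner for this sketch only.
    def run(stream)
      @run.call(stream)
    end
  end

  # A custom sink that counts elements instead of collecting them.
  def self.counting_sink
    MiniSink.new do |stream|
      count = 0
      count += 1 until stream.next.equal?(DONE)
      count
    end
  end
end

puts ConstructorSketch.counting_sink.run(ConstructorSketch::ArrayStream.new(%w[a b c]))
# prints 3
```

The value returned by the block becomes the result of running the sink, which is the same pattern the class methods below follow.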

Class Method Details

.first ⇒ Object

Creates a sink that returns the first stream element.

The sink pulls at most one element. It returns `nil` when upstream completes before producing a value.



# File 'lib/fiber_stream/sink.rb', line 28

def self.first
  new do |stream|
    value = stream.next
    Pull.done?(value) ? nil : value
  end
end
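
The "at most one element" behaviour can be sketched outside the library. In the example below, `FirstSketch`, `ArrayStream`, the `pulls` counter, and the `DONE` sentinel are hypothetical stand-ins for the stream and for `Pull.done?`. Only the two-line body of `first` follows the listing above.

```ruby
# Hypothetical stand-ins, not FiberStream's own classes.
module FirstSketch
  # Sentinel standing in for the completion signal that Pull.done? detects.
  DONE = Object.new.freeze

  # Minimal pull-based stream that records how often it was pulled.
  class ArrayStream
    attr_reader :pulls

    def initialize(items)
      @items = items.dup
      @pulls = 0
    end

    def next
      @pulls += 1
      @items.empty? ? DONE : @items.shift
    end
  end

  # Same logic as the sink body: one pull, nil on early completion.
  def self.first(stream)
    value = stream.next
    value.equal?(DONE) ? nil : value
  end
end

stream = FirstSketch::ArrayStream.new([10, 20, 30])
p FirstSketch.first(stream)                            # prints 10
p stream.pulls                                         # prints 1
p FirstSketch.first(FirstSketch::ArrayStream.new([]))  # prints nil
```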

.fold(initial, &block) ⇒ Object

Creates a sink that folds all stream elements into an accumulator.

The sink consumes upstream until normal completion. It returns the final accumulator, or the initial accumulator when upstream is empty. Exceptions raised by the block fail the stream and are re-raised from `Source#run_with`. FiberStream assigns the initial accumulator directly; it does not duplicate or freeze that object, so a block that mutates the accumulator in place also mutates the object the caller passed in.

Raises:

  • (ArgumentError) when no block is given


# File 'lib/fiber_stream/sink.rb', line 42

def self.fold(initial, &block)
  raise ArgumentError, "missing block" unless block

  new do |stream|
    accumulator = initial

    loop do
      value = stream.next
      break if Pull.done?(value)

      accumulator = block.call(accumulator, value)
    end

    accumulator
  end
end
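
The following sketch reproduces the fold loop against a stand-in stream to show two documented properties: an empty upstream yields the initial accumulator, and the initial object is used directly rather than copied. `FoldSketch`, `ArrayStream`, and `DONE` are hypothetical names introduced for this example; failure propagation through `Source#run_with` is not modelled.

```ruby
# Hypothetical stand-ins, not FiberStream's own classes.
module FoldSketch
  # Sentinel standing in for the completion signal that Pull.done? detects.
  DONE = Object.new.freeze

  # Minimal pull-based stream over an Array.
  class ArrayStream
    def initialize(items)
      @items = items.dup
    end

    def next
      @items.empty? ? DONE : @items.shift
    end
  end

  # Same shape as the sink body: pull until completion, threading the accumulator.
  def self.fold(stream, initial)
    raise ArgumentError, "missing block" unless block_given?

    accumulator = initial
    loop do
      value = stream.next
      break if value.equal?(DONE)

      accumulator = yield(accumulator, value)
    end
    accumulator
  end
end

p FoldSketch.fold(FoldSketch::ArrayStream.new([1, 2, 3]), 0) { |acc, v| acc + v }  # prints 6
p FoldSketch.fold(FoldSketch::ArrayStream.new([]), :seed) { |acc, v| [acc, v] }    # prints :seed

# The seed array is not duplicated, so in-place mutation is visible to the caller.
seed = []
result = FoldSketch.fold(FoldSketch::ArrayStream.new(%w[a b]), seed) { |acc, v| acc << v }
p result.equal?(seed)  # prints true
p seed                 # prints ["a", "b"]
```

When the seed is a mutable object that is reused elsewhere, passing a fresh copy (for example `seed.dup`) avoids sharing state between runs.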

.io(io, close: false, flush: false) ⇒ Object

Creates a sink that writes String chunks to an IO-like object.

The sink consumes upstream until normal completion and returns the number of chunks successfully written. Write, flush, and normal close operations each require a scheduler-backed non-blocking fiber. The IO object is closed only when `close: true` is passed, and flushed on normal completion only when `flush: true` is passed.

Raises:

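  • (TypeError) when `io` does not respond to `write`, when `close` or `flush` is not exactly `true` or `false`, or when `io` does not respond to `close` or `flush` while the matching option is enabled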


# File 'lib/fiber_stream/sink.rb', line 66

def self.io(io, close: false, flush: false)
  raise TypeError, "io must respond to write" unless io.respond_to?(:write)
  raise TypeError, "close must be true or false" unless [true, false].include?(close)
  raise TypeError, "flush must be true or false" unless [true, false].include?(flush)
  raise TypeError, "io must respond to close" if close && !io.respond_to?(:close)
  raise TypeError, "io must respond to flush" if flush && !io.respond_to?(:flush)

  new do |stream|
    IOSink.new(io, close, flush).run(stream)
  end
end
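
`IOSink` itself is not shown on this page, so the sketch below is only an assumption about the documented contract: count the chunks written, flush and close only when asked. `IoSketch`, `write_all`, `ArrayStream`, and `DONE` are hypothetical names. The scheduler and non-blocking fiber requirement, chunk type checks, and error handling are deliberately left out.

```ruby
require "stringio"

# Hypothetical stand-ins, not FiberStream's own classes.
module IoSketch
  # Sentinel standing in for the completion signal that Pull.done? detects.
  DONE = Object.new.freeze

  # Minimal pull-based stream over an Array of String chunks.
  class ArrayStream
    def initialize(items)
      @items = items.dup
    end

    def next
      @items.empty? ? DONE : @items.shift
    end
  end

  # Writes every chunk, then applies the optional flush and close.
  # Returns the number of chunks written, not the number of bytes.
  def self.write_all(stream, io, close: false, flush: false)
    written = 0
    loop do
      chunk = stream.next
      break if chunk.equal?(DONE)

      io.write(chunk)
      written += 1
    end
    io.flush if flush
    io.close if close
    written
  end
end

out = StringIO.new
p IoSketch.write_all(IoSketch::ArrayStream.new(["hello ", "world"]), out, flush: true)  # prints 2
p out.string   # prints "hello world"
p out.closed?  # prints false
```

Because `close` defaults to `false`, the caller keeps ownership of the IO object and can reuse it after the stream completes.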

.to_a ⇒ Object

Creates a sink that collects all stream elements into an Array.

The sink consumes upstream until normal completion and returns the collected array as the stream's materialized value. Elements appear in the order they were pulled, and an empty upstream produces an empty array.



# File 'lib/fiber_stream/sink.rb', line 9

def self.to_a
  new do |stream|
    values = []

    loop do
      value = stream.next
      break if Pull.done?(value)

      values << value
    end

    values
  end
end
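
A minimal sketch of the collection loop, run against a stand-in stream. `ToASketch`, `ArrayStream`, and `DONE` are hypothetical names used only so the example is self-contained; the loop body follows the listing above.

```ruby
# Hypothetical stand-ins, not FiberStream's own classes.
module ToASketch
  # Sentinel standing in for the completion signal that Pull.done? detects.
  DONE = Object.new.freeze

  # Minimal pull-based stream over an Array.
  class ArrayStream
    def initialize(items)
      @items = items.dup
    end

    def next
      @items.empty? ? DONE : @items.shift
    end
  end

  # Same shape as the sink body: append each pulled value until completion.
  def self.collect(stream)
    values = []
    loop do
      value = stream.next
      break if value.equal?(DONE)

      values << value
    end
    values
  end
end

p ToASketch.collect(ToASketch::ArrayStream.new([3, 1, 2]))  # prints [3, 1, 2]
p ToASketch.collect(ToASketch::ArrayStream.new([]))         # prints []
```

Since every element is held in memory until completion, this pattern suits bounded streams; `fold` is the alternative when only a running summary is needed.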