Class: FiberStream::Sink
- Inherits:
-
Object
- Object
- FiberStream::Sink
- Defined in:
- lib/fiber_stream/sink.rb
Class Method Summary collapse
-
.first ⇒ Object
Creates a sink that returns the first stream element.
-
.fold(initial, &block) ⇒ Object
Creates a sink that folds all stream elements into an accumulator.
-
.io(io, close: false, flush: false) ⇒ Object
Creates a sink that writes String chunks to an IO-like object.
-
.to_a ⇒ Object
Creates a sink that collects all stream elements into an Array.
Instance Method Summary collapse
-
#initialize(&run) ⇒ Sink
constructor
A new instance of Sink.
Constructor Details
#initialize(&run) ⇒ Sink
Returns a new instance of Sink.
78 79 80 |
# File 'lib/fiber_stream/sink.rb', line 78 def initialize(&run) @run = run end |
Class Method Details
.first ⇒ Object
Creates a sink that returns the first stream element.
The sink pulls at most one element. It returns ‘nil` when upstream completes before producing a value.
28 29 30 31 32 33 |
# File 'lib/fiber_stream/sink.rb', line 28 def self.first new do |stream| value = stream.next Pull.done?(value) ? nil : value end end |
.fold(initial, &block) ⇒ Object
Creates a sink that folds all stream elements into an accumulator.
The sink consumes upstream until normal completion. It returns the final accumulator, or the initial accumulator when upstream is empty. Exceptions raised by the block fail the stream and are re-raised from ‘Source#run_with`. FiberStream assigns the initial accumulator directly; it does not duplicate or freeze that object.
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
# File 'lib/fiber_stream/sink.rb', line 42 def self.fold(initial, &block) raise ArgumentError, "missing block" unless block new do |stream| accumulator = initial loop do value = stream.next break if Pull.done?(value) accumulator = block.call(accumulator, value) end accumulator end end |
.io(io, close: false, flush: false) ⇒ Object
Creates a sink that writes String chunks to an IO-like object.
The sink consumes upstream until normal completion and returns the number of chunks successfully written. It requires a scheduler-backed non-blocking fiber before write, flush, or normal close operations. The IO object is closed only when ‘close: true` is passed, and flushed on normal completion only when `flush: true` is passed.
66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/fiber_stream/sink.rb', line 66 def self.io(io, close: false, flush: false) raise TypeError, "io must respond to write" unless io.respond_to?(:write) raise TypeError, "close must be true or false" unless [true, false].include?(close) raise TypeError, "flush must be true or false" unless [true, false].include?(flush) raise TypeError, "io must respond to close" if close && !io.respond_to?(:close) raise TypeError, "io must respond to flush" if flush && !io.respond_to?(:flush) new do |stream| IOSink.new(io, close, flush).run(stream) end end |
.to_a ⇒ Object
Creates a sink that collects all stream elements into an Array.
The sink consumes upstream until normal completion and returns the collected array as the stream materialized value.
9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
# File 'lib/fiber_stream/sink.rb', line 9 def self.to_a new do |stream| values = [] loop do value = stream.next break if Pull.done?(value) values << value end values end end |