Class: FiberStream::Sink
- Inherits:
-
Object
- Object
- FiberStream::Sink
- Defined in:
- lib/fiber_stream/sink.rb
Class Method Summary collapse
-
.build(&run) ⇒ Object
:nodoc:.
-
.first ⇒ Object
Creates a sink that returns the first stream element.
-
.fold(initial, &block) ⇒ Object
Creates a sink that folds all stream elements into an accumulator.
-
.foreach(&block) ⇒ Object
Creates a sink that runs a block for each stream element.
-
.io(io, close: false, flush: false) ⇒ Object
Creates a sink that writes String chunks to an IO-like object.
-
.to_a ⇒ Object
Creates a sink that collects all stream elements into an Array.
Instance Method Summary collapse
-
#initialize(&run) ⇒ Sink
constructor
A new instance of Sink.
-
#run_stream(stream) ⇒ Object
:nodoc:.
Constructor Details
#initialize(&run) ⇒ Sink
Returns a new instance of Sink.
97 98 99 |
# File 'lib/fiber_stream/sink.rb', line 97 def initialize(&run) @run = run end |
Class Method Details
.build(&run) ⇒ Object
:nodoc:
93 94 95 |
# File 'lib/fiber_stream/sink.rb', line 93 def self.build(&run) # :nodoc: new(&run) end |
.first ⇒ Object
Creates a sink that returns the first stream element.
The sink pulls at most one element. It returns ‘nil` when upstream completes before producing a value.
25 26 27 28 29 30 |
# File 'lib/fiber_stream/sink.rb', line 25 def self.first new do |stream| value = stream.next Pull.done?(value) ? nil : value end end |
.fold(initial, &block) ⇒ Object
Creates a sink that folds all stream elements into an accumulator.
The sink consumes upstream until normal completion. It returns the final accumulator, or the initial accumulator when upstream is empty. Exceptions raised by the block fail the stream and are re-raised from ‘Source#run_with`. FiberStream assigns the initial accumulator directly; it does not duplicate or freeze that object.
39 40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/fiber_stream/sink.rb', line 39 def self.fold(initial, &block) raise ArgumentError, "missing block" unless block new do |stream| accumulator = initial Pull.each_value(stream) do |value| accumulator = block.call(accumulator, value) end accumulator end end |
.foreach(&block) ⇒ Object
Creates a sink that runs a block for each stream element.
The sink consumes upstream until normal completion, calls the block once per element in input order, and returns the number of elements whose block completed successfully. Exceptions raised by the block fail the stream and are re-raised from ‘Source#run_with`.
59 60 61 62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/fiber_stream/sink.rb', line 59 def self.foreach(&block) raise ArgumentError, "missing block" unless block new do |stream| count = 0 Pull.each_value(stream) do |value| block.call(value) count += 1 end count end end |
.io(io, close: false, flush: false) ⇒ Object
Creates a sink that writes String chunks to an IO-like object.
The sink consumes upstream until normal completion and returns the number of chunks successfully written. It requires a scheduler-backed non-blocking fiber before write, flush, or normal close operations. The IO object is closed only when ‘close: true` is passed, and flushed on normal completion only when `flush: true` is passed.
81 82 83 84 85 86 87 88 89 90 91 |
# File 'lib/fiber_stream/sink.rb', line 81 def self.io(io, close: false, flush: false) raise TypeError, "io must respond to write" unless io.respond_to?(:write) raise TypeError, "close must be true or false" unless [true, false].include?(close) raise TypeError, "flush must be true or false" unless [true, false].include?(flush) raise TypeError, "io must respond to close" if close && !io.respond_to?(:close) raise TypeError, "io must respond to flush" if flush && !io.respond_to?(:flush) new do |stream| IOSink.new(io, close, flush).run(stream) end end |
.to_a ⇒ Object
Creates a sink that collects all stream elements into an Array.
The sink consumes upstream until normal completion and returns the collected array as the stream materialized value.
9 10 11 12 13 14 15 16 17 18 19 |
# File 'lib/fiber_stream/sink.rb', line 9 def self.to_a new do |stream| values = [] Pull.each_value(stream) do |value| values << value end values end end |
Instance Method Details
#run_stream(stream) ⇒ Object
:nodoc:
103 104 105 |
# File 'lib/fiber_stream/sink.rb', line 103 def run_stream(stream) # :nodoc: @run.call(stream) end |