Class: FiberStream::RactorProducer
- Inherits: Object
  - Object
    - FiberStream::RactorProducer
- Defined in: lib/fiber_stream/ractor_producer.rb
Overview
Producer-side context for `Source.ractor_producer`.
Producer blocks call `emit`, `complete`, or `fail` to send one protocol message after receiving one downstream acknowledgment. A `false` return means cooperative cancellation was observed before the requested message could be sent.
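The ack-gated behaviour described above can be illustrated with a small stand-in. This is only a sketch, not FiberStream's implementation: `SketchProducer`, the `:ack` / `:cancel` signals, and the use of plain `Queue` objects in place of Ractor ports are all hypothetical, and returning `true` on a successful send is an assumption (the reference only documents the `false` case).

```ruby
# Hypothetical stand-in for illustration; names and signals are assumptions.
class SketchProducer
  def initialize(data_queue, ack_queue)
    @data = data_queue   # carries protocol messages downstream
    @acks = ack_queue    # carries one signal per permitted message
    @terminal = false
    @cancelled = false
  end

  def cancelled?
    @cancelled
  end

  def terminal?
    @terminal
  end

  # Send one element after one acknowledgment; false if cancelled or finished.
  def emit(value)
    return false if terminal? || cancelled?
    return false unless wait_for_ack

    @data.push([:element, value])
    true
  end

  # Send the terminal message after one acknowledgment.
  def complete
    return false if terminal? || cancelled?
    return false unless wait_for_ack

    @terminal = true
    @data.push([:complete])
    true
  end

  private

  # Block until downstream answers; anything other than :ack counts as cancellation.
  def wait_for_ack
    return true if @acks.pop == :ack

    @cancelled = true
    false
  end
end

data = Queue.new
acks = Queue.new
producer = SketchProducer.new(data, acks)

# Downstream grants two messages, then cancels.
acks.push(:ack)
acks.push(:ack)
acks.push(:cancel)

results = [producer.emit(1), producer.emit(2), producer.emit(3), producer.complete]
p results # => [true, true, false, false]
```

In this sketch the third `emit` observes the cancellation, and every later call returns `false` without waiting for another acknowledgment, so a producer loop can simply stop on the first `false`.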
Instance Method Summary
- #cancelled? ⇒ Boolean
- #complete ⇒ Object
- #emit(value, transfer: nil) ⇒ Object
- #fail(error = nil, cause_class_name: nil, cause_message: nil) ⇒ Object
- #initialize(data_port, ack_port, transfer) ⇒ RactorProducer (constructor)
  A new instance of RactorProducer.
- #send_failed? ⇒ Boolean
  :nodoc:
- #terminal? ⇒ Boolean
  :nodoc:
Constructor Details
#initialize(data_port, ack_port, transfer) ⇒ RactorProducer
Returns a new instance of RactorProducer.
    # File 'lib/fiber_stream/ractor_producer.rb', line 11
    def initialize(data_port, ack_port, transfer)
      @data_port = data_port
      @ack_port = ack_port
      @transfer = transfer
      @terminal = false
      @cancelled = false
      @send_failed = false
    end
Instance Method Details
#cancelled? ⇒ Boolean
    # File 'lib/fiber_stream/ractor_producer.rb', line 45
    def cancelled?
      @cancelled
    end
#complete ⇒ Object
    # File 'lib/fiber_stream/ractor_producer.rb', line 29
    def complete
      return false if terminal? || cancelled?
      return false unless wait_for_ack

      (RactorPort::Complete.new)
    end
#emit(value, transfer: nil) ⇒ Object
    # File 'lib/fiber_stream/ractor_producer.rb', line 20
    def emit(value, transfer: nil)
      return false if terminal? || cancelled?

      = validate_transfer_override!(transfer)
      return false unless wait_for_ack

      (RactorPort::Element.new(value), )
    end
#fail(error = nil, cause_class_name: nil, cause_message: nil) ⇒ Object
    # File 'lib/fiber_stream/ractor_producer.rb', line 36
    def fail(error = nil, cause_class_name: nil, cause_message: nil)
      return false if terminal? || cancelled?

      failure = (error, cause_class_name, )
      return false unless wait_for_ack

      (failure)
    end
#send_failed? ⇒ Boolean
:nodoc:
    # File 'lib/fiber_stream/ractor_producer.rb', line 53
    def send_failed? # :nodoc:
      @send_failed
    end
#terminal? ⇒ Boolean
:nodoc:
    # File 'lib/fiber_stream/ractor_producer.rb', line 49
    def terminal? # :nodoc:
      @terminal
    end