Class: FiberStream::RactorProducer

Inherits:
Object
  • Object
show all
Defined in:
lib/fiber_stream/ractor_producer.rb

Overview

Producer-side context for `Source.ractor_producer`.

Producer blocks call `emit`, `complete`, or `fail` to send one protocol message after receiving one downstream acknowledgment. A `false` return means the producer was already terminal, or cooperative cancellation was observed before the requested message could be sent.

Instance Method Summary collapse

Constructor Details

#initialize(data_port, ack_port, transfer) ⇒ RactorProducer

Returns a new instance of RactorProducer.



11
12
13
14
15
16
17
18
# File 'lib/fiber_stream/ractor_producer.rb', line 11

# Builds a producer context around the given collaborators.
#
# The arguments are stored untouched; nothing is validated or invoked here.
# NOTE(review): presumably +data_port+ carries outgoing protocol messages,
# +ack_port+ delivers downstream acknowledgments, and +transfer+ is the
# default transfer setting for emitted messages — confirm against the caller.
#
# @param data_port [Object] kept in @data_port
# @param ack_port [Object] kept in @ack_port
# @param transfer [Object] kept in @transfer
def initialize(data_port, ack_port, transfer)
  @data_port, @ack_port, @transfer = data_port, ack_port, transfer

  # Every state flag starts cleared; these back terminal?, cancelled? and
  # send_failed? respectively.
  @terminal, @cancelled, @send_failed = false, false, false
end

Instance Method Details

#cancelled? ⇒ Boolean

Returns:

  • (Boolean)


45
46
47
# File 'lib/fiber_stream/ractor_producer.rb', line 45

# Reports the cancellation flag held in @cancelled.
#
# The constructor sets the flag to +false+; the code that raises it is not
# part of this method.
#
# @return [Boolean] current value of @cancelled
def cancelled? = @cancelled

#complete ⇒ Object



29
30
31
32
33
34
# File 'lib/fiber_stream/ractor_producer.rb', line 29

# Sends the completion message once an acknowledgment has been received.
#
# Nothing is sent, and +false+ is returned, when the producer is already
# terminal or cancelled, or when +wait_for_ack+ comes back falsy.
#
# @return [false, Object] +false+ when no message was sent; otherwise
#   whatever +send_terminal_message+ returns
def complete
  if terminal? || cancelled?
    false
  elsif wait_for_ack
    # The Complete message is only built after the ack has arrived.
    send_terminal_message(RactorPort::Complete.new)
  else
    false
  end
end

#emit(value, transfer: nil) ⇒ Object



20
21
22
23
24
25
26
27
# File 'lib/fiber_stream/ractor_producer.rb', line 20

# Sends +value+ downstream as a single element message.
#
# The transfer override is validated before waiting for the acknowledgment,
# so +validate_transfer_override!+ runs even when the ack never arrives.
#
# @param value [Object] payload wrapped in a RactorPort::Element
# @param transfer [Object, nil] per-call override handed to
#   +validate_transfer_override!+; +nil+ by default
# @return [false, Object] +false+ when the producer is terminal or cancelled,
#   or when +wait_for_ack+ is falsy; otherwise the result of
#   +send_emitted_message+
def emit(value, transfer: nil)
  open_for_sending = !(terminal? || cancelled?)
  return false unless open_for_sending

  effective_transfer = validate_transfer_override!(transfer)
  return send_emitted_message(RactorPort::Element.new(value), effective_transfer) if wait_for_ack

  false
end

#fail(error = nil, cause_class_name: nil, cause_message: nil) ⇒ Object



36
37
38
39
40
41
42
43
# File 'lib/fiber_stream/ractor_producer.rb', line 36

# Sends a failure message once an acknowledgment has been received.
#
# The message is built by +failure_message+ before waiting for the ack, so
# any error raised while building it surfaces even if the ack never arrives.
# Within this class a bare +fail+ call resolves to this method, not to
# Kernel#fail.
#
# @param error [Object, nil] forwarded to +failure_message+
# @param cause_class_name [Object, nil] forwarded to +failure_message+
# @param cause_message [Object, nil] forwarded to +failure_message+
# @return [false, Object] +false+ when the producer is terminal or cancelled,
#   or when +wait_for_ack+ is falsy; otherwise the result of
#   +send_terminal_message+
def fail(error = nil, cause_class_name: nil, cause_message: nil)
  return false if terminal?
  return false if cancelled?

  payload = failure_message(error, cause_class_name, cause_message)
  wait_for_ack ? send_terminal_message(payload) : false
end

#send_failed? ⇒ Boolean

:nodoc:

Returns:

  • (Boolean)


53
54
55
# File 'lib/fiber_stream/ractor_producer.rb', line 53

# Internal reader for the @send_failed flag.
#
# The constructor sets the flag to +false+; the code that raises it is not
# part of this method.
#
# @return [Boolean] current value of @send_failed
def send_failed? = @send_failed # :nodoc:

#terminal? ⇒ Boolean

:nodoc:

Returns:

  • (Boolean)


49
50
51
# File 'lib/fiber_stream/ractor_producer.rb', line 49

# Internal reader for the @terminal flag.
#
# The constructor sets the flag to +false+; the code that raises it is not
# part of this method.
#
# @return [Boolean] current value of @terminal
def terminal? = @terminal # :nodoc: