Class: Rubyists::Leopard::NatsJetstreamCallbacks
- Inherits: Object
- Defined in: lib/leopard/nats_jetstream_callbacks.rb
Overview
Maps Leopard handler outcomes to JetStream ack, nak, and term operations.
Instance Method Summary
- #ack_message(wrapper, _result) ⇒ void (private): Acknowledges a successfully processed JetStream message.
- #callbacks_for(endpoint) ⇒ Hash{Symbol => #call}: Returns transport callbacks for a JetStream endpoint.
- #initialize(logger:) ⇒ void (constructor): Builds a callback set for JetStream message outcomes.
- #log_failure(failure) ⇒ void (private): Logs the failure payload returned by a handler.
- #nak_message(wrapper, result, endpoint) ⇒ void (private): Negatively acknowledges a failed JetStream message, optionally delaying redelivery.
- #term_message(wrapper, error) ⇒ void (private): Terminates a JetStream message after an unhandled exception.
Constructor Details
#initialize(logger:) ⇒ void
Builds a callback set for JetStream message outcomes.
    # File 'lib/leopard/nats_jetstream_callbacks.rb', line 12
    def initialize(logger:)
      @logger = logger
    end
Instance Method Details
#ack_message(wrapper, _result) ⇒ void (private)
This method returns an undefined value.
Acknowledges a successfully processed JetStream message.
    # File 'lib/leopard/nats_jetstream_callbacks.rb', line 37
    def ack_message(wrapper, _result)
      wrapper.raw.ack
    end
#callbacks_for(endpoint) ⇒ Hash{Symbol => #call}
Returns transport callbacks for a JetStream endpoint.
    # File 'lib/leopard/nats_jetstream_callbacks.rb', line 21
    def callbacks_for(endpoint)
      {
        on_success: method(:ack_message),
        on_failure: ->(wrapper, result) { nak_message(wrapper, result, endpoint) },
        on_error: method(:term_message),
      }
    end
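To illustrate how a caller might consume a hash of this shape, here is a minimal sketch. `FakeRaw`, `Wrapper`, `Outcome`, `demo_callbacks`, and `dispatch` are hypothetical stand-ins and are not part of Leopard; the library's real dispatch logic lives elsewhere and may differ. The only details taken from the source are the three keys and the `raw.ack`, `raw.nak`, and `raw.term` calls.

```ruby
# Hypothetical stand-in for a raw JetStream message; records each call.
class FakeRaw
  attr_reader :calls

  def initialize
    @calls = []
  end

  def ack
    @calls << :ack
  end

  def nak(**opts)
    @calls << [:nak, opts]
  end

  def term
    @calls << :term
  end
end

# Hypothetical wrapper and handler outcome types.
Wrapper = Struct.new(:raw)
Outcome = Struct.new(:ok, :failure) do
  def success?
    ok
  end
end

# Same shape as the hash returned by callbacks_for (logging omitted).
def demo_callbacks
  {
    on_success: ->(wrapper, _result) { wrapper.raw.ack },
    on_failure: ->(wrapper, _result) { wrapper.raw.nak },
    on_error: ->(wrapper, _error) { wrapper.raw.term },
  }
end

# Assumed dispatch rule: success -> ack, failure -> nak, raised error -> term.
def dispatch(callbacks, wrapper)
  result = yield
  key = result.success? ? :on_success : :on_failure
  callbacks[key].call(wrapper, result)
rescue StandardError => e
  callbacks[:on_error].call(wrapper, e)
end
```

Calling `dispatch(demo_callbacks, wrapper) { ... }` with a block that returns a successful outcome, returns a failed outcome, or raises would record an ack, a nak, or a term on the stand-in message, respectively.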
#log_failure(failure) ⇒ void (private)
This method returns an undefined value.
Logs the failure payload returned by a handler.
    # File 'lib/leopard/nats_jetstream_callbacks.rb', line 71
    def log_failure(failure)
      @logger.error 'Error processing message: ', failure
    end
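Note that `log_failure` passes two positional arguments to `@logger.error`. Ruby's standard `Logger#error` takes a single message argument, so the logger handed to the constructor presumably exposes a richer interface, such as one accepting a message plus a payload. Under that assumption, a hypothetical adapter (`PayloadLogger`, not part of Leopard) around the standard library logger could look like this:

```ruby
require 'logger'
require 'stringio'

# Hypothetical adapter: accepts (message, payload), matching the two-argument
# calls made in this class, and forwards one combined string to ::Logger.
class PayloadLogger
  def initialize(io)
    @logger = Logger.new(io)
  end

  def error(message, payload = nil)
    text = payload.nil? ? message : "#{message}#{payload.inspect}"
    @logger.error(text)
  end
end
```

An instance of such an adapter could then be passed as the `logger:` keyword argument. Which logging library Leopard actually expects is not stated on this page.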
#nak_message(wrapper, result, endpoint) ⇒ void (private)
This method returns an undefined value.
Negatively acknowledges a failed JetStream message, optionally delaying redelivery.
    # File 'lib/leopard/nats_jetstream_callbacks.rb', line 48
    def nak_message(wrapper, result, endpoint)
      log_failure(result.failure)
      return wrapper.raw.nak unless endpoint.nak_delay

      wrapper.raw.nak(delay: endpoint.nak_delay)
    end
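The guard clause above means `delay:` is only passed when the endpoint defines `nak_delay`. The following sketch exercises both branches in isolation. `RecordingRaw`, `NakWrapper`, `NakEndpoint`, and `nak_with_optional_delay` are hypothetical names introduced for illustration, and the unit of the delay value is not specified on this page.

```ruby
# Hypothetical stand-in that records the keyword arguments given to nak.
class RecordingRaw
  attr_reader :nak_args

  def initialize
    @nak_args = []
  end

  def nak(**opts)
    @nak_args << opts
  end
end

NakWrapper = Struct.new(:raw)
NakEndpoint = Struct.new(:nak_delay)

# Mirrors the guard in nak_message, with logging left out.
def nak_with_optional_delay(wrapper, endpoint)
  return wrapper.raw.nak unless endpoint.nak_delay

  wrapper.raw.nak(delay: endpoint.nak_delay)
end
```

With `NakEndpoint.new(nil)` the stand-in records a plain `nak`; with `NakEndpoint.new(5)` it records `delay: 5`.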
#term_message(wrapper, error) ⇒ void (private)
This method returns an undefined value.
Terminates a JetStream message after an unhandled exception.
    # File 'lib/leopard/nats_jetstream_callbacks.rb', line 61
    def term_message(wrapper, error)
      @logger.error 'Unhandled JetStream error: ', error
      wrapper.raw.term
    end