Class: Rubyists::Leopard::NatsJetstreamCallbacks

Inherits:
Object
Defined in:
lib/leopard/nats_jetstream_callbacks.rb

Overview

Maps Leopard handler outcomes to JetStream ack, nak, and term operations.

Instance Method Summary

Constructor Details

#initialize(logger:) ⇒ void

Builds a callback set for JetStream message outcomes.

Parameters:

  • logger (#error)

    Logger used for failures and unhandled exceptions.



# File 'lib/leopard/nats_jetstream_callbacks.rb', line 12

def initialize(logger:)
  @logger = logger
end
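
The `(#error)` type means any object that responds to `error` is accepted. The call sites in this class pass a message string followed by a second positional argument (the failure payload or the exception), whereas Ruby's standard `Logger#error` takes only one positional argument. Below is a minimal sketch of a compatible logger, assuming that two-argument shape. `ListLogger` and its `entries` reader are hypothetical names used only for illustration.

```ruby
# Hypothetical duck-typed logger: it only needs to respond to #error.
# The two-argument signature mirrors how this class calls the logger.
class ListLogger
  attr_reader :entries

  def initialize
    @entries = []
  end

  # Records the message together with the optional payload or exception.
  def error(message, payload = nil)
    @entries << [message, payload]
  end
end

logger = ListLogger.new
logger.error 'Error processing message: ', :timeout
logger.error 'Unhandled JetStream error: ', RuntimeError.new('boom')
```

An object like this could be passed as `logger:` to the constructor, for instance in a test that inspects `entries` afterwards.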

Instance Method Details

#ack_message(wrapper, _result) ⇒ void (private)

This method returns an undefined value.

Acknowledges a successfully processed JetStream message.

Parameters:

  • wrapper (MessageWrapper)

    Wrapped JetStream message.

  • _result (Dry::Monads::Success)

    Successful handler result.



# File 'lib/leopard/nats_jetstream_callbacks.rb', line 37

def ack_message(wrapper, _result)
  wrapper.raw.ack
end

#callbacks_for(endpoint) ⇒ Hash{Symbol => #call}

Returns transport callbacks for a JetStream endpoint.

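Parameters:

  • endpoint (NatsJetstreamEndpoint)

    Endpoint configuration passed through to the `:on_failure` callback.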

Returns:

  • (Hash{Symbol => #call})

    Outcome callbacks keyed by `:on_success`, `:on_failure`, and `:on_error`.



# File 'lib/leopard/nats_jetstream_callbacks.rb', line 21

def callbacks_for(endpoint)
  {
    on_success: method(:ack_message),
    on_failure: ->(wrapper, result) { nak_message(wrapper, result, endpoint) },
    on_error: method(:term_message),
  }
end
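
How a caller consumes this hash is not shown on this page. The following is a rough sketch of one plausible dispatch loop, under the assumption that the handler returns a result responding to `success?` and that raised exceptions are routed to `:on_error`. `FakeRaw`, `FakeWrapper`, `FakeResult`, `CALLBACKS`, and `dispatch` are all hypothetical stand-ins, not part of Leopard.

```ruby
# Hypothetical stand-ins for the JetStream message and its wrapper.
FakeRaw = Struct.new(:calls) do
  def ack
    calls << :ack
  end

  def nak(delay: nil)
    calls << [:nak, delay]
  end

  def term
    calls << :term
  end
end
FakeWrapper = Struct.new(:raw)

# Hypothetical result type standing in for Dry::Monads::Success / Failure.
FakeResult = Struct.new(:ok, :value) do
  def success?
    ok
  end
end

# Same keys as the hash returned by callbacks_for.
CALLBACKS = {
  on_success: ->(wrapper, _result) { wrapper.raw.ack },
  on_failure: ->(wrapper, _result) { wrapper.raw.nak },
  on_error: ->(wrapper, _error) { wrapper.raw.term },
}.freeze

# Assumed dispatch logic: run the handler, then pick a callback by outcome.
def dispatch(callbacks, wrapper)
  result = yield(wrapper)
  key = result.success? ? :on_success : :on_failure
  callbacks.fetch(key).call(wrapper, result)
rescue StandardError => e
  callbacks.fetch(:on_error).call(wrapper, e)
end

ok = FakeWrapper.new(FakeRaw.new([]))
dispatch(CALLBACKS, ok) { FakeResult.new(true, 'done') }

failed = FakeWrapper.new(FakeRaw.new([]))
dispatch(CALLBACKS, failed) { FakeResult.new(false, 'bad input') }

crashed = FakeWrapper.new(FakeRaw.new([]))
dispatch(CALLBACKS, crashed) { raise ArgumentError, 'boom' }
```

In this sketch a successful result is acked, a failed result is nak'd, and a raised exception terminates the message, which matches the ack, nak, and term mapping described in the overview.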

#log_failure(failure) ⇒ void (private)

This method returns an undefined value.

Logs the failure payload returned by a handler.

Parameters:

  • failure (Object)

    The failure payload from the handler.



# File 'lib/leopard/nats_jetstream_callbacks.rb', line 71

def log_failure(failure)
  @logger.error 'Error processing message: ', failure
end

#nak_message(wrapper, result, endpoint) ⇒ void (private)

This method returns an undefined value.

Negatively acknowledges a failed JetStream message, optionally delaying redelivery.

Parameters:

  • wrapper (MessageWrapper)

    Wrapped JetStream message.

  • result (Dry::Monads::Failure)

    Failed handler result.

  • endpoint (NatsJetstreamEndpoint)

    Endpoint configuration for the message.



# File 'lib/leopard/nats_jetstream_callbacks.rb', line 48

def nak_message(wrapper, result, endpoint)
  log_failure(result.failure)
  return wrapper.raw.nak unless endpoint.nak_delay

  wrapper.raw.nak(delay: endpoint.nak_delay)
end
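
The guard clause above yields two paths: a plain `nak` when the endpoint has no `nak_delay`, and a `nak` carrying `delay:` otherwise. The sketch below exercises both paths with hypothetical stand-ins (`RecordingRaw`, `Endpoint`, and `nak_with_optional_delay` are illustrative names). The unit of the delay value depends on the NATS client and is not specified on this page, so the `5` used here is arbitrary.

```ruby
# Hypothetical stand-in that records the arguments passed to #nak.
class RecordingRaw
  attr_reader :naks

  def initialize
    @naks = []
  end

  def nak(**opts)
    @naks << opts
  end
end

# Hypothetical endpoint exposing only the nak_delay reader used above.
Endpoint = Struct.new(:nak_delay)

# Mirrors the guard clause in nak_message, minus the logging.
def nak_with_optional_delay(raw, endpoint)
  return raw.nak unless endpoint.nak_delay

  raw.nak(delay: endpoint.nak_delay)
end

immediate = RecordingRaw.new
nak_with_optional_delay(immediate, Endpoint.new(nil))

delayed = RecordingRaw.new
nak_with_optional_delay(delayed, Endpoint.new(5))
```

With a `nil` delay the recorded options are empty, and with a delay set the options carry the `delay:` key unchanged.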

#term_message(wrapper, error) ⇒ void (private)

This method returns an undefined value.

Terminates a JetStream message after an unhandled exception.

Parameters:

  • wrapper (MessageWrapper)

    Wrapped JetStream message.

  • error (StandardError)

    The unhandled exception.



# File 'lib/leopard/nats_jetstream_callbacks.rb', line 61

def term_message(wrapper, error)
  @logger.error 'Unhandled JetStream error: ', error
  wrapper.raw.term
end