Module: Protobuf::Nats

Defined in:
lib/protobuf/nats.rb,
lib/protobuf/nats/client.rb,
lib/protobuf/nats/config.rb,
lib/protobuf/nats/errors.rb,
lib/protobuf/nats/runner.rb,
lib/protobuf/nats/server.rb,
lib/protobuf/nats/version.rb,
lib/protobuf/nats/thread_pool.rb,
lib/protobuf/nats/uuidv7_helper.rb,
lib/protobuf/nats/response_muxer.rb,
lib/protobuf/nats/response_muxer_request.rb,
lib/protobuf/nats/super_subscription_manager.rb

Defined Under Namespace

Modules: Errors, Messages Classes: Client, Config, ResponseMuxer, ResponseMuxerRequest, Runner, Server, SuperSubscriptionManager, ThreadPool, UUIDv7Helper

Constant Summary collapse

NatsClient =
::NATS::IO::Client
GET_CONNECTED_MUTEX =
::Mutex.new
ERROR_CALLBACK_EXECUTOR =

Bounded, single-thread executor for running error callbacks OFF hot/shared threads (notably nats-pure’s read/flush thread via on_error). A slow user callback must not stall message processing for every subject. The queue is bounded and over-capacity notifications are discarded (they’re advisory).

::Concurrent::ThreadPoolExecutor.new(
  :min_threads => 0,
  :max_threads => 1,
  :max_queue => 1024,
  :fallback_policy => :discard
)
VERSION =
"0.13.1.pre1"

Class Attribute Summary collapse

Class Method Summary collapse

Class Attribute Details

.client_nats_connectionObject

Returns the value of attribute client_nats_connection.



25
26
27
# File 'lib/protobuf/nats.rb', line 25

def client_nats_connection
  @client_nats_connection
end

Class Method Details

.configObject



37
38
39
40
41
42
43
# File 'lib/protobuf/nats.rb', line 37

def self.config
  @config ||= begin
    config = ::Protobuf::Nats::Config.new
    config.load_from_yml
    config
  end
end

.crash_backoff_seconds(crash_count, cap = 60) ⇒ Object

Exponential backoff (seconds) for self-healing worker threads after a fatal crash, capped. Shared by the ResponseMuxer dispatcher pool and the server SuperSubscriptionManager handler pool so the formula can’t drift between them.



169
170
171
# File 'lib/protobuf/nats.rb', line 169

def self.crash_backoff_seconds(crash_count, cap = 60)
  [(crash_count**2), cap].min
end

.error_callbacksObject

We will always log an error.



49
50
51
# File 'lib/protobuf/nats.rb', line 49

def self.error_callbacks
  @error_callbacks ||= [lambda { |error| log_error(error) }]
end

.instrument(event, payload = {}, &block) ⇒ Object

Single instrumentation entry point. Appends the gem’s ‘.protobuf-nats` suffix so callers don’t repeat it (and can’t typo it). Supports both the value form ‘instrument(“server.x”, 5)` and the block form `instrument(“client.request_duration”) { … }`.



66
67
68
# File 'lib/protobuf/nats.rb', line 66

def self.instrument(event, payload = {}, &block)
  ::ActiveSupport::Notifications.instrument("#{event}.protobuf-nats", payload, &block)
end

.log_error(error) ⇒ Object



173
174
175
176
177
178
179
# File 'lib/protobuf/nats.rb', line 173

def self.log_error(error)
  logger.error error.to_s
  logger.error error.class.to_s
  if error.respond_to?(:backtrace) && error.backtrace.is_a?(::Array)
    logger.error error.backtrace.join("\n")
  end
end

.loggerObject



181
182
183
# File 'lib/protobuf/nats.rb', line 181

def self.logger
  ::Protobuf::Logging.logger
end

.monotonic_timeObject

Monotonic clock for durations/ages; immune to wall-clock (NTP) jumps. Single source of truth shared by the client muxer and server pools.



162
163
164
# File 'lib/protobuf/nats.rb', line 162

def self.monotonic_time
  ::Process.clock_gettime(::Process::CLOCK_MONOTONIC)
end

.notify_error_callbacks(error) ⇒ Object



70
71
72
73
74
75
76
77
78
79
80
# File 'lib/protobuf/nats.rb', line 70

def self.notify_error_callbacks(error)
  error_callbacks.each do |callback|
    begin
      callback.call(error)
    rescue => callback_error
      log_error(callback_error)
    end
  end

  nil
end

.notify_error_callbacks_async(error) ⇒ Object



93
94
95
96
97
98
# File 'lib/protobuf/nats.rb', line 93

def self.notify_error_callbacks_async(error)
  ERROR_CALLBACK_EXECUTOR.post { notify_error_callbacks(error) }
  nil
rescue ::Concurrent::RejectedExecutionError
  nil
end

.on_error(&block) ⇒ Object



56
57
58
59
60
# File 'lib/protobuf/nats.rb', line 56

def self.on_error(&block)
  fail ::ArgumentError unless block.arity == 1
  error_callbacks << block
  nil
end

.start_client_nats_connectionObject



108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# File 'lib/protobuf/nats.rb', line 108

def self.start_client_nats_connection
  return true if @client_nats_connection

  GET_CONNECTED_MUTEX.synchronize do
    break true if @client_nats_connection

    # NOTE: nats-pure has no :disable_reconnect_buffer option (it was a
    # jnats concept). During a reconnect nats-pure buffers publishes and,
    # if the connection is fully closed, raises ConnectionClosedError --
    # both of which the client's transient-error retry path now handles.
    options = config.connection_options

    client = NatsClient.new

    # Register lifecycle callbacks BEFORE connecting so a disconnect or
    # error during the initial handshake is still observed.
    client.on_disconnect do
      logger.warn("Client NATS connection was disconnected")
    end

    client.on_reconnect do
      logger.warn("Client NATS connection was reconnected")
    end

    client.on_close do
      logger.warn("Client NATS connection was closed")
    end

    client.on_error do |error|
      # Runs on nats-pure's read/flush thread -- offload so a slow callback
      # can't stall message processing.
      notify_error_callbacks_async(error)
    end

    begin
      client.connect(options)
      # Ensure we have a valid connection to the NATS server.
      client.flush(5)
    rescue => e
      # A failed handshake can leave nats-pure's reader/flusher threads
      # running on a half-open client; close it so we don't leak them, then
      # surface the failure (the next call will retry with a fresh client).
      client.close rescue nil
      raise e
    end

    @client_nats_connection = client

    true
  end
end

.subscription_key(service_klass, service_method) ⇒ Object



100
101
102
103
104
105
106
# File 'lib/protobuf/nats.rb', line 100

def self.subscription_key(service_klass, service_method)
  service_class_name = service_klass.name.underscore.gsub("/", ".")
  service_method_name = service_method.to_s.underscore

  subscription_key = "rpc.#{service_class_name}.#{service_method_name}"
  subscription_key = config.make_subscription_key_replacements(subscription_key)
end