Module: Protobuf::Nats
- Defined in:
  lib/protobuf/nats.rb,
  lib/protobuf/nats/client.rb,
  lib/protobuf/nats/config.rb,
  lib/protobuf/nats/errors.rb,
  lib/protobuf/nats/runner.rb,
  lib/protobuf/nats/server.rb,
  lib/protobuf/nats/version.rb,
  lib/protobuf/nats/thread_pool.rb,
  lib/protobuf/nats/uuidv7_helper.rb,
  lib/protobuf/nats/response_muxer.rb,
  lib/protobuf/nats/response_muxer_request.rb,
  lib/protobuf/nats/super_subscription_manager.rb
Defined Under Namespace
Modules: Errors, Messages
Classes: Client, Config, ResponseMuxer, ResponseMuxerRequest, Runner, Server, SuperSubscriptionManager, ThreadPool, UUIDv7Helper
Constant Summary
- NatsClient =
::NATS::IO::Client
- GET_CONNECTED_MUTEX =
::Mutex.new
- ERROR_CALLBACK_EXECUTOR =
Bounded, single-thread executor for running error callbacks OFF hot/shared threads (notably nats-pure’s read/flush thread via on_error). A slow user callback must not stall message processing for every subject. The queue is bounded and over-capacity notifications are discarded (they’re advisory).
::Concurrent::ThreadPoolExecutor.new( :min_threads => 0, :max_threads => 1, :max_queue => 1024, :fallback_policy => :discard )
- ERROR_CALLBACK_DROP_COUNT =
Count of error callbacks discarded because the bounded executor was saturated. Lets a flood of dropped callbacks during an incident be observed instead of vanishing silently.
::Concurrent::AtomicFixnum.new(0)
- VERSION =
"0.13.1.pre2"
Class Attribute Summary
- .client_nats_connection ⇒ Object
  Returns the value of attribute client_nats_connection.
Class Method Summary
- .config ⇒ Object
- .crash_backoff_seconds(crash_count, cap = 60) ⇒ Object
  Quadratic backoff (seconds) for self-healing worker threads after a fatal crash, capped.
- .error_callback_drop_count ⇒ Object
- .error_callbacks ⇒ Object
  Callback list; always starts with a default callback that logs the error.
- .instrument(event, payload = {}, &block) ⇒ Object
  Single instrumentation entry point.
- .log_error(error) ⇒ Object
- .logger ⇒ Object
- .monotonic_time ⇒ Object
  Monotonic clock for durations/ages; immune to wall-clock (NTP) jumps.
- .notify_error_callbacks(error) ⇒ Object
- .notify_error_callbacks_async(error) ⇒ Object
- .on_error(&block) ⇒ Object
- .record_dropped_error_callback ⇒ Object
  Record a discarded error callback.
- .start_client_nats_connection ⇒ Object
- .subscription_key(service_klass, service_method) ⇒ Object
Class Attribute Details
.client_nats_connection ⇒ Object
Returns the value of attribute client_nats_connection.
# File 'lib/protobuf/nats.rb', line 25
def client_nats_connection
  @client_nats_connection
end
Class Method Details
.config ⇒ Object
# File 'lib/protobuf/nats.rb', line 37
def self.config
  @config ||= begin
    config = ::Protobuf::Nats::Config.new
    config.load_from_yml
    config
  end
end
.crash_backoff_seconds(crash_count, cap = 60) ⇒ Object
Quadratic backoff (seconds) for self-healing worker threads after a fatal crash: the delay is crash_count squared, capped at cap (60 by default). Shared by the ResponseMuxer dispatcher pool and the server SuperSubscriptionManager handler pool so the formula can’t drift between them.
# File 'lib/protobuf/nats.rb', line 197
def self.crash_backoff_seconds(crash_count, cap = 60)
  [(crash_count**2), cap].min
end
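To make the growth curve concrete, the sketch below copies the one-line formula into a standalone method and prints the delay for the first nine crash counts. The name `backoff_seconds` is an assumption used only for this illustration; it is not part of the gem. Because the delay is the square of the crash count, it reaches the default 60-second cap on the eighth crash.

```ruby
# Standalone copy of the formula shown above. `backoff_seconds` is a
# hypothetical name for illustration only, not part of the gem.
def backoff_seconds(crash_count, cap = 60)
  [crash_count**2, cap].min
end

# Delay after the 1st, 2nd, ... 9th consecutive crash.
delays = (1..9).map { |n| backoff_seconds(n) }
puts delays.inspect
# => [1, 4, 9, 16, 25, 36, 49, 60, 60]
```

A lower cap can be passed as the second argument when a worker should retry more aggressively, for example `backoff_seconds(3, 5)` returns 5.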
.error_callback_drop_count ⇒ Object
# File 'lib/protobuf/nats.rb', line 98
def self.error_callback_drop_count
  ERROR_CALLBACK_DROP_COUNT.value
end
.error_callbacks ⇒ Object
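The callback list always starts with a default callback that logs the error, so errors are logged even when no custom callbacks are registered.
# File 'lib/protobuf/nats.rb', line 49
def self.error_callbacks
  @error_callbacks ||= [lambda { |error| log_error(error) }]
end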
.instrument(event, payload = {}, &block) ⇒ Object
Single instrumentation entry point. Appends the gem’s `.protobuf-nats` suffix so callers don’t repeat it (and can’t typo it). Supports both the value form `instrument("server.x", 5)` and the block form `instrument("client.request_duration") { … }`.
# File 'lib/protobuf/nats.rb', line 66
def self.instrument(event, payload = {}, &block)
  ::ActiveSupport::Notifications.instrument("#{event}.protobuf-nats", payload, &block)
end
.log_error(error) ⇒ Object
# File 'lib/protobuf/nats.rb', line 201
def self.log_error(error)
  logger.error error.to_s
  logger.error error.class.to_s
  if error.respond_to?(:backtrace) && error.backtrace.is_a?(::Array)
    logger.error error.backtrace.join("\n")
  end
end
.logger ⇒ Object
# File 'lib/protobuf/nats.rb', line 209
def self.logger
  ::Protobuf::Logging.logger
end
.monotonic_time ⇒ Object
Monotonic clock for durations/ages; immune to wall-clock (NTP) jumps. Single source of truth shared by the client muxer and server pools.
# File 'lib/protobuf/nats.rb', line 190
def self.monotonic_time
  ::Process.clock_gettime(::Process::CLOCK_MONOTONIC)
end
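As a rough illustration of how a monotonic reading is typically used, the sketch below measures an elapsed duration by subtracting two readings. The top-level `monotonic_time` helper here is a stand-in that mirrors the method above so the example runs without the gem loaded; the `sleep` is a placeholder for real work.

```ruby
# Stand-in that mirrors the gem method so this example is self-contained.
def monotonic_time
  ::Process.clock_gettime(::Process::CLOCK_MONOTONIC)
end

started_at = monotonic_time
sleep 0.05 # placeholder for the work being timed
elapsed = monotonic_time - started_at

# The difference is a Float number of seconds. Because the clock is monotonic,
# it does not go backwards if the wall clock is adjusted during the work.
puts format("elapsed: %.3fs", elapsed)
```

Only differences between two readings are meaningful; the absolute value has an arbitrary starting point and should not be treated as a timestamp.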
.notify_error_callbacks(error) ⇒ Object
# File 'lib/protobuf/nats.rb', line 70
def self.notify_error_callbacks(error)
  error_callbacks.each do |callback|
    begin
      callback.call(error)
    rescue => callback_error
      log_error(callback_error)
    end
  end

  nil
end
.notify_error_callbacks_async(error) ⇒ Object
# File 'lib/protobuf/nats.rb', line 102
def self.notify_error_callbacks_async(error)
  # #post returns false when the job is rejected. With the :discard fallback
  # policy the job is silently dropped (returning false) rather than raising,
  # so the false return is the only drop signal to handle.
  accepted = ERROR_CALLBACK_EXECUTOR.post { notify_error_callbacks(error) }
  record_dropped_error_callback unless accepted
  nil
end
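The "post, and count the drop if rejected" pattern above can be sketched with the standard library alone. This is not how the gem does it (the gem uses a concurrent-ruby executor); the example swaps in a `SizedQueue` with a non-blocking push plus one worker thread, and every name in it (`BoundedNotifier`, `start_worker`, `shutdown`) is hypothetical.

```ruby
# Stdlib stand-in for a bounded, single-thread executor that discards work
# when its queue is full. All names here are hypothetical.
class BoundedNotifier
  attr_reader :dropped

  def initialize(max_queue)
    @queue = ::SizedQueue.new(max_queue)
    @dropped = 0
    @dropped_lock = ::Mutex.new
  end

  # Never blocks the caller: a full queue means the job is counted and dropped.
  def post(&job)
    @queue.push(job, true) # non_block = true raises ThreadError when full
    true
  rescue ::ThreadError
    @dropped_lock.synchronize { @dropped += 1 }
    false
  end

  # Runs queued jobs on one background thread until the queue is closed
  # and drained (pop returns nil at that point).
  def start_worker
    ::Thread.new do
      while (job = @queue.pop)
        job.call
      end
    end
  end

  def shutdown
    @queue.close
  end
end

notifier = BoundedNotifier.new(2)
handled = []

# No worker is running yet, so the third job finds the queue full.
results = 3.times.map { |i| notifier.post { handled << i } }

worker = notifier.start_worker
notifier.shutdown
worker.join

puts results.inspect  # => [true, true, false]
puts handled.inspect  # => [0, 1]
puts notifier.dropped # => 1
```

The design point is the same as in the method above: the producer thread pays only for a queue push and, at worst, a counter increment, so a slow consumer cannot stall it.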
.on_error(&block) ⇒ Object
# File 'lib/protobuf/nats.rb', line 56
def self.on_error(&block)
  fail ::ArgumentError unless block.arity == 1
  error_callbacks << block
  nil
end
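The registration and notification methods work together, which the sketch below tries to show in isolation. `CallbackRegistry` is a hypothetical stand-in so the example runs without the gem; unlike the gem it starts with an empty callback list, writes callback failures to stderr instead of the logger, and adds a nil guard on the block.

```ruby
# Hypothetical stand-in for the gem's callback registry.
module CallbackRegistry
  def self.error_callbacks
    @error_callbacks ||= []
  end

  # Accepts only blocks that take exactly one argument (the error).
  # The `block &&` nil guard is an addition made for this sketch.
  def self.on_error(&block)
    fail ::ArgumentError unless block && block.arity == 1
    error_callbacks << block
    nil
  end

  # A callback that raises is reported and skipped; later callbacks still run.
  def self.notify_error_callbacks(error)
    error_callbacks.each do |callback|
      begin
        callback.call(error)
      rescue => callback_error
        warn "callback failed: #{callback_error.message}"
      end
    end
    nil
  end
end

seen = []
CallbackRegistry.on_error { |_error| raise "reporter is down" }
CallbackRegistry.on_error { |error| seen << error.message }

CallbackRegistry.notify_error_callbacks(::StandardError.new("boom"))
puts seen.inspect # => ["boom"]

rejected = false
begin
  CallbackRegistry.on_error { |_error, _extra| }
rescue ::ArgumentError
  rejected = true
end
puts rejected # => true
```

With the real module, the equivalent registration would be `::Protobuf::Nats.on_error { |error| ... }`, with the block taking exactly one argument.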
.record_dropped_error_callback ⇒ Object
Record a discarded error callback. Kept cheap – this runs on nats-pure’s read/flush thread, so it must NOT format/log the error synchronously (the whole point of the async path). The atomic counter is the durable signal; the instrument gauge emits a discrete event for dashboards (drops only happen under a severe flood, so a notification per drop is acceptable).
# File 'lib/protobuf/nats.rb', line 116
def self.record_dropped_error_callback
  ERROR_CALLBACK_DROP_COUNT.increment
  instrument("error_callback_dropped", 1)
  nil
end
.start_client_nats_connection ⇒ Object
# File 'lib/protobuf/nats.rb', line 130
def self.start_client_nats_connection
  return true if @client_nats_connection

  GET_CONNECTED_MUTEX.synchronize do
    break true if @client_nats_connection

    # NOTE: nats-pure has no :disable_reconnect_buffer option (it was a
    # jnats concept). During a reconnect nats-pure buffers publishes and,
    # if the connection is fully closed, raises ConnectionClosedError --
    # both of which the client's transient-error retry path now handles.
    = config.

    client = NatsClient.new

    # Register lifecycle callbacks BEFORE connecting so a disconnect or
    # error during the initial handshake is still observed.
    client.on_disconnect do
      logger.warn("Client NATS connection was disconnected")
    end

    client.on_reconnect do
      logger.warn("Client NATS connection was reconnected")
    end

    client.on_close do
      logger.warn("Client NATS connection was closed")
      # A close is terminal for this client object (nats-pure only reconnects
      # via on_disconnect/on_reconnect; on_close means it gave up). Drop the
      # memoized reference so the next start_client_nats_connection rebuilds a
      # fresh connection instead of reusing a permanently-dead one. In-flight
      # callers keep their own local reference; only new calls rebuild.
      @client_nats_connection = nil
    end

    client.on_error do |error|
      # Runs on nats-pure's read/flush thread -- offload so a slow callback
      # can't stall message processing.
      notify_error_callbacks_async(error)
    end

    begin
      client.connect()

      # Ensure we have a valid connection to the NATS server.
      client.flush(5)
    rescue => e
      # A failed handshake can leave nats-pure's reader/flusher threads
      # running on a half-open client; close it so we don't leak them, then
      # surface the failure (the next call will retry with a fresh client).
      client.close rescue nil
      raise e
    end

    @client_nats_connection = client

    true
  end
end
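The memoization in the method above checks the cached connection twice: once without the lock as a fast path, and once again inside the mutex in case another thread connected first. The sketch below isolates that pattern and the reset performed on close. `ConnectionHolder`, `build_count`, and `handle_close` are hypothetical names, and a plain `Object` stands in for a connected client.

```ruby
# Hypothetical stand-in that isolates the check / lock / re-check pattern.
module ConnectionHolder
  LOCK = ::Mutex.new

  class << self
    attr_reader :connection, :build_count

    def start_connection
      return true if @connection # fast path, no lock taken

      LOCK.synchronize do
        break true if @connection # another thread won the race

        @build_count = (@build_count || 0) + 1
        @connection = ::Object.new # placeholder for a connected client
        true
      end
    end

    # Mirrors what the on_close callback does: forget the dead connection
    # so the next start_connection call builds a fresh one.
    def handle_close
      @connection = nil
    end
  end
end

threads = 8.times.map { ::Thread.new { ConnectionHolder.start_connection } }
threads.each(&:join)
puts ConnectionHolder.build_count # => 1

first = ConnectionHolder.connection
ConnectionHolder.handle_close
ConnectionHolder.start_connection
puts ConnectionHolder.connection.equal?(first) # => false
```

Callers that already hold a reference to the old object keep it after the reset; only later calls to `start_connection` see the rebuilt one.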
.subscription_key(service_klass, service_method) ⇒ Object
# File 'lib/protobuf/nats.rb', line 122
def self.subscription_key(service_klass, service_method)
  service_class_name = service_klass.name.underscore.gsub("/", ".")
  service_method_name = service_method.to_s.underscore

  subscription_key = "rpc.#{service_class_name}.#{service_method_name}"
  subscription_key = config.make_subscription_key_replacements(subscription_key)
end
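To show the shape of the resulting subject, the sketch below reproduces the key derivation without ActiveSupport. The `underscore` helper is a simplified approximation of `String#underscore`, the service name `MyApp::UserService` is made up, and the final `make_subscription_key_replacements` step is omitted because its behavior depends on configuration not shown here.

```ruby
# Simplified approximation of ActiveSupport's String#underscore, written
# out so the example needs no gems. It ignores acronym inflection rules.
def underscore(name)
  name.gsub("::", "/")
      .gsub(/([A-Z]+)([A-Z][a-z])/, '\1_\2')
      .gsub(/([a-z\d])([A-Z])/, '\1_\2')
      .downcase
end

# Hypothetical helper: takes the class name as a String rather than a class,
# and skips the configured key replacements applied by the real method.
def sketch_subscription_key(service_class_name, service_method)
  class_part = underscore(service_class_name).gsub("/", ".")
  method_part = underscore(service_method.to_s)
  "rpc.#{class_part}.#{method_part}"
end

key = sketch_subscription_key("MyApp::UserService", :find_by_id)
puts key
# => rpc.my_app.user_service.find_by_id
```

Each namespace level becomes one dot-separated token, which matters for NATS because subjects are matched token by token.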