Module: Protobuf::Nats
- Defined in:
- lib/protobuf/nats.rb,
lib/protobuf/nats/client.rb,
lib/protobuf/nats/config.rb,
lib/protobuf/nats/errors.rb,
lib/protobuf/nats/runner.rb,
lib/protobuf/nats/server.rb,
lib/protobuf/nats/version.rb,
lib/protobuf/nats/thread_pool.rb,
lib/protobuf/nats/uuidv7_helper.rb,
lib/protobuf/nats/response_muxer.rb,
lib/protobuf/nats/response_muxer_request.rb,
lib/protobuf/nats/super_subscription_manager.rb
Defined Under Namespace
Modules: Errors, Messages
Classes: Client, Config, ResponseMuxer, ResponseMuxerRequest, Runner, Server, SuperSubscriptionManager, ThreadPool, UUIDv7Helper
Constant Summary
collapse
- NatsClient =
::NATS::IO::Client
- GET_CONNECTED_MUTEX =
::Mutex.new
- ERROR_CALLBACK_EXECUTOR =
Bounded, single-thread executor for running error callbacks OFF hot/shared threads (notably nats-pure’s read/flush thread via on_error). A slow user callback must not stall message processing for every subject. The queue is bounded and over-capacity notifications are discarded (they’re advisory).
::Concurrent::ThreadPoolExecutor.new(
:min_threads => 0,
:max_threads => 1,
:max_queue => 1024,
:fallback_policy => :discard
)
- VERSION =
"0.13.1.pre1"
Class Attribute Summary collapse
Class Method Summary
collapse
Class Attribute Details
.client_nats_connection ⇒ Object
Returns the value of attribute client_nats_connection.
25
26
27
|
# File 'lib/protobuf/nats.rb', line 25
def client_nats_connection
@client_nats_connection
end
|
Class Method Details
.config ⇒ Object
37
38
39
40
41
42
43
|
# File 'lib/protobuf/nats.rb', line 37
def self.config
@config ||= begin
config = ::Protobuf::Nats::Config.new
config.load_from_yml
config
end
end
|
.crash_backoff_seconds(crash_count, cap = 60) ⇒ Object
Exponential backoff (seconds) for self-healing worker threads after a fatal crash, capped. Shared by the ResponseMuxer dispatcher pool and the server SuperSubscriptionManager handler pool so the formula can’t drift between them.
169
170
171
|
# File 'lib/protobuf/nats.rb', line 169
def self.crash_backoff_seconds(crash_count, cap = 60)
[(crash_count**2), cap].min
end
|
.error_callbacks ⇒ Object
We will always log an error.
49
50
51
|
# File 'lib/protobuf/nats.rb', line 49
def self.error_callbacks
@error_callbacks ||= [lambda { |error| log_error(error) }]
end
|
.instrument(event, payload = {}, &block) ⇒ Object
Single instrumentation entry point. Appends the gem’s ‘.protobuf-nats` suffix so callers don’t repeat it (and can’t typo it). Supports both the value form ‘instrument(“server.x”, 5)` and the block form `instrument(“client.request_duration”) { … }`.
66
67
68
|
# File 'lib/protobuf/nats.rb', line 66
def self.instrument(event, payload = {}, &block)
::ActiveSupport::Notifications.instrument("#{event}.protobuf-nats", payload, &block)
end
|
.log_error(error) ⇒ Object
173
174
175
176
177
178
179
|
# File 'lib/protobuf/nats.rb', line 173
def self.log_error(error)
logger.error error.to_s
logger.error error.class.to_s
if error.respond_to?(:backtrace) && error.backtrace.is_a?(::Array)
logger.error error.backtrace.join("\n")
end
end
|
.logger ⇒ Object
181
182
183
|
# File 'lib/protobuf/nats.rb', line 181
def self.logger
::Protobuf::Logging.logger
end
|
.monotonic_time ⇒ Object
Monotonic clock for durations/ages; immune to wall-clock (NTP) jumps. Single source of truth shared by the client muxer and server pools.
162
163
164
|
# File 'lib/protobuf/nats.rb', line 162
def self.monotonic_time
::Process.clock_gettime(::Process::CLOCK_MONOTONIC)
end
|
.notify_error_callbacks(error) ⇒ Object
70
71
72
73
74
75
76
77
78
79
80
|
# File 'lib/protobuf/nats.rb', line 70
def self.notify_error_callbacks(error)
error_callbacks.each do |callback|
begin
callback.call(error)
rescue => callback_error
log_error(callback_error)
end
end
nil
end
|
.notify_error_callbacks_async(error) ⇒ Object
93
94
95
96
97
98
|
# File 'lib/protobuf/nats.rb', line 93
def self.notify_error_callbacks_async(error)
ERROR_CALLBACK_EXECUTOR.post { notify_error_callbacks(error) }
nil
rescue ::Concurrent::RejectedExecutionError
nil
end
|
.on_error(&block) ⇒ Object
56
57
58
59
60
|
# File 'lib/protobuf/nats.rb', line 56
def self.on_error(&block)
fail ::ArgumentError unless block.arity == 1
error_callbacks << block
nil
end
|
.start_client_nats_connection ⇒ Object
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
|
# File 'lib/protobuf/nats.rb', line 108
def self.start_client_nats_connection
return true if @client_nats_connection
GET_CONNECTED_MUTEX.synchronize do
break true if @client_nats_connection
options = config.connection_options
client = NatsClient.new
client.on_disconnect do
logger.warn("Client NATS connection was disconnected")
end
client.on_reconnect do
logger.warn("Client NATS connection was reconnected")
end
client.on_close do
logger.warn("Client NATS connection was closed")
end
client.on_error do |error|
notify_error_callbacks_async(error)
end
begin
client.connect(options)
client.flush(5)
rescue => e
client.close rescue nil
raise e
end
@client_nats_connection = client
true
end
end
|
.subscription_key(service_klass, service_method) ⇒ Object
100
101
102
103
104
105
106
|
# File 'lib/protobuf/nats.rb', line 100
def self.subscription_key(service_klass, service_method)
service_class_name = service_klass.name.underscore.gsub("/", ".")
service_method_name = service_method.to_s.underscore
subscription_key = "rpc.#{service_class_name}.#{service_method_name}"
subscription_key = config.make_subscription_key_replacements(subscription_key)
end
|