Class: NatsAsync::Client
- Inherits:
-
Object
- Object
- NatsAsync::Client
- Defined in:
- lib/nats_async/client.rb
Defined Under Namespace
Classes: RequestPromise
Constant Summary collapse
- AckError =
NatsAsync::AckError
- ConnectionError =
NatsAsync::ConnectionError
- MsgAlreadyAcked =
NatsAsync::MsgAlreadyAcked
- NotAckable =
NatsAsync::NotAckable
- RequestError =
NatsAsync::RequestError
- ProtocolError =
NatsAsync::ProtocolError
- Message =
NatsAsync::Message
Instance Attribute Summary collapse
-
#js_api_prefix ⇒ Object
readonly
Returns the value of attribute js_api_prefix.
-
#status ⇒ Object
readonly
Returns the value of attribute status.
Instance Method Summary collapse
-
#cleanup_resp_mux ⇒ Object
Clean up the response multiplexing subscription.
- #close ⇒ Object
- #closed? ⇒ Boolean
- #connected? ⇒ Boolean
- #drain(timeout: 5) ⇒ Object
-
#ensure_resp_mux_sub!(task:) ⇒ Object
Ensure the response multiplexing subscription is set up. Creates a single wildcard subscription to handle all request responses.
- #flush(timeout: 2) ⇒ Object
-
#generate_resp_token ⇒ Object
Generate a unique token for request tracking.
-
#initialize(url: "nats://127.0.0.1:4222", verbose: true, js_api_prefix: "$JS.API", ping_interval: 30, ping_timeout: 5, tls: nil, tls_verify: true, tls_ca_file: nil, tls_ca_path: nil, tls_hostname: nil, tls_handshake_first: false, user: nil, password: nil, nkey_seed: nil, nkey_seed_file: nil, nkey_public_key: nil, reconnect: false, reconnect_interval: 1, max_reconnect_attempts: nil, flush_delay: 0.01, flush_max_buffer: 5000) ⇒ Client
constructor
A new instance of Client.
- #jetstream ⇒ Object
- #js_api_subject(*tokens) ⇒ Object
- #last_error ⇒ Object
- #ping!(timeout: 2) ⇒ Object
-
#process_mux_response(token, message) ⇒ Object
Process a response message from the multiplexed subscription.
- #publish(subject, payload = "", reply: nil, headers: nil) ⇒ Object
- #received_pings ⇒ Object
- #received_pongs ⇒ Object
- #request(subject, payload = "", timeout: 0.5, headers: nil, &block) ⇒ Object
- #request_old_style(subject, payload = "", timeout: 0.5, headers: nil, &block) ⇒ Object
- #resolve_backend(mode: :auto, stream: nil) ⇒ Object
- #sent_pings ⇒ Object
- #server_info ⇒ Object
- #start(task:) ⇒ Object
- #stop ⇒ Object
- #subscribe(subject, queue: nil, handler: nil, &block) ⇒ Object
- #unsubscribe(sid) ⇒ Object
Constructor Details
#initialize(url: "nats://127.0.0.1:4222", verbose: true, js_api_prefix: "$JS.API", ping_interval: 30, ping_timeout: 5, tls: nil, tls_verify: true, tls_ca_file: nil, tls_ca_path: nil, tls_hostname: nil, tls_handshake_first: false, user: nil, password: nil, nkey_seed: nil, nkey_seed_file: nil, nkey_public_key: nil, reconnect: false, reconnect_interval: 1, max_reconnect_attempts: nil, flush_delay: 0.01, flush_max_buffer: 5000) ⇒ Client
Returns a new instance of Client.
94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 |
# File 'lib/nats_async/client.rb', line 94
#
# Builds a client in the closed state (@closed is true, @status is :closed).
# A connection object is created through build_connection, but nothing is
# started here; #start does that.
#
# @param url [String] stored under :url in the connection options
# @param verbose [Boolean] logger level is :debug when truthy, :error otherwise
# @param js_api_prefix [String] passed through normalize_subject_prefix and
#   exposed via #js_api_prefix
# @param ping_interval [Numeric] stored in @ping_interval
# @param ping_timeout [Numeric] stored in @ping_timeout (used as the timeout of
#   the first ping in #start)
# @param reconnect [Boolean] stored in @reconnect
# @param reconnect_interval [Numeric] stored in @reconnect_interval
# @param max_reconnect_attempts [Integer, nil] stored in @max_reconnect_attempts
#
# All remaining keywords (tls*, user, password, nkey_*, flush_*) are copied
# unchanged into @connection_options.
# NOTE(review): build_connection is presumed to read @connection_options — confirm.
def initialize(
  url: "nats://127.0.0.1:4222",
  verbose: true,
  js_api_prefix: "$JS.API",
  ping_interval: 30,
  ping_timeout: 5,
  tls: nil,
  tls_verify: true,
  tls_ca_file: nil,
  tls_ca_path: nil,
  tls_hostname: nil,
  tls_handshake_first: false,
  user: nil,
  password: nil,
  nkey_seed: nil,
  nkey_seed_file: nil,
  nkey_public_key: nil,
  reconnect: false,
  reconnect_interval: 1,
  max_reconnect_attempts: nil,
  flush_delay: 0.01,
  flush_max_buffer: 5000
)
  @js_api_prefix = normalize_subject_prefix(js_api_prefix)

  # Keep-alive and reconnect settings.
  @ping_interval, @ping_timeout = ping_interval, ping_timeout
  @reconnect = reconnect
  @reconnect_interval = reconnect_interval
  @max_reconnect_attempts = max_reconnect_attempts

  log_level = verbose ? :debug : :error
  @logger = Console.logger.with(level: log_level, verbose: false)

  # Everything the connection needs, kept so it is set before build_connection runs.
  @connection_options = {
    url: url,
    logger: @logger,
    tls: tls,
    tls_verify: tls_verify,
    tls_ca_file: tls_ca_file,
    tls_ca_path: tls_ca_path,
    tls_hostname: tls_hostname,
    tls_handshake_first: tls_handshake_first,
    user: user,
    password: password,
    nkey_seed: nkey_seed,
    nkey_seed_file: nkey_seed_file,
    nkey_public_key: nkey_public_key,
    flush_delay: flush_delay,
    flush_max_buffer: flush_max_buffer
  }
  @connection = build_connection

  # Background task handles; all unset until #start.
  @ping_task = @reconnect_task = nil
  @request_timeout_task = @request_callback_task = nil
  @request_callback_queue = nil
  @read_error = nil

  # Lifecycle flags: a fresh client counts as closed.
  @started = false
  @closed = true
  @status = :closed

  # Counters and registries for subscriptions and per-request tracking.
  @sid_seq = 0
  @request_seq = 0
  @subscriptions = {}
  @pending_requests = {}
  @pending_request_condition = Async::Condition.new

  # Response multiplexing state
  @resp_mux_prefix = nil
  @resp_mux_subsid = nil
  @resp_mux_lock = Async::Semaphore.new(1)
  @resp_map = {}
  @resp_map_lock = Async::Semaphore.new(1)
end
Instance Attribute Details
#js_api_prefix ⇒ Object (readonly)
Returns the value of attribute js_api_prefix.
166 167 168 |
# File 'lib/nats_async/client.rb', line 166
#
# Read-only accessor.
# @return [Object] the value of @js_api_prefix as set in #initialize
def js_api_prefix = @js_api_prefix
#status ⇒ Object (readonly)
Returns the value of attribute status.
166 167 168 |
# File 'lib/nats_async/client.rb', line 166
#
# Read-only accessor.
# @return [Symbol] current lifecycle state; this file assigns :closed,
#   :connecting and :connected
def status = @status
Instance Method Details
#cleanup_resp_mux ⇒ Object
Clean up the response multiplexing subscription
248 249 250 251 252 253 254 255 256 257 258 |
# File 'lib/nats_async/client.rb', line 248
#
# Tears down the response-multiplexing subscription if one is recorded.
# Under @resp_mux_lock it drops the subscription from @subscriptions, calls
# safe_unsubscribe for its sid, resets the mux sid/prefix and empties the
# token => pending-request map. Returns nil without doing anything when no
# mux sid is set.
#
# NOTE(review): entries still in @resp_map are discarded without their promise
# being settled, and @resp_map_lock is not taken here — confirm nothing can be
# left waiting on them.
def cleanup_resp_mux
  return unless @resp_mux_subsid

  @resp_mux_lock.acquire do
    mux_sid = @resp_mux_subsid
    @subscriptions.delete(mux_sid)
    safe_unsubscribe(mux_sid)
    @resp_mux_subsid = @resp_mux_prefix = nil
    @resp_map.clear
  end
end
#close ⇒ Object
190 191 192 193 194 195 196 197 198 |
# File 'lib/nats_async/client.rb', line 190
#
# Closes the client. Calling it on an already closed client is a no-op.
# The closed flag is raised before #stop runs, so the teardown already sees
# the client as closed.
#
# @return [true] always
def close
  unless closed?
    @closed = true
    stop
    @started = false
    @status = :closed
  end

  true
end
#closed? ⇒ Boolean
269 270 271 |
# File 'lib/nats_async/client.rb', line 269
#
# @return [Boolean] the raw @closed flag (true for a fresh client, cleared in
#   #start, raised again by #close)
def closed? = @closed
#connected? ⇒ Boolean
265 266 267 |
# File 'lib/nats_async/client.rb', line 265
#
# True only when every layer agrees: status is :connected, the client was
# started, it is not closed, and the underlying connection reports connected.
#
# @return [Boolean, nil] falsy as soon as one check fails, otherwise whatever
#   @connection.connected? returns
def connected?
  return false unless @status == :connected
  return @started unless @started
  return false if @closed

  @connection.connected?
end
#drain(timeout: 5) ⇒ Object
221 222 223 224 225 226 227 228 |
# File 'lib/nats_async/client.rb', line 221
#
# Graceful shutdown: stops the ping task, flushes while still connected, then
# closes. #close runs even when the flush raises; the error still propagates.
#
# @param timeout [Numeric] forwarded to #flush
# @return [true] when no error was raised
def drain(timeout: 5)
  begin
    @ping_task&.stop
    @ping_task = nil
    flush(timeout: timeout) if connected?
  ensure
    close
  end

  true
end
#ensure_resp_mux_sub!(task:) ⇒ Object
Ensure the response multiplexing subscription is set up. Creates a single wildcard subscription to handle all request responses
299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 |
# File 'lib/nats_async/client.rb', line 299
#
# Lazily creates the single wildcard inbox subscription ("_INBOX.<hex>.*")
# that receives every reply for promise-based requests. Each reply is routed
# to #process_mux_response using the last subject token.
#
# The mux prefix and sid are committed only after #subscribe has returned.
# If subscribing raises, no half-initialised state is left behind and the
# next call retries instead of returning early without a subscription.
#
# @param task [Object] accepted for interface compatibility; not used here
# @return [Integer, nil] nil when the subscription already exists, otherwise
#   the result of the trailing sleep
def ensure_resp_mux_sub!(task:)
  return if @resp_mux_prefix

  @resp_mux_lock.acquire do
    # Re-check under the lock: another caller may have completed setup while
    # this one was waiting.
    return if @resp_mux_prefix

    # Unique prefix for this client's reply inboxes.
    prefix = "_INBOX.#{SecureRandom.hex(8)}"

    # Replies arrive with subject "<prefix>.<token>"; the token is the last segment.
    sid = subscribe("#{prefix}.*") do |msg|
      process_mux_response(msg.subject.split(".").last, msg)
    end

    @resp_mux_subsid = sid
    @resp_mux_prefix = prefix
  end

  # Push the subscription out before any request is published.
  @connection.flush_pending
  # NOTE(review): the short sleep presumably gives the flush time to be
  # written out — confirm whether flush_pending is asynchronous.
  sleep(0.001)
end
#flush(timeout: 2) ⇒ Object
260 261 262 263 |
# File 'lib/nats_async/client.rb', line 260
#
# Calls #ping! with the given timeout and reports success.
# NOTE(review): presumably the ping reply acts as the flush barrier — confirm
# against the connection implementation.
#
# @param timeout [Numeric] forwarded to #ping!
# @return [true] when #ping! did not raise
def flush(timeout: 2)
  ping!(timeout: timeout)

  true
end
#generate_resp_token ⇒ Object
Generate a unique token for request tracking
340 341 342 |
# File 'lib/nats_async/client.rb', line 340
#
# @return [String] a random 16-character hex string (8 random bytes), used as
#   the per-request reply token
def generate_resp_token = SecureRandom.hex(8)
#jetstream ⇒ Object
414 415 416 |
# File 'lib/nats_async/client.rb', line 414
#
# Memoised JetStream facade bound to this client; built on first use.
#
# @return [JetStream]
def jetstream
  return @jetstream if @jetstream

  @jetstream = JetStream.new(self)
end
#js_api_subject(*tokens) ⇒ Object
410 411 412 |
# File 'lib/nats_async/client.rb', line 410
#
# Joins the JetStream API prefix and the given tokens with ".".
# Nested arrays are flattened; nil entries and entries whose string form is
# empty are skipped. With prefix "$JS.API",
# js_api_subject("STREAM", ["INFO", nil], "") gives "$JS.API.STREAM.INFO".
#
# @param tokens [Array<#to_s, Array, nil>] subject segments
# @return [String]
def js_api_subject(*tokens)
  segments = [js_api_prefix, *tokens.flatten].filter_map do |segment|
    next if segment.nil?

    text = segment.to_s
    text unless text.empty?
  end

  segments.join(".")
end
#last_error ⇒ Object
273 274 275 |
# File 'lib/nats_async/client.rb', line 273
#
# @return [Object, nil] the client's own @read_error when set, otherwise the
#   connection's last_error
def last_error
  return @read_error if @read_error

  @connection.last_error
end
#ping!(timeout: 2) ⇒ Object
277 278 279 |
# File 'lib/nats_async/client.rb', line 277
#
# Delegates to the connection's ping! with the same timeout.
#
# @param timeout [Numeric] forwarded unchanged
# @return [Object] whatever @connection.ping! returns
def ping!(timeout: 2) = @connection.ping!(timeout: timeout)
#process_mux_response(token, message) ⇒ Object
Process a response message from the multiplexed subscription
324 325 326 327 328 329 330 331 332 333 334 335 336 337 |
# File 'lib/nats_async/client.rb', line 324
#
# Routes one reply from the multiplexed inbox subscription to the request
# that is waiting for it. The pending entry for +token+ is removed under
# @resp_map_lock. Entries with a callback are handed to
# enqueue_request_callback; otherwise the entry's promise is fulfilled with
# the message.
#
# Replies for unknown tokens are ignored. Any StandardError raised while
# delivering is logged and swallowed so that one bad handler cannot break
# the shared subscription.
#
# @param token [String] last segment of the reply subject
# @param message [Object] the received reply
# @return [Object, nil] nil when the token is unknown
def process_mux_response(token, message)
  @resp_map_lock.acquire do
    pending = @resp_map.delete(token)
    return unless pending

    if pending[:callback]
      enqueue_request_callback(pending, message)
    else
      pending[:promise].fulfill(message)
    end
  rescue StandardError
    @logger.error("mux response handler error")
  end
end
#publish(subject, payload = "", reply: nil, headers: nil) ⇒ Object
281 282 283 284 |
# File 'lib/nats_async/client.rb', line 281
#
# Publishes a message through the connection after ensure_connected! has
# passed (that helper is not shown here; presumably it raises when the client
# is not connected).
#
# @param subject [String] destination subject
# @param payload [String] message body, empty by default
# @param reply [String, nil] optional reply subject
# @param headers [Hash, nil] optional headers
# @return [Object] whatever @connection.publish returns
def publish(subject, payload = "", reply: nil, headers: nil)
  ensure_connected!

  @connection.publish(subject, payload, reply: reply, headers: headers)
end
#received_pings ⇒ Object
350 |
# File 'lib/nats_async/client.rb', line 350
#
# @return [Object] the connection's received_pings value
def received_pings
  @connection.received_pings
end
#received_pongs ⇒ Object
352 |
# File 'lib/nats_async/client.rb', line 352
#
# @return [Object] the connection's received_pongs value
def received_pongs
  @connection.received_pongs
end
#request(subject, payload = "", timeout: 0.5, headers: nil, &block) ⇒ Object
358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 |
# File 'lib/nats_async/client.rb', line 358
#
# Sends a request and returns a RequestPromise for the reply.
#
# With a block, the call is handed to #request_old_style (one subscription per
# request). Without a block, the reply is routed through the shared wildcard
# inbox: a fresh token is registered in @resp_map, the request is published
# with "<mux prefix>.<token>" as reply subject, and the promise is fulfilled
# by #process_mux_response.
#
# If registering or publishing raises, the token is removed again and the
# error is re-raised.
#
# NOTE(review): the stored :deadline is presumably enforced by the request
# timeout loop woken via @pending_request_condition — confirm it also scans
# @resp_map.
#
# @param subject [String] request subject
# @param payload [Object] passed through request_payload before publishing
# @param timeout [Numeric] seconds, added to monotonic_now for the deadline
# @param headers [Hash, nil] optional headers
# @return [RequestPromise]
def request(subject, payload = "", timeout: 0.5, headers: nil, &block)
  ensure_connected!

  # Callback-style requests keep the per-request subscription path.
  return request_old_style(subject, payload, timeout: timeout, headers: headers, &block) if block

  ensure_resp_mux_sub!(task: @reactor_task)

  resp_token = generate_resp_token
  reply_inbox = "#{@resp_mux_prefix}.#{resp_token}"
  promise = RequestPromise.new(id: resp_token)

  begin
    # Register before publishing so the reply always finds its entry.
    @resp_map_lock.acquire do
      @resp_map[resp_token] = {
        promise: promise,
        deadline: monotonic_now + timeout,
        timeout: timeout,
        callback: nil
      }
    end
    @pending_request_condition.signal

    publish(subject, request_payload(payload), reply: reply_inbox, headers: headers)
    promise
  rescue StandardError
    @resp_map_lock.acquire { @resp_map.delete(resp_token) }
    raise
  end
end
#request_old_style(subject, payload = "", timeout: 0.5, headers: nil, &block) ⇒ Object
395 396 397 398 399 400 401 402 403 404 405 406 407 408 |
# File 'lib/nats_async/client.rb', line 395
#
# Request variant that creates one inbox subscription per request; #request
# uses it when a callback block is given.
#
# The inbox handler receives the reply message and forwards it to
# complete_request together with the request id. The handler must accept that
# message argument, otherwise the reply could never be delivered.
#
# Any StandardError raised during setup is passed to reject_request_setup,
# and this method returns whatever that helper returns.
#
# @param subject [String] request subject
# @param payload [Object] passed through request_payload before publishing
# @param timeout [Numeric] forwarded to track_request
# @param headers [Hash, nil] optional headers
# @param block [Proc, nil] stored as the request callback via track_request
# @return [RequestPromise] on success
def request_old_style(subject, payload = "", timeout: 0.5, headers: nil, &block)
  request_id = next_request_id
  promise = RequestPromise.new(id: request_id)
  inbox = request_inbox(request_id)

  # Stays nil for the rescue path if subscribing itself fails.
  sid = nil
  sid = subscribe(inbox, handler: lambda { |message| complete_request(request_id, message) })

  track_request(request_id, subject: subject, sid: sid, promise: promise, timeout: timeout, callback: block)
  publish(subject, request_payload(payload), reply: inbox, headers: headers)
  promise
rescue StandardError => e
  reject_request_setup(request_id: request_id, promise: promise, sid: sid, error: e)
end
#resolve_backend(mode: :auto, stream: nil) ⇒ Object
200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 |
# File 'lib/nats_async/client.rb', line 200
#
# Decides which backend to use for a stream.
#
# - :core       always resolves to :core; no stream is needed.
# - :jetstream  requires the stream; stream_info is called and any
#               JetStream::Error is re-raised.
# - :auto       :jetstream when stream_exists? is truthy, otherwise :core;
#               a JetStream::Error falls back to :core.
#
# The mode is validated before the stream, so an unknown mode is always
# reported as unsupported rather than as a missing stream.
#
# @param mode [Symbol, String] :auto, :core or :jetstream
# @param stream [String, nil] stream name, required unless mode is :core
# @return [Symbol] :core or :jetstream
# @raise [ArgumentError] for an unsupported mode or a missing stream
# @raise [JetStream::Error] only in :jetstream mode
def resolve_backend(mode: :auto, stream: nil)
  mode = mode.to_sym
  return :core if mode == :core

  unless %i[jetstream auto].include?(mode)
    raise ArgumentError, "unsupported backend mode: #{mode.inspect}"
  end
  raise ArgumentError, "stream is required for #{mode} backend" if stream.to_s.empty?

  if mode == :jetstream
    # Called for its side effect: it raises when the stream cannot be used.
    jetstream.stream_info(stream)
    :jetstream
  else
    jetstream.stream_exists?(stream) ? :jetstream : :core
  end
rescue JetStream::Error
  raise if mode == :jetstream

  :core
end
#sent_pings ⇒ Object
354 |
# File 'lib/nats_async/client.rb', line 354
#
# @return [Object] the connection's sent_pings value
def sent_pings
  @connection.sent_pings
end
#server_info ⇒ Object
356 |
# File 'lib/nats_async/client.rb', line 356
#
# @return [Object] the connection's server_info value
def server_info
  @connection.server_info
end
#start(task:) ⇒ Object
168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 |
# File 'lib/nats_async/client.rb', line 168
#
# Connects the client and starts its background loops. Returns immediately
# when already connected.
#
# Sequence: remember the task, mark :connecting, start the connection, flip
# the started/closed flags, verify the link with one ping (timeout
# @ping_timeout), mark :connected, then start the ping, request-timeout and
# request-callback loops.
#
# On any StandardError the client is stopped, put back into the closed state
# and the error is re-raised.
#
# @param task [Object] task handed to the connection and to each loop starter
# @return [self]
def start(task:)
  return self if connected?

  @reactor_task = task
  @status = :connecting
  @connection.start(task: task)
  @started, @closed = true, false

  # One ping confirms the server answers before the client is called connected.
  ping!(timeout: @ping_timeout)
  @status = :connected

  start_ping_loop(task)
  start_request_timeout_loop(task)
  start_request_callback_loop(task)

  self
rescue StandardError
  stop
  @started, @closed, @status = false, true, :closed
  raise
end
#stop ⇒ Object
230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 |
# File 'lib/nats_async/client.rb', line 230
#
# Tears down background work and the connection without touching the
# started/closed/status flags (#close and #start manage those).
#
# Order: stop the request-timeout task, reject pending requests and queued
# callbacks with IOError("client closed"), stop the callback, ping and
# reconnect tasks, clean up the response mux, close the connection, and
# finally forget all task handles.
#
# @return [nil]
def stop
  @request_timeout_task&.stop

  shutdown_error = IOError.new("client closed")
  reject_pending_requests(shutdown_error)
  reject_queued_request_callbacks(shutdown_error)

  @request_callback_task&.stop
  @ping_task&.stop
  @reconnect_task&.stop

  cleanup_resp_mux
  @connection.close

  @request_timeout_task = @request_callback_task = nil
  @request_callback_queue = nil
  @ping_task = @reconnect_task = nil
end
#subscribe(subject, queue: nil, handler: nil, &block) ⇒ Object
286 287 288 289 290 291 292 293 294 295 |
# File 'lib/nats_async/client.rb', line 286
#
# Registers a subscription and returns its sid. The callback is +handler+
# when given, otherwise the block. The subscription is always recorded in
# @subscriptions, but it is only sent to the connection while the client is
# connected.
#
# @param subject [String] subject (may contain wildcards)
# @param queue [String, nil] optional queue group
# @param handler [#call, nil] callback; takes precedence over the block
# @return [Object] the new subscription id from next_sid
# @raise [ArgumentError] when neither handler nor block is given
def subscribe(subject, queue: nil, handler: nil, &block)
  ensure_open!

  callback = handler || block
  raise ArgumentError, "handler or block required for subscribe" unless callback

  next_sid.tap do |sid|
    @subscriptions[sid] = {subject: subject, queue: queue, callback: callback}
    @connection.subscribe(subject, sid: sid, queue: queue) if connected?
  end
end
#unsubscribe(sid) ⇒ Object
344 345 346 347 348 |
# File 'lib/nats_async/client.rb', line 344
#
# Forgets the subscription locally and tells the connection to unsubscribe.
#
# NOTE(review): unlike #subscribe, the connection call here is not guarded by
# connected? — confirm @connection.unsubscribe is safe while disconnected.
#
# @param sid [Object] subscription id returned by #subscribe
# @return [true]
def unsubscribe(sid)
  @subscriptions.delete(sid)
  @connection.unsubscribe(sid)

  true
end