Class: NatsAsync::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/nats_async/client.rb

Defined Under Namespace

Classes: RequestPromise

Constant Summary collapse

AckError =
NatsAsync::AckError
ConnectionError =
NatsAsync::ConnectionError
MsgAlreadyAcked =
NatsAsync::MsgAlreadyAcked
NotAckable =
NatsAsync::NotAckable
RequestError =
NatsAsync::RequestError
ProtocolError =
NatsAsync::ProtocolError
Message =
NatsAsync::Message

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(url: "nats://127.0.0.1:4222", verbose: true, js_api_prefix: "$JS.API", ping_interval: 30, ping_timeout: 5, tls: nil, tls_verify: true, tls_ca_file: nil, tls_ca_path: nil, tls_hostname: nil, tls_handshake_first: false, user: nil, password: nil, nkey_seed: nil, nkey_seed_file: nil, nkey_public_key: nil, reconnect: false, reconnect_interval: 1, max_reconnect_attempts: nil, flush_delay: 0.01, flush_max_buffer: 5000) ⇒ Client

Returns a new instance of Client.



94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
# File 'lib/nats_async/client.rb', line 94

def initialize(
  url: "nats://127.0.0.1:4222",
  verbose: true,
  js_api_prefix: "$JS.API",
  ping_interval: 30,
  ping_timeout: 5,
  tls: nil,
  tls_verify: true,
  tls_ca_file: nil,
  tls_ca_path: nil,
  tls_hostname: nil,
  tls_handshake_first: false,
  user: nil,
  password: nil,
  nkey_seed: nil,
  nkey_seed_file: nil,
  nkey_public_key: nil,
  reconnect: false,
  reconnect_interval: 1,
  max_reconnect_attempts: nil,
  flush_delay: 0.01,
  flush_max_buffer: 5000
)
  @js_api_prefix = normalize_subject_prefix(js_api_prefix)
  @ping_interval = ping_interval
  @ping_timeout = ping_timeout
  @reconnect = reconnect
  @reconnect_interval = reconnect_interval
  @max_reconnect_attempts = max_reconnect_attempts
  @logger = Console.logger.with(level: (verbose ? :debug : :error), verbose: false)
  @connection_options = {
    url: url,
    logger: @logger,
    tls: tls,
    tls_verify: tls_verify,
    tls_ca_file: tls_ca_file,
    tls_ca_path: tls_ca_path,
    tls_hostname: tls_hostname,
    tls_handshake_first: tls_handshake_first,
    user: user,
    password: password,
    nkey_seed: nkey_seed,
    nkey_seed_file: nkey_seed_file,
    nkey_public_key: nkey_public_key,
    flush_delay: flush_delay,
    flush_max_buffer: flush_max_buffer
  }
  @connection = build_connection

  @ping_task = nil
  @reconnect_task = nil
  @request_timeout_task = nil
  @request_callback_task = nil
  @request_callback_queue = nil
  @read_error = nil
  @started = false
  @closed = true
  @status = :closed
  @sid_seq = 0
  @request_seq = 0
  @subscriptions = {}
  @pending_requests = {}
  @pending_request_condition = Async::Condition.new

  # Response multiplexing state
  @resp_mux_prefix = nil
  @resp_mux_subsid = nil
  @resp_mux_lock = Async::Semaphore.new(1)
  @resp_map = {}
  @resp_map_lock = Async::Semaphore.new(1)
end

Instance Attribute Details

#js_api_prefixObject (readonly)

Returns the value of attribute js_api_prefix.



166
167
168
# File 'lib/nats_async/client.rb', line 166

def js_api_prefix
  @js_api_prefix
end

#statusObject (readonly)

Returns the value of attribute status.



166
167
168
# File 'lib/nats_async/client.rb', line 166

def status
  @status
end

Instance Method Details

#cleanup_resp_muxObject

Clean up the response multiplexing subscription



248
249
250
251
252
253
254
255
256
257
258
# File 'lib/nats_async/client.rb', line 248

def cleanup_resp_mux
  return unless @resp_mux_subsid

  @resp_mux_lock.acquire do
    @subscriptions.delete(@resp_mux_subsid)
    safe_unsubscribe(@resp_mux_subsid)
    @resp_mux_subsid = nil
    @resp_mux_prefix = nil
    @resp_map.clear
  end
end

#closeObject



190
191
192
193
194
195
196
197
198
# File 'lib/nats_async/client.rb', line 190

def close
  return true if closed?

  @closed = true
  stop
  @started = false
  @status = :closed
  true
end

#closed?Boolean

Returns:

  • (Boolean)


269
270
271
# File 'lib/nats_async/client.rb', line 269

def closed?
  @closed
end

#connected?Boolean

Returns:

  • (Boolean)


265
266
267
# File 'lib/nats_async/client.rb', line 265

def connected?
  @status == :connected && @started && !@closed && @connection.connected?
end

#drain(timeout: 5) ⇒ Object



221
222
223
224
225
226
227
228
# File 'lib/nats_async/client.rb', line 221

def drain(timeout: 5)
  @ping_task&.stop
  @ping_task = nil
  flush(timeout: timeout) if connected?
  true
ensure
  close
end

#ensure_resp_mux_sub!(task:) ⇒ Object

Ensure the response multiplexing subscription is set up Creates a single wildcard subscription to handle all request responses



299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
# File 'lib/nats_async/client.rb', line 299

def ensure_resp_mux_sub!(task:)
  return if @resp_mux_prefix

  @resp_mux_lock.acquire do
    return if @resp_mux_prefix

    # Generate unique prefix for this connection
    @resp_mux_prefix = "_INBOX.#{SecureRandom.hex(8)}"

    # Create wildcard subscription to handle all responses
    # Messages for request replies come with subject = the reply inbox
    @resp_mux_subsid = subscribe("#{@resp_mux_prefix}.*") do |msg|
      # Extract token from subject (format: _INBOX.<prefix>.<token>)
      token = msg.subject.split(".").last
      process_mux_response(token, msg)
    end
  end

  # Flush immediately to ensure subscription is registered before any requests
  @connection.flush_pending
  # The flush is async, so we just wait a tiny bit to ensure it's sent
  sleep(0.001)
end

#flush(timeout: 2) ⇒ Object



260
261
262
263
# File 'lib/nats_async/client.rb', line 260

def flush(timeout: 2)
  ping!(timeout: timeout)
  true
end

#generate_resp_tokenObject

Generate a unique token for request tracking



340
341
342
# File 'lib/nats_async/client.rb', line 340

def generate_resp_token
  SecureRandom.hex(8)
end

#jetstreamObject



414
415
416
# File 'lib/nats_async/client.rb', line 414

def jetstream
  @jetstream ||= JetStream.new(self)
end

#js_api_subject(*tokens) ⇒ Object



410
411
412
# File 'lib/nats_async/client.rb', line 410

def js_api_subject(*tokens)
  [js_api_prefix, *tokens.flatten].compact.map(&:to_s).reject(&:empty?).join(".")
end

#last_errorObject



273
274
275
# File 'lib/nats_async/client.rb', line 273

def last_error
  @read_error || @connection.last_error
end

#ping!(timeout: 2) ⇒ Object



277
278
279
# File 'lib/nats_async/client.rb', line 277

def ping!(timeout: 2)
  @connection.ping!(timeout: timeout)
end

#process_mux_response(token, message) ⇒ Object

Process a response message from the multiplexed subscription



324
325
326
327
328
329
330
331
332
333
334
335
336
337
# File 'lib/nats_async/client.rb', line 324

def process_mux_response(token, message)
  @resp_map_lock.acquire do
    pending = @resp_map.delete(token)
    return unless pending

    if pending[:callback]
      enqueue_request_callback(pending, message)
    else
      pending[:promise].fulfill(message)
    end
  rescue StandardError
    @logger.error("mux response handler error")
  end
end

#publish(subject, payload = "", reply: nil, headers: nil) ⇒ Object



281
282
283
284
# File 'lib/nats_async/client.rb', line 281

def publish(subject, payload = "", reply: nil, headers: nil)
  ensure_connected!
  @connection.publish(subject, payload, reply: reply, headers: headers)
end

#received_pingsObject



350
# File 'lib/nats_async/client.rb', line 350

def received_pings = @connection.received_pings

#received_pongsObject



352
# File 'lib/nats_async/client.rb', line 352

def received_pongs = @connection.received_pongs

#request(subject, payload = "", timeout: 0.5, headers: nil, &block) ⇒ Object



358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
# File 'lib/nats_async/client.rb', line 358

def request(subject, payload = "", timeout: 0.5, headers: nil, &block)
  ensure_connected!

  # If block is provided, use old-style per-request subscription
  if block
    return request_old_style(subject, payload, timeout: timeout, headers: headers, &block)
  end

  # Use multiplexed subscription for promise-based requests
  ensure_resp_mux_sub!(task: @reactor_task)
  token = generate_resp_token
  inbox = "#{@resp_mux_prefix}.#{token}"

  promise = RequestPromise.new(id: token)

  begin
    # Track request with token-based lookup in mux response handler
    @resp_map_lock.acquire do
      @resp_map[token] = {
        promise: promise,
        deadline: monotonic_now + timeout,
        timeout: timeout,
        callback: nil
      }
    end
    @pending_request_condition.signal

    publish(subject, request_payload(payload), reply: inbox, headers: headers)
    promise
  rescue StandardError
    @resp_map_lock.acquire do
      @resp_map.delete(token)
    end
    raise
  end
end

#request_old_style(subject, payload = "", timeout: 0.5, headers: nil, &block) ⇒ Object



395
396
397
398
399
400
401
402
403
404
405
406
407
408
# File 'lib/nats_async/client.rb', line 395

def request_old_style(subject, payload = "", timeout: 0.5, headers: nil, &block)
  # Old-style request with per-request subscription (for callback support)
  request_id = next_request_id
  promise = RequestPromise.new(id: request_id)
  inbox = request_inbox(request_id)
  sid = nil

  sid = subscribe(inbox, handler: lambda { |message| complete_request(request_id, message) })
  track_request(request_id, subject: subject, sid: sid, promise: promise, timeout: timeout, callback: block)
  publish(subject, request_payload(payload), reply: inbox, headers: headers)
  promise
rescue StandardError => e
  reject_request_setup(request_id: request_id, promise: promise, sid: sid, error: e)
end

#resolve_backend(mode: :auto, stream: nil) ⇒ Object



200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
# File 'lib/nats_async/client.rb', line 200

def resolve_backend(mode: :auto, stream: nil)
  mode = mode.to_sym
  return :core if mode == :core

  raise ArgumentError, "stream is required for #{mode} backend" if stream.to_s.empty?

  case mode
  when :jetstream
    jetstream.stream_info(stream)
    :jetstream
  when :auto
    jetstream.stream_exists?(stream) ? :jetstream : :core
  else
    raise ArgumentError, "unsupported backend mode: #{mode.inspect}"
  end
rescue JetStream::Error
  raise if mode == :jetstream

  :core
end

#sent_pingsObject



354
# File 'lib/nats_async/client.rb', line 354

def sent_pings = @connection.sent_pings

#server_infoObject



356
# File 'lib/nats_async/client.rb', line 356

def server_info = @connection.server_info

#start(task:) ⇒ Object



168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
# File 'lib/nats_async/client.rb', line 168

def start(task:)
  return self if connected?

  @reactor_task = task
  @status = :connecting
  @connection.start(task: task)
  @started = true
  @closed = false
  ping!(timeout: @ping_timeout)
  @status = :connected
  start_ping_loop(task)
  start_request_timeout_loop(task)
  start_request_callback_loop(task)
  self
rescue StandardError
  stop
  @started = false
  @closed = true
  @status = :closed
  raise
end

#stopObject



230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
# File 'lib/nats_async/client.rb', line 230

def stop
  @request_timeout_task&.stop
  close_error = IOError.new("client closed")
  reject_pending_requests(close_error)
  reject_queued_request_callbacks(close_error)
  @request_callback_task&.stop
  @ping_task&.stop
  @reconnect_task&.stop
  cleanup_resp_mux
  @connection.close
  @request_timeout_task = nil
  @request_callback_task = nil
  @request_callback_queue = nil
  @ping_task = nil
  @reconnect_task = nil
end

#subscribe(subject, queue: nil, handler: nil, &block) ⇒ Object

Raises:

  • (ArgumentError)


286
287
288
289
290
291
292
293
294
295
# File 'lib/nats_async/client.rb', line 286

def subscribe(subject, queue: nil, handler: nil, &block)
  ensure_open!
  callback = handler || block
  raise ArgumentError, "handler or block required for subscribe" unless callback

  sid = next_sid
  @subscriptions[sid] = {subject: subject, queue: queue, callback: callback}
  @connection.subscribe(subject, sid: sid, queue: queue) if connected?
  sid
end

#unsubscribe(sid) ⇒ Object



344
345
346
347
348
# File 'lib/nats_async/client.rb', line 344

def unsubscribe(sid)
  @subscriptions.delete(sid)
  @connection.unsubscribe(sid)
  true
end