Class: NatsAsync::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/nats_async/client.rb

Defined Under Namespace

Classes: RequestPromise

Constant Summary collapse

AckError =
NatsAsync::AckError
ConnectionError =
NatsAsync::ConnectionError
MsgAlreadyAcked =
NatsAsync::MsgAlreadyAcked
NotAckable =
NatsAsync::NotAckable
RequestError =
NatsAsync::RequestError
ProtocolError =
NatsAsync::ProtocolError
Message =
NatsAsync::Message

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(url: "nats://127.0.0.1:4222", verbose: true, js_api_prefix: "$JS.API", ping_interval: 30, ping_timeout: 5, tls: nil, tls_verify: true, tls_ca_file: nil, tls_ca_path: nil, tls_hostname: nil, tls_handshake_first: false, user: nil, password: nil, nkey_seed: nil, nkey_seed_file: nil, nkey_public_key: nil, reconnect: false, reconnect_interval: 1, max_reconnect_attempts: nil, flush_delay: 0.01, flush_max_buffer: 5000) ⇒ Client

Returns a new instance of Client.



93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
# File 'lib/nats_async/client.rb', line 93

# Builds a client that starts out in the closed state (@closed is true and
# @status is :closed); nothing in this method calls #start.
#
# The ping and reconnect settings are kept on the instance, a logger is
# derived from Console.logger, and the URL, TLS, credential and flush
# keywords are collected unmodified into @connection_options before
# build_connection is called.
#
# @param url stored in @connection_options
# @param verbose a truthy value selects the :debug log level, otherwise :error
# @param js_api_prefix passed through normalize_subject_prefix before being stored
# @param ping_interval stored as @ping_interval
# @param ping_timeout stored as @ping_timeout
# @param reconnect stored as @reconnect
# @param reconnect_interval stored as @reconnect_interval
# @param max_reconnect_attempts stored as @max_reconnect_attempts
# @return [Client] a new instance of Client
def initialize(
  url: "nats://127.0.0.1:4222",
  verbose: true,
  js_api_prefix: "$JS.API",
  ping_interval: 30,
  ping_timeout: 5,
  tls: nil,
  tls_verify: true,
  tls_ca_file: nil,
  tls_ca_path: nil,
  tls_hostname: nil,
  tls_handshake_first: false,
  user: nil,
  password: nil,
  nkey_seed: nil,
  nkey_seed_file: nil,
  nkey_public_key: nil,
  reconnect: false,
  reconnect_interval: 1,
  max_reconnect_attempts: nil,
  flush_delay: 0.01,
  flush_max_buffer: 5000
)
  @js_api_prefix = normalize_subject_prefix(js_api_prefix)
  @ping_interval = ping_interval
  @ping_timeout = ping_timeout
  @reconnect = reconnect
  @reconnect_interval = reconnect_interval
  @max_reconnect_attempts = max_reconnect_attempts
  # The client's own `verbose` flag only picks the log level; the logger is
  # always created with verbose: false.
  @logger = Console.logger.with(level: (verbose ? :debug : :error), verbose: false)
  # Everything connection-related is gathered in one hash.
  @connection_options = {
    url: url,
    logger: @logger,
    tls: tls,
    tls_verify: tls_verify,
    tls_ca_file: tls_ca_file,
    tls_ca_path: tls_ca_path,
    tls_hostname: tls_hostname,
    tls_handshake_first: tls_handshake_first,
    user: user,
    password: password,
    nkey_seed: nkey_seed,
    nkey_seed_file: nkey_seed_file,
    nkey_public_key: nkey_public_key,
    flush_delay: flush_delay,
    flush_max_buffer: flush_max_buffer
  }
  # NOTE(review): build_connection takes no arguments here, so it presumably
  # reads @connection_options — confirm against its definition.
  @connection = build_connection

  # Background task handles; all nil until the client is started.
  @ping_task = nil
  @reconnect_task = nil
  @request_timeout_task = nil
  @request_callback_task = nil
  @request_callback_queue = nil
  @read_error = nil
  # Lifecycle flags: a fresh client is not started and counts as closed.
  @started = false
  @closed = true
  @status = :closed
  # Counters and registries for subscriptions and in-flight requests.
  @sid_seq = 0
  @request_seq = 0
  @subscriptions = {}
  @pending_requests = {}
  @pending_request_condition = Async::Condition.new
end

Instance Attribute Details

#js_api_prefix ⇒ Object (readonly)

Returns the value of attribute js_api_prefix.



158
159
160
# File 'lib/nats_async/client.rb', line 158

# Reader for the JetStream API subject prefix stored on this client.
#
# @return the current value of @js_api_prefix
def js_api_prefix = @js_api_prefix

#status ⇒ Object (readonly)

Returns the value of attribute status.



158
159
160
# File 'lib/nats_async/client.rb', line 158

# Reader for the client's lifecycle status.
#
# @return the current value of @status
def status = @status

Instance Method Details

#close ⇒ Object



182
183
184
185
186
187
188
189
190
# File 'lib/nats_async/client.rb', line 182

# Closes the client. Calling it on an already closed client does nothing.
#
# @return [true] always
def close
  unless closed?
    # Flag the client as closed before tearing anything down.
    @closed = true
    stop
    @started = false
    @status = :closed
  end

  true
end

#closed? ⇒ Boolean

Returns:

  • (Boolean)


247
248
249
# File 'lib/nats_async/client.rb', line 247

# Tells whether the client is currently flagged as closed.
#
# @return the current value of @closed
def closed? = @closed

#connected? ⇒ Boolean

Returns:

  • (Boolean)


243
244
245
# File 'lib/nats_async/client.rb', line 243

# Tells whether the client is usable right now: its status is :connected, it
# has been started, it is not closed, and the underlying connection also
# reports itself as connected.
#
# @return a truthy value only when every condition above holds
def connected?
  # Client-side lifecycle first; the connection is consulted only when all of
  # these hold.
  lifecycle_ok = @status == :connected && @started && !@closed
  lifecycle_ok && @connection.connected?
end

#drain(timeout: 5) ⇒ Object



213
214
215
216
217
218
219
220
# File 'lib/nats_async/client.rb', line 213

# Stops the ping task, flushes if the client is still connected, and then
# closes the client. The close happens even when stopping or flushing raises.
#
# @param timeout forwarded to #flush
# @return [true] when no error was raised
def drain(timeout: 5)
  begin
    @ping_task&.stop
    @ping_task = nil
    if connected?
      flush(timeout: timeout)
    end
    true
  ensure
    close
  end
end

#flush(timeout: 2) ⇒ Object



238
239
240
241
# File 'lib/nats_async/client.rb', line 238

# Issues a ping via #ping! using the given timeout.
#
# @param timeout forwarded to #ping!
# @return [true] when #ping! returns without raising
def flush(timeout: 2)
  ping_options = {timeout: timeout}
  ping!(**ping_options)

  true
end

#jetstream ⇒ Object



308
309
310
# File 'lib/nats_async/client.rb', line 308

# Lazily creates the JetStream helper bound to this client and reuses it on
# later calls.
#
# @return [JetStream] the memoized helper
def jetstream
  return @jetstream if @jetstream

  @jetstream = JetStream.new(self)
end

#js_api_subject(*tokens) ⇒ Object



304
305
306
# File 'lib/nats_async/client.rb', line 304

# Builds a dot-separated subject from the JetStream API prefix followed by the
# given tokens. Nested arrays are flattened; nil tokens and tokens whose
# string form is empty are left out.
#
# @param tokens subject tokens, possibly nested in arrays
# @return [String] the joined subject
def js_api_subject(*tokens)
  parts = [js_api_prefix, *tokens.flatten].filter_map do |token|
    next if token.nil?

    text = token.to_s
    text unless text.empty?
  end

  parts.join(".")
end

#last_error ⇒ Object



251
252
253
# File 'lib/nats_async/client.rb', line 251

# Most recent error known to the client: the stored read error when there is
# one, otherwise whatever the connection reports as its last error.
#
# @return the error object, or the connection's last_error value
def last_error
  return @read_error if @read_error

  @connection.last_error
end

#ping!(timeout: 2) ⇒ Object



255
256
257
# File 'lib/nats_async/client.rb', line 255

# Delegates a ping to the underlying connection.
#
# @param timeout forwarded to the connection's ping!
# @return whatever the connection's ping! returns
def ping!(timeout: 2) = @connection.ping!(timeout: timeout)

#publish(subject, payload = "", reply: nil, headers: nil) ⇒ Object



259
260
261
262
# File 'lib/nats_async/client.rb', line 259

# Publishes a message through the underlying connection after
# ensure_connected! has been called.
#
# @param subject forwarded to the connection
# @param payload forwarded to the connection
# @param reply forwarded to the connection as reply:
# @param headers forwarded to the connection as headers:
# @return whatever the connection's publish returns
def publish(subject, payload = "", reply: nil, headers: nil)
  ensure_connected!

  delivery_options = {reply: reply, headers: headers}
  @connection.publish(subject, payload, **delivery_options)
end

#received_pings ⇒ Object



281
# File 'lib/nats_async/client.rb', line 281

# Delegates to the connection's received_pings.
#
# @return whatever the connection's received_pings returns
def received_pings
  @connection.received_pings
end

#received_pongs ⇒ Object



283
# File 'lib/nats_async/client.rb', line 283

# Delegates to the connection's received_pongs.
#
# @return whatever the connection's received_pongs returns
def received_pongs
  @connection.received_pongs
end

#request(subject, payload = "", timeout: 0.5, headers: nil, &block) ⇒ Object



289
290
291
292
293
294
295
296
297
298
299
300
301
302
# File 'lib/nats_async/client.rb', line 289

# Sends a request and returns a promise for its reply.
#
# A dedicated inbox subject is derived from a fresh request id and subscribed
# to; the subscription handler passes each incoming message to
# complete_request. The request is then registered through track_request and
# the payload is published with the inbox as the reply subject.
#
# @param subject subject the request is published to
# @param payload passed through request_payload before publishing
# @param timeout forwarded to track_request
# @param headers forwarded to #publish
# @yield optional block, registered with track_request as the request's callback
# @return [RequestPromise] the promise for this request; when any setup step
#   raises a StandardError, the result of reject_request_setup is returned
#   instead
def request(subject, payload = "", timeout: 0.5, headers: nil, &block)
  ensure_connected!
  request_id = next_request_id
  promise = RequestPromise.new(id: request_id)
  inbox = request_inbox(request_id)
  sid = nil

  sid = subscribe(inbox, handler: ->(message) { complete_request(request_id, message) })
  # The caller's block is the callback. This method has no `callback` local,
  # so referencing one raised NameError, which the rescue below turned into a
  # rejected setup for every request.
  track_request(request_id, subject: subject, sid: sid, promise: promise, timeout: timeout, callback: block)
  publish(subject, request_payload(payload), reply: inbox, headers: headers)
  promise
rescue StandardError => e
  # request_id, promise and sid are nil here when the failure happened before
  # they were assigned.
  reject_request_setup(request_id: request_id, promise: promise, sid: sid, error: e)
end

#resolve_backend(mode: :auto, stream: nil) ⇒ Object



192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
# File 'lib/nats_async/client.rb', line 192

# Decides which backend to use for a stream.
#
# :core is returned straight away for mode :core. Every other mode requires a
# non-empty stream name. Mode :jetstream looks the stream up with stream_info
# and returns :jetstream; mode :auto returns :jetstream when stream_exists?
# is truthy and :core otherwise. A JetStream::Error is re-raised in
# :jetstream mode and turned into :core in any other mode.
#
# @param mode anything responding to to_sym; :core, :jetstream or :auto
# @param stream stream name; required unless mode is :core
# @return [Symbol] :core or :jetstream
# @raise [ArgumentError] when the stream is missing or the mode is unsupported
def resolve_backend(mode: :auto, stream: nil)
  mode = mode.to_sym
  return :core if mode == :core
  raise ArgumentError, "stream is required for #{mode} backend" if stream.to_s.empty?

  if mode == :jetstream
    # The lookup result is not used; only a raised error matters here.
    jetstream.stream_info(stream)
    :jetstream
  elsif mode == :auto
    jetstream.stream_exists?(stream) ? :jetstream : :core
  else
    raise ArgumentError, "unsupported backend mode: #{mode.inspect}"
  end
rescue JetStream::Error
  return :core unless mode == :jetstream

  raise
end

#sent_pings ⇒ Object



285
# File 'lib/nats_async/client.rb', line 285

# Delegates to the connection's sent_pings.
#
# @return whatever the connection's sent_pings returns
def sent_pings
  @connection.sent_pings
end

#server_info ⇒ Object



287
# File 'lib/nats_async/client.rb', line 287

# Delegates to the connection's server_info.
#
# @return whatever the connection's server_info returns
def server_info
  @connection.server_info
end

#start(task:) ⇒ Object



160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
# File 'lib/nats_async/client.rb', line 160

# Starts the client on the given task.
#
# Returns immediately when the client is already connected. Otherwise the
# connection is started, the lifecycle flags are flipped to started/open, a
# ping bounded by @ping_timeout is issued, the status becomes :connected, and
# start_ping_loop, start_request_timeout_loop and start_request_callback_loop
# are called with the task.
#
# If any step raises a StandardError, the client is stopped, put back into
# the closed state, and the error is re-raised.
#
# @param task the task handed to the connection and the loop starters
#   (NOTE(review): presumably an Async::Task — confirm against callers)
# @return [Client] self
def start(task:)
  return self if connected?

  @reactor_task = task
  @status = :connecting
  @connection.start(task: task)
  @started = true
  @closed = false
  # The status only becomes :connected after this ping returns without raising.
  ping!(timeout: @ping_timeout)
  @status = :connected
  start_ping_loop(task)
  start_request_timeout_loop(task)
  start_request_callback_loop(task)
  self
rescue StandardError
  # Undo partial startup so the client ends up in the same state as after #close.
  stop
  @started = false
  @closed = true
  @status = :closed
  raise
end

#stop ⇒ Object



222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
# File 'lib/nats_async/client.rb', line 222

# Tears down the background tasks, outstanding requests and the connection.
#
# This method does not touch @started, @closed or @status; within this file
# those are updated by the callers (#close and the rescue path of #start).
#
# @return [nil] the value of the final assignment
def stop
  @request_timeout_task&.stop
  # One shared error object is handed to both rejection helpers.
  close_error = IOError.new("client closed")
  reject_pending_requests(close_error)
  reject_queued_request_callbacks(close_error)
  @request_callback_task&.stop
  @ping_task&.stop
  @reconnect_task&.stop
  @connection.close
  # Drop the task handles and the callback queue.
  @request_timeout_task = nil
  @request_callback_task = nil
  @request_callback_queue = nil
  @ping_task = nil
  @reconnect_task = nil
end

#subscribe(subject, queue: nil, handler: nil, &block) ⇒ Object

Raises:

  • (ArgumentError)


264
265
266
267
268
269
270
271
272
273
# File 'lib/nats_async/client.rb', line 264

# Registers a subscription and returns its id.
#
# The callback is the +handler+ keyword when given, otherwise the block. The
# subscription is always recorded in @subscriptions; it is sent to the
# connection only while the client is connected.
#
# @param subject subject to subscribe to
# @param queue optional queue name, forwarded to the connection
# @param handler optional callable used as the callback
# @yield used as the callback when no handler is given
# @return the subscription id obtained from next_sid
# @raise [ArgumentError] when neither a handler nor a block is supplied
def subscribe(subject, queue: nil, handler: nil, &block)
  ensure_open!
  callback = handler || block
  raise ArgumentError, "handler or block required for subscribe" if callback.nil?

  next_sid.tap do |sid|
    @subscriptions[sid] = {subject: subject, queue: queue, callback: callback}
    @connection.subscribe(subject, sid: sid, queue: queue) if connected?
  end
end

#unsubscribe(sid) ⇒ Object



275
276
277
278
279
# File 'lib/nats_async/client.rb', line 275

# Removes the subscription record for +sid+ and tells the connection to
# unsubscribe it.
#
# @param sid subscription id, as returned by #subscribe
# @return [true] when the connection call returns without raising
def unsubscribe(sid)
  # Drop the local record first, then notify the connection.
  @subscriptions.delete(sid)
  @connection.unsubscribe(sid)

  true
end