Class: NatsAsync::Connection

Inherits:
Object
  • Object
show all
Defined in:
lib/nats_async/connection.rb

Constant Summary collapse

CR_LF =
"\r\n"
HEADER_LINE =
"NATS/1.0"

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(url:, logger:, tls: nil, tls_verify: true, tls_ca_file: nil, tls_ca_path: nil, tls_hostname: nil, tls_handshake_first: false, user: nil, password: nil, nkey_seed: nil, nkey_seed_file: nil, nkey_public_key: nil, on_message: nil, on_error: nil, on_info: nil, flush_delay: 0.01, flush_max_buffer: 5000) ⇒ Connection

Returns a new instance of Connection.



24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/nats_async/connection.rb', line 24

# Builds a connection in the closed state. Only instance state is set up
# here; connecting happens in #start.
#
# @param url [String, URI] server URL; its scheme and userinfo act as
#   defaults for TLS and credentials
# @param logger [Object] logger stored for later use (#publish calls +debug+ on it)
# @param tls [Boolean, nil] explicit TLS switch; when nil, TLS is enabled
#   only for the +tls+ and +nats+tls+ URL schemes
# @param tls_verify [Boolean] stored as-is
# @param tls_ca_file [String, nil] normalised through +presence+
# @param tls_ca_path [String, nil] normalised through +presence+
# @param tls_hostname [String, nil] normalised through +presence+
# @param tls_handshake_first [Boolean] stored as-is
# @param user [String, nil] falls back to the URL's user when nil
# @param password [String, nil] falls back to the URL's password when nil
# @param nkey_seed [String, nil] normalised through +presence+
# @param nkey_seed_file [String, nil] normalised through +presence+
# @param nkey_public_key [String, nil] normalised through +presence+
# @param on_message [#call, nil] callback; defaults to a no-op lambda
# @param on_error [#call, nil] callback; defaults to a no-op lambda
# @param on_info [#call, nil] callback; defaults to a no-op lambda
# @param flush_delay [Numeric] age threshold compared against the monotonic
#   clock in #should_flush? (presumably seconds -- confirm against monotonic_now)
# @param flush_max_buffer [Integer] pending-write count at which #publish
#   forces a flush
def initialize(
  url:,
  logger:,
  tls: nil,
  tls_verify: true,
  tls_ca_file: nil,
  tls_ca_path: nil,
  tls_hostname: nil,
  tls_handshake_first: false,
  user: nil,
  password: nil,
  nkey_seed: nil,
  nkey_seed_file: nil,
  nkey_public_key: nil,
  on_message: nil,
  on_error: nil,
  on_info: nil,
  flush_delay: 0.01,
  flush_max_buffer: 5000
)
  @url = URI(url)
  @logger = logger

  # An explicit +tls+ value always wins; otherwise the URL scheme decides.
  @tls_enabled = tls.nil? ? %w[tls nats+tls].include?(@url.scheme) : tls
  @tls_verify = tls_verify
  @tls_ca_file = presence(tls_ca_file)
  @tls_ca_path = presence(tls_ca_path)
  @tls_hostname = presence(tls_hostname)
  @tls_handshake_first = tls_handshake_first

  # Keyword credentials take precedence over the URL's userinfo.
  @auth_user = presence(user || @url.user)
  @auth_password = presence(password || @url.password)
  @nkey_seed = presence(nkey_seed)
  @nkey_seed_file = presence(nkey_seed_file)
  @nkey_public_key = presence(nkey_public_key)

  # Missing callbacks become no-ops so call sites never need a nil check.
  @on_message = on_message || ->(_message) {}
  @on_error = on_error || ->(_error) {}
  @on_info = on_info || ->(_info) {}

  @flush_delay = flush_delay
  @flush_max_buffer = flush_max_buffer

  @received_pings = 0
  @received_pongs = 0
  @sent_pings = 0
  @server_info = nil
  @last_error = nil
  @stream = nil
  @read_task = nil
  # Declared up front like @read_task: #close reads it and #start assigns it.
  @stale_flush_task = nil
  @closed = true
  @write_lock = Async::Semaphore.new(1)
  @pong_condition = Async::Condition.new
  @pending_flush_count = 0
  @last_flush_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

Instance Attribute Details

#last_error ⇒ Object (readonly)

Returns the value of attribute last_error.



22
23
24
# File 'lib/nats_async/connection.rb', line 22

# Reader for @last_error.
#
# @return [Object, nil] the stored error; nil right after construction
def last_error = @last_error

#received_pings ⇒ Object (readonly)

Returns the value of attribute received_pings.



22
23
24
# File 'lib/nats_async/connection.rb', line 22

# Reader for @received_pings.
#
# @return [Integer] the counter's current value; 0 right after construction
def received_pings = @received_pings

#received_pongs ⇒ Object (readonly)

Returns the value of attribute received_pongs.



22
23
24
# File 'lib/nats_async/connection.rb', line 22

# Reader for @received_pongs, the counter #ping! compares against while
# waiting for a reply.
#
# @return [Integer] the counter's current value; 0 right after construction
def received_pongs = @received_pongs

#sent_pings ⇒ Object (readonly)

Returns the value of attribute sent_pings.



22
23
24
# File 'lib/nats_async/connection.rb', line 22

# Reader for @sent_pings, which #ping! increments once per call.
#
# @return [Integer] the counter's current value; 0 right after construction
def sent_pings = @sent_pings

#server_info ⇒ Object (readonly)

Returns the value of attribute server_info.



22
23
24
# File 'lib/nats_async/connection.rb', line 22

# Reader for @server_info.
#
# @return [Object, nil] the stored server info; nil right after construction
def server_info = @server_info

Instance Method Details

#close ⇒ Object



93
94
95
96
97
98
99
100
101
102
# File 'lib/nats_async/connection.rb', line 93

# Shuts the connection down: flushes buffered writes while a stream exists,
# stops the background tasks, closes the stream and marks the connection
# closed.
#
# Teardown lives in +ensure+ so that a failing final flush (for example on
# an already broken stream) can no longer leave the tasks running, the
# stream open and @closed unset. The flush error still reaches the caller.
#
# @return [true]
# @raise [StandardError] whatever #flush_pending raised, after teardown ran
def close
  flush_pending if @stream
  true
ensure
  @stale_flush_task&.stop
  @read_task&.stop
  safe_close_stream
  @read_task = nil
  @stale_flush_task = nil
  @closed = true
end

#connected? ⇒ Boolean

Returns:

  • (Boolean)


104
105
106
# File 'lib/nats_async/connection.rb', line 104

# Reports whether the connection is open, i.e. not flagged closed and
# holding a stream.
#
# @return [Boolean]
def connected?
  return false if @closed

  !@stream.nil?
end

#flush_pending ⇒ Object



148
149
150
151
152
153
154
155
# File 'lib/nats_async/connection.rb', line 148

# Flushes the stream when at least one write is pending. Does nothing when
# the pending counter is nil or not positive.
#
# @return [Object, nil] the value of the locked block (the result of
#   +mark_flushed+), or nil when there was nothing to flush
def flush_pending
  pending = @pending_flush_count
  return if pending.nil? || !pending.positive?

  @write_lock.acquire do
    # An empty write with flush: true pushes out whatever is already buffered.
    stream.write("", flush: true)
    mark_flushed
  end
end

#ping!(timeout: 2) ⇒ Object



108
109
110
111
112
113
114
115
116
117
# File 'lib/nats_async/connection.rb', line 108

# Sends a PING and waits until one more PONG than currently counted has
# been received.
#
# @param timeout [Numeric] passed to +await+ and echoed in the timeout message
# @return [Object] whatever +await+ returns
# @raise [Exception] @last_error, if one is set when the predicate is evaluated
def ping!(timeout: 2)
  pong_target = @received_pongs + 1

  @sent_pings += 1
  write_line("PING")

  pong_arrived = lambda do
    raise @last_error if @last_error

    @received_pongs >= pong_target
  end

  await(
    timeout: timeout,
    condition: @pong_condition,
    timeout_message: "timeout waiting for PONG after #{timeout}s",
    predicate: pong_arrived
  )
end

#publish(subject, payload = "", reply: nil, headers: nil) ⇒ Object



119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/nats_async/connection.rb', line 119

# Publishes +payload+ on +subject+. The frame is written to the stream
# without flushing; a flush is forced only once the pending-write counter
# reaches @flush_max_buffer.
#
# @param subject [String] destination subject
# @param payload [#to_s] message body, converted with +to_s+
# @param reply [String, nil] optional reply subject
# @param headers [#empty?, nil] when present and non-empty the call is
#   delegated to +publish_with_headers+
# @return [Object] the result of +protocol_payload_out+, or of
#   +publish_with_headers+ on the header path
def publish(subject, payload = "", reply: nil, headers: nil)
  payload = payload.to_s
  if headers && !headers.empty?
    return publish_with_headers(subject, payload, headers, reply: reply)
  end

  command = build_pub_command(subject, payload.bytesize, reply: reply)
  @logger.debug("C->S #{command}")

  @write_lock.acquire do
    # Control line, body, terminator -- written in order under the lock so
    # frames from concurrent publishers cannot interleave.
    ["#{command}#{CR_LF}", payload, CR_LF].each do |chunk|
      stream.write(chunk, flush: false)
    end
    mark_pending_flush
  end

  buffer_full = @pending_flush_count >= @flush_max_buffer
  flush_pending if buffer_full
  protocol_payload_out(payload)
end

#should_flush? ⇒ Boolean

Returns:

  • (Boolean)


157
158
159
160
161
162
163
# File 'lib/nats_async/connection.rb', line 157

# Decides whether buffered writes are due for a flush: never when nothing
# is pending, always when the pending count has reached @flush_max_buffer,
# otherwise only once @flush_delay has elapsed since the last flush.
#
# @return [Boolean]
def should_flush?
  pending = @pending_flush_count
  return false if pending.nil? || !pending.positive?
  return true if pending >= @flush_max_buffer

  elapsed = monotonic_now - @last_flush_time
  elapsed >= @flush_delay
end

#start(task:) ⇒ Object



78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/nats_async/connection.rb', line 78

# Opens the connection unless it is already connected: connects, reads the
# initial INFO, sends CONNECT, then spawns the read loop and the stale-flush
# loop on +task+ before marking the connection open.
#
# @param task [#async] parent task used to spawn the background loops
# @return [self]
# @raise [StandardError] re-raised after #close has run when any step fails
def start(task:)
  unless connected?
    connect!
    read_initial_info!
    send_connect!
    @read_task = task.async { read_loop }
    @stale_flush_task = task.async { stale_flush_loop }
    @closed = false
  end

  self
rescue StandardError
  close
  raise
end

#subscribe(subject, sid:, queue: nil) ⇒ Object



137
138
139
140
141
# File 'lib/nats_async/connection.rb', line 137

# Sends a SUB command for +subject+, optionally as part of a queue group.
#
# @param subject [String] subject to subscribe to
# @param sid [Object] subscription id placed last in the command
# @param queue [String, nil] queue group name, inserted between subject and sid
# @return [Object] +sid+, unchanged
def subscribe(subject, sid:, queue: nil)
  queue_part = queue ? " #{queue}" : ""
  write_line("SUB #{subject}#{queue_part} #{sid}")
  sid
end

#unsubscribe(sid) ⇒ Object



143
144
145
146
# File 'lib/nats_async/connection.rb', line 143

# Sends UNSUB for +sid+ when the connection is open; otherwise does nothing.
#
# @param sid [Object] subscription id to cancel
# @return [true] always, whether or not a command was sent
def unsubscribe(sid)
  return true unless connected?

  write_line("UNSUB #{sid}")
  true
end