Class: AMQP::Client::Connection

Inherits:
Object
  • Object
show all
Defined in:
lib/amqp/client/connection.rb,
lib/amqp/client/channel.rb

Overview

Represents a single established AMQP connection

Defined Under Namespace

Classes: Channel

Instance Attribute Summary collapse

Callbacks collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(uri = "", read_loop_thread: true, codec_registry: nil, strict_coding: false, name: nil, **options) ⇒ Connection

Establish a connection to an AMQP broker

Parameters:

  • uri (String) (defaults to: "")

    URL on the format amqp://username:password@hostname/vhost, use amqps:// for encrypted connection

  • read_loop_thread (Boolean) (defaults to: true)

    If true run #read_loop in a background thread, otherwise the user have to run it explicitly, without #read_loop the connection won’t function

  • codec_registry (MessageCodecRegistry) (defaults to: nil)

    Registry for message codecs

  • strict_coding (Boolean) (defaults to: false)

    Whether to raise errors on unsupported codecs

  • name (String, nil) (defaults to: nil)

    Instance identifier embedded in thread names (e.g. “amqp.read_loop host:port”) and lifecycle log prefixes. Usually sourced from the URL’s ‘?name=` query param by AMQP::Client.

  • options (Hash)

    a customizable set of options

Options Hash (**options):

  • connection_name (Boolean) — default: PROGRAM_NAME

    Set a name for the connection to be able to identify the client from the broker

  • verify_peer (Boolean) — default: true

    Verify broker’s TLS certificate, set to false for self-signed certs

  • connect_timeout (Float) — default: 30

    TCP connection timeout

  • heartbeat (Integer) — default: 0

    Heartbeat timeout, defaults to 0 and relies on TCP keepalive instead

  • frame_max (Integer) — default: 131_072

    Maximum frame size, the smallest of the client’s and the broker’s values will be used

  • channel_max (Integer) — default: 2048

    Maxium number of channels the client will be allowed to have open. Maxium allowed is 65_536. The smallest of the client’s and the broker’s value will be used.

  • keepalive (String) — default: 60:10:3

    TCP keepalive setting, 60s idle, 10s interval between probes, 3 probes



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/amqp/client/connection.rb', line 34

def initialize(uri = "", read_loop_thread: true, codec_registry: nil, strict_coding: false, name: nil, **options)
  uri = URI.parse(uri)
  tls = uri.scheme == "amqps"
  port = port_from_env || uri.port || (tls ? 5671 : 5672)
  host = uri.host || "localhost"
  user = uri.user || "guest"
  password = uri.password || "guest"
  vhost = URI.decode_www_form_component(uri.path[1..] || "/")
  options = URI.decode_www_form(uri.query || "").map! { |k, v| [k.to_sym, v] }.to_h.merge(options)

  @host = host
  @port = port
  @name = name

  socket = open_socket(host, port, tls, options)
  channel_max, frame_max, heartbeat = establish(socket, user, password, vhost, options)

  @socket = socket
  @channel_max = channel_max.zero? ? 65_536 : channel_max
  @frame_max = frame_max
  @heartbeat = heartbeat
  @codec_registry = codec_registry
  @strict_coding = strict_coding
  @channels = {}
  @channels_lock = Mutex.new
  @closed = nil
  @replies = ::Queue.new
  @write_lock = Mutex.new
  @blocked = nil
  @on_blocked = ->(reason) { warn "AMQP-Client blocked by broker: #{reason}" }
  @on_unblocked = -> { warn "AMQP-Client unblocked by broker" }

  # Only used with heartbeats
  @last_activity_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)

  return unless read_loop_thread

  t = Thread.new { read_loop }
  t.name = thread_name(role: "read_loop")
end

Instance Attribute Details

#codec_registryMessageCodecRegistry? (readonly)

The codec registry for message encoding/decoding

Returns:



108
109
110
# File 'lib/amqp/client/connection.rb', line 108

def codec_registry
  @codec_registry
end

#frame_maxInteger (readonly)

The max frame size negotiated between the client and the broker

Returns:

  • (Integer)


104
105
106
# File 'lib/amqp/client/connection.rb', line 104

def frame_max
  @frame_max
end

#strict_codingBoolean (readonly)

Whether to use strict coding (raise errors on unsupported codecs)

Returns:

  • (Boolean)


112
113
114
# File 'lib/amqp/client/connection.rb', line 112

def strict_coding
  @strict_coding
end

Class Method Details

.connect(uri, read_loop_thread: true) ⇒ Object

Deprecated.

Alias for #initialize

See Also:



98
99
100
# File 'lib/amqp/client/connection.rb', line 98

def self.connect(uri, read_loop_thread: true, **)
  new(uri, read_loop_thread:, **)
end

Instance Method Details

#blocked?Bool

Indicates that the server is blocking publishes. If the client keeps publishing the server will stop reading from the socket. Use the #on_blocked callback to get notified when the server is resource constrained.

Returns:

  • (Bool)

See Also:



91
92
93
# File 'lib/amqp/client/connection.rb', line 91

def blocked?
  !@blocked.nil?
end

#channel(id = nil) ⇒ Channel

Open an AMQP channel

Parameters:

  • id (Integer, nil) (defaults to: nil)

    If nil a new channel will be opened, otherwise an already open channel might be reused

Returns:

Raises:

  • (ArgumentError)


124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# File 'lib/amqp/client/connection.rb', line 124

def channel(id = nil)
  raise ArgumentError, "Channel ID cannot be 0" if id&.zero?

  raise ArgumentError, "Channel ID higher than connection's channel max #{@channel_max}" if id && id > @channel_max

  ch = @channels_lock.synchronize do
    if id
      @channels[id] ||= Channel.new(self, id)
    else
      1.upto(@channel_max) do |i|
        break id = i unless @channels.key? i
      end
      raise Error, "Max channels reached" if id.nil?

      @channels[id] = Channel.new(self, id)
    end
  end
  ch.open
end

#close(reason: "", code: 200) ⇒ nil

Gracefully close a connection

Parameters:

  • reason (String) (defaults to: "")

    A reason to close the connection can be logged by the broker

  • code (Integer) (defaults to: 200)

Returns:

  • (nil)


160
161
162
163
164
165
166
167
168
169
170
171
172
# File 'lib/amqp/client/connection.rb', line 160

def close(reason: "", code: 200)
  return if @closed

  @closed = [code, reason]
  @channels.each_value { |ch| ch.closed!(:connection, code, reason, 0, 0) }
  if @blocked
    @socket.close
  else
    write_bytes FrameBytes.connection_close(code, reason)
    expect(:close_ok)
  end
  nil
end

#closed?Boolean

True if the connection is closed

Returns:

  • (Boolean)


186
187
188
# File 'lib/amqp/client/connection.rb', line 186

def closed?
  !@closed.nil?
end

#inspectString

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Custom inspect

Returns:

  • (String)


117
118
119
# File 'lib/amqp/client/connection.rb', line 117

def inspect
  "#<#{self.class} @closed=#{@closed} channel_count=#{@channels.size}>"
end

#on_blocked {|String| ... } ⇒ nil

Callback called when client is blocked by the broker

Yields:

  • (String)

    reason to why the connection is being blocked

Returns:

  • (nil)


195
196
197
198
# File 'lib/amqp/client/connection.rb', line 195

def on_blocked(&blk)
  @on_blocked = blk
  nil
end

#on_unblocked { ... } ⇒ nil

Callback called when client is unblocked by the broker

Yields:

Returns:

  • (nil)


203
204
205
206
# File 'lib/amqp/client/connection.rb', line 203

def on_unblocked(&blk)
  @on_unblocked = blk
  nil
end

#read_loopnil

Reads from the socket, required for any kind of progress. Blocks until the connection is closed. Normally run as a background thread automatically.

Returns:

  • (nil)


228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
# File 'lib/amqp/client/connection.rb', line 228

def read_loop
  # read more often than write so that channel errors crop up early
  Thread.current.priority += 1
  socket = @socket
  frame_max = @frame_max
  frame_start = String.new(capacity: 7)
  frame_buffer = String.new(capacity: frame_max)
  loop do
    socket.read(7, frame_start) || raise(IOError)
    type, channel_id, frame_size = frame_start.unpack("C S> L>")
    frame_max >= frame_size ||
      raise(Error, "Frame size #{frame_size} larger than negotiated max frame size #{frame_max}")

    # read the frame content
    socket.read(frame_size, frame_buffer) || raise(IOError)

    # make sure that the frame end is correct
    frame_end = socket.readchar.ord
    raise Error::UnexpectedFrameEnd, frame_end if frame_end != 206

    # parse the frame, will return false if a close frame was received
    parse_frame(type, channel_id, frame_buffer) || return
    update_last_activity
  end
  nil
rescue *READ_EXCEPTIONS => e
  @closed ||= [400, "read error: #{e.message}"]
  nil # ignore read errors
ensure
  @closed ||= [400, "unknown"]
  @replies.close
  # Wake channels still blocked in #expect / #wait_for_confirms: an abrupt
  # socket close means no channel/connection close frame ever reached them.
  code, reason = @closed.first(2)
  @channels_lock.synchronize { @channels.values }.each { |ch| ch.closed!(:connection, code, reason, 0, 0) }
  begin
    if @write_lock.owned? # if connection is blocked
      @socket.close
    else
      @write_lock.synchronize do
        @socket.close
      end
    end
  rescue *READ_EXCEPTIONS
    nil
  end
end

#thread_name(role:, detail: nil) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Build a thread name for a role attached to this connection. Format: “amqp.<role> <host>:<port>[ <detail>]” — the ‘[<name>]` segment is only present when the connection has a `name:`.



79
80
81
82
83
# File 'lib/amqp/client/connection.rb', line 79

def thread_name(role:, detail: nil)
  suffix = @name ? "[#{@name}]" : ""
  base = "amqp.#{role}#{suffix} #{@host}:#{@port}"
  detail ? "#{base} #{detail}" : base
end

#update_secret(secret, reason:) ⇒ nil

Update authentication secret, for example when an OAuth backend is used

Parameters:

  • secret (String)

    The new secret

  • reason (String)

    A reason to update it

Returns:

  • (nil)


178
179
180
181
182
# File 'lib/amqp/client/connection.rb', line 178

def update_secret(secret, reason:)
  write_bytes FrameBytes.update_secret(secret, reason)
  expect(:update_secret_ok)
  nil
end

#with_channel {|Channel| ... } ⇒ Object

Declare a new channel, yield, and then close the channel

Yields:

Returns:

  • (Object)

    Whatever was returned by the block



147
148
149
150
151
152
153
154
# File 'lib/amqp/client/connection.rb', line 147

def with_channel
  ch = channel
  begin
    yield ch
  ensure
    ch.close
  end
end

#write_bytes(*bytes) ⇒ Integer

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Write byte array(s) directly to the socket (thread-safe)

Parameters:

  • bytes (String)

    One or more byte arrays

Returns:

  • (Integer)

    number of bytes written



214
215
216
217
218
219
220
221
222
223
# File 'lib/amqp/client/connection.rb', line 214

def write_bytes(*bytes)
  @write_lock.synchronize do
    @socket.write(*bytes)
    update_last_activity
  end
rescue *READ_EXCEPTIONS => e
  raise Error::ConnectionClosed.new(*@closed) if @closed

  raise Error, "Could not write to socket, #{e.message}"
end