Class: AMQP::Client::Connection
- Inherits:
-
Object
- Object
- AMQP::Client::Connection
- Defined in:
- lib/amqp/client/connection.rb,
lib/amqp/client/channel.rb
Overview
Represents a single established AMQP connection
Defined Under Namespace
Classes: Channel
Instance Attribute Summary collapse
-
#codec_registry ⇒ MessageCodecRegistry?
readonly
The codec registry for message encoding/decoding.
-
#frame_max ⇒ Integer
readonly
The max frame size negotiated between the client and the broker.
-
#strict_coding ⇒ Boolean
readonly
Whether to use strict coding (raise errors on unsupported codecs).
Callbacks collapse
-
#on_blocked {|String| ... } ⇒ nil
Callback called when client is blocked by the broker.
-
#on_unblocked { ... } ⇒ nil
Callback called when client is unblocked by the broker.
Class Method Summary collapse
- .connect(uri, read_loop_thread: true) ⇒ Object deprecated Deprecated.
Instance Method Summary collapse
-
#blocked? ⇒ Bool
Indicates that the server is blocking publishes.
-
#channel(id = nil) ⇒ Channel
Open an AMQP channel.
-
#close(reason: "", code: 200) ⇒ nil
Gracefully close a connection.
-
#closed? ⇒ Boolean
True if the connection is closed.
-
#initialize(uri = "", read_loop_thread: true, codec_registry: nil, strict_coding: false, name: nil, **options) ⇒ Connection
constructor
Establish a connection to an AMQP broker.
-
#inspect ⇒ String
private
Custom inspect.
-
#read_loop ⇒ nil
Reads from the socket, required for any kind of progress.
-
#thread_name(role:, detail: nil) ⇒ Object
private
Build a thread name for a role attached to this connection.
-
#update_secret(secret, reason:) ⇒ nil
Update authentication secret, for example when an OAuth backend is used.
-
#with_channel {|Channel| ... } ⇒ Object
Declare a new channel, yield, and then close the channel.
-
#write_bytes(*bytes) ⇒ Integer
private
Write byte array(s) directly to the socket (thread-safe).
Constructor Details
#initialize(uri = "", read_loop_thread: true, codec_registry: nil, strict_coding: false, name: nil, **options) ⇒ Connection
Establish a connection to an AMQP broker
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
# File 'lib/amqp/client/connection.rb', line 34 def initialize(uri = "", read_loop_thread: true, codec_registry: nil, strict_coding: false, name: nil, **) uri = URI.parse(uri) tls = uri.scheme == "amqps" port = port_from_env || uri.port || (tls ? 5671 : 5672) host = uri.host || "localhost" user = uri.user || "guest" password = uri.password || "guest" vhost = URI.decode_www_form_component(uri.path[1..] || "/") = URI.decode_www_form(uri.query || "").map! { |k, v| [k.to_sym, v] }.to_h.merge() @host = host @port = port @name = name socket = open_socket(host, port, tls, ) channel_max, frame_max, heartbeat = establish(socket, user, password, vhost, ) @socket = socket @channel_max = channel_max.zero? ? 65_536 : channel_max @frame_max = frame_max @heartbeat = heartbeat @codec_registry = codec_registry @strict_coding = strict_coding @channels = {} @channels_lock = Mutex.new @closed = nil @replies = ::Queue.new @write_lock = Mutex.new @blocked = nil @on_blocked = ->(reason) { warn "AMQP-Client blocked by broker: #{reason}" } @on_unblocked = -> { warn "AMQP-Client unblocked by broker" } # Only used with heartbeats @last_activity_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) return unless read_loop_thread t = Thread.new { read_loop } t.name = thread_name(role: "read_loop") end |
Instance Attribute Details
#codec_registry ⇒ MessageCodecRegistry? (readonly)
The codec registry for message encoding/decoding
108 109 110 |
# File 'lib/amqp/client/connection.rb', line 108 def codec_registry @codec_registry end |
#frame_max ⇒ Integer (readonly)
The max frame size negotiated between the client and the broker
104 105 106 |
# File 'lib/amqp/client/connection.rb', line 104 def frame_max @frame_max end |
#strict_coding ⇒ Boolean (readonly)
Whether to use strict coding (raise errors on unsupported codecs)
112 113 114 |
# File 'lib/amqp/client/connection.rb', line 112 def strict_coding @strict_coding end |
Class Method Details
.connect(uri, read_loop_thread: true) ⇒ Object
Alias for #initialize
98 99 100 |
# File 'lib/amqp/client/connection.rb', line 98 def self.connect(uri, read_loop_thread: true, **) new(uri, read_loop_thread:, **) end |
Instance Method Details
#blocked? ⇒ Bool
Indicates that the server is blocking publishes. If the client keeps publishing the server will stop reading from the socket. Use the #on_blocked callback to get notified when the server is resource constrained.
91 92 93 |
# File 'lib/amqp/client/connection.rb', line 91 def blocked? !@blocked.nil? end |
#channel(id = nil) ⇒ Channel
Open an AMQP channel
124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 |
# File 'lib/amqp/client/connection.rb', line 124 def channel(id = nil) raise ArgumentError, "Channel ID cannot be 0" if id&.zero? raise ArgumentError, "Channel ID higher than connection's channel max #{@channel_max}" if id && id > @channel_max ch = @channels_lock.synchronize do if id @channels[id] ||= Channel.new(self, id) else 1.upto(@channel_max) do |i| break id = i unless @channels.key? i end raise Error, "Max channels reached" if id.nil? @channels[id] = Channel.new(self, id) end end ch.open end |
#close(reason: "", code: 200) ⇒ nil
Gracefully close a connection
160 161 162 163 164 165 166 167 168 169 170 171 172 |
# File 'lib/amqp/client/connection.rb', line 160 def close(reason: "", code: 200) return if @closed @closed = [code, reason] @channels.each_value { |ch| ch.closed!(:connection, code, reason, 0, 0) } if @blocked @socket.close else write_bytes FrameBytes.connection_close(code, reason) expect(:close_ok) end nil end |
#closed? ⇒ Boolean
True if the connection is closed
186 187 188 |
# File 'lib/amqp/client/connection.rb', line 186 def closed? !@closed.nil? end |
#inspect ⇒ String
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Custom inspect
117 118 119 |
# File 'lib/amqp/client/connection.rb', line 117 def inspect "#<#{self.class} @closed=#{@closed} channel_count=#{@channels.size}>" end |
#on_blocked {|String| ... } ⇒ nil
Callback called when client is blocked by the broker
195 196 197 198 |
# File 'lib/amqp/client/connection.rb', line 195 def on_blocked(&blk) @on_blocked = blk nil end |
#on_unblocked { ... } ⇒ nil
Callback called when client is unblocked by the broker
203 204 205 206 |
# File 'lib/amqp/client/connection.rb', line 203 def on_unblocked(&blk) @on_unblocked = blk nil end |
#read_loop ⇒ nil
Reads from the socket, required for any kind of progress. Blocks until the connection is closed. Normally run as a background thread automatically.
228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 |
# File 'lib/amqp/client/connection.rb', line 228 def read_loop # read more often than write so that channel errors crop up early Thread.current.priority += 1 socket = @socket frame_max = @frame_max frame_start = String.new(capacity: 7) frame_buffer = String.new(capacity: frame_max) loop do socket.read(7, frame_start) || raise(IOError) type, channel_id, frame_size = frame_start.unpack("C S> L>") frame_max >= frame_size || raise(Error, "Frame size #{frame_size} larger than negotiated max frame size #{frame_max}") # read the frame content socket.read(frame_size, frame_buffer) || raise(IOError) # make sure that the frame end is correct frame_end = socket.readchar.ord raise Error::UnexpectedFrameEnd, frame_end if frame_end != 206 # parse the frame, will return false if a close frame was received parse_frame(type, channel_id, frame_buffer) || return update_last_activity end nil rescue *READ_EXCEPTIONS => e @closed ||= [400, "read error: #{e.}"] nil # ignore read errors ensure @closed ||= [400, "unknown"] @replies.close # Wake channels still blocked in #expect / #wait_for_confirms: an abrupt # socket close means no channel/connection close frame ever reached them. code, reason = @closed.first(2) @channels_lock.synchronize { @channels.values }.each { |ch| ch.closed!(:connection, code, reason, 0, 0) } begin if @write_lock.owned? # if connection is blocked @socket.close else @write_lock.synchronize do @socket.close end end rescue *READ_EXCEPTIONS nil end end |
#thread_name(role:, detail: nil) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Build a thread name for a role attached to this connection. Format: “amqp.<role> <host>:<port>[ <detail>]” — the ‘[<name>]` segment is only present when the connection has a `name:`.
79 80 81 82 83 |
# File 'lib/amqp/client/connection.rb', line 79 def thread_name(role:, detail: nil) suffix = @name ? "[#{@name}]" : "" base = "amqp.#{role}#{suffix} #{@host}:#{@port}" detail ? "#{base} #{detail}" : base end |
#update_secret(secret, reason:) ⇒ nil
Update authentication secret, for example when an OAuth backend is used
178 179 180 181 182 |
# File 'lib/amqp/client/connection.rb', line 178 def update_secret(secret, reason:) write_bytes FrameBytes.update_secret(secret, reason) expect(:update_secret_ok) nil end |
#with_channel {|Channel| ... } ⇒ Object
Declare a new channel, yield, and then close the channel
147 148 149 150 151 152 153 154 |
# File 'lib/amqp/client/connection.rb', line 147 def with_channel ch = channel begin yield ch ensure ch.close end end |
#write_bytes(*bytes) ⇒ Integer
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Write byte array(s) directly to the socket (thread-safe)
214 215 216 217 218 219 220 221 222 223 |
# File 'lib/amqp/client/connection.rb', line 214 def write_bytes(*bytes) @write_lock.synchronize do @socket.write(*bytes) update_last_activity end rescue *READ_EXCEPTIONS => e raise Error::ConnectionClosed.new(*@closed) if @closed raise Error, "Could not write to socket, #{e.}" end |