Class: AMQP::Client
- Inherits:
-
Object
- Object
- AMQP::Client
- Defined in:
- lib/amqp/client.rb,
lib/amqp/client/queue.rb,
lib/amqp/client/table.rb,
lib/amqp/client/errors.rb,
lib/amqp/client/channel.rb,
lib/amqp/client/message.rb,
lib/amqp/client/version.rb,
lib/amqp/client/consumer.rb,
lib/amqp/client/exchange.rb,
lib/amqp/client/connection.rb,
lib/amqp/client/properties.rb,
lib/amqp/client/rpc_client.rb,
lib/amqp/client/frame_bytes.rb,
lib/amqp/client/configuration.rb,
lib/amqp/client/message_codecs.rb,
lib/amqp/client/message_codec_registry.rb
Overview
AMQP 0-9-1 Client
Defined Under Namespace
Modules: Coders, FrameBytes, Parsers, Table Classes: Configuration, Connection, Consumer, Error, Exchange, Message, MessageCodecRegistry, Properties, Queue, RPCClient, ReturnMessage
Constant Summary collapse
- VERSION =
Version of the client library
"2.1.0"
RPC collapse
-
.codec_registry ⇒ MessageCodecRegistry
readonly
Get the class-level codec registry.
-
.config ⇒ Configuration
readonly
Get the class-level configuration.
-
#codec_registry ⇒ MessageCodecRegistry
readonly
Get the codec registry for this instance.
-
#default_content_encoding ⇒ String?
Get/set the default content_encoding to use when publishing messages.
-
#default_content_type ⇒ String?
Get/set the default content_type to use when publishing messages.
-
#strict_coding ⇒ Object
Get/set if condig should be strict, i.e.
Connect and disconnect collapse
-
#connect(read_loop_thread: true) ⇒ Connection
Establishes and returns a new AMQP connection.
-
#start ⇒ self
Opens an AMQP connection using the high level API, will try to reconnect if successfully connected at first.
-
#started? ⇒ Boolean
Check if the client is connected.
-
#stop ⇒ nil
Close the currently open connection and stop the supervision / reconnection logic.
High level objects collapse
-
#default_exchange ⇒ Exchange
(also: #default)
Return a high level Exchange object for the default direct exchange.
-
#direct_exchange(name = "amq.direct") ⇒ Exchange
(also: #direct)
Declare a direct exchange and return a high level Exchange object.
-
#exchange(name, type:, durable: true, auto_delete: false, internal: false, arguments: {}) ⇒ Exchange
Declare an exchange and return a high level Exchange object.
-
#fanout_exchange(name = "amq.fanout") ⇒ Exchange
(also: #fanout)
Declare a fanout exchange and return a high level Exchange object.
-
#headers_exchange(name = "amq.headers") ⇒ Exchange
(also: #headers)
Declare a headers exchange and return a high level Exchange object.
-
#queue(name, durable: true, auto_delete: false, exclusive: false, passive: false, arguments: {}) ⇒ Queue
Declare a queue.
-
#topic_exchange(name = "amq.topic") ⇒ Exchange
(also: #topic)
Declare a topic exchange and return a high level Exchange object.
Publish collapse
-
#publish(body, exchange:, routing_key: "", **properties) ⇒ nil
Publish a (persistent) message and wait for confirmation.
-
#publish_and_forget(body, exchange:, routing_key: "", **properties) ⇒ nil
Publish a (persistent) message but don’t wait for a confirmation.
-
#wait_for_confirms ⇒ Boolean
Wait for unconfirmed publishes.
Queue actions collapse
-
#bind(queue:, exchange:, binding_key: "", arguments: {}) ⇒ nil
Bind a queue to an exchange.
-
#delete_queue(name, if_unused: false, if_empty: false) ⇒ Integer
Delete a queue.
-
#get(queue, no_ack: false) ⇒ Message?
Get a message from a queue.
-
#purge(queue) ⇒ nil
Purge a queue.
-
#subscribe(queue, exclusive: false, no_ack: false, prefetch: 1, worker_threads: 1, on_cancel: nil, arguments: {}) {|Message| ... } ⇒ Consumer
Consume messages from a queue.
-
#unbind(queue:, exchange:, binding_key: "", arguments: {}) ⇒ nil
Unbind a queue from an exchange.
Exchange actions collapse
-
#delete_exchange(name) ⇒ nil
Delete an exchange.
-
#exchange_bind(source:, destination:, binding_key: "", arguments: {}) ⇒ nil
Bind an exchange to an exchange.
-
#exchange_unbind(source:, destination:, binding_key: "", arguments: {}) ⇒ nil
Unbind an exchange from an exchange.
RPC collapse
-
.configure {|Configuration| ... } ⇒ Configuration
Configure the AMQP::Client class-level settings.
-
.inherited(subclass) ⇒ Object
private
We need to set the subclass’s configuration and codec registry because these are class instance variables, hence not inherited.
-
#rpc_call(method, arguments, timeout: nil, **properties) ⇒ String
Do a RPC call, sends a messages, waits for a response.
-
#rpc_client ⇒ RPCClient
Create a reusable RPC client.
-
#rpc_server(method, worker_threads: 1, durable: true, auto_delete: false, arguments: {}) {|The| ... } ⇒ Consumer
Create a RPC server for a single method/function/procedure.
Instance Method Summary collapse
- #cancel_consumer(consumer) ⇒ Object private
- #initialize(uri = "", **options) ⇒ Client constructor
- #with_connection ⇒ Object
Constructor Details
#initialize(uri = "", **options) ⇒ Client
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/amqp/client.rb', line 38 def initialize(uri = "", **) @uri = uri @options = @logger = [:logger] @name = parse_name(uri) @queues = {} @exchanges = {} @consumers = {} @next_consumer_id = 0 @connq = SizedQueue.new(1) @codec_registry = self.class.codec_registry.dup @strict_coding = self.class.config.strict_coding @default_content_encoding = self.class.config.default_content_encoding @default_content_type = self.class.config.default_content_type @start_lock = Mutex.new @supervisor_started = false @stopped = false end |
Class Attribute Details
.codec_registry ⇒ MessageCodecRegistry (readonly)
Get the class-level codec registry
521 522 523 |
# File 'lib/amqp/client.rb', line 521 def codec_registry @codec_registry end |
.config ⇒ Configuration (readonly)
Get the class-level configuration
517 518 519 |
# File 'lib/amqp/client.rb', line 517 def config @config end |
Instance Attribute Details
#codec_registry ⇒ MessageCodecRegistry (readonly)
Get the codec registry for this instance
540 541 542 |
# File 'lib/amqp/client.rb', line 540 def codec_registry @codec_registry end |
#default_content_encoding ⇒ String?
Get/set the default content_encoding to use when publishing messages
551 552 553 |
# File 'lib/amqp/client.rb', line 551 def default_content_encoding @default_content_encoding end |
#default_content_type ⇒ String?
Get/set the default content_type to use when publishing messages
547 548 549 |
# File 'lib/amqp/client.rb', line 547 def default_content_type @default_content_type end |
#strict_coding ⇒ Object
Get/set if condig should be strict, i.e. if the client should raise on unknown codecs
543 544 545 |
# File 'lib/amqp/client.rb', line 543 def strict_coding @strict_coding end |
Class Method Details
.configure {|Configuration| ... } ⇒ Configuration
Configure the AMQP::Client class-level settings
510 511 512 513 |
# File 'lib/amqp/client.rb', line 510 def configure yield @config if block_given? @config end |
.inherited(subclass) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
We need to set the subclass’s configuration and codec registry because these are class instance variables, hence not inherited.
526 527 528 529 530 531 532 533 534 535 |
# File 'lib/amqp/client.rb', line 526 def inherited(subclass) super subclass_codec_registry = @codec_registry.dup subclass.instance_variable_set(:@codec_registry, subclass_codec_registry) subclass.instance_variable_set(:@config, Configuration.new(subclass_codec_registry)) # Copy configuration settings from parent subclass.config.strict_coding = @config.strict_coding subclass.config.default_content_type = @config.default_content_type subclass.config.default_content_encoding = @config.default_content_encoding end |
Instance Method Details
#bind(queue:, exchange:, binding_key: "", arguments: {}) ⇒ nil
Bind a queue to an exchange
360 361 362 363 364 |
# File 'lib/amqp/client.rb', line 360 def bind(queue:, exchange:, binding_key: "", arguments: {}) with_connection do |conn| conn.channel(1).queue_bind(queue, exchange:, binding_key:, arguments:) end end |
#cancel_consumer(consumer) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
571 572 573 574 575 576 577 578 579 580 581 |
# File 'lib/amqp/client.rb', line 571 def cancel_consumer(consumer) @consumers.delete(consumer.id) with_connection do |conn| ch = conn.channel(consumer.channel_id) begin ch.basic_cancel(consumer.tag) ensure ch.close end end end |
#connect(read_loop_thread: true) ⇒ Connection
Establishes and returns a new AMQP connection
64 65 66 67 |
# File 'lib/amqp/client.rb', line 64 def connect(read_loop_thread: true) Connection.new(@uri, read_loop_thread:, name: @name, codec_registry: @codec_registry, strict_coding: @strict_coding, **@options) end |
#default_exchange ⇒ Exchange Also known as: default
Return a high level Exchange object for the default direct exchange
215 216 217 |
# File 'lib/amqp/client.rb', line 215 def default_exchange(**) direct("", **) end |
#delete_exchange(name) ⇒ nil
Delete an exchange
430 431 432 433 434 435 436 |
# File 'lib/amqp/client.rb', line 430 def delete_exchange(name) with_connection do |conn| conn.channel(1).exchange_delete(name) @exchanges.delete(name) nil end end |
#delete_queue(name, if_unused: false, if_empty: false) ⇒ Integer
Delete a queue
392 393 394 395 396 397 398 |
# File 'lib/amqp/client.rb', line 392 def delete_queue(name, if_unused: false, if_empty: false) with_connection do |conn| msgs = conn.channel(1).queue_delete(name, if_unused:, if_empty:) @queues.delete(name) msgs end end |
#direct_exchange(name = "amq.direct") ⇒ Exchange Also known as: direct
Declare a direct exchange and return a high level Exchange object
199 200 201 202 203 204 205 206 |
# File 'lib/amqp/client.rb', line 199 def direct_exchange(name = "amq.direct", **) return exchange(name, type: "direct", **) unless name.empty? # Return the default exchange @exchanges.fetch(name) do @exchanges[name] = Exchange.new(self, name) end end |
#exchange(name, type:, durable: true, auto_delete: false, internal: false, arguments: {}) ⇒ Exchange
Declare an exchange and return a high level Exchange object
186 187 188 189 190 191 192 193 |
# File 'lib/amqp/client.rb', line 186 def exchange(name, type:, durable: true, auto_delete: false, internal: false, arguments: {}) @exchanges.fetch(name) do with_connection do |conn| conn.channel(1).exchange_declare(name, type:, durable:, auto_delete:, internal:, arguments:) end @exchanges[name] = Exchange.new(self, name) end end |
#exchange_bind(source:, destination:, binding_key: "", arguments: {}) ⇒ nil
Bind an exchange to an exchange
409 410 411 412 413 |
# File 'lib/amqp/client.rb', line 409 def exchange_bind(source:, destination:, binding_key: "", arguments: {}) with_connection do |conn| conn.channel(1).exchange_bind(destination:, source:, binding_key:, arguments:) end end |
#exchange_unbind(source:, destination:, binding_key: "", arguments: {}) ⇒ nil
Unbind an exchange from an exchange
421 422 423 424 425 |
# File 'lib/amqp/client.rb', line 421 def exchange_unbind(source:, destination:, binding_key: "", arguments: {}) with_connection do |conn| conn.channel(1).exchange_unbind(destination:, source:, binding_key:, arguments:) end end |
#fanout_exchange(name = "amq.fanout") ⇒ Exchange Also known as: fanout
Declare a fanout exchange and return a high level Exchange object
227 228 229 |
# File 'lib/amqp/client.rb', line 227 def fanout_exchange(name = "amq.fanout", **) exchange(name, type: "fanout", **) end |
#get(queue, no_ack: false) ⇒ Message?
Get a message from a queue
346 347 348 349 350 351 352 |
# File 'lib/amqp/client.rb', line 346 def get(queue, no_ack: false) with_connection do |conn| conn.with_channel do |ch| ch.basic_get(queue, no_ack:) end end end |
#headers_exchange(name = "amq.headers") ⇒ Exchange Also known as: headers
Declare a headers exchange and return a high level Exchange object
251 252 253 |
# File 'lib/amqp/client.rb', line 251 def headers_exchange(name = "amq.headers", **) exchange(name, type: "headers", **) end |
#publish(body, exchange:, routing_key: "", **properties) ⇒ nil
Publish a (persistent) message and wait for confirmation
273 274 275 276 277 278 279 280 281 282 283 |
# File 'lib/amqp/client.rb', line 273 def publish(body, exchange:, routing_key: "", **properties) with_connection do |conn| properties[:delivery_mode] ||= 2 properties = default_content_properties.merge(properties) body = serialize_and_encode_body(body, properties) result = conn.channel(1).basic_publish_confirm(body, exchange:, routing_key:, **properties) raise Error::PublishNotConfirmed unless result nil end end |
#publish_and_forget(body, exchange:, routing_key: "", **properties) ⇒ nil
Publish a (persistent) message but don’t wait for a confirmation
292 293 294 295 296 297 298 299 |
# File 'lib/amqp/client.rb', line 292 def publish_and_forget(body, exchange:, routing_key: "", **properties) with_connection do |conn| properties[:delivery_mode] ||= 2 properties = default_content_properties.merge(properties) body = serialize_and_encode_body(body, properties) conn.channel(1).basic_publish(body, exchange:, routing_key:, **properties) end end |
#purge(queue) ⇒ nil
Purge a queue
381 382 383 384 385 |
# File 'lib/amqp/client.rb', line 381 def purge(queue) with_connection do |conn| conn.channel(1).queue_purge(queue) end end |
#queue(name, durable: true, auto_delete: false, exclusive: false, passive: false, arguments: {}) ⇒ Queue
Declare a queue
163 164 165 166 167 168 169 170 171 172 |
# File 'lib/amqp/client.rb', line 163 def queue(name, durable: true, auto_delete: false, exclusive: false, passive: false, arguments: {}) raise ArgumentError, "Currently only supports named, durable queues" if name.empty? @queues.fetch(name) do with_connection do |conn| conn.channel(1).queue_declare(name, durable:, auto_delete:, exclusive:, passive:, arguments:) end @queues[name] = Queue.new(self, name) end end |
#rpc_call(method, arguments, timeout: nil, **properties) ⇒ String
Do a RPC call, sends a messages, waits for a response
476 477 478 479 480 481 482 483 484 485 486 487 488 489 |
# File 'lib/amqp/client.rb', line 476 def rpc_call(method, arguments, timeout: nil, **properties) ch = with_connection(&:channel) begin msg = ch.basic_consume_once("amq.rabbitmq.reply-to", timeout:) do properties = default_content_properties.merge(properties) body = serialize_and_encode_body(arguments, properties) ch.basic_publish(body, exchange: "", routing_key: method.to_s, reply_to: "amq.rabbitmq.reply-to", **properties) end msg.parse ensure ch.close end end |
#rpc_client ⇒ RPCClient
Create a reusable RPC client
493 494 495 496 |
# File 'lib/amqp/client.rb', line 493 def rpc_client ch = with_connection(&:channel) RPCClient.new(ch).start end |
#rpc_server(method, worker_threads: 1, durable: true, auto_delete: false, arguments: {}) {|The| ... } ⇒ Consumer
Create a RPC server for a single method/function/procedure
452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 |
# File 'lib/amqp/client.rb', line 452 def rpc_server(method, worker_threads: 1, durable: true, auto_delete: false, arguments: {}, &_) queue(method.to_s, durable:, auto_delete:, arguments:) .subscribe(prefetch: worker_threads, worker_threads:) do |msg| result = yield msg.parse properties = { content_type: msg.properties.content_type, content_encoding: msg.properties.content_encoding } result_body = serialize_and_encode_body(result, properties) msg.channel.basic_publish(result_body, exchange: "", routing_key: msg.properties.reply_to, correlation_id: msg.properties.correlation_id, **properties) msg.ack rescue StandardError msg.reject(requeue: false) raise end end |
#start ⇒ self
Opens an AMQP connection using the high level API, will try to reconnect if successfully connected at first
75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 |
# File 'lib/amqp/client.rb', line 75 def start return self if started? @start_lock.synchronize do # rubocop:disable Metrics/BlockLength return self if started? @supervisor_started = true @stopped = false initial_conn = connect(read_loop_thread: false) log_lifecycle(:info, "connected") supervisor = Thread.new(initial_conn) do |conn| # rubocop:disable Metrics/BlockLength Thread.current.abort_on_exception = true # Raising an unhandled exception is a bug loop do # rubocop:disable Metrics/BlockLength break if @stopped unless conn conn = connect(read_loop_thread: false) log_lifecycle(:info, "reconnected") end setup = Thread.new do # restore connection in another thread, read_loop have to run conn.channel(1) # reserve channel 1 for publishes @consumers.each_value do |consumer| ch = conn.channel ch.basic_qos(consumer.prefetch) consume_ok = ch.basic_consume(consumer.queue, **consumer.basic_consume_args, &consumer.block) # Update the consumer with new channel and consume_ok metadata consumer.update_consume_ok(consume_ok) end @connq << conn # Remove consumers whose internal queues were already closed (e.g. cancelled during reconnect window) @consumers.delete_if { |_, c| c.closed? } end setup.name = thread_name("reconnect_setup") conn.read_loop # blocks until connection is closed, then reconnect log_lifecycle(:warn, "disconnected") rescue Error => e log_reconnect_error(e) sleep @options[:reconnect_interval] || 1 ensure @connq.clear conn = nil end end supervisor.name = thread_name("supervisor") end self end |
#started? ⇒ Boolean
Check if the client is connected
142 143 144 |
# File 'lib/amqp/client.rb', line 142 def started? @supervisor_started && !@stopped end |
#stop ⇒ nil
Close the currently open connection and stop the supervision / reconnection logic.
129 130 131 132 133 134 135 136 137 138 |
# File 'lib/amqp/client.rb', line 129 def stop return if @stopped && !@supervisor_started @stopped = true return unless @connq.size.positive? conn = @connq.pop conn.close nil end |
#subscribe(queue, exclusive: false, no_ack: false, prefetch: 1, worker_threads: 1, on_cancel: nil, arguments: {}) {|Message| ... } ⇒ Consumer
Consume messages from a queue
321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 |
# File 'lib/amqp/client.rb', line 321 def subscribe(queue, exclusive: false, no_ack: false, prefetch: 1, worker_threads: 1, on_cancel: nil, arguments: {}, &blk) raise ArgumentError, "worker_threads have to be > 0" if worker_threads <= 0 with_connection do |conn| ch = conn.channel ch.basic_qos(prefetch) consumer_id = @next_consumer_id += 1 on_cancel_proc = proc do |tag| @consumers.delete(consumer_id) on_cancel&.call(tag) end basic_consume_args = { exclusive:, no_ack:, worker_threads:, on_cancel: on_cancel_proc, arguments: } consume_ok = ch.basic_consume(queue, **basic_consume_args, &blk) consumer = Consumer.new(client: self, channel_id: ch.id, id: consumer_id, block: blk, queue:, consume_ok:, prefetch:, basic_consume_args:) @consumers[consumer_id] = consumer consumer end end |
#topic_exchange(name = "amq.topic") ⇒ Exchange Also known as: topic
Declare a topic exchange and return a high level Exchange object
239 240 241 |
# File 'lib/amqp/client.rb', line 239 def topic_exchange(name = "amq.topic", **) exchange(name, type: "topic", **) end |
#unbind(queue:, exchange:, binding_key: "", arguments: {}) ⇒ nil
Unbind a queue from an exchange
372 373 374 375 376 |
# File 'lib/amqp/client.rb', line 372 def unbind(queue:, exchange:, binding_key: "", arguments: {}) with_connection do |conn| conn.channel(1).queue_unbind(queue, exchange:, binding_key:, arguments:) end end |
#wait_for_confirms ⇒ Boolean
Wait for unconfirmed publishes
303 304 305 306 307 |
# File 'lib/amqp/client.rb', line 303 def wait_for_confirms with_connection do |conn| conn.channel(1).wait_for_confirms end end |
#with_connection ⇒ Object
555 556 557 558 559 560 561 562 563 564 565 566 567 568 |
# File 'lib/amqp/client.rb', line 555 def with_connection conn = nil loop do conn = @connq.pop next if conn.closed? break end begin yield conn ensure @connq << conn unless conn.closed? end end |