Class: AMQP::Client::Connection::Channel
- Inherits: Object
  - Object
  - AMQP::Client::Connection::Channel
- Defined in: lib/amqp/client/channel.rb
Overview
AMQP Channel
Defined Under Namespace
Queue
- #consumer_count ⇒ Integer
  Number of consumers subscribed to the queue at the time of declaration.
- #message_count ⇒ Integer
  Number of messages in the queue at the time of declaration.
- #queue_name ⇒ String
  The name of the queue.
Basic
- #channel_id ⇒ Integer
  The channel ID.
- #consumer_tag ⇒ String
  The consumer tag.
- #worker_threads ⇒ Array<Thread>
  Array of worker threads.
Instance Attribute Summary
- #connection ⇒ Connection (readonly)
  Connection this channel belongs to.
- #id ⇒ Integer (readonly)
  Channel ID.
Exchange
- #exchange_bind(source:, destination:, binding_key:, arguments: {}) ⇒ nil
  Bind an exchange to another exchange.
- #exchange_declare(name, type:, passive: false, durable: true, auto_delete: false, internal: false, arguments: {}) ⇒ nil
  Declare an exchange.
- #exchange_delete(name, if_unused: false, no_wait: false) ⇒ nil
  Delete an exchange.
- #exchange_unbind(source:, destination:, binding_key:, arguments: {}) ⇒ nil
  Unbind an exchange from another exchange.
Queue
- #queue_bind(name, exchange:, binding_key: "", arguments: {}) ⇒ nil
  Bind a queue to an exchange.
- #queue_declare(name = "", passive: false, durable: true, exclusive: false, auto_delete: false, arguments: {}) ⇒ QueueOk
  Create a queue (operation is idempotent).
- #queue_delete(name, if_unused: false, if_empty: false, no_wait: false) ⇒ Integer?
  Delete a queue.
- #queue_purge(name, no_wait: false) ⇒ Integer?
  Purge a queue.
- #queue_unbind(name, exchange:, binding_key: "", arguments: {}) ⇒ nil
  Unbind a queue from an exchange.
Basic
- #basic_ack(delivery_tag, multiple: false) ⇒ nil
  Acknowledge a message.
- #basic_cancel(consumer_tag, no_wait: false) ⇒ nil
  Cancel/abort/stop a consumer.
- #basic_consume(queue, tag: "", no_ack: true, exclusive: false, no_wait: false, arguments: {}, worker_threads: 1, on_cancel: nil) {|Message| ... } ⇒ ConsumeOk?
  Consume messages from a queue.
- #basic_consume_once(queue, timeout: nil) { ... } ⇒ Message
  Consume a single message from a queue.
- #basic_get(queue_name, no_ack: true) ⇒ Message?
  Get a message from a queue (by polling).
- #basic_nack(delivery_tag, multiple: false, requeue: false) ⇒ nil
  Negatively acknowledge a message.
- #basic_publish(body, exchange:, routing_key: "", **properties) ⇒ nil
  Publish a message to an exchange.
- #basic_publish_confirm(body, exchange:, routing_key: "", **properties) ⇒ Boolean
  Publish a message and block until the broker has confirmed that it received it.
- #basic_qos(prefetch_count, prefetch_size: 0, global: false) ⇒ nil
  Specify how many messages to prefetch for consumers with `no_ack: false`.
- #basic_recover(requeue: false) ⇒ nil
  Recover all the unacknowledged messages.
- #basic_reject(delivery_tag, requeue: false) ⇒ nil
  Reject a message.
Confirm
- #confirm(args) ⇒ Object private
  Called by Connection when an ack/nack is received from the broker.
- #confirm_select(no_wait: false) ⇒ nil
  Put the channel in confirm mode; each published message will then be confirmed by the broker.
- #wait_for_confirms ⇒ Boolean
  Block until all published messages are confirmed.
Transaction
- #tx_commit ⇒ nil
  Commit a transaction; requires that the channel is in transaction mode.
- #tx_rollback ⇒ nil
  Roll back a transaction; requires that the channel is in transaction mode.
- #tx_select ⇒ nil
  Put the channel in transaction mode; make sure that you call #tx_commit or #tx_rollback after publishing.
Instance Method Summary
- #basic_get_empty ⇒ Object private
- #body_delivered(body_part) ⇒ Object private
- #cancel_consumer(tag) ⇒ Object private
  Handle consumer cancellation from the broker.
- #close(reason: "", code: 200) ⇒ nil
  Gracefully close channel.
- #closed!(level, code, reason, classid, methodid) ⇒ nil private
  Called when channel is closed by broker.
- #header_delivered(body_size, properties) ⇒ Object private
- #initialize(connection, id) ⇒ Channel constructor private
  Should only be called from Connection.
- #inspect ⇒ Object private
  Override #inspect.
- #message_delivered(consumer_tag, delivery_tag, redelivered, exchange, routing_key) ⇒ Object private
- #message_returned(reply_code, reply_text, exchange, routing_key) ⇒ Object private
- #on_return {|ReturnMessage| ... } ⇒ Object
  Handle returned messages in this block.
- #open ⇒ Channel private
  Open the channel (called from Connection).
- #reply(args) ⇒ Object private
Constructor Details
#initialize(connection, id) ⇒ Channel
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Should only be called from Connection
# File 'lib/amqp/client/channel.rb', line 16
def initialize(connection, id)
  @connection = connection
  @id = id
  @replies = ::Queue.new
  @consumers = {}
  @closed = nil
  @open = false
  @on_return = nil
  @confirm = nil
  @unconfirmed = []
  @unconfirmed_lock = Mutex.new
  @unconfirmed_empty = ConditionVariable.new
  @nacked = false
  @basic_gets = ::Queue.new
end
Instance Attribute Details
#channel_id ⇒ Integer
Returns The channel ID.
# File 'lib/amqp/client/channel.rb', line 324
ConsumeOk = Data.define(:channel_id, :consumer_tag, :worker_threads, :msg_q, :on_cancel)
#connection ⇒ Connection (readonly)
Connection this channel belongs to
# File 'lib/amqp/client/channel.rb', line 45
def connection
  @connection
end
#consumer_count ⇒ Integer
Returns Number of consumers subscribed to the queue at the time of declaration.
# File 'lib/amqp/client/channel.rb', line 165
QueueOk = Data.define(:queue_name, :message_count, :consumer_count)
#consumer_tag ⇒ String
Returns The consumer tag.
# File 'lib/amqp/client/channel.rb', line 324
ConsumeOk = Data.define(:channel_id, :consumer_tag, :worker_threads, :msg_q, :on_cancel)
#id ⇒ Integer (readonly)
Channel ID
# File 'lib/amqp/client/channel.rb', line 41
def id
  @id
end
#message_count ⇒ Integer
Returns Number of messages in the queue at the time of declaration.
# File 'lib/amqp/client/channel.rb', line 165
QueueOk = Data.define(:queue_name, :message_count, :consumer_count)
#queue_name ⇒ String
Returns The name of the queue.
# File 'lib/amqp/client/channel.rb', line 165
QueueOk = Data.define(:queue_name, :message_count, :consumer_count)
#worker_threads ⇒ Array<Thread>
Returns Array of worker threads.
# File 'lib/amqp/client/channel.rb', line 324
ConsumeOk = Data.define(:channel_id, :consumer_tag, :worker_threads, :msg_q, :on_cancel)
Instance Method Details
#basic_ack(delivery_tag, multiple: false) ⇒ nil
Acknowledge a message
# File 'lib/amqp/client/channel.rb', line 415
def basic_ack(delivery_tag, multiple: false)
  write_bytes FrameBytes.basic_ack(@id, delivery_tag, multiple)
  nil
end
#basic_cancel(consumer_tag, no_wait: false) ⇒ nil
Cancel/abort/stop a consumer
# File 'lib/amqp/client/channel.rb', line 389
def basic_cancel(consumer_tag, no_wait: false)
  consumer = @consumers[consumer_tag]
  return unless consumer

  write_bytes FrameBytes.basic_cancel(@id, consumer_tag)
  expect(:basic_cancel_ok) unless no_wait
  @consumers.delete(consumer_tag)
  close_consumer(consumer)
  nil
end
#basic_consume(queue, tag: "", no_ack: true, exclusive: false, no_wait: false, arguments: {}, worker_threads: 1, on_cancel: nil) {|Message| ... } ⇒ ConsumeOk?
Consume messages from a queue
# File 'lib/amqp/client/channel.rb', line 340
def basic_consume(queue, tag: "", no_ack: true, exclusive: false, no_wait: false,
                  arguments: {}, worker_threads: 1, on_cancel: nil, &blk)
  raise ArgumentError, "consumer_tag required when no_wait" if no_wait && tag.empty?

  write_bytes FrameBytes.basic_consume(@id, queue, tag, no_ack, exclusive, no_wait, arguments)
  consumer_tag, = expect(:basic_consume_ok) unless no_wait
  msg_q = ::Queue.new
  if worker_threads.zero?
    @consumers[consumer_tag] =
      ConsumeOk.new(channel_id: @id, consumer_tag:, worker_threads: [], msg_q:, on_cancel:)
    consume_loop(msg_q, consumer_tag, &blk)
    nil
  else
    threads = Array.new(worker_threads) do |i|
      t = Thread.new { consume_loop(msg_q, consumer_tag, &blk) }
      t.name = @connection.thread_name(role: "consumer", detail: "ch=#{@id} tag=#{consumer_tag} ##{i + 1}")
      t
    end
    @consumers[consumer_tag] =
      ConsumeOk.new(channel_id: @id, consumer_tag:, worker_threads: threads, msg_q:, on_cancel:)
  end
end
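A minimal sketch of how #basic_consume might be combined with #basic_qos, #basic_ack and #basic_nack for manual acknowledgement. It assumes `channel` responds to the methods documented on this page and that each yielded message exposes a `delivery_tag` reader (the accessor name is an assumption). `consume_with_acks` is a hypothetical helper, not part of the library.

```ruby
# Hypothetical helper: consume from `queue`, acking each message the handler
# processes successfully and nacking (with requeue) the ones that raise.
def consume_with_acks(channel, queue, prefetch: 10, worker_threads: 1, &handler)
  # Prefetch only applies to consumers started with no_ack: false.
  channel.basic_qos(prefetch)
  channel.basic_consume(queue, no_ack: false, worker_threads: worker_threads) do |msg|
    begin
      handler.call(msg)
      channel.basic_ack(msg.delivery_tag)
    rescue StandardError
      # Assumption: requeueing is the desired failure policy.
      channel.basic_nack(msg.delivery_tag, requeue: true)
    end
  end
end
```

Going by the listing above, `worker_threads: 0` runs the consume loop on the calling thread and returns nil, while any positive value starts that many threads and returns a ConsumeOk.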
#basic_consume_once(queue, timeout: nil) { ... } ⇒ Message
Consume a single message from a queue
# File 'lib/amqp/client/channel.rb', line 369
def basic_consume_once(queue, timeout: nil, &)
  tag = "consume-once-#{rand(1024)}"
  write_bytes FrameBytes.basic_consume(@id, queue, tag, true, false, true, nil)
  msg_q = ::Queue.new
  @consumers[tag] =
    ConsumeOk.new(channel_id: @id, consumer_tag: tag, worker_threads: [], msg_q:, on_cancel: nil)
  yield if block_given?
  msg = msg_q.pop(timeout:)
  write_bytes FrameBytes.basic_cancel(@id, tag, no_wait: true)
  consumer = @consumers.delete(tag)
  close_consumer(consumer)
  raise Timeout::Error, "No message received in #{timeout} seconds" if timeout && msg.nil?

  msg
end
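The listing raises `Timeout::Error` when no message arrives in time. As a sketch, a caller that prefers a nil result could wrap the call as follows; `next_message_or_nil` is a hypothetical helper and `channel` is assumed to respond to #basic_consume_once as documented here.

```ruby
require "timeout"

# Hypothetical helper: wait up to `timeout` seconds for one message and
# return nil instead of raising when nothing arrives.
def next_message_or_nil(channel, queue, timeout: 5)
  channel.basic_consume_once(queue, timeout: timeout)
rescue Timeout::Error
  nil
end
```

Since the optional block is yielded after the consumer is registered and before the wait starts, it looks like a convenient place to publish a request whose reply is expected on `queue`.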
#basic_get(queue_name, no_ack: true) ⇒ Message?
Get a message from a queue (by polling)
# File 'lib/amqp/client/channel.rb', line 244
def basic_get(queue_name, no_ack: true)
  write_bytes FrameBytes.basic_get(@id, queue_name, no_ack)
  case (msg = @basic_gets.pop)
  when Message then msg
  when :basic_get_empty then nil
  when nil then raise Error::Closed.new(@id, *@closed)
  end
end
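Because #basic_get returns nil for an empty queue, polling until nil is one way to drain a queue. The sketch below assumes `channel` responds to #basic_get as documented; `drain_queue` and its `limit` parameter are hypothetical.

```ruby
# Hypothetical helper: poll `queue` until it is empty or `limit` messages
# have been fetched, and return the collected messages.
def drain_queue(channel, queue, limit: 100)
  messages = []
  while messages.size < limit
    msg = channel.basic_get(queue, no_ack: true)
    break if msg.nil? # basic_get returns nil when the queue is empty

    messages << msg
  end
  messages
end
```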
#basic_get_empty ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/amqp/client/channel.rb', line 556
def basic_get_empty
  @basic_gets.push :basic_get_empty
end
#basic_nack(delivery_tag, multiple: false, requeue: false) ⇒ nil
Negatively acknowledge a message
# File 'lib/amqp/client/channel.rb', line 425
def basic_nack(delivery_tag, multiple: false, requeue: false)
  write_bytes FrameBytes.basic_nack(@id, delivery_tag, multiple, requeue)
  nil
end
#basic_publish(body, exchange:, routing_key: "", **properties) ⇒ nil
Publishes a message to an exchange
# File 'lib/amqp/client/channel.rb', line 274
def basic_publish(body, exchange:, routing_key: "", **properties)
  body_max = @connection.frame_max - 8
  id = @id
  mandatory = properties.delete(:mandatory) || false
  case properties.delete(:persistent)
  when true then properties[:delivery_mode] = 2
  when false then properties[:delivery_mode] = 1
  end
  if @confirm
    @unconfirmed_lock.synchronize do
      @unconfirmed.push @confirm += 1
    end
  end
  if body.bytesize.between?(1, body_max)
    write_bytes FrameBytes.basic_publish(id, exchange, routing_key, mandatory),
                FrameBytes.header(id, body.bytesize, properties),
                FrameBytes.body(id, body)
    return
  end

  write_bytes FrameBytes.basic_publish(id, exchange, routing_key, mandatory),
              FrameBytes.header(id, body.bytesize, properties)
  pos = 0
  while pos < body.bytesize # split body into multiple frame_max frames
    len = [body_max, body.bytesize - pos].min
    body_part = body.byteslice(pos, len)
    write_bytes FrameBytes.body(id, body_part)
    pos += len
  end
  nil
end
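The body-splitting loop in the listing can be tried in isolation. The sketch below reproduces only the chunking arithmetic; `split_body` is a hypothetical standalone function, and the tiny `frame_max` of 18 is chosen purely to keep the example readable.

```ruby
# Split `body` into chunks that each fit in one body frame, mirroring the
# loop in the listing (frame_max minus 8 bytes of frame overhead).
def split_body(body, frame_max)
  body_max = frame_max - 8
  chunks = []
  pos = 0
  while pos < body.bytesize
    len = [body_max, body.bytesize - pos].min
    chunks << body.byteslice(pos, len)
    pos += len
  end
  chunks
end

chunks = split_body("a" * 25, 18) # body_max is 10
puts chunks.map(&:bytesize).inspect # prints [10, 10, 5]
```

An empty body yields no chunks at all, which matches the listing: only the publish and header frames are written in that case.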
#basic_publish_confirm(body, exchange:, routing_key: "", **properties) ⇒ Boolean
Publish a message and block until the broker has confirmed that it received it
# File 'lib/amqp/client/channel.rb', line 311
def basic_publish_confirm(body, exchange:, routing_key: "", **properties)
  confirm_select(no_wait: true)
  basic_publish(body, exchange:, routing_key:, **properties)
  wait_for_confirms
end
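Since #basic_publish_confirm returns a Boolean, a caller can decide what to do when the broker nacks a message. One possible policy, sketched here under the assumption that a bounded retry is acceptable, is shown below; `publish_confirmed` and `attempts` are hypothetical names.

```ruby
# Hypothetical helper: publish with confirmation, retrying a bounded number
# of times when the broker nacks the message.
def publish_confirmed(channel, body, exchange:, routing_key: "", attempts: 3, **properties)
  attempts.times do
    confirmed = channel.basic_publish_confirm(body, exchange: exchange,
                                                    routing_key: routing_key, **properties)
    return true if confirmed
  end
  false
end
```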
#basic_qos(prefetch_count, prefetch_size: 0, global: false) ⇒ nil
Specify how many messages to prefetch for consumers with `no_ack: false`
# File 'lib/amqp/client/channel.rb', line 405
def basic_qos(prefetch_count, prefetch_size: 0, global: false)
  write_bytes FrameBytes.basic_qos(@id, prefetch_size, prefetch_count, global)
  expect :basic_qos_ok
  nil
end
#basic_recover(requeue: false) ⇒ nil
Recover all the unacknowledged messages
# File 'lib/amqp/client/channel.rb', line 443
def basic_recover(requeue: false)
  write_bytes FrameBytes.basic_recover(@id, requeue:)
  expect :basic_recover_ok
  nil
end
#basic_reject(delivery_tag, requeue: false) ⇒ nil
Reject a message
# File 'lib/amqp/client/channel.rb', line 434
def basic_reject(delivery_tag, requeue: false)
  write_bytes FrameBytes.basic_reject(@id, delivery_tag, requeue)
  nil
end
#body_delivered(body_part) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/amqp/client/channel.rb', line 572
def body_delivered(body_part)
  @next_body.write(body_part)
  return unless @next_body.pos == @next_body_size

  @next_msg.body = @next_body.string
end
#cancel_consumer(tag) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Handle consumer cancellation from the broker
# File 'lib/amqp/client/channel.rb', line 582
def cancel_consumer(tag)
  consumer = @consumers.delete(tag)
  return unless consumer

  close_consumer(consumer)
  begin
    consumer.on_cancel&.call(consumer.consumer_tag)
  rescue StandardError => e
    warn "AMQP-Client consumer on_cancel callback error: #{e.class}: #{e.message}"
  end
  nil
end
#close(reason: "", code: 200) ⇒ nil
Gracefully close channel
# File 'lib/amqp/client/channel.rb', line 63
def close(reason: "", code: 200)
  return if @closed

  write_bytes FrameBytes.channel_close(@id, reason, code)
  @closed = [:channel, code, reason]
  expect :channel_close_ok
  @replies.close
  @basic_gets.close
  @unconfirmed_lock.synchronize { @unconfirmed_empty.broadcast }
  @consumers.each_value { |c| close_consumer(c) }
  nil
end
#closed!(level, code, reason, classid, methodid) ⇒ nil
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Called when channel is closed by broker
# File 'lib/amqp/client/channel.rb', line 80
def closed!(level, code, reason, classid, methodid)
  return if @closed

  @closed = [level, code, reason, classid, methodid]
  @replies.close
  @basic_gets.close
  @unconfirmed_lock.synchronize { @unconfirmed_empty.broadcast }
  @consumers.each_value do |c|
    close_consumer(c)
    c.msg_q.clear # empty the queues too, messages can't be acked anymore
  end
  nil
end
#confirm(args) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Called by Connection when an ack/nack is received from the broker
# File 'lib/amqp/client/channel.rb', line 489
def confirm(args)
  ack_or_nack, delivery_tag, multiple = *args
  @unconfirmed_lock.synchronize do
    # A tag we're not tracking (a duplicate, out-of-order, or broker
    # quirk) is logged and ignored, not raised: #confirm runs on the
    # read_loop thread, where an exception would tear down the connection.
    confirmed =
      if multiple
        idx = @unconfirmed.index(delivery_tag)
        @unconfirmed.shift(idx + 1) if idx
      else
        @unconfirmed.delete(delivery_tag)
      end
    if confirmed
      @nacked = true if ack_or_nack == :nack
      @unconfirmed_empty.broadcast if @unconfirmed.empty?
    else
      warn "AMQP-Client received #{ack_or_nack} for unknown delivery tag #{delivery_tag} on channel #{@id}"
    end
  end
end
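The effect of the `multiple` flag on the list of unconfirmed delivery tags can be modelled without a broker. The sketch below copies only the bookkeeping branch from the listing into a hypothetical standalone function, `settle`; locking, nack tracking and the condition variable are left out.

```ruby
# Standalone model of the bookkeeping in #confirm: `unconfirmed` holds the
# delivery tags of published but not yet confirmed messages, in order.
# Returns a truthy value when the tag was tracked, nil otherwise.
def settle(unconfirmed, delivery_tag, multiple)
  if multiple
    idx = unconfirmed.index(delivery_tag)
    unconfirmed.shift(idx + 1) if idx
  else
    unconfirmed.delete(delivery_tag)
  end
end

pending = [1, 2, 3, 4, 5]
settle(pending, 3, true)  # settles 1, 2 and 3 in one step
settle(pending, 5, false) # settles only 5
p pending # prints [4]
```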
#confirm_select(no_wait: false) ⇒ nil
Put the channel in confirm mode; each published message will then be confirmed by the broker
# File 'lib/amqp/client/channel.rb', line 455
def confirm_select(no_wait: false)
  return if @confirm # fast path

  @unconfirmed_lock.synchronize do
    # check again in case another thread already did this while we waited for the lock
    return if @confirm

    write_bytes FrameBytes.confirm_select(@id, no_wait)
    expect :confirm_select_ok unless no_wait
    @confirm = 0
  end
  nil
end
#exchange_bind(source:, destination:, binding_key:, arguments: {}) ⇒ nil
Bind an exchange to another exchange
# File 'lib/amqp/client/channel.rb', line 137
def exchange_bind(source:, destination:, binding_key:, arguments: {})
  write_bytes FrameBytes.exchange_bind(@id, destination, source, binding_key, false, arguments)
  expect :exchange_bind_ok
  nil
end
#exchange_declare(name, type:, passive: false, durable: true, auto_delete: false, internal: false, arguments: {}) ⇒ nil
Declare an exchange
# File 'lib/amqp/client/channel.rb', line 114
def exchange_declare(name, type:, passive: false, durable: true, auto_delete: false, internal: false, arguments: {})
  write_bytes FrameBytes.exchange_declare(@id, name, type, passive, durable, auto_delete, internal, arguments)
  expect :exchange_declare_ok
  nil
end
#exchange_delete(name, if_unused: false, no_wait: false) ⇒ nil
Delete an exchange
# File 'lib/amqp/client/channel.rb', line 125
def exchange_delete(name, if_unused: false, no_wait: false)
  write_bytes FrameBytes.exchange_delete(@id, name, if_unused, no_wait)
  expect :exchange_delete_ok unless no_wait
  nil
end
#exchange_unbind(source:, destination:, binding_key:, arguments: {}) ⇒ nil
Unbind an exchange from another exchange
# File 'lib/amqp/client/channel.rb', line 149
def exchange_unbind(source:, destination:, binding_key:, arguments: {})
  write_bytes FrameBytes.exchange_unbind(@id, destination, source, binding_key, false, arguments)
  expect :exchange_unbind_ok
  nil
end
#header_delivered(body_size, properties) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/amqp/client/channel.rb', line 561
def header_delivered(body_size, properties)
  @next_msg.properties = properties
  if body_size.zero?
  else
    @next_body = StringIO.new(String.new(capacity: body_size))
    @next_body_size = body_size
  end
end
#inspect ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Override #inspect
# File 'lib/amqp/client/channel.rb', line 34
def inspect
  "#<#{self.class} @id=#{@id} @open=#{@open} @closed=#{@closed} confirm_selected=#{!@confirm.nil?} " \
    "consumer_count=#{@consumers.size} replies_count=#{@replies.size} unconfirmed_count=#{@unconfirmed.size}>"
end
#message_delivered(consumer_tag, delivery_tag, redelivered, exchange, routing_key) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/amqp/client/channel.rb', line 551
def message_delivered(consumer_tag, delivery_tag, redelivered, exchange, routing_key)
  @next_msg = Message.new(self, consumer_tag, delivery_tag, exchange, routing_key, redelivered)
end
#message_returned(reply_code, reply_text, exchange, routing_key) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/amqp/client/channel.rb', line 546
def message_returned(reply_code, reply_text, exchange, routing_key)
  @next_msg = ReturnMessage.new(reply_code, reply_text, exchange, routing_key)
end
#on_return {|ReturnMessage| ... } ⇒ Object
Handle returned messages in this block. If no block is set, the returned message is just logged to STDERR
# File 'lib/amqp/client/channel.rb', line 97
def on_return(&block)
  @on_return = block
  nil
end
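A minimal sketch of pairing #on_return with a mandatory publish. It relies on #basic_publish taking `mandatory` out of its properties, as shown in its source on this page; `publish_mandatory` and the `returned` collector are hypothetical, and `channel` is assumed to respond to the documented methods.

```ruby
# Hypothetical helper: publish with the mandatory flag and collect any
# message the broker returns as unroutable into `returned`.
def publish_mandatory(channel, body, exchange:, routing_key:, returned:)
  # Note: this replaces any handler previously set on the channel.
  channel.on_return { |msg| returned << msg }
  channel.basic_publish(body, exchange: exchange, routing_key: routing_key, mandatory: true)
end
```

Returned messages most likely arrive asynchronously, so a thread-safe collector such as a `Queue` (which supports `<<`) seems a reasonable choice for `returned`.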
#open ⇒ Channel
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Open the channel (called from Connection)
# File 'lib/amqp/client/channel.rb', line 50
def open
  return self if @open

  @open = true
  write_bytes FrameBytes.channel_open(@id)
  expect(:channel_open_ok)
  self
end
#queue_bind(name, exchange:, binding_key: "", arguments: {}) ⇒ nil
Bind a queue to an exchange
# File 'lib/amqp/client/channel.rb', line 207
def queue_bind(name, exchange:, binding_key: "", arguments: {})
  write_bytes FrameBytes.queue_bind(@id, name, exchange, binding_key, false, arguments)
  expect :queue_bind_ok
  nil
end
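Declaring an exchange, declaring a queue and binding the two is a common sequence. A sketch of it is shown below, assuming `channel` responds to #exchange_declare, #queue_declare and #queue_bind as documented on this page; `declare_and_bind`, the default "topic" type and all names are illustrative assumptions.

```ruby
# Hypothetical helper: declare an exchange and a queue, then bind them.
# The "topic" exchange type and all names are illustrative assumptions.
def declare_and_bind(channel, exchange:, queue:, binding_key:, type: "topic")
  channel.exchange_declare(exchange, type: type, durable: true)
  declared = channel.queue_declare(queue, durable: true)
  # Use the name from the reply so broker-generated names work too.
  channel.queue_bind(declared.queue_name, exchange: exchange, binding_key: binding_key)
  declared
end
```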
#queue_declare(name = "", passive: false, durable: true, exclusive: false, auto_delete: false, arguments: {}) ⇒ QueueOk
Create a queue (operation is idempotent)
# File 'lib/amqp/client/channel.rb', line 177
def queue_declare(name = "", passive: false, durable: true, exclusive: false, auto_delete: false, arguments: {})
  durable = false if name.empty?
  exclusive = true if name.empty?
  auto_delete = true if name.empty?

  write_bytes FrameBytes.queue_declare(@id, name, passive, durable, exclusive, auto_delete, arguments)
  name, message_count, consumer_count = expect(:queue_declare_ok)
  QueueOk.new(name, message_count, consumer_count)
end
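The listing overrides three flags whenever the queue name is empty. The standalone model below illustrates that rule; `effective_queue_flags` is a hypothetical function that only mirrors the three assignments at the top of #queue_declare.

```ruby
# Model of how #queue_declare adjusts its flags, per the listing: with an
# empty name the queue is always declared non-durable, exclusive and
# auto-delete, regardless of the arguments passed in.
def effective_queue_flags(name = "", durable: true, exclusive: false, auto_delete: false)
  if name.empty?
    { durable: false, exclusive: true, auto_delete: true }
  else
    { durable: durable, exclusive: exclusive, auto_delete: auto_delete }
  end
end

p effective_queue_flags("", durable: true) # durable is overridden to false
p effective_queue_flags("jobs")            # the defaults are kept
```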
#queue_delete(name, if_unused: false, if_empty: false, no_wait: false) ⇒ Integer?
Delete a queue
# File 'lib/amqp/client/channel.rb', line 195
def queue_delete(name, if_unused: false, if_empty: false, no_wait: false)
  write_bytes FrameBytes.queue_delete(@id, name, if_unused, if_empty, no_wait)
  message_count, = expect :queue_delete unless no_wait
  message_count
end
#queue_purge(name, no_wait: false) ⇒ Integer?
Purge a queue
# File 'lib/amqp/client/channel.rb', line 218
def queue_purge(name, no_wait: false)
  write_bytes FrameBytes.queue_purge(@id, name, no_wait)
  message_count, = expect :queue_purge_ok unless no_wait
  message_count
end
#queue_unbind(name, exchange:, binding_key: "", arguments: {}) ⇒ nil
Unbind a queue from an exchange
# File 'lib/amqp/client/channel.rb', line 230
def queue_unbind(name, exchange:, binding_key: "", arguments: {})
  write_bytes FrameBytes.queue_unbind(@id, name, exchange, binding_key, arguments)
  expect :queue_unbind_ok
  nil
end
#reply(args) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/amqp/client/channel.rb', line 541
def reply(args)
  @replies.push(args)
end
#tx_commit ⇒ nil
Commit a transaction; requires that the channel is in transaction mode
# File 'lib/amqp/client/channel.rb', line 524
def tx_commit
  write_bytes FrameBytes.tx_commit(@id)
  expect :tx_commit_ok
  nil
end
#tx_rollback ⇒ nil
Roll back a transaction; requires that the channel is in transaction mode
# File 'lib/amqp/client/channel.rb', line 532
def tx_rollback
  write_bytes FrameBytes.tx_rollback(@id)
  expect :tx_rollback_ok
  nil
end
#tx_select ⇒ nil
Put the channel in transaction mode; make sure that you call #tx_commit or #tx_rollback after publishing
# File 'lib/amqp/client/channel.rb', line 516
def tx_select
  write_bytes FrameBytes.tx_select(@id)
  expect :tx_select_ok
  nil
end
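One way to make sure every publish inside a transaction ends in #tx_commit or #tx_rollback is to wrap the work in a block. The sketch below assumes `channel` responds to the three tx methods documented here; `with_transaction` is a hypothetical helper.

```ruby
# Hypothetical helper: run the block inside a transaction, committing on
# success and rolling back (then re-raising) if the block raises.
def with_transaction(channel)
  channel.tx_select
  begin
    result = yield channel
  rescue StandardError
    channel.tx_rollback
    raise
  end
  channel.tx_commit
  result
end
```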
#wait_for_confirms ⇒ Boolean
Block until all published messages are confirmed
# File 'lib/amqp/client/channel.rb', line 471
def wait_for_confirms
  @unconfirmed_lock.synchronize do
    until @unconfirmed.empty?
      # Check before waiting: if the channel was closed (and the
      # @unconfirmed_empty broadcast from #closed! fired) before we got
      # here, the wakeup is already gone and #wait would block forever.
      raise Error::Closed.new(@id, *@closed) if @closed

      @unconfirmed_empty.wait(@unconfirmed_lock)
    end
    result = !@nacked
    @nacked = false # Reset for next round of publishes
    result
  end
end
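Waiting once after a batch of publishes, rather than after each message, is one way to use #wait_for_confirms. A sketch under the assumption that `channel` responds to #confirm_select, #basic_publish and #wait_for_confirms as documented; `publish_batch` is a hypothetical helper.

```ruby
# Hypothetical helper: publish a batch and wait once for all confirms.
# Returns true only if the broker acked every message in the batch.
def publish_batch(channel, bodies, exchange:, routing_key: "")
  channel.confirm_select
  bodies.each do |body|
    channel.basic_publish(body, exchange: exchange, routing_key: routing_key)
  end
  channel.wait_for_confirms
end
```

Per the listing, the nacked flag is reset after each call, so the return value covers only publishes made since the previous wait.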