Class: AMQP::Client::Connection::Channel

Inherits:
Object
Defined in:
lib/amqp/client/channel.rb

Overview

AMQP Channel

Defined Under Namespace

Classes: ConsumeOk, QueueOk


Constructor Details

#initialize(connection, id) ⇒ Channel

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Should only be called from Connection

Parameters:

  • connection (Connection)

    The connection this channel belongs to

  • id (Integer)

    ID of the channel

# File 'lib/amqp/client/channel.rb', line 16

def initialize(connection, id)
  @connection = connection
  @id = id
  @replies = ::Queue.new
  @consumers = {}
  @closed = nil
  @open = false
  @on_return = nil
  @confirm = nil
  @unconfirmed = []
  @unconfirmed_lock = Mutex.new
  @unconfirmed_empty = ConditionVariable.new
  @nacked = false
  @basic_gets = ::Queue.new
end

Instance Attribute Details

#channel_id ⇒ Integer

Returns the channel ID.

Returns:

  • (Integer)

    The channel ID



# File 'lib/amqp/client/channel.rb', line 324

ConsumeOk = Data.define(:channel_id, :consumer_tag, :worker_threads, :msg_q, :on_cancel)

#connection ⇒ Connection (readonly)

Connection this channel belongs to

Returns:

  • (Connection)

# File 'lib/amqp/client/channel.rb', line 45

def connection
  @connection
end

#consumer_count ⇒ Integer

Returns the number of consumers subscribed to the queue at the time of declaration.

Returns:

  • (Integer)

    Number of consumers subscribed to the queue at the time of declaration



# File 'lib/amqp/client/channel.rb', line 165

QueueOk = Data.define(:queue_name, :message_count, :consumer_count)

#consumer_tag ⇒ String

Returns the consumer tag.

Returns:

  • (String)

    The consumer tag



# File 'lib/amqp/client/channel.rb', line 324

ConsumeOk = Data.define(:channel_id, :consumer_tag, :worker_threads, :msg_q, :on_cancel)

#id ⇒ Integer (readonly)

Channel ID

Returns:

  • (Integer)


# File 'lib/amqp/client/channel.rb', line 41

def id
  @id
end

#message_count ⇒ Integer

Returns the number of messages in the queue at the time of declaration.

Returns:

  • (Integer)

    Number of messages in the queue at the time of declaration



# File 'lib/amqp/client/channel.rb', line 165

QueueOk = Data.define(:queue_name, :message_count, :consumer_count)

#queue_name ⇒ String

Returns the name of the queue.

Returns:

  • (String)

    The name of the queue



# File 'lib/amqp/client/channel.rb', line 165

QueueOk = Data.define(:queue_name, :message_count, :consumer_count)

#worker_threads ⇒ Array<Thread>

Returns the array of worker threads.

Returns:

  • (Array<Thread>)

    Array of worker threads



# File 'lib/amqp/client/channel.rb', line 324

ConsumeOk = Data.define(:channel_id, :consumer_tag, :worker_threads, :msg_q, :on_cancel)

Instance Method Details

#basic_ack(delivery_tag, multiple: false) ⇒ nil

Acknowledge a message

Parameters:

  • delivery_tag (Integer)

    The delivery tag of the message to acknowledge

  • multiple (Boolean) (defaults to: false)

    Ack all messages up to this message

Returns:

  • (nil)


# File 'lib/amqp/client/channel.rb', line 415

def basic_ack(delivery_tag, multiple: false)
  write_bytes FrameBytes.basic_ack(@id, delivery_tag, multiple)
  nil
end

#basic_cancel(consumer_tag, no_wait: false) ⇒ nil

Cancel/abort/stop a consumer

Parameters:

  • consumer_tag (String)

    Tag of the consumer to cancel

  • no_wait (Boolean) (defaults to: false)

    If false (the default), wait for the broker to confirm that the consumer is cancelled

Returns:

  • (nil)


# File 'lib/amqp/client/channel.rb', line 389

def basic_cancel(consumer_tag, no_wait: false)
  consumer = @consumers[consumer_tag]
  return unless consumer

  write_bytes FrameBytes.basic_cancel(@id, consumer_tag)
  expect(:basic_cancel_ok) unless no_wait
  @consumers.delete(consumer_tag)
  close_consumer(consumer)
  nil
end

#basic_consume(queue, tag: "", no_ack: true, exclusive: false, no_wait: false, arguments: {}, worker_threads: 1, on_cancel: nil) {|Message| ... } ⇒ ConsumeOk?

Consume messages from a queue

Parameters:

  • queue (String)

    Name of the queue to subscribe to

  • tag (String) (defaults to: "")

    Custom consumer tag; assigned by the broker if empty. It only has to be unique among this channel’s consumers

  • no_ack (Boolean) (defaults to: true)

    When false messages have to be manually acknowledged (or rejected)

  • exclusive (Boolean) (defaults to: false)

    When true only a single consumer can consume from the queue at a time

  • arguments (Hash) (defaults to: {})

    Custom arguments for the consumer

  • worker_threads (Integer) (defaults to: 1)

    Number of threads processing messages. With 0, the thread calling this method processes the messages itself, so the method blocks

  • on_cancel (Proc) (defaults to: nil)

    Optional proc that will be called if the consumer is cancelled by the broker. The proc is called with the consumer tag as its only argument

Yields:

  • (Message)

    Delivered message from the queue

Returns:

  • (ConsumeOk)
  • (nil)

    When `worker_threads` is 0, the method returns nil once the consumer is cancelled

Raises:

  • (ArgumentError)

    if no_wait is true and tag is empty


# File 'lib/amqp/client/channel.rb', line 340

def basic_consume(queue, tag: "", no_ack: true, exclusive: false, no_wait: false,
                  arguments: {}, worker_threads: 1, on_cancel: nil, &blk)
  raise ArgumentError, "consumer_tag required when no_wait" if no_wait && tag.empty?

  write_bytes FrameBytes.basic_consume(@id, queue, tag, no_ack, exclusive, no_wait, arguments)
  consumer_tag, = expect(:basic_consume_ok) unless no_wait
  msg_q = ::Queue.new
  if worker_threads.zero?
    @consumers[consumer_tag] =
      ConsumeOk.new(channel_id: @id, consumer_tag:, worker_threads: [], msg_q:, on_cancel:)
    consume_loop(msg_q, consumer_tag, &blk)
    nil
  else
    threads = Array.new(worker_threads) do |i|
      t = Thread.new { consume_loop(msg_q, consumer_tag, &blk) }
      t.name = @connection.thread_name(role: "consumer", detail: "ch=#{@id} tag=#{consumer_tag} ##{i + 1}")
      t
    end
    @consumers[consumer_tag] =
      ConsumeOk.new(channel_id: @id, consumer_tag:, worker_threads: threads, msg_q:, on_cancel:)
  end
end
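
A rough sketch of the worker-thread model described above, using only the standard library. It assumes that the private consume_loop does little more than pop from msg_q and call the block; run_workers and the thread names are hypothetical and not part of the library.

```ruby
# Hypothetical helper: start N threads that drain one shared in-process queue.
def run_workers(msg_q, worker_threads, &blk)
  Array.new(worker_threads) do |i|
    t = Thread.new do
      # Queue#pop returns nil once the queue is closed and empty,
      # which ends the loop and lets the thread exit.
      while (msg = msg_q.pop)
        blk.call(msg)
      end
    end
    t.name = "consumer ##{i + 1}" # illustrative name only
    t
  end
end

msg_q = Queue.new
results = Queue.new
threads = run_workers(msg_q, 2) { |msg| results << msg.upcase }
%w[a b c].each { |m| msg_q << m }
msg_q.close           # no more deliveries; workers exit after draining
threads.each(&:join)
```

With several workers the processing order is not guaranteed, so code in the block should not rely on messages being handled sequentially.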

#basic_consume_once(queue, timeout: nil) { ... } ⇒ Message

Consume a single message from a queue

Parameters:

  • queue (String)

    Name of the queue to subscribe to

  • timeout (Numeric, nil) (defaults to: nil)

    Number of seconds to wait for a message

Yields:

  • Optional block, called without arguments after the consumer has been set up and before waiting for the message

Returns:

  • (Message)

    The single message received from the queue

Raises:

  • (Timeout::Error)

    if no response is received within the timeout period



# File 'lib/amqp/client/channel.rb', line 369

def basic_consume_once(queue, timeout: nil, &)
  tag = "consume-once-#{rand(1024)}"
  write_bytes FrameBytes.basic_consume(@id, queue, tag, true, false, true, nil)
  msg_q = ::Queue.new
  @consumers[tag] =
    ConsumeOk.new(channel_id: @id, consumer_tag: tag, worker_threads: [], msg_q:, on_cancel: nil)
  yield if block_given?
  msg = msg_q.pop(timeout:)
  write_bytes FrameBytes.basic_cancel(@id, tag, no_wait: true)
  consumer = @consumers.delete(tag)
  close_consumer(consumer)
  raise Timeout::Error, "No message received in #{timeout} seconds" if timeout && msg.nil?

  msg
end
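
The timeout handling above relies on Queue#pop(timeout:) returning nil when nothing arrives in time (Ruby 3.2 or later). A minimal sketch of that pattern in isolation; pop_or_timeout is a hypothetical helper, not a library method.

```ruby
require "timeout"

# Hypothetical helper mirroring the pop-then-raise pattern in the listing.
def pop_or_timeout(queue, timeout)
  msg = queue.pop(timeout: timeout) # nil if the timeout elapses first
  raise Timeout::Error, "No message received in #{timeout} seconds" if timeout && msg.nil?

  msg
end

q = Queue.new
q << "hello"
first = pop_or_timeout(q, 0.05) # returns immediately, the queue has an item
```

Passing nil as the timeout makes the pop block until a message arrives, which matches the default of the method above.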

#basic_get(queue_name, no_ack: true) ⇒ Message?

Get a message from a queue (by polling)

Parameters:

  • queue_name (String)
  • no_ack (Boolean) (defaults to: true)

    When false the message has to be manually acknowledged

Returns:

  • (Message)

    If the queue had a message

  • (nil)

    If the queue doesn’t have any messages



# File 'lib/amqp/client/channel.rb', line 244

def basic_get(queue_name, no_ack: true)
  write_bytes FrameBytes.basic_get(@id, queue_name, no_ack)
  case (msg = @basic_gets.pop)
  when Message then msg
  when :basic_get_empty then nil
  when nil              then raise Error::Closed.new(@id, *@closed)
  end
end
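
The three branches of the case expression can be tried without a broker. The sketch below uses stand-ins: SketchMessage for the library's Message class and SketchClosed for Error::Closed. Both names, and interpret_get itself, are hypothetical. It relies on Queue#pop returning nil once a queue is closed and drained.

```ruby
SketchMessage = Struct.new(:body)       # stand-in for Message
SketchClosed = Class.new(StandardError) # stand-in for Error::Closed

# Interpret one entry popped from the queue of basic_get responses.
def interpret_get(basic_gets)
  case (msg = basic_gets.pop)
  when SketchMessage    then msg  # the queue had a message
  when :basic_get_empty then nil  # the broker reported an empty queue
  when nil              then raise SketchClosed, "channel closed"
  end
end

gets = Queue.new
gets << SketchMessage.new("hi")
gets << :basic_get_empty
gets.close # closing wakes up waiting poppers; pop returns nil once drained
```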

#basic_get_empty ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/amqp/client/channel.rb', line 556

def basic_get_empty
  @basic_gets.push :basic_get_empty
end

#basic_nack(delivery_tag, multiple: false, requeue: false) ⇒ nil

Negatively acknowledge a message

Parameters:

  • delivery_tag (Integer)

    The delivery tag of the message to negatively acknowledge

  • multiple (Boolean) (defaults to: false)

    Nack all messages up to this message

  • requeue (Boolean) (defaults to: false)

    Requeue the message

Returns:

  • (nil)


# File 'lib/amqp/client/channel.rb', line 425

def basic_nack(delivery_tag, multiple: false, requeue: false)
  write_bytes FrameBytes.basic_nack(@id, delivery_tag, multiple, requeue)
  nil
end

#basic_publish(body, exchange:, routing_key: "", **properties) ⇒ nil

Publishes a message to an exchange

Parameters:

  • body (String)

    The body

  • exchange (String)

    Name of the exchange to publish to

  • routing_key (String) (defaults to: "")

    The routing key that the exchange might use to route the message to a queue

  • properties (Properties)

Options Hash (**properties):

  • mandatory (Boolean)

    The message will be returned if the message can’t be routed to a queue

  • persistent (Boolean)

    Same as delivery_mode: 2

  • content_type (String)

    Content type of the message body

  • content_encoding (String)

    Content encoding of the body

  • headers (Hash<String, Object>)

    Custom headers

  • delivery_mode (Integer)

    2 for persistent messages; any other value means a transient message

  • priority (Integer)

    A priority of the message (between 0 and 255)

  • correlation_id (String)

    A correlation id, most often used for RPC communication

  • reply_to (String)

    Queue to reply RPC responses to

  • expiration (Integer, String)

    How long the message may stay in the queue before it expires (RabbitMQ interprets the value as milliseconds, not seconds)

  • message_id (String)

    Can be used to uniquely identify the message, e.g. for deduplication

  • timestamp (Date)

    Often used for the time the message was originally generated

  • type (String)

    Can indicate what kind of message this is

  • user_id (String)

    Can be used to verify that this is the user that published the message

  • app_id (String)

    Can be used to indicate which app generated the message

Returns:

  • (nil)


# File 'lib/amqp/client/channel.rb', line 274

def basic_publish(body, exchange:, routing_key: "", **properties)
  body_max = @connection.frame_max - 8
  id = @id
  mandatory = properties.delete(:mandatory) || false
  case properties.delete(:persistent)
  when true then properties[:delivery_mode] = 2
  when false then properties[:delivery_mode] = 1
  end
  if @confirm
    @unconfirmed_lock.synchronize do
      @unconfirmed.push @confirm += 1
    end
  end
  if body.bytesize.between?(1, body_max)
    write_bytes FrameBytes.basic_publish(id, exchange, routing_key, mandatory),
                FrameBytes.header(id, body.bytesize, properties),
                FrameBytes.body(id, body)
    return
  end

  write_bytes FrameBytes.basic_publish(id, exchange, routing_key, mandatory),
              FrameBytes.header(id, body.bytesize, properties)
  pos = 0
  while pos < body.bytesize # split body into multiple frame_max frames
    len = [body_max, body.bytesize - pos].min
    body_part = body.byteslice(pos, len)
    write_bytes FrameBytes.body(id, body_part)
    pos += len
  end
  nil
end
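
The splitting loop at the end of the listing can be sketched on its own. The 8 bytes subtracted from frame_max correspond to the AMQP 0-9-1 frame envelope (a 7-byte header plus a 1-byte frame-end marker). split_body and FRAME_OVERHEAD are hypothetical names used only for this illustration.

```ruby
FRAME_OVERHEAD = 8 # 7-byte frame header + 1-byte frame-end marker

# Split a body into chunks that each fit in one body frame.
def split_body(body, frame_max)
  body_max = frame_max - FRAME_OVERHEAD
  parts = []
  pos = 0
  while pos < body.bytesize
    len = [body_max, body.bytesize - pos].min
    parts << body.byteslice(pos, len) # slice by bytes, not characters
    pos += len
  end
  parts
end

chunks = split_body("a" * 10, 12) # body_max is 4, so three chunks
```

An empty body yields no chunks at all, which matches the listing: only the method and header frames are written in that case.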

#basic_publish_confirm(body, exchange:, routing_key: "", **properties) ⇒ Boolean

Publish a message and block until the broker has confirmed that it has received it

Parameters:

  • body (String)

    The body

  • exchange (String)

    Name of the exchange to publish to

  • routing_key (String) (defaults to: "")

    The routing key that the exchange might use to route the message to a queue

  • properties (Properties)

Options Hash (**properties):

  • mandatory (Boolean)

    The message will be returned if the message can’t be routed to a queue

  • persistent (Boolean)

    Same as delivery_mode: 2

  • content_type (String)

    Content type of the message body

  • content_encoding (String)

    Content encoding of the body

  • headers (Hash<String, Object>)

    Custom headers

  • delivery_mode (Integer)

    2 for persistent messages; any other value means a transient message

  • priority (Integer)

    A priority of the message (between 0 and 255)

  • correlation_id (String)

    A correlation id, most often used for RPC communication

  • reply_to (String)

    Queue to reply RPC responses to

  • expiration (Integer, String)

    How long the message may stay in the queue before it expires (RabbitMQ interprets the value as milliseconds, not seconds)

  • message_id (String)

    Can be used to uniquely identify the message, e.g. for deduplication

  • timestamp (Date)

    Often used for the time the message was originally generated

  • type (String)

    Can indicate what kind of message this is

  • user_id (String)

    Can be used to verify that this is the user that published the message

  • app_id (String)

    Can be used to indicate which app generated the message

Returns:

  • (Boolean)

    True if the broker acked the message, false if it was nacked



# File 'lib/amqp/client/channel.rb', line 311

def basic_publish_confirm(body, exchange:, routing_key: "", **properties)
  confirm_select(no_wait: true)
  basic_publish(body, exchange:, routing_key:, **properties)
  wait_for_confirms
end

#basic_qos(prefetch_count, prefetch_size: 0, global: false) ⇒ nil

Specify how many messages to prefetch for consumers with `no_ack: false`

Parameters:

  • prefetch_count (Integer)

    Maximum number of messages to keep in flight

  • prefetch_size (Integer) (defaults to: 0)

    Maximum number of bytes to keep in flight

  • global (Boolean) (defaults to: false)

    If true, the limit applies to the whole channel rather than to each consumer

Returns:

  • (nil)


# File 'lib/amqp/client/channel.rb', line 405

def basic_qos(prefetch_count, prefetch_size: 0, global: false)
  write_bytes FrameBytes.basic_qos(@id, prefetch_size, prefetch_count, global)
  expect :basic_qos_ok
  nil
end

#basic_recover(requeue: false) ⇒ nil

Recover all unacknowledged messages

Parameters:

  • requeue (Boolean) (defaults to: false)

    If false, the currently unacknowledged messages will be redelivered to this consumer; if true, they may be delivered to any consumer

Returns:

  • (nil)


# File 'lib/amqp/client/channel.rb', line 443

def basic_recover(requeue: false)
  write_bytes FrameBytes.basic_recover(@id, requeue:)
  expect :basic_recover_ok
  nil
end

#basic_reject(delivery_tag, requeue: false) ⇒ nil

Reject a message

Parameters:

  • delivery_tag (Integer)

    The delivery tag of the message to reject

  • requeue (Boolean) (defaults to: false)

    Requeue the message into the queue again

Returns:

  • (nil)


# File 'lib/amqp/client/channel.rb', line 434

def basic_reject(delivery_tag, requeue: false)
  write_bytes FrameBytes.basic_reject(@id, delivery_tag, requeue)
  nil
end

#body_delivered(body_part) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/amqp/client/channel.rb', line 572

def body_delivered(body_part)
  @next_body.write(body_part)
  return unless @next_body.pos == @next_body_size

  @next_msg.body = @next_body.string
  next_message_finished!
end

#cancel_consumer(tag) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Handle consumer cancellation from the broker



# File 'lib/amqp/client/channel.rb', line 582

def cancel_consumer(tag)
  consumer = @consumers.delete(tag)
  return unless consumer

  close_consumer(consumer)
  begin
    consumer.on_cancel&.call(consumer.consumer_tag)
  rescue StandardError => e
    warn "AMQP-Client consumer on_cancel callback error: #{e.class}: #{e.message}"
  end
  nil
end
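
The callback handling above combines safe navigation (no callback set) with a rescue, so that a failing user callback cannot propagate into the caller. A small sketch of that pattern; safe_callback and its log parameter are hypothetical.

```ruby
# Call an optional callback and log, rather than raise, any StandardError.
def safe_callback(callback, tag, log: $stderr)
  callback&.call(tag) # nil callback: nothing happens, returns nil
rescue StandardError => e
  log.puts "on_cancel callback error: #{e.class}: #{e.message}"
  nil
end

cancelled = []
safe_callback(->(tag) { cancelled << tag }, "consumer-1")
safe_callback(nil, "consumer-2") # no callback registered, silently ignored
```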

#close(reason: "", code: 200) ⇒ nil

Gracefully close channel

Parameters:

  • reason (String) (defaults to: "")

    The reason for closing the channel

  • code (Integer) (defaults to: 200)

    The close code

Returns:

  • (nil)


# File 'lib/amqp/client/channel.rb', line 63

def close(reason: "", code: 200)
  return if @closed

  write_bytes FrameBytes.channel_close(@id, reason, code)
  @closed = [:channel, code, reason]
  expect :channel_close_ok
  @replies.close
  @basic_gets.close
  @unconfirmed_lock.synchronize { @unconfirmed_empty.broadcast }
  @consumers.each_value { |c| close_consumer(c) }
  nil
end

#closed!(level, code, reason, classid, methodid) ⇒ nil

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Called when channel is closed by broker

Parameters:

  • level (Symbol)

    :connection or :channel

Returns:

  • (nil)


# File 'lib/amqp/client/channel.rb', line 80

def closed!(level, code, reason, classid, methodid)
  return if @closed

  @closed = [level, code, reason, classid, methodid]
  @replies.close
  @basic_gets.close
  @unconfirmed_lock.synchronize { @unconfirmed_empty.broadcast }
  @consumers.each_value do |c|
    close_consumer(c)
    c.msg_q.clear # empty the queues too, messages can't be acked anymore
  end
  nil
end

#confirm(args) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Called by Connection when received ack/nack from broker



# File 'lib/amqp/client/channel.rb', line 489

def confirm(args)
  ack_or_nack, delivery_tag, multiple = *args
  @unconfirmed_lock.synchronize do
    # A tag we're not tracking (a duplicate, out-of-order, or broker
    # quirk) is logged and ignored, not raised: #confirm runs on the
    # read_loop thread, where an exception would tear down the connection.
    confirmed =
      if multiple
        idx = @unconfirmed.index(delivery_tag)
        @unconfirmed.shift(idx + 1) if idx
      else
        @unconfirmed.delete(delivery_tag)
      end
    if confirmed
      @nacked = true if ack_or_nack == :nack
      @unconfirmed_empty.broadcast if @unconfirmed.empty?
    else
      warn "AMQP-Client received #{ack_or_nack} for unknown delivery tag #{delivery_tag} on channel #{@id}"
    end
  end
end
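
How the multiple flag affects the list of unconfirmed delivery tags can be seen in isolation. The sketch below copies only the array handling from the listing; settle is a hypothetical name, and locking is left out for brevity.

```ruby
# Remove confirmed tags from the list. Returns the removed tag(s),
# or nil when the tag is not being tracked.
def settle(unconfirmed, delivery_tag, multiple)
  if multiple
    idx = unconfirmed.index(delivery_tag)
    unconfirmed.shift(idx + 1) if idx # everything up to and including the tag
  else
    unconfirmed.delete(delivery_tag)  # just this one tag
  end
end

pending = [1, 2, 3, 4]
settle(pending, 2, true)  # confirms tags 1 and 2
settle(pending, 4, false) # confirms tag 4 only, tag 3 stays pending
```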

#confirm_select(no_wait: false) ⇒ nil

Put the channel in confirm mode. Each published message will then be confirmed by the broker

Parameters:

  • no_wait (Boolean) (defaults to: false)

    If false the method will block until the broker has confirmed the request

Returns:

  • (nil)


# File 'lib/amqp/client/channel.rb', line 455

def confirm_select(no_wait: false)
  return if @confirm # fast path

  @unconfirmed_lock.synchronize do
    # check again in case another thread already did this while we waited for the lock
    return if @confirm

    write_bytes FrameBytes.confirm_select(@id, no_wait)
    expect :confirm_select_ok unless no_wait
    @confirm = 0
  end
  nil
end
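
The listing uses a check, lock, check-again pattern so that the confirm-select frame is sent only once even when several threads publish concurrently. A self-contained sketch of that pattern; ConfirmModeSketch is hypothetical and a counter stands in for writing the frame.

```ruby
class ConfirmModeSketch
  attr_reader :enable_calls

  def initialize
    @lock = Mutex.new
    @confirm = nil
    @enable_calls = 0
  end

  def confirm_select
    return if @confirm # fast path, no lock taken

    @lock.synchronize do
      return if @confirm # another thread got here first

      @enable_calls += 1 # stands in for sending the frame to the broker
      @confirm = 0
    end
    nil
  end
end

mode = ConfirmModeSketch.new
Array.new(4) { Thread.new { mode.confirm_select } }.each(&:join)
```

The early return inside the synchronize block is safe because Mutex#synchronize releases the lock when the block is left, including by a return.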

#exchange_bind(source:, destination:, binding_key:, arguments: {}) ⇒ nil

Bind an exchange to another exchange

Parameters:

  • source (String)

    Name of the source exchange

  • destination (String)

    Name of the destination exchange

  • binding_key (String)

    Binding key on which messages that match might be routed (depending on exchange type)

  • arguments (Hash) (defaults to: {})

    Message headers to match on, but only when bound to header exchanges

Returns:

  • (nil)


# File 'lib/amqp/client/channel.rb', line 137

def exchange_bind(source:, destination:, binding_key:, arguments: {})
  write_bytes FrameBytes.exchange_bind(@id, destination, source, binding_key, false, arguments)
  expect :exchange_bind_ok
  nil
end

#exchange_declare(name, type:, passive: false, durable: true, auto_delete: false, internal: false, arguments: {}) ⇒ nil

Declare an exchange

Parameters:

  • name (String)

    Name of the exchange

  • type (String)

    Type of exchange (direct, fanout, topic, headers, etc.)

  • passive (Boolean) (defaults to: false)

    If true, raise an exception if the exchange doesn’t already exist

  • durable (Boolean) (defaults to: true)

    If true the exchange will persist between broker restarts, also a requirement for persistent messages

  • auto_delete (Boolean) (defaults to: false)

    If true the exchange will be deleted when the last queue/exchange is unbound

  • internal (Boolean) (defaults to: false)

    If true the exchange can’t be published to directly

  • arguments (Hash) (defaults to: {})

    Custom arguments

Returns:

  • (nil)


# File 'lib/amqp/client/channel.rb', line 114

def exchange_declare(name, type:, passive: false, durable: true, auto_delete: false, internal: false, arguments: {})
  write_bytes FrameBytes.exchange_declare(@id, name, type, passive, durable, auto_delete, internal, arguments)
  expect :exchange_declare_ok
  nil
end

#exchange_delete(name, if_unused: false, no_wait: false) ⇒ nil

Delete an exchange

Parameters:

  • name (String)

    Name of the exchange

  • if_unused (Boolean) (defaults to: false)

    If true, raise an exception if queues/exchanges are bound to this exchange

  • no_wait (Boolean) (defaults to: false)

    If true don’t wait for a broker confirmation

Returns:

  • (nil)


# File 'lib/amqp/client/channel.rb', line 125

def exchange_delete(name, if_unused: false, no_wait: false)
  write_bytes FrameBytes.exchange_delete(@id, name, if_unused, no_wait)
  expect :exchange_delete_ok unless no_wait
  nil
end

#exchange_unbind(source:, destination:, binding_key:, arguments: {}) ⇒ nil

Unbind an exchange from another exchange

Parameters:

  • source (String)

    Name of the source exchange

  • destination (String)

    Name of the destination exchange

  • binding_key (String)

    Binding key with which the destination exchange is bound to the source exchange

  • arguments (Hash) (defaults to: {})

    Arguments matching the binding that’s being removed

Returns:

  • (nil)


# File 'lib/amqp/client/channel.rb', line 149

def exchange_unbind(source:, destination:, binding_key:, arguments: {})
  write_bytes FrameBytes.exchange_unbind(@id, destination, source, binding_key, false, arguments)
  expect :exchange_unbind_ok
  nil
end

#header_delivered(body_size, properties) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/amqp/client/channel.rb', line 561

def header_delivered(body_size, properties)
  @next_msg.properties = properties
  if body_size.zero?
    next_message_finished!
  else
    @next_body = StringIO.new(String.new(capacity: body_size))
    @next_body_size = body_size
  end
end
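
Together with #body_delivered, this method reassembles a body that arrives as several frames. A standalone sketch of that buffering with a preallocated StringIO; BodyAssembler is a hypothetical class, not part of the library.

```ruby
require "stringio"

class BodyAssembler
  def initialize(body_size)
    @size = body_size
    # Reserve the full capacity up front to avoid repeated reallocation.
    @io = StringIO.new(String.new(capacity: body_size))
  end

  # Append one body part. Returns the complete body once all bytes
  # have arrived, otherwise nil.
  def write(part)
    @io.write(part)
    @io.pos == @size ? @io.string : nil
  end
end

assembler = BodyAssembler.new(11)
assembler.write("hello ")       # 6 of 11 bytes so far, returns nil
body = assembler.write("world") # all 11 bytes received
```

The buffer created with String.new(capacity:) is a binary string, so the assembled body may need force_encoding if it is to be treated as text.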

#inspect ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Override #inspect



# File 'lib/amqp/client/channel.rb', line 34

def inspect
  "#<#{self.class} @id=#{@id} @open=#{@open} @closed=#{@closed} confirm_selected=#{!@confirm.nil?} " \
    "consumer_count=#{@consumers.size} replies_count=#{@replies.size} unconfirmed_count=#{@unconfirmed.size}>"
end

#message_delivered(consumer_tag, delivery_tag, redelivered, exchange, routing_key) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/amqp/client/channel.rb', line 551

def message_delivered(consumer_tag, delivery_tag, redelivered, exchange, routing_key)
  @next_msg = Message.new(self, consumer_tag, delivery_tag, exchange, routing_key, redelivered)
end

#message_returned(reply_code, reply_text, exchange, routing_key) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/amqp/client/channel.rb', line 546

def message_returned(reply_code, reply_text, exchange, routing_key)
  @next_msg = ReturnMessage.new(reply_code, reply_text, exchange, routing_key)
end

#on_return {|ReturnMessage| ... } ⇒ Object

Handle returned messages in this block. If not set, returned messages are just logged to STDERR

Yields:

  • (ReturnMessage)

    Messages returned by the broker when a publish has failed

Returns:

  • (nil)



# File 'lib/amqp/client/channel.rb', line 97

def on_return(&block)
  @on_return = block
  nil
end

#open ⇒ Channel

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Open the channel (called from Connection)

Returns:

  • (Channel)

# File 'lib/amqp/client/channel.rb', line 50

def open
  return self if @open

  @open = true
  write_bytes FrameBytes.channel_open(@id)
  expect(:channel_open_ok)
  self
end

#queue_bind(name, exchange:, binding_key: "", arguments: {}) ⇒ nil

Bind a queue to an exchange

Parameters:

  • name (String)

    Name of the queue

  • exchange (String)

    Name of the exchange

  • binding_key (String) (defaults to: "")

    Binding key on which messages that match might be routed (depending on exchange type)

  • arguments (Hash) (defaults to: {})

    Message headers to match on, but only when bound to header exchanges

Returns:

  • (nil)


# File 'lib/amqp/client/channel.rb', line 207

def queue_bind(name, exchange:, binding_key: "", arguments: {})
  write_bytes FrameBytes.queue_bind(@id, name, exchange, binding_key, false, arguments)
  expect :queue_bind_ok
  nil
end

#queue_declare(name = "", passive: false, durable: true, exclusive: false, auto_delete: false, arguments: {}) ⇒ QueueOk

Create a queue (operation is idempotent)

Parameters:

  • name (String) (defaults to: "")

    Name of the queue, can be empty, but will then be generated by the broker

  • passive (Boolean) (defaults to: false)

    If true, an exception will be raised if the queue doesn’t already exist

  • durable (Boolean) (defaults to: true)

    If true the queue will survive broker restarts, messages in the queue will only survive if they are published as persistent

  • exclusive (Boolean) (defaults to: false)

    If true the queue will be deleted when the connection is closed

  • auto_delete (Boolean) (defaults to: false)

    If true the queue will be deleted when the last consumer stops consuming (it won’t be deleted until at least one consumer has consumed from it)

  • arguments (Hash) (defaults to: {})

    Custom arguments, such as queue-ttl etc.

Returns:

  • (QueueOk)

# File 'lib/amqp/client/channel.rb', line 177

def queue_declare(name = "", passive: false, durable: true, exclusive: false, auto_delete: false, arguments: {})
  durable = false if name.empty?
  exclusive = true if name.empty?
  auto_delete = true if name.empty?

  write_bytes FrameBytes.queue_declare(@id, name, passive, durable, exclusive, auto_delete, arguments)
  name, message_count, consumer_count = expect(:queue_declare_ok)

  QueueOk.new(name, message_count, consumer_count)
end
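
Two details of the listing are easy to miss: an empty name overrides durable, exclusive and auto_delete, and the result is an immutable Data value. A sketch of both, assuming Ruby 3.2 or later for Data; SketchQueueOk and effective_queue_flags are hypothetical names.

```ruby
# Same shape as QueueOk, redefined here so the example is self-contained.
SketchQueueOk = Data.define(:queue_name, :message_count, :consumer_count)

# Reproduce the flag defaulting applied when the name is empty.
def effective_queue_flags(name, durable: true, exclusive: false, auto_delete: false)
  # Broker-named queues are always transient, exclusive and auto-deleted.
  return { durable: false, exclusive: true, auto_delete: true } if name.empty?

  { durable:, exclusive:, auto_delete: }
end

ok = SketchQueueOk.new("jobs", 3, 1)      # positional, as in the listing
ok.message_count                          # reader for each member
emptied = ok.with(message_count: 0)       # copies instead of mutating
```

Because the flags are overridden silently, passing durable: true together with an empty name still results in a non-durable queue.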

#queue_delete(name, if_unused: false, if_empty: false, no_wait: false) ⇒ Integer?

Delete a queue

Parameters:

  • name (String)

    Name of the queue

  • if_unused (Boolean) (defaults to: false)

    Only delete if the queue doesn’t have consumers, raises a ChannelClosed error otherwise

  • if_empty (Boolean) (defaults to: false)

    Only delete if the queue is empty, raises a ChannelClosed error otherwise

  • no_wait (Boolean) (defaults to: false)

    Don’t wait for a broker confirmation if true

Returns:

  • (Integer)

    Number of messages in queue when deleted

  • (nil)

    If no_wait was set true



# File 'lib/amqp/client/channel.rb', line 195

def queue_delete(name, if_unused: false, if_empty: false, no_wait: false)
  write_bytes FrameBytes.queue_delete(@id, name, if_unused, if_empty, no_wait)
  message_count, = expect :queue_delete unless no_wait
  message_count
end

#queue_purge(name, no_wait: false) ⇒ Integer?

Purge a queue

Parameters:

  • name (String)

    Name of the queue

  • no_wait (Boolean) (defaults to: false)

    Don’t wait for a broker confirmation if true

Returns:

  • (Integer)

    Number of messages in queue when purged

  • (nil)

    If no_wait was set true



# File 'lib/amqp/client/channel.rb', line 218

def queue_purge(name, no_wait: false)
  write_bytes FrameBytes.queue_purge(@id, name, no_wait)
  message_count, = expect :queue_purge_ok unless no_wait
  message_count
end

#queue_unbind(name, exchange:, binding_key: "", arguments: {}) ⇒ nil

Unbind a queue from an exchange

Parameters:

  • name (String)

    Name of the queue

  • exchange (String)

    Name of the exchange

  • binding_key (String) (defaults to: "")

    Binding key which the queue is bound to the exchange with

  • arguments (Hash) (defaults to: {})

    Arguments matching the binding that’s being removed

Returns:

  • (nil)


# File 'lib/amqp/client/channel.rb', line 230

def queue_unbind(name, exchange:, binding_key: "", arguments: {})
  write_bytes FrameBytes.queue_unbind(@id, name, exchange, binding_key, arguments)
  expect :queue_unbind_ok
  nil
end

#reply(args) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/amqp/client/channel.rb', line 541

def reply(args)
  @replies.push(args)
end

#tx_commit ⇒ nil

Commit a transaction, requires that the channel is in transaction mode

Returns:

  • (nil)


# File 'lib/amqp/client/channel.rb', line 524

def tx_commit
  write_bytes FrameBytes.tx_commit(@id)
  expect :tx_commit_ok
  nil
end

#tx_rollback ⇒ nil

Roll back a transaction, requires that the channel is in transaction mode

Returns:

  • (nil)


# File 'lib/amqp/client/channel.rb', line 532

def tx_rollback
  write_bytes FrameBytes.tx_rollback(@id)
  expect :tx_rollback_ok
  nil
end

#tx_select ⇒ nil

Put the channel in transaction mode. Make sure to call #tx_commit or #tx_rollback after publishing

Returns:

  • (nil)


# File 'lib/amqp/client/channel.rb', line 516

def tx_select
  write_bytes FrameBytes.tx_select(@id)
  expect :tx_select_ok
  nil
end

#wait_for_confirms ⇒ Boolean

Block until all published messages are confirmed

Returns:

  • (Boolean)

    True if all messages were acked, false if any were nacked



# File 'lib/amqp/client/channel.rb', line 471

def wait_for_confirms
  @unconfirmed_lock.synchronize do
    until @unconfirmed.empty?
      # Check before waiting: if the channel was closed (and the
      # @unconfirmed_empty broadcast from #closed! fired) before we got
      # here, the wakeup is already gone and #wait would block forever.
      raise Error::Closed.new(@id, *@closed) if @closed

      @unconfirmed_empty.wait(@unconfirmed_lock)
    end
    result = !@nacked
    @nacked = false # Reset for next round of publishes
    result
  end
end
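
The waiting logic above can be exercised without a broker. The sketch below keeps the same Mutex and ConditionVariable structure, including the closed check before each wait. ConfirmWaiter and WaiterClosed are hypothetical, and confirm handles single tags only.

```ruby
WaiterClosed = Class.new(StandardError) # stand-in for Error::Closed

class ConfirmWaiter
  def initialize
    @unconfirmed = []
    @lock = Mutex.new
    @empty = ConditionVariable.new
    @nacked = false
    @closed = false
  end

  def publish(tag)
    @lock.synchronize { @unconfirmed << tag }
  end

  # kind is :ack or :nack
  def confirm(kind, tag)
    @lock.synchronize do
      next unless @unconfirmed.delete(tag) # ignore unknown tags

      @nacked = true if kind == :nack
      @empty.broadcast if @unconfirmed.empty?
    end
  end

  def close
    @lock.synchronize do
      @closed = true
      @empty.broadcast # wake up anyone still waiting
    end
  end

  def wait_for_confirms
    @lock.synchronize do
      until @unconfirmed.empty?
        # Check before waiting, otherwise a close that already
        # happened would never wake this thread.
        raise WaiterClosed, "closed" if @closed

        @empty.wait(@lock)
      end
      result = !@nacked
      @nacked = false # reset for the next round of publishes
      result
    end
  end
end

waiter = ConfirmWaiter.new
waiter.publish(1)
Thread.new { waiter.confirm(:ack, 1) }
all_acked = waiter.wait_for_confirms # blocks until tag 1 is confirmed
```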