Class: BBK::AMQP::Consumer
- Inherits:
-
Object
- Object
- BBK::AMQP::Consumer
- Defined in:
- lib/bbk/amqp/consumer.rb
Constant Summary collapse
- DEFAULT_OPTIONS =
{ consumer_pool_size: 3, consumer_pool_abort_on_exception: true, prefetch_size: 10, consumer_tag: nil, rejection_policy: RejectionPolicies::Requeue.new }.freeze
- PROTOCOLS =
%w[mq amqp amqps].freeze
Instance Attribute Summary collapse
-
#connection ⇒ Object
readonly
Returns the value of attribute connection.
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
-
#options ⇒ Object
readonly
Returns the value of attribute options.
-
#publisher ⇒ Object
Returns the value of attribute publisher.
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
-
#queue_name ⇒ Object
readonly
Returns the value of attribute queue_name.
-
#rejection_policy ⇒ Object
readonly
Returns the value of attribute rejection_policy.
Instance Method Summary collapse
-
#ack(incoming, *args, answer: nil, **kwargs) ⇒ Object
Ack incoming message and not send answer.
-
#close ⇒ Object
Close consumer - try close amqp channel.
-
#initialize(connection, queue_name: nil, publisher: nil, **options) ⇒ Consumer
constructor
A new instance of Consumer.
-
#nack(incoming, *args, error: nil, **kwargs) ⇒ Object
Nack incoming message.
-
#protocols ⇒ Object
Return protocol list which consumer support.
-
#run(msg_stream) ⇒ Object
Running non blocking consumer.
-
#stop ⇒ Object
stop consuming messages.
-
#sync? ⇒ Boolean
Signal that need answer on every incoming message.
Constructor Details
#initialize(connection, queue_name: nil, publisher: nil, **options) ⇒ Consumer
Returns a new instance of Consumer.
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
# File 'lib/bbk/amqp/consumer.rb', line 21 def initialize(connection, queue_name: nil, publisher: nil, **) @connection = connection @channel = .delete(:channel) @queue = .delete(:queue) @publisher = publisher if @queue.nil? && queue_name.nil? raise ArgumentError.new('queue_name or queue must be provided!') end @queue_name = @queue&.name || queue_name @options = DEFAULT_OPTIONS.merge() @rejection_policy = @options.delete(:rejection_policy) logger = @options.fetch(:logger, BBK::AMQP.logger) logger = logger.respond_to?(:tagged) ? logger : ActiveSupport::TaggedLogging.new(logger) @logger = BBK::Utils::ProxyLogger.new(logger, tags: [self.class.to_s, queue_name]) end |
Instance Attribute Details
#connection ⇒ Object (readonly)
Returns the value of attribute connection.
8 9 10 |
# File 'lib/bbk/amqp/consumer.rb', line 8 def connection @connection end |
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
8 9 10 |
# File 'lib/bbk/amqp/consumer.rb', line 8 def logger @logger end |
#options ⇒ Object (readonly)
Returns the value of attribute options.
8 9 10 |
# File 'lib/bbk/amqp/consumer.rb', line 8 def @options end |
#publisher ⇒ Object
Returns the value of attribute publisher.
9 10 11 |
# File 'lib/bbk/amqp/consumer.rb', line 9 def publisher @publisher end |
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
8 9 10 |
# File 'lib/bbk/amqp/consumer.rb', line 8 def queue @queue end |
#queue_name ⇒ Object (readonly)
Returns the value of attribute queue_name.
8 9 10 |
# File 'lib/bbk/amqp/consumer.rb', line 8 def queue_name @queue_name end |
#rejection_policy ⇒ Object (readonly)
Returns the value of attribute rejection_policy.
8 9 10 |
# File 'lib/bbk/amqp/consumer.rb', line 8 def rejection_policy @rejection_policy end |
Instance Method Details
#ack(incoming, *args, answer: nil, **kwargs) ⇒ Object
answer should processing amqp publisher
Ack incoming message and not send answer.
85 86 87 88 89 90 91 92 |
# File 'lib/bbk/amqp/consumer.rb', line 85 def ack(incoming, *args, answer: nil, **kwargs) # [] - для работы тестов. В реальности вернется объект VersionedDeliveryTag у # которого to_i (вызывается внутри channel.ack) вернет фактическоe число # logger.debug "Ack message #{incoming.headers[:type]}[#{incoming.headers[:message_id]}] on channel: #{incoming.delivery_info[:channel]&.id}[#{incoming.delivery_info[:channel]&.object_id}] delivery tag: #{incoming.delivery_info[:delivery_tag].to_i}" send_answer(incoming, answer) unless answer.nil? logger.debug "Ack message #{incoming.headers[:type]}[#{incoming.headers[:message_id]}] delivery tag: #{incoming.delivery_info[:delivery_tag].to_i}" incoming.delivery_info[:channel].ack incoming.delivery_info[:delivery_tag] end |
#close ⇒ Object
Close consumer - try close amqp channel
121 122 123 124 125 126 127 128 129 130 |
# File 'lib/bbk/amqp/consumer.rb', line 121 def close @channel.tap do |c| return nil unless c logger.info 'Closing...' @channel = nil c.close logger.info 'Stopped' end end |
#nack(incoming, *args, error: nil, **kwargs) ⇒ Object
Nack incoming message
105 106 107 |
# File 'lib/bbk/amqp/consumer.rb', line 105 def nack(incoming, *args, error: nil, **kwargs) rejection_policy.call(incoming, error, *args, **kwargs) end |
#protocols ⇒ Object
Return protocol list which consumer support
42 43 44 |
# File 'lib/bbk/amqp/consumer.rb', line 42 def protocols PROTOCOLS end |
#run(msg_stream) ⇒ Object
Running non blocking consumer
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 |
# File 'lib/bbk/amqp/consumer.rb', line 53 def run(msg_stream) @channel ||= @connection.create_channel(nil, [:consumer_pool_size], [:consumer_pool_abort_on_exception]).tap do |ch| ch.prefetch([:prefetch_size]) end logger. "Ch##{@channel.id}" @queue ||= @channel.queue(queue_name, passive: true) subscribe_opts = { block: false, manual_ack: true, consumer_tag: [:consumer_tag], exclusive: .fetch(:exclusive, false) }.compact logger.info 'Starting...' @subscription = queue.subscribe(subscribe_opts) do |delivery_info, , payload| = Message.new(self, delivery_info, , payload) # logger.debug "Consumed message #{message.headers[:type]}[#{message.headers[:message_id]}] on channel: #{delivery_info.channel&.id}[#{delivery_info.channel&.object_id}] delivery tag: #{message.delivery_info[:delivery_tag].to_i}" logger.debug "Consumed message #{.headers[:type]}[#{.headers[:message_id]}] delivery tag: #{.delivery_info[:delivery_tag].to_i}" msg_stream << end msg_stream end |
#stop ⇒ Object
stop consuming messages
110 111 112 113 114 115 116 117 118 |
# File 'lib/bbk/amqp/consumer.rb', line 110 def stop @subscription.tap do |s| return nil unless s logger.info 'Stopping...' @subscription = nil s.cancel end end |
#sync? ⇒ Boolean
Signal that need answer on every incoming message
47 48 49 |
# File 'lib/bbk/amqp/consumer.rb', line 47 def sync? false end |