Class: EventQ::RabbitMq::QueueWorker
- Inherits:
-
Object
- Object
- EventQ::RabbitMq::QueueWorker
- Includes:
- WorkerId
- Defined in:
- lib/eventq/eventq_rabbitmq/rabbitmq_queue_worker.rb
Instance Attribute Summary collapse
-
#context ⇒ Object
Returns the value of attribute context.
-
#is_running ⇒ Object
Returns the value of attribute is_running.
Instance Method Summary collapse
-
#acknowledge_message(channel, delivery_tag) ⇒ Object
Logic for the RabbitMq adapter when a message is accepted.
- #configure(options = {}) ⇒ Object
- #deserialize_message(payload) ⇒ Object
-
#initialize ⇒ QueueWorker
constructor
A new instance of QueueWorker.
- #pre_process(context, options) ⇒ Object
- #reject_message(channel, message, delivery_tag, retry_exchange, queue, abort) ⇒ Object
- #serialize_message(msg) ⇒ Object
-
#thread_process_iteration(queue, options, block) ⇒ Object
This method should not be called iteratively; it sits in a loop of its own. It relies on push notifications from the subscribe mechanism to trigger the block, and it will exit if you do not block.
Methods included from WorkerId
#tag_processing_thread, #untag_processing_thread
Constructor Details
#initialize ⇒ QueueWorker
Returns a new instance of QueueWorker.
8 9 10 11 |
# File 'lib/eventq/eventq_rabbitmq/rabbitmq_queue_worker.rb', line 8
# Creates a new QueueWorker and instantiates the two provider managers
# it keeps as instance state.
#
# @return [QueueWorker] a new instance of QueueWorker
def initialize
  @serialization_provider_manager = EventQ::SerializationProviders::Manager.new
  @signature_provider_manager = EventQ::SignatureProviders::Manager.new
end
Instance Attribute Details
#context ⇒ Object
Returns the value of attribute context.
6 7 8 |
# File 'lib/eventq/eventq_rabbitmq/rabbitmq_queue_worker.rb', line 6
# Returns the value of attribute context.
#
# @return [Object] the current value of +@context+ (nil when never assigned)
def context
  @context
end
#is_running ⇒ Object
Returns the value of attribute is_running.
6 7 8 |
# File 'lib/eventq/eventq_rabbitmq/rabbitmq_queue_worker.rb', line 6
# Returns the value of attribute is_running.
#
# @return [Object] the current value of +@is_running+ (nil when never assigned)
def is_running
  @is_running
end
Instance Method Details
#acknowledge_message(channel, delivery_tag) ⇒ Object
Logic for the RabbitMq adapter when a message is accepted
97 98 99 |
# File 'lib/eventq/eventq_rabbitmq/rabbitmq_queue_worker.rb', line 97
# Logic for the RabbitMq adapter when a message is accepted.
#
# @param channel [#acknowledge] channel the message was delivered on
# @param delivery_tag [Object] delivery tag identifying the message
# @return [Object] whatever +channel.acknowledge+ returns
def acknowledge_message(channel, delivery_tag)
  # NOTE(review): the literal +false+ is presumably the client's
  # "multiple" flag (acknowledge only this delivery) - confirm against
  # the channel implementation in use.
  channel.acknowledge(delivery_tag, false)
end
#configure(options = {}) ⇒ Object
92 93 94 |
# File 'lib/eventq/eventq_rabbitmq/rabbitmq_queue_worker.rb', line 92
# Applies adapter defaults to the worker options, mutating the hash in place.
#
# +:durable+ defaults to +true+ only when the caller did not supply a
# value. An explicit +false+ is preserved; a plain +||=+ would silently
# overwrite it and make non-durable configuration impossible.
#
# @param options [Hash] worker options
# @return [Object] the effective +:durable+ value
def configure(options = {})
  options[:durable] = true if options[:durable].nil?
  options[:durable]
end
#deserialize_message(payload) ⇒ Object
56 57 58 59 |
# File 'lib/eventq/eventq_rabbitmq/rabbitmq_queue_worker.rb', line 56
# Deserializes a raw payload using the provider selected by
# +EventQ::Configuration.serialization_provider+.
#
# @param payload [Object] raw payload handed to the provider
# @return [Object] whatever the provider's +deserialize+ returns
def deserialize_message(payload)
  provider = @serialization_provider_manager.get_provider(EventQ::Configuration.serialization_provider)
  provider.deserialize(payload)
end
#pre_process(context, options) ⇒ Object
13 14 15 16 17 18 19 20 |
# File 'lib/eventq/eventq_rabbitmq/rabbitmq_queue_worker.rb', line 13
# Prepares the options hash before processing starts, mutating it in place:
# stores a new QueueManager under +:manager+ (with +durable+ copied from
# +options[:durable]+) and a connection under +:connection+.
#
# @param context [Object] not used by this method
# @param options [Hash] worker options; reads +:durable+ and +:client+,
#   writes +:manager+ and +:connection+
# @return [Object] the connection obtained from the duplicated client
def pre_process(context, options)
  manager = EventQ::RabbitMq::QueueManager.new
  manager.durable = options[:durable]
  options[:manager] = manager

  # The connection is taken from a duplicate of the client, not the
  # client object stored in the options.
  connection = options[:client].dup.get_connection
  options[:connection] = connection
end
#reject_message(channel, message, delivery_tag, retry_exchange, queue, abort) ⇒ Object
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 |
# File 'lib/eventq/eventq_rabbitmq/rabbitmq_queue_worker.rb', line 66
# Rejects a delivery and, depending on the queue's retry settings, either
# reports the message as having exceeded its retry limit or republishes it
# to the retry exchange.
#
# @param channel [#reject] channel the message was delivered on
# @param message [#retry_attempts, #retry_attempts=] the message being rejected
# @param delivery_tag [Object] delivery tag identifying the message
# @param retry_exchange [#publish] exchange that receives messages to retry
# @param queue [#max_retry_attempts, #allow_retry, #retry_back_off_grace]
#   queue definition holding the retry settings
# @param abort [Object] not used by this method
# @return [true] always
def reject_message(channel, message, delivery_tag, retry_exchange, queue, abort)
  EventQ.logger.info("[#{self.class}] - Message rejected removing from queue.")
  # reject the message to remove from queue
  channel.reject(delivery_tag, false)

  # check if the message retry limit has been exceeded
  if message.retry_attempts >= queue.max_retry_attempts
    EventQ.logger.info("[#{self.class}] - Message retry attempt limit exceeded. Msg: #{serialize_message(message)}")
    context.call_on_retry_exceeded_block(message)
  # check if the message is allowed to be retried
  elsif queue.allow_retry
    message.retry_attempts += 1

    # Attempts within the back-off grace count as the first attempt when
    # the retry delay is computed.
    retry_attempts = message.retry_attempts - queue.retry_back_off_grace
    retry_attempts = 1 if retry_attempts < 1

    message_ttl = retry_delay(queue, retry_attempts)
    EventQ.logger.debug { "[#{self.class}] - Sending message for retry. Message TTL: #{message_ttl}" }
    retry_exchange.publish(serialize_message(message), :expiration => message_ttl)
    context.call_on_retry_block(message)
  end

  true
end
#serialize_message(msg) ⇒ Object
61 62 63 64 |
# File 'lib/eventq/eventq_rabbitmq/rabbitmq_queue_worker.rb', line 61
# Serializes a message using the provider selected by
# +EventQ::Configuration.serialization_provider+.
#
# @param msg [Object] message handed to the provider
# @return [Object] whatever the provider's +serialize+ returns
def serialize_message(msg)
  provider = @serialization_provider_manager.get_provider(EventQ::Configuration.serialization_provider)
  provider.serialize(msg)
end
#thread_process_iteration(queue, options, block) ⇒ Object
This method should not be called iteratively; it sits in a loop of its own. It relies on push notifications from the subscribe mechanism to trigger the block, and it will exit if you do not block.
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/eventq/eventq_rabbitmq/rabbitmq_queue_worker.rb', line 25
# Subscribes to the queue on a freshly created channel and then waits for
# as long as +context.running?+ is true.
#
# This method should not be called iteratively: it sits in its own loop.
# Messages are pushed to the subscribe block, and the subscription is
# non-blocking, so the method has to keep waiting itself.
#
# The channel is closed in an +ensure+ so it is released even when queue
# setup or subscription raises.
#
# @param queue [Object] queue definition passed to the manager and to
#   +process_message+
# @param options [Hash] worker options; reads +:manager+ and +:connection+
# @param block [Object] handler forwarded to +process_message+
# @return [nil]
def thread_process_iteration(queue, options, block)
  manager = options[:manager]
  channel = options[:connection].create_channel

  begin
    channel.prefetch(1)

    q = manager.get_queue(channel, queue)
    retry_exchange = manager.get_retry_exchange(channel, queue)

    q.subscribe(:manual_ack => true, :block => false, :exclusive => false) do |delivery_info, _properties, payload|
      begin
        tag_processing_thread
        process_message(payload, queue, channel, retry_exchange, delivery_info.delivery_tag, block)
      rescue => e
        EventQ.logger.error(
          "[#{self.class}] - An error occurred attempting to process a message. Error: #{e} | "\
          "Backtrace: #{e.backtrace}"
        )
        context.call_on_error_block(error: e)
      ensure
        untag_processing_thread
      end
    end

    # we don't want to stop the subscribe process as it will not block.
    sleep 5 while context.running?
    nil
  ensure
    channel.close if channel && channel.open?
  end
end