Class: Rabbit::Receiving::Worker

Inherits:
Object
  • Object
show all
Includes:
Sneakers::Worker
Defined in:
lib/rabbit/receiving/worker.rb

Instance Method Summary collapse

Instance Method Details

#handle_error!(error) ⇒ Object



42
43
44
45
46
47
48
# File 'lib/rabbit/receiving/worker.rb', line 42

# Handles an error raised while processing a message.
#
# In the :test environment the error is raised again so it is not hidden.
# Otherwise the error is passed to the configured exception notifier, the
# method pauses for one second and then calls `requeue!`.
#
# @param error [Exception] the error raised while processing the message
# @return [Object] whatever `requeue!` returns
# @raise [Exception] the given error when Rabbit.config.environment is :test
def handle_error!(error)
  # Raise the argument explicitly: a bare `raise` re-raises `$!`, which is only
  # set while a rescue clause is active, so `error` would be lost otherwise.
  raise error if Rabbit.config.environment == :test

  Rabbit.config.exception_notifier.call(error)
  # wait to prevent queue overflow
  sleep 1
  requeue!
end

#receive_message(message, delivery_info, arguments) ⇒ Object



32
33
34
35
36
37
38
39
40
# File 'lib/rabbit/receiving/worker.rb', line 32

# Builds a Rabbit::Receiving::Receive for the delivery and calls it.
#
# The "compress" entry under arguments[:headers] (false when absent) is
# passed to `prepare_message_for_receiving` together with a copy of the
# message, and the result becomes the receiver's message.
#
# @param message [Object] raw message payload; it is dup'ed before preparation
# @param delivery_info [Hash] delivery metadata, passed through unchanged
# @param arguments [Hash] message properties, passed through unchanged
# @return [Object] whatever Rabbit::Receiving::Receive#call returns
def receive_message(message, delivery_info, arguments)
  compressed = arguments.dig(:headers, "compress") || false
  payload = prepare_message_for_receiving(message.dup, compressed)

  receiver = Rabbit::Receiving::Receive.new(
    message: payload,
    delivery_info: delivery_info,
    arguments: arguments,
  )
  receiver.call
end

#reinitialize_connection ⇒ Object



50
51
52
53
54
# File 'lib/rabbit/receiving/worker.rb', line 50

# Restarts the worker: calls `stop`, clears the connection ivar on the queue
# object, then calls `run` again.
#
# @return [Object] whatever `run` returns
def reinitialize_connection
  stop
  # Sneakers::Queue keeps its Bunny session in @bunny; the previous :@banny
  # spelling only created an unused ivar and left the old session in place.
  # NOTE(review): confirm the ivar name against the pinned Sneakers version.
  @queue.instance_variable_set(:@bunny, nil)
  run
end

#work_with_params(message, delivery_info, arguments) ⇒ Object



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/rabbit/receiving/worker.rb', line 12

# Processes one delivery: passes it to `receive_message`, then calls `ack!`.
#
# Errors listed in Rabbit.config.connection_reset_exceptions cause a pause of
# Rabbit.config.connection_reset_timeout, a `reinitialize_connection` and
# another try, at most Rabbit.config.connection_reset_max_retries times.
# Once the retries are used up, or on any other StandardError, the error is
# passed to `handle_error!`.
#
# @param message [Object] raw message payload
# @param delivery_info [#to_h] delivery metadata
# @param arguments [#to_h] message properties
# @return [Object] result of `ack!` on success, otherwise of `handle_error!`
def work_with_params(message, delivery_info, arguments)
  resets_seen = 0
  begin
    # delivery info and arguments arrive as custom rabbit classes, so both
    # are converted to hashes before being passed on
    receive_message(message, delivery_info.to_h, arguments.to_h)
    ack!
  rescue *Rabbit.config.connection_reset_exceptions => reset_error
    resets_seen += 1
    if resets_seen > Rabbit.config.connection_reset_max_retries
      # retry budget is spent: treat it like any other failure
      handle_error!(reset_error)
    else
      sleep(Rabbit.config.connection_reset_timeout)
      reinitialize_connection
      retry
    end
  end
rescue => unexpected_error
  handle_error!(unexpected_error)
end