Class: ActionSubscriber::Middleware::Env

Inherits:
Object
  • Object
show all
Defined in:
lib/action_subscriber/middleware/env.rb

Constant Summary collapse

ACK_INSTRUMENT_KEY =
"message_acked.action_subscriber".freeze
NACK_INSTRUMENT_KEY =
"message_nacked.action_subscriber".freeze
REJECT_INSTRUMENT_KEY =
"message_rejected.action_subscriber".freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(subscriber, encoded_payload, properties) ⇒ Env

Returns a new instance of Env.

Parameters:

  • subscriber (Class)

    the class that will handle this message

  • encoded_payload (String)

    the payload as it was received from RabbitMQ

  • properties (Hash)

    that must contain the following keys (as symbols) :channel => RabbitMQ channel for doing acknowledgement :content_type => String :delivery_tag => String (the message identifier to send back to rabbitmq for acknowledgement) :exchange => String :headers => Hash[ String => String ] :message_id => String :routing_key => String



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/action_subscriber/middleware/env.rb', line 34

def initialize(subscriber, encoded_payload, properties)
  @action = properties.fetch(:action)
  @channel = properties[:channel]
  @content_type = properties.fetch(:content_type)
  @delivery_tag = properties.fetch(:delivery_tag)
  @encoded_payload = encoded_payload
  @exchange = properties.fetch(:exchange)
  @has_been_acked = false
  @has_been_nacked = false
  @has_been_rejected = false
  @headers = properties.fetch(:headers, {})
  @message_id = properties[:message_id].presence || ::SecureRandom.hex(3)
  @queue = properties.fetch(:queue)
  @routing_key = properties.fetch(:routing_key)
  @subscriber = subscriber
  @uses_acknowledgements = properties.fetch(:uses_acknowledgements, false)
end

Instance Attribute Details

#actionObject (readonly)

Returns the value of attribute action.



12
13
14
# File 'lib/action_subscriber/middleware/env.rb', line 12

def action
  @action
end

#channelObject (readonly)

Returns the value of attribute channel.



12
13
14
# File 'lib/action_subscriber/middleware/env.rb', line 12

def channel
  @channel
end

#content_typeObject (readonly)

Returns the value of attribute content_type.



12
13
14
# File 'lib/action_subscriber/middleware/env.rb', line 12

def content_type
  @content_type
end

#encoded_payloadObject (readonly)

Returns the value of attribute encoded_payload.



12
13
14
# File 'lib/action_subscriber/middleware/env.rb', line 12

def encoded_payload
  @encoded_payload
end

#exchangeObject (readonly)

Returns the value of attribute exchange.



12
13
14
# File 'lib/action_subscriber/middleware/env.rb', line 12

def exchange
  @exchange
end

#headersObject (readonly)

Returns the value of attribute headers.



12
13
14
# File 'lib/action_subscriber/middleware/env.rb', line 12

def headers
  @headers
end

#message_idObject (readonly)

Returns the value of attribute message_id.



12
13
14
# File 'lib/action_subscriber/middleware/env.rb', line 12

def message_id
  @message_id
end

#payloadObject

Returns the value of attribute payload.



10
11
12
# File 'lib/action_subscriber/middleware/env.rb', line 10

def payload
  @payload
end

#queueObject (readonly)

Returns the value of attribute queue.



12
13
14
# File 'lib/action_subscriber/middleware/env.rb', line 12

def queue
  @queue
end

#routing_keyObject (readonly)

Returns the value of attribute routing_key.



12
13
14
# File 'lib/action_subscriber/middleware/env.rb', line 12

def routing_key
  @routing_key
end

#subscriberObject (readonly)

Returns the value of attribute subscriber.



12
13
14
# File 'lib/action_subscriber/middleware/env.rb', line 12

def subscriber
  @subscriber
end

Instance Method Details

#acknowledgeObject



52
53
54
55
56
57
58
59
60
61
# File 'lib/action_subscriber/middleware/env.rb', line 52

def acknowledge
  fail ::RuntimeError, "you can't acknowledge messages under the polling API" unless @channel
  return true if @has_been_acked
  acknowledge_multiple_messages = false
  @has_been_acked = true
  instrument_for(ACK_INSTRUMENT_KEY) do
    @channel.ack(@delivery_tag, acknowledge_multiple_messages)
  end
  true
end

#channel_open?Boolean

Returns:

  • (Boolean)


63
64
65
66
# File 'lib/action_subscriber/middleware/env.rb', line 63

def channel_open?
  return false unless @channel
  @channel.open?
end

#nackObject



68
69
70
71
72
73
74
75
76
77
78
# File 'lib/action_subscriber/middleware/env.rb', line 68

def nack
  fail ::RuntimeError, "you can't acknowledge messages under the polling API" unless @channel
  return true if @has_been_nacked
  nack_multiple_messages = false
  requeue_message = true
  @has_been_nacked = true
  instrument_for(NACK_INSTRUMENT_KEY) do
    @channel.nack(@delivery_tag, nack_multiple_messages, requeue_message)
  end
  true
end

#rejectObject



80
81
82
83
84
85
86
87
88
89
# File 'lib/action_subscriber/middleware/env.rb', line 80

def reject
  fail ::RuntimeError, "you can't acknowledge messages under the polling API" unless @channel
  return true if @has_been_rejected
  requeue_message = true
  @has_been_rejected = true
  instrument_for(REJECT_INSTRUMENT_KEY) do
    @channel.reject(@delivery_tag, requeue_message)
  end
  true
end

#safe_acknowledgeObject



91
92
93
# File 'lib/action_subscriber/middleware/env.rb', line 91

def safe_acknowledge
  acknowledge if uses_acknowledgements? && channel_open? && !has_used_delivery_tag?
end

#safe_nackObject



95
96
97
# File 'lib/action_subscriber/middleware/env.rb', line 95

def safe_nack
  nack if uses_acknowledgements? && channel_open? && !has_used_delivery_tag?
end

#safe_rejectObject



99
100
101
# File 'lib/action_subscriber/middleware/env.rb', line 99

def safe_reject
  reject if uses_acknowledgements? && channel_open? && !has_used_delivery_tag?
end

#to_hashObject Also known as: to_h



103
104
105
106
107
108
109
110
111
# File 'lib/action_subscriber/middleware/env.rb', line 103

def to_hash
  {
    :action => action,
    :content_type => content_type,
    :exchange => exchange,
    :routing_key => routing_key,
    :payload => payload
  }
end