Module: ActionSubscriber::DSL

Included in:
Base
Defined in:
lib/action_subscriber/dsl.rb

Defined Under Namespace

Classes: Filter

Instance Method Summary collapse

Instance Method Details

#_run_action_at_least_once_with_filters(env, action) ⇒ Object



176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
# File 'lib/action_subscriber/dsl.rb', line 176

# Runs +action+ through the filter chain and calls +env.acknowledge+ only
# after the action has returned.
#
# When the action (or the acknowledgement) raises a StandardError, the message
# is handed to MessageRetry.redeliver_message_with_backoff, acknowledged, and
# the error is re-raised. Whatever happens, a message that was not
# acknowledged is rejected; if neither the acknowledgement nor the rejection
# reported success, a notice is written to $stdout and TERM is sent to the
# current process.
#
# @param env [Object] message environment; must respond to +acknowledge+ and +reject+
# @param action [Symbol, String] name of the subscriber action to run
# @return [Object] the result of +env.acknowledge+ on the success path
def _run_action_at_least_once_with_filters(env, action)
  rejected = false
  acked = false

  _run_action_with_filters(env, action)

  acked = env.acknowledge
rescue
  # Schedule the retry first, then acknowledge the delivery we were given,
  # and finally let the caller see the original error.
  ::ActionSubscriber::MessageRetry.redeliver_message_with_backoff(env)
  acked = env.acknowledge

  raise
ensure
  rejected = env.reject unless acked

  # Neither acknowledge nor reject reported success: give up on this process.
  unless rejected || acked
    $stdout << <<-UNREJECTABLE
      CANNOT ACKNOWLEDGE OR REJECT THE MESSAGE

      This is a exceptional state for ActionSubscriber to enter and puts the current
      Process in the position of "I can't get new work from RabbitMQ, but also
      can't acknowledge or reject the work that I currently have" ... While rare
      this state can happen.

      Instead of continuing to try to process the message ActionSubscriber is
      sending a Kill signal to the current running process to gracefully shutdown
      so that the RabbitMQ server will purge any outstanding acknowledgements. If
      you are running a process monitoring tool (like Upstart) the Subscriber
      process will be restarted and be able to take on new work.

      ** Running a process monitoring tool like Upstart is recommended for this reason **
    UNREJECTABLE

    Process.kill(:TERM, Process.pid)
  end
end

#_run_action_at_most_once_with_filters(env, action) ⇒ Object



145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
# File 'lib/action_subscriber/dsl.rb', line 145

# Calls +env.acknowledge+ first and only then runs +action+ through the filter
# chain. An exception raised by the action propagates to the caller once the
# ensure clause has run.
#
# In the ensure clause a message that was not acknowledged is rejected; if
# neither the acknowledgement nor the rejection reported success, a notice is
# written to $stdout and TERM is sent to the current process.
#
# @param env [Object] message environment; must respond to +acknowledge+ and +reject+
# @param action [Symbol, String] name of the subscriber action to run
# @return [Object] whatever +_run_action_with_filters+ returns
def _run_action_at_most_once_with_filters(env, action)
  rejected = false
  acked = false
  acked = env.acknowledge

  _run_action_with_filters(env, action)
ensure
  rejected = env.reject unless acked

  # Neither acknowledge nor reject reported success: give up on this process.
  unless rejected || acked
    $stdout << <<-UNREJECTABLE
      CANNOT ACKNOWLEDGE OR REJECT THE MESSAGE

      This is a exceptional state for ActionSubscriber to enter and puts the current
      Process in the position of "I can't get new work from RabbitMQ, but also
      can't acknowledge or reject the work that I currently have" ... While rare
      this state can happen.

      Instead of continuing to try to process the message ActionSubscriber is
      sending a Kill signal to the current running process to gracefully shutdown
      so that the RabbitMQ server will purge any outstanding acknowledgements. If
      you are running a process monitoring tool (like Upstart) the Subscriber
      process will be restarted and be able to take on new work.

      ** Running a process monitoring tool like Upstart is recommended for this reason **
    UNREJECTABLE

    Process.kill(:TERM, Process.pid)
  end
end

#_run_action_with_filters(env, action) ⇒ Object



131
132
133
134
135
136
137
138
139
140
141
142
143
# File 'lib/action_subscriber/dsl.rb', line 131

# Builds a subscriber for +env+ and invokes +action+ on it, wrapped by every
# around filter whose +matches(action)+ is truthy.
#
# Filters are nested so that the first registered matching filter is the
# outermost wrapper; each filter callback receives the rest of the chain as
# its block.
#
# @param env [Object] message environment passed to +new+
# @param action [Symbol, String] public method to invoke on the subscriber
# @return [Object] the value returned by the outermost call in the chain
def _run_action_with_filters(env, action)
  subscriber = new(env)
  invoke_action = proc { subscriber.public_send(action) }

  # Walk the filters last-to-first so the earliest one ends up outermost.
  applicable = around_filters.reverse.select { |filter| filter.matches(action) }
  chain = applicable.reduce(invoke_action) do |inner, filter|
    proc { subscriber.send(filter.callback_method, &inner) }
  end

  chain.call
end

#acknowledge_messages? ⇒ Boolean

Returns:

  • (Boolean)


54
55
56
# File 'lib/action_subscriber/dsl.rb', line 54

# Reports whether the acknowledgement flag (+@_acknowledge_messages+) is set.
#
# @return [Boolean] true when the flag is truthy, false otherwise (including unset)
def acknowledge_messages?
  @_acknowledge_messages ? true : false
end

#around_filter(callback_method, options = nil) ⇒ Object



58
59
60
61
62
# File 'lib/action_subscriber/dsl.rb', line 58

# Registers an around filter for +callback_method+.
#
# A Filter is built from the arguments and passed to
# +conditionally_add_filter!+, which decides whether it is actually added.
#
# @param callback_method [Symbol] name of the filter callback
# @param options [Object, nil] passed through to Filter.new unchanged
# @return [Array] the current list of around filters
def around_filter(callback_method, options = nil)
  conditionally_add_filter!(Filter.new(callback_method, options))
  around_filters
end

#around_filters ⇒ Object



68
69
70
# File 'lib/action_subscriber/dsl.rb', line 68

# Returns the list of registered around filters, creating an empty list on
# first use.
#
# @return [Array] the same array object on every call
def around_filters
  @_around_filters = [] unless @_around_filters
  @_around_filters
end

#at_least_once! ⇒ Object



36
37
38
39
# File 'lib/action_subscriber/dsl.rb', line 36

# Turns on message acknowledgement and selects at-least-once handling by
# setting both +@_acknowledge_messages+ and +@_at_least_once+ to true.
#
# @return [true]
def at_least_once!
  @_at_least_once = (@_acknowledge_messages = true)
end

#at_least_once? ⇒ Boolean

Returns:

  • (Boolean)


41
42
43
# File 'lib/action_subscriber/dsl.rb', line 41

# Reports whether at-least-once handling (+@_at_least_once+) has been selected.
#
# @return [Boolean] true when the flag is truthy, false otherwise (including unset)
def at_least_once?
  @_at_least_once ? true : false
end

#at_most_once! ⇒ Object



45
46
47
48
# File 'lib/action_subscriber/dsl.rb', line 45

# Turns on message acknowledgement and selects at-most-once handling by
# setting both +@_acknowledge_messages+ and +@_at_most_once+ to true.
#
# @return [true]
def at_most_once!
  @_at_most_once = (@_acknowledge_messages = true)
end

#at_most_once? ⇒ Boolean

Returns:

  • (Boolean)


50
51
52
# File 'lib/action_subscriber/dsl.rb', line 50

# Reports whether at-most-once handling (+@_at_most_once+) has been selected.
#
# @return [Boolean] true when the flag is truthy, false otherwise (including unset)
def at_most_once?
  @_at_most_once ? true : false
end

#conditionally_add_filter!(filter) ⇒ Object



64
65
66
# File 'lib/action_subscriber/dsl.rb', line 64

# Appends +filter+ to the around filters unless a filter with the same
# +callback_method+ is already registered. Only the callback name is compared.
#
# @param filter [#callback_method] the filter to register
# @return [Array, nil] the filter list when the filter was added, nil when it was skipped
def conditionally_add_filter!(filter)
  duplicate = around_filters.any? do |registered|
    registered.callback_method == filter.callback_method
  end
  return if duplicate

  around_filters << filter
end

#exchange_names(*names) ⇒ Object Also known as: exchange

Explicitly set the name of the exchange



74
75
76
77
78
79
80
81
82
83
# File 'lib/action_subscriber/dsl.rb', line 74

# Registers exchange names and returns the names registered so far.
#
# The given names are flattened, nil entries are dropped, and the rest are
# converted with +to_s+ and appended to the accumulated list. Dropping nils
# before the conversion matters: nil.to_s is "", so a nil argument would
# otherwise be recorded as an exchange named "" and suppress the default.
#
# @param names [Array<String, Symbol, Array, nil>] exchange names; may be nested
# @return [Array<String>] the de-duplicated registered names, or a one-element
#   array holding +::ActionSubscriber.config.default_exchange+ when none are registered
def exchange_names(*names)
  @_exchange_names ||= []
  @_exchange_names += names.flatten.compact.map(&:to_s)

  return [::ActionSubscriber.config.default_exchange] if @_exchange_names.empty?

  @_exchange_names.uniq
end

#manual_acknowledgement! ⇒ Object



86
87
88
89
# File 'lib/action_subscriber/dsl.rb', line 86

# Turns on message acknowledgement and marks it as manual by setting both
# +@_acknowledge_messages+ and +@_manual_acknowedgement+ to true.
#
# NOTE: the instance variable name is misspelled ("acknowedgement"); it is
# kept as-is because +manual_acknowledgement?+ reads the same name.
#
# @return [true]
def manual_acknowledgement!
  @_manual_acknowedgement = (@_acknowledge_messages = true)
end

#manual_acknowledgement? ⇒ Boolean

Returns:

  • (Boolean)


91
92
93
# File 'lib/action_subscriber/dsl.rb', line 91

# Reports whether manual acknowledgement has been selected.
#
# Reads +@_manual_acknowedgement+ (spelled that way on purpose: it is the
# name written by +manual_acknowledgement!+).
#
# @return [Boolean] true when the flag is truthy, false otherwise (including unset)
def manual_acknowledgement?
  @_manual_acknowedgement ? true : false
end

#no_acknowledgement! ⇒ Object



95
96
97
# File 'lib/action_subscriber/dsl.rb', line 95

# Turns message acknowledgement off by setting +@_acknowledge_messages+ to
# false, so +acknowledge_messages?+ reports false afterwards.
#
# Only that one flag is written here; the at-least-once, at-most-once and
# manual flags set by the other bang methods are left untouched.
#
# @return [false]
def no_acknowledgement!
  @_acknowledge_messages = false
end

#queue_for(method, queue_name) ⇒ Object

Explicitly set the name of a queue for the given method route

Ex.

queue_for :created, "derp.derp"
queue_for :updated, "foo.bar"


105
106
107
108
# File 'lib/action_subscriber/dsl.rb', line 105

# Records an explicit queue name for the given method route, creating the
# backing hash (+@_queue_names+) on first use.
#
# @param method [Symbol, String] the action/route the queue belongs to
# @param queue_name [String] the queue name to use for that route
# @return [Object] +queue_name+, as given
def queue_for(method, queue_name)
  (@_queue_names ||= {})[method] = queue_name
end

#queue_names ⇒ Object



110
111
112
# File 'lib/action_subscriber/dsl.rb', line 110

# Returns the hash of explicitly configured queue names keyed by method
# route, creating an empty hash on first use.
#
# @return [Hash] the same hash object on every call
def queue_names
  @_queue_names = {} unless @_queue_names
  @_queue_names
end

#remote_application_name(name = nil) ⇒ Object Also known as: publisher



114
115
116
117
# File 'lib/action_subscriber/dsl.rb', line 114

# Combined reader/writer for the remote application name.
#
# Called with a truthy +name+ it stores that name; called without one (or
# with nil/false) it leaves the stored value alone.
#
# @param name [Object, nil] the name to store, or nil to just read
# @return [Object, nil] the stored name after the call (nil when never set)
def remote_application_name(name = nil)
  return @_remote_application_name unless name

  @_remote_application_name = name
end

#routing_key_for(method, routing_key_name) ⇒ Object

Explicitly set the whole routing key to use for a given method route.



122
123
124
125
# File 'lib/action_subscriber/dsl.rb', line 122

# Records an explicit routing key for the given method route, creating the
# backing hash (+@_routing_key_names+) on first use.
#
# @param method [Symbol, String] the action/route the routing key belongs to
# @param routing_key_name [String] the full routing key to use for that route
# @return [Object] +routing_key_name+, as given
def routing_key_for(method, routing_key_name)
  (@_routing_key_names ||= {})[method] = routing_key_name
end

#routing_key_names ⇒ Object



127
128
129
# File 'lib/action_subscriber/dsl.rb', line 127

# Returns the hash of explicitly configured routing keys keyed by method
# route, creating an empty hash on first use.
#
# @return [Hash] the same hash object on every call
def routing_key_names
  @_routing_key_names = {} unless @_routing_key_names
  @_routing_key_names
end

#run_action_with_filters(env, action) ⇒ Object



213
214
215
216
217
218
219
220
221
222
# File 'lib/action_subscriber/dsl.rb', line 213

# Entry point for running a subscriber action: picks the runner that matches
# the configured delivery mode.
#
# +at_least_once?+ is checked first, so it wins when both modes are flagged;
# with neither mode set the action runs through the plain filter chain.
#
# @param env [Object] message environment handed to the selected runner
# @param action [Symbol, String] name of the subscriber action to run
# @return [Object] whatever the selected runner returns
def run_action_with_filters(env, action)
  return _run_action_at_least_once_with_filters(env, action) if at_least_once?
  return _run_action_at_most_once_with_filters(env, action) if at_most_once?

  _run_action_with_filters(env, action)
end