Class: RSMP::Collector

Inherits:
Listener show all
Defined in:
lib/rsmp/collect/collector.rb

Overview

Collects messages from a notifier. Can filter by message type, component and direction. Wakes up the waiting task once the desired number of messages has been collected.

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from Listener

#change_notifier

Methods included from Inspect

#inspector

Constructor Details

#initialize(notifier, options = {}) ⇒ Collector

Returns a new instance of Collector.



9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/rsmp/collect/collector.rb', line 9

# Create a collector attached to +notifier+.
#
# +options+ is deep-merged on top of the default cancel settings
# (cancel on schema errors, do not cancel on disconnects). Options read here:
# :ingoing  - collect ingoing messages (true when not given)
# :outgoing - collect outgoing messages (false when not given)
# :title    - label used in descriptions; falls back to the type(s) joined by '/'
# :task     - task stored in @task; when absent, notifier.task is used if the
#             notifier responds to it
def initialize(notifier, options = {})
  super notifier, options
  cancel_defaults = { cancel: { schema_error: true, disconnect: false } }
  @options = cancel_defaults.deep_merge options
  @ingoing = options[:ingoing].nil? ? true : options[:ingoing]
  @outgoing = options[:outgoing].nil? ? false : options[:outgoing]
  @condition = Async::Notification.new
  @title = options[:title] || [@options[:type]].flatten.join('/')
  if options[:task]
    @task = options[:task]
  elsif notifier.respond_to? 'task'
    # the notifier may be a Proxy, or some other object that implements task()
    @task = notifier.task
  end
  reset
end

Instance Attribute Details

#conditionObject (readonly)

Returns the value of attribute condition.



7
8
9
# File 'lib/rsmp/collect/collector.rb', line 7

# Reader for @condition, the Async::Notification created in #initialize.
# #wait blocks on it and #do_stop signals it.
def condition
  @condition
end

#errorObject (readonly)

Returns the value of attribute error.



7
8
9
# File 'lib/rsmp/collect/collector.rb', line 7

# Reader for @error: the error stored by #cancel, or the RSMP::TimeoutError
# stored by #wait when the timeout is reached. Cleared (nil) by #reset.
def error
  @error
end

#messagesObject (readonly)

Returns the value of attribute messages.



7
8
9
# File 'lib/rsmp/collect/collector.rb', line 7

# Reader for @messages, the array of kept messages.
# #reset replaces it with an empty array and #keep appends to it.
def messages
  @messages
end

#statusObject (readonly)

Returns the value of attribute status.



7
8
9
# File 'lib/rsmp/collect/collector.rb', line 7

# Reader for @status. The values assigned in this class are :ready (#reset),
# :collecting (#start), :ok (#complete), :cancelled (#cancel) and :timeout (#wait).
def status
  @status
end

#taskObject (readonly)

Returns the value of attribute task.



7
8
9
# File 'lib/rsmp/collect/collector.rb', line 7

# Reader for @task, which #wait uses to apply the timeout (with_timeout).
# Assigned in #initialize (options[:task] or notifier.task) or via #use_task.
def task
  @task
end

Instance Method Details

#cancel(error = nil) ⇒ Object

Abort collection



253
254
255
256
257
# File 'lib/rsmp/collect/collector.rb', line 253

# Abort the collection.
#
# Marks the collector as :cancelled, remembers +error+ (may be nil) so it can
# be raised later by #ok!, and then stops listening via #do_stop.
# Returns whatever #do_stop returns.
def cancel(error = nil)
  @status = :cancelled
  @error = error
  do_stop
end

#cancelled?Boolean

Has collection been cancelled?

Returns:

  • (Boolean)


70
71
72
# File 'lib/rsmp/collect/collector.rb', line 70

# True when @status is :cancelled, which is what #cancel sets.
def cancelled?
  @status == :cancelled
end

#collect(&block) ⇒ Object

Collect messages. Returns the final status once all messages have been collected, or the timeout is reached.



93
94
95
96
97
98
99
# File 'lib/rsmp/collect/collector.rb', line 93

# Run a complete collection: start, wait for the outcome, return @status.
#
# The optional block is handed to #start. Whatever happens (including an
# exception), we are removed from the notifier's listeners afterwards,
# provided a notifier is set.
def collect(&block)
  start(&block)
  wait
  @status
ensure
  source = @notifier
  source.remove_listener(self) if source
end

#collect!(&block) ⇒ Object

Collect messages. Returns the collected messages, or raises the stored error (e.g. in case of a timeout).



103
104
105
106
107
# File 'lib/rsmp/collect/collector.rb', line 103

# Like #collect, but returns the collected messages instead of the status.
# #ok! is called after collecting, so any error stored in @error is raised.
def collect! &block
  collect(&block)
  ok!  # raises @error if one was stored
  @messages
end

#collecting?Boolean

Is collection active?

Returns:

  • (Boolean)


50
51
52
# File 'lib/rsmp/collect/collector.rb', line 50

# True when @status is :collecting, i.e. after #start and before the
# status is changed by #complete, #cancel or a timeout in #wait.
def collecting?
  @status == :collecting
end

#completeObject

Called when we're done collecting. Removes ourself as a listener, so we don't receive message notifications anymore.



205
206
207
208
209
# File 'lib/rsmp/collect/collector.rb', line 205

# Finish a successful collection: set the status to :ok, stop listening
# and wake up the waiter (#do_stop), then log completion.
def complete
  @status = :ok
  do_stop
  log_complete
end

#describeObject



181
182
# File 'lib/rsmp/collect/collector.rb', line 181

# Empty description hook; returns nil here.
# #notify interpolates the result into the error message it raises.
# NOTE(review): presumably meant to be overridden by subclasses - confirm.
def describe
end

#describe_num_and_typeObject

Returns a string that describes the number of messages and the type of message we're collecting.



283
284
285
286
287
288
289
# File 'lib/rsmp/collect/collector.rb', line 283

# Describe how many messages of which type we are waiting for.
#
# With a :num option greater than one the result is "<num> <type>s";
# otherwise the :type option is returned as-is (it may be nil).
def describe_num_and_type
  wanted = @options[:num]
  return @options[:type] unless wanted && wanted > 1

  "#{wanted} #{@options[:type]}s"
end

#describe_progressObject

Returns a string that describes how many messages have been collected.



147
148
149
150
151
152
153
# File 'lib/rsmp/collect/collector.rb', line 147

# Build a one-line progress report: our identifier, the capitalized title,
# the originating message id (only when the :m_id option is set), the
# timeout, and how many of the requested messages were collected.
def describe_progress
  fragments = ["#{identifier}: #{@title.capitalize} collection "]
  fragments << "in response to #{@options[:m_id]} " if @options[:m_id]
  fragments << "didn't complete within #{@options[:timeout]}s, "
  fragments << "reached #{@messages.size}/#{@options[:num]}"
  fragments.join
end

#describe_queryObject

return a string that describes the attributes that we're looking for



292
293
294
295
296
297
298
299
# File 'lib/rsmp/collect/collector.rb', line 292

# Describe what we are looking for: the number/type summary, followed by
# the component filter (rendered as a hash) when the :component option is set.
def describe_query
  summary = describe_num_and_type
  filters = { component: @options[:component] }.compact
  filters.empty? ? summary : "#{summary} #{filters}"
end

#do_stopObject

Remove ourself as a listener, so we don't receive message notifications anymore, and wake up the async condition



218
219
220
221
# File 'lib/rsmp/collect/collector.rb', line 218

# Stop listening and wake up whoever is waiting on the condition.
#
# The notifier is treated as optional, in the same way #start and #collect
# already do (`if @notifier`). Without the guard, cancelling a collector that
# has no notifier raised NoMethodError before the condition was signalled,
# leaving the waiter blocked.
def do_stop
  @notifier.remove_listener self if @notifier
  @condition.signal
end

#done?Boolean

Have we collected the required number of messages?

Returns:

  • (Boolean)


199
200
201
# File 'lib/rsmp/collect/collector.rb', line 199

# Have we collected at least the number of messages given by the :num option?
# Returns nil (falsy) when no :num option was given.
def done?
  target = @options[:num]
  target && @messages.size >= target
end

#identifierObject

get a short id in hex format, identifying ourself



326
327
328
# File 'lib/rsmp/collect/collector.rb', line 326

# Short label identifying this collector in log output:
# "Collect " followed by our object id in hexadecimal.
def identifier
  hex_id = object_id.to_s(16)
  "Collect #{hex_id}"
end

#incompleteObject

called when we received a message, but are not done yet



212
213
214
# File 'lib/rsmp/collect/collector.rb', line 212

# Called by #notify when a message matched but #done? is not yet true.
# Only logs the current progress.
def incomplete
  log_incomplete
end

#ingoing?Boolean

Want ingoing messages?

Returns:

  • (Boolean)


75
76
77
# File 'lib/rsmp/collect/collector.rb', line 75

# True only when @ingoing is exactly true.
# #initialize sets @ingoing from the :ingoing option, defaulting to true.
def ingoing?
  @ingoing == true
end

#inspectObject

Inspect formatter that shows the message we have collected



45
46
47
# File 'lib/rsmp/collect/collector.rb', line 45

def inspect
  "#<#{self.class.name}:#{self.object_id}, #{inspector(:@messages)}>"
end

#keep(message) ⇒ Object

Store a message in the result array



260
261
262
# File 'lib/rsmp/collect/collector.rb', line 260

# Append +message+ to the @messages result array.
# Returns the array (the result of <<), which #perform_match passes on as a truthy value.
def keep message
  @messages << message
end

#log_completeObject

log when we end collecting



321
322
323
# File 'lib/rsmp/collect/collector.rb', line 321

# Log, at the :collect level, that this collector is done.
def log_complete
  text = "#{identifier}: Done"
  @notifier.log(text, level: :collect)
end

#log_incompleteObject

log current progress



316
317
318
# File 'lib/rsmp/collect/collector.rb', line 316

# Log the current progress at the :collect level.
#
# #describe_progress already begins with "#{identifier}: ", so it is logged
# as-is; prefixing the identifier here as well printed it twice.
def log_incomplete
  @notifier.log describe_progress, level: :collect
end

#log_startObject

log when we start collecting



311
312
313
# File 'lib/rsmp/collect/collector.rb', line 311

# Log, at the :collect level, what we are about to wait for.
# The text is stripped so an empty query description leaves no trailing space.
def log_start
  text = "#{identifier}: Waiting for #{describe_query}".strip
  @notifier.log(text, level: :collect)
end

#notify(message) ⇒ Object

Handle a message and return the resulting collection status.

Raises:

  • (ArgumentError)


168
169
170
171
172
173
174
175
176
177
178
179
# File 'lib/rsmp/collect/collector.rb', line 168

# Handle a message delivered by the notifier.
#
# Raises ArgumentError when +message+ is nil/false, and RuntimeError when the
# collector is neither ready nor collecting. If the message matches
# (#perform_match), either #complete or #incomplete is called depending on
# #done?. Always returns the current @status.
def notify(message)
  raise ArgumentError unless message
  unless ready? || collecting?
    raise RuntimeError, "can't process message when status is :#{@status}, title: #{@title}, desc: #{describe}"
  end

  if perform_match(message)
    done? ? complete : incomplete
  end
  @status
end

#notify_disconnect(error, options) ⇒ Object

Cancel if we received a notification about a disconnect.



246
247
248
249
250
# File 'lib/rsmp/collect/collector.rb', line 246

# React to a disconnect reported upstream.
#
# Does nothing (returns nil) unless the cancel/disconnect option is enabled.
# Otherwise logs the reason at :debug level and cancels the collection with
# +error+. +options+ is accepted but not used here.
def notify_disconnect(error, options)
  cancel_on_disconnect = @options.dig(:cancel, :disconnect)
  return unless cancel_on_disconnect

  @notifier.log("#{identifier}: cancelled due to a connection error: #{error}", level: :debug)
  cancel(error)
end

#notify_error(error, options = {}) ⇒ Object

An error occurred upstream. Check if we should cancel.



225
226
227
228
229
230
231
232
# File 'lib/rsmp/collect/collector.rb', line 225

# Dispatch an upstream error to the matching handler:
# schema errors go to #notify_schema_error, disconnect errors to
# #notify_disconnect. Any other error is ignored (returns nil).
def notify_error(error, options = {})
  if error.is_a?(RSMP::SchemaError)
    notify_schema_error(error, options)
  elsif error.is_a?(RSMP::DisconnectError)
    notify_disconnect(error, options)
  end
end

#notify_schema_error(error, options) ⇒ Object

Cancel if we received a schema error for a message type we're collecting.



235
236
237
238
239
240
241
242
243
# File 'lib/rsmp/collect/collector.rb', line 235

# React to a schema error reported upstream.
#
# Cancels the collection with +error+ only when all of these hold:
# the cancel/schema_error option is enabled, options[:message] is present,
# and the message's unqualified class name is one of the types we collect
# (or no :type option was given). Otherwise returns nil without side effects.
def notify_schema_error(error, options)
  return unless @options.dig(:cancel, :schema_error)

  message = options[:message]
  return unless message

  klass = message.class.name.split('::').last
  wanted = @options[:type]
  return unless wanted.nil? || [wanted].flatten.include?(klass)

  @notifier.log("#{identifier}: cancelled due to schema error in #{klass} #{message.m_id_short}", level: :debug)
  cancel(error)
end

#ok!Object

If an error caused collection to abort, raise it. Otherwise return self, so this can be tacked on to calls that return a collector.

Raises:



86
87
88
89
# File 'lib/rsmp/collect/collector.rb', line 86

# Raise the stored error, if there is one; otherwise return self so the
# call can be chained onto expressions that yield a collector.
def ok!
  return self unless @error

  raise @error
end

#ok?Boolean

Is collection complete?

Returns:

  • (Boolean)


55
56
57
# File 'lib/rsmp/collect/collector.rb', line 55

# True when @status is :ok, which is what #complete sets.
def ok?
  @status == :ok
end

#outgoing?Boolean

Want outgoing messages?

Returns:

  • (Boolean)


80
81
82
# File 'lib/rsmp/collect/collector.rb', line 80

# True only when @outgoing is exactly true.
# #initialize sets @outgoing from the :outgoing option, defaulting to false.
def outgoing?
  @outgoing == true
end

#perform_match(message) ⇒ Object

Match message against our collection criteria



185
186
187
188
189
190
191
192
193
194
195
196
# File 'lib/rsmp/collect/collector.rb', line 185

# Match +message+ against our collection criteria.
#
# Returns false when the message is a NotAck rejecting our request or does
# not pass #type_match?. Without a block, the message is always kept. With a
# block, the block's result (a single value or an array of values) decides:
# the message is kept only if the result includes :keep. If the block caused
# collection to stop (we are no longer collecting), nothing is kept and nil
# is returned.
def perform_match(message)
  return false if reject_not_ack(message) || !type_match?(message)
  return keep(message) unless @block

  verdict = [@block.call(message)].flatten
  return unless collecting?

  verdict.include?(:keep) ? keep(message) : nil
end

#ready?Boolean

Is collection ready to start?

Returns:

  • (Boolean)


65
66
67
# File 'lib/rsmp/collect/collector.rb', line 65

# True when @status is :ready, the state set by #reset and required by #start.
def ready?
  @status == :ready
end

#reject_not_ack(message) ⇒ Object

Check if we receive a NotAck related to initiating request, identified by @m_id.



156
157
158
159
160
161
162
163
164
165
# File 'lib/rsmp/collect/collector.rb', line 156

# Detect a NotAck that rejects the request we are collecting responses for.
#
# Only applies when the :m_id option is set. If +message+ is a MessageNotAck
# whose 'oMId' attribute equals that id, the collection is cancelled with an
# RSMP::MessageRejected error carrying the 'rea' attribute, and true is
# returned. In every other case nil is returned.
def reject_not_ack(message)
  request_id = @options[:m_id]
  return unless request_id
  return unless message.is_a?(MessageNotAck) && message.attribute('oMId') == request_id

  m_id_short = RSMP::Message.shorten_m_id request_id, 8
  cancel RSMP::MessageRejected.new("#{@title} #{m_id_short} was rejected with '#{message.attribute('rea')}'")
  true
end

#resetObject

Clear all query results



38
39
40
41
42
# File 'lib/rsmp/collect/collector.rb', line 38

# Return to the initial state: no stored error, no collected messages,
# status :ready (which is also the return value).
def reset
  @error = nil
  @messages = []
  @status = :ready
end

#start(&block) ⇒ Object

Start collection and return immediately. You can later use wait() to wait for completion.

Raises:

  • (RuntimeError)


136
137
138
139
140
141
142
143
144
# File 'lib/rsmp/collect/collector.rb', line 136

# Start collecting and return immediately; use #wait to wait for the outcome.
#
# The optional block is stored and later called by #perform_match for each
# candidate message.
#
# Raises RuntimeError unless the collector is ready, and ArgumentError when
# none of the :num option, the :timeout option or a block is provided.
# On success the results are reset, the status becomes :collecting, the start
# is logged, and we are added to the notifier's listeners (if a notifier is set).
def start(&block)
  # message typo fixed: previously read "collectimng"
  raise RuntimeError, "Can't start collecting unless ready (currently #{@status})" unless ready?
  @block = block
  raise ArgumentError, "Num, timeout or block must be provided" unless @options[:num] || @options[:timeout] || @block
  reset
  @status = :collecting
  log_start
  @notifier.add_listener self if @notifier
end

#timeout?Boolean

Has collection timed out?

Returns:

  • (Boolean)


60
61
62
# File 'lib/rsmp/collect/collector.rb', line 60

# True when @status is :timeout, which #wait sets when the timeout is reached.
def timeout?
  @status == :timeout
end

#type_match?(message) ⇒ Boolean

Check a message against our match criteria. Returns true if there's a match, false if not.

Returns:

  • (Boolean)


266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
# File 'lib/rsmp/collect/collector.rb', line 266

# Check +message+ against direction, type and component criteria.
#
# - Direction: :in messages are refused when @ingoing is false, :out messages
#   when @outgoing is false.
# - Type: when the :type option is set, the message type must equal it, or be
#   included in it when it is an array.
# - Component: when the :component option is set, a message that carries a
#   'cId' attribute must match it; messages without 'cId' pass.
#
# Returns true on a match, false otherwise.
def type_match?(message)
  case message.direction
  when :in
    return false if @ingoing == false
  when :out
    return false if @outgoing == false
  end

  wanted_type = @options[:type]
  if wanted_type
    accepted = wanted_type.is_a?(Array) ? wanted_type.include?(message.type) : message.type == wanted_type
    return false unless accepted
  end

  wanted_component = @options[:component]
  if wanted_component
    c_id = message.attributes['cId']
    return false if c_id && c_id != wanted_component
  end

  true
end

#use_task(task) ⇒ Object



33
34
35
# File 'lib/rsmp/collect/collector.rb', line 33

# Replace the task stored in @task, which #wait uses to apply the timeout.
def use_task task
  @task = task
end

#waitObject

If collection is not active, return status immediately. Otherwise wait until the desired messages have been collected, or timeout is reached.



111
112
113
114
115
116
117
118
119
120
121
122
123
# File 'lib/rsmp/collect/collector.rb', line 111

# Wait for the collection to finish and return the resulting status.
#
# Returns @status right away when we are not collecting. Otherwise blocks on
# the condition, bounded by the :timeout option when it is set (using the
# task's with_timeout). If the timeout fires, an RSMP::TimeoutError describing
# the progress is stored in @error and the status becomes :timeout.
def wait
  return @status unless collecting?

  limit = @options[:timeout]
  if limit
    @task.with_timeout(limit) { @condition.wait }
  else
    @condition.wait
  end
  @status
rescue Async::TimeoutError
  @error = RSMP::TimeoutError.new describe_progress
  @status = :timeout
end

#wait!Object

Wait until the desired messages have been collected and return them; if collection is not active, the messages are returned immediately. If the timeout is reached, the stored timeout error is raised.

Raises:



128
129
130
131
132
# File 'lib/rsmp/collect/collector.rb', line 128

# Like #wait, but returns the collected messages and raises the stored
# error when the collection timed out. Other outcomes (including a
# cancellation) do not raise here.
def wait!
  wait
  return @messages unless timeout?

  raise @error
end