Class: RSMP::Collector

Inherits:
Object
  • Object
show all
Includes:
Receiver
Defined in:
lib/rsmp/collect/collector.rb

Overview

Collects messages from a distributor. Can filter by message type, component and direction. Wakes up the waiting task once the desired number of messages has been collected.

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Receiver

#accept_message?, #handle_message, #initialize_receiver, #reject_message?, #start_receiving, #stop_receiving

Methods included from Inspect

#inspector

Constructor Details

#initialize(distributor, options = {}) ⇒ Collector

Returns a new instance of Collector.



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/rsmp/collect/collector.rb', line 10

# Create a collector attached to a distributor.
#
# @param distributor [Object] message source; passed on to initialize_receiver.
#   If it responds to #task, its task is used when no task is given otherwise.
# @param options [Hash] settings. Keys read here: :filter, :timeout, :num,
#   :m_id, :title, :task, and :cancel (a hash with :schema_error and
#   :disconnect flags, defaulting to true and false respectively).
def initialize distributor, options={}
  initialize_receiver distributor, filter: options[:filter]
  @options = {
    cancel: {
      schema_error: true,
      disconnect: false,
    }
  }.deep_merge options
  @timeout = options[:timeout]
  @num = options[:num]
  @m_id = options[:m_id]
  @condition = Async::Notification.new
  make_title options[:title]

  # Prefer an explicitly supplied task. The former `if task` check only read
  # the @task reader, which nothing visible here has assigned at this point,
  # so a task could never actually be passed in.
  @task = options[:task] || task
  # If distributor is a Proxy, or some other object that implements task(),
  # then try to get the task that way.
  @task ||= distributor.task if distributor.respond_to?(:task)
  reset
end

Instance Attribute Details

#conditionObject (readonly)

Returns the value of attribute condition.



8
9
10
# File 'lib/rsmp/collect/collector.rb', line 8

# Reader for @condition, which #initialize sets to an Async::Notification.
# #wait blocks on it and #do_stop signals it.
def condition
  @condition
end

#errorObject (readonly)

Returns the value of attribute error.



8
9
10
# File 'lib/rsmp/collect/collector.rb', line 8

# Reader for @error: nil after #reset, the error passed to #cancel,
# or an RSMP::TimeoutError stored by #wait when the timeout is reached.
def error
  @error
end

#m_idObject (readonly)

Returns the value of attribute m_id.



8
9
10
# File 'lib/rsmp/collect/collector.rb', line 8

# Reader for @m_id, taken from options[:m_id] in #initialize.
# #reject_not_ack compares it with the 'oMId' attribute of NotAck messages.
def m_id
  @m_id
end

#messagesObject (readonly)

Returns the value of attribute messages.



8
9
10
# File 'lib/rsmp/collect/collector.rb', line 8

# Reader for @messages, the array of kept messages.
# Emptied by #reset and appended to by #keep.
def messages
  @messages
end

#statusObject (readonly)

Returns the value of attribute status.



8
9
10
# File 'lib/rsmp/collect/collector.rb', line 8

# Reader for @status. Values assigned in this class are
# :ready, :collecting, :ok, :cancelled and :timeout.
def status
  @status
end

#taskObject (readonly)

Returns the value of attribute task.



8
9
10
# File 'lib/rsmp/collect/collector.rb', line 8

# Reader for @task, set in #initialize or via #use_task.
# #wait calls with_timeout on it when a timeout is configured.
def task
  @task
end

Instance Method Details

#acceptable?(message) ⇒ Boolean

Check a message against our match criteria. Return true if there’s a match, false if not.

Returns:

  • (Boolean)


282
283
284
# File 'lib/rsmp/collect/collector.rb', line 282

# Decide whether a message passes our filter.
# Without a filter every message is accepted; otherwise the decision is
# delegated to the filter's accept? method.
def acceptable?(message)
  return true if @filter.nil?

  @filter.accept?(message)
end

#cancel(error = nil) ⇒ Object

Abort collection



269
270
271
272
273
# File 'lib/rsmp/collect/collector.rb', line 269

# Abort collection.
# Stores the given error (nil by default) in @error, marks the status as
# :cancelled, then calls #do_stop to detach from the distributor and signal
# the condition.
def cancel error=nil
  @error = error
  @status = :cancelled
  do_stop
end

#cancelled?Boolean

Has collection been cancelled?

Returns:

  • (Boolean)


83
84
85
# File 'lib/rsmp/collect/collector.rb', line 83

# True when the status is :cancelled, which #cancel sets.
def cancelled?
  @status == :cancelled
end

#collect(&block) ⇒ Object

Collect messages. Returns once all messages have been collected, or the timeout is reached.



106
107
108
109
110
111
112
# File 'lib/rsmp/collect/collector.rb', line 106

# Run a complete collection: start, block until finished, return the status.
#
# The optional block is forwarded to #start, which stores it for use when
# matching messages.
#
# @return [Symbol] the value of @status once waiting has ended
def collect(&block)
  # Explicit parentheses avoid Ruby's ambiguous `&` argument-prefix parse and
  # match the call style used by #collect!.
  start(&block)
  wait
  @status
ensure
  # Always detach from the distributor, even if #start or #wait raised.
  @distributor.remove_receiver self if @distributor
end

#collect!(&block) ⇒ Object

Collect messages. Returns the collected messages, or raises an exception in case of a timeout.



116
117
118
119
120
# File 'lib/rsmp/collect/collector.rb', line 116

# Collect like #collect, but return the collected messages instead of the status.
# After collecting, #ok! raises @error if one was stored (for example by
# #cancel or by a timeout in #wait); otherwise @messages is returned.
def collect! &block
  collect(&block)
  ok!
  @messages
end

#collecting?Boolean

Is collection active?

Returns:

  • (Boolean)


63
64
65
# File 'lib/rsmp/collect/collector.rb', line 63

# True when the status is :collecting, which #start sets.
def collecting?
  @status == :collecting
end

#completeObject

Called when we’re done collecting. Remove ourself as a receiver, so we don’t receive message notifications anymore.



221
222
223
224
225
# File 'lib/rsmp/collect/collector.rb', line 221

# Finish a successful collection.
# The status is set to :ok before #do_stop detaches from the distributor and
# signals the condition; completion is then logged.
def complete
  @status = :ok
  do_stop
  log_complete
end

#describeObject



197
198
# File 'lib/rsmp/collect/collector.rb', line 197

# Empty placeholder; returns nil. Its value is interpolated into the error
# message raised by #receive.
# NOTE(review): presumably meant to be overridden by subclasses — confirm.
def describe
end

#describe_matcherObject

return a string that describes the attributes that we’re looking for



301
302
303
304
305
306
307
308
# File 'lib/rsmp/collect/collector.rb', line 301

# Build a description of what we are waiting for: the number/type summary,
# followed by the filter's component when one is set.
def describe_matcher
  criteria = { component: @filter&.component }.compact
  summary = describe_num_and_type
  return summary if criteria.empty?

  "#{summary} #{criteria}"
end

#describe_num_and_typeObject

return a string that describes the number of messages, and type of message we’re collecting



292
293
294
295
296
297
298
# File 'lib/rsmp/collect/collector.rb', line 292

# Describe how many messages of which type(s) we want.
# The count and a plural "s" are only added when more than one message is wanted.
def describe_num_and_type
  return describe_types unless @num && @num > 1

  "#{@num} #{describe_types}s"
end

#describe_progressObject

return a string that describes how many messages have been collected



160
161
162
163
164
165
166
# File 'lib/rsmp/collect/collector.rb', line 160

# Build a progress line: identifier, capitalised title, the originating
# message id when one is set, the timeout, and collected/wanted counts.
def describe_progress
  fragments = ["#{identifier}: #{@title.capitalize} collection "]
  fragments << "in response to #{@m_id} " if @m_id
  fragments << "didn't complete within #{@timeout}s, "
  fragments << "reached #{@messages.size}/#{@num}"
  fragments.join
end

#describe_typesObject

return a string describing the types of messages we’re collecting



287
288
289
# File 'lib/rsmp/collect/collector.rb', line 287

# Join the filter's message type(s) with "/".
# Yields an empty string when there is no filter or no type.
def describe_types
  wanted_types = [@filter&.type].flatten
  wanted_types.join('/')
end

#do_stopObject

Remove ourself as a receiver, so we don’t receive message notifications anymore, and wake up the async condition



234
235
236
237
# File 'lib/rsmp/collect/collector.rb', line 234

# Stop receiving: detach from the distributor, then signal @condition so
# that anything blocked in #wait is woken up.
def do_stop
  @distributor.remove_receiver self
  @condition.signal
end

#done?Boolean

Have we collected the required number of messages?

Returns:

  • (Boolean)


215
216
217
# File 'lib/rsmp/collect/collector.rb', line 215

# Have at least @num messages been kept?
# Evaluates to nil (falsy) when no target number was configured.
def done?
  @num && @messages.size >= @num
end

#identifierObject

get a short id in hex format, identifying ourself



335
336
337
# File 'lib/rsmp/collect/collector.rb', line 335

# Short label for log lines: "Collect " followed by our object id in hex.
def identifier
  hex_id = object_id.to_s(16)
  "Collect #{hex_id}"
end

#incompleteObject

called when we received a message, but are not done yet



228
229
230
# File 'lib/rsmp/collect/collector.rb', line 228

# Called by #receive when a message matched but #done? is not yet true.
# Only logs the current progress.
def incomplete
  log_incomplete
end

#ingoing?Boolean

Want ingoing messages?

Returns:

  • (Boolean)


88
89
90
# File 'lib/rsmp/collect/collector.rb', line 88

# True only when @ingoing is exactly true.
# NOTE(review): nothing in the visible code assigns @ingoing — confirm where it is set.
def ingoing?
  @ingoing == true
end

#inspectObject

Inspect formatter that shows the message we have collected



58
59
60
# File 'lib/rsmp/collect/collector.rb', line 58

def inspect
  "#<#{self.class.name}:#{self.object_id}, #{inspector(:@messages)}>"
end

#keep(message) ⇒ Object

Store a message in the result array



276
277
278
# File 'lib/rsmp/collect/collector.rb', line 276

# Append the message to @messages.
# Returns @messages (Array#<< returns its receiver); #perform_match passes
# this on as its truthy "kept" result.
def keep message
  @messages << message
end

#log_completeObject

log when we end collecting



330
331
332
# File 'lib/rsmp/collect/collector.rb', line 330

# Log, at the :collect level, that collection has finished.
def log_complete
  line = "#{identifier}: Done"
  @distributor.log(line, level: :collect)
end

#log_incompleteObject

log current progress



325
326
327
# File 'lib/rsmp/collect/collector.rb', line 325

# Log the current progress at the :collect level.
#
# #describe_progress already begins with "#{identifier}: ", so its text is
# logged as-is; adding the identifier here would print it twice.
def log_incomplete
  @distributor.log describe_progress, level: :collect
end

#log_startObject

log when we start collecting



320
321
322
# File 'lib/rsmp/collect/collector.rb', line 320

# Log, at the :collect level, what we are about to wait for.
# Trailing whitespace is stripped in case the matcher description is empty.
def log_start
  announcement = "#{identifier}: Waiting for #{describe_matcher}".strip
  @distributor.log(announcement, level: :collect)
end

#make_title(title) ⇒ Object



36
37
38
39
40
41
42
43
44
# File 'lib/rsmp/collect/collector.rb', line 36

# Choose the title used in log and error messages: the given title if any,
# else the filter's type(s) joined by "/", else an empty string.
def make_title(title)
  @title =
    if title
      title
    elsif @filter
      [@filter.type].flatten.join('/')
    else
      ""
    end
end

#ok!Object

If an error caused collection to abort, then raise it. Otherwise return self, so this can be tacked on to calls that return a collector.

Raises:



99
100
101
102
# File 'lib/rsmp/collect/collector.rb', line 99

# Raise the stored error, if there is one; otherwise return self so the
# call can be chained.
def ok!
  return self unless @error

  raise @error
end

#ok?Boolean

Is collection complete?

Returns:

  • (Boolean)


68
69
70
# File 'lib/rsmp/collect/collector.rb', line 68

# True when the status is :ok, which #complete sets.
def ok?
  @status == :ok
end

#outgoing?Boolean

Want outgoing messages?

Returns:

  • (Boolean)


93
94
95
# File 'lib/rsmp/collect/collector.rb', line 93

# True only when @outgoing is exactly true.
# NOTE(review): nothing in the visible code assigns @outgoing — confirm where it is set.
def outgoing?
  @outgoing == true
end

#perform_match(message) ⇒ Object

Match message against our collection criteria



201
202
203
204
205
206
207
208
209
210
211
212
# File 'lib/rsmp/collect/collector.rb', line 201

# Run one message through our criteria and keep it when it qualifies.
#
# Returns false when the message is a rejecting NotAck or fails the filter,
# a truthy value (the result of #keep) when the message was kept, and nil
# when the block declined it or collection stopped while the block ran.
def perform_match(message)
  return false if reject_not_ack(message) || !acceptable?(message)
  # Without a block every acceptable message is kept.
  return keep(message) unless @block

  verdicts = [@block.call(message)].flatten
  # The block may have ended collection (e.g. by cancelling); keep nothing then.
  return unless collecting?

  keep(message) if verdicts.include?(:keep)
end

#ready?Boolean

Is collection ready to start?

Returns:

  • (Boolean)


78
79
80
# File 'lib/rsmp/collect/collector.rb', line 78

# True when the status is :ready, which #reset sets.
def ready?
  @status == :ready
end

#receive(message) ⇒ Object

Handle a message, and return the current collection status.

Raises:

  • (ArgumentError)


182
183
184
185
186
187
188
189
190
191
192
193
194
195
# File 'lib/rsmp/collect/collector.rb', line 182

# Process one incoming message.
#
# Raises ArgumentError when no message is given, and RuntimeError when the
# collector is neither ready nor collecting. When the message is kept,
# either #complete or #incomplete runs depending on #done?.
# Returns the current @status.
def receive(message)
  raise ArgumentError unless message

  active = ready? || collecting?
  unless active
    raise RuntimeError, "can't process message when status is :#{@status}, title: #{@title}, desc: #{describe}"
  end

  (done? ? complete : incomplete) if perform_match(message)
  @status
end

#receive_disconnect(error, options) ⇒ Object

Cancel if we received a notification about a disconnect



262
263
264
265
266
# File 'lib/rsmp/collect/collector.rb', line 262

# React to a disconnect: cancel collection, but only when the
# cancel/disconnect option is enabled. The options argument is not used.
def receive_disconnect(error, options)
  cancel_on_disconnect = @options.dig(:cancel, :disconnect)
  return unless cancel_on_disconnect

  @distributor.log("#{identifier}: cancelled due to a connection error: #{error}", level: :debug)
  cancel(error)
end

#receive_error(error, options = {}) ⇒ Object

An error occurred upstream. Check if we should cancel.



241
242
243
244
245
246
247
248
# File 'lib/rsmp/collect/collector.rb', line 241

# Dispatch an upstream error to its handler: schema errors go to
# #receive_schema_error, disconnects to #receive_disconnect.
# Any other kind of error is ignored and nil is returned.
def receive_error(error, options = {})
  if error.is_a?(RSMP::SchemaError)
    receive_schema_error(error, options)
  elsif error.is_a?(RSMP::DisconnectError)
    receive_disconnect(error, options)
  end
end

#receive_schema_error(error, options) ⇒ Object

Cancel if we received a schema error for a message type we’re collecting



251
252
253
254
255
256
257
258
259
# File 'lib/rsmp/collect/collector.rb', line 251

# React to a schema error: cancel collection when the cancel/schema_error
# option is enabled, options carries the offending :message, and that
# message's class name (without namespace) is a type we collect. With no
# filter type configured, any message type qualifies.
def receive_schema_error(error, options)
  return unless @options.dig(:cancel, :schema_error)

  offender = options[:message]
  return unless offender

  type_name = offender.class.name.split('::').last
  wanted = @filter&.type
  return unless wanted.nil? || [wanted].flatten.include?(type_name)

  @distributor.log("#{identifier}: cancelled due to schema error in #{type_name} #{offender.m_id_short}", level: :debug)
  cancel(error)
end

#reject_not_ack(message) ⇒ Object

Check if we receive a NotAck related to initiating request, identified by @m_id.



169
170
171
172
173
174
175
176
177
178
179
# File 'lib/rsmp/collect/collector.rb', line 169

# Detect a NotAck that answers our initiating request (identified by @m_id).
# When found, collection is cancelled with an RSMP::MessageRejected error,
# the cancellation is logged, and true is returned. In every other case
# nil is returned.
def reject_not_ack(message)
  return unless @m_id
  return unless message.is_a?(MessageNotAck)
  return unless message.attribute('oMId') == @m_id

  short_id = RSMP::Message.shorten_m_id(@m_id, 8)
  rejection = RSMP::MessageRejected.new("#{@title} #{short_id} was rejected with '#{message.attribute('rea')}'")
  cancel(rejection)
  @distributor.log("#{identifier}: cancelled due to a NotAck", level: :debug)
  true
end

#resetObject

Clear all matcher results



51
52
53
54
55
# File 'lib/rsmp/collect/collector.rb', line 51

# Return to the initial state: no collected messages, no stored error,
# and status :ready.
def reset
  @messages = []
  @error = nil
  @status = :ready
end

#start(&block) ⇒ Object

Start collection and return immediately. You can later use wait() to wait for completion.

Raises:

  • (RuntimeError)


149
150
151
152
153
154
155
156
157
# File 'lib/rsmp/collect/collector.rb', line 149

# Start collecting and return immediately; use #wait to block until done.
#
# The optional block is stored and later called for each candidate message.
#
# Raises RuntimeError unless the collector is ready, and ArgumentError when
# neither a number of messages, a timeout, nor a block has been provided.
def start(&block)
  raise RuntimeError, "Can't start collecting unless ready (currently #{@status})" unless ready?

  @block = block
  raise ArgumentError, 'Num, timeout or block must be provided' unless @num || @timeout || @block

  reset
  @status = :collecting
  log_start
  @distributor.add_receiver self if @distributor
end

#timeout?Boolean

Has collection timed out?

Returns:

  • (Boolean)


73
74
75
# File 'lib/rsmp/collect/collector.rb', line 73

# True when the status is :timeout, which #wait sets when the timeout is reached.
def timeout?
  @status == :timeout
end

#use_task(task) ⇒ Object



46
47
48
# File 'lib/rsmp/collect/collector.rb', line 46

# Set the task that #wait uses (via with_timeout) to enforce the timeout.
def use_task task
  @task = task
end

#waitObject

If collection is not active, return status immediately. Otherwise wait until the desired messages have been collected, or timeout is reached.



124
125
126
127
128
129
130
131
132
133
134
135
136
# File 'lib/rsmp/collect/collector.rb', line 124

# Block until collection ends and return the status.
#
# Returns immediately when we are not collecting. With a timeout configured,
# the wait runs under the task's with_timeout. If that times out, an
# RSMP::TimeoutError describing the progress is stored in @error and the
# status becomes :timeout.
def wait
  return @status unless collecting?

  if @timeout
    @task.with_timeout(@timeout) { @condition.wait }
  else
    @condition.wait
  end
  @status
rescue Async::TimeoutError
  @error = RSMP::TimeoutError.new(describe_progress)
  @status = :timeout
end

#wait!Object

Wait for collection to finish (returning immediately if it is not active) and return the collected messages. If the timeout was reached, an exception is raised.

Raises:



141
142
143
144
145
# File 'lib/rsmp/collect/collector.rb', line 141

# Wait via #wait, then return the collected messages.
# Raises the stored @error when the status is :timeout. Only a timeout
# raises here; an error stored by #cancel is not raised by this method.
def wait!
  wait
  raise @error if timeout?
  @messages
end