Class: RSMP::Collector

Inherits:
Listener show all
Defined in:
lib/rsmp/collect/collector.rb

Overview

Collects messages from a notifier. Can filter by message type, component and direction. Wakes up once the desired number of messages has been collected.

Direct Known Subclasses

AggregatedStatusCollector, StateCollector

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from Listener

#change_notifier

Methods included from Inspect

#inspector

Constructor Details

#initialize(proxy, options = {}) ⇒ Collector

Returns a new instance of Collector.



9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# File 'lib/rsmp/collect/collector.rb', line 9

# Build a collector attached to +proxy+.
#
# Option keys read here: :cancel (hash with :schema_error and :disconnect
# flags, defaulting to true and false), :ingoing (defaults to true),
# :outgoing (defaults to false), :title, :type and :task. The proxy and the
# raw options are also handed to the superclass constructor.
def initialize(proxy, options = {})
  super(proxy, options)

  cancel_defaults = { schema_error: true, disconnect: false }
  @options = { cancel: cancel_defaults }.deep_merge(options)

  # A nil value counts as "not given" and falls back to the default.
  @ingoing = options[:ingoing]
  @ingoing = true if @ingoing.nil?
  @outgoing = options[:outgoing]
  @outgoing = false if @outgoing.nil?

  @condition = Async::Notification.new
  # Without an explicit title, name the collection after the wanted type(s).
  @title = options[:title] || [@options[:type]].flatten.join('/')
  @task = options[:task]
  reset
end

Instance Attribute Details

#conditionObject (readonly)

Returns the value of attribute condition.



7
8
9
# File 'lib/rsmp/collect/collector.rb', line 7

# Reader for @condition, the Async::Notification created in the constructor.
# #wait waits on it and #do_stop signals it.
def condition
  @condition
end

#errorObject (readonly)

Returns the value of attribute error.



7
8
9
# File 'lib/rsmp/collect/collector.rb', line 7

# Reader for @error: the error passed to #cancel, or nil after #reset.
def error
  @error
end

#messagesObject (readonly)

Returns the value of attribute messages.



7
8
9
# File 'lib/rsmp/collect/collector.rb', line 7

# Reader for @messages: the array that #keep appends to and #reset empties.
def messages
  @messages
end

#statusObject (readonly)

Returns the value of attribute status.



7
8
9
# File 'lib/rsmp/collect/collector.rb', line 7

# Reader for @status. Values assigned in this class are :ready (#reset),
# :collecting (#start), :ok (#complete), :cancelled (#cancel) and
# :timeout (#wait).
def status
  @status
end

#taskObject (readonly)

Returns the value of attribute task.



7
8
9
# File 'lib/rsmp/collect/collector.rb', line 7

# Reader for @task, taken from the :task option or set later via #use_task.
# #wait calls with_timeout on it when a timeout is configured.
def task
  @task
end

Instance Method Details

#cancel(error = nil) ⇒ Object

Abort collection



225
226
227
228
229
# File 'lib/rsmp/collect/collector.rb', line 225

# Abort the collection.
#
# Stores +error+ (nil by default) in @error, marks the status as :cancelled
# and then stops via #do_stop, whose result is returned.
def cancel(error = nil)
  @error, @status = error, :cancelled
  do_stop
end

#cancelled?Boolean

Has collection been cancelled?

Returns:

  • (Boolean)


62
63
64
# File 'lib/rsmp/collect/collector.rb', line 62

# True when @status is :cancelled, which is what #cancel sets.
def cancelled?
  @status == :cancelled
end

#collect(&block) ⇒ Object

Collect messages. Will return once all messages have been collected, or the timeout is reached.



78
79
80
81
82
83
84
# File 'lib/rsmp/collect/collector.rb', line 78

# Run a whole collection: #start with the optional block, then #wait.
#
# Returns the final @status. Whatever happens (including an exception from
# #start or #wait), the collector is removed from the notifier afterwards,
# provided a notifier is set.
def collect(&block)
  start(&block)
  wait
  @status
ensure
  # Always detach, so no further notifications arrive after we return.
  @notifier.remove_listener(self) if @notifier
end

#collect!(&block) ⇒ Object

Collect messages. Returns the collected messages, or raises an exception in case of a timeout.



88
89
90
91
92
93
# File 'lib/rsmp/collect/collector.rb', line 88

# Like #collect, but returns the collected messages instead of the status.
#
# Raises RSMP::TimeoutError, with #describe_progress as the message, when
# #collect reports :timeout.
def collect!(&block)
  outcome = collect(&block)
  raise RSMP::TimeoutError, describe_progress if outcome == :timeout
  @messages
end

#collecting?Boolean

Is collection active?

Returns:

  • (Boolean)


42
43
44
# File 'lib/rsmp/collect/collector.rb', line 42

# True when @status is :collecting, i.e. after #start and before the
# collection completes, is cancelled or times out.
def collecting?
  @status == :collecting
end

#completeObject

Called when we're done collecting. Remove ourself as a listener, so we don't receive message notifications anymore.



183
184
185
186
# File 'lib/rsmp/collect/collector.rb', line 183

# Mark the collection as successful (:ok) and stop via #do_stop.
# The status is set first, so it is already :ok when do_stop runs.
def complete
  @status = :ok
  do_stop
end

#describeObject



159
160
# File 'lib/rsmp/collect/collector.rb', line 159

# Description hook. The body is empty, so this returns nil; #notify
# interpolates the result into its error message.
# NOTE(review): presumably meant to be overridden by subclasses — confirm.
def describe
end

#describe_progressObject

Build a string describing how much progress was reached before the timeout.



131
132
133
134
135
136
137
# File 'lib/rsmp/collect/collector.rb', line 131

# Build the human-readable text used when a collection times out.
#
# The text names the (capitalized) title, the originating message id when the
# :m_id option is set, the :timeout option and how many messages were
# collected out of the :num option.
def describe_progress
  parts = ["#{@title.capitalize} collection"]
  parts << "in response to #{@options[:m_id]}" if @options[:m_id]
  parts << "didn't complete within #{@options[:timeout]}s,"
  parts << "reached #{@messages.size}/#{@options[:num]}"
  parts.join(' ')
end

#do_stopObject

Remove ourself as a listener, so we don't receive message notifications anymore, and wake up the async condition



190
191
192
193
# File 'lib/rsmp/collect/collector.rb', line 190

# Stop collecting: detach from the notifier (when there is one) and signal
# @condition, which is what #wait waits on.
#
# The notifier is treated as optional by #start and #collect, which both
# check it before use, so it is checked here too rather than raising
# NoMethodError when no notifier is set.
#
# Returns the result of signalling the condition.
def do_stop
  @notifier.remove_listener self if @notifier
  @condition.signal
end

#done?Boolean

Have we collected the required number of messages?

Returns:

  • (Boolean)


177
178
179
# File 'lib/rsmp/collect/collector.rb', line 177

# Have we collected at least the number of messages given by the :num option?
# Returns the falsy :num value itself (e.g. nil) when no number was requested.
def done?
  target = @options[:num]
  target && @messages.size >= target
end

#ingoing?Boolean

Want ingoing messages?

Returns:

  • (Boolean)


67
68
69
# File 'lib/rsmp/collect/collector.rb', line 67

# True only when @ingoing is exactly true. The constructor sets @ingoing from
# the :ingoing option, using true when that option is nil.
def ingoing?
  @ingoing == true
end

#inspectObject

Inspect formatter that shows the messages we have collected.



37
38
39
# File 'lib/rsmp/collect/collector.rb', line 37

# Compact inspect string: class name, object id and whatever #inspector
# renders for @messages.
def inspect
  format('#<%s:%s, %s>', self.class.name, object_id, inspector(:@messages))
end

#keep(message) ⇒ Object

Store a message in the result array



232
233
234
# File 'lib/rsmp/collect/collector.rb', line 232

# Append +message+ to @messages. Returns the @messages array (the result
# of Array#<<).
def keep message
  @messages << message
end

#notify(message) ⇒ Object

Handle a message, and return the resulting collection status.

Raises:

  • (ArgumentError)


152
153
154
155
156
157
# File 'lib/rsmp/collect/collector.rb', line 152

# Handle one message from the notifier and return the resulting @status.
#
# Raises ArgumentError when +message+ is nil or false, and RuntimeError when
# the collector is neither ready nor collecting. Otherwise the message is
# passed to #perform_match.
def notify(message)
  raise ArgumentError unless message

  unless ready? || collecting?
    raise RuntimeError, "can't process message when status is :#{@status}, title: #{@title}, desc: #{describe}"
  end

  perform_match message
  @status
end

#notify_disconnect(error, options) ⇒ Object

Cancel if we received a notification about a disconnect.



218
219
220
221
222
# File 'lib/rsmp/collect/collector.rb', line 218

# React to a disconnect reported by the proxy.
#
# Does nothing (returns nil) unless the nested option cancel.disconnect is
# truthy. Otherwise logs at debug level through the notifier and cancels the
# collection with +error+. The +options+ argument is not used.
def notify_disconnect(error, options)
  if @options.dig(:cancel, :disconnect)
    @notifier.log "Collection cancelled due to a connection error: #{error}", level: :debug
    cancel error
  end
end

#notify_error(error, options = {}) ⇒ Object

The proxy experienced some error. Check if this should cause us to cancel.



197
198
199
200
201
202
203
204
# File 'lib/rsmp/collect/collector.rb', line 197

# The proxy reported an error: dispatch it to the matching handler.
#
# RSMP::SchemaError goes to #notify_schema_error and RSMP::DisconnectError to
# #notify_disconnect. Any other error is ignored and nil is returned.
def notify_error(error, options = {})
  if error.is_a?(RSMP::SchemaError)
    notify_schema_error error, options
  elsif error.is_a?(RSMP::DisconnectError)
    notify_disconnect error, options
  end
end

#notify_schema_error(error, options) ⇒ Object

Cancel if we received a schema error for a message type we're collecting.



207
208
209
210
211
212
213
214
215
# File 'lib/rsmp/collect/collector.rb', line 207

# React to a schema error reported by the proxy.
#
# Returns nil without doing anything when the nested option
# cancel.schema_error is falsy, when options[:message] is missing, or when a
# :type option is set and does not include the class name of the message
# (without its namespace). Otherwise logs at debug level and cancels the
# collection with +error+.
def notify_schema_error(error, options)
  return unless @options.dig(:cancel, :schema_error)

  faulty = options[:message]
  return unless faulty

  type_name = faulty.class.name.split('::').last
  wanted = @options[:type]
  # No type filter means every message type is relevant to us.
  relevant = wanted.nil? || [wanted].flatten.include?(type_name)
  return unless relevant

  @notifier.log "Collection cancelled due to schema error in #{type_name} #{faulty.m_id_short}", level: :debug
  cancel error
end

#ok?Boolean

Has collection completed successfully?

Returns:

  • (Boolean)


47
48
49
# File 'lib/rsmp/collect/collector.rb', line 47

# True when @status is :ok, which is what #complete sets.
def ok?
  @status == :ok
end

#outgoing?Boolean

Want outgoing messages?

Returns:

  • (Boolean)


72
73
74
# File 'lib/rsmp/collect/collector.rb', line 72

# True only when @outgoing is exactly true. The constructor sets @outgoing
# from the :outgoing option, using false when that option is nil.
def outgoing?
  @outgoing == true
end

#perform_match(message) ⇒ Object

Match message against our collection criteria



163
164
165
166
167
168
169
170
171
172
173
174
# File 'lib/rsmp/collect/collector.rb', line 163

# Match +message+ against our collection criteria and store it if wanted.
#
# Returns false when the message is a rejection handled by #reject_not_ack or
# fails #type_match?. With a block given to #start, the block decides: the
# message is kept only if the block's result (flattened) includes :keep, and
# nothing more happens (nil is returned) if the block left us no longer
# collecting. Without a block every matching message is kept. Finally
# #complete is called once #done? is true.
def perform_match(message)
  return false if reject_not_ack(message) || !type_match?(message)

  wanted = true
  if @block
    verdicts = [@block.call(message)].flatten
    # The block may have cancelled or completed the collection itself.
    return unless collecting?
    wanted = verdicts.include?(:keep)
  end

  keep message if wanted
  complete if done?
end

#ready?Boolean

Is collection ready to start?

Returns:

  • (Boolean)


57
58
59
# File 'lib/rsmp/collect/collector.rb', line 57

# True when @status is :ready, the state that #reset establishes and that
# #start requires.
def ready?
  @status == :ready
end

#reject_not_ack(message) ⇒ Object

Check if we received a NotAck related to the initiating request, identified by the :m_id option.



140
141
142
143
144
145
146
147
148
149
# File 'lib/rsmp/collect/collector.rb', line 140

# Detect a MessageNotAck that rejects the request we are collecting for.
#
# Only active when the :m_id option is set. If +message+ is a MessageNotAck
# whose 'oMId' attribute equals that id, the collection is cancelled with an
# RSMP::MessageRejected error quoting the 'rea' attribute, and true is
# returned. In every other case nil is returned.
def reject_not_ack(message)
  request_id = @options[:m_id]
  return unless request_id
  return unless message.is_a?(MessageNotAck)
  return unless message.attribute('oMId') == request_id

  m_id_short = RSMP::Message.shorten_m_id request_id, 8
  cancel RSMP::MessageRejected.new("#{@title} #{m_id_short} was rejected with '#{message.attribute('rea')}'")
  true
end

#resetObject

Clear all query results



30
31
32
33
34
# File 'lib/rsmp/collect/collector.rb', line 30

# Clear all results: empty the message list, forget any stored error and
# return to the :ready state so that #start can be called.
# The status assignment comes last, so the method returns :ready.
def reset
  @messages = []
  @error = nil
  @status = :ready
end

#start(&block) ⇒ Object

Start collection and return immediately. You can later use wait() to wait for completion.

Raises:

  • (RuntimeError)


121
122
123
124
125
126
127
128
# File 'lib/rsmp/collect/collector.rb', line 121

# Start collecting and return immediately; use #wait to wait for completion.
#
# The optional block is stored and later consulted by #perform_match for each
# matching message.
#
# Raises RuntimeError unless the collector is ready, and ArgumentError when
# none of the :num option, the :timeout option or a block is given (the
# collection would otherwise have no way to end).
#
# Returns the result of registering with the notifier, or nil when no
# notifier is set.
def start(&block)
  raise RuntimeError, "Can't begin unless ready (currently #{@status})" unless ready?

  @block = block
  unless @options[:num] || @options[:timeout] || @block
    raise ArgumentError, "Num, timeout or block must be provided"
  end

  reset
  @status = :collecting
  @notifier.add_listener self if @notifier
end

#timeout?Boolean

Has collection timed out?

Returns:

  • (Boolean)


52
53
54
# File 'lib/rsmp/collect/collector.rb', line 52

# True when @status is :timeout, which #wait sets when the wait times out.
def timeout?
  @status == :timeout
end

#type_match?(message) ⇒ Boolean

Check a message against our match criteria. Return true if there's a match, false if not.

Returns:

  • (Boolean)


238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
# File 'lib/rsmp/collect/collector.rb', line 238

# Check +message+ against the direction, type and component criteria.
#
# Returns false when the message direction is switched off (@ingoing or
# @outgoing is false), when a :type option is set and the message type is not
# (one of) the wanted type(s), or when a :component option is set and the
# message carries a different 'cId'. Messages without a 'cId' attribute pass
# the component check. Returns true otherwise.
def type_match?(message)
  case message.direction
  when :in
    return false if @ingoing == false
  when :out
    return false if @outgoing == false
  end

  wanted = @options[:type]
  if wanted
    accepted = wanted.is_a?(Array) ? wanted.include?(message.type) : message.type == wanted
    return false unless accepted
  end

  component = @options[:component]
  if component
    c_id = message.attributes['cId']
    return false if c_id && c_id != component
  end

  true
end

#use_task(task) ⇒ Object



25
26
27
# File 'lib/rsmp/collect/collector.rb', line 25

# Set the task stored in @task, replacing the one taken from the :task
# option. #wait calls with_timeout on it when a timeout is configured.
def use_task task
  @task = task
end

#waitObject

If collection is not active, return the status immediately. Otherwise wait until the desired messages have been collected, or the timeout is reached.



97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/rsmp/collect/collector.rb', line 97

# Wait for the collection to finish and return the resulting status.
#
# Returns @status straight away when not collecting. While collecting, waits
# on @condition; when the :timeout option is set the wait is wrapped in
# @task.with_timeout, so a task must have been provided.
#
# On Async::TimeoutError the status becomes :timeout and is returned. The
# collector also detaches from the notifier at that point: #notify raises
# unless the status is :ready or :collecting, so a timed-out collector must
# not stay registered when #start and #wait are used without #collect.
def wait
  if collecting?
    if @options[:timeout]
      @task.with_timeout(@options[:timeout]) { @condition.wait }
    else
      @condition.wait
    end
  end
  @status
rescue Async::TimeoutError
  @notifier.remove_listener self if @notifier
  @status = :timeout
end

#wait!Object

Like wait(), but returns the collected messages instead of the status. If collection is not active, the messages are returned immediately. If the timeout is reached, an exception is raised.

Raises:



113
114
115
116
117
# File 'lib/rsmp/collect/collector.rb', line 113

# Like #wait, but returns the collected messages instead of the status.
#
# Raises RSMP::TimeoutError, with #describe_progress as the message, when the
# status is :timeout after waiting.
def wait!
  wait
  return @messages unless timeout?

  raise RSMP::TimeoutError, describe_progress
end