Class: RSMP::Collector

Inherits:
Listener show all
Defined in:
lib/rsmp/collect/collector.rb

Overview

Collects ingoing and/or outgoing messages from a notifier. Can filter by message type and wakes up the client once the desired number of messages has been collected.

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from Listener

#change_notifier, #listen

Methods included from Inspect

#inspector

Constructor Details

#initialize(proxy, options = {}) ⇒ Collector

Returns a new instance of Collector.



8
9
10
11
12
13
14
15
16
17
18
19
20
21
# File 'lib/rsmp/collect/collector.rb', line 8

def initialize proxy, options={}
  super proxy, options
  @options = {
    cancel: {
      schema_error: true,
      disconnect: false,
    }
  }.deep_merge options
  @ingoing = options[:ingoing] == nil ? true  : options[:ingoing]
  @outgoing = options[:outgoing] == nil ? false : options[:outgoing]
  @condition = Async::Notification.new
  @title = options[:title] || [@options[:type]].flatten.join('/')
  reset
end

Instance Attribute Details

#conditionObject (readonly)

Returns the value of attribute condition.



6
7
8
# File 'lib/rsmp/collect/collector.rb', line 6

def condition
  @condition
end

#errorObject (readonly)

Returns the value of attribute error.



6
7
8
# File 'lib/rsmp/collect/collector.rb', line 6

def error
  @error
end

#messagesObject (readonly)

Returns the value of attribute messages.



6
7
8
# File 'lib/rsmp/collect/collector.rb', line 6

def messages
  @messages
end

#statusObject (readonly)

Returns the value of attribute status.



6
7
8
# File 'lib/rsmp/collect/collector.rb', line 6

def status
  @status
end

Instance Method Details

#cancel(error) ⇒ Object

Abort collection



200
201
202
203
204
# File 'lib/rsmp/collect/collector.rb', line 200

def cancel error
  @error = error
  @status = :cancelled
  do_stop
end

#collect(task, options = {}, &block) ⇒ Object

Collect message Will return once all messages have been collected, or timeout is reached



84
85
86
87
88
89
90
# File 'lib/rsmp/collect/collector.rb', line 84

def collect task, options={}, &block
  start options, &block
  wait task
  @status
ensure
  @notifier.remove_listener self
end

#collect!(task, options = {}, &block) ⇒ Object

Collect message Returns the collected messages, or raise an exception in case of a time out.



103
104
105
106
107
108
109
110
# File 'lib/rsmp/collect/collector.rb', line 103

def collect! task, options={}, &block
  case collect(task, options, &block)
  when :timeout
    raise RSMP::TimeoutError.new @why
  else
    @messages
  end
end

#completeObject

Called when we're done collecting. Remove ourself as a listener, se we don't receive message notifications anymore



158
159
160
161
# File 'lib/rsmp/collect/collector.rb', line 158

def complete
  @status = :ok
  do_stop
end

#describe_progressObject

Build a string describing how how progress reached before timeout



93
94
95
96
97
98
99
# File 'lib/rsmp/collect/collector.rb', line 93

def describe_progress
  str = "#{@title.capitalize} collection "
  str << "in response to #{@options[:m_id]} " if @options[:m_id]
  str << "didn't complete within #{@options[:timeout]}s, "
  str << "reached #{@messages.size}/#{@options[:num]}"
  str
end

#do_stopObject

Remove ourself as a listener, so we don't receive message notifications anymore, and wake up the async condition



165
166
167
168
# File 'lib/rsmp/collect/collector.rb', line 165

def do_stop
  @notifier.remove_listener self
  @condition.signal
end

#done?Boolean

Have we collected the required number of messages?

Returns:

  • (Boolean)


152
153
154
# File 'lib/rsmp/collect/collector.rb', line 152

def done?
  @options[:num] && @messages.size >= @options[:num]
end

#ingoing?Boolean

Want ingoing messages?

Returns:

  • (Boolean)


37
38
39
# File 'lib/rsmp/collect/collector.rb', line 37

def ingoing?
  @ingoing == true
end

#inspectObject

Inspect formatter that shows the message we have collected



32
33
34
# File 'lib/rsmp/collect/collector.rb', line 32

def inspect
  "#<#{self.class.name}:#{self.object_id}, #{inspector(:@messages)}>"
end

#keep(message) ⇒ Object

Store a message in the result array



207
208
209
# File 'lib/rsmp/collect/collector.rb', line 207

def keep message
  @messages << message
end

#notify(message) ⇒ Object

Handle message. and return true when we're done collecting

Raises:

  • (ArgumentError)


125
126
127
128
129
130
131
132
# File 'lib/rsmp/collect/collector.rb', line 125

def notify message
  raise ArgumentError unless message
  raise RuntimeError.new("can't process message when done") unless @status == :ready || @status == :collecting
  unless reject_not_ack(message)
    perform_match message
  end
  @status
end

#notify_disconnect(error, options) ⇒ Object

Cancel if we received e notificaiton about a disconnect



193
194
195
196
197
# File 'lib/rsmp/collect/collector.rb', line 193

def notify_disconnect error, options
  return unless @options.dig(:cancel,:disconnect)
  @notifier.log "Collection cancelled due to a connection error: #{error.to_s}", level: :debug
  cancel error
end

#notify_error(error, options = {}) ⇒ Object

The proxy experienced some error. Check if this should cause us to cancel.



172
173
174
175
176
177
178
179
# File 'lib/rsmp/collect/collector.rb', line 172

def notify_error error, options={}
  case error
  when RSMP::SchemaError
    notify_schema_error error, options
  when RSMP::DisconnectError
    notify_disconnect error, options
  end
end

#notify_schema_error(error, options) ⇒ Object

Cancel if we received e schema error for a message type we're collecting



182
183
184
185
186
187
188
189
190
# File 'lib/rsmp/collect/collector.rb', line 182

def notify_schema_error error, options
  return unless @options.dig(:cancel,:schema_error)
  message = options[:message]
  return unless message
  klass = message.class.name.split('::').last
  return unless @options[:type] == nil || [@options[:type]].flatten.include?(klass)
  @notifier.log "Collection cancelled due to schema error in #{klass} #{message.m_id_short}", level: :debug
  cancel error
end

#outgoing?Boolean

Want outgoing messages?

Returns:

  • (Boolean)


42
43
44
# File 'lib/rsmp/collect/collector.rb', line 42

def outgoing?
  @outgoing == true
end

#perform_match(message) ⇒ Object

Match message against our collection criteria



135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
# File 'lib/rsmp/collect/collector.rb', line 135

def perform_match message
  return unless type_match?(message)
  if @block
    status = [@block.call(message)].flatten
    keep message if status.include?(:keep)
    if status.include?(:cancel)
      cancel('Cancelled by block')
    else
      complete if done?
    end
  else
    keep message
    complete if done?
  end
end

#reject_not_ack(message) ⇒ Object

Check if we receive a NotAck related to initiating request, identified by @m_id.



113
114
115
116
117
118
119
120
121
122
# File 'lib/rsmp/collect/collector.rb', line 113

def reject_not_ack message
  return unless @options[:m_id]
  if message.is_a?(MessageNotAck)
    if message.attribute('oMId') == @options[:m_id]
      m_id_short = RSMP::Message.shorten_m_id @options[:m_id], 8
      cancel RSMP::MessageRejected.new("#{@title} #{m_id_short} was rejected with '#{message.attribute('rea')}'")
      true
    end
  end
end

#resetObject

Clear all query results



24
25
26
27
28
29
# File 'lib/rsmp/collect/collector.rb', line 24

def reset
  @messages = []
  @error = nil
  @status = :ready
  @why = nil
end

#start(options = {}, &block) ⇒ Object

Start collection and return immediately You can later use wait() to wait for completion

Raises:

  • (RuntimeError)


72
73
74
75
76
77
78
79
80
# File 'lib/rsmp/collect/collector.rb', line 72

def start options={}, &block
  raise RuntimeError.new("Can't begin unless ready (currenty #{@status})") unless @status == :ready
  @options.merge! options
  @block = block
  raise ArgumentError.new("Num, timeout or block must be provided") unless @options[:num] || @options[:timeout] || @block
  reset
  @status = :collecting
  @notifier.add_listener self if @notifier
end

#type_match?(message) ⇒ Boolean

Check a message against our match criteria Return true if there's a match, false if not

Returns:

  • (Boolean)


213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
# File 'lib/rsmp/collect/collector.rb', line 213

def type_match? message
  return false if message.direction == :in && @ingoing == false
  return false if message.direction == :out && @outgoing == false
  if @options[:type]
    if @options[:type].is_a? Array
      return false unless @options[:type].include? message.type
    else
      return false unless message.type == @options[:type]
    end
  end
  if @options[:component]
    return false if message.attributes['cId'] && message.attributes['cId'] != @options[:component]
  end
  true
end

#wait(task) ⇒ Object

If collection is complete, return immeditatly. Otherwise wait until the desired messages have been collected, or timeout is reached.



48
49
50
51
52
# File 'lib/rsmp/collect/collector.rb', line 48

def wait task
  wait! task
rescue RSMP::TimeoutError
  @status
end

#wait!(task) ⇒ Object

If collection is complete, return immeditatly. Otherwise wait until the desired messages have been collected. If timeout is reached, an exceptioin is raised.



57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/rsmp/collect/collector.rb', line 57

def wait! task
  return @status unless @status == :collecting
  if @options[:timeout]
    task.with_timeout(@options[:timeout]) { @condition.wait }
  else
    @condition.wait
  end
  @status
rescue Async::TimeoutError
  @status = :timeout
  raise RSMP::TimeoutError.new(describe_progress)
end