Class: RSMP::MessageCollector

Inherits:
Collector show all
Defined in:
lib/rsmp/collect/message_collector.rb

Overview

Collects ingoing and/or outgoing messages from a notifier. Can filter by message type and wakes up the client once the desired number of messages has been collected.

Instance Attribute Summary collapse

Attributes inherited from Collector

#error, #status

Instance Method Summary collapse

Methods inherited from Collector

#collect!, #describe_progress, #do_stop, #reject_not_ack, #start, #wait!

Methods inherited from Listener

#change_notifier, #listen

Methods included from Inspect

#inspector

Constructor Details

#initialize(proxy, options = {}) ⇒ MessageCollector

Returns a new instance of MessageCollector.



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# File 'lib/rsmp/collect/message_collector.rb', line 8

# Set up a collector attached to +proxy+.
#
# Cancellation defaults (cancel on schema errors, but not on disconnects)
# are deep-merged with the caller's +options+, so callers only need to
# supply the keys they want to override.
#
# Options read here:
#   :ingoing  - collect ingoing messages (true when absent or nil)
#   :outgoing - collect outgoing messages (false when absent or nil)
#   :title    - label used in messages; falls back to the type(s) joined by '/'
#   :timeout  - defaults to 1
#   :num      - number of messages to collect, defaults to 1
def initialize(proxy, options = {})
  super proxy, options

  defaults = {
    cancel: {
      schema_error: true,
      disconnect: false,
    }
  }
  @options = defaults.deep_merge options

  # An explicit false must be respected, so only nil falls back to the default.
  @ingoing = options[:ingoing].nil? ? true : options[:ingoing]
  @outgoing = options[:outgoing].nil? ? false : options[:outgoing]

  @condition = Async::Notification.new
  @title = options[:title] || [@options[:type]].flatten.join('/')
  @options[:timeout] ||= 1
  @options[:num] ||= 1
  reset
end

Instance Attribute Details

#conditionObject (readonly)

Returns the value of attribute condition.



6
7
8
# File 'lib/rsmp/collect/message_collector.rb', line 6

# Returns the object stored in @condition, which the collector waits on and
# signals when collection completes or is cancelled.
def condition
  @condition
end

#doneObject (readonly)

Returns the value of attribute done.



6
7
8
# File 'lib/rsmp/collect/message_collector.rb', line 6

# Returns the raw @done flag: set to true by #complete, and to false by
# #reset and #cancel.
def done
  @done
end

#messagesObject (readonly)

Get the collected messages.



82
83
84
# File 'lib/rsmp/collect/message_collector.rb', line 82

# Returns the array of messages collected so far (the same array that
# #keep appends to and #forget removes from).
def messages
  @messages
end

Instance Method Details

#cancel(error) ⇒ Object

Abort collection



170
171
172
173
174
175
# File 'lib/rsmp/collect/message_collector.rb', line 170

# Abort the collection.
#
# Stores +error+ (when one is given), marks the collector as not done,
# unregisters from the proxy and signals the condition.
# Returns whatever @condition.signal returns.
def cancel(error)
  if error
    @error = error
  end
  @done = false
  @proxy.remove_listener(self)
  @condition.signal
end

#check_not_ack(message) ⇒ Object

Check if we receive a NotAck related to initiating request, identified by @m_id.



94
95
96
97
98
99
100
101
102
103
104
# File 'lib/rsmp/collect/message_collector.rb', line 94

# Detect a NotAck that rejects the request we are collecting responses for.
#
# Only active when @options[:m_id] is set. When +message+ is a MessageNotAck
# whose 'oMId' attribute equals that id, an RSMP::MessageRejected error is
# stored in @error and the collection is completed.
#
# Returns nil when no m_id is configured or +message+ is not a
# MessageNotAck, and false for any MessageNotAck.
def check_not_ack(message)
  original_id = @options[:m_id]
  return unless original_id
  return unless message.is_a?(MessageNotAck)

  if message.attribute('oMId') == original_id
    short_id = RSMP::Message.shorten_m_id original_id, 8
    reason = message.attribute('rea')
    @error = RSMP::MessageRejected.new("#{@title} #{short_id} was rejected with '#{reason}'")
    complete
  end
  false
end

#collect(task, options = {}, &block) ⇒ Object

Collect messages. Will block until all messages have been collected, or we time out.



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/rsmp/collect/message_collector.rb', line 48

# Collect messages, blocking until enough have been collected, the
# collection is cancelled, or the timeout expires.
#
# task    - object providing #with_timeout, used to bound the wait
# options - merged into @options (overriding constructor options)
# block   - stored in @block for use when matching messages
#
# Returns @error if one was recorded (e.g. by #cancel or a NotAck),
# otherwise self.
#
# Raises RSMP::TimeoutError when the wait is interrupted by
# Async::TimeoutError. The message reports how many messages were
# collected out of how many were needed.
def collect task, options={}, &block
  @options.merge! options
  @block = block
  unless @done
    listen do
      task.with_timeout(@options[:timeout]) do
        @condition.wait
      end
    end
  end
  return @error if @error
  self
rescue Async::TimeoutError
  # #progress reports the collected count under :got (not :reached).
  status = progress
  str = "#{@title.capitalize} collection"
  # Read the merged options so an m_id passed to the constructor is reported too.
  str << " in response to #{@options[:m_id]}" if @options[:m_id]
  str << " didn't complete within #{@options[:timeout]}s"
  str << ", reached #{status[:got]}/#{status[:need]}"
  raise RSMP::TimeoutError, str
end

#completeObject

Called when we're done collecting. Remove ourselves as a listener, so we don't receive message notifications anymore.



134
135
136
137
138
# File 'lib/rsmp/collect/message_collector.rb', line 134

# Finish the collection: flag it as done, unregister from the proxy so no
# further notifications arrive, then signal the condition.
# Returns whatever @condition.signal returns.
def complete
  @done = true
  @proxy.remove_listener(self)
  @condition.signal
end

#done?Boolean

Have we collected the required number of messages?

Returns:

  • (Boolean)


128
129
130
# File 'lib/rsmp/collect/message_collector.rb', line 128

# Have we collected at least @options[:num] messages?
# Returns the falsy :num value itself (e.g. nil) when no target is set.
def done?
  target = @options[:num]
  target && @messages.size >= target
end

#forget(message) ⇒ Object

Remove a message from the result array



183
184
185
# File 'lib/rsmp/collect/message_collector.rb', line 183

# Drop +message+ from the collected messages.
# Returns the removed message, or nil if it had not been collected.
def forget(message)
  @messages.delete(message)
end

#ingoing?Boolean

Want ingoing messages?

Returns:

  • (Boolean)


31
32
33
# File 'lib/rsmp/collect/message_collector.rb', line 31

# Do we want ingoing messages?
# Strict check: only the literal value true counts; any other value
# (including other truthy values) yields false.
def ingoing?
  @ingoing.equal?(true)
end

#inspectObject

Inspect formatter that shows the message we have collected



26
27
28
# File 'lib/rsmp/collect/message_collector.rb', line 26

# Inspect string showing the class name, the object id and the collected
# messages as rendered by #inspector.
def inspect
  collected = inspector(:@messages)
  "#<#{self.class.name}:#{object_id}, #{collected}>"
end

#keep(message) ⇒ Object

Store a message in the result array



178
179
180
# File 'lib/rsmp/collect/message_collector.rb', line 178

# Append +message+ to the collected messages.
# Returns the messages array.
def keep(message)
  @messages.push(message)
end

#messageObject

Get the collected message.



77
78
79
# File 'lib/rsmp/collect/message_collector.rb', line 77

# Get the first collected message, or nil when nothing has been collected.
def message
  @messages[0]
end

#notify(message) ⇒ Object

Handle a message and return true when we're done collecting.

Raises:

  • (ArgumentError)


107
108
109
110
111
112
113
114
115
# File 'lib/rsmp/collect/message_collector.rb', line 107

# Process one message from the notifier.
#
# First checks whether +message+ is a NotAck that ends the collection;
# otherwise matches it against the collection criteria and completes the
# collection once enough messages have been gathered.
#
# Returns true once the collection is done, otherwise the (falsy) @done flag.
# Raises ArgumentError when +message+ is nil/false, and RuntimeError when
# called after the collection is already done.
def notify(message)
  raise ArgumentError unless message
  raise RuntimeError, "can't process message when already done" if @done

  check_not_ack(message)
  # A matching NotAck completes the collection, so there is nothing left to match.
  return true if @done

  perform_match(message)
  complete if done?
  @done
end

#notify_disconnect(error, options) ⇒ Object

Cancel if we received a notification about a disconnect.



163
164
165
166
167
# File 'lib/rsmp/collect/message_collector.rb', line 163

# React to a connection error reported by the proxy.
#
# Does nothing (returns nil) unless the :disconnect flag under
# @options[:cancel] is set. Otherwise logs the error at debug level and
# cancels the collection with +error+. +options+ is accepted but not used.
def notify_disconnect(error, options)
  if @options.dig(:cancel, :disconnect)
    @proxy.log "Collect cancelled due to a connection error: #{error}", level: :debug
    cancel(error)
  end
end

#notify_error(error, options = {}) ⇒ Object

The proxy experienced some error. Check if this should cause us to cancel.



142
143
144
145
146
147
148
149
# File 'lib/rsmp/collect/message_collector.rb', line 142

# The proxy experienced an error; dispatch on its class to decide whether
# the collection should be cancelled.
#
# RSMP::SchemaError     -> #notify_schema_error
# RSMP::ConnectionError -> #notify_disconnect
# Any other error is ignored and nil is returned.
def notify_error(error, options = {})
  if error.is_a?(RSMP::SchemaError)
    notify_schema_error error, options
  elsif error.is_a?(RSMP::ConnectionError)
    notify_disconnect error, options
  end
end

#notify_schema_error(error, options) ⇒ Object

Cancel if we received a schema error for a message type we're collecting.



152
153
154
155
156
157
158
159
160
# File 'lib/rsmp/collect/message_collector.rb', line 152

# React to a schema error reported by the proxy.
#
# Cancels the collection with +error+ only when all of these hold:
# - the :schema_error flag under @options[:cancel] is set,
# - options[:message] carries the offending message,
# - the message's unqualified class name is one of the types in
#   @options[:type].
# Returns nil whenever one of the conditions is not met.
def notify_schema_error(error, options)
  return unless @options.dig(:cancel, :schema_error)

  offending = options[:message]
  return unless offending

  # Compare on the class name without its module namespace.
  type_name = offending.class.name.split('::').last
  collected_types = [@options[:type]].flatten
  return unless collected_types.include?(type_name)

  @proxy.log "Collect cancelled due to schema error in #{type_name} #{offending.m_id_short}", level: :debug
  cancel(error)
end

#outgoing?Boolean

Want outgoing messages?

Returns:

  • (Boolean)


36
37
38
# File 'lib/rsmp/collect/message_collector.rb', line 36

# Do we want outgoing messages?
# Strict check: only the literal value true counts; any other value
# (including other truthy values) yields false.
def outgoing?
  @outgoing.equal?(true)
end

#perform_match(message) ⇒ Object

Match message against our collection criteria



118
119
120
121
122
123
124
125
# File 'lib/rsmp/collect/message_collector.rb', line 118

# Match +message+ against the collection criteria and update the result.
#
# The outcome is three-valued: exactly true keeps the message, exactly
# false forgets it, and anything else (e.g. nil) leaves the collected
# messages untouched.
def perform_match(message)
  outcome = type_match?(message) && block_match?(message)
  return keep(message) if outcome == true

  forget(message) if outcome == false
end

#progressObject

Return progress as collected vs. number requested



70
71
72
73
74
# File 'lib/rsmp/collect/message_collector.rb', line 70

# Report progress as a hash: :need is the number of messages requested
# (@options[:num]) and :got is the number collected so far.
def progress
  { need: @options[:num], got: @messages.size }
end

#resetObject

Clear all query results



87
88
89
90
91
# File 'lib/rsmp/collect/message_collector.rb', line 87

# Clear all collection state: no messages, no error, not done.
def reset
  @error = nil
  @messages = []
  @done = false
end

#type_match?(message) ⇒ Boolean

Check a message against our match criteria. Return true if there's a match, false if not.

Returns:

  • (Boolean)


189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
# File 'lib/rsmp/collect/message_collector.rb', line 189

# Check +message+ against the direction, type and component criteria.
#
# - Direction: rejected when the message is ingoing/outgoing and the
#   corresponding flag is exactly false.
# - Type: when @options[:type] is set, the message type must equal it, or
#   be included in it when it is an Array.
# - Component: when @options[:component] is set, a message that carries a
#   'cId' attribute must match it; messages without 'cId' pass.
#
# Returns true if every configured criterion is satisfied, false otherwise.
def type_match?(message)
  case message.direction
  when :in
    return false if @ingoing == false
  when :out
    return false if @outgoing == false
  end

  wanted = @options[:type]
  if wanted
    accepted = wanted.is_a?(Array) ? wanted.include?(message.type) : message.type == wanted
    return false unless accepted
  end

  component = @options[:component]
  if component
    c_id = message.attributes['cId']
    return false if c_id && c_id != component
  end

  true
end

#waitObject

Block until all messages have been collected



41
42
43
# File 'lib/rsmp/collect/message_collector.rb', line 41

# Block until @condition is signalled, which #complete and #cancel both do.
# NOTE(review): no timeout is applied here and @done is not checked first;
# confirm callers only use this while a collection is still in progress.
def wait
  @condition.wait
end