Class: RSMP::MessageCollector
Defined in: lib/rsmp/collect/message_collector.rb
Overview
Collects ingoing and/or outgoing messages from a notifier. Can filter by message type and wakes up the client once the desired number of messages has been collected.
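The flow described above (filter each incoming message, store the matches, report completion once enough have arrived) can be sketched without the gem. This is only an illustration under stated assumptions: ToyMessage and ToyCollector are hypothetical stand-ins, not rsmp classes, and the blocking and wake-up part is left out.

```ruby
# Hypothetical stand-ins for illustration; not part of the rsmp gem.
ToyMessage = Struct.new(:type, :direction)

class ToyCollector
  attr_reader :messages

  def initialize(type:, num: 1)
    @type = type
    @num = num
    @messages = []
    @done = false
  end

  # Store the message if it is ingoing and of the wanted type.
  # Returns true once the requested number of messages is stored.
  def notify(message)
    raise "can't process message when already done" if @done
    @messages << message if message.direction == :in && message.type == @type
    @done = @messages.size >= @num
  end
end

collector = ToyCollector.new(type: 'Alarm', num: 2)
stream = [
  ToyMessage.new('Alarm', :in),
  ToyMessage.new('Watchdog', :in), # wrong type, ignored
  ToyMessage.new('Alarm', :out),   # wrong direction, ignored
  ToyMessage.new('Alarm', :in)
]
results = stream.map { |message| collector.notify(message) }
```

In this sketch only the last call reports completion, because only two of the four messages satisfy both the type and the direction filter.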
Instance Attribute Summary
- #condition ⇒ Object (readonly): Returns the value of attribute condition.
- #done ⇒ Object (readonly): Returns the value of attribute done.
- #messages ⇒ Object (readonly): Get the collected messages.
Attributes inherited from Collector
Instance Method Summary
- #cancel(error) ⇒ Object: Abort collection.
- #check_not_ack(message) ⇒ Object: Check whether we received a NotAck related to the initiating request, identified by @options[:m_id].
- #collect(task, options = {}, &block) ⇒ Object: Collect messages. Blocks until all messages have been collected, or we time out.
- #complete ⇒ Object: Called when we're done collecting.
- #done? ⇒ Boolean: Have we collected the required number of messages?
- #forget(message) ⇒ Object: Remove a message from the result array.
- #ingoing? ⇒ Boolean: Want ingoing messages?
- #initialize(proxy, options = {}) ⇒ MessageCollector (constructor): A new instance of MessageCollector.
- #inspect ⇒ Object: Inspect formatter that shows the messages we have collected.
- #keep(message) ⇒ Object: Store a message in the result array.
- #message ⇒ Object: Get the collected message.
- #notify(message) ⇒ Object: Handle message.
- #notify_disconnect(error, options) ⇒ Object: Cancel if we received a notification about a disconnect.
- #notify_error(error, options = {}) ⇒ Object: The proxy experienced some error.
- #notify_schema_error(error, options) ⇒ Object: Cancel if we received a schema error for a message type we're collecting.
- #outgoing? ⇒ Boolean: Want outgoing messages?
- #perform_match(message) ⇒ Object: Match message against our collection criteria.
- #progress ⇒ Object: Return progress as collected vs. number requested.
- #reset ⇒ Object: Clear all query results.
- #type_match?(message) ⇒ Boolean: Check a message against our match criteria. Return true if there's a match, false if not.
- #wait ⇒ Object: Block until all messages have been collected.
Methods inherited from Collector
#collect!, #describe_progress, #do_stop, #reject_not_ack, #start, #wait!
Methods inherited from Listener
Methods included from Inspect
Constructor Details
#initialize(proxy, options = {}) ⇒ MessageCollector
Returns a new instance of MessageCollector.
    # File 'lib/rsmp/collect/message_collector.rb', line 8

    def initialize proxy, options={}
      super proxy, options
      @options = {
        cancel: {
          schema_error: true,
          disconnect: false,
        }
      }.deep_merge options
      @ingoing = options[:ingoing] == nil ? true : options[:ingoing]
      @outgoing = options[:outgoing] == nil ? false : options[:outgoing]
      @condition = Async::Notification.new
      @title = options[:title] || [@options[:type]].flatten.join('/')
      @options[:timeout] ||= 1
      @options[:num] ||= 1
      reset
    end
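To make the option defaults concrete, the merge performed by the constructor can be sketched in plain Ruby. The helpers deep_merge, default_options and effective_options below are hypothetical; the gem calls a Hash#deep_merge method whose implementation is not shown on this page, so the recursive merge here is an assumption about its behaviour.

```ruby
# Hypothetical helper: recursive merge standing in for Hash#deep_merge.
def deep_merge(base, overrides)
  base.merge(overrides) do |_key, old, new|
    old.is_a?(Hash) && new.is_a?(Hash) ? deep_merge(old, new) : new
  end
end

# Defaults copied from the constructor listing.
def default_options
  { cancel: { schema_error: true, disconnect: false } }
end

# Hypothetical helper: mirrors how the constructor fills in
# :timeout and :num when the caller leaves them out.
def effective_options(options = {})
  merged = deep_merge(default_options, options)
  merged[:timeout] ||= 1
  merged[:num] ||= 1
  merged
end

opts = effective_options({ type: 'Alarm', cancel: { disconnect: true } })
```

Under these assumptions, overriding a single key inside :cancel leaves the other cancel default in place, and :timeout and :num both fall back to 1.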
Instance Attribute Details
#condition ⇒ Object (readonly)
Returns the value of attribute condition.
    # File 'lib/rsmp/collect/message_collector.rb', line 6

    def condition
      @condition
    end
#done ⇒ Object (readonly)
Returns the value of attribute done.
    # File 'lib/rsmp/collect/message_collector.rb', line 6

    def done
      @done
    end
#messages ⇒ Object (readonly)
Get the collected messages.
    # File 'lib/rsmp/collect/message_collector.rb', line 82

    def messages
      @messages
    end
Instance Method Details
#cancel(error) ⇒ Object
Abort collection
    # File 'lib/rsmp/collect/message_collector.rb', line 170

    def cancel error
      @error = error if error
      @done = false
      @proxy.remove_listener self
      @condition.signal
    end
#check_not_ack(message) ⇒ Object
Check whether we received a NotAck related to the initiating request, identified by @options[:m_id].

    # File 'lib/rsmp/collect/message_collector.rb', line 94

    def check_not_ack message
      return unless @options[:m_id]
      if message.is_a?(MessageNotAck)
        if message.attribute('oMId') == @options[:m_id]
          m_id_short = RSMP::Message.shorten_m_id @options[:m_id], 8
          @error = RSMP::MessageRejected.new("#{@title} #{m_id_short} was rejected with '#{message.attribute('rea')}'")
          complete
        end
        false
      end
    end
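The NotAck check can be illustrated with a stand-in message class. ToyNotAck and rejection_for are hypothetical names used only for this sketch, and it assumes that shortening an m_id to 8 means keeping its first 8 characters, which this page does not confirm.

```ruby
# Hypothetical stand-in for a NotAck message; not an rsmp class.
class ToyNotAck
  def initialize(attributes)
    @attributes = attributes
  end

  def attribute(name)
    @attributes[name]
  end
end

# Hypothetical helper: returns a rejection text when the NotAck refers to
# the request identified by m_id, otherwise nil.
def rejection_for(message, m_id, title)
  return nil unless m_id
  return nil unless message.is_a?(ToyNotAck)
  return nil unless message.attribute('oMId') == m_id

  # Assumption: a shortened m_id is its first 8 characters.
  "#{title} #{m_id[0, 8]} was rejected with '#{message.attribute('rea')}'"
end

not_ack = ToyNotAck.new({ 'oMId' => '1234567890ab', 'rea' => 'bad value' })
text = rejection_for(not_ack, '1234567890ab', 'Alarm')
```

A NotAck whose oMId points at some other request yields nil in this sketch, mirroring how the collector ignores rejections of unrelated messages.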
#collect(task, options = {}, &block) ⇒ Object
Collect messages. Blocks until all messages have been collected, or we time out.

    # File 'lib/rsmp/collect/message_collector.rb', line 48

    def collect task, options={}, &block
      @options.merge! options
      @block = block
      unless @done
        listen do
          task.with_timeout(@options[:timeout]) do
            @condition.wait
          end
        end
      end
      return @error if @error
      self
    rescue Async::TimeoutError
      str = "#{@title.capitalize} collection"
      str << " in response to #{options[:m_id]}" if options[:m_id]
      str << " didn't complete within #{@options[:timeout]}s"
      # progress returns a hash with :need and :got keys
      reached = progress
      str << ", reached #{reached[:got]}/#{reached[:need]}"
      raise RSMP::TimeoutError.new str
    end
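The text of the timeout error is assembled from the title, the optional m_id, the timeout and the progress hash. A standalone sketch of that assembly follows; timeout_message is a hypothetical helper, and it assumes the progress hash uses the :need and :got keys returned by #progress.

```ruby
# Hypothetical helper mirroring how the rescue branch builds its message.
# progress is expected to look like { need: 2, got: 1 }.
def timeout_message(title, timeout, progress, m_id = nil)
  str = "#{title.capitalize} collection"
  str << " in response to #{m_id}" if m_id
  str << " didn't complete within #{timeout}s"
  str << ", reached #{progress[:got]}/#{progress[:need]}"
  str
end

plain = timeout_message('Alarm', 1, { need: 2, got: 1 })
with_id = timeout_message('Alarm', 1, { need: 2, got: 1 }, 'abcd')
```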
#complete ⇒ Object
Called when we're done collecting. Remove ourselves as a listener, so we don't receive message notifications anymore.

    # File 'lib/rsmp/collect/message_collector.rb', line 134

    def complete
      @done = true
      @proxy.remove_listener self
      @condition.signal
    end
#done? ⇒ Boolean
Have we collected the required number of messages?
    # File 'lib/rsmp/collect/message_collector.rb', line 128

    def done?
      @options[:num] && @messages.size >= @options[:num]
    end
#forget(message) ⇒ Object
Remove a message from the result array
    # File 'lib/rsmp/collect/message_collector.rb', line 183

    def forget message
      @messages.delete message
    end
#ingoing? ⇒ Boolean
Want ingoing messages?
    # File 'lib/rsmp/collect/message_collector.rb', line 31

    def ingoing?
      @ingoing == true
    end
#inspect ⇒ Object
Inspect formatter that shows the message we have collected
    # File 'lib/rsmp/collect/message_collector.rb', line 26

    def inspect
      "#<#{self.class.name}:#{self.object_id}, #{inspector(:@messages)}>"
    end
#keep(message) ⇒ Object
Store a message in the result array
    # File 'lib/rsmp/collect/message_collector.rb', line 178

    def keep message
      @messages << message
    end
#message ⇒ Object
Get the collected message.
    # File 'lib/rsmp/collect/message_collector.rb', line 77

    def message
      @messages.first
    end
#notify(message) ⇒ Object
Handle a message, and return true when we're done collecting.

    # File 'lib/rsmp/collect/message_collector.rb', line 107

    def notify message
      raise ArgumentError unless message
      raise RuntimeError.new("can't process message when already done") if @done
      check_not_ack(message)
      return true if @done
      perform_match message
      complete if done?
      @done
    end
#notify_disconnect(error, options) ⇒ Object
Cancel if we received a notification about a disconnect.

    # File 'lib/rsmp/collect/message_collector.rb', line 163

    def notify_disconnect error, options
      return unless @options.dig(:cancel,:disconnect)
      @proxy.log "Collect cancelled due to a connection error: #{error.to_s}", level: :debug
      cancel error
    end
#notify_error(error, options = {}) ⇒ Object
The proxy experienced some error. Check if this should cause us to cancel.
    # File 'lib/rsmp/collect/message_collector.rb', line 142

    def notify_error error, options={}
      case error
      when RSMP::SchemaError
        notify_schema_error error, options
      when RSMP::ConnectionError
        notify_disconnect error, options
      end
    end
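How the error class and the :cancel options together decide whether a collection is cancelled can be sketched as below. ToySchemaError, ToyConnectionError and cancels? are hypothetical names, and the sketch is simplified: the real #notify_schema_error additionally requires that the offending message is of a type being collected.

```ruby
# Hypothetical stand-ins for RSMP::SchemaError and RSMP::ConnectionError.
class ToySchemaError < StandardError; end
class ToyConnectionError < StandardError; end

# Hypothetical helper: true when the given error would cancel a collection
# configured with the given :cancel options. Other errors never cancel.
def cancels?(error, cancel_options)
  case error
  when ToySchemaError
    cancel_options[:schema_error] ? true : false
  when ToyConnectionError
    cancel_options[:disconnect] ? true : false
  else
    false
  end
end

# Defaults from the constructor: schema errors cancel, disconnects do not.
defaults = { schema_error: true, disconnect: false }
on_schema = cancels?(ToySchemaError.new, defaults)
on_disconnect = cancels?(ToyConnectionError.new, defaults)
```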
#notify_schema_error(error, options) ⇒ Object
Cancel if we received a schema error for a message type we're collecting.

    # File 'lib/rsmp/collect/message_collector.rb', line 152

    def notify_schema_error error, options
      return unless @options.dig(:cancel,:schema_error)
      message = options[:message]
      return unless message
      klass = message.class.name.split('::').last
      return unless [@options[:type]].flatten.include? klass
      @proxy.log "Collect cancelled due to schema error in #{klass} #{message.m_id_short}", level: :debug
      cancel error
    end
#outgoing? ⇒ Boolean
Want outgoing messages?
    # File 'lib/rsmp/collect/message_collector.rb', line 36

    def outgoing?
      @outgoing == true
    end
#perform_match(message) ⇒ Object
Match message against our collection criteria
    # File 'lib/rsmp/collect/message_collector.rb', line 118

    def perform_match message
      matched = type_match?(message) && block_match?(message)
      if matched == true
        keep message
      elsif matched == false
        forget message
      end
    end
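Because the result is compared explicitly against true and false, any other value, such as nil, leaves the stored messages untouched. The sketch below isolates that three-way behaviour; apply_match is a hypothetical helper that takes the two match results as arguments instead of calling type_match? and block_match?.

```ruby
# Hypothetical helper isolating the keep / forget / ignore decision.
def apply_match(store, message, type_ok, block_result)
  matched = type_ok && block_result
  if matched == true
    store << message       # keep
  elsif matched == false
    store.delete(message)  # forget
  end
  # any other value (for example nil) changes nothing
  store
end

store = []
apply_match(store, :m1, true, true)
after_keep = store.dup
apply_match(store, :m1, true, nil)
after_nil = store.dup
apply_match(store, :m1, true, false)
after_forget = store.dup
```

One use of the neutral nil case is a block that cannot decide yet about a message and does not want to disturb what has already been stored.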
#progress ⇒ Object
Return progress as collected vs. number requested
    # File 'lib/rsmp/collect/message_collector.rb', line 70

    def progress
      need = @options[:num]
      reached = @messages.size
      { need: need, got: reached }
    end
#reset ⇒ Object
Clear all query results
    # File 'lib/rsmp/collect/message_collector.rb', line 87

    def reset
      @messages = []
      @error = nil
      @done = false
    end
#type_match?(message) ⇒ Boolean
Check a message against our match criteria. Return true if there's a match, false if not.

    # File 'lib/rsmp/collect/message_collector.rb', line 189

    def type_match? message
      return false if message.direction == :in && @ingoing == false
      return false if message.direction == :out && @outgoing == false
      if @options[:type]
        if @options[:type].is_a? Array
          return false unless @options[:type].include? message.type
        else
          return false unless message.type == @options[:type]
        end
      end
      if @options[:component]
        return false if message.attributes['cId'] && message.attributes['cId'] != @options[:component]
      end
      true
    end
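The match criteria can be exercised in isolation with a stand-in message. SketchMessage and sketch_type_match? are hypothetical; the sketch takes the options and the direction flags as arguments rather than reading instance variables, and uses Array() to treat a single type and a list of types alike.

```ruby
# Hypothetical stand-in for an RSMP message; not an rsmp class.
SketchMessage = Struct.new(:type, :direction, :attributes)

# Hypothetical helper following the same checks as type_match?:
# direction first, then type, then component id.
def sketch_type_match?(message, options, ingoing: true, outgoing: false)
  return false if message.direction == :in && !ingoing
  return false if message.direction == :out && !outgoing

  if options[:type]
    return false unless Array(options[:type]).include?(message.type)
  end

  if options[:component]
    c_id = message.attributes['cId']
    # A message without a cId is not rejected by the component filter.
    return false if c_id && c_id != options[:component]
  end

  true
end

alarm = SketchMessage.new('Alarm', :in, { 'cId' => 'C1' })
matches_single = sketch_type_match?(alarm, { type: 'Alarm' })
matches_list = sketch_type_match?(alarm, { type: ['Alarm', 'StatusUpdate'] })
wrong_component = sketch_type_match?(alarm, { type: 'Alarm', component: 'C2' })
```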
#wait ⇒ Object
Block until all messages have been collected
    # File 'lib/rsmp/collect/message_collector.rb', line 41

    def wait
      @condition.wait
    end