Class: RSMP::Collector
- Inherits: Object
- Object → RSMP::Collector
- Defined in:
- lib/rsmp/collect/collector.rb,
lib/rsmp/collect/collector/status.rb,
lib/rsmp/collect/collector/logging.rb,
lib/rsmp/collect/collector/reporting.rb
Overview
Collects messages from a distributor. Can filter by message type, component and direction. Wakes up the waiting task once the desired number of messages has been collected.
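The collect pattern described above can be sketched with plain Ruby objects. This is a minimal illustration, not the gem's implementation: `ToyMessage`, `ToyDistributor` and `ToyCollector` are hypothetical stand-ins, and the waiting and timeout handling is left out.

```ruby
# Hypothetical stand-ins; these are not classes from the rsmp gem.
ToyMessage = Struct.new(:type, :component)

class ToyDistributor
  def initialize
    @receivers = []
  end

  def add_receiver(receiver)
    @receivers << receiver unless @receivers.include?(receiver)
  end

  def remove_receiver(receiver)
    @receivers.delete(receiver)
  end

  # Hand a message to every registered receiver.
  def distribute(message)
    @receivers.dup.each { |receiver| receiver.receive(message) }
  end
end

class ToyCollector
  attr_reader :messages, :status

  def initialize(distributor, type:, num:)
    @distributor = distributor
    @type = type
    @num = num
    @messages = []
    @status = :ready
  end

  def start
    @status = :collecting
    @distributor.add_receiver(self)
  end

  # Keep matching messages; detach once the target count is reached.
  def receive(message)
    return @status unless message.type == @type

    @messages << message
    if @messages.size >= @num
      @status = :ok
      @distributor.remove_receiver(self)
    end
    @status
  end
end

distributor = ToyDistributor.new
collector = ToyCollector.new(distributor, type: 'Alarm', num: 2)
collector.start

distributor.distribute(ToyMessage.new('Alarm', 'C1'))
distributor.distribute(ToyMessage.new('Watchdog', 'C1')) # filtered out by type
distributor.distribute(ToyMessage.new('Alarm', 'C2'))    # second match, collector detaches
distributor.distribute(ToyMessage.new('Alarm', 'C3'))    # not seen, collector is detached
```

After the last call the toy collector holds the two matching messages and its status is `:ok`.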
Direct Known Subclasses
AckCollector, AggregatedStatusCollector, AlarmCollector, StateCollector
Defined Under Namespace
Modules: Logging, Reporting, Status
Instance Attribute Summary
-
#condition ⇒ Object
readonly
Returns the value of attribute condition.
-
#error ⇒ Object
readonly
Returns the value of attribute error.
-
#initiator ⇒ Object
readonly
Returns the value of attribute initiator.
-
#m_id ⇒ Object
readonly
Returns the value of attribute m_id.
-
#messages ⇒ Object
readonly
Returns the value of attribute messages.
-
#status ⇒ Object
readonly
Returns the value of attribute status.
-
#task ⇒ Object
readonly
Returns the value of attribute task.
Attributes included from Logging
Instance Method Summary
-
#acceptable?(message) ⇒ Boolean
Check a message against our match criteria. Returns true if there’s a match, false if not.
-
#cancel(error = nil) ⇒ Object
Abort collection.
-
#collect ⇒ Object
Collect messages. Returns once all messages have been collected, or the timeout is reached.
-
#collect! ⇒ Object
Collect messages. Returns the collected messages, or raises an exception in case of a timeout.
-
#complete ⇒ Object
Called when we’re done collecting.
- #describe ⇒ Object
-
#do_stop ⇒ Object
Remove ourself as a receiver, so we don’t receive message notifications anymore, and wake up the async condition.
-
#done? ⇒ Boolean
Have we collected the required number of messages?
-
#incomplete ⇒ Object
Called when we received a message, but are not done yet.
-
#initialize(distributor, options = {}) ⇒ Collector
constructor
A new instance of Collector.
-
#inspect ⇒ Object
Inspect formatter that shows the messages we have collected.
-
#keep(message) ⇒ Object
Store a message in the result array.
- #make_title(title) ⇒ Object
-
#ok! ⇒ Object
If an error caused collection to abort, raise it. Otherwise return self, so this can be chained onto calls that return a collector.
-
#perform_match(message) ⇒ Object
Match message against our collection criteria.
-
#receive(message) ⇒ Object
Handle message and return true if we’re done collecting.
-
#receive_disconnect(error, _options) ⇒ Object
Cancel if we received a notification about a disconnect.
-
#receive_error(error, options = {}) ⇒ Object
Handle upstream error.
-
#receive_schema_error(error, options) ⇒ Object
Cancel if we received a schema error for a message type we’re collecting.
-
#reject_not_ack(message) ⇒ Object
Check if we received a NotAck related to the initiating request, identified by @m_id.
- #reset ⇒ Object
-
#start(&block) ⇒ Object
Start collection and return immediately. You can later use wait() to wait for completion.
- #use_task(task) ⇒ Object
-
#wait ⇒ Object
If collection is not active, return status immediately.
-
#wait! ⇒ Object
If collection is not active, raise an error.
Methods included from Logging
#author, #initialize_logging, #log
Methods included from Reporting
#describe_matcher, #describe_num_and_type, #describe_progress, #describe_types, #identifier
Methods included from Status
#cancelled?, #collecting?, #ingoing?, #ok?, #outgoing?, #ready?, #timeout?
Methods included from Receiver
#accept_message?, #handle_message, #initialize_receiver, #reject_message?, #start_receiving, #stop_receiving
Methods included from Inspect
Constructor Details
#initialize(distributor, options = {}) ⇒ Collector
Returns a new instance of Collector.
# File 'lib/rsmp/collect/collector.rb', line 13
def initialize(distributor, options = {})
  initialize_receiver distributor, filter: options[:filter]
  @options = {
    cancel: {
      schema_error: true,
      disconnect: false
    }
  }.deep_merge options
  @timeout = options[:timeout]
  @num = options[:num]
  @initiator = options[:initiator]
  @m_id = options[:m_id] || @initiator&.attributes&.dig('mId')
  @condition = Async::Notification.new
  make_title options[:title]

  if task
    @task = task
  elsif distributor.respond_to? 'task'
    # if distributor is a Proxy, or some other object that implements task(),
    # then try to get the task that way
    @task = distributor.task
  end
  reset
end
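The constructor layers caller options over default cancel settings with a recursive merge. Core Ruby has no `Hash#deep_merge`, so the sketch below uses a hypothetical top-level helper with the same intent. It illustrates the merge behaviour only and is not the extension the gem relies on.

```ruby
# Hypothetical helper: recursively merge nested hashes, with overrides winning.
def deep_merge(base, overrides)
  base.merge(overrides) do |_key, old, new|
    old.is_a?(Hash) && new.is_a?(Hash) ? deep_merge(old, new) : new
  end
end

defaults = { cancel: { schema_error: true, disconnect: false } }

# Only :disconnect is overridden; :schema_error keeps its default.
options = deep_merge(defaults, { timeout: 2, cancel: { disconnect: true } })
```

A shallow `merge` would replace the whole `:cancel` hash and silently drop the `:schema_error` default, which is why a recursive merge is needed here.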
Instance Attribute Details
#condition ⇒ Object (readonly)
Returns the value of attribute condition.
# File 'lib/rsmp/collect/collector.rb', line 11
def condition
  @condition
end
#error ⇒ Object (readonly)
Returns the value of attribute error.
# File 'lib/rsmp/collect/collector.rb', line 11
def error
  @error
end
#initiator ⇒ Object (readonly)
Returns the value of attribute initiator.
# File 'lib/rsmp/collect/collector.rb', line 11
def initiator
  @initiator
end
#m_id ⇒ Object (readonly)
Returns the value of attribute m_id.
# File 'lib/rsmp/collect/collector.rb', line 11
def m_id
  @m_id
end
#messages ⇒ Object (readonly)
Returns the value of attribute messages.
# File 'lib/rsmp/collect/collector.rb', line 11
def messages
  @messages
end
#status ⇒ Object (readonly)
Returns the value of attribute status.
# File 'lib/rsmp/collect/collector.rb', line 11
def status
  @status
end
#task ⇒ Object (readonly)
Returns the value of attribute task.
# File 'lib/rsmp/collect/collector.rb', line 11
def task
  @task
end
Instance Method Details
#acceptable?(message) ⇒ Boolean
Check a message against our match criteria. Returns true if there’s a match, false if not.
# File 'lib/rsmp/collect/collector.rb', line 247
def acceptable?(message)
  @filter.nil? || @filter.accept?(message)
end
#cancel(error = nil) ⇒ Object
Abort collection
# File 'lib/rsmp/collect/collector.rb', line 234
def cancel(error = nil)
  @error = error
  @status = :cancelled
  do_stop
end
#collect ⇒ Object
Collect messages. Returns once all messages have been collected, or the timeout is reached.
# File 'lib/rsmp/collect/collector.rb', line 73
def collect(&)
  start(&)
  wait
  @status
ensure
  @distributor&.remove_receiver self
end
#collect! ⇒ Object
Collect messages. Returns the collected messages, or raises an exception in case of a timeout.
# File 'lib/rsmp/collect/collector.rb', line 83
def collect!(&)
  collect(&)
  ok!
  @messages
end
#complete ⇒ Object
Called when we’re done collecting. Remove ourself as a receiver, so we don’t receive message notifications anymore.
# File 'lib/rsmp/collect/collector.rb', line 183
def complete
  @status = :ok
  do_stop
  log_complete
end
#describe ⇒ Object
# File 'lib/rsmp/collect/collector.rb', line 159
def describe; end
#do_stop ⇒ Object
Remove ourself as a receiver, so we don’t receive message notifications anymore, and wake up the async condition
# File 'lib/rsmp/collect/collector.rb', line 196
def do_stop
  @distributor.remove_receiver self
  @condition.signal
end
#done? ⇒ Boolean
Have we collected the required number of messages?
# File 'lib/rsmp/collect/collector.rb', line 177
def done?
  @num && @messages.size >= @num
end
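Note that the predicate short-circuits on `@num`, so it yields `nil` rather than `false` when no target count was given. The sketch below restates the same expression as a standalone method, with `num` and `messages` passed in explicitly for illustration.

```ruby
# Stand-in for the predicate above; arguments replace the instance variables.
def done?(num, messages)
  num && messages.size >= num
end

below_target = done?(2, [:a])      # false: one of two collected
at_target    = done?(2, %i[a b])   # true: target reached
no_target    = done?(nil, %i[a b]) # nil: no count given, never done by count
```

With no count, collection presumably ends only through a timeout, a cancel, or the matching block, since this check alone never becomes true.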
#incomplete ⇒ Object
Called when we received a message, but are not done yet.
# File 'lib/rsmp/collect/collector.rb', line 190
def incomplete
  log_incomplete
end
#inspect ⇒ Object
Inspect formatter that shows the messages we have collected.
# File 'lib/rsmp/collect/collector.rb', line 59
def inspect
  "#<#{self.class.name}:#{object_id}, #{inspector(:@messages)}>"
end
#keep(message) ⇒ Object
Store a message in the result array
# File 'lib/rsmp/collect/collector.rb', line 241
def keep(message)
  @messages << message
end
#make_title(title) ⇒ Object
# File 'lib/rsmp/collect/collector.rb', line 38
def make_title(title)
  @title = if title
             title
           elsif @filter
             [@filter.type].flatten.join('/')
           else
             ''
           end
end
#ok! ⇒ Object
If an error caused collection to abort, raise it. Otherwise return self, so this can be chained onto calls that return a collector.
# File 'lib/rsmp/collect/collector.rb', line 65
def ok!
  raise @error if @error

  self
end
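Because the method returns self on success, it can be appended to any call that hands back a collector. A minimal sketch of that chaining style follows; `ToyResult` is a hypothetical stand-in for a finished collector, not a class from the gem.

```ruby
# Hypothetical stand-in for a collector that has finished collecting.
class ToyResult
  attr_reader :messages

  def initialize(messages, error = nil)
    @messages = messages
    @error = error
  end

  # Raise the stored error, if any; otherwise return self for chaining.
  def ok!
    raise @error if @error

    self
  end
end

# Success: ok! passes the object through, so .messages can follow directly.
collected = ToyResult.new(%w[a b]).ok!.messages

# Failure: the stored error surfaces at the point where ok! is called.
rescued = begin
  ToyResult.new([], RuntimeError.new('timed out')).ok!.messages
rescue RuntimeError => e
  e.message
end
```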
#perform_match(message) ⇒ Object
Match message against our collection criteria
# File 'lib/rsmp/collect/collector.rb', line 162
def perform_match(message)
  return false if reject_not_ack(message)
  return false unless acceptable?(message)

  if @block
    status = [@block.call(message)].flatten
    return unless collecting?

    keep message if status.include?(:keep)
  else
    keep message
  end
end
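When a block is given, a message is stored only if the block's result includes `:keep`; without a block every accepted message is stored. The sketch below isolates that rule. `match_with_block` and `kept` are illustrative names, with `kept` standing in for `@messages`.

```ruby
# Stand-in for the block handling above; kept plays the role of @messages.
def match_with_block(message, kept, &block)
  if block
    # The block may return a single symbol or an array of symbols.
    status = [block.call(message)].flatten
    kept << message if status.include?(:keep)
  else
    kept << message
  end
end

# With a block: only messages for which the block answers :keep are stored.
kept = []
(1..4).each { |n| match_with_block(n, kept) { |m| m.even? ? :keep : nil } }

# Without a block: everything is stored.
everything = []
(1..3).each { |n| match_with_block(n, everything) }
```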
#receive(message) ⇒ Object
Handle message and return true if we’re done collecting
# File 'lib/rsmp/collect/collector.rb', line 143
def receive(message)
  raise ArgumentError unless message
  unless ready? || collecting?
    raise "can't process message when status is :#{@status}, title: #{@title}, desc: #{describe}"
  end

  if perform_match message
    if done?
      complete
    else
      incomplete
    end
  end
  @status
end
#receive_disconnect(error, _options) ⇒ Object
Cancel if we received a notification about a disconnect.
# File 'lib/rsmp/collect/collector.rb', line 226
def receive_disconnect(error, _options)
  return unless @options.dig(:cancel, :disconnect)

  @distributor.log "#{identifier}: cancelled due to a connection error: #{error}", level: :debug
  cancel error
end
#receive_error(error, options = {}) ⇒ Object
Handle upstream error
# File 'lib/rsmp/collect/collector.rb', line 202
def receive_error(error, options = {})
  case error
  when RSMP::SchemaError
    receive_schema_error error, options
  when RSMP::DisconnectError
    receive_disconnect error, options
  end
end
#receive_schema_error(error, options) ⇒ Object
Cancel if we received a schema error for a message type we’re collecting.
# File 'lib/rsmp/collect/collector.rb', line 212
def receive_schema_error(error, options)
  return unless @options.dig(:cancel, :schema_error)

  message = options[:message]
  return unless message

  klass = message.class.name.split('::').last
  return unless @filter&.type.nil? || [@filter&.type].flatten.include?(klass)

  @distributor.log "#{identifier}: cancelled due to schema error in #{klass} #{message.m_id_short}", level: :debug
  cancel error
end
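The type check compares the unqualified class name of the offending message with the filter's type, which may be a single name, a list, or nil. A minimal sketch of just that comparison, using hypothetical `Toy::Alarm` and `Toy::Watchdog` classes and an illustrative `collecting_type?` helper:

```ruby
# Hypothetical message classes inside a namespace.
module Toy
  class Alarm; end
  class Watchdog; end
end

# True if the message's class is one we collect; a nil filter type matches all.
def collecting_type?(filter_type, message)
  klass = message.class.name.split('::').last # "Toy::Alarm" becomes "Alarm"
  filter_type.nil? || [filter_type].flatten.include?(klass)
end

single  = collecting_type?('Alarm', Toy::Alarm.new)
listed  = collecting_type?(%w[Alarm StatusUpdate], Toy::Watchdog.new)
no_type = collecting_type?(nil, Toy::Watchdog.new)
```

Wrapping the type in an array and flattening lets a single string and an array of strings go through the same `include?` test.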
#reject_not_ack(message) ⇒ Object
Check if we received a NotAck related to the initiating request, identified by @m_id.
# File 'lib/rsmp/collect/collector.rb', line 130
def reject_not_ack(message)
  return unless @m_id
  return unless message.is_a?(MessageNotAck)
  return unless message.attribute('oMId') == @m_id

  m_id_short = RSMP::Message.shorten_m_id @m_id, 8
  cancel RSMP::MessageRejected.new("#{@title} #{m_id_short} was rejected with '#{message.attribute('rea')}'")
  @distributor.log "#{identifier}: cancelled due to a NotAck", level: :debug
  true
end
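The three guard clauses amount to: we know the id of our request, the incoming message is a NotAck, and its `oMId` points back at that request. The sketch below reproduces only this matching step. `ToyNotAck`, `ToyAck` and `rejects?` are hypothetical, and the cancel and logging steps are omitted.

```ruby
# Hypothetical message types with an attribute lookup like the one used above.
ToyNotAck = Struct.new(:attributes) do
  def attribute(key)
    attributes[key]
  end
end

ToyAck = Struct.new(:attributes) do
  def attribute(key)
    attributes[key]
  end
end

# True only for a NotAck whose oMId refers to our original message id.
def rejects?(m_id, message)
  return false unless m_id
  return false unless message.is_a?(ToyNotAck)

  message.attribute('oMId') == m_id
end

ours      = rejects?('abc', ToyNotAck.new({ 'oMId' => 'abc', 'rea' => 'bad value' }))
unrelated = rejects?('abc', ToyNotAck.new({ 'oMId' => 'xyz' }))
positive  = rejects?('abc', ToyAck.new({ 'oMId' => 'abc' }))
untracked = rejects?(nil, ToyNotAck.new({ 'oMId' => 'abc' }))
```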
#reset ⇒ Object
# File 'lib/rsmp/collect/collector.rb', line 52
def reset
  @messages = []
  @error = nil
  @status = :ready
end
#start(&block) ⇒ Object
Start collection and return immediately. You can later use wait() to wait for completion.
# File 'lib/rsmp/collect/collector.rb', line 117
def start(&block)
  raise "Can't start collectimng unless ready (currently #{@status})" unless ready?

  @block = block
  raise ArgumentError, 'Num, timeout or block must be provided' unless @num || @timeout || @block

  reset
  @status = :collecting
  log_start
  @distributor&.add_receiver self
end
#use_task(task) ⇒ Object
# File 'lib/rsmp/collect/collector.rb', line 48
def use_task(task)
  @task = task
end
#wait ⇒ Object
If collection is not active, return status immediately. Otherwise wait until the desired messages have been collected, or timeout is reached.
# File 'lib/rsmp/collect/collector.rb', line 91
def wait
  if collecting?
    if @timeout
      @task.with_timeout(@timeout) { @condition.wait }
    else
      @condition.wait
    end
  end
  @status
rescue Async::TimeoutError
  @error = RSMP::TimeoutError.new describe_progress
  @status = :timeout
end
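The shape of this method is a blocking wait wrapped in a timeout, with the timeout exception translated into a status. The sketch below shows the same shape using only the standard library: `Timeout.timeout` and a `Queue` are swapped in for the Async task timeout and notification, and `wait_for` is an illustrative name.

```ruby
require 'timeout'

# Block until something is pushed onto the queue, or give up after `seconds`.
# Returns a status symbol instead of letting the timeout exception escape.
def wait_for(queue, seconds)
  Timeout.timeout(seconds) { queue.pop }
  :ok
rescue Timeout::Error
  :timeout
end

# Nothing is ever signalled, so the wait times out.
silent = Queue.new
timed_out = wait_for(silent, 0.05)

# A signal is already pending, so the wait returns at once.
signalled = Queue.new
signalled << :done
completed = wait_for(signalled, 0.05)
```

Converting the exception into a status lets callers choose between inspecting the result and re-raising the stored error later.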
#wait! ⇒ Object
If collection is not active, raise an error. Otherwise wait until the desired messages have been collected. If timeout is reached, an exception is raised.
# File 'lib/rsmp/collect/collector.rb', line 108
def wait!
  wait
  raise @error if timeout?

  @messages
end