Class: RSMP::Collector

Inherits:
Object
  • Object
show all
Includes:
Reporting, Status, Logging, Receiver
Defined in:
lib/rsmp/collect/collector.rb,
lib/rsmp/collect/collector/status.rb,
lib/rsmp/collect/collector/logging.rb,
lib/rsmp/collect/collector/reporting.rb

Overview

Collects messages from a distributor. Can filter by message type, component and direction. Wakes up once the desired number of messages has been collected.

Defined Under Namespace

Modules: Logging, Reporting, Status

Instance Attribute Summary collapse

Attributes included from Logging

#archive, #logger

Instance Method Summary collapse

Methods included from Logging

#author, #initialize_logging, #log

Methods included from Reporting

#describe_matcher, #describe_num_and_type, #describe_progress, #describe_types, #identifier

Methods included from Status

#cancelled?, #collecting?, #ingoing?, #ok?, #outgoing?, #ready?, #timeout?

Methods included from Receiver

#accept_message?, #handle_message, #initialize_receiver, #reject_message?, #start_receiving, #stop_receiving

Methods included from Inspect

#inspector

Constructor Details

#initialize(distributor, options = {}) ⇒ Collector

Returns a new instance of Collector.



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/rsmp/collect/collector.rb', line 13

# Set up a collector that receives messages from +distributor+.
#
# @param distributor [Object] object handed to initialize_receiver; if it
#   responds to +task+, that task is used as a fallback for timeouts
# @param options [Hash] settings read here:
#   :filter    - passed on to initialize_receiver
#   :timeout   - stored in @timeout and used by #wait
#   :num       - number of messages to collect, see #done?
#   :initiator - request that started the collection
#   :m_id      - message id; defaults to the initiator's 'mId' attribute
#   :title     - title used in log and error messages, see #make_title
#   :task      - task used for timeouts, takes precedence over the distributor's
#   :cancel    - hash overriding the defaults { schema_error: true, disconnect: false }
def initialize(distributor, options = {})
  initialize_receiver distributor, filter: options[:filter]
  @options = {
    cancel: {
      schema_error: true,
      disconnect: false
    }
  }.deep_merge options
  @timeout = options[:timeout]
  @num = options[:num]
  @initiator = options[:initiator]
  @m_id = options[:m_id] || @initiator&.attributes&.dig('mId')
  @condition = Async::Notification.new
  make_title options[:title]

  # The task must be read from the options: the #task reader only returns
  # @task, so testing it here could never pick up a caller-supplied task.
  @task = options[:task] || @task
  # if distributor is a Proxy, or some other object that implements task(),
  # then try to get the task that way
  @task ||= distributor.task if distributor.respond_to? 'task'
  reset
end

Instance Attribute Details

#conditionObject (readonly)

Returns the value of attribute condition.



11
12
13
# File 'lib/rsmp/collect/collector.rb', line 11

# Reader for @condition, the notification that #wait blocks on and #do_stop signals.
def condition = @condition

#errorObject (readonly)

Returns the value of attribute error.



11
12
13
# File 'lib/rsmp/collect/collector.rb', line 11

# Reader for @error: set by #cancel and by a timeout in #wait, cleared by #reset.
def error = @error

#initiatorObject (readonly)

Returns the value of attribute initiator.



11
12
13
# File 'lib/rsmp/collect/collector.rb', line 11

# Reader for @initiator, taken from the :initiator option in #initialize.
def initiator = @initiator

#m_idObject (readonly)

Returns the value of attribute m_id.



11
12
13
# File 'lib/rsmp/collect/collector.rb', line 11

# Reader for @m_id, the message id that #reject_not_ack compares NotAck messages against.
def m_id = @m_id

#messagesObject (readonly)

Returns the value of attribute messages.



11
12
13
# File 'lib/rsmp/collect/collector.rb', line 11

# Reader for @messages, the array that #keep appends to and #reset empties.
def messages = @messages

#statusObject (readonly)

Returns the value of attribute status.



11
12
13
# File 'lib/rsmp/collect/collector.rb', line 11

# Reader for @status; values assigned in this class are :ready, :collecting,
# :ok, :cancelled and :timeout.
def status = @status

#taskObject (readonly)

Returns the value of attribute task.



11
12
13
# File 'lib/rsmp/collect/collector.rb', line 11

# Reader for @task, the task #wait uses to apply the timeout.
def task = @task

Instance Method Details

#acceptable?(message) ⇒ Boolean

Check a message against our match criteria. Returns true if there’s a match, false if not.

Returns:

  • (Boolean)


247
248
249
# File 'lib/rsmp/collect/collector.rb', line 247

# Decide whether +message+ passes the filter.
#
# @param message [Object] message to check
# @return [Boolean] true when no filter is set, otherwise the result of the
#   filter's accept? method
def acceptable?(message)
  return true if @filter.nil?

  @filter.accept?(message)
end

#cancel(error = nil) ⇒ Object

Abort collection



234
235
236
237
238
# File 'lib/rsmp/collect/collector.rb', line 234

# Abort the collection: mark it as cancelled, remember why, then stop.
#
# @param error [Object, nil] reason for the cancellation, stored in @error
# @return [Object] whatever #do_stop returns
def cancel(error = nil)
  @status = :cancelled
  @error = error
  do_stop
end

#collectObject

Collect messages. Returns once all messages have been collected, or the timeout is reached.



73
74
75
76
77
78
79
# File 'lib/rsmp/collect/collector.rb', line 73

# Run a complete collection: start, block until it ends, and report how it ended.
# The optional block is forwarded to #start.
#
# @return [Symbol] the status after waiting
def collect(&)
  start(&)
  wait
  @status
ensure
  # Always deregister, also when start or wait raised.
  @distributor&.remove_receiver(self)
end

#collect!Object

Collect messages. Returns the collected messages, or raises an exception in case of a timeout.



83
84
85
86
87
# File 'lib/rsmp/collect/collector.rb', line 83

# Like #collect, but returns the collected messages instead of the status.
# The optional block is forwarded to #collect.
#
# @return [Array] the collected messages (@messages)
# @raise [Exception] the stored @error, if any, via #ok!
def collect!(&)
  collect(&)
  # ok! raises @error if the collection ended with one
  ok!
  @messages
end

#completeObject

Called when we’re done collecting. Remove ourself as a receiver, so we don’t receive message notifications anymore.



183
184
185
186
187
# File 'lib/rsmp/collect/collector.rb', line 183

# Finish a successful collection: set the status to :ok, stop receiving
# (which also wakes up anyone blocked in #wait), then log the completion.
#
# @return [Object] whatever log_complete returns
def complete
  # set the status before do_stop, so a woken waiter sees :ok
  @status = :ok
  do_stop
  log_complete
end

#describeObject



159
# File 'lib/rsmp/collect/collector.rb', line 159

# Description interpolated into the error raised by #receive.
# This implementation is empty and returns nil.
def describe
  nil
end

#do_stopObject

Remove ourself as a receiver, so we don’t receive message notifications anymore, and wake up the async condition



196
197
198
199
# File 'lib/rsmp/collect/collector.rb', line 196

# Stop receiving: remove ourself from the distributor, so we don't receive
# message notifications anymore, and wake up anyone waiting on the condition.
#
# The distributor is optional here, as it is in #collect and #start. Without
# the nil guard a missing distributor would raise before the condition is
# signalled, leaving waiters blocked.
#
# @return [Object] whatever @condition.signal returns
def do_stop
  @distributor&.remove_receiver self
  @condition.signal
end

#done?Boolean

Have we collected the required number of messages?

Returns:

  • (Boolean)


177
178
179
# File 'lib/rsmp/collect/collector.rb', line 177

# Have we collected the required number of messages?
#
# @return [Boolean, nil] true or false once a required number (@num) is set;
#   the falsy value of @num itself when no number is required
def done?
  required = @num
  required && @messages.length >= required
end

#incompleteObject

called when we received a message, but are not done yet



190
191
192
# File 'lib/rsmp/collect/collector.rb', line 190

# Called when a message was matched but more are still needed; only logs.
def incomplete = log_incomplete

#inspectObject

Inspect formatter that shows the message we have collected



59
60
61
# File 'lib/rsmp/collect/collector.rb', line 59

def inspect
  "#<#{self.class.name}:#{object_id}, #{inspector(:@messages)}>"
end

#keep(message) ⇒ Object

Store a message in the result array



241
242
243
# File 'lib/rsmp/collect/collector.rb', line 241

# Append +message+ to the collected messages.
#
# @param message [Object] message to store
# @return [Array] the @messages array, including the new message
def keep(message)
  @messages.push(message)
end

#make_title(title) ⇒ Object



38
39
40
41
42
43
44
45
46
# File 'lib/rsmp/collect/collector.rb', line 38

# Choose the title stored in @title.
#
# An explicit +title+ wins. Otherwise the title is built from the filter's
# type(s), joined with '/'. With neither a title nor a filter it is empty.
#
# @param title [String, nil] explicit title
# @return [String] the title stored in @title
def make_title(title)
  return @title = title if title
  return @title = '' unless @filter

  @title = [@filter.type].flatten.join('/')
end

#ok!Object

If an error caused collection to abort, then raise it. Otherwise return self, so this can be tacked on to calls that return a collector.

Raises:



65
66
67
68
69
# File 'lib/rsmp/collect/collector.rb', line 65

# Raise the stored error, if any; otherwise return self so the call can be chained.
#
# @return [self]
# @raise [Exception] the value of @error when it is set
def ok!
  return self unless @error

  raise @error
end

#perform_match(message) ⇒ Object

Match message against our collection criteria



162
163
164
165
166
167
168
169
170
171
172
173
174
# File 'lib/rsmp/collect/collector.rb', line 162

# Match +message+ against our collection criteria and keep it if it qualifies.
#
# Without a block every acceptable message is kept. With a block, the block
# is called with the message, and the message is kept only if the block's
# result (a value or an array of values) includes :keep and we are still
# collecting after the block has run.
#
# @param message [Object] incoming message
# @return [false] when the message was a rejecting NotAck or failed the filter
# @return [nil] when a block was given and the message was not kept
# @return [Array] the result of #keep when the message was kept
def perform_match(message)
  return false if reject_not_ack(message)
  return false unless acceptable?(message)
  return keep(message) unless @block

  verdict = [@block.call(message)].flatten
  # the block may have ended the collection, e.g. by cancelling
  return unless collecting?

  keep(message) if verdict.include?(:keep)
end

#receive(message) ⇒ Object

Handle message and return true if we’re done collecting

Raises:

  • (ArgumentError)


143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
# File 'lib/rsmp/collect/collector.rb', line 143

# Handle an incoming message and report the resulting status.
#
# @param message [Object] the message to process
# @return [Symbol] the status after processing; :ok once collection is done
# @raise [ArgumentError] if +message+ is nil or false
# @raise [RuntimeError] if the collector is neither ready nor collecting
def receive(message)
  raise ArgumentError unless message

  receptive = ready? || collecting?
  unless receptive
    raise "can't process message when status is :#{@status}, title: #{@title}, desc: #{describe}"
  end

  (done? ? complete : incomplete) if perform_match(message)
  @status
end

#receive_disconnect(error, _options) ⇒ Object

Cancel if we received a notification about a disconnect



226
227
228
229
230
231
# File 'lib/rsmp/collect/collector.rb', line 226

# Cancel the collection after a disconnect, if the :disconnect entry of the
# :cancel options is enabled (it is off by default, see #initialize).
#
# @param error [Object] the disconnect error, passed on to #cancel
# @param _options [Hash] unused
# @return [nil] when cancelling on disconnect is disabled
# @return [Object] otherwise, whatever #cancel returns
def receive_disconnect(error, _options)
  cancel_on_disconnect = @options.dig(:cancel, :disconnect)
  return unless cancel_on_disconnect

  @distributor.log("#{identifier}: cancelled due to a connection error: #{error}", level: :debug)
  cancel(error)
end

#receive_error(error, options = {}) ⇒ Object

Handle upstream error



202
203
204
205
206
207
208
209
# File 'lib/rsmp/collect/collector.rb', line 202

# Dispatch an upstream error to the handler for its class.
# Errors other than schema and disconnect errors are ignored.
#
# @param error [Exception] the upstream error
# @param options [Hash] passed on to the handler
# @return [Object, nil] the handler's result, or nil for other errors
def receive_error(error, options = {})
  case error
  when RSMP::SchemaError then receive_schema_error(error, options)
  when RSMP::DisconnectError then receive_disconnect(error, options)
  end
end

#receive_schema_error(error, options) ⇒ Object

Cancel if we received a schema error for a message type we’re collecting



212
213
214
215
216
217
218
219
220
221
222
223
# File 'lib/rsmp/collect/collector.rb', line 212

# Cancel the collection after a schema error in a message type we collect.
#
# Nothing happens unless the :schema_error entry of the :cancel options is
# enabled and options[:message] is present. The message's class name, without
# namespace, is compared with the filter's type(s). A missing filter or a
# filter without a type matches every message.
#
# @param error [Exception] the schema error, passed on to #cancel
# @param options [Hash] expected to hold the offending message under :message
# @return [nil] when the error is ignored
# @return [Object] otherwise, whatever #cancel returns
def receive_schema_error(error, options)
  return unless @options.dig(:cancel, :schema_error)

  offending = options[:message]
  return unless offending

  klass = offending.class.name.split('::').last
  wanted = @filter&.type
  return unless wanted.nil? || [wanted].flatten.include?(klass)

  notice = "#{identifier}: cancelled due to schema error in #{klass} #{offending.m_id_short}"
  @distributor.log(notice, level: :debug)
  cancel(error)
end

#reject_not_ack(message) ⇒ Object

Check if we receive a NotAck related to initiating request, identified by @m_id.



130
131
132
133
134
135
136
137
138
139
140
# File 'lib/rsmp/collect/collector.rb', line 130

# Cancel the collection if +message+ is a NotAck for the initiating request.
#
# The NotAck is tied to the request by comparing its 'oMId' attribute with
# @m_id. On a match the collection is cancelled with a MessageRejected error
# quoting the NotAck's 'rea' attribute, and the cancellation is logged.
#
# @param message [Object] incoming message
# @return [true] when the message was a matching NotAck
# @return [nil] otherwise
def reject_not_ack(message)
  related = @m_id && message.is_a?(MessageNotAck) && message.attribute('oMId') == @m_id
  return unless related

  short_id = RSMP::Message.shorten_m_id(@m_id, 8)
  rejection = RSMP::MessageRejected.new("#{@title} #{short_id} was rejected with '#{message.attribute('rea')}'")
  cancel(rejection)
  @distributor.log("#{identifier}: cancelled due to a NotAck", level: :debug)
  true
end

#resetObject



52
53
54
55
56
# File 'lib/rsmp/collect/collector.rb', line 52

# Return to the initial state: no error, no collected messages, status :ready.
#
# @return [Symbol] :ready
def reset
  @error = nil
  @messages = []
  @status = :ready
end

#start(&block) ⇒ Object

Start collection and return immediately. You can later use wait() to wait for completion.

Raises:

  • (ArgumentError)


117
118
119
120
121
122
123
124
125
126
127
# File 'lib/rsmp/collect/collector.rb', line 117

# Start collecting and return immediately; use #wait to block until done.
#
# If a block is given it is stored and later called by #perform_match for
# each candidate message.
#
# @return [Object, nil] the result of registering with the distributor, or
#   nil when there is no distributor
# @raise [RuntimeError] if the collector is not in the :ready state
# @raise [ArgumentError] if none of num, timeout or a block was provided
def start(&block)
  raise "Can't start collecting unless ready (currently #{@status})" unless ready?

  @block = block
  raise ArgumentError, 'Num, timeout or block must be provided' unless @num || @timeout || @block

  reset
  @status = :collecting
  log_start
  # register last, so no message is delivered before the state is set up
  @distributor&.add_receiver self
end

#use_task(task) ⇒ Object



48
49
50
# File 'lib/rsmp/collect/collector.rb', line 48

# Set the task that #wait uses to apply the timeout.
#
# @param task [Object] task to store in @task
# @return [Object] the given task
def use_task(task)
  @task = task
end

#waitObject

If collection is not active, return status immediately. Otherwise wait until the desired messages have been collected, or timeout is reached.



91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/rsmp/collect/collector.rb', line 91

# Block until the collection ends or the timeout is reached.
# Returns immediately if we are not collecting.
#
# With a timeout configured, the wait runs inside @task.with_timeout, so
# @task must be set (see #initialize and #use_task).
#
# @return [Symbol] the status; :timeout if the timeout was reached
def wait
  if collecting?
    if @timeout
      @task.with_timeout(@timeout) { @condition.wait }
    else
      @condition.wait
    end
  end
  @status
rescue Async::TimeoutError
  @error = RSMP::TimeoutError.new describe_progress
  # Stop receiving once timed out. Otherwise a collector started with #start
  # stays registered, and the next message makes #receive raise because the
  # status is no longer :ready or :collecting.
  @distributor&.remove_receiver self
  @status = :timeout
end

#wait!Object

If collection is not active, raise an error. Otherwise wait until the desired messages have been collected. If timeout is reached, an exception is raised.

Raises:



108
109
110
111
112
113
# File 'lib/rsmp/collect/collector.rb', line 108

# Like #wait, but returns the collected messages and raises on timeout.
#
# @return [Array] the collected messages (@messages)
# @raise [Exception] the stored @error when the status is a timeout
def wait!
  wait
  return @messages unless timeout?

  raise @error
end