Module: RSMP::SupervisorProxy::Modules::MessageBuffer

Included in:
RSMP::SupervisorProxy
Defined in:
lib/rsmp/proxy/supervisor/modules/message_buffer.rb

Overview

In-memory outgoing communication buffer for site-originated messages.

Instance Method Summary

Instance Method Details

#buffer_message(message, error = nil) ⇒ Object



91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/rsmp/proxy/supervisor/modules/message_buffer.rb', line 91

# Buffers a site-originated message, or defers to the parent implementation.
#
# A copy is prepared with prepare_message_for_buffer. When a copy is returned
# it is queued. When no copy is returned but the message is a buffer
# candidate, a warning is logged and the message is discarded. Any other
# message is handed to +super+.
#
# @param message [Object] message to buffer
# @param error [Exception, nil] raised when NotReady or IOError is rescued;
#   when nil, those exceptions are swallowed
# @return [Object, nil] result of the branch taken, or nil after a swallowed
#   exception
def buffer_message(message, error = nil)
  buffered_copy = prepare_message_for_buffer(message, core_version: @core_version)
  return enqueue_buffered_message(buffered_copy) if buffered_copy
  return super unless site_originated_buffer_candidate?(message)

  log "Discarded #{message.type}; it is not configured for buffering", message: message, level: :warning
rescue NotReady, IOError
  raise error if error
end

#clone_message(message, attributes = message.attributes) ⇒ Object



36
37
38
# File 'lib/rsmp/proxy/supervisor/modules/message_buffer.rb', line 36

# Builds a new message of the same class from a JSON round-trip of the
# given attributes, so the copy shares no nested objects with the source.
#
# @param message [Object] message whose class is instantiated
# @param attributes [Hash] attributes for the copy; defaults to the
#   message's own attributes
# @return [Object] new instance of message.class
def clone_message(message, attributes = message.attributes)
  serialized = JSON.generate(attributes)
  detached = JSON.parse(serialized)
  message.class.new(detached)
end

#enqueue_buffered_message(message) ⇒ Object



104
105
106
107
108
109
110
111
112
# File 'lib/rsmp/proxy/supervisor/modules/message_buffer.rb', line 104

# Appends a prepared message to the in-memory buffer, first evicting the
# oldest queued messages while the buffer is at or above
# message_buffer_max_messages.
#
# A limit of zero or less leaves no room for any message. Previously the
# eviction loop kept shifting from an empty buffer and failed with
# NoMethodError on nil; now the incoming message is discarded with a warning.
#
# @param message [Object] prepared message; must respond to #type
# @return [Object, nil] the queued message, or nil when it was discarded
#   because the configured limit is not positive
def enqueue_buffered_message(message)
  limit = message_buffer_max_messages
  if limit <= 0
    log "Discarded #{message.type}; message buffer limit is #{limit}", message: message, level: :warning
    return
  end

  # Evict from the front so the newest messages are the ones kept.
  while @message_buffer.size >= limit
    dropped = @message_buffer.shift
    log "Dropped buffered #{dropped.type}; message buffer is full", message: dropped, level: :warning
  end
  @message_buffer << message
  log "Buffered #{message.type}; #{message_buffer.size} message(s) queued", message: message, level: :warning
  message
end

#flush_buffered_message(message, queued, index) ⇒ Object



125
126
127
128
129
130
131
132
133
134
135
# File 'lib/rsmp/proxy/supervisor/modules/message_buffer.rb', line 125

# Sends one buffered message, re-preparing it for sending first.
#
# When preparation yields nothing the message is skipped and counts as
# handled. If NotReady or IOError is rescued, the unsent tail of +queued+
# (from +index+ onward) is put back in front of whatever is in the buffer.
#
# @param message [Object] buffered message to send
# @param queued [Array] the batch currently being flushed
# @param index [Integer] position of +message+ within +queued+
# @return [Boolean] true when flushing may continue, false when it stopped
def flush_buffered_message(message, queued, index)
  outgoing = prepare_message_for_buffer(message, core_version: @core_version, for_send: true)
  send_message(outgoing, 'from buffer', buffer: false) if outgoing
  true
rescue NotReady, IOError
  unsent = queued[index..]
  @message_buffer = unsent + @message_buffer
  log "Stopped sending buffered messages; #{message_buffer.size} message(s) remain queued", level: :warning
  false
end

#flush_message_buffer ⇒ Object



114
115
116
117
118
119
120
121
122
123
# File 'lib/rsmp/proxy/supervisor/modules/message_buffer.rb', line 114

# Sends every buffered message in order, stopping at the first failure.
#
# The buffer is swapped for an empty array before sending, so messages
# buffered during the flush land in a fresh buffer. Iteration stops as soon
# as flush_buffered_message returns a falsy value.
#
# @return [Array, nil] the flushed batch when every message was handled,
#   nil when the buffer was empty or flushing stopped early
def flush_message_buffer
  pending = @message_buffer
  return if pending.empty?

  @message_buffer = []
  log "Sending #{pending.size} buffered message(s)", level: :info
  pending.each.with_index do |entry, position|
    break unless flush_buffered_message(entry, pending, position)
  end
end

#message_buffer_max_messages ⇒ Object



10
11
12
# File 'lib/rsmp/proxy/supervisor/modules/message_buffer.rb', line 10

# Maximum number of messages kept in the buffer.
#
# @return [Object] the 'max_messages' buffer setting, or 10_000 when that
#   setting is nil or false
def message_buffer_max_messages
  configured = message_buffer_settings['max_messages']
  configured || 10_000
end

#message_buffer_settings ⇒ Object



6
7
8
# File 'lib/rsmp/proxy/supervisor/modules/message_buffer.rb', line 6

# Buffer configuration taken from the site settings.
#
# @return [Object] the 'message_buffer' entry of @site_settings, or an empty
#   hash when that entry is nil or false
def message_buffer_settings
  configured = @site_settings['message_buffer']
  return configured if configured

  {}
end

#normalize_aggregated_status_buffer(states, core_version) ⇒ Object



61
62
63
64
65
66
67
# File 'lib/rsmp/proxy/supervisor/modules/message_buffer.rb', line 61

# Converts aggregated status entries to the representation chosen by the
# core version check.
#
# An entry counts as set when it is +true+ or its string form is 'true'.
# When the version satisfies '<=3.1.2' the result holds the strings
# 'true'/'false'; otherwise it holds booleans.
#
# @param states [Array] raw state entries
# @param core_version [Object] version passed to version_meets_requirement?
# @return [Array<String>, Array<Boolean>] normalized entries
def normalize_aggregated_status_buffer(states, core_version)
  as_strings = version_meets_requirement?(core_version, '<=3.1.2')
  states.map do |item|
    active = item == true || item.to_s == 'true'
    if as_strings
      active ? 'true' : 'false'
    else
      active
    end
  end
end

#prepare_aggregated_status_for_buffer(message, core_version:, for_send:) ⇒ Object



69
70
71
72
73
74
75
# File 'lib/rsmp/proxy/supervisor/modules/message_buffer.rb', line 69

# Produces a detached copy of an aggregated status message.
#
# The attributes are deep-copied through JSON. The 'se' entry is normalized
# for the given core version only when the copy is meant for sending, a core
# version is known, and 'se' is present.
#
# @param message [Object] message responding to #attributes
# @param core_version [Object, nil] version used for normalization
# @param for_send [Boolean] whether the copy is about to be sent
# @return [Object] result of clone_message with the copied attributes
def prepare_aggregated_status_for_buffer(message, core_version:, for_send:)
  snapshot = JSON.parse(JSON.generate(message.attributes))
  states = snapshot['se']
  needs_normalizing = for_send && core_version && states
  snapshot['se'] = normalize_aggregated_status_buffer(states, core_version) if needs_normalizing
  clone_message message, snapshot
end

#prepare_message_for_buffer(message, core_version: @core_version, for_send: false) ⇒ Object



77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/rsmp/proxy/supervisor/modules/message_buffer.rb', line 77

# Prepares a detached copy of a message for buffering or for sending from
# the buffer.
#
# @param message [Object] message to prepare
# @param core_version [Object, nil] version forwarded to the type-specific
#   helpers; defaults to @core_version
# @param for_send [Boolean] forwarded to the type-specific helpers
# @return [Object, false, nil] nil when the message is not a buffer
#   candidate; false when it is a status update and the status selectors are
#   +false+, or an alarm while receive_alarms? is false; otherwise whatever
#   the type-specific helper or clone_message returns
def prepare_message_for_buffer(message, core_version: @core_version, for_send: false)
  return unless site_originated_buffer_candidate?(message)

  if (message.is_a?(RSMP::StatusUpdate) && status_buffer_selectors == false) ||
     (message.is_a?(RSMP::Alarm) && !receive_alarms?)
    return false
  end

  case message
  when RSMP::AggregatedStatus
    prepare_aggregated_status_for_buffer(message, core_version: core_version, for_send: for_send)
  when RSMP::StatusUpdate
    prepare_status_update_for_buffer(message, core_version: core_version, for_send: for_send)
  else
    clone_message(message)
  end
end

#prepare_status_update_for_buffer(message, core_version:, for_send:) ⇒ Object



49
50
51
52
53
54
55
56
57
58
59
# File 'lib/rsmp/proxy/supervisor/modules/message_buffer.rb', line 49

# Produces a detached copy of a status update that keeps only the statuses
# accepted by status_buffer_selector?.
#
# When the copy is meant for sending, a core version is known, and that
# version satisfies '>=3.2.0', every kept status has its 'q' set to 'old'.
#
# @param message [Object] message responding to #attributes
# @param core_version [Object, nil] version used for the 'q' rewrite check
# @param for_send [Boolean] whether the copy is about to be sent
# @return [Object, nil] result of clone_message, or nil when no status
#   passes the selectors
def prepare_status_update_for_buffer(message, core_version:, for_send:)
  snapshot = JSON.parse(JSON.generate(message.attributes))
  component = snapshot['cId']
  kept = snapshot['sS'].select { |status| status_buffer_selector?(component, status) }
  snapshot['sS'] = kept
  return if kept.empty?

  mark_old = for_send && core_version && version_meets_requirement?(core_version, '>=3.2.0')
  kept.each { |status| status['q'] = 'old' } if mark_old
  clone_message message, snapshot
end

#site_originated_buffer_candidate?(message) ⇒ Boolean

Returns:

  • (Boolean)


40
41
42
43
44
45
46
47
# File 'lib/rsmp/proxy/supervisor/modules/message_buffer.rb', line 40

# Tells whether a message is one of the types considered for buffering:
# aggregated status, alarm issue/suspended/resumed/acknowledged, or status
# update.
#
# @param message [Object] message to classify
# @return [Boolean] true when the message is an instance of a listed type
def site_originated_buffer_candidate?(message)
  case message
  when RSMP::AggregatedStatus, RSMP::AlarmIssue, RSMP::AlarmSuspended,
       RSMP::AlarmResumed, RSMP::AlarmAcknowledged, RSMP::StatusUpdate
    true
  else
    false
  end
end

#status_buffer_selector?(component_id, status) ⇒ Boolean

Returns:

  • (Boolean)


20
21
22
23
24
25
26
# File 'lib/rsmp/proxy/supervisor/modules/message_buffer.rb', line 20

# Decides whether a single status of a component should be buffered.
#
# @param component_id [Object] component id of the status update
# @param status [Object] one status entry
# @return [Boolean] true when the configured selectors are +true+, or when
#   they are an Array and at least one selector matches; false for any other
#   configuration value
def status_buffer_selector?(component_id, status)
  configured = status_buffer_selectors
  case configured
  when true
    true
  when Array
    configured.any? { |candidate| status_buffer_selector_matches?(candidate, component_id, status) }
  else
    false
  end
end

#status_buffer_selector_matches?(selector, component_id, status) ⇒ Boolean

Returns:

  • (Boolean)


28
29
30
31
32
33
34
# File 'lib/rsmp/proxy/supervisor/modules/message_buffer.rb', line 28

# Checks one selector against a component id and a status entry.
#
# Selector keys are compared as strings. Each of 'cId', 'sCI' and 'n' only
# constrains the match when the selector has a truthy value for it; an
# absent key matches anything.
#
# @param selector [Hash] selector with optional 'cId', 'sCI' and 'n' keys
# @param component_id [Object] component id compared with 'cId'
# @param status [Hash] status entry providing 'sCI' and 'n'
# @return [Boolean] true when every constraint present in the selector holds
def status_buffer_selector_matches?(selector, component_id, status)
  wanted = selector.transform_keys(&:to_s)
  component_ok = wanted['cId'] ? wanted['cId'] == component_id : true
  code_ok = wanted['sCI'] ? wanted['sCI'] == status['sCI'] : true
  name_ok = wanted['n'] ? wanted['n'] == status['n'] : true
  [component_ok, code_ok, name_ok].all?
end

#status_buffer_selectors ⇒ Object



14
15
16
17
18
# File 'lib/rsmp/proxy/supervisor/modules/message_buffer.rb', line 14

# Status selectors configured for buffering.
#
# @return [Object] the 'statuses' buffer setting exactly as configured
#   (including nil or false) when the key is present; an empty array when
#   the key is absent
def status_buffer_selectors
  message_buffer_settings.fetch('statuses') { [] }
end