Module: RSMP::SupervisorProxy::Modules::MessageBuffer

Included in:
RSMP::SupervisorProxy
Defined in:
lib/rsmp/proxy/supervisor/modules/message_buffer.rb

Overview

In-memory outgoing communication buffer for site-originated messages.

Instance Method Summary collapse

Instance Method Details

#buffer_message(message, error = nil) ⇒ Object



96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/rsmp/proxy/supervisor/modules/message_buffer.rb', line 96

def buffer_message(message, error = nil)
  prepared = prepare_message_for_buffer message, core_version: @core_version
  if prepared
    enqueue_buffered_message prepared
  elsif site_originated_buffer_candidate? message
    log "Discarded #{message.type}; it is not configured for buffering", message: message, level: :warning
  else
    super
  end
rescue NotReady, IOError
  raise error if error
end

#clone_message(message, attributes = message.attributes) ⇒ Object



40
41
42
# File 'lib/rsmp/proxy/supervisor/modules/message_buffer.rb', line 40

def clone_message(message, attributes = message.attributes)
  message.class.new(JSON.parse(JSON.generate(attributes)))
end

#enqueue_buffered_message(message) ⇒ Object



109
110
111
112
113
114
115
116
117
# File 'lib/rsmp/proxy/supervisor/modules/message_buffer.rb', line 109

def enqueue_buffered_message(message)
  while @message_buffer.size >= message_buffer_max_messages
    dropped = @message_buffer.shift
    log "Dropped buffered #{dropped.type}; message buffer is full", message: dropped, level: :warning
  end
  @message_buffer << message
  log "Buffered #{message.type}; #{message_buffer.size} message(s) queued", message: message, level: :warning
  message
end

#flush_buffered_message(message, queued, index) ⇒ Object



130
131
132
133
134
135
136
137
138
139
140
# File 'lib/rsmp/proxy/supervisor/modules/message_buffer.rb', line 130

def flush_buffered_message(message, queued, index)
  prepared = prepare_message_for_buffer message, core_version: @core_version, for_send: true
  return true unless prepared

  send_message prepared, 'from buffer', buffer: false
  true
rescue NotReady, IOError
  @message_buffer = queued[index..] + @message_buffer
  log "Stopped sending buffered messages; #{message_buffer.size} message(s) remain queued", level: :warning
  false
end

#flush_message_bufferObject



119
120
121
122
123
124
125
126
127
128
# File 'lib/rsmp/proxy/supervisor/modules/message_buffer.rb', line 119

def flush_message_buffer
  return if @message_buffer.empty?

  queued = @message_buffer
  @message_buffer = []
  log "Sending #{queued.size} buffered message(s)", level: :info
  queued.each_with_index do |message, index|
    break unless flush_buffered_message(message, queued, index)
  end
end

#message_buffer_enabled?Boolean

Returns:

  • (Boolean)


10
11
12
# File 'lib/rsmp/proxy/supervisor/modules/message_buffer.rb', line 10

def message_buffer_enabled?
  message_buffer_settings['enabled'] != false
end

#message_buffer_max_messagesObject



14
15
16
# File 'lib/rsmp/proxy/supervisor/modules/message_buffer.rb', line 14

def message_buffer_max_messages
  message_buffer_settings['max_messages'] || 10_000
end

#message_buffer_settingsObject



6
7
8
# File 'lib/rsmp/proxy/supervisor/modules/message_buffer.rb', line 6

def message_buffer_settings
  @site_settings['message_buffer'] || {}
end

#normalize_aggregated_status_buffer(states, core_version) ⇒ Object



65
66
67
68
69
70
71
# File 'lib/rsmp/proxy/supervisor/modules/message_buffer.rb', line 65

def normalize_aggregated_status_buffer(states, core_version)
  if version_meets_requirement?(core_version, '<=3.1.2')
    states.map { |item| item == true || item.to_s == 'true' ? 'true' : 'false' }
  else
    states.map { |item| item == true || item.to_s == 'true' }
  end
end

#prepare_aggregated_status_for_buffer(message, core_version:, for_send:) ⇒ Object



73
74
75
76
77
78
79
# File 'lib/rsmp/proxy/supervisor/modules/message_buffer.rb', line 73

def prepare_aggregated_status_for_buffer(message, core_version:, for_send:)
  attributes = JSON.parse(JSON.generate(message.attributes))
  if for_send && core_version && attributes['se']
    attributes['se'] = normalize_aggregated_status_buffer(attributes['se'], core_version)
  end
  clone_message message, attributes
end

#prepare_message_for_buffer(message, core_version: @core_version, for_send: false) ⇒ Object



81
82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/rsmp/proxy/supervisor/modules/message_buffer.rb', line 81

def prepare_message_for_buffer(message, core_version: @core_version, for_send: false)
  return unless message_buffer_enabled?
  return unless site_originated_buffer_candidate? message
  return false if message.is_a?(RSMP::StatusUpdate) && status_buffer_selectors == false
  return false if message.is_a?(RSMP::Alarm) && !receive_alarms?

  if message.is_a? RSMP::AggregatedStatus
    prepare_aggregated_status_for_buffer message, core_version: core_version, for_send: for_send
  elsif message.is_a? RSMP::StatusUpdate
    prepare_status_update_for_buffer message, core_version: core_version, for_send: for_send
  else
    clone_message message
  end
end

#prepare_status_update_for_buffer(message, core_version:, for_send:) ⇒ Object



53
54
55
56
57
58
59
60
61
62
63
# File 'lib/rsmp/proxy/supervisor/modules/message_buffer.rb', line 53

def prepare_status_update_for_buffer(message, core_version:, for_send:)
  attributes = JSON.parse(JSON.generate(message.attributes))
  component_id = attributes['cId']
  attributes['sS'] = attributes['sS'].select { |status| status_buffer_selector?(component_id, status) }
  return if attributes['sS'].empty?

  if for_send && core_version && version_meets_requirement?(core_version, '>=3.2.0')
    attributes['sS'].each { |status| status['q'] = 'old' }
  end
  clone_message message, attributes
end

#site_originated_buffer_candidate?(message) ⇒ Boolean

Returns:

  • (Boolean)


44
45
46
47
48
49
50
51
# File 'lib/rsmp/proxy/supervisor/modules/message_buffer.rb', line 44

def site_originated_buffer_candidate?(message)
  message.is_a?(RSMP::AggregatedStatus) ||
    message.is_a?(RSMP::AlarmIssue) ||
    message.is_a?(RSMP::AlarmSuspended) ||
    message.is_a?(RSMP::AlarmResumed) ||
    message.is_a?(RSMP::AlarmAcknowledged) ||
    message.is_a?(RSMP::StatusUpdate)
end

#status_buffer_selector?(component_id, status) ⇒ Boolean

Returns:

  • (Boolean)


24
25
26
27
28
29
30
# File 'lib/rsmp/proxy/supervisor/modules/message_buffer.rb', line 24

def status_buffer_selector?(component_id, status)
  selectors = status_buffer_selectors
  return true if selectors == true
  return false unless selectors.is_a?(Array)

  selectors.any? { |selector| status_buffer_selector_matches?(selector, component_id, status) }
end

#status_buffer_selector_matches?(selector, component_id, status) ⇒ Boolean

Returns:

  • (Boolean)


32
33
34
35
36
37
38
# File 'lib/rsmp/proxy/supervisor/modules/message_buffer.rb', line 32

def status_buffer_selector_matches?(selector, component_id, status)
  selector = selector.transform_keys(&:to_s)
  component_matches = !selector['cId'] || selector['cId'] == component_id
  code_matches = !selector['sCI'] || selector['sCI'] == status['sCI']
  name_matches = !selector['n'] || selector['n'] == status['n']
  component_matches && code_matches && name_matches
end

#status_buffer_selectorsObject



18
19
20
21
22
# File 'lib/rsmp/proxy/supervisor/modules/message_buffer.rb', line 18

def status_buffer_selectors
  return message_buffer_settings['statuses'] if message_buffer_settings.key? 'statuses'

  []
end