Module: RSMP::SupervisorProxy::Modules::MessageBuffer
- Included in:
- RSMP::SupervisorProxy
- Defined in:
- lib/rsmp/proxy/supervisor/modules/message_buffer.rb
Overview
In-memory outgoing communication buffer for site-originated messages.
Instance Method Summary
- #buffer_message(message, error = nil) ⇒ Object
- #clone_message(message, attributes = message.attributes) ⇒ Object
- #enqueue_buffered_message(message) ⇒ Object
- #flush_buffered_message(message, queued, index) ⇒ Object
- #flush_message_buffer ⇒ Object
- #message_buffer_max_messages ⇒ Object
- #message_buffer_settings ⇒ Object
- #normalize_aggregated_status_buffer(states, core_version) ⇒ Object
- #prepare_aggregated_status_for_buffer(message, core_version:, for_send:) ⇒ Object
- #prepare_message_for_buffer(message, core_version: @core_version, for_send: false) ⇒ Object
- #prepare_status_update_for_buffer(message, core_version:, for_send:) ⇒ Object
- #site_originated_buffer_candidate?(message) ⇒ Boolean
- #status_buffer_selector?(component_id, status) ⇒ Boolean
- #status_buffer_selector_matches?(selector, component_id, status) ⇒ Boolean
- #status_buffer_selectors ⇒ Object
Instance Method Details
#buffer_message(message, error = nil) ⇒ Object
91 92 93 94 95 96 97 98 99 100 101 102 |
# File 'lib/rsmp/proxy/supervisor/modules/message_buffer.rb', line 91 def buffer_message(message, error = nil) prepared = prepare_message_for_buffer message, core_version: @core_version if prepared enqueue_buffered_message prepared elsif site_originated_buffer_candidate? message log "Discarded #{message.type}; it is not configured for buffering", message: message, level: :warning else super end rescue NotReady, IOError raise error if error end |
#clone_message(message, attributes = message.attributes) ⇒ Object
36 37 38 |
# File 'lib/rsmp/proxy/supervisor/modules/message_buffer.rb', line 36 def clone_message(message, attributes = message.attributes) message.class.new(JSON.parse(JSON.generate(attributes))) end |
#enqueue_buffered_message(message) ⇒ Object
104 105 106 107 108 109 110 111 112 |
# File 'lib/rsmp/proxy/supervisor/modules/message_buffer.rb', line 104 def () while @message_buffer.size >= dropped = @message_buffer.shift log "Dropped buffered #{dropped.type}; message buffer is full", message: dropped, level: :warning end @message_buffer << log "Buffered #{.type}; #{.size} message(s) queued", message: , level: :warning end |
#flush_buffered_message(message, queued, index) ⇒ Object
125 126 127 128 129 130 131 132 133 134 135 |
# File 'lib/rsmp/proxy/supervisor/modules/message_buffer.rb', line 125 def (, queued, index) prepared = , core_version: @core_version, for_send: true return true unless prepared prepared, 'from buffer', buffer: false true rescue NotReady, IOError @message_buffer = queued[index..] + @message_buffer log "Stopped sending buffered messages; #{.size} message(s) remain queued", level: :warning false end |
#flush_message_buffer ⇒ Object
114 115 116 117 118 119 120 121 122 123 |
# File 'lib/rsmp/proxy/supervisor/modules/message_buffer.rb', line 114 def flush_message_buffer return if @message_buffer.empty? queued = @message_buffer @message_buffer = [] log "Sending #{queued.size} buffered message(s)", level: :info queued.each_with_index do |message, index| break unless flush_buffered_message(message, queued, index) end end |
#message_buffer_max_messages ⇒ Object
10 11 12 |
# File 'lib/rsmp/proxy/supervisor/modules/message_buffer.rb', line 10 def message_buffer_max_messages message_buffer_settings['max_messages'] || 10_000 end |
#message_buffer_settings ⇒ Object
6 7 8 |
# File 'lib/rsmp/proxy/supervisor/modules/message_buffer.rb', line 6 def message_buffer_settings @site_settings['message_buffer'] || {} end |
#normalize_aggregated_status_buffer(states, core_version) ⇒ Object
61 62 63 64 65 66 67 |
# File 'lib/rsmp/proxy/supervisor/modules/message_buffer.rb', line 61 def normalize_aggregated_status_buffer(states, core_version) if version_meets_requirement?(core_version, '<=3.1.2') states.map { |item| item == true || item.to_s == 'true' ? 'true' : 'false' } else states.map { |item| item == true || item.to_s == 'true' } end end |
#prepare_aggregated_status_for_buffer(message, core_version:, for_send:) ⇒ Object
69 70 71 72 73 74 75 |
# File 'lib/rsmp/proxy/supervisor/modules/message_buffer.rb', line 69 def prepare_aggregated_status_for_buffer(message, core_version:, for_send:) attributes = JSON.parse(JSON.generate(message.attributes)) if for_send && core_version && attributes['se'] attributes['se'] = normalize_aggregated_status_buffer(attributes['se'], core_version) end clone_message message, attributes end |
#prepare_message_for_buffer(message, core_version: @core_version, for_send: false) ⇒ Object
77 78 79 80 81 82 83 84 85 86 87 88 89 |
# File 'lib/rsmp/proxy/supervisor/modules/message_buffer.rb', line 77 def prepare_message_for_buffer(message, core_version: @core_version, for_send: false) return unless site_originated_buffer_candidate? message return false if message.is_a?(RSMP::StatusUpdate) && status_buffer_selectors == false return false if message.is_a?(RSMP::Alarm) && !receive_alarms? if message.is_a? RSMP::AggregatedStatus prepare_aggregated_status_for_buffer message, core_version: core_version, for_send: for_send elsif message.is_a? RSMP::StatusUpdate prepare_status_update_for_buffer message, core_version: core_version, for_send: for_send else clone_message message end end |
#prepare_status_update_for_buffer(message, core_version:, for_send:) ⇒ Object
49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/rsmp/proxy/supervisor/modules/message_buffer.rb', line 49 def prepare_status_update_for_buffer(message, core_version:, for_send:) attributes = JSON.parse(JSON.generate(message.attributes)) component_id = attributes['cId'] attributes['sS'] = attributes['sS'].select { |status| status_buffer_selector?(component_id, status) } return if attributes['sS'].empty? if for_send && core_version && version_meets_requirement?(core_version, '>=3.2.0') attributes['sS'].each { |status| status['q'] = 'old' } end clone_message message, attributes end |
#site_originated_buffer_candidate?(message) ⇒ Boolean
40 41 42 43 44 45 46 47 |
# File 'lib/rsmp/proxy/supervisor/modules/message_buffer.rb', line 40 def site_originated_buffer_candidate?(message) message.is_a?(RSMP::AggregatedStatus) || message.is_a?(RSMP::AlarmIssue) || message.is_a?(RSMP::AlarmSuspended) || message.is_a?(RSMP::AlarmResumed) || message.is_a?(RSMP::AlarmAcknowledged) || message.is_a?(RSMP::StatusUpdate) end |
#status_buffer_selector?(component_id, status) ⇒ Boolean
20 21 22 23 24 25 26 |
# File 'lib/rsmp/proxy/supervisor/modules/message_buffer.rb', line 20 def status_buffer_selector?(component_id, status) selectors = status_buffer_selectors return true if selectors == true return false unless selectors.is_a?(Array) selectors.any? { |selector| status_buffer_selector_matches?(selector, component_id, status) } end |
#status_buffer_selector_matches?(selector, component_id, status) ⇒ Boolean
28 29 30 31 32 33 34 |
# File 'lib/rsmp/proxy/supervisor/modules/message_buffer.rb', line 28 def status_buffer_selector_matches?(selector, component_id, status) selector = selector.transform_keys(&:to_s) component_matches = !selector['cId'] || selector['cId'] == component_id code_matches = !selector['sCI'] || selector['sCI'] == status['sCI'] name_matches = !selector['n'] || selector['n'] == status['n'] component_matches && code_matches && name_matches end |
#status_buffer_selectors ⇒ Object
14 15 16 17 18 |
# File 'lib/rsmp/proxy/supervisor/modules/message_buffer.rb', line 14 def status_buffer_selectors return message_buffer_settings['statuses'] if message_buffer_settings.key? 'statuses' [] end |