Module: RSMP::Proxy::Modules::Receive
- Included in:
- RSMP::Proxy
- Defined in:
- lib/rsmp/proxy/modules/receive.rb
Overview
Message processing functionality. Handles receiving and processing incoming messages.
Instance Method Summary
- #expect_version_message(message) ⇒ Object
- #handle_fatal_error(message, error) ⇒ Object
- #handle_invalid_message(message, error) ⇒ Object
- #handle_invalid_packet(json, error) ⇒ Object
- #handle_malformed_message(attributes, error) ⇒ Object
- #handle_schema_error(message, error) ⇒ Object
- #process_deferred ⇒ Object
- #process_message(message) ⇒ Object
- #process_packet(json) ⇒ Object
- #should_validate_ingoing_message?(message) ⇒ Boolean
- #verify_sequence(message) ⇒ Object
- #will_not_handle(message) ⇒ Object
Instance Method Details
#expect_version_message(message) ⇒ Object
114 115 116 117 118 |
# File 'lib/rsmp/proxy/modules/receive.rb', line 114

# Checks that +message+ is one of the message types accepted before the
# version has been determined.
#
# @param message [Object] the incoming message
# @return [nil] when the message is a Version, MessageAck or MessageNotAck
# @raise [HandshakeError] for any other kind of message
def expect_version_message(message)
  return if message.is_a?(Version) || message.is_a?(MessageAck) || message.is_a?(MessageNotAck)

  raise HandshakeError, 'Version must be received first'
end
#handle_fatal_error(message, error) ⇒ Object
59 60 61 62 63 64 65 66 |
# File 'lib/rsmp/proxy/modules/receive.rb', line 59

# Reports a fatal error for +message+, declines to acknowledge it and closes.
#
# NOTE(review): the trailing +message+ return value was not visible in the
# extracted listing; it is inferred from the documented line range (59-66).
# Confirm against the real source file.
#
# @param message [Object] the offending message; must respond to +type+
# @param error [Exception] the error that was raised
# @return [Object] +message+
def handle_fatal_error(message, error)
  reason = error.message
  str = "Rejected #{message.type},"
  # Re-wrap the error so the distributed exception carries the summary text.
  distribute_error error.exception(str), message: message
  dont_acknowledge message, str, reason
  close
  message
end
#handle_invalid_message(message, error) ⇒ Object
51 52 53 54 55 56 57 |
# File 'lib/rsmp/proxy/modules/receive.rb', line 51

# Reports an invalid +message+ and declines to acknowledge it.
#
# NOTE(review): the trailing +message+ return value was not visible in the
# extracted listing; it is inferred from the documented line range (51-57).
# Confirm against the real source file.
#
# @param message [Object] the offending message; must respond to +type+ and +json+
# @param error [Exception] the error that was raised
# @return [Object] +message+
def handle_invalid_message(message, error)
  reason = error.message.to_s
  str = "Received invalid #{message.type},"
  # The distributed error also includes the raw JSON of the message.
  distribute_error error.exception("#{str} #{message.json}"), message: message
  dont_acknowledge message, str, reason
  message
end
#handle_invalid_packet(json, error) ⇒ Object
27 28 29 30 31 32 |
# File 'lib/rsmp/proxy/modules/receive.rb', line 27

# Reports a packet that could not be handled as valid JSON and logs a warning.
#
# @param json [#size] the raw packet; only its size is included in the report
# @param error [Exception] the error that was raised
# @return [nil]
def handle_invalid_packet(json, error)
  str = "Received invalid package, must be valid JSON but got #{json.size} bytes: #{error.message}"
  distribute_error error.exception(str)
  log str, level: :warning
  nil
end
#handle_malformed_message(attributes, error) ⇒ Object
34 35 36 37 38 39 |
# File 'lib/rsmp/proxy/modules/receive.rb', line 34

# Reports a malformed message and logs a warning, attaching a Malformed
# object built from the attributes.
#
# @param attributes [Object] the attributes passed to Malformed.new
# @param error [Exception] the error that was raised
# @return [nil]
def handle_malformed_message(attributes, error)
  str = "Received malformed message, #{error.message}"
  distribute_error error.exception(str)
  log str, message: Malformed.new(attributes), level: :warning
  nil
end
#handle_schema_error(message, error) ⇒ Object
41 42 43 44 45 46 47 48 49 |
# File 'lib/rsmp/proxy/modules/receive.rb', line 41

# Reports a schema validation failure for +message+ and declines to
# acknowledge it.
#
# NOTE(review): the trailing +message+ return value was not visible in the
# extracted listing; it is inferred from the documented line range (41-49).
# Confirm against the real source file.
#
# @param message [Object] the offending message; must respond to +type+
# @param error [Exception] the schema error; may respond to +schemas+
# @return [Object] +message+
def handle_schema_error(message, error)
  # Prefer the schemas carried by the error; otherwise use this object's +schemas+.
  failed_schemas = error.respond_to?(:schemas) && error.schemas ? error.schemas : schemas
  schemas_string = failed_schemas.map { |schema| "#{schema.first}: #{schema.last}" }.join(', ')
  reason = "schema errors (#{schemas_string}): #{error.message}"
  str = "Received invalid #{message.type}"
  distribute_error error.exception(str), message: message
  dont_acknowledge message, str, reason
  message
end
#process_deferred ⇒ Object
19 20 21 |
# File 'lib/rsmp/proxy/modules/receive.rb', line 19

# Delegates to the node's +process_deferred+.
#
# @return [Object] whatever <tt>@node.process_deferred</tt> returns
def process_deferred
  @node.process_deferred
end
#process_message(message) ⇒ Object
93 94 95 96 97 98 99 100 101 102 103 104 105 106 |
# File 'lib/rsmp/proxy/modules/receive.rb', line 93

# Dispatches +message+ to a handler chosen by its class. Messages of any
# other class are not acknowledged.
#
# NOTE(review): the +case+ subject and the handler arguments were missing
# from the extracted listing and have been restored as +message+; confirm
# against the real source file.
#
# @param message [Object] the incoming message
# @return [Object] the return value of the selected handler
def process_message(message)
  case message
  when MessageAck
    process_ack message
  when MessageNotAck
    process_not_ack message
  when Version
    process_version message
  when RSMP::Watchdog
    process_watchdog message
  else
    dont_acknowledge message, 'Received', "unknown message (#{message.type})"
  end
end
#process_packet(json) ⇒ Object
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
# File 'lib/rsmp/proxy/modules/receive.rb', line 68

# Parses a raw packet into a message, optionally validates it, checks the
# handshake sequence, distributes and processes it. Each rescued error class
# is routed to its handler. Deferred work on the node is always cleared
# afterwards.
#
# NOTE(review): the extracted listing had every +message+ identifier
# stripped. The local variable, the handler names and two whole lines
# (+process_message message+ inside the block and the bare +message+ return)
# are reconstructed from the method summary and the documented line range
# (68-91). Confirm the exact placement against the real source file.
#
# @param json [String] the raw packet
# @return [Object, nil] the message on success, otherwise the return value
#   of the error handler that ran
def process_packet(json)
  attributes = Message.parse_attributes json
  message = Message.build attributes, json
  message.validate(schemas) if should_validate_ingoing_message?(message)
  verify_sequence message
  with_deferred_distribution do
    distribute message
    process_message message
  end
  process_deferred
  message
rescue InvalidPacket => e
  handle_invalid_packet(json, e)
rescue MalformedMessage => e
  handle_malformed_message(attributes, e)
rescue SchemaError, RSMP::Schema::Error => e
  handle_schema_error(message, e)
rescue InvalidMessage, MessageRejected => e
  handle_invalid_message(message, e)
rescue FatalError => e
  handle_fatal_error(message, e)
ensure
  # Runs on both the success and the error paths.
  @node.clear_deferred
end
#should_validate_ingoing_message?(message) ⇒ Boolean
7 8 9 10 11 12 13 14 15 16 17 |
# File 'lib/rsmp/proxy/modules/receive.rb', line 7

# Decides whether an incoming +message+ should be validated.
#
# A Version message is not validated while <tt>@version_determined</tt> is
# falsy. Otherwise validation is on unless the short class name of the
# message (the part after the last +::+) is listed under
# <tt>@site_settings['skip_validation']</tt>.
#
# @param message [Object] the incoming message
# @return [Boolean] true when the message should be validated
def should_validate_ingoing_message?(message)
  return false if message.is_a?(Version) && !@version_determined
  return true unless @site_settings

  skip = @site_settings['skip_validation']
  return true unless skip

  klass = message.class.name.split('::').last
  !skip.include?(klass)
end
#verify_sequence(message) ⇒ Object
23 24 25 |
# File 'lib/rsmp/proxy/modules/receive.rb', line 23

# Runs +expect_version_message+ on +message+ while <tt>@version_determined</tt>
# is falsy; does nothing once it is set.
#
# @param message [Object] the incoming message
# @return [Object, nil] the result of +expect_version_message+, or nil when skipped
def verify_sequence(message)
  expect_version_message(message) unless @version_determined
end
#will_not_handle(message) ⇒ Object
108 109 110 111 112 |
# File 'lib/rsmp/proxy/modules/receive.rb', line 108

# Logs a warning that +message+ is being ignored and declines to
# acknowledge it. The reason names the downcased class of the receiver.
#
# @param message [Object] the message being ignored; must respond to +type+
# @return [Object] the return value of +dont_acknowledge+
def will_not_handle(message)
  # Plain assignment: the listing used `||=` on a local that was always unset.
  reason = "since we're a #{self.class.name.downcase}"
  log "Ignoring #{message.type}, #{reason}", message: message, level: :warning
  dont_acknowledge message, nil, reason
end