Class: RSMP::Proxy
- Inherits:
- Object
- Inheritance chain: Object, RSMP::Proxy
- Includes:
- Distributor, Inspect, Logging, Modules::Acknowledgements, Modules::Receive, Modules::Send, Modules::State, Modules::Tasks, Modules::Versions, Modules::Watchdogs, Task
- Defined in:
- lib/rsmp/proxy/proxy.rb,
lib/rsmp/proxy/modules/send.rb,
lib/rsmp/proxy/modules/state.rb,
lib/rsmp/proxy/modules/tasks.rb,
lib/rsmp/proxy/modules/receive.rb,
lib/rsmp/proxy/modules/versions.rb,
lib/rsmp/proxy/modules/watchdogs.rb,
lib/rsmp/proxy/modules/acknowledgements.rb
Overview
Represents a connection to a remote site or supervisor. Provides common connection lifecycle and message handling.
Direct Known Subclasses
Defined Under Namespace
Modules: Modules
Constant Summary
- WRAPPING_DELIMITER = "\f".freeze
Instance Attribute Summary
- #archive ⇒ Object (readonly)
  Returns the value of attribute archive.
- #collector ⇒ Object (readonly)
  Returns the value of attribute collector.
- #connection_info ⇒ Object (readonly)
  Returns the value of attribute connection_info.
- #core_version ⇒ Object (readonly)
  Returns the value of attribute core_version.
- #ip ⇒ Object (readonly)
  Returns the value of attribute ip.
- #node ⇒ Object (readonly)
  Returns the value of attribute node.
- #port ⇒ Object (readonly)
  Returns the value of attribute port.
- #state ⇒ Object
  Returns the value of attribute state.
- #sxl ⇒ Object (readonly)
  Returns the value of attribute sxl.
Attributes included from Task
Attributes included from Distributor
Attributes included from Logging
Class Method Summary
- .version_meets_requirement?(version, requirement) ⇒ Boolean
  Uses the Gem class to check a version requirement. The requirement must be a string like '1.1', '>=1.0.3' or '<2.1.4', or a list of strings, like ['<=1.4', '<1.5'].
Instance Method Summary
- #author ⇒ Object
- #clear ⇒ Object
- #clock ⇒ Object
- #close ⇒ Object
  Closes the connection, but keeps our main task running so we can reconnect.
- #close_socket ⇒ Object
- #close_stream ⇒ Object
- #connected? ⇒ Boolean
- #disconnect ⇒ Object
  Connection lifecycle methods.
- #disconnected? ⇒ Boolean
- #initialize(options) ⇒ Proxy (constructor)
  Returns a new instance of Proxy.
- #inspect ⇒ Object
- #log(str, options = {}) ⇒ Object
- #now ⇒ Object
- #ready? ⇒ Boolean
  State management methods.
- #receive_error(error, options = {}) ⇒ Object
- #revive(options) ⇒ Object
  Revives the proxy after a reconnect.
- #schemas ⇒ Object
- #setup(options) ⇒ Object
- #state_changed ⇒ Object
  The state changed. Override to do things like notifications.
- #stop_reader ⇒ Object
- #stop_subtasks ⇒ Object
- #stop_task ⇒ Object
- #stop_timer ⇒ Object
- #wait_for_reader ⇒ Object
  Waits for the reader task to complete, which is not expected to happen before the connection is closed.
Methods included from Modules::Tasks
#read_line, #run_reader, #run_timer, #start_reader, #start_timer, #timer
Methods included from Modules::Versions
#check_core_version, #core_versions, #extraneous_version, #process_version, #send_version, #version_acknowledged
Methods included from Modules::Receive
#expect_version_message, #handle_fatal_error, #handle_invalid_message, #handle_invalid_packet, #handle_malformed_message, #handle_schema_error, #process_deferred, #process_message, #process_packet, #should_validate_ingoing_message?, #verify_sequence, #will_not_handle
Methods included from Modules::Send
#apply_nts_message_attributes, #buffer_message, #handle_send_schema_error, #log_send, #send_and_optionally_collect, #send_message
Methods included from Modules::Acknowledgements
#acknowledge, #acknowledged_first_ingoing, #acknowledged_first_outgoing, #check_ack_timeout, #check_ingoing_acknowledged, #check_outgoing_acknowledged, #dont_acknowledge, #dont_expect_acknowledgement, #expect_acknowledgement, #find_original_for_message, #log_acknowledgement_for_original, #log_acknowledgement_for_unknown, #process_ack, #process_not_ack, #status_subscribe_acknowledged
Methods included from Modules::Watchdogs
#check_watchdog_timeout, #process_watchdog, #send_watchdog, #start_watchdog, #stop_watchdog, #watchdog_send_timer, #with_watchdog_disabled
Methods included from Modules::State
#handshake_complete, #wait_for_state
Methods included from Task
#initialize_task, #restart, #run, #start, #stop, #task_status, #wait, #wait_for_condition
Methods included from Inspect
Methods included from Distributor
#add_receiver, #clear_deferred_distribution, #distribute, #distribute_error, #distribute_immediately, #distribute_queued, #initialize_distributor, #remove_receiver, #with_deferred_distribution
Methods included from Logging
Constructor Details
#initialize(options) ⇒ Proxy
Returns a new instance of Proxy.
# File 'lib/rsmp/proxy/proxy.rb', line 23
def initialize(options)
  @node = options[:node]
  initialize_logging options
  initialize_distributor
  initialize_task
  setup options
  clear
  @state = :disconnected
  @state_condition = Async::Notification.new
end
Instance Attribute Details
#archive ⇒ Object (readonly)
Returns the value of attribute archive.
# File 'lib/rsmp/proxy/proxy.rb', line 21
def archive
  @archive
end
#collector ⇒ Object (readonly)
Returns the value of attribute collector.
# File 'lib/rsmp/proxy/proxy.rb', line 21
def collector
  @collector
end
#connection_info ⇒ Object (readonly)
Returns the value of attribute connection_info.
# File 'lib/rsmp/proxy/proxy.rb', line 21
def connection_info
  @connection_info
end
#core_version ⇒ Object (readonly)
Returns the value of attribute core_version.
# File 'lib/rsmp/proxy/proxy.rb', line 21
def core_version
  @core_version
end
#ip ⇒ Object (readonly)
Returns the value of attribute ip.
# File 'lib/rsmp/proxy/proxy.rb', line 21
def ip
  @ip
end
#node ⇒ Object (readonly)
Returns the value of attribute node.
# File 'lib/rsmp/proxy/proxy.rb', line 21
def node
  @node
end
#port ⇒ Object (readonly)
Returns the value of attribute port.
# File 'lib/rsmp/proxy/proxy.rb', line 21
def port
  @port
end
#state ⇒ Object
Returns the value of attribute state.
# File 'lib/rsmp/proxy/proxy.rb', line 21
def state
  @state
end
#sxl ⇒ Object (readonly)
Returns the value of attribute sxl.
# File 'lib/rsmp/proxy/proxy.rb', line 21
def sxl
  @sxl
end
Class Method Details
.version_meets_requirement?(version, requirement) ⇒ Boolean
Uses the Gem class to check a version requirement. The requirement must be a string like '1.1', '>=1.0.3' or '<2.1.4', or a list of strings, like ['<=1.4', '<1.5'].
# File 'lib/rsmp/proxy/proxy.rb', line 193
def self.version_meets_requirement?(version, requirement)
  Modules::Versions.version_meets_requirement?(version, requirement)
end
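The body of Modules::Versions.version_meets_requirement? is not shown on this page. As a rough sketch of what a check built on the Gem classes can look like, assuming it relies on Gem::Requirement and Gem::Version from RubyGems, the standalone helper below (hypothetical, not the library's code) accepts the requirement formats described above:

```ruby
require 'rubygems' # Gem::Version and Gem::Requirement ship with Ruby

# Hypothetical standalone helper for illustration; the real implementation
# lives in Modules::Versions and may differ.
def version_meets_requirement?(version, requirement)
  # Gem::Requirement.new accepts a single string or a list of strings;
  # a bare version like '1.1' is treated as an exact match.
  Gem::Requirement.new(requirement).satisfied_by?(Gem::Version.new(version))
end

version_meets_requirement?('1.1', '>=1.0.3')         # true
version_meets_requirement?('2.1.4', '<2.1.4')        # false
version_meets_requirement?('1.4', ['<=1.4', '<1.5']) # true
```

When a list is given, every entry must be satisfied for the check to pass.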
Instance Method Details
#author ⇒ Object
# File 'lib/rsmp/proxy/proxy.rb', line 186
def author
  @node.site_id
end
#clear ⇒ Object
# File 'lib/rsmp/proxy/proxy.rb', line 127
def clear
  @awaiting_acknowledgement = {}
  @latest_watchdog_received = nil
  @watchdog_started = false
  @version_determined = false
  @ingoing_acknowledged = {}
  @outgoing_acknowledged = {}
  @latest_watchdog_send_at = nil
  @acknowledgements = {}
  @acknowledgement_condition = Async::Notification.new
end
#clock ⇒ Object
# File 'lib/rsmp/proxy/proxy.rb', line 167
def clock
  @node.clock
end
#close ⇒ Object
Closes the connection, but keeps our main task running so we can reconnect.
# File 'lib/rsmp/proxy/proxy.rb', line 49
def close
  log 'Closing connection', level: :warning
  close_stream
  close_socket
  stop_reader
  self.state = :disconnected
  distribute_error DisconnectError.new('Connection was closed')

  # stop timer
  # as we're running inside the timer, code after stop_timer() will not be called,
  # unless it's in the ensure block
  stop_timer
end
#close_socket ⇒ Object
# File 'lib/rsmp/proxy/proxy.rb', line 88
def close_socket
  @socket&.close
ensure
  @socket = nil
end
#close_stream ⇒ Object
# File 'lib/rsmp/proxy/proxy.rb', line 82
def close_stream
  @stream&.close
ensure
  @stream = nil
end
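close_socket and close_stream (and likewise stop_reader and stop_timer) share one shape: a safe-navigation call followed by an ensure clause that clears the reference. The sketch below illustrates that pattern in isolation. FailingSocket and Holder are hypothetical stand-ins, not RSMP classes, and the sketch assumes nothing about how real sockets behave beyond close possibly raising:

```ruby
# Hypothetical stand-ins for illustration; neither class is part of RSMP.
class FailingSocket
  def close
    raise IOError, 'already closed'
  end
end

class Holder
  attr_reader :socket

  def initialize(socket)
    @socket = socket
  end

  def close_socket
    @socket&.close # does nothing when @socket is nil
  ensure
    @socket = nil  # runs even if close raised
  end
end

holder = Holder.new(FailingSocket.new)
begin
  holder.close_socket
rescue IOError
  # the error still propagates to the caller
end

holder.socket.nil?  # true: the reference was cleared despite the error
holder.close_socket # calling again is harmless
```

The ensure clause does not swallow the exception; it only guarantees that the stale reference is dropped, which makes repeated calls safe.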
#connected? ⇒ Boolean
# File 'lib/rsmp/proxy/proxy.rb', line 105
def connected?
  @state == :connected || @state == :ready
end
#disconnect ⇒ Object
Connection lifecycle methods
# File 'lib/rsmp/proxy/proxy.rb', line 40
def disconnect; end
#disconnected? ⇒ Boolean
# File 'lib/rsmp/proxy/proxy.rb', line 109
def disconnected?
  @state == :disconnected
end
#inspect ⇒ Object
# File 'lib/rsmp/proxy/proxy.rb', line 161
def inspect
  "#<#{self.class.name}:#{object_id}, #{inspector(
    :@acknowledgements, :@settings, :@site_settings
  )}>"
end
#log(str, options = {}) ⇒ Object
# File 'lib/rsmp/proxy/proxy.rb', line 175
def log(str, options = {})
  super(str, options.merge(ip: @ip, port: @port, site_id: @site_id))
end
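The log override above adds connection context to every entry before delegating to the inherited method. A minimal sketch of the same idea follows. BaseLogging and Connection are hypothetical names, and the base log here simply stores entries in an array, which is an assumption made for illustration rather than how RSMP's Logging module works:

```ruby
# Hypothetical base module; it just records each entry as a hash.
module BaseLogging
  def log(str, options = {})
    (@entries ||= []) << options.merge(text: str)
  end
end

# Hypothetical class that decorates every log call with connection details.
class Connection
  include BaseLogging
  attr_reader :entries

  def initialize(ip, port)
    @ip = ip
    @port = port
  end

  def log(str, options = {})
    # Keys given to merge win over keys already in options,
    # so the connection's own ip and port always end up in the entry.
    super(str, options.merge(ip: @ip, port: @port))
  end
end

conn = Connection.new('10.0.0.1', 12111)
conn.log('Closing connection', level: :warning)
conn.entries.last
# => { level: :warning, ip: '10.0.0.1', port: 12111, text: 'Closing connection' }
```

Because of how Hash#merge resolves duplicate keys, a caller cannot override the ip or port recorded for the entry.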
#now ⇒ Object
# File 'lib/rsmp/proxy/proxy.rb', line 34
def now
  node.now
end
#ready? ⇒ Boolean
State management methods
# File 'lib/rsmp/proxy/proxy.rb', line 101
def ready?
  @state == :ready
end
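Taken together, ready?, connected? and disconnected? imply that :ready counts as a connected state, while :connected alone is not yet ready. The sketch below reproduces only those three predicates in a hypothetical ConnectionState class to make the relationship concrete. Only the states that appear in the code on this page are used; the real proxy may pass through others:

```ruby
# Hypothetical minimal class mirroring the three predicates; not RSMP::Proxy itself.
class ConnectionState
  attr_accessor :state

  def initialize
    @state = :disconnected # same initial state as the constructor shown above
  end

  def ready?
    @state == :ready
  end

  def connected?
    @state == :connected || @state == :ready
  end

  def disconnected?
    @state == :disconnected
  end
end

conn = ConnectionState.new
conn.disconnected? # true

conn.state = :connected
conn.connected?    # true
conn.ready?        # false

conn.state = :ready
conn.connected?    # true
conn.ready?        # true
```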
#receive_error(error, options = {}) ⇒ Object
# File 'lib/rsmp/proxy/proxy.rb', line 171
def receive_error(error, options = {})
  @node.receive_error error, options
end
#revive(options) ⇒ Object
Revives the proxy after a reconnect.
# File 'lib/rsmp/proxy/proxy.rb', line 141
def revive(options)
  setup options
end
#schemas ⇒ Object
# File 'lib/rsmp/proxy/proxy.rb', line 179
def schemas
  schemas = { core: RSMP::Schema.latest_core_version } # use latest core
  schemas[:core] = core_version if core_version
  schemas[sxl] = RSMP::Schema.sanitize_version(sxl_version.to_s) if sxl && sxl_version
  schemas
end
#setup(options) ⇒ Object
# File 'lib/rsmp/proxy/proxy.rb', line 145
def setup(options)
  @settings = options[:settings]
  @socket = options[:socket]
  @stream = options[:stream]
  @protocol = options[:protocol]
  @ip = options[:ip]
  @port = options[:port]
  @connection_info = options[:info]
  @sxl = nil
  @site_settings = nil # can't pick until we know the site id
  return unless options[:collect]

  @collector = RSMP::Collector.new self, options[:collect]
  @collector.start
end
#state_changed ⇒ Object
The state changed. Override to do things like notifications.
# File 'lib/rsmp/proxy/proxy.rb', line 123
def state_changed
  @state_condition.signal @state
end
#stop_reader ⇒ Object
# File 'lib/rsmp/proxy/proxy.rb', line 76
def stop_reader
  @reader&.stop
ensure
  @reader = nil
end
#stop_subtasks ⇒ Object
# File 'lib/rsmp/proxy/proxy.rb', line 63
def stop_subtasks
  stop_timer
  stop_reader
  clear
  super
end
#stop_task ⇒ Object
# File 'lib/rsmp/proxy/proxy.rb', line 94
def stop_task
  close
  super
end
#stop_timer ⇒ Object
# File 'lib/rsmp/proxy/proxy.rb', line 70
def stop_timer
  @timer&.stop
ensure
  @timer = nil
end
#wait_for_reader ⇒ Object
Waits for the reader task to complete, which is not expected to happen before the connection is closed.
# File 'lib/rsmp/proxy/proxy.rb', line 44
def wait_for_reader
  @reader&.wait
end