Class: RSMP::Supervisor
- Includes:
- Modules::Configuration, Modules::Connection, Modules::Sites
- Defined in:
- lib/rsmp/node/supervisor/supervisor.rb,
lib/rsmp/options/supervisor_options.rb,
lib/rsmp/node/supervisor/modules/sites.rb,
lib/rsmp/node/supervisor/modules/connection.rb,
lib/rsmp/node/supervisor/modules/configuration.rb
Overview
RSMP supervisor (server) that accepts site connections.
Defined Under Namespace
Modules: Modules Classes: Options
Instance Attribute Summary collapse
-
#core_version ⇒ Object
readonly
Returns the value of attribute core_version.
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
-
#proxies ⇒ Object
readonly
Returns the value of attribute proxies.
-
#ready_condition ⇒ Object
readonly
Returns the value of attribute ready_condition.
-
#site_id_condition ⇒ Object
Returns the value of attribute site_id_condition.
-
#supervisor_settings ⇒ Object
readonly
Returns the value of attribute supervisor_settings.
Attributes inherited from Node
#archive, #clock, #collector, #deferred, #error_queue, #task
Attributes included from Task
Attributes included from Logging
Class Method Summary collapse
Instance Method Summary collapse
- #build_outbound_proxies ⇒ Object
- #build_proxy(settings) ⇒ Object
- #client_role? ⇒ Boolean
- #connect_to_sites ⇒ Object
-
#initialize(options = {}) ⇒ Supervisor
constructor
A new instance of Supervisor.
-
#run ⇒ Object
listen for connections.
- #server_role? ⇒ Boolean
- #site_id ⇒ Object
-
#stop ⇒ Object
stop.
Methods included from Modules::Sites
#aggregated_status_changed, #check_site_already_connected, #check_site_id, #find_site, #find_site_from_ip_port, #site_connected?, #site_ids_changed, #wait_for_site, #wait_for_site_disconnect
Methods included from Modules::Connection
#accept?, #accept_connection, #authorize_ip, #build_proxy_settings, #check_max_sites, #close, #format_ip_and_port, #handle_connection, #peek_version_message, #reject_connection, #retrieve_site_id, #setup_proxy, #validate_and_start_proxy
Methods included from Modules::Configuration
#check_site_sxls, #denormalize_site_sxls, #denormalize_supervisor_sxls, #handle_supervisor_settings, #ip_to_site_settings, #site_id_to_site_setting
Methods inherited from Node
#author, #check_required_settings, #clear_deferred, #defer, #distribute_error, #do_deferred, #ignore_errors, #inspect, #now, #process_deferred, #stop_subtasks
Methods included from Task
#initialize_task, #restart, #start, #stop_subtasks, #stop_task, #task_status, #wait, #wait_for_condition
Methods included from Logging
#author, #initialize_logging, #log
Constructor Details
#initialize(options = {}) ⇒ Supervisor
Returns a new instance of Supervisor.
12 13 14 15 16 17 18 |
# File 'lib/rsmp/node/supervisor/supervisor.rb', line 12 def initialize( = {}) handle_supervisor_settings([:supervisor_settings] || {}) super @proxies = [] @ready_condition = Async::Notification.new @site_id_condition = Async::Notification.new end |
Instance Attribute Details
#core_version ⇒ Object (readonly)
Returns the value of attribute core_version.
8 9 10 |
# File 'lib/rsmp/node/supervisor/supervisor.rb', line 8 def core_version @core_version end |
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
8 9 10 |
# File 'lib/rsmp/node/supervisor/supervisor.rb', line 8 def logger @logger end |
#proxies ⇒ Object (readonly)
Returns the value of attribute proxies.
8 9 10 |
# File 'lib/rsmp/node/supervisor/supervisor.rb', line 8 def proxies @proxies end |
#ready_condition ⇒ Object (readonly)
Returns the value of attribute ready_condition.
8 9 10 |
# File 'lib/rsmp/node/supervisor/supervisor.rb', line 8 def ready_condition @ready_condition end |
#site_id_condition ⇒ Object
Returns the value of attribute site_id_condition.
10 11 12 |
# File 'lib/rsmp/node/supervisor/supervisor.rb', line 10 def site_id_condition @site_id_condition end |
#supervisor_settings ⇒ Object (readonly)
Returns the value of attribute supervisor_settings.
8 9 10 |
# File 'lib/rsmp/node/supervisor/supervisor.rb', line 8 def supervisor_settings @supervisor_settings end |
Class Method Details
.build_id_from_ip_port(ip, port) ⇒ Object
107 108 109 |
# File 'lib/rsmp/node/supervisor/supervisor.rb', line 107 def self.build_id_from_ip_port(ip, port) Digest::MD5.hexdigest("#{ip}:#{port}")[0..8] end |
Instance Method Details
#build_outbound_proxies ⇒ Object
70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 |
# File 'lib/rsmp/node/supervisor/supervisor.rb', line 70 def build_outbound_proxies site_entries = (@supervisor_settings['sites'] || {}).except('default') site_entries.each_pair do |site_id, site_settings| endpoints = site_settings['supervisors'] || [] merged_settings = site_id_to_site_setting site_id endpoints.each do |endpoint| @proxies << SiteProxy.new({ supervisor: self, task: @task, settings: @supervisor_settings, site_id: site_id, site_settings: merged_settings, ip: endpoint['ip'], port: endpoint['port'], logger: @logger, archive: @archive, collect: @collect }) end end end |
#build_proxy(settings) ⇒ Object
103 104 105 |
# File 'lib/rsmp/node/supervisor/supervisor.rb', line 103 def build_proxy(settings) SiteProxy.new settings end |
#client_role? ⇒ Boolean
24 25 26 |
# File 'lib/rsmp/node/supervisor/supervisor.rb', line 24 def client_role? @supervisor_settings['connection_role'] == 'client' end |
#connect_to_sites ⇒ Object
60 61 62 63 64 65 66 67 68 |
# File 'lib/rsmp/node/supervisor/supervisor.rb', line 60 def connect_to_sites log 'Starting supervisor in client connection role', level: :info, timestamp: @clock.now build_outbound_proxies @ready_condition.signal @proxies.each(&:start) @proxies.each(&:wait) end |
#run ⇒ Object
listen for connections
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/rsmp/node/supervisor/supervisor.rb', line 33 def run return connect_to_sites if client_role? log "Starting supervisor on port #{@supervisor_settings['port']}", level: :info, timestamp: @clock.now @endpoint = IO::Endpoint.tcp('0.0.0.0', @supervisor_settings['port']) @accept_task = Async::Task.current.async do |task| task.annotate 'supervisor accept loop' @endpoint.accept do |socket| # creates fibers handle_connection(socket) rescue StandardError => e distribute_error e, level: :internal end rescue Async::Stop # Expected during shutdown - no action needed rescue StandardError => e distribute_error e, level: :internal end @ready_condition.signal @accept_task.wait rescue StandardError => e distribute_error e, level: :internal end |
#server_role? ⇒ Boolean
28 29 30 |
# File 'lib/rsmp/node/supervisor/supervisor.rb', line 28 def server_role? !client_role? end |
#site_id ⇒ Object
20 21 22 |
# File 'lib/rsmp/node/supervisor/supervisor.rb', line 20 def site_id @supervisor_settings['site_id'] end |
#stop ⇒ Object
stop
93 94 95 96 97 98 99 100 101 |
# File 'lib/rsmp/node/supervisor/supervisor.rb', line 93 def stop log "Stopping supervisor #{@supervisor_settings['site_id']}", level: :info @accept_task&.stop @accept_task = nil @endpoint = nil super end |