Class: RSMP::Supervisor

Inherits:
Node
  • Object
show all
Includes:
Modules::Configuration, Modules::Connection, Modules::Sites
Defined in:
lib/rsmp/node/supervisor/supervisor.rb,
lib/rsmp/options/supervisor_options.rb,
lib/rsmp/node/supervisor/modules/sites.rb,
lib/rsmp/node/supervisor/modules/connection.rb,
lib/rsmp/node/supervisor/modules/configuration.rb

Overview

RSMP supervisor (server) that accepts site connections.

Defined Under Namespace

Modules: Modules Classes: Options

Instance Attribute Summary collapse

Attributes inherited from Node

#archive, #clock, #collector, #deferred, #error_queue, #task

Attributes included from Task

#task

Attributes included from Logging

#archive

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Modules::Sites

#aggregated_status_changed, #check_site_already_connected, #check_site_id, #find_site, #find_site_from_ip_port, #site_connected?, #site_ids_changed, #wait_for_site, #wait_for_site_disconnect

Methods included from Modules::Connection

#accept?, #accept_connection, #authorize_ip, #build_proxy_settings, #check_max_sites, #close, #format_ip_and_port, #handle_connection, #peek_version_message, #reject_connection, #retrieve_site_id, #setup_proxy, #validate_and_start_proxy

Methods included from Modules::Configuration

#check_site_sxl_types, #handle_supervisor_settings, #ip_to_site_settings, #site_id_to_site_setting

Methods inherited from Node

#author, #check_required_settings, #clear_deferred, #defer, #distribute_error, #do_deferred, #ignore_errors, #now, #process_deferred, #stop_subtasks

Methods included from Task

#initialize_task, #restart, #start, #stop_subtasks, #stop_task, #task_status, #wait, #wait_for_condition

Methods included from Inspect

#inspect, #inspector

Methods included from Logging

#author, #initialize_logging, #log

Constructor Details

#initialize(options = {}) ⇒ Supervisor

Returns a new instance of Supervisor.



12
13
14
15
16
17
18
# File 'lib/rsmp/node/supervisor/supervisor.rb', line 12

# Creates a supervisor.
#
# The supervisor settings are handed to #handle_supervisor_settings before
# the parent initializer runs; afterwards the proxy list and the two
# notifications are set up.
#
# @param options [Hash] node options. +:supervisor_settings+ is read here
#   (an empty hash is used when it is nil or false); the whole hash is
#   forwarded unchanged to the parent initializer.
def initialize(options = {})
  settings = options[:supervisor_settings] || {}
  handle_supervisor_settings(settings)
  super
  @site_id_condition = Async::Notification.new
  @ready_condition = Async::Notification.new
  @proxies = []
end

Instance Attribute Details

#core_version ⇒ Object (readonly)

Returns the value of attribute core_version.



8
9
10
# File 'lib/rsmp/node/supervisor/supervisor.rb', line 8

# Reader for the core version.
#
# @return [Object] the value currently held in +@core_version+
def core_version = @core_version

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



8
9
10
# File 'lib/rsmp/node/supervisor/supervisor.rb', line 8

# Reader for the logger.
#
# @return [Object] the value currently held in +@logger+
def logger = @logger

#proxies ⇒ Object (readonly)

Returns the value of attribute proxies.



8
9
10
# File 'lib/rsmp/node/supervisor/supervisor.rb', line 8

# Reader for the proxy list.
#
# @return [Object] the value currently held in +@proxies+
def proxies = @proxies

#ready_condition ⇒ Object (readonly)

Returns the value of attribute ready_condition.



8
9
10
# File 'lib/rsmp/node/supervisor/supervisor.rb', line 8

# Reader for the ready notification.
#
# @return [Object] the value currently held in +@ready_condition+
def ready_condition = @ready_condition

#site_id_condition ⇒ Object

Returns the value of attribute site_id_condition.



10
11
12
# File 'lib/rsmp/node/supervisor/supervisor.rb', line 10

# Reader for the site id notification.
#
# @return [Object] the value currently held in +@site_id_condition+
def site_id_condition = @site_id_condition

#supervisor_settings ⇒ Object (readonly)

Returns the value of attribute supervisor_settings.



8
9
10
# File 'lib/rsmp/node/supervisor/supervisor.rb', line 8

# Reader for the supervisor settings.
#
# @return [Object] the value currently held in +@supervisor_settings+
def supervisor_settings = @supervisor_settings

Class Method Details

.build_id_from_ip_port(ip, port) ⇒ Object



72
73
74
# File 'lib/rsmp/node/supervisor/supervisor.rb', line 72

# Derives a short identifier from an IP address and a port.
#
# @param ip [#to_s] address part of the key
# @param port [#to_s] port part of the key
# @return [String] the first nine hex characters of the MD5 digest of
#   the string "ip:port"
def self.build_id_from_ip_port(ip, port)
  digest = Digest::MD5.hexdigest("#{ip}:#{port}")
  digest[0, 9]
end

Instance Method Details

#build_proxy(settings) ⇒ Object



61
62
63
64
65
66
67
68
69
70
# File 'lib/rsmp/node/supervisor/supervisor.rb', line 61

# Creates the proxy object for a site.
#
# The SXL type is read from the site-specific settings in
# +settings[:site_settings]+; when those are absent, the supervisor's
# 'default' settings are consulted instead.
#
# @param settings [Hash] proxy settings, passed unchanged to the proxy
#   constructor
# @return [RSMP::TLC::TrafficControllerProxy] when the SXL type is 'tlc'
# @return [SiteProxy] in every other case
def build_proxy(settings)
  config = settings[:site_settings] || @supervisor_settings['default']

  if config && config['sxl'] == 'tlc'
    RSMP::TLC::TrafficControllerProxy.new(settings)
  else
    SiteProxy.new(settings)
  end
end

#run ⇒ Object

Listens for incoming connections on the configured port and hands each accepted socket to #handle_connection.



25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/rsmp/node/supervisor/supervisor.rb', line 25

# Runs the supervisor: accepts TCP connections and hands each accepted
# socket to #handle_connection.
#
# Logs a start message, creates a TCP endpoint for the configured port,
# starts the accept loop in a task created via Async::Task.current.async,
# signals @ready_condition and then waits on the accept task.
#
# A StandardError raised in this method, in the accept task, or while
# handling a single connection is passed to #distribute_error with
# level :internal rather than being raised to the caller.
def run
  log "Starting supervisor on port #{@supervisor_settings['port']}",
      level: :info,
      timestamp: @clock.now

  # The listen address is hard-coded to 0.0.0.0; only the port comes from the settings.
  @endpoint = IO::Endpoint.tcp('0.0.0.0', @supervisor_settings['port'])
  @accept_task = Async::Task.current.async do |task|
    task.annotate 'supervisor accept loop'
    @endpoint.accept do |socket| # creates fibers
      handle_connection(socket)
    rescue StandardError => e
      # Errors from one connection are reported inside that connection's block.
      distribute_error e, level: :internal
    end
  rescue Async::Stop
    # Expected during shutdown - no action needed
  rescue StandardError => e
    distribute_error e, level: :internal
  end

  # Signalled once the accept task has been created, before waiting on it.
  @ready_condition.signal
  @accept_task.wait
rescue StandardError => e
  distribute_error e, level: :internal
end

#site_id ⇒ Object



20
21
22
# File 'lib/rsmp/node/supervisor/supervisor.rb', line 20

# Site id of this supervisor.
#
# @return [Object, nil] the 'site_id' entry of the supervisor settings
def site_id = @supervisor_settings['site_id']

#stop ⇒ Object

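Stops the supervisor: stops the accept task, if one exists, clears the endpoint reference, and then calls the parent implementation.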



51
52
53
54
55
56
57
58
59
# File 'lib/rsmp/node/supervisor/supervisor.rb', line 51

# Stops the supervisor.
#
# Logs the shutdown, stops the accept task when one exists, drops the
# references to the task and the endpoint, and finally calls the parent
# implementation.
def stop
  log("Stopping supervisor #{@supervisor_settings['site_id']}", level: :info)

  # Only a nil task is skipped, matching safe navigation.
  @accept_task.stop unless @accept_task.nil?

  @accept_task = nil
  @endpoint = nil

  super
end