Class: RSMP::Supervisor

Inherits:
Node
  • Object
show all
Includes:
Modules::Configuration, Modules::Connection, Modules::Sites
Defined in:
lib/rsmp/node/supervisor/supervisor.rb,
lib/rsmp/options/supervisor_options.rb,
lib/rsmp/node/supervisor/modules/sites.rb,
lib/rsmp/node/supervisor/modules/connection.rb,
lib/rsmp/node/supervisor/modules/configuration.rb

Overview

RSMP supervisor (server) that accepts site connections.

Defined Under Namespace

Modules: Modules Classes: Options

Instance Attribute Summary collapse

Attributes inherited from Node

#archive, #clock, #collector, #deferred, #error_queue, #task

Attributes included from Task

#task

Attributes included from Logging

#archive

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Modules::Sites

#aggregated_status_changed, #check_site_already_connected, #check_site_id, #find_site, #find_site_from_ip_port, #site_connected?, #site_ids_changed, #wait_for_site, #wait_for_site_disconnect

Methods included from Modules::Connection

#accept?, #accept_connection, #authorize_ip, #build_proxy_settings, #check_max_sites, #close, #format_ip_and_port, #handle_connection, #peek_version_message, #reject_connection, #retrieve_site_id, #setup_proxy, #validate_and_start_proxy

Methods included from Modules::Configuration

#check_site_sxls, #denormalize_site_sxls, #denormalize_supervisor_sxls, #handle_supervisor_settings, #ip_to_site_settings, #site_id_to_site_setting

Methods inherited from Node

#author, #check_required_settings, #clear_deferred, #defer, #distribute_error, #do_deferred, #ignore_errors, #inspect, #now, #process_deferred, #stop_subtasks

Methods included from Task

#initialize_task, #restart, #start, #stop_subtasks, #stop_task, #task_status, #wait, #wait_for_condition

Methods included from Logging

#author, #initialize_logging, #log

Constructor Details

#initialize(options = {}) ⇒ Supervisor

Returns a new instance of Supervisor.



12
13
14
15
16
17
18
# File 'lib/rsmp/node/supervisor/supervisor.rb', line 12

def initialize(options = {})
  handle_supervisor_settings(options[:supervisor_settings] || {})
  super
  @proxies = []
  @ready_condition = Async::Notification.new
  @site_id_condition = Async::Notification.new
end

Instance Attribute Details

#core_versionObject (readonly)

Returns the value of attribute core_version.



8
9
10
# File 'lib/rsmp/node/supervisor/supervisor.rb', line 8

def core_version
  @core_version
end

#loggerObject (readonly)

Returns the value of attribute logger.



8
9
10
# File 'lib/rsmp/node/supervisor/supervisor.rb', line 8

def logger
  @logger
end

#proxiesObject (readonly)

Returns the value of attribute proxies.



8
9
10
# File 'lib/rsmp/node/supervisor/supervisor.rb', line 8

def proxies
  @proxies
end

#ready_conditionObject (readonly)

Returns the value of attribute ready_condition.



8
9
10
# File 'lib/rsmp/node/supervisor/supervisor.rb', line 8

def ready_condition
  @ready_condition
end

#site_id_conditionObject

Returns the value of attribute site_id_condition.



10
11
12
# File 'lib/rsmp/node/supervisor/supervisor.rb', line 10

def site_id_condition
  @site_id_condition
end

#supervisor_settingsObject (readonly)

Returns the value of attribute supervisor_settings.



8
9
10
# File 'lib/rsmp/node/supervisor/supervisor.rb', line 8

def supervisor_settings
  @supervisor_settings
end

Class Method Details

.build_id_from_ip_port(ip, port) ⇒ Object



107
108
109
# File 'lib/rsmp/node/supervisor/supervisor.rb', line 107

def self.build_id_from_ip_port(ip, port)
  Digest::MD5.hexdigest("#{ip}:#{port}")[0..8]
end

Instance Method Details

#build_outbound_proxiesObject



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/rsmp/node/supervisor/supervisor.rb', line 70

def build_outbound_proxies
  site_entries = (@supervisor_settings['sites'] || {}).except('default')
  site_entries.each_pair do |site_id, site_settings|
    endpoints = site_settings['supervisors'] || []
    merged_settings = site_id_to_site_setting site_id
    endpoints.each do |endpoint|
      @proxies << SiteProxy.new({
                                  supervisor: self,
                                  task: @task,
                                  settings: @supervisor_settings,
                                  site_id: site_id,
                                  site_settings: merged_settings,
                                  ip: endpoint['ip'],
                                  port: endpoint['port'],
                                  logger: @logger,
                                  archive: @archive,
                                  collect: @collect
                                })
    end
  end
end

#build_proxy(settings) ⇒ Object



103
104
105
# File 'lib/rsmp/node/supervisor/supervisor.rb', line 103

def build_proxy(settings)
  SiteProxy.new settings
end

#client_role?Boolean

Returns:

  • (Boolean)


24
25
26
# File 'lib/rsmp/node/supervisor/supervisor.rb', line 24

def client_role?
  @supervisor_settings['connection_role'] == 'client'
end

#connect_to_sitesObject



60
61
62
63
64
65
66
67
68
# File 'lib/rsmp/node/supervisor/supervisor.rb', line 60

def connect_to_sites
  log 'Starting supervisor in client connection role',
      level: :info,
      timestamp: @clock.now
  build_outbound_proxies
  @ready_condition.signal
  @proxies.each(&:start)
  @proxies.each(&:wait)
end

#runObject

listen for connections



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/rsmp/node/supervisor/supervisor.rb', line 33

def run
  return connect_to_sites if client_role?

  log "Starting supervisor on port #{@supervisor_settings['port']}",
      level: :info,
      timestamp: @clock.now

  @endpoint = IO::Endpoint.tcp('0.0.0.0', @supervisor_settings['port'])
  @accept_task = Async::Task.current.async do |task|
    task.annotate 'supervisor accept loop'
    @endpoint.accept do |socket| # creates fibers
      handle_connection(socket)
    rescue StandardError => e
      distribute_error e, level: :internal
    end
  rescue Async::Stop
    # Expected during shutdown - no action needed
  rescue StandardError => e
    distribute_error e, level: :internal
  end

  @ready_condition.signal
  @accept_task.wait
rescue StandardError => e
  distribute_error e, level: :internal
end

#server_role?Boolean

Returns:

  • (Boolean)


28
29
30
# File 'lib/rsmp/node/supervisor/supervisor.rb', line 28

def server_role?
  !client_role?
end

#site_idObject



20
21
22
# File 'lib/rsmp/node/supervisor/supervisor.rb', line 20

def site_id
  @supervisor_settings['site_id']
end

#stopObject

stop



93
94
95
96
97
98
99
100
101
# File 'lib/rsmp/node/supervisor/supervisor.rb', line 93

def stop
  log "Stopping supervisor #{@supervisor_settings['site_id']}", level: :info

  @accept_task&.stop
  @accept_task = nil

  @endpoint = nil
  super
end