Class: Cuboid::RPC::Server::Scheduler
- Includes:
- UI::Output, Utilities
- Defined in:
- lib/cuboid/rpc/server/scheduler.rb
Overview
RPC scheduler service which:
-
Maintains a priority queue of Instance jobs.
-
Runs them once a slot is available – determined by system utilization.
-
Monitors #running Instances, retrieves and stores their reports and shuts down their Instance to free its slot.
-
Makes available information on #completed and #failed Instances.
In addition to the purely queue functionality, it also allows for running Instances to be:
-
Detached from the queue monitor and transfer the management responsibility to the client.
-
Attached to the queue monitor and transfer the management responsibility to the queue.
If a Agent has been provided, instances will be provided by it. If no Agent has been given, instances will be spawned on the Scheduler machine.
Constant Summary collapse
- TICK_CONSUME =
0.1- RPC_ERROR_TOLERANCE =
Number of CONSECUTIVE RPC errors a running Instance is allowed to accumulate before it’s given up on and marked #failed. A single transient blip (the Instance momentarily unreachable on one ping, an engine reconnect, etc.) shouldn’t fail an otherwise-healthy scan; any clean response in between resets the count.
5
Instance Method Summary collapse
- #alive? ⇒ TrueClass
- #any? ⇒ Bool
-
#attach(url, token, &block) ⇒ String, ...
Attaches a running Instance to the queue monitor.
-
#clear ⇒ Object
Empties the queue.
-
#completed ⇒ Hash
Completed Instances and their report location.
-
#detach(id, &block) ⇒ Hash?
-
RPC connection information for the Instance.
-
- #empty? ⇒ Bool
- #errors(starting_line = 0) ⇒ Array<String>
-
#failed ⇒ Hash
Failed Instances and the associated error.
-
#get(id) ⇒ Hash?
-
Instance options and priority.
-
-
#initialize ⇒ Scheduler
constructor
A new instance of Scheduler.
-
#list ⇒ Hash<Integer,Array>
Queued Instances grouped and sorted by priority.
-
#push(options, queue_options = {}) ⇒ String
Instance ID used to reference the Instance from then on.
- #remove(id) ⇒ Object
-
#running ⇒ Hash
RPC connection information on running Instances.
-
#shutdown ⇒ Object
Shuts down the service.
- #size ⇒ Integer
Methods included from Utilities
#available_port, available_port_mutex, #bytes_to_kilobytes, #bytes_to_megabytes, #caller_name, #caller_path, #exception_jail, #generate_token, #hms_to_seconds, #port_available?, #rand_port, #random_seed, #regexp_array_match, #remove_constants, #seconds_to_hms
Methods included from UI::Output
#error_buffer, initialize, #log_error, #output_provider_file, #print_bad, #print_debug, #print_error, #print_info, #print_line, #print_ok, #print_status, #print_verbose, #reroute_to_file, #reroute_to_file?
Methods included from UI::OutputInterface
Methods included from UI::OutputInterface::Personalization
Methods included from UI::OutputInterface::Controls
#debug?, #debug_level, #debug_level_1?, #debug_level_2?, #debug_level_3?, #debug_level_4?, #debug_off, #debug_on, initialize, #verbose?, #verbose_off, #verbose_on
Methods included from UI::OutputInterface::ErrorLogging
#error_logfile, #has_error_log?, initialize, #set_error_logfile
Methods included from UI::OutputInterface::Implemented
#print_debug_backtrace, #print_debug_exception, #print_debug_level_1, #print_debug_level_2, #print_debug_level_3, #print_debug_level_4, #print_error_backtrace, #print_exception
Methods included from UI::OutputInterface::Abstract
#output_provider_file, #print_bad, #print_debug, #print_error, #print_info, #print_line, #print_ok, #print_status, #print_verbose
Constructor Details
#initialize ⇒ Scheduler
Returns a new instance of Scheduler.
55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 |
# File 'lib/cuboid/rpc/server/scheduler.rb', line 55 def initialize @options = Options.instance @options.snapshot.path ||= @options.paths.snapshots @options.report.path ||= @options.paths.reports @server = Base.new( @options.rpc. ) @server.logger.level = @options.datastore.log_level if @options.datastore.log_level Options.scheduler.url = @url = @server.url prep_logging @queue = {} @id_to_priority = {} @by_priority = {} @running = {} @completed = {} @failed = {} # Per-Instance consecutive RPC-error counters (see RPC_ERROR_TOLERANCE). @rpc_error_counts = Hash.new( 0 ) set_handlers( @server ) trap_interrupts { Thread.new { shutdown } } monitor_instances consume_queue run end |
Instance Method Details
#alive? ⇒ TrueClass
273 274 275 |
# File 'lib/cuboid/rpc/server/scheduler.rb', line 273 def alive? @server.alive? end |
#any? ⇒ Bool
93 94 95 |
# File 'lib/cuboid/rpc/server/scheduler.rb', line 93 def any? !empty? end |
#attach(url, token, &block) ⇒ String, ...
Attaches a running Instance to the queue monitor.
214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 |
# File 'lib/cuboid/rpc/server/scheduler.rb', line 214 def attach( url, token, &block ) client = connect_to_instance( url, token ) client.alive? do |bool| if bool.rpc_exception? block.call next end client.scheduler_url do |scheduler_url| if scheduler_url block.call false next end client..set( scheduler: { url: @options.scheduler.url } ) do @running[token] = client block.call token end end end end |
#clear ⇒ Object
Only affects queued Instances, once a Instance has passed through the #running stage it’s no longer part of the queue.
Empties the queue.
240 241 242 243 244 245 246 |
# File 'lib/cuboid/rpc/server/scheduler.rb', line 240 def clear @queue.clear @by_priority.clear @id_to_priority.clear nil end |
#completed ⇒ Hash
Returns Completed Instances and their report location.
119 120 121 |
# File 'lib/cuboid/rpc/server/scheduler.rb', line 119 def completed @completed end |
#detach(id, &block) ⇒ Hash?
Returns * RPC connection information for the Instance.
-
‘nil` if no running Instance with that ID is found.
194 195 196 197 198 199 200 201 |
# File 'lib/cuboid/rpc/server/scheduler.rb', line 194 def detach( id, &block ) client = @running.delete( id ) return block.call if !client client..set( scheduler: { url: nil } ) do block.call( url: client.url, token: client.token, pid: client.pid ) end end |
#empty? ⇒ Bool
88 89 90 |
# File 'lib/cuboid/rpc/server/scheduler.rb', line 88 def empty? self.size == 0 end |
#errors(starting_line = 0) ⇒ Array<String>
260 261 262 263 264 265 266 267 268 269 270 |
# File 'lib/cuboid/rpc/server/scheduler.rb', line 260 def errors( starting_line = 0 ) return [] if self.error_buffer.empty? error_strings = self.error_buffer if starting_line != 0 error_strings = error_strings[starting_line..-1] end error_strings end |
#failed ⇒ Hash
Returns Failed Instances and the associated error.
125 126 127 |
# File 'lib/cuboid/rpc/server/scheduler.rb', line 125 def failed @failed end |
#get(id) ⇒ Hash?
Only returns info for queued Instances, once a Instance has passed through the #running stage it’s no longer part of the queue.
Returns * Instance options and priority.
-
‘nil` if a Instance with the given ID could not be found.
138 139 140 141 142 143 144 145 |
# File 'lib/cuboid/rpc/server/scheduler.rb', line 138 def get( id ) return if !@queue.include? id { options: @queue[id], priority: @id_to_priority[id] } end |
#list ⇒ Hash<Integer,Array>
Returns Queued Instances grouped and sorted by priority.
104 105 106 |
# File 'lib/cuboid/rpc/server/scheduler.rb', line 104 def list @by_priority end |
#push(options, queue_options = {}) ⇒ String
Returns Instance ID used to reference the Instance from then on.
153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 |
# File 'lib/cuboid/rpc/server/scheduler.rb', line 153 def push( , = {} ) priority = .delete('priority') || 0 if !Cuboid::Application.application.( ) fail ArgumentError, 'Invalid options!' end id = Utilities.generate_token @queue[id] = @id_to_priority[id] = priority (@by_priority[priority] ||= []) << id @by_priority = Hash[@by_priority.sort_by { |k, _| -k }] id end |
#remove(id) ⇒ Object
Only affects queued Instances, once a Instance has passed through the #running stage it’s no longer part of the queue.
176 177 178 179 180 181 182 183 |
# File 'lib/cuboid/rpc/server/scheduler.rb', line 176 def remove( id ) return false if !@queue.include? id @queue.delete( id ) @by_priority[@id_to_priority.delete( id )].delete( id ) true end |
#running ⇒ Hash
Returns RPC connection information on running Instances.
110 111 112 113 114 115 |
# File 'lib/cuboid/rpc/server/scheduler.rb', line 110 def running @running.inject( {} ) do |h, (id, client)| h.merge! id => { url: client.url, token: client.token, pid: client.pid } h end end |
#shutdown ⇒ Object
Shuts down the service.
249 250 251 252 253 254 |
# File 'lib/cuboid/rpc/server/scheduler.rb', line 249 def shutdown print_status 'Shutting down...' reactor.delay 2 do reactor.stop end end |
#size ⇒ Integer
98 99 100 |
# File 'lib/cuboid/rpc/server/scheduler.rb', line 98 def size @queue.size end |