Class: Cuboid::RPC::Server::Scheduler

Inherits:
Object
  • Object
show all
Includes:
UI::Output, Utilities
Defined in:
lib/cuboid/rpc/server/scheduler.rb

Overview

RPC scheduler service which:

In addition to the purely queue functionality, it also allows for running Instances to be:

  • Detached from the queue monitor and transfer the management responsibility to the client.

  • Attached to the queue monitor and transfer the management responsibility to the queue.

If a Agent has been provided, instances will be provided by it. If no Agent has been given, instances will be spawned on the Scheduler machine.

Author:

  • Tasos “Zapotek” Laskos <tasos.laskos@gmail.com>

Constant Summary collapse

TICK_CONSUME =
0.1
RPC_ERROR_TOLERANCE =

Number of CONSECUTIVE RPC errors a running Instance is allowed to accumulate before it’s given up on and marked #failed. A single transient blip (the Instance momentarily unreachable on one ping, an engine reconnect, etc.) shouldn’t fail an otherwise-healthy scan; any clean response in between resets the count.

5

Instance Method Summary collapse

Methods included from Utilities

#available_port, available_port_mutex, #bytes_to_kilobytes, #bytes_to_megabytes, #caller_name, #caller_path, #exception_jail, #generate_token, #hms_to_seconds, #port_available?, #rand_port, #random_seed, #regexp_array_match, #remove_constants, #seconds_to_hms

Methods included from UI::Output

#error_buffer, initialize, #log_error, #output_provider_file, #print_bad, #print_debug, #print_error, #print_info, #print_line, #print_ok, #print_status, #print_verbose, #reroute_to_file, #reroute_to_file?

Methods included from UI::OutputInterface

initialize

Methods included from UI::OutputInterface::Personalization

#included

Methods included from UI::OutputInterface::Controls

#debug?, #debug_level, #debug_level_1?, #debug_level_2?, #debug_level_3?, #debug_level_4?, #debug_off, #debug_on, initialize, #verbose?, #verbose_off, #verbose_on

Methods included from UI::OutputInterface::ErrorLogging

#error_logfile, #has_error_log?, initialize, #set_error_logfile

Methods included from UI::OutputInterface::Implemented

#print_debug_backtrace, #print_debug_exception, #print_debug_level_1, #print_debug_level_2, #print_debug_level_3, #print_debug_level_4, #print_error_backtrace, #print_exception

Methods included from UI::OutputInterface::Abstract

#output_provider_file, #print_bad, #print_debug, #print_error, #print_info, #print_line, #print_ok, #print_status, #print_verbose

Constructor Details

#initializeScheduler

Returns a new instance of Scheduler.



55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/cuboid/rpc/server/scheduler.rb', line 55

def initialize
    @options = Options.instance

    @options.snapshot.path ||= @options.paths.snapshots
    @options.report.path   ||= @options.paths.reports

    @server = Base.new( @options.rpc.to_server_options )
    @server.logger.level = @options.datastore.log_level if @options.datastore.log_level

    Options.scheduler.url = @url = @server.url

    prep_logging

    @queue          = {}
    @id_to_priority = {}
    @by_priority    = {}

    @running           = {}
    @completed         = {}
    @failed            = {}
    # Per-Instance consecutive RPC-error counters (see RPC_ERROR_TOLERANCE).
    @rpc_error_counts  = Hash.new( 0 )

    set_handlers( @server )
    trap_interrupts { Thread.new { shutdown } }

    monitor_instances
    consume_queue

    run
end

Instance Method Details

#alive?TrueClass

Returns:

  • (TrueClass)


273
274
275
# File 'lib/cuboid/rpc/server/scheduler.rb', line 273

def alive?
    @server.alive?
end

#any?Bool

Returns:

  • (Bool)


93
94
95
# File 'lib/cuboid/rpc/server/scheduler.rb', line 93

def any?
    !empty?
end

#attach(url, token, &block) ⇒ String, ...

Attaches a running Instance to the queue monitor.

Parameters:

  • url (String)

    Instance URL for a running Instance.

  • token (String)

    Authentication token for the Instance.

Returns:

  • (String, false, nil)
    • Instance ID for further queue reference.

    • ‘false` if the Instance is already attached to a Scheduler.

    • ‘nil` if the Instance could not be reached.



214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
# File 'lib/cuboid/rpc/server/scheduler.rb', line 214

def attach( url, token, &block )
    client = connect_to_instance( url, token )
    client.alive? do |bool|
        if bool.rpc_exception?
            block.call
            next
        end

        client.scheduler_url do |scheduler_url|
            if scheduler_url
                block.call false
                next
            end

            client.options.set( scheduler: { url: @options.scheduler.url } ) do
                @running[token] = client
                block.call token
            end
        end
    end
end

#clearObject

Note:

Only affects queued Instances, once a Instance has passed through the #running stage it’s no longer part of the queue.

Empties the queue.



240
241
242
243
244
245
246
# File 'lib/cuboid/rpc/server/scheduler.rb', line 240

def clear
    @queue.clear
    @by_priority.clear
    @id_to_priority.clear

    nil
end

#completedHash

Returns Completed Instances and their report location.

Returns:

  • (Hash)

    Completed Instances and their report location.



119
120
121
# File 'lib/cuboid/rpc/server/scheduler.rb', line 119

def completed
    @completed
end

#detach(id, &block) ⇒ Hash?

Returns * RPC connection information for the Instance.

  • ‘nil` if no running Instance with that ID is found.

Parameters:

  • id (String)

    Running Instance to detach from the queue monitor.

    Once a Instance is detached it becomes someone else’s responsibility to monitor, manage and shutdown to free its slot.

Returns:



194
195
196
197
198
199
200
201
# File 'lib/cuboid/rpc/server/scheduler.rb', line 194

def detach( id, &block )
    client = @running.delete( id )
    return block.call if !client

    client.options.set( scheduler: { url: nil } ) do
        block.call( url: client.url, token: client.token, pid: client.pid )
    end
end

#empty?Bool

Returns:

  • (Bool)


88
89
90
# File 'lib/cuboid/rpc/server/scheduler.rb', line 88

def empty?
    self.size == 0
end

#errors(starting_line = 0) ⇒ Array<String>

Parameters:

  • starting_line (Integer) (defaults to: 0)

    Sets the starting line for the range of errors to return.

Returns:



260
261
262
263
264
265
266
267
268
269
270
# File 'lib/cuboid/rpc/server/scheduler.rb', line 260

def errors( starting_line = 0 )
    return [] if self.error_buffer.empty?

    error_strings = self.error_buffer

    if starting_line != 0
        error_strings = error_strings[starting_line..-1]
    end

    error_strings
end

#failedHash

Returns Failed Instances and the associated error.

Returns:

  • (Hash)

    Failed Instances and the associated error.



125
126
127
# File 'lib/cuboid/rpc/server/scheduler.rb', line 125

def failed
    @failed
end

#get(id) ⇒ Hash?

Note:

Only returns info for queued Instances, once a Instance has passed through the #running stage it’s no longer part of the queue.

Returns * Instance options and priority.

  • ‘nil` if a Instance with the given ID could not be found.

Parameters:

  • id (String)

    ID for a queued Instance.

Returns:

  • (Hash, nil)
    • Instance options and priority.

    • ‘nil` if a Instance with the given ID could not be found.



138
139
140
141
142
143
144
145
# File 'lib/cuboid/rpc/server/scheduler.rb', line 138

def get( id )
    return if !@queue.include? id

    {
        options:  @queue[id],
        priority: @id_to_priority[id]
    }
end

#listHash<Integer,Array>

Returns Queued Instances grouped and sorted by priority.

Returns:

  • (Hash<Integer,Array>)

    Queued Instances grouped and sorted by priority.



104
105
106
# File 'lib/cuboid/rpc/server/scheduler.rb', line 104

def list
    @by_priority
end

#push(options, queue_options = {}) ⇒ String

Returns Instance ID used to reference the Instance from then on.

Parameters:

  • options (Hash)

    Instance options with an extra ‘priority` option which defaults to `0` (higher is more urgent).

Returns:

  • (String)

    Instance ID used to reference the Instance from then on.



153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
# File 'lib/cuboid/rpc/server/scheduler.rb', line 153

def push( options, queue_options = {} )
    priority = queue_options.delete('priority') || 0

    if !Cuboid::Application.application.valid_options?( options )
        fail ArgumentError, 'Invalid options!'
    end

    id = Utilities.generate_token

    @queue[id]          = options
    @id_to_priority[id] = priority

    (@by_priority[priority] ||= []) << id
    @by_priority = Hash[@by_priority.sort_by { |k, _| -k }]

    id
end

#remove(id) ⇒ Object

Note:

Only affects queued Instances, once a Instance has passed through the #running stage it’s no longer part of the queue.

Parameters:

  • id (String)

    Instance ID to remove from the queue.



176
177
178
179
180
181
182
183
# File 'lib/cuboid/rpc/server/scheduler.rb', line 176

def remove( id )
    return false if !@queue.include? id

    @queue.delete( id )
    @by_priority[@id_to_priority.delete( id )].delete( id )

    true
end

#runningHash

Returns RPC connection information on running Instances.

Returns:



110
111
112
113
114
115
# File 'lib/cuboid/rpc/server/scheduler.rb', line 110

def running
    @running.inject( {} ) do |h, (id, client)|
        h.merge! id => { url: client.url, token: client.token, pid: client.pid }
        h
    end
end

#shutdownObject

Shuts down the service.



249
250
251
252
253
254
# File 'lib/cuboid/rpc/server/scheduler.rb', line 249

def shutdown
    print_status 'Shutting down...'
    reactor.delay 2 do
        reactor.stop
    end
end

#sizeInteger

Returns:

  • (Integer)


98
99
100
# File 'lib/cuboid/rpc/server/scheduler.rb', line 98

def size
    @queue.size
end