Class: Cuboid::RPC::Server::Instance

Inherits:
Object
  • Object
show all
Includes:
UI::Output, Utilities
Defined in:
lib/cuboid/rpc/server/instance.rb,
lib/cuboid/rpc/server/instance/peers.rb,
lib/cuboid/rpc/server/instance/service.rb

Overview

Author:

  • Tasos “Zapotek” Laskos <tasos.laskos@gmail.com>

Defined Under Namespace

Modules: Service Classes: Peers

Constant Summary collapse

SHUTDOWN_GRACE_SECONDS =

Makes the server go bye-bye…Lights out!

‘shutdown` must reliably take the Ruby process with it. Stopping the reactor + RPC server alone leaves the Application’s non-daemon threads (audit workers, browser cluster manager, etc.) blocking the runtime — historically this leaked engine subprocesses every time ‘kill_instance` was called over MCP, and showed up in the cuboid spec suite as leftover ruby processes after the run. The `instance.shutdown` RPC returned success but the daemonised process never actually exited.

Two-stage exit:

1. Raise SystemExit on the **main thread** so the at_exit
   chain runs (Cuboid_<pid> tmpdir cleanup, live-plugin's
   `exited` push). SystemExit raised on a non-main thread
   only kills that thread — must hit the main one.
2. Watchdog SIGKILL after a grace window in case a
   non-daemon Application thread refuses to release. The
   Paths boot-sweep reaps the orphaned tmpdir on the next
   cuboid process launch even when at_exit didn't run.
5.0

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Utilities

#available_port, available_port_mutex, #bytes_to_kilobytes, #bytes_to_megabytes, #caller_name, #caller_path, #exception_jail, #generate_token, #hms_to_seconds, #port_available?, #rand_port, #random_seed, #regexp_array_match, #remove_constants, #seconds_to_hms

Methods included from UI::Output

#error_buffer, initialize, #log_error, #output_provider_file, #print_bad, #print_debug, #print_error, #print_info, #print_line, #print_ok, #print_status, #print_verbose, #reroute_to_file, #reroute_to_file?

Methods included from UI::OutputInterface

initialize

Methods included from UI::OutputInterface::Personalization

#included

Methods included from UI::OutputInterface::Controls

#debug?, #debug_level, #debug_level_1?, #debug_level_2?, #debug_level_3?, #debug_level_4?, #debug_off, #debug_on, initialize, #verbose?, #verbose_off, #verbose_on

Methods included from UI::OutputInterface::ErrorLogging

#error_logfile, #has_error_log?, initialize, #set_error_logfile

Methods included from UI::OutputInterface::Implemented

#print_debug_backtrace, #print_debug_exception, #print_debug_level_1, #print_debug_level_2, #print_debug_level_3, #print_debug_level_4, #print_error_backtrace, #print_exception

Methods included from UI::OutputInterface::Abstract

#output_provider_file, #print_bad, #print_debug, #print_error, #print_info, #print_line, #print_ok, #print_status, #print_verbose

Constructor Details

#initialize(options, token) ⇒ Instance

Initializes the RPC interface and the framework.

Parameters:

  • options (Options)
  • token (String)

    Authentication token.



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/cuboid/rpc/server/instance.rb', line 45

def initialize( options, token )
    @options = options
    @token   = token

    @application    = Server::ApplicationWrapper.new(
      Cuboid::Application.application
    )
    @active_options = Server::ActiveOptions.new

    @server = Base.new( @options.rpc.to_server_options, token )

    if @options.datastore.log_level
        @server.logger.level = @options.datastore.log_level
    end

    @options.datastore.token = token

    if @options.output.reroute_to_logfile
        reroute_to_file "#{@options.paths.logs}Instance-#{Process.pid}-#{@options.rpc.server_port}.log"
    else
        reroute_to_file false
    end

    set_error_logfile "#{@options.paths.logs}Instance-#{Process.pid}-#{@options.rpc.server_port}.error.log"

    set_handlers( @server )

    # trap interrupts and exit cleanly when required
    %w(QUIT INT).each do |signal|
        next if !Signal.list.has_key?( signal )
        trap( signal ){ shutdown if !@options.datastore.do_not_trap }
    end

    @raktr = Raktr.new
    @raktr.run do
        _run
    end
end

Class Method Details

.parse_block_names(raw) ⇒ Object



284
285
286
287
# File 'lib/cuboid/rpc/server/instance.rb', line 284

def self.parse_block_names( raw )
    return [] if raw.nil?
    Array( raw ).flatten.compact.map(&:to_sym)
end

Instance Method Details

#abort_and_generate_reportHash

Cleans up and returns the report.

Returns:

See Also:

  • #report


137
138
139
140
# File 'lib/cuboid/rpc/server/instance.rb', line 137

def abort_and_generate_report
    @application.abort!
    generate_report
end

#agent_urlString?

Returns Agent URL that provided this Instance, ‘nil` if not provided by a Agent.

Returns:

  • (String, nil)

    Agent URL that provided this Instance, ‘nil` if not provided by a Agent.



97
98
99
# File 'lib/cuboid/rpc/server/instance.rb', line 97

def agent_url
    @options.agent.url
end

#alive?true

Returns:

  • (true)


122
123
124
# File 'lib/cuboid/rpc/server/instance.rb', line 122

def alive?
    @server.alive?
end

#applicationObject



84
85
86
# File 'lib/cuboid/rpc/server/instance.rb', line 84

def application
    Application.application.to_s
end

#busy?Bool

Returns ‘true` if the scan is initializing or running, `false` otherwise.

Returns:

  • (Bool)

    ‘true` if the scan is initializing or running, `false` otherwise.



128
129
130
# File 'lib/cuboid/rpc/server/instance.rb', line 128

def busy?
    @run_initializing || @application.busy?
end

#consumed_pidsObject



280
281
282
# File 'lib/cuboid/rpc/server/instance.rb', line 280

def consumed_pids
    [Process.pid]
end

#error_test(str) ⇒ Object



275
276
277
# File 'lib/cuboid/rpc/server/instance.rb', line 275

def error_test( str )
    @application.error_test( str )
end

#errors(starting_line = 0) ⇒ Object



270
271
272
# File 'lib/cuboid/rpc/server/instance.rb', line 270

def errors( starting_line = 0 )
    @application.errors( starting_line )
end

#generate_reportHash



144
145
146
# File 'lib/cuboid/rpc/server/instance.rb', line 144

def generate_report
    @application.generate_report.to_rpc_data
end

#progress(options = {}) ⇒ Hash

# Recommended usage

Please request from the method only the things you are going to actually
use, otherwise you'll just be wasting bandwidth.
In addition, ask to **not** be served data you already have, like
error messages.

Pass a `session:` token (any caller-chosen string) and the
server returns only error lines past the previous offset
under that token. Reuse the same token across polls for
the same logical view; pick a fresh one to start fresh.

  token = SecureRandom.uuid
  while sleep 1
      errors = instance.progress( session: token )[:errors]
      puts errors.join( "\n" )
  end

Without `session`, callers must opt into errors via
`with: [:errors]` and will receive the full set every poll.

Parameters:

  • options (Hash) (defaults to: {})

Options Hash (options):

  • :session (String, Symbol)

    Caller-chosen session token. When provided, the response carries only errors past the previously emitted offset.

  • :with (Array<Symbol>)

    Block names to include when no session is in use. Currently only ‘:errors` is delta-able.

  • :without (Array<Symbol>)

    Block names to exclude. One or more of ‘:statistics`, `:errors`. Takes precedence over `with:` and over the session-on-by-default blocks.

Returns:

  • (Hash)
    • ‘statistics` – General runtime statistics (merged when part of Grid)

      (enabled by default)
      
    • ‘status` – #status

    • ‘busy` – #busy?

    • ‘errors` – #errors



187
188
189
# File 'lib/cuboid/rpc/server/instance.rb', line 187

def progress( options = {} )
    progress_handler( options.merge( as_hash: true ) )
end

#restore!(snapshot) ⇒ Object

See Also:

  • #suspend
  • #snapshot_path


106
107
108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/cuboid/rpc/server/instance.rb', line 106

def restore!( snapshot )
    # If the instance isn't clean bail out now.
    return false if busy? || @called

    @called = @run_initializing = true

    Thread.new do
        @application.restore!( snapshot )
        @application.run
        @run_initializing = false
    end

    true
end

#run(options = nil) ⇒ Object

Configures and runs a job.



192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
# File 'lib/cuboid/rpc/server/instance.rb', line 192

def run( options = nil )
    # If the instance isn't clean bail out now.
    return false if busy? || @called

    if !@application.valid_options?( options )
        fail ArgumentError, 'Invalid options!'
    end

    # There may be follow-up/retry calls by the client in cases of network
    # errors (after the request has reached us) so we need to keep minimal
    # track of state in order to bail out on subsequent calls.
    @called = @run_initializing = true

    @active_options.set( application: options )

    Thread.new do
        @application.run
        @run_initializing = false
    end

    true
end

#scheduler_urlString?

Returns Scheduler URL to which this Instance is attached, ‘nil` if not attached.

Returns:

  • (String, nil)

    Scheduler URL to which this Instance is attached, ‘nil` if not attached.



90
91
92
# File 'lib/cuboid/rpc/server/instance.rb', line 90

def scheduler_url
    @options.scheduler.url
end

#shutdown(&block) ⇒ Object



237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
# File 'lib/cuboid/rpc/server/instance.rb', line 237

def shutdown( &block )
    if @shutdown
        block.call if block_given?
        return
    end
    @shutdown = true

    print_status 'Shutting down...'

    @application.shutdown

    # We're shutting down services so we need to use a concurrent way but
    # without going through the Reactor.
    Thread.new do
        @server.shutdown
        @raktr.stop
        block.call true if block_given?

        # Stage 1 — graceful: SystemExit on the main thread so
        # at_exit handlers run.
        main = Thread.main
        if main && main.alive? && main != Thread.current
            main.raise( SystemExit.new( 0 ) ) rescue nil
        end

        # Stage 2 — watchdog: hammer if main can't unwind.
        sleep SHUTDOWN_GRACE_SECONDS
        Process.kill( 'KILL', Process.pid ) rescue nil
    end

    true
end