Class: RSMP::Node

Inherits:
Object
  • Object
show all
Includes:
Logging, Task
Defined in:
lib/rsmp/node/node.rb

Overview

Base class for sites and supervisors.

Direct Known Subclasses

Site, Supervisor

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Task

#initialize_task, #restart, #run, #start, #stop, #stop_task, #task_status, #wait, #wait_for_condition

Methods included from Logging

#initialize_logging, #log

Constructor Details

#initialize(options = {}) ⇒ Node

Returns a new instance of Node.



9
10
11
12
13
14
15
16
17
# File 'lib/rsmp/node/node.rb', line 9

def initialize(options = {})
  initialize_logging options
  initialize_task
  @deferred = []
  @clock = Clock.new
  @error_queue = Async::Queue.new
  @ignore_errors = []
  @collect = options[:collect]
end

Instance Attribute Details

#archiveObject (readonly)

Returns the value of attribute archive.



7
8
9
# File 'lib/rsmp/node/node.rb', line 7

def archive
  @archive
end

#clockObject (readonly)

Returns the value of attribute clock.



7
8
9
# File 'lib/rsmp/node/node.rb', line 7

def clock
  @clock
end

#collectorObject (readonly)

Returns the value of attribute collector.



7
8
9
# File 'lib/rsmp/node/node.rb', line 7

def collector
  @collector
end

#deferredObject (readonly)

Returns the value of attribute deferred.



7
8
9
# File 'lib/rsmp/node/node.rb', line 7

def deferred
  @deferred
end

#error_queueObject (readonly)

Returns the value of attribute error_queue.



7
8
9
# File 'lib/rsmp/node/node.rb', line 7

def error_queue
  @error_queue
end

#loggerObject (readonly)

Returns the value of attribute logger.



7
8
9
# File 'lib/rsmp/node/node.rb', line 7

def logger
  @logger
end

#taskObject (readonly)

Returns the value of attribute task.



7
8
9
# File 'lib/rsmp/node/node.rb', line 7

def task
  @task
end

Instance Method Details

#authorObject



78
79
80
# File 'lib/rsmp/node/node.rb', line 78

def author
  site_id
end

#check_required_settings(settings, required) ⇒ Object

Raises:

  • (ArgumentError)


70
71
72
73
74
75
76
# File 'lib/rsmp/node/node.rb', line 70

def check_required_settings(settings, required)
  raise ArgumentError, 'Settings is empty' unless settings

  required.each do |setting|
    raise ArgumentError, "Missing setting: #{setting}" unless settings.include? setting.to_s
  end
end

#clear_deferredObject



66
67
68
# File 'lib/rsmp/node/node.rb', line 66

def clear_deferred
  @deferred.clear
end

#defer(key, item = nil) ⇒ Object



52
53
54
# File 'lib/rsmp/node/node.rb', line 52

def defer(key, item = nil)
  @deferred << [key, item]
end

#distribute_error(error, options = {}) ⇒ Object



42
43
44
45
46
47
48
49
50
# File 'lib/rsmp/node/node.rb', line 42

def distribute_error(error, options = {})
  return if @ignore_errors.find { |klass| error.is_a? klass }

  if options[:level] == :internal
    log ["#{error} in task: #{Async::Task.current}", error.backtrace].flatten.join("\n"),
        level: :error
  end
  @error_queue.enqueue error
end

#do_deferred(key, item = nil) ⇒ Object



64
# File 'lib/rsmp/node/node.rb', line 64

def do_deferred(key, item = nil); end

#ignore_errors(classes) ⇒ Object



34
35
36
37
38
39
40
# File 'lib/rsmp/node/node.rb', line 34

def ignore_errors(classes)
  was = @ignore_errors
  @ignore_errors = [classes].flatten
  yield
ensure
  @ignore_errors = was
end

#inspectObject



19
20
21
# File 'lib/rsmp/node/node.rb', line 19

def inspect
  "#<#{self.class.name}:#{object_id} id: #{site_id}}>"
end

#nowObject



23
24
25
# File 'lib/rsmp/node/node.rb', line 23

def now
  clock.now
end

#process_deferredObject



56
57
58
59
60
61
62
# File 'lib/rsmp/node/node.rb', line 56

def process_deferred
  cloned = @deferred.clone # clone in case do_deferred restarts the current task
  @deferred.clear
  cloned.each do |pair|
    do_deferred pair.first, pair.last
  end
end

#stop_subtasksObject

stop proxies, then call super



28
29
30
31
32
# File 'lib/rsmp/node/node.rb', line 28

def stop_subtasks
  @proxies.each(&:stop)
  @proxies.clear
  super
end