Class: GoodJob::MultiScheduler

Inherits:
Object
  • Object
show all
Defined in:
lib/good_job/multi_scheduler.rb

Overview

Delegates the interface of a single Scheduler to multiple Schedulers.

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(schedulers) ⇒ MultiScheduler

Returns a new instance of MultiScheduler.

Parameters:



33
34
35
# File 'lib/good_job/multi_scheduler.rb', line 33

# Stores the given schedulers as this instance's delegates.
#
# @param schedulers [Array<Scheduler>] kept by reference (not copied)
def initialize(schedulers)
  @schedulers = schedulers
end

Instance Attribute Details

#schedulers ⇒ Array<Scheduler> (readonly)

Returns the list of scheduler delegates.

Returns:

  • (Array<Scheduler>)

    List of the scheduler delegates



30
31
32
# File 'lib/good_job/multi_scheduler.rb', line 30

# Reader for the scheduler delegates held in @schedulers.
#
# @return [Array<Scheduler>] the same object that was stored, not a copy
def schedulers
  @schedulers
end

Class Method Details

.from_configuration(configuration, capsule: GoodJob.capsule, warm_cache_on_initialize: false) ⇒ GoodJob::MultiScheduler

Creates MultiScheduler from a GoodJob::Configuration instance.

Parameters:

Returns:



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/good_job/multi_scheduler.rb', line 10

# Builds a MultiScheduler from a configuration object.
#
# The configuration's queue_string is split on ';' into one segment per
# scheduler. Each segment is read as "queues[:max_threads]"; when the
# thread count is absent, configuration.max_threads is used instead.
#
# @param configuration [GoodJob::Configuration]
# @param capsule [Object] handed to every GoodJob::JobPerformer
# @param warm_cache_on_initialize [Boolean] handed to every GoodJob::Scheduler
# @return [GoodJob::MultiScheduler]
def self.from_configuration(configuration, capsule: GoodJob.capsule, warm_cache_on_initialize: false)
  segments = configuration.queue_string.split(';').map(&:strip)

  delegates = segments.map do |segment|
    # Blank parts become nil so the configured default can take over below.
    queues, thread_limit = segment.split(':').map { |part| part.strip.presence }
    thread_limit = (thread_limit || configuration.max_threads).to_i

    GoodJob::Scheduler.new(
      GoodJob::JobPerformer.new(queues, capsule: capsule),
      max_threads: thread_limit,
      max_cache: configuration.max_cache,
      warm_cache_on_initialize: warm_cache_on_initialize,
      cleanup_interval_seconds: configuration.cleanup_interval_seconds,
      cleanup_interval_jobs: configuration.cleanup_interval_jobs
    )
  end

  new(delegates)
end

Instance Method Details

#create_thread(state = nil) ⇒ Boolean?

Delegates to Scheduler#create_thread.

Parameters:

  • state (Hash) (defaults to: nil)

Returns:

  • (Boolean, nil)


66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/good_job/multi_scheduler.rb', line 66

# Asks the delegate schedulers to create a thread.
#
# When +state+ is given and its +:fanout+ entry is falsy, schedulers are
# tried in order and the walk stops at the first truthy result. Otherwise
# (no state, or a truthy +:fanout+) every scheduler is asked.
#
# @param state [Hash, nil] forwarded unchanged to each Scheduler#create_thread
# @return [Boolean, nil] true if any collected result was truthy, false if
#   none was truthy but at least one was false, nil otherwise
def create_thread(state = nil)
  stop_on_first_success = state && !state[:fanout]
  outcomes = []

  schedulers.each do |scheduler|
    outcome = scheduler.create_thread(state)
    outcomes << outcome
    break if stop_on_first_success && outcome
  end

  return true if outcomes.any?
  # No truthy outcome remains, so anything left after dropping nils is false.
  return false unless outcomes.compact.empty?

  nil
end

#restart(timeout: -1) ⇒ void

This method returns an undefined value.

Delegates to Scheduler#restart.

Parameters:

  • timeout (Numeric, nil) (defaults to: -1)


59
60
61
# File 'lib/good_job/multi_scheduler.rb', line 59

# Restarts all delegates by handing the scheduler list to
# GoodJob._shutdown_all together with the :restart action.
#
# @param timeout [Numeric, nil] forwarded unchanged; its meaning is defined
#   by GoodJob._shutdown_all
# @return [void]
def restart(timeout: -1)
  delegates = schedulers
  GoodJob._shutdown_all(delegates, :restart, timeout: timeout)
end

#running? ⇒ Boolean?

Delegates to Scheduler#running?.

Returns:

  • (Boolean, nil)


39
40
41
# File 'lib/good_job/multi_scheduler.rb', line 39

# Reports whether every delegate scheduler is running.
#
# @return [Boolean] true only when each scheduler's #running? is truthy
#   (also true when there are no schedulers)
def running?
  schedulers.all? { |scheduler| scheduler.running? }
end

#shutdown(timeout: -1) ⇒ void

This method returns an undefined value.

Delegates to Scheduler#shutdown.

Parameters:

  • timeout (Numeric, nil) (defaults to: -1)


52
53
54
# File 'lib/good_job/multi_scheduler.rb', line 52

# Shuts down all delegates by handing the scheduler list to
# GoodJob._shutdown_all.
#
# @param timeout [Numeric, nil] forwarded unchanged; its meaning is defined
#   by GoodJob._shutdown_all
# @return [void]
def shutdown(timeout: -1)
  delegates = schedulers
  GoodJob._shutdown_all(delegates, timeout: timeout)
end

#shutdown? ⇒ Boolean?

Delegates to Scheduler#shutdown?.

Returns:

  • (Boolean, nil)


45
46
47
# File 'lib/good_job/multi_scheduler.rb', line 45

# Reports whether every delegate scheduler is shut down.
#
# @return [Boolean] true only when each scheduler's #shutdown? is truthy
#   (also true when there are no schedulers)
def shutdown?
  schedulers.all? { |scheduler| scheduler.shutdown? }
end

#stats ⇒ Object



88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/good_job/multi_scheduler.rb', line 88

# Collects each delegate's stats and aggregates them.
#
# Counter entries are summed across schedulers (a missing key counts as 0);
# timestamp entries take the maximum of the non-nil values, or nil when
# there are none. Note that +:active_execution_thread_count+ is the sum of
# each scheduler's +:active_threads+ entry.
#
# @return [Hash] the per-scheduler stats under +:schedulers+ plus the
#   aggregated values
def stats
  per_scheduler = schedulers.map(&:stats)

  total = ->(key) { per_scheduler.sum { |entry| entry.fetch(key, 0) } }
  latest = ->(key) { per_scheduler.map { |entry| entry.fetch(key, nil) }.compact.max }

  {
    schedulers: per_scheduler,
    empty_executions_count: total.call(:empty_executions_count),
    errored_executions_count: total.call(:errored_executions_count),
    succeeded_executions_count: total.call(:succeeded_executions_count),
    total_executions_count: total.call(:total_executions_count),
    execution_at: latest.call(:execution_at),
    active_execution_thread_count: total.call(:active_threads),
    check_queue_at: latest.call(:check_queue_at),
  }
end