Class: SourceMonitor::Scheduler

Inherits:
Object
  • Object
show all
Defined in:
lib/source_monitor/scheduler.rb

Constant Summary collapse

DEFAULT_BATCH_SIZE =

legacy fallback

100
STALE_QUEUE_TIMEOUT =

legacy fallback

10.minutes
ELIGIBLE_FETCH_STATUSES =
%w[idle failed].freeze

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(limit:, now:) ⇒ Scheduler

Returns a new instance of Scheduler.



16
17
18
19
# File 'lib/source_monitor/scheduler.rb', line 16

def initialize(limit:, now:)
  @limit = limit
  @now = now
end

Class Method Details

.run(limit: SourceMonitor.config.fetching.scheduler_batch_size, now: Time.current) ⇒ Object



12
13
14
# File 'lib/source_monitor/scheduler.rb', line 12

def self.run(limit: SourceMonitor.config.fetching.scheduler_batch_size, now: Time.current)
  new(limit:, now:).run
end

Instance Method Details

#runObject



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/source_monitor/scheduler.rb', line 21

def run
  payload = { limit: limit }
  recovery = SourceMonitor::Fetching::StalledFetchReconciler.call(now:, stale_after: stale_timeout)
  payload[:stalled_recoveries] = recovery.recovered_source_ids.size
  payload[:stalled_jobs_removed] = recovery.jobs_removed.size

  ActiveSupport::Notifications.instrument("source_monitor.scheduler.run", payload) do
    start_monotonic = SourceMonitor::Instrumentation.monotonic_time
    source_ids = lock_due_source_ids
    payload[:enqueued_count] = source_ids.size

    source_ids.each do |source_id|
      SourceMonitor::Fetching::FetchRunner.enqueue(source_id)
    end

    payload[:duration_ms] = ((SourceMonitor::Instrumentation.monotonic_time - start_monotonic) * 1000.0).round(2)

    source_ids.size
  end
rescue StandardError => error
  Rails.logger.warn(
    "[SourceMonitor::Scheduler] Scheduler run failed: #{error.class} - #{error.message}"
  ) if defined?(Rails) && Rails.respond_to?(:logger) && Rails.logger
  0
end