Class: SourceMonitor::Fetching::StalledFetchReconciler

Inherits:
Object
  • Object
show all
Defined in:
lib/source_monitor/fetching/stalled_fetch_reconciler.rb

Defined Under Namespace

Classes: Result

Constant Summary collapse

FAILURE_MESSAGE =
"Fetch job stalled; resetting state and retrying"

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(now:, stale_after:) ⇒ StalledFetchReconciler

Returns a new instance of StalledFetchReconciler.



16
17
18
19
# File 'lib/source_monitor/fetching/stalled_fetch_reconciler.rb', line 16

# Builds a reconciler bound to a reference time and a staleness threshold.
#
# @param now the reference time; stored as-is in +@now+
# @param stale_after the staleness threshold; stored as-is in +@stale_after+
def initialize(now:, stale_after:)
  @now, @stale_after = now, stale_after
end

Class Method Details

.call(now: Time.current, stale_after: nil) ⇒ Object



12
13
14
# File 'lib/source_monitor/fetching/stalled_fetch_reconciler.rb', line 12

# Convenience entry point: builds a reconciler and runs it immediately.
#
# @param now the reference time handed to the instance (defaults to +Time.current+)
# @param stale_after the staleness threshold; when nil (or false) the value of
#   +default_stale_after+ is used instead
# @return whatever the instance-level +#call+ returns
def self.call(now: Time.current, stale_after: nil)
  threshold = stale_after || default_stale_after
  reconciler = new(now: now, stale_after: threshold)
  reconciler.call
end

.default_stale_after ⇒ Object



45
46
47
48
49
# File 'lib/source_monitor/fetching/stalled_fetch_reconciler.rb', line 45

# Resolves the staleness threshold used when the caller does not supply one.
#
# Reads +stale_timeout_minutes+ from +SourceMonitor.config.fetching+ and
# converts it with +#minutes+. A NoMethodError raised anywhere in that chain
# (for example a nil link, or a value that does not respond to +#minutes+)
# results in the 10-minute fallback.
#
# NOTE(review): the rescue is broad — it also hides a NoMethodError raised
# inside the configuration objects themselves; confirm this is intended.
#
# @return the configured duration, or +10.minutes+ as a fallback
def self.default_stale_after
  fetching_settings = SourceMonitor.config.fetching
  timeout_in_minutes = fetching_settings.stale_timeout_minutes
  timeout_in_minutes.minutes
rescue NoMethodError
  10.minutes
end

Instance Method Details

#call ⇒ Object



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/source_monitor/fetching/stalled_fetch_reconciler.rb', line 21

# Walks every record yielded by +stale_sources.find_each+, attempts recovery
# through +recover_source+, and summarises the outcome.
#
# A nil recovery is skipped. Otherwise its +:source_id+ (when truthy) and its
# +:removed_job_ids+ (coerced with +Array()+, so nil or a scalar is accepted)
# are accumulated.
#
# @return [Result] built with the de-duplicated recovered source ids, the
#   de-duplicated removed job ids, and +now+ as +executed_at+
def call
  source_ids = []
  job_ids = []
  # Evaluated once up front so the same answer is passed to every recovery.
  supports_jobs = jobs_supported?

  stale_sources.find_each do |source|
    outcome = recover_source(source, jobs_supported: supports_jobs)

    unless outcome.nil?
      source_id = outcome[:source_id]
      source_ids.push(source_id) if source_id
      job_ids.concat(Array(outcome[:removed_job_ids]))
    end
  end

  Result.new(
    recovered_source_ids: source_ids.uniq,
    jobs_removed: job_ids.uniq,
    executed_at: now
  )
end