Class: SourceMonitor::Fetching::FetchRunner

Inherits:
Object
  • Object
show all
Defined in:
lib/source_monitor/fetching/fetch_runner.rb

Overview

Coordinates execution of FeedFetcher while ensuring that no more than one fetch runs concurrently per source. The runner also centralizes the logic for queuing follow-up scraping jobs, so background jobs and manual UI entry points share the same behavior.

Defined Under Namespace

Classes: ConcurrencyError

Constant Summary

LOCK_NAMESPACE = 1_746_219

Instance Attribute Summary

Class Method Summary

Instance Method Summary

Constructor Details

#initialize(source:, fetcher_class: SourceMonitor::Fetching::FeedFetcher, scrape_job_class: SourceMonitor::ScrapeItemJob, scrape_enqueuer_class: SourceMonitor::Scraping::Enqueuer, retention_pruner_class: SourceMonitor::Items::RetentionPruner, lock_factory: SourceMonitor::Fetching::AdvisoryLock, retention_handler: nil, follow_up_handler: nil, event_publisher: nil, force: false) ⇒ FetchRunner

Returns a new instance of FetchRunner.



21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/source_monitor/fetching/fetch_runner.rb', line 21

# Builds a runner bound to a single source.
#
# Every collaborator is injectable. When a handler/publisher argument is nil,
# the corresponding SourceMonitor::Fetching::Completion default is built from
# the matching *_class argument.
#
# @param source [#id] the source to fetch; its +id+ is used as the lock key
# @param fetcher_class [Class] instantiated with +source:+ and sent +call+ by #run
# @param scrape_job_class [Class] handed to the default FollowUpHandler as +job_class:+
# @param scrape_enqueuer_class [Class] handed to the default FollowUpHandler as +enqueuer_class:+
# @param retention_pruner_class [Class] handed to the default RetentionHandler as +pruner:+
# @param lock_factory [Class] built with +namespace:+, +key:+ and +connection_pool:+
# @param retention_handler [#call, nil] override for the retention step
# @param follow_up_handler [#call, nil] override for the follow-up step
# @param event_publisher [#call, nil] override for event publishing
# @param force [Boolean] stored as-is and exposed through #force
def initialize(
  source:,
  fetcher_class: SourceMonitor::Fetching::FeedFetcher,
  scrape_job_class: SourceMonitor::ScrapeItemJob,
  scrape_enqueuer_class: SourceMonitor::Scraping::Enqueuer,
  retention_pruner_class: SourceMonitor::Items::RetentionPruner,
  lock_factory: SourceMonitor::Fetching::AdvisoryLock,
  retention_handler: nil,
  follow_up_handler: nil,
  event_publisher: nil,
  force: false
)
  @source = source
  @fetcher_class = fetcher_class
  @force = force

  lock_options = {
    namespace: LOCK_NAMESPACE,
    key: source.id,
    connection_pool: ActiveRecord::Base.connection_pool
  }
  @lock = lock_factory.new(**lock_options)

  @retention_handler = retention_handler ||
    SourceMonitor::Fetching::Completion::RetentionHandler.new(pruner: retention_pruner_class)
  @follow_up_handler = follow_up_handler ||
    SourceMonitor::Fetching::Completion::FollowUpHandler.new(
      enqueuer_class: scrape_enqueuer_class,
      job_class: scrape_job_class
    )
  @event_publisher = event_publisher || SourceMonitor::Fetching::Completion::EventPublisher.new
  @retry_scheduled = false
end

Instance Attribute Details

#event_publisher ⇒ Object (readonly)

Returns the value of attribute event_publisher.



19
20
21
# File 'lib/source_monitor/fetching/fetch_runner.rb', line 19

# @return [Object] the publisher #run notifies after each fetch attempt
def event_publisher = @event_publisher

#fetcher_class ⇒ Object (readonly)

Returns the value of attribute fetcher_class.



19
20
21
# File 'lib/source_monitor/fetching/fetch_runner.rb', line 19

# @return [Class] the class #run instantiates (with +source:+) to perform the fetch
def fetcher_class = @fetcher_class

#follow_up_handler ⇒ Object (readonly)

Returns the value of attribute follow_up_handler.



19
20
21
# File 'lib/source_monitor/fetching/fetch_runner.rb', line 19

# @return [#call] the handler #run invokes after a fetch to queue follow-up work
def follow_up_handler = @follow_up_handler

#force ⇒ Object (readonly)

Returns the value of attribute force.



19
20
21
# File 'lib/source_monitor/fetching/fetch_runner.rb', line 19

# @return [Boolean] the +force:+ flag this runner was constructed with
def force = @force

#lock ⇒ Object (readonly)

Returns the value of attribute lock.



19
20
21
# File 'lib/source_monitor/fetching/fetch_runner.rb', line 19

# @return [Object] the per-source advisory lock built by +lock_factory+
def lock = @lock

#retention_handler ⇒ Object (readonly)

Returns the value of attribute retention_handler.



19
20
21
# File 'lib/source_monitor/fetching/fetch_runner.rb', line 19

# @return [#call] the handler #run invokes after a fetch to apply retention rules
def retention_handler = @retention_handler

#source ⇒ Object (readonly)

Returns the value of attribute source.



19
20
21
# File 'lib/source_monitor/fetching/fetch_runner.rb', line 19

# @return [Object] the source record this runner fetches
def source = @source

Class Method Details

.enqueue(source_or_id, force: false) ⇒ Object



40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/source_monitor/fetching/fetch_runner.rb', line 40

# Marks a source as "queued" and schedules a FetchFeedJob for it.
#
# No broadcast happens here; the calling controller is responsible for the
# immediate UI update.
#
# @param source_or_id [Object] a source record or anything +resolve_source+ accepts
# @param force [Boolean] forwarded to the job; a forced request for a source
#   whose fetch_status is already "fetching" is refused
# @return [nil] when the source cannot be resolved
# @return [Symbol] +:already_fetching+ for a refused forced request
# @return [Object] otherwise, whatever +perform_later+ returns
# NOTE(review): non-forced requests are queued even while a fetch is in
# progress, overwriting "fetching" with "queued" — confirm this is intended.
def self.enqueue(source_or_id, force: false)
  record = resolve_source(source_or_id)
  return unless record
  return :already_fetching if force && record.fetch_status == "fetching"

  record.update_columns(fetch_status: "queued")
  SourceMonitor::FetchFeedJob.perform_later(record.id, force: force)
end

.run(source:, **options) ⇒ Object



36
37
38
# File 'lib/source_monitor/fetching/fetch_runner.rb', line 36

# Convenience entry point: builds a runner for +source+ with +options+ and
# runs it immediately.
#
# @return [Object] whatever the instance's #run returns
def self.run(source:, **options) = new(source: source, **options).run

Instance Method Details

#run ⇒ Object



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/source_monitor/fetching/fetch_runner.rb', line 53

# Runs one fetch for +source+ while holding the per-source advisory lock.
#
# Phases (all under the lock):
#   1. mark the source as fetching;
#   2. call +fetcher_class+ to perform the fetch;
#   3. run the retention and follow-up handlers, schedule a retry if needed
#      and mark the fetch complete.
# The lock is released exactly once, whichever phase fails. The event
# publisher is notified after the lock has been released.
#
# @return [Object] whatever the fetcher's +call+ returned, or the result of
#   +skip_due_to_circuit+ when the circuit is blocked
# @raise [ConcurrencyError] when another runner already holds the lock
# @raise [StandardError] any other error, re-raised after +mark_failed!+ and a
#   failure event (+result: nil+) have been recorded
def run
  return skip_due_to_circuit if circuit_blocked?

  @retry_scheduled = false
  # Only a runner that actually acquired the lock may touch fetch_status in the
  # safety net below. Otherwise a runner that lost the lock race would flip a
  # concurrent, in-progress fetch from "fetching" to "failed".
  lock_held = false

  lock.acquire!
  lock_held = true
  result = begin
    # Phase 1: mark the source as fetching.
    mark_fetching!

    # Phase 2: the network fetch. Per the original design, this is meant to run
    # without holding a DB connection so slow feeds don't pin one.
    fetched = fetcher_class.new(source: source).call

    # Phase 3: post-fetch DB writes. completion_result_for lives inside this
    # begin/ensure so that an error there cannot leak the lock.
    completion_result = completion_result_for(fetched)
    log_handler_result("RetentionHandler", retention_handler.call(source:, result: completion_result))
    log_handler_result("FollowUpHandler", follow_up_handler.call(source:, result: completion_result))
    schedule_retry_if_needed(completion_result)
    mark_complete!(completion_result)
    fetched
  ensure
    lock.release!
  end

  log_handler_result("EventPublisher", event_publisher.call(source:, result: result))
  result
rescue SourceMonitor::Fetching::AdvisoryLock::NotAcquiredError => error
  raise ConcurrencyError, error.message
rescue StandardError => error
  mark_failed!(error)
  event_publisher.call(source:, result: nil)
  raise
ensure
  # Best-effort safety net: never leave a source this runner was fetching stuck
  # in "fetching". Errors are swallowed deliberately so they cannot mask the
  # primary outcome.
  # NOTE(review): this still runs after the lock is released, so a narrow race
  # with a runner that acquires the lock in between remains possible.
  if lock_held
    begin
      source.reload
      source.update_columns(fetch_status: "failed") if source.fetch_status == "fetching"
    rescue StandardError # :nocov:
      nil
    end
  end
end