Class: SourceMonitor::Fetching::FetchRunner
- Inherits: Object
- Defined in: lib/source_monitor/fetching/fetch_runner.rb
Overview
Coordinates execution of FeedFetcher while ensuring that no more than one fetch runs concurrently for any given source. The runner also centralizes the logic for queuing follow-up scraping jobs, so background jobs and manual UI entry points share the same behavior.
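To illustrate the per-source rule, here is a minimal in-process sketch. It is an analogue, not the gem's mechanism: the runner itself relies on SourceMonitor::Fetching::AdvisoryLock, a database-level lock, and `PerSourceGuard` is a hypothetical name. The property it models is a non-blocking acquire that fails fast when the same source is already held, while different sources proceed independently.

```ruby
require "set"

# Hypothetical in-process analogue of "at most one fetch per source".
# The real runner uses a database advisory lock, which also works across processes.
class PerSourceGuard
  class NotAcquired < StandardError; end

  def initialize
    @mutex = Mutex.new
    @held = Set.new
  end

  # Non-blocking acquire: raises instead of waiting when the source is taken.
  def acquire!(source_id)
    @mutex.synchronize do
      raise NotAcquired, "source #{source_id} is already being fetched" if @held.include?(source_id)

      @held << source_id
    end
  end

  def release!(source_id)
    @mutex.synchronize { @held.delete(source_id) }
  end

  # Runs the block while holding the source; releases only if acquire! succeeded.
  def with_lock(source_id)
    acquire!(source_id)
    begin
      yield
    ensure
      release!(source_id)
    end
  end
end

guard = PerSourceGuard.new
guard.acquire!(1)
guard.acquire!(2) # a different source is unaffected
```

This mirrors how #run reacts when `lock.acquire!` raises AdvisoryLock::NotAcquiredError: it re-raises a ConcurrencyError instead of retrying.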
Defined Under Namespace
Classes: ConcurrencyError
Constant Summary
- LOCK_NAMESPACE = 1_746_219
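The constructor pairs LOCK_NAMESPACE with the source id (`namespace: LOCK_NAMESPACE, key: source.id`). This page does not say how AdvisoryLock uses that pair. If it is backed by PostgreSQL's two-key form, `pg_try_advisory_lock(int, int)` (an assumption), the fixed namespace keeps these locks from colliding with unrelated advisory locks keyed by the same id. The helper names below are hypothetical and only build SQL strings:

```ruby
# Hypothetical helpers; assumes (unconfirmed) that AdvisoryLock uses PostgreSQL's
# two-key advisory lock functions, whose arguments are both signed 32-bit integers.
INT4_RANGE = (-(2**31))..(2**31 - 1)

def check_int4!(value)
  return value if value.is_a?(Integer) && INT4_RANGE.cover?(value)

  raise ArgumentError, "#{value.inspect} does not fit in a 32-bit integer"
end

# Non-blocking: the query returns true if the lock was taken, false otherwise.
def try_advisory_lock_sql(namespace, key)
  "SELECT pg_try_advisory_lock(#{check_int4!(namespace)}, #{check_int4!(key)})"
end

def advisory_unlock_sql(namespace, key)
  "SELECT pg_advisory_unlock(#{check_int4!(namespace)}, #{check_int4!(key)})"
end

puts try_advisory_lock_sql(1_746_219, 42) # prints "SELECT pg_try_advisory_lock(1746219, 42)"
```

Both arguments of the two-key form are 32-bit, so under this assumption an id above 2,147,483,647 would need a different scheme (for example the single `bigint` key variant); the guard raises rather than silently truncating.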
Instance Attribute Summary
- #event_publisher ⇒ Object (readonly): Returns the value of attribute event_publisher.
- #fetcher_class ⇒ Object (readonly): Returns the value of attribute fetcher_class.
- #follow_up_handler ⇒ Object (readonly): Returns the value of attribute follow_up_handler.
- #force ⇒ Object (readonly): Returns the value of attribute force.
- #lock ⇒ Object (readonly): Returns the value of attribute lock.
- #retention_handler ⇒ Object (readonly): Returns the value of attribute retention_handler.
- #source ⇒ Object (readonly): Returns the value of attribute source.
Class Method Summary
- .enqueue(source_or_id, force: false) ⇒ Object
- .run(source:, **options) ⇒ Object
Instance Method Summary
- #initialize(source:, fetcher_class: SourceMonitor::Fetching::FeedFetcher, scrape_job_class: SourceMonitor::ScrapeItemJob, scrape_enqueuer_class: SourceMonitor::Scraping::Enqueuer, retention_pruner_class: SourceMonitor::Items::RetentionPruner, lock_factory: SourceMonitor::Fetching::AdvisoryLock, retention_handler: nil, follow_up_handler: nil, event_publisher: nil, force: false) ⇒ FetchRunner (constructor): A new instance of FetchRunner.
- #run ⇒ Object
Constructor Details
#initialize(source:, fetcher_class: SourceMonitor::Fetching::FeedFetcher, scrape_job_class: SourceMonitor::ScrapeItemJob, scrape_enqueuer_class: SourceMonitor::Scraping::Enqueuer, retention_pruner_class: SourceMonitor::Items::RetentionPruner, lock_factory: SourceMonitor::Fetching::AdvisoryLock, retention_handler: nil, follow_up_handler: nil, event_publisher: nil, force: false) ⇒ FetchRunner
Returns a new instance of FetchRunner.
# File 'lib/source_monitor/fetching/fetch_runner.rb', line 21
def initialize(source:,
               fetcher_class: SourceMonitor::Fetching::FeedFetcher,
               scrape_job_class: SourceMonitor::ScrapeItemJob,
               scrape_enqueuer_class: SourceMonitor::Scraping::Enqueuer,
               retention_pruner_class: SourceMonitor::Items::RetentionPruner,
               lock_factory: SourceMonitor::Fetching::AdvisoryLock,
               retention_handler: nil,
               follow_up_handler: nil,
               event_publisher: nil,
               force: false)
  @source = source
  @fetcher_class = fetcher_class
  @force = force
  @lock = lock_factory.new(
    namespace: LOCK_NAMESPACE,
    key: source.id,
    connection_pool: ActiveRecord::Base.connection_pool
  )
  @retention_handler = retention_handler || SourceMonitor::Fetching::Completion::RetentionHandler.new(pruner: retention_pruner_class)
  @follow_up_handler = follow_up_handler || SourceMonitor::Fetching::Completion::FollowUpHandler.new(enqueuer_class: scrape_enqueuer_class, job_class: scrape_job_class)
  @event_publisher = event_publisher || SourceMonitor::Fetching::Completion::EventPublisher.new
  @retry_scheduled = false
end
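The constructor uses constructor injection: every collaborator has a default, and a nil handler argument falls back to a freshly built default via `||`. The sketch below reproduces that pattern with hypothetical stand-ins (`FakeSource`, `RecordingLock`, `DefaultPublisher`, `SketchRunner`) and drops the `connection_pool:` argument to stay independent of ActiveRecord.

```ruby
# Hypothetical stand-ins that mimic the constructor's injection pattern.
FakeSource = Struct.new(:id)

class RecordingLock
  attr_reader :namespace, :key

  def initialize(namespace:, key:)
    @namespace = namespace
    @key = key
  end
end

class DefaultPublisher
  def call(source:, result:)
    :default_published
  end
end

class SketchRunner
  LOCK_NAMESPACE = 1_746_219

  attr_reader :lock, :event_publisher

  def initialize(source:, lock_factory: RecordingLock, event_publisher: nil)
    # One lock per runner, keyed by the source id within a fixed namespace.
    @lock = lock_factory.new(namespace: LOCK_NAMESPACE, key: source.id)
    # A nil (or false) argument falls back to the default collaborator.
    @event_publisher = event_publisher || DefaultPublisher.new
  end
end

runner = SketchRunner.new(source: FakeSource.new(7))
runner.lock.key              # => 7
runner.event_publisher.class # => DefaultPublisher
```

Because the fallback uses `||`, passing `false` for a handler also selects the default; only a truthy object overrides it.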
Instance Attribute Details
#event_publisher ⇒ Object (readonly)
Returns the value of attribute event_publisher.
# File 'lib/source_monitor/fetching/fetch_runner.rb', line 19
def event_publisher
  @event_publisher
end
#fetcher_class ⇒ Object (readonly)
Returns the value of attribute fetcher_class.
# File 'lib/source_monitor/fetching/fetch_runner.rb', line 19
def fetcher_class
  @fetcher_class
end
#follow_up_handler ⇒ Object (readonly)
Returns the value of attribute follow_up_handler.
# File 'lib/source_monitor/fetching/fetch_runner.rb', line 19
def follow_up_handler
  @follow_up_handler
end
#force ⇒ Object (readonly)
Returns the value of attribute force.
# File 'lib/source_monitor/fetching/fetch_runner.rb', line 19
def force
  @force
end
#lock ⇒ Object (readonly)
Returns the value of attribute lock.
# File 'lib/source_monitor/fetching/fetch_runner.rb', line 19
def lock
  @lock
end
#retention_handler ⇒ Object (readonly)
Returns the value of attribute retention_handler.
# File 'lib/source_monitor/fetching/fetch_runner.rb', line 19
def retention_handler
  @retention_handler
end
#source ⇒ Object (readonly)
Returns the value of attribute source.
# File 'lib/source_monitor/fetching/fetch_runner.rb', line 19
def source
  @source
end
Class Method Details
.enqueue(source_or_id, force: false) ⇒ Object
# File 'lib/source_monitor/fetching/fetch_runner.rb', line 40
def self.enqueue(source_or_id, force: false)
  source = resolve_source(source_or_id)
  return unless source

  if force && source.fetch_status == "fetching"
    return :already_fetching
  end

  # Don't broadcast here - controller handles immediate UI update
  source.update_columns(fetch_status: "queued")
  SourceMonitor::FetchFeedJob.perform_later(source.id, force: force)
end
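.enqueue resolves its argument to a source, returns early if none is found, refuses a forced request for a source whose `fetch_status` is already "fetching", and otherwise marks the source "queued" before enqueuing SourceMonitor::FetchFeedJob. A sketch of that control flow with in-memory stand-ins; `SourceStub`, `SOURCES`, `JOB_QUEUE`, `enqueue_fetch` and the `:enqueued` return value are hypothetical, and the lookup assumes `resolve_source` accepts either a record or an id:

```ruby
# Hypothetical in-memory stand-ins for the records and the job queue.
SourceStub = Struct.new(:id, :fetch_status)
SOURCES = {}
JOB_QUEUE = []

# Mirrors the control flow of FetchRunner.enqueue; not the gem's implementation.
def enqueue_fetch(source_or_id, force: false)
  # Assumption: resolve_source accepts either a record or an id.
  source = source_or_id.is_a?(SourceStub) ? source_or_id : SOURCES[source_or_id]
  return unless source

  # A forced request for a source that is mid-fetch is reported, not queued.
  return :already_fetching if force && source.fetch_status == "fetching"

  # Stand-in for update_columns(fetch_status: "queued") + FetchFeedJob.perform_later.
  source.fetch_status = "queued"
  JOB_QUEUE << [source.id, { force: force }]
  :enqueued
end
```

As written in the source, the `:already_fetching` guard applies only when `force` is true; a non-forced call for a source that is mid-fetch still marks it "queued" and enqueues a job.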
.run(source:, **options) ⇒ Object
# File 'lib/source_monitor/fetching/fetch_runner.rb', line 36
def self.run(source:, **)
  new(source:, **).run
end
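.run is a convenience wrapper: it forwards `source:` and any other keywords to the constructor and immediately calls #run. The source uses anonymous `**` forwarding (Ruby 3.2+); the sketch below spells it out as `**options`, which behaves the same. `ForwardingExample` is a hypothetical class.

```ruby
# Hypothetical class showing the shape of FetchRunner.run.
class ForwardingExample
  attr_reader :source, :force

  # Same behavior as the source's `new(source:, **).run`.
  def self.run(source:, **options)
    new(source: source, **options).run
  end

  def initialize(source:, force: false)
    @source = source
    @force = force
  end

  def run
    [source, force]
  end
end

ForwardingExample.run(source: :feed, force: true) # => [:feed, true]
```

Unknown keywords are not silently ignored: they reach #initialize and raise ArgumentError.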
Instance Method Details
#run ⇒ Object
# File 'lib/source_monitor/fetching/fetch_runner.rb', line 53
def run
  return skip_due_to_circuit if circuit_blocked?

  @retry_scheduled = false
  result = nil

  # Phase 1: Acquire advisory lock and mark source as fetching.
  # Uses a DB connection briefly, then releases it.
  lock.acquire!
  begin
    mark_fetching!
  rescue StandardError
    lock.release!
    raise
  end

  # Phase 2: HTTP fetch -- no DB connection held during network I/O.
  # This is the key optimization: on slow feeds (up to 30s timeout),
  # we no longer hold a DB connection idle while waiting for HTTP.
  begin
    result = fetcher_class.new(source: source).call
  rescue StandardError => fetch_error
    # Ensure lock is released before propagating
    lock.release!
    raise fetch_error
  end

  # Phase 3: Post-fetch DB writes under the advisory lock (still held).
  completion_result = completion_result_for(result)
  begin
    log_handler_result("RetentionHandler", retention_handler.call(source:, result: completion_result))
    log_handler_result("FollowUpHandler", follow_up_handler.call(source:, result: completion_result))
    schedule_retry_if_needed(completion_result)
    mark_complete!(completion_result)
  ensure
    lock.release!
  end

  log_handler_result("EventPublisher", event_publisher.call(source:, result: result))
  result
rescue SourceMonitor::Fetching::AdvisoryLock::NotAcquiredError => error
  raise ConcurrencyError, error.message
rescue StandardError => error
  mark_failed!(error)
  event_publisher.call(source:, result: nil)
  raise
ensure
  begin
    source.reload
    source.update_columns(fetch_status: "failed") if source.fetch_status == "fetching"
  rescue StandardError # :nocov:
    nil
  end
end
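The structure of #run reduces to a lock discipline: acquire first, release exactly once on every path that acquired the lock, and never release when acquisition failed. The sketch below keeps only that skeleton. `CountingLock` and `PhasedRun` are hypothetical, the "idle" completion status is an assumption, and it omits the circuit-breaker check, the completion handlers, event publishing and the final `ensure` that resets a lingering "fetching" status.

```ruby
# Hypothetical stand-ins; only the lock discipline of FetchRunner#run is modeled.
class CountingLock
  class NotAcquiredError < StandardError; end

  attr_reader :releases

  def initialize(available: true)
    @available = available
    @releases = 0
  end

  def acquire!
    raise NotAcquiredError, "lock busy" unless @available
  end

  def release!
    @releases += 1
  end
end

class PhasedRun
  class ConcurrencyError < StandardError; end

  attr_reader :status

  def initialize(lock:, fetcher:)
    @lock = lock
    @fetcher = fetcher
    @status = "idle"
  end

  def run
    # Phase 1: take the lock, then mark the source as fetching.
    # (The source also releases the lock if marking fails.)
    @lock.acquire!
    @status = "fetching"

    # Phase 2: the fetch itself; release before propagating a failure.
    begin
      result = @fetcher.call
    rescue StandardError
      @lock.release!
      raise
    end

    # Phase 3: bookkeeping while still holding the lock; release on every outcome.
    begin
      @status = "idle"
    ensure
      @lock.release!
    end
    result
  rescue CountingLock::NotAcquiredError => error
    # Never acquired, so nothing to release; translate the error instead.
    raise ConcurrencyError, error.message
  rescue StandardError
    @status = "failed"
    raise
  end
end
```

An exception raised inside one `rescue` clause is not caught by a sibling `rescue` clause of the same method, so the ConcurrencyError raised for a busy lock never reaches the `rescue StandardError` branch; the same holds for `mark_failed!` in the source.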