Class: SourceMonitor::Scraping::Enqueuer

Inherits:
Object
  • Object
show all
Defined in:
lib/source_monitor/scraping/enqueuer.rb

Overview

Coordinates queuing of scraping jobs while respecting source configuration and avoiding duplicate enqueues for the same item.

Defined Under Namespace

Classes: Result

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(item:, source: nil, job_class: SourceMonitor::ScrapeItemJob, reason: :manual) ⇒ Enqueuer

Returns a new instance of Enqueuer.



32
33
34
35
36
37
# File 'lib/source_monitor/scraping/enqueuer.rb', line 32

# Builds an enqueuer for a single item.
#
# item:      the record to scrape (may be nil; #enqueue reports that case).
# source:    optional explicit source; when falsy, the item's own source is
#            looked up instead (and only then).
# job_class: the job class used to schedule the scrape.
# reason:    why the scrape was requested; stored as a Symbol.
def initialize(item:, source: nil, job_class: SourceMonitor::ScrapeItemJob, reason: :manual)
  @item, @job_class = item, job_class
  @source =
    if source
      source
    else
      item&.source
    end
  @reason = reason.to_sym
end

Instance Attribute Details

#item ⇒ Object (readonly)

Returns the value of attribute item.



26
27
28
# File 'lib/source_monitor/scraping/enqueuer.rb', line 26

# Reader for @item, the item handed to the constructor.
def item = @item

#job_class ⇒ Object (readonly)

Returns the value of attribute job_class.



26
27
28
# File 'lib/source_monitor/scraping/enqueuer.rb', line 26

# Reader for @job_class, the job class whose perform_later schedules the scrape.
def job_class = @job_class

#reason ⇒ Object (readonly)

Returns the value of attribute reason.



26
27
28
# File 'lib/source_monitor/scraping/enqueuer.rb', line 26

# Reader for @reason, the enqueue reason stored as a Symbol.
def reason = @reason

#source ⇒ Object (readonly)

Returns the value of attribute source.



26
27
28
# File 'lib/source_monitor/scraping/enqueuer.rb', line 26

# Reader for @source: the explicit source, or the item's source when none was supplied.
def source = @source

Class Method Details

.enqueue(item:, source: nil, job_class: SourceMonitor::ScrapeItemJob, reason: :manual) ⇒ Object



28
29
30
# File 'lib/source_monitor/scraping/enqueuer.rb', line 28

# Convenience entry point: builds an instance from the given keywords and
# returns the result of calling #enqueue on it.
def self.enqueue(item:, source: nil, job_class: SourceMonitor::ScrapeItemJob, reason: :manual)
  enqueuer = new(item: item, source: source, job_class: job_class, reason: reason)
  enqueuer.enqueue
end

Instance Method Details

#enqueue ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/source_monitor/scraping/enqueuer.rb', line 39

# Checks the item/source preconditions and then, inside `item.with_lock`,
# decides whether a scrape job may be queued for the item.
#
# Returns whatever `failure` builds when a precondition is not met; otherwise
# a Result whose status is :already_enqueued, :rate_limited, :deferred or
# :enqueued.
def enqueue
  log("enqueue:start", item:, source:, reason: reason)

  return failure(:missing_item, "Item could not be found.") unless item
  return failure(:missing_source, "Item must belong to a source.") unless source
  return failure(:scraping_disabled, "Scraping is disabled for this source.") unless source.scraping_enabled?
  return failure(:auto_scrape_disabled, "Automatic scraping is disabled for this source.") if auto_reason? && !source.auto_scrape?

  # The locked block only records what it found; result logging and job
  # scheduling happen after `with_lock` has returned.
  outcome = :pending
  details = nil

  item.with_lock do
    item.reload

    if SourceMonitor::Scraping::State.in_flight?(item.scrape_status)
      log("enqueue:in_flight", item:, status: item.scrape_status)
      outcome = :already_queued
      next
    end

    exhausted, rate_info = rate_limit_exhausted?
    if exhausted
      outcome = :rate_limited
      details = rate_info
      next
    end

    throttled, time_info = time_rate_limited?
    if throttled
      outcome = :time_limited
      details = time_info
      next
    end

    # Only reached when none of the checks above bailed out of the block.
    SourceMonitor::Scraping::State.mark_pending!(item, broadcast: false, lock: false)
  end

  case outcome
  when :already_queued
    log("enqueue:already_enqueued", item:, status: item.scrape_status)
    Result.new(status: :already_enqueued, message: "Scrape already in progress for this item.", item: item)
  when :rate_limited
    message = rate_limit_message(details)
    log("enqueue:rate_limited", item:, limit: details&.fetch(:limit, nil), in_flight: details&.fetch(:in_flight, nil))
    Result.new(status: :rate_limited, message:, item: item)
  when :time_limited
    # The job is still scheduled, just delayed by the reported wait.
    delay = details[:wait_seconds]
    job_class.set(wait: delay.seconds).perform_later(item.id)
    message = "Scrape deferred: source was scraped #{details[:interval]}s ago, re-enqueued with #{delay}s delay."
    log("enqueue:deferred", item:, wait_seconds: delay, interval: details[:interval])
    Result.new(status: :deferred, message:, item: item)
  else
    job_class.perform_later(item.id)
    log("enqueue:job_enqueued", item:, job_class: job_class.name)
    Result.new(status: :enqueued, message: "Scrape has been enqueued for processing.", item: item)
  end
end