Class: SourceMonitor::Fetching::FeedFetcher::SourceUpdater

Inherits: Object
Defined in:
lib/source_monitor/fetching/feed_fetcher/source_updater.rb

Constant Summary

# Number of consecutive fetch failures treated as the auto-pause threshold.
# NOTE(review): the consumer is presumably check_consecutive_failure_auto_pause!
# (not shown in this excerpt) — confirm.
CONSECUTIVE_FAILURE_PAUSE_THRESHOLD = 5

# Maps fetch error classes to a short category label. Insertion order is kept.
# NOTE(review): presumably consulted by categorize_error (not shown) — confirm.
ERROR_CATEGORY_MAP = {
  SourceMonitor::Fetching::TimeoutError            => "network",
  SourceMonitor::Fetching::ConnectionError         => "network",
  SourceMonitor::Fetching::ParsingError            => "parse",
  SourceMonitor::Fetching::BlockedError            => "blocked",
  SourceMonitor::Fetching::AuthenticationError     => "auth",
  SourceMonitor::Fetching::UnexpectedResponseError => "unknown"
}.freeze

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(source:, adaptive_interval:) ⇒ SourceUpdater

Returns a new instance of SourceUpdater.



Source lines 20–23:
# File 'lib/source_monitor/fetching/feed_fetcher/source_updater.rb', line 20

# Builds an updater bound to a single source.
#
# @param source [Object] the source record this updater reads and persists
# @param adaptive_interval [#apply_adaptive_interval!] collaborator that
#   adjusts the fetch-interval attributes before each save
def initialize(source:, adaptive_interval:)
  @adaptive_interval = adaptive_interval
  @source = source
end

Instance Attribute Details

#adaptive_interval ⇒ Object (readonly)

Returns the value of attribute adaptive_interval.



Source lines 18–20:
# File 'lib/source_monitor/fetching/feed_fetcher/source_updater.rb', line 18

# @return [Object] the adaptive-interval collaborator given to the constructor
def adaptive_interval = @adaptive_interval

#source ⇒ Object (readonly)

Returns the value of attribute source.



Source lines 18–20:
# File 'lib/source_monitor/fetching/feed_fetcher/source_updater.rb', line 18

# @return [Object] the source record given to the constructor
def source = @source

Instance Method Details

#create_fetch_log(response:, duration_ms:, started_at:, success:, feed: nil, error: nil, body: nil, feed_signature: nil, items_created: 0, items_updated: 0, items_failed: 0, item_errors: []) ⇒ Object



Source lines 102–122:
# File 'lib/source_monitor/fetching/feed_fetcher/source_updater.rb', line 102

# Persists one fetch-log row for this source describing a single fetch attempt.
#
# Records timing, HTTP status, normalized response headers, body size, the
# feed's entry count, item counters, error details and a metadata payload,
# via source.fetch_logs.create!.
#
# @param response [#status, #headers, nil] HTTP response; every access is nil-safe (&.)
# @param duration_ms [Numeric] duration of the attempt in milliseconds
# @param started_at [Time] when the attempt began
# @param success [Boolean] whether the attempt succeeded
# @param feed [Object, nil] parsed feed; its entries are counted only if it responds to #entries
# @param error [Exception, nil] error raised during the attempt, if any
# @param body [String, nil] raw response body (only its byte size is stored)
# @param feed_signature [String, nil] passed through to the metadata payload
# @param items_created [Integer] items created during this fetch
# @param items_updated [Integer] items updated during this fetch
# @param items_failed [Integer] items that failed during this fetch
# @param item_errors [Array] per-item errors, passed through to the metadata payload
# @return the record returned by source.fetch_logs.create!
#   NOTE(review): presumably ActiveRecord create!, which raises on invalid records — confirm.
def create_fetch_log(response:, duration_ms:, started_at:, success:, feed: nil, error: nil, body: nil, feed_signature: nil,
                     items_created: 0, items_updated: 0, items_failed: 0, item_errors: [])
  source.fetch_logs.create!(
    success:,
    started_at: started_at,
    # duration_ms is in milliseconds; Time#+ takes seconds, hence / 1000.0.
    completed_at: started_at + (duration_ms / 1000.0),
    duration_ms: duration_ms,
    http_status: response&.status,
    http_response_headers: normalized_headers(response&.headers),
    feed_size_bytes: body&.bytesize,
    # nil when there is no feed or it does not expose #entries.
    items_in_feed: feed&.respond_to?(:entries) ? feed.entries.size : nil,
    items_created: items_created,
    items_updated: items_updated,
    items_failed: items_failed,
    error_class: error&.class&.name,
    error_message: error&.message,
    error_backtrace: error_backtrace(error),
    error_category: categorize_error(error),
    # NOTE(review): the name of the metadata-builder method called on the next
    # line was lost when this snippet was extracted from the rendered docs; as
    # written the line does not parse. Restore the call from
    # lib/source_monitor/fetching/feed_fetcher/source_updater.rb.
    metadata: (feed, error: error, feed_signature: feed_signature, item_errors: item_errors)
  )
end

#elapsed_ms(started_at) ⇒ Object



Source lines 124–126:
# File 'lib/source_monitor/fetching/feed_fetcher/source_updater.rb', line 124

# Milliseconds elapsed between +started_at+ and Time.current, rounded to the
# nearest integer.
#
# @param started_at [Time] start of the measured interval
# @return [Integer] elapsed time in milliseconds
def elapsed_ms(started_at)
  elapsed_seconds = Time.current - started_at
  (elapsed_seconds * 1000.0).round
end

#feed_signature_changed?(feed_signature) ⇒ Boolean

Returns:

  • (Boolean)


Source lines 128–132:
# File 'lib/source_monitor/fetching/feed_fetcher/source_updater.rb', line 128

# Reports whether +feed_signature+ differs from the signature stored on the
# source under metadata["last_feed_signature"] (that key is written by
# #updated_metadata).
#
# @param feed_signature [String, nil] signature computed for the current feed
# @return [Boolean] false when feed_signature is blank; otherwise true when it
#   differs from the stored value, including when nothing is stored yet
def feed_signature_changed?(feed_signature)
  return false if feed_signature.blank?

  # metadata may be nil on a fresh source; treat that as "no stored signature".
  (source.metadata || {}).fetch("last_feed_signature", nil) != feed_signature
end

#parse_http_time(value) ⇒ Object



Source lines 142–148:
# File 'lib/source_monitor/fetching/feed_fetcher/source_updater.rb', line 142

# Parses an HTTP-date header value (as used for Last-Modified) into a Time.
#
# @param value [String, nil] raw header value
# @return [Time, nil] the parsed time; nil when the value is blank or is not
#   a valid HTTP-date (Time.httpdate raises ArgumentError in that case)
def parse_http_time(value)
  value.blank? ? nil : Time.httpdate(value)
rescue ArgumentError
  nil
end

#update_source_for_failure(error, duration_ms) ⇒ Object



Source lines 82–100:
# File 'lib/source_monitor/fetching/feed_fetcher/source_updater.rb', line 82

# Records a failed fetch on the source and returns the retry decision.
#
# Stamps last_fetched_at and last_error_at with the same timestamp, stores the
# error's HTTP status and message, increments failure_count and
# consecutive_fetch_failures, lets the adaptive interval react to the failure,
# refreshes metadata, applies the retry strategy, persists the source, and
# then checks whether the consecutive-failure auto-pause should fire.
#
# @param error [#http_status, #message] the fetch error
# @param duration_ms [Integer] duration of the failed attempt in milliseconds
# @return the value returned by apply_retry_strategy! (the retry decision)
def update_source_for_failure(error, duration_ms)
  now = Time.current
  attrs = {
    last_fetched_at: now,
    last_fetch_duration_ms: duration_ms,
    last_http_status: error.http_status,
    last_error: error.message,
    last_error_at: now,
    failure_count: source.failure_count.to_i + 1,
    consecutive_fetch_failures: source.consecutive_fetch_failures.to_i + 1
  }

  adaptive_interval.apply_adaptive_interval!(attrs, content_changed: false, failure: true)
  # Refresh metadata without touching the stored feed signature or digest.
  # The right-hand side must be explicit; otherwise the assignment swallows the
  # next line and stores the retry decision as metadata.
  attrs[:metadata] = updated_metadata
  decision = apply_retry_strategy!(attrs, error, now)
  source.update!(attrs)
  check_consecutive_failure_auto_pause!
  decision
end

#update_source_for_not_modified(response, duration_ms) ⇒ Object



Source lines 55–80:
# File 'lib/source_monitor/fetching/feed_fetcher/source_updater.rb', line 55

# Records a "not modified" fetch on the source.
#
# Clears error state and resets failure counters, captures ETag/Last-Modified
# validators when present, tells the adaptive interval that content did not
# change, refreshes metadata, resets retry state, persists the source and
# finally calls enqueue_favicon_fetch_if_needed.
#
# @param response [#status, #headers] HTTP response; headers are read with
#   both lower-case and canonical-case keys
# @param duration_ms [Integer] fetch duration in milliseconds
# @return the value returned by enqueue_favicon_fetch_if_needed
def update_source_for_not_modified(response, duration_ms)
  attributes = {
    last_fetched_at: Time.current,
    last_fetch_duration_ms: duration_ms,
    last_http_status: response.status,
    last_error: nil,
    last_error_at: nil,
    failure_count: 0,
    consecutive_fetch_failures: 0
  }

  if (etag = response.headers["etag"] || response.headers["ETag"])
    attributes[:etag] = etag
  end

  if (last_modified_header = response.headers["last-modified"] || response.headers["Last-Modified"])
    # Unparseable dates are ignored rather than overwriting the stored value.
    parsed_time = parse_http_time(last_modified_header)
    attributes[:last_modified] = parsed_time if parsed_time
  end

  adaptive_interval.apply_adaptive_interval!(attributes, content_changed: false)
  # The right-hand side must be explicit; otherwise the assignment swallows the
  # next line and stores reset_retry_state!'s return value as metadata.
  attributes[:metadata] = updated_metadata
  reset_retry_state!(attributes)
  source.update!(attributes)
  enqueue_favicon_fetch_if_needed
end

#update_source_for_success(response, duration_ms, feed, feed_signature, content_changed: nil, entries_digest: nil) ⇒ Object



Source lines 25–53:
# File 'lib/source_monitor/fetching/feed_fetcher/source_updater.rb', line 25

# Records a successful fetch on the source.
#
# Clears error state and resets failure counters, stores the derived feed
# format, captures ETag/Last-Modified validators when present, feeds the
# "content changed" signal to the adaptive interval, stores the new feed
# signature and entries digest in metadata, resets retry state, persists the
# source and finally calls enqueue_favicon_fetch_if_needed.
#
# @param response [#status, #headers] HTTP response; headers are read with
#   both lower-case and canonical-case keys
# @param duration_ms [Integer] fetch duration in milliseconds
# @param feed [Object] parsed feed, passed to derive_feed_format
# @param feed_signature [String, nil] signature of the fetched feed
# @param content_changed [Boolean, nil] explicit change flag; when nil the
#   result of feed_signature_changed?(feed_signature) is used instead
# @param entries_digest [String, nil] digest stored alongside the signature
# @return the value returned by enqueue_favicon_fetch_if_needed
def update_source_for_success(response, duration_ms, feed, feed_signature, content_changed: nil, entries_digest: nil)
  attributes = {
    last_fetched_at: Time.current,
    last_fetch_duration_ms: duration_ms,
    last_http_status: response.status,
    last_error: nil,
    last_error_at: nil,
    failure_count: 0,
    consecutive_fetch_failures: 0,
    feed_format: derive_feed_format(feed)
  }

  if (etag = response.headers["etag"] || response.headers["ETag"])
    attributes[:etag] = etag
  end

  if (last_modified_header = response.headers["last-modified"] || response.headers["Last-Modified"])
    # Unparseable dates are ignored rather than overwriting the stored value.
    parsed_time = parse_http_time(last_modified_header)
    attributes[:last_modified] = parsed_time if parsed_time
  end

  # Use explicit content_changed if provided, otherwise fall back to feed signature comparison
  changed = content_changed.nil? ? feed_signature_changed?(feed_signature) : content_changed
  adaptive_interval.apply_adaptive_interval!(attributes, content_changed: changed)
  attributes[:metadata] = updated_metadata(feed_signature: feed_signature, entries_digest: entries_digest)
  reset_retry_state!(attributes)
  source.update!(attributes)
  enqueue_favicon_fetch_if_needed
end

#updated_metadata(feed_signature: nil, entries_digest: nil) ⇒ Object



Source lines 134–140:
# File 'lib/source_monitor/fetching/feed_fetcher/source_updater.rb', line 134

# Returns a copy of the source's metadata, refreshed for the current fetch.
#
# The "dynamic_fetch_interval_seconds" key is always removed from the copy.
# NOTE(review): the reason for dropping it is not visible here; presumably a
# legacy key — confirm. "last_feed_signature" and "last_entries_digest" are
# overwritten only when the corresponding argument is present, so earlier
# values survive otherwise. The source's own metadata hash is not mutated.
#
# @param feed_signature [String, nil] new feed signature to record
# @param entries_digest [String, nil] new entries digest to record
# @return [Hash] the refreshed metadata copy
def updated_metadata(feed_signature: nil, entries_digest: nil)
  metadata = (source.metadata || {}).dup
  metadata.delete("dynamic_fetch_interval_seconds")
  metadata["last_feed_signature"] = feed_signature if feed_signature.present?
  metadata["last_entries_digest"] = entries_digest if entries_digest.present?
  metadata
end