Class: SourceMonitor::Fetching::FeedFetcher::SourceUpdater
- Inherits:
-
Object
- Object
- SourceMonitor::Fetching::FeedFetcher::SourceUpdater
- Defined in:
- lib/source_monitor/fetching/feed_fetcher/source_updater.rb
Constant Summary collapse
- CONSECUTIVE_FAILURE_PAUSE_THRESHOLD =
5- ERROR_CATEGORY_MAP =
{ SourceMonitor::Fetching::TimeoutError => "network", SourceMonitor::Fetching::ConnectionError => "network", SourceMonitor::Fetching::ParsingError => "parse", SourceMonitor::Fetching::BlockedError => "blocked", SourceMonitor::Fetching::AuthenticationError => "auth", SourceMonitor::Fetching::UnexpectedResponseError => "unknown" }.freeze
Instance Attribute Summary collapse
-
#adaptive_interval ⇒ Object
readonly
Returns the value of attribute adaptive_interval.
-
#source ⇒ Object
readonly
Returns the value of attribute source.
Instance Method Summary collapse
- #create_fetch_log(response:, duration_ms:, started_at:, success:, feed: nil, error: nil, body: nil, feed_signature: nil, items_created: 0, items_updated: 0, items_failed: 0, item_errors: []) ⇒ Object
- #elapsed_ms(started_at) ⇒ Object
- #feed_signature_changed?(feed_signature) ⇒ Boolean
-
#initialize(source:, adaptive_interval:) ⇒ SourceUpdater
constructor
A new instance of SourceUpdater.
- #parse_http_time(value) ⇒ Object
- #update_source_for_failure(error, duration_ms) ⇒ Object
- #update_source_for_not_modified(response, duration_ms) ⇒ Object
- #update_source_for_success(response, duration_ms, feed, feed_signature, content_changed: nil, entries_digest: nil) ⇒ Object
- #updated_metadata(feed_signature: nil, entries_digest: nil) ⇒ Object
Constructor Details
#initialize(source:, adaptive_interval:) ⇒ SourceUpdater
Returns a new instance of SourceUpdater.
20 21 22 23 |
# File 'lib/source_monitor/fetching/feed_fetcher/source_updater.rb', line 20 def initialize(source:, adaptive_interval:) @source = source @adaptive_interval = adaptive_interval end |
Instance Attribute Details
#adaptive_interval ⇒ Object (readonly)
Returns the value of attribute adaptive_interval.
18 19 20 |
# File 'lib/source_monitor/fetching/feed_fetcher/source_updater.rb', line 18 def adaptive_interval @adaptive_interval end |
#source ⇒ Object (readonly)
Returns the value of attribute source.
18 19 20 |
# File 'lib/source_monitor/fetching/feed_fetcher/source_updater.rb', line 18 def source @source end |
Instance Method Details
#create_fetch_log(response:, duration_ms:, started_at:, success:, feed: nil, error: nil, body: nil, feed_signature: nil, items_created: 0, items_updated: 0, items_failed: 0, item_errors: []) ⇒ Object
102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 |
# File 'lib/source_monitor/fetching/feed_fetcher/source_updater.rb', line 102 def create_fetch_log(response:, duration_ms:, started_at:, success:, feed: nil, error: nil, body: nil, feed_signature: nil, items_created: 0, items_updated: 0, items_failed: 0, item_errors: []) source.fetch_logs.create!( success:, started_at: started_at, completed_at: started_at + (duration_ms / 1000.0), duration_ms: duration_ms, http_status: response&.status, http_response_headers: normalized_headers(response&.headers), feed_size_bytes: body&.bytesize, items_in_feed: feed&.respond_to?(:entries) ? feed.entries.size : nil, items_created: items_created, items_updated: items_updated, items_failed: items_failed, error_class: error&.class&.name, error_message: error&., error_backtrace: error_backtrace(error), error_category: categorize_error(error), metadata: (feed, error: error, feed_signature: feed_signature, item_errors: item_errors) ) end |
#elapsed_ms(started_at) ⇒ Object
124 125 126 |
# File 'lib/source_monitor/fetching/feed_fetcher/source_updater.rb', line 124 def elapsed_ms(started_at) ((Time.current - started_at) * 1000.0).round end |
#feed_signature_changed?(feed_signature) ⇒ Boolean
128 129 130 131 132 |
# File 'lib/source_monitor/fetching/feed_fetcher/source_updater.rb', line 128 def feed_signature_changed?(feed_signature) return false if feed_signature.blank? (source. || {}).fetch("last_feed_signature", nil) != feed_signature end |
#parse_http_time(value) ⇒ Object
142 143 144 145 146 147 148 |
# File 'lib/source_monitor/fetching/feed_fetcher/source_updater.rb', line 142 def parse_http_time(value) return if value.blank? Time.httpdate(value) rescue ArgumentError nil end |
#update_source_for_failure(error, duration_ms) ⇒ Object
82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 |
# File 'lib/source_monitor/fetching/feed_fetcher/source_updater.rb', line 82 def update_source_for_failure(error, duration_ms) now = Time.current attrs = { last_fetched_at: now, last_fetch_duration_ms: duration_ms, last_http_status: error.http_status, last_error: error., last_error_at: now, failure_count: source.failure_count.to_i + 1, consecutive_fetch_failures: source.consecutive_fetch_failures.to_i + 1 } adaptive_interval.apply_adaptive_interval!(attrs, content_changed: false, failure: true) attrs[:metadata] = decision = apply_retry_strategy!(attrs, error, now) source.update!(attrs) check_consecutive_failure_auto_pause! decision end |
#update_source_for_not_modified(response, duration_ms) ⇒ Object
55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 |
# File 'lib/source_monitor/fetching/feed_fetcher/source_updater.rb', line 55 def update_source_for_not_modified(response, duration_ms) attributes = { last_fetched_at: Time.current, last_fetch_duration_ms: duration_ms, last_http_status: response.status, last_error: nil, last_error_at: nil, failure_count: 0, consecutive_fetch_failures: 0 } if (etag = response.headers["etag"] || response.headers["ETag"]) attributes[:etag] = etag end if (last_modified_header = response.headers["last-modified"] || response.headers["Last-Modified"]) parsed_time = parse_http_time(last_modified_header) attributes[:last_modified] = parsed_time if parsed_time end adaptive_interval.apply_adaptive_interval!(attributes, content_changed: false) attributes[:metadata] = reset_retry_state!(attributes) source.update!(attributes) enqueue_favicon_fetch_if_needed end |
#update_source_for_success(response, duration_ms, feed, feed_signature, content_changed: nil, entries_digest: nil) ⇒ Object
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/source_monitor/fetching/feed_fetcher/source_updater.rb', line 25 def update_source_for_success(response, duration_ms, feed, feed_signature, content_changed: nil, entries_digest: nil) attributes = { last_fetched_at: Time.current, last_fetch_duration_ms: duration_ms, last_http_status: response.status, last_error: nil, last_error_at: nil, failure_count: 0, consecutive_fetch_failures: 0, feed_format: derive_feed_format(feed) } if (etag = response.headers["etag"] || response.headers["ETag"]) attributes[:etag] = etag end if (last_modified_header = response.headers["last-modified"] || response.headers["Last-Modified"]) parsed_time = parse_http_time(last_modified_header) attributes[:last_modified] = parsed_time if parsed_time end # Use explicit content_changed if provided, otherwise fall back to feed signature comparison changed = content_changed.nil? ? feed_signature_changed?(feed_signature) : content_changed adaptive_interval.apply_adaptive_interval!(attributes, content_changed: changed) attributes[:metadata] = (feed_signature: feed_signature, entries_digest: entries_digest) reset_retry_state!(attributes) source.update!(attributes) enqueue_favicon_fetch_if_needed end |
#updated_metadata(feed_signature: nil, entries_digest: nil) ⇒ Object
134 135 136 137 138 139 140 |
# File 'lib/source_monitor/fetching/feed_fetcher/source_updater.rb', line 134 def (feed_signature: nil, entries_digest: nil) = (source. || {}).dup .delete("dynamic_fetch_interval_seconds") ["last_feed_signature"] = feed_signature if feed_signature.present? ["last_entries_digest"] = entries_digest if entries_digest.present? end |