Class: TimeBucketStream::Claimer
- Inherits:
-
Object
- Object
- TimeBucketStream::Claimer
- Defined in:
- lib/time_bucket_stream/claimer.rb
Constant Summary collapse
- DEFAULT_CLAIM_GRACE =
10
- DEFAULT_STALE_PARTIAL_AFTER =
600
- DEFAULT_QUARANTINE_RETENTION =
Quarantine::DEFAULT_RETENTION
Instance Attribute Summary collapse
-
#claim_grace ⇒ Object
readonly
Returns the value of attribute claim_grace.
-
#paths ⇒ Object
readonly
Returns the value of attribute paths.
-
#quarantine_retention ⇒ Object
readonly
Returns the value of attribute quarantine_retention.
-
#stale_partial_after ⇒ Object
readonly
Returns the value of attribute stale_partial_after.
Class Method Summary collapse
- .normalize_claim_grace(value) ⇒ Object
- .normalize_quarantine_retention(value) ⇒ Object
- .normalize_stale_partial_after(value) ⇒ Object
Instance Method Summary collapse
- #claim_completed(before: claim_before_bucket) ⇒ Object
- #delete_claimed(log_name) ⇒ Object
-
#initialize(path:, clock: Time, claim_grace: DEFAULT_CLAIM_GRACE, stale_partial_after: DEFAULT_STALE_PARTIAL_AFTER, quarantine_retention: DEFAULT_QUARANTINE_RETENTION) ⇒ Claimer
constructor
A new instance of Claimer.
- #release_claimed(log_name) ⇒ Object
Constructor Details
#initialize(path:, clock: Time, claim_grace: DEFAULT_CLAIM_GRACE, stale_partial_after: DEFAULT_STALE_PARTIAL_AFTER, quarantine_retention: DEFAULT_QUARANTINE_RETENTION) ⇒ Claimer
Returns a new instance of Claimer.
35 36 37 38 39 40 41 42 43 |
# File 'lib/time_bucket_stream/claimer.rb', line 35
# Builds a Claimer rooted at +path+.
#
# @param path passed through to Paths.new(path: path); the result is kept in @paths
# @param clock stored as-is in @clock (defaults to Time)
# @param claim_grace validated by .normalize_claim_grace
# @param stale_partial_after validated by .normalize_stale_partial_after
# @param quarantine_retention validated by .normalize_quarantine_retention
# @raise [ArgumentError] when claim_grace or stale_partial_after is not a
#   non-negative number of seconds
def initialize(
  path:,
  clock: Time,
  claim_grace: DEFAULT_CLAIM_GRACE,
  stale_partial_after: DEFAULT_STALE_PARTIAL_AFTER,
  quarantine_retention: DEFAULT_QUARANTINE_RETENTION
)
  normalizer = self.class

  @paths = Paths.new(path: path)
  @clock = clock

  # Validation order is kept stable so the first invalid option is the one reported.
  @claim_grace = normalizer.normalize_claim_grace(claim_grace)
  @stale_partial_after = normalizer.normalize_stale_partial_after(stale_partial_after)
  @quarantine_retention = normalizer.normalize_quarantine_retention(quarantine_retention)

  # Runs last, after every option has been accepted.
  ensure_directories
end
Instance Attribute Details
#claim_grace ⇒ Object (readonly)
Returns the value of attribute claim_grace.
11 12 13 |
# File 'lib/time_bucket_stream/claimer.rb', line 11 def claim_grace @claim_grace end |
#paths ⇒ Object (readonly)
Returns the value of attribute paths.
11 12 13 |
# File 'lib/time_bucket_stream/claimer.rb', line 11 def paths @paths end |
#quarantine_retention ⇒ Object (readonly)
Returns the value of attribute quarantine_retention.
11 12 13 |
# File 'lib/time_bucket_stream/claimer.rb', line 11 def quarantine_retention @quarantine_retention end |
#stale_partial_after ⇒ Object (readonly)
Returns the value of attribute stale_partial_after.
11 12 13 |
# File 'lib/time_bucket_stream/claimer.rb', line 11 def stale_partial_after @stale_partial_after end |
Class Method Details
.normalize_claim_grace(value) ⇒ Object
13 14 15 16 17 18 19 20 |
# File 'lib/time_bucket_stream/claimer.rb', line 13
# Coerces +value+ to a whole, non-negative number of seconds.
#
# @param value anything Kernel#Integer accepts (Integer, Float, numeric String, ...)
# @return [Integer] the value truncated to whole seconds
# @raise [ArgumentError] when +value+ cannot be converted or is negative
def self.normalize_claim_grace(value)
  seconds = Integer(value)

  # Integer() truncates toward zero, so -0.5 would become 0 and slip past a
  # check on the converted value alone; inspect the sign of the input as well.
  # `real?` guards Complex, which does not implement `negative?`.
  negative_input = value.is_a?(Numeric) && value.real? && value.negative?
  return seconds unless seconds.negative? || negative_input

  raise ArgumentError
rescue ArgumentError, TypeError, RangeError
  # Every failure mode (unparseable, nil, NaN/Infinity, negative) surfaces as
  # one uniform ArgumentError.
  raise ArgumentError, "claim_grace must be a non-negative number of seconds"
end
.normalize_quarantine_retention(value) ⇒ Object
31 32 33 |
# File 'lib/time_bucket_stream/claimer.rb', line 31 def self.normalize_quarantine_retention(value) Quarantine.normalize_retention(value) end |
.normalize_stale_partial_after(value) ⇒ Object
22 23 24 25 26 27 28 29 |
# File 'lib/time_bucket_stream/claimer.rb', line 22
# Coerces +value+ to a whole, non-negative number of seconds.
#
# @param value anything Kernel#Integer accepts (Integer, Float, numeric String, ...)
# @return [Integer] the value truncated to whole seconds
# @raise [ArgumentError] when +value+ cannot be converted or is negative
def self.normalize_stale_partial_after(value)
  seconds = Integer(value)

  # Integer() truncates toward zero, so -0.5 would become 0 and slip past a
  # check on the converted value alone; inspect the sign of the input as well.
  # `real?` guards Complex, which does not implement `negative?`.
  negative_input = value.is_a?(Numeric) && value.real? && value.negative?
  return seconds unless seconds.negative? || negative_input

  raise ArgumentError
rescue ArgumentError, TypeError, RangeError
  # Every failure mode (unparseable, nil, NaN/Infinity, negative) surfaces as
  # one uniform ArgumentError.
  raise ArgumentError, "stale_partial_after must be a non-negative number of seconds"
end
Instance Method Details
#claim_completed(before: claim_before_bucket) ⇒ Object
45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/time_bucket_stream/claimer.rb', line 45
# Runs the housekeeping steps, then attempts to claim every log name reported
# by claimable_log_names for the given cutoff.
#
# @param before cutoff, converted with #to_s before use; defaults to
#   claim_before_bucket
# @return [Array] the truthy results of claim_log (nil/false results are dropped)
def claim_completed(before: claim_before_bucket)
  cutoff = before.to_s

  # Housekeeping happens before any claim attempt, in this fixed order.
  ensure_directories
  cleanup_quarantine
  quarantine_stale_partials

  claimable_log_names(cutoff).filter_map { |log_name| claim_log(log_name, cutoff) }
end
#delete_claimed(log_name) ⇒ Object
57 58 59 |
# File 'lib/time_bucket_stream/claimer.rb', line 57
# Looks up the claim for +log_name+ via claim_processing and calls #delete on
# it; returns nil without doing anything when the lookup yields nil.
def delete_claimed(log_name)
  claim = claim_processing(log_name)
  return if claim.nil?

  claim.delete
end
#release_claimed(log_name) ⇒ Object
61 62 63 |
# File 'lib/time_bucket_stream/claimer.rb', line 61
# Looks up the claim for +log_name+ via claim_processing and calls #release on
# it; returns nil without doing anything when the lookup yields nil.
def release_claimed(log_name)
  claim = claim_processing(log_name)
  return if claim.nil?

  claim.release
end