Class: TimeBucketStream::Claimer

Inherits:
Object
  • Object
show all
Defined in:
lib/time_bucket_stream/claimer.rb

Constant Summary collapse

DEFAULT_CLAIM_GRACE =
10
DEFAULT_STALE_PARTIAL_AFTER =
600
DEFAULT_QUARANTINE_RETENTION =
Quarantine::DEFAULT_RETENTION

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(path:, clock: Time, claim_grace: DEFAULT_CLAIM_GRACE, stale_partial_after: DEFAULT_STALE_PARTIAL_AFTER, quarantine_retention: DEFAULT_QUARANTINE_RETENTION) ⇒ Claimer

Returns a new instance of Claimer.



35
36
37
38
39
40
41
42
43
# File 'lib/time_bucket_stream/claimer.rb', line 35

def initialize(path:, clock: Time, claim_grace: DEFAULT_CLAIM_GRACE, stale_partial_after: DEFAULT_STALE_PARTIAL_AFTER, quarantine_retention: DEFAULT_QUARANTINE_RETENTION)
  @paths = Paths.new(path: path)
  @clock = clock
  @claim_grace = self.class.normalize_claim_grace(claim_grace)
  @stale_partial_after = self.class.normalize_stale_partial_after(stale_partial_after)
  @quarantine_retention = self.class.normalize_quarantine_retention(quarantine_retention)

  ensure_directories
end

Instance Attribute Details

#claim_graceObject (readonly)

Returns the value of attribute claim_grace.



11
12
13
# File 'lib/time_bucket_stream/claimer.rb', line 11

def claim_grace
  @claim_grace
end

#pathsObject (readonly)

Returns the value of attribute paths.



11
12
13
# File 'lib/time_bucket_stream/claimer.rb', line 11

def paths
  @paths
end

#quarantine_retentionObject (readonly)

Returns the value of attribute quarantine_retention.



11
12
13
# File 'lib/time_bucket_stream/claimer.rb', line 11

def quarantine_retention
  @quarantine_retention
end

#stale_partial_afterObject (readonly)

Returns the value of attribute stale_partial_after.



11
12
13
# File 'lib/time_bucket_stream/claimer.rb', line 11

def stale_partial_after
  @stale_partial_after
end

Class Method Details

.normalize_claim_grace(value) ⇒ Object



13
14
15
16
17
18
19
20
# File 'lib/time_bucket_stream/claimer.rb', line 13

def self.normalize_claim_grace(value)
  seconds = Integer(value)
  return seconds if seconds >= 0

  raise ArgumentError
rescue ArgumentError, TypeError, RangeError
  raise ArgumentError, "claim_grace must be a non-negative number of seconds"
end

.normalize_quarantine_retention(value) ⇒ Object



31
32
33
# File 'lib/time_bucket_stream/claimer.rb', line 31

def self.normalize_quarantine_retention(value)
  Quarantine.normalize_retention(value)
end

.normalize_stale_partial_after(value) ⇒ Object



22
23
24
25
26
27
28
29
# File 'lib/time_bucket_stream/claimer.rb', line 22

def self.normalize_stale_partial_after(value)
  seconds = Integer(value)
  return seconds if seconds >= 0

  raise ArgumentError
rescue ArgumentError, TypeError, RangeError
  raise ArgumentError, "stale_partial_after must be a non-negative number of seconds"
end

Instance Method Details

#claim_completed(before: claim_before_bucket) ⇒ Object



45
46
47
48
49
50
51
52
53
54
55
# File 'lib/time_bucket_stream/claimer.rb', line 45

def claim_completed(before: claim_before_bucket)
  before = before.to_s

  ensure_directories
  cleanup_quarantine
  quarantine_stale_partials

  claimable_log_names(before).filter_map do |log_name|
    claim_log(log_name, before)
  end
end

#delete_claimed(log_name) ⇒ Object



57
58
59
# File 'lib/time_bucket_stream/claimer.rb', line 57

def delete_claimed(log_name)
  claim_processing(log_name)&.delete
end

#release_claimed(log_name) ⇒ Object



61
62
63
# File 'lib/time_bucket_stream/claimer.rb', line 61

def release_claimed(log_name)
  claim_processing(log_name)&.release
end