Class: TimeBucketStream

Inherits:
Object
  • Object
show all
Defined in:
lib/time_bucket_stream/batch.rb,
lib/time_bucket_stream/claim.rb,
lib/time_bucket_stream/paths.rb,
lib/time_bucket_stream/codecs.rb,
lib/time_bucket_stream/stream.rb,
lib/time_bucket_stream/writer.rb,
lib/time_bucket_stream/claimer.rb,
lib/time_bucket_stream/version.rb,
lib/time_bucket_stream/log_name.rb,
lib/time_bucket_stream/codecs/oj.rb,
lib/time_bucket_stream/quarantine.rb,
lib/time_bucket_stream/codecs/json.rb

Defined Under Namespace

Modules: Codecs, LogName

Classes: Batch, Claim, Claimer, Paths, Quarantine, Writer

Constant Summary collapse

DEFAULT_CLAIM_GRACE =
Claimer::DEFAULT_CLAIM_GRACE
DEFAULT_STALE_PARTIAL_AFTER =
Claimer::DEFAULT_STALE_PARTIAL_AFTER
DEFAULT_QUARANTINE_RETENTION =
Claimer::DEFAULT_QUARANTINE_RETENTION
DEFAULT_MALFORMED_ENTRY =
:quarantine
MALFORMED_ENTRY_MODES =
%i[quarantine skip].freeze
VERSION =
"0.1.0"

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(path:, sync: :flush, clock: Time, claim_grace: DEFAULT_CLAIM_GRACE, stale_partial_after: DEFAULT_STALE_PARTIAL_AFTER, quarantine_retention: DEFAULT_QUARANTINE_RETENTION, malformed_entry: DEFAULT_MALFORMED_ENTRY, codec: Codecs::Json.new) ⇒ TimeBucketStream

Returns a new instance of TimeBucketStream.



12
13
14
15
16
17
18
19
20
21
22
23
24
25
# File 'lib/time_bucket_stream/stream.rb', line 12

# Builds a stream and stores its configuration after normalization.
#
# @param path forwarded to Paths.new
# @param sync converted with #to_sym when it responds to it, stored as given otherwise
# @param clock stored unchanged (defaults to Time)
# @param claim_grace passed through Claimer.normalize_claim_grace
# @param stale_partial_after passed through Claimer.normalize_stale_partial_after
# @param quarantine_retention passed through Claimer.normalize_quarantine_retention
# @param malformed_entry passed through normalize_malformed_entry
# @param codec passed through Codecs.validate!; the returned value is kept
def initialize(
  path:,
  sync: :flush,
  clock: Time,
  claim_grace: DEFAULT_CLAIM_GRACE,
  stale_partial_after: DEFAULT_STALE_PARTIAL_AFTER,
  quarantine_retention: DEFAULT_QUARANTINE_RETENTION,
  malformed_entry: DEFAULT_MALFORMED_ENTRY,
  codec: Codecs::Json.new
)
  @paths = Paths.new(path: path)
  @sync =
    if sync.respond_to?(:to_sym)
      sync.to_sym
    else
      sync
    end
  @clock = clock
  @codec = Codecs.validate!(codec)
  @claim_grace = Claimer.normalize_claim_grace(claim_grace)
  @stale_partial_after = Claimer.normalize_stale_partial_after(stale_partial_after)
  @quarantine_retention = Claimer.normalize_quarantine_retention(quarantine_retention)
  @malformed_entry = normalize_malformed_entry(malformed_entry)
  @claims_by_log_name = {}

  # Both helpers are invoked during construction; their definitions are not
  # shown here (presumably they build the collaborators eagerly -- confirm).
  writer
  claimer
end

Instance Attribute Details

#claim_grace ⇒ Object (readonly)

Returns the value of attribute claim_grace.



10
11
12
# File 'lib/time_bucket_stream/stream.rb', line 10

# Reader for the claim grace setting. The constructor stores the result of
# Claimer.normalize_claim_grace here.
def claim_grace
  @claim_grace
end

#codec ⇒ Object (readonly)

Returns the value of attribute codec.



10
11
12
# File 'lib/time_bucket_stream/stream.rb', line 10

# Reader for the codec. The constructor stores the value returned by
# Codecs.validate! for the codec it was given.
def codec
  @codec
end

#malformed_entry ⇒ Object (readonly)

Returns the value of attribute malformed_entry.



10
11
12
# File 'lib/time_bucket_stream/stream.rb', line 10

# Reader for the malformed-entry mode. The constructor stores the result of
# normalize_malformed_entry here.
def malformed_entry
  @malformed_entry
end

#paths ⇒ Object (readonly)

Returns the value of attribute paths.



10
11
12
# File 'lib/time_bucket_stream/stream.rb', line 10

# Reader for the Paths instance the constructor builds from its +path:+
# argument.
def paths
  @paths
end

#quarantine_retention ⇒ Object (readonly)

Returns the value of attribute quarantine_retention.



10
11
12
# File 'lib/time_bucket_stream/stream.rb', line 10

# Reader for the quarantine retention setting. The constructor stores the
# result of Claimer.normalize_quarantine_retention here.
def quarantine_retention
  @quarantine_retention
end

#stale_partial_after ⇒ Object (readonly)

Returns the value of attribute stale_partial_after.



10
11
12
# File 'lib/time_bucket_stream/stream.rb', line 10

# Reader for the stale-partial threshold. The constructor stores the result
# of Claimer.normalize_stale_partial_after here.
def stale_partial_after
  @stale_partial_after
end

#sync ⇒ Object (readonly)

Returns the value of attribute sync.



10
11
12
# File 'lib/time_bucket_stream/stream.rb', line 10

# Reader for the sync setting. The constructor converts the given value with
# #to_sym when it responds to it and stores it unchanged otherwise.
def sync
  @sync
end

Instance Method Details

#append(payload) ⇒ Object



27
28
29
# File 'lib/time_bucket_stream/stream.rb', line 27

# Appends one payload by delegating to the stream's writer.
#
# @param payload passed to writer.append unchanged
# @return whatever writer.append returns
def append(payload)
  writer.append(payload)
end

#close ⇒ Object



56
57
58
59
# File 'lib/time_bucket_stream/stream.rb', line 56

# Releases this stream's claims and closes its writer.
#
# The writer is closed even when release_claims raises, so a failure while
# releasing claims no longer leaves the writer open. The exception from
# release_claims still propagates to the caller afterwards.
#
# @return whatever writer.close returns
def close
  begin
    release_claims
  ensure
    # Runs on both the normal and the error path.
    closed = writer.close
  end

  closed
end

#drain ⇒ Object



41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/time_bucket_stream/stream.rb', line 41

# Reads a batch, yields each of its payloads to the block, then calls
# +delete+ on the batch and returns it.
#
# If the method exits before the delete step has finished (an exception or a
# non-local exit such as +break+), the batch is released instead, unless it
# already reports +finished?+.
#
# @yield [payload] each payload produced by the batch's +each+
# @return the batch obtained from #read
# @raise [ArgumentError] when called without a block
def drain
  raise ArgumentError, "drain requires a block" unless block_given?

  claimed = read
  succeeded = false

  claimed.each do |item|
    yield item
  end
  claimed.delete
  succeeded = true

  claimed
ensure
  # +claimed+ is still nil when the guard clause or #read raised.
  claimed.release if claimed && !(succeeded || claimed.finished?)
end

#read ⇒ Object



31
32
33
34
35
36
37
38
39
# File 'lib/time_bucket_stream/stream.rb', line 31

# Calls +close_stale+ on the writer, then wraps the already-active claim
# entries followed by newly claimed entries in a Batch.
#
# The batch is given this object's +delete_entries+ and +release_entries+
# methods as its delete and release callbacks.
#
# @return [Batch]
def read
  writer.close_stale

  claimed_entries = active_claim_entries + new_claim_entries
  delete_hook = method(:delete_entries)
  release_hook = method(:release_entries)

  Batch.new(entries: claimed_entries, on_delete: delete_hook, on_release: release_hook)
end