Class: EventMeter::Stores::Rollup::File

Inherits:
Object
  • Object
show all
Includes:
CleanupHelpers, FileHelpers, Namespace
Defined in:
lib/event_meter/stores/rollup/file.rb

Defined Under Namespace

Classes: IndexStruct, ProcessedSidecar

Constant Summary collapse

APPLIED_KEY =
"_applied"
BATCHES_KEY =
"batches"
PROCESSED_IDS_KEY =
"processed_ids"
STREAM_FILE_KEY =
"stream_file"

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(path:, namespace: nil, report_name: nil, version: nil) ⇒ File

Returns a new instance of File.



26
27
28
29
30
31
32
33
# File 'lib/event_meter/stores/rollup/file.rb', line 26

def initialize(path:, namespace: nil, report_name: nil, version: nil)
  @path = normalize_file_store_path(path)
  @namespace = normalize_namespace(namespace) if namespace
  @report_name = report_name
  @version = version

  FileUtils.mkdir_p(rollup_path) if scoped?
end

Instance Attribute Details

#namespaceObject (readonly)

Returns the value of attribute namespace.



24
25
26
# File 'lib/event_meter/stores/rollup/file.rb', line 24

def namespace
  @namespace
end

#pathObject (readonly)

Returns the value of attribute path.



24
25
26
# File 'lib/event_meter/stores/rollup/file.rb', line 24

def path
  @path
end

#report_nameObject (readonly)

Returns the value of attribute report_name.



24
25
26
# File 'lib/event_meter/stores/rollup/file.rb', line 24

def report_name
  @report_name
end

#versionObject (readonly)

Returns the value of attribute version.



24
25
26
# File 'lib/event_meter/stores/rollup/file.rb', line 24

def version
  @version
end

Instance Method Details

#apply(batch) ⇒ Object



81
82
83
84
85
86
87
88
89
90
# File 'lib/event_meter/stores/rollup/file.rb', line 81

def apply(batch)
  ensure_scoped!
  return if batch.empty?

  batch_id = transaction_id(batch.entry_ids)
  applied_paths = []
  applied_paths.concat(apply_rollups(batch_id, batch))
  applied_paths.concat(apply_string_updates(batch_id, batch))
  mark_processed_entries(batch, batch_id, applied_paths.uniq)
end

#cleanup_history(before:, events:, interval_state:) ⇒ Object



160
161
162
163
164
165
# File 'lib/event_meter/stores/rollup/file.rb', line 160

def cleanup_history(before:, events:, interval_state:)
  ensure_namespace!
  return cleanup_all_report_histories(before: before, events: events, interval_state: interval_state) unless scoped?

  cleanup_scoped_history(before: before, events: events, interval_state: interval_state)
end

#cleanup_watermark(key) ⇒ Object



137
138
139
# File 'lib/event_meter/stores/rollup/file.rb', line 137

def cleanup_watermark(key)
  read_json_file(cleanup_state_path)[key]
end

#ensure_definition(definition) ⇒ Object



57
58
59
60
61
62
63
64
65
# File 'lib/event_meter/stores/rollup/file.rb', line 57

def ensure_definition(definition)
  update_json_file(definition_path, {}) do |stored|
    if stored.empty?
      stored.merge!(definition.to_h)
    else
      ensure_same_definition!(stored, definition)
    end
  end
end

#for_namespace(namespace) ⇒ Object



35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/event_meter/stores/rollup/file.rb', line 35

def for_namespace(namespace)
  namespace = normalize_namespace(namespace)
  return self if self.namespace == namespace

  if self.namespace
    raise ConfigurationError, "file rollup storage namespace #{self.namespace.inspect} does not match #{namespace.inspect}"
  end

  @namespace = namespace
  FileUtils.mkdir_p(rollup_path) if scoped?
  self
end

#for_report(name:, version:) ⇒ Object



48
49
50
51
52
53
54
55
# File 'lib/event_meter/stores/rollup/file.rb', line 48

def for_report(name:, version:)
  self.class.new(
    path: path,
    namespace: namespace,
    report_name: name.to_s,
    version: version
  )
end

#forget_processed_ids(ids) ⇒ Object



92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/event_meter/stores/rollup/file.rb', line 92

def forget_processed_ids(ids)
  ensure_scoped!

  sidecars = ids.map { |id| processed_sidecar_for(id) }.uniq(&:path)
  applied_paths_by_batch = {}

  sidecars.each do |sidecar|
    sidecar.batch_paths.each do |batch_id, paths|
      applied_paths_by_batch[batch_id] ||= []
      applied_paths_by_batch[batch_id].concat(paths)
    end

    sidecar.delete
  end

  applied_paths_by_batch.each do |batch_id, relative_paths|
    forget_applied_marker(batch_id, relative_paths.uniq)
  end
end

#get(key) ⇒ Object



131
132
133
134
135
# File 'lib/event_meter/stores/rollup/file.rb', line 131

def get(key)
  ensure_scoped!

  read_json_file(shard_path("strings", key))[key]
end

#hgetall_many(keys) ⇒ Object



112
113
114
115
116
117
118
119
120
121
122
# File 'lib/event_meter/stores/rollup/file.rb', line 112

def hgetall_many(keys)
  ensure_scoped!

  keys.map do |key|
    rollup = rollup_key_parts(key)
    next {} unless rollup

    data = read_json_file(rollup_bucket_path(rollup.fetch(:every), rollup.fetch(:bucket)))
    hash_value(data[rollup.fetch(:index)]).dup
  end
end

#keys_matching(pattern, limit: nil) ⇒ Object



124
125
126
127
128
129
# File 'lib/event_meter/stores/rollup/file.rb', line 124

def keys_matching(pattern, limit: nil)
  ensure_scoped!

  keys = rollup_keys.select { |key| key_matches?(key, pattern) }.sort
  limit ? keys.first(positive_integer(limit, "limit")) : keys
end

#processed_ids(ids) ⇒ Object



73
74
75
76
77
78
79
# File 'lib/event_meter/stores/rollup/file.rb', line 73

def processed_ids(ids)
  ensure_scoped!

  ids.select do |id|
    processed_sidecar_for(id).processed?(id)
  end
end

#report_definition(name:, version:) ⇒ Object



67
68
69
70
71
# File 'lib/event_meter/stores/rollup/file.rb', line 67

def report_definition(name:, version:)
  return nil unless scoped_for?(name, version)

  hash_value(read_json_file(definition_path))
end

#with_lock(ttl:) ⇒ Object



147
148
149
150
151
152
153
154
155
156
157
158
# File 'lib/event_meter/stores/rollup/file.rb', line 147

def with_lock(ttl:)
  FileUtils.mkdir_p(::File.dirname(lock_path))

  ::File.open(lock_path, ::File::RDWR | ::File::CREAT, 0o600) do |file|
    return false unless file.flock(::File::LOCK_EX | ::File::LOCK_NB)

    yield
    true
  ensure
    file&.flock(::File::LOCK_UN)
  end
end

#write_cleanup_watermark(key, value) ⇒ Object



141
142
143
144
145
# File 'lib/event_meter/stores/rollup/file.rb', line 141

def write_cleanup_watermark(key, value)
  update_json_file(cleanup_state_path, {}) do |data|
    data[key] = value.to_s
  end
end