Class: EventMeter::Stores::Rollup::File
Defined Under Namespace
Classes: IndexStruct, ProcessedSidecar
Constant Summary
collapse
- APPLIED_KEY =
"_applied"
- BATCHES_KEY =
"batches"
- PROCESSED_IDS_KEY =
"processed_ids"
- STREAM_FILE_KEY =
"stream_file"
Instance Attribute Summary collapse
Instance Method Summary
collapse
Constructor Details
#initialize(path:, namespace: nil, report_name: nil, version: nil) ⇒ File
Returns a new instance of File.
26
27
28
29
30
31
32
33
|
# File 'lib/event_meter/stores/rollup/file.rb', line 26
def initialize(path:, namespace: nil, report_name: nil, version: nil)
@path = normalize_file_store_path(path)
@namespace = normalize_namespace(namespace) if namespace
@report_name = report_name
@version = version
FileUtils.mkdir_p(rollup_path) if scoped?
end
|
Instance Attribute Details
#namespace ⇒ Object
Returns the value of attribute namespace.
24
25
26
|
# File 'lib/event_meter/stores/rollup/file.rb', line 24
def namespace
@namespace
end
|
#path ⇒ Object
Returns the value of attribute path.
24
25
26
|
# File 'lib/event_meter/stores/rollup/file.rb', line 24
def path
@path
end
|
#report_name ⇒ Object
Returns the value of attribute report_name.
24
25
26
|
# File 'lib/event_meter/stores/rollup/file.rb', line 24
def report_name
@report_name
end
|
#version ⇒ Object
Returns the value of attribute version.
24
25
26
|
# File 'lib/event_meter/stores/rollup/file.rb', line 24
def version
@version
end
|
Instance Method Details
#apply(batch) ⇒ Object
81
82
83
84
85
86
87
88
89
90
|
# File 'lib/event_meter/stores/rollup/file.rb', line 81
def apply(batch)
ensure_scoped!
return if batch.empty?
batch_id = transaction_id(batch.entry_ids)
applied_paths = []
applied_paths.concat(apply_rollups(batch_id, batch))
applied_paths.concat(apply_string_updates(batch_id, batch))
mark_processed_entries(batch, batch_id, applied_paths.uniq)
end
|
#cleanup_history(before:, events:, interval_state:) ⇒ Object
160
161
162
163
164
165
|
# File 'lib/event_meter/stores/rollup/file.rb', line 160
def cleanup_history(before:, events:, interval_state:)
ensure_namespace!
return cleanup_all_report_histories(before: before, events: events, interval_state: interval_state) unless scoped?
cleanup_scoped_history(before: before, events: events, interval_state: interval_state)
end
|
#cleanup_watermark(key) ⇒ Object
137
138
139
|
# File 'lib/event_meter/stores/rollup/file.rb', line 137
def cleanup_watermark(key)
read_json_file(cleanup_state_path)[key]
end
|
#ensure_definition(definition) ⇒ Object
57
58
59
60
61
62
63
64
65
|
# File 'lib/event_meter/stores/rollup/file.rb', line 57
def ensure_definition(definition)
update_json_file(definition_path, {}) do |stored|
if stored.empty?
stored.merge!(definition.to_h)
else
ensure_same_definition!(stored, definition)
end
end
end
|
#for_namespace(namespace) ⇒ Object
35
36
37
38
39
40
41
42
43
44
45
46
|
# File 'lib/event_meter/stores/rollup/file.rb', line 35
def for_namespace(namespace)
namespace = normalize_namespace(namespace)
return self if self.namespace == namespace
if self.namespace
raise ConfigurationError, "file rollup storage namespace #{self.namespace.inspect} does not match #{namespace.inspect}"
end
@namespace = namespace
FileUtils.mkdir_p(rollup_path) if scoped?
self
end
|
#for_report(name:, version:) ⇒ Object
48
49
50
51
52
53
54
55
|
# File 'lib/event_meter/stores/rollup/file.rb', line 48
def for_report(name:, version:)
self.class.new(
path: path,
namespace: namespace,
report_name: name.to_s,
version: version
)
end
|
#forget_processed_ids(ids) ⇒ Object
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
|
# File 'lib/event_meter/stores/rollup/file.rb', line 92
def forget_processed_ids(ids)
ensure_scoped!
sidecars = ids.map { |id| processed_sidecar_for(id) }.uniq(&:path)
applied_paths_by_batch = {}
sidecars.each do |sidecar|
sidecar.batch_paths.each do |batch_id, paths|
applied_paths_by_batch[batch_id] ||= []
applied_paths_by_batch[batch_id].concat(paths)
end
sidecar.delete
end
applied_paths_by_batch.each do |batch_id, relative_paths|
forget_applied_marker(batch_id, relative_paths.uniq)
end
end
|
#get(key) ⇒ Object
131
132
133
134
135
|
# File 'lib/event_meter/stores/rollup/file.rb', line 131
def get(key)
ensure_scoped!
read_json_file(shard_path("strings", key))[key]
end
|
#hgetall_many(keys) ⇒ Object
112
113
114
115
116
117
118
119
120
121
122
|
# File 'lib/event_meter/stores/rollup/file.rb', line 112
def hgetall_many(keys)
ensure_scoped!
keys.map do |key|
rollup = rollup_key_parts(key)
next {} unless rollup
data = read_json_file(rollup_bucket_path(rollup.fetch(:every), rollup.fetch(:bucket)))
hash_value(data[rollup.fetch(:index)]).dup
end
end
|
#keys_matching(pattern, limit: nil) ⇒ Object
124
125
126
127
128
129
|
# File 'lib/event_meter/stores/rollup/file.rb', line 124
def keys_matching(pattern, limit: nil)
ensure_scoped!
keys = rollup_keys.select { |key| key_matches?(key, pattern) }.sort
limit ? keys.first(positive_integer(limit, "limit")) : keys
end
|
#processed_ids(ids) ⇒ Object
73
74
75
76
77
78
79
|
# File 'lib/event_meter/stores/rollup/file.rb', line 73
def processed_ids(ids)
ensure_scoped!
ids.select do |id|
processed_sidecar_for(id).processed?(id)
end
end
|
#report_definition(name:, version:) ⇒ Object
67
68
69
70
71
|
# File 'lib/event_meter/stores/rollup/file.rb', line 67
def report_definition(name:, version:)
return nil unless scoped_for?(name, version)
hash_value(read_json_file(definition_path))
end
|
#with_lock(ttl:) ⇒ Object
147
148
149
150
151
152
153
154
155
156
157
158
|
# File 'lib/event_meter/stores/rollup/file.rb', line 147
def with_lock(ttl:)
FileUtils.mkdir_p(::File.dirname(lock_path))
::File.open(lock_path, ::File::RDWR | ::File::CREAT, 0o600) do |file|
return false unless file.flock(::File::LOCK_EX | ::File::LOCK_NB)
yield
true
ensure
file&.flock(::File::LOCK_UN)
end
end
|
#write_cleanup_watermark(key, value) ⇒ Object
141
142
143
144
145
|
# File 'lib/event_meter/stores/rollup/file.rb', line 141
def write_cleanup_watermark(key, value)
update_json_file(cleanup_state_path, {}) do |data|
data[key] = value.to_s
end
end
|