Class: Igniter::Store::SegmentedFileBackend
- Inherits: Object
- Includes: WireProtocol
- Defined in: lib/igniter/store/segmented_file_backend.rb
Overview
Partitioned, manifest-tracked WAL backend with pluggable per-store codecs.
A single instance replaces FileBackend for a whole IgniterStore — facts from every store are written into per-store, per-time-bucket segment files under a shared root directory.
Layout:
  {root_dir}/
    wal/
      store={name}/
        date={bucket}/
          segment-000001.wal
          segment-000001.wal.manifest.json   ← written atomically on seal
          segment-000002.wal
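As a rough illustration of the layout above, the following sketch builds a segment path from a store name, a timestamp, and a sequence number. The helper name `segment_path` and the bucket label formats (`%Y-%m-%d` for `:day`, `%Y-%m-%dT%H` for `:hour`, `all` for `:none`) are assumptions made for this example; this page does not show how the backend actually names `{bucket}`.

```ruby
# Hypothetical helper, not part of SegmentedFileBackend.
# The bucket label formats are assumptions chosen for illustration.
def segment_path(root_dir, store, time, seq, time_bucket: :day)
  bucket =
    case time_bucket
    when :day  then time.utc.strftime("%Y-%m-%d")
    when :hour then time.utc.strftime("%Y-%m-%dT%H")
    when :none then "all"
    else raise ArgumentError, "unknown time_bucket: #{time_bucket.inspect}"
    end

  # Mirrors the documented tree: wal/store={name}/date={bucket}/segment-NNNNNN.wal
  File.join(root_dir.to_s, "wal", "store=#{store}", "date=#{bucket}",
            format("segment-%06d.wal", seq))
end

puts segment_path("/data", :crm_records, Time.utc(2024, 1, 15, 10, 30), 1)
```

The manifest for a sealed segment would then sit next to it at the same path with `.manifest.json` appended.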
Codec selection:
# All stores use the default codec (json_crc32):
SegmentedFileBackend.new(root)
# All stores use compact_delta:
SegmentedFileBackend.new(root, codec: :compact_delta)
# Per-store codec map (string or symbol keys):
SegmentedFileBackend.new(root,
  codec: { technician_locations: :compact_delta,
           vendor_leads:         :compact_delta,
           crm_records:          :json_crc32 })
compact_delta is recommended for high-frequency History stores (sensor readings, GPS tracks) and gives ~16x size reduction over json_crc32. It is NOT resumable after a crash — any live compact_delta segment is sealed on the next startup and a fresh segment is opened.
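The codec option accepts either a single Symbol or a per-store map with string or symbol keys. A minimal sketch of how such a spec could be resolved for one store follows. The method name `resolve_codec` is hypothetical, and falling back to the default codec for stores missing from the map is an assumption, not something this page confirms.

```ruby
# Hypothetical resolver, not the backend's own code.
# Assumption: stores absent from a Hash spec fall back to the default codec.
def resolve_codec(codec_spec, store, default: :json_crc32)
  case codec_spec
  when Symbol
    codec_spec
  when Hash
    name = store.to_s
    # Accept both "crm_records" and :crm_records as keys.
    found = codec_spec[name] || codec_spec[name.to_sym]
    (found || default).to_sym
  else
    raise ArgumentError, "codec must be a Symbol or Hash, got #{codec_spec.class}"
  end
end

spec = { technician_locations: :compact_delta, "crm_records" => :json_crc32 }
puts resolve_codec(spec, "technician_locations")
puts resolve_codec(spec, :unlisted_store)
```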
Public interface is identical to FileBackend: write_fact, replay, close.
Constant Summary
- MANIFEST_SUFFIX = ".manifest.json"
- PURGED_SUFFIX = ".purged.json"
- QUARANTINE_SUFFIX = ".quarantine.json"
- DEFAULT_MAX_BYTES = 64 * 1024 * 1024 (64 MB)
- DEFAULT_CODEC = :json_crc32
- SCHEMA_VERSION = 1
Constants included from WireProtocol
WireProtocol::FRAME_CRC_SIZE, WireProtocol::FRAME_HEADER_SIZE
Instance Attribute Summary
- #root_dir ⇒ Object (readonly)
  Returns the value of attribute root_dir.
Instance Method Summary
- #checkpoint! ⇒ Object
  Seal every open segment and open a fresh one per store.
- #close ⇒ Object
- #durability_snapshot ⇒ Object
  Returns the current durability posture: configured policy plus a per-store breakdown showing how many facts are buffered in memory vs. on disk.
- #initialize(root_dir, max_bytes: DEFAULT_MAX_BYTES, time_bucket: :day, codec: DEFAULT_CODEC, retention: {}, flush: :batch) ⇒ SegmentedFileBackend (constructor)
  root_dir: root data directory shared by all stores.
- #purge!(store: nil) ⇒ Object
  Delete eligible sealed segments for stores that have a policy.
- #purge_receipts(store: nil, since: nil, until_: nil, limit: nil) ⇒ Object
  List purge receipts written by previous purge! calls.
- #quarantine_receipts(store: nil) ⇒ Object
  List quarantine receipts for segments that could not be decoded.
- #replay(store: nil, since: nil, as_of: nil) ⇒ Object
  Returns all facts from matching segments sorted by timestamp.
- #segment_count ⇒ Object
- #segment_manifest(store: nil) ⇒ Object
  Detailed per-segment manifest for one or all stores.
- #set_retention(store, strategy:, duration: nil) ⇒ Object
  Register (or replace) the retention policy for a store.
- #storage_stats(store: nil) ⇒ Object
  Compact aggregate stats for one or all stores.
- #stored_store_names ⇒ Object
- #write_fact(fact) ⇒ Object
Constructor Details
#initialize(root_dir, max_bytes: DEFAULT_MAX_BYTES, time_bucket: :day, codec: DEFAULT_CODEC, retention: {}, flush: :batch) ⇒ SegmentedFileBackend
root_dir - root data directory shared by all stores.
max_bytes - rotate the segment when its file reaches this size (default 64 MB).
time_bucket - :day (default), :hour, or :none.
codec - a Symbol, or a Hash mapping store name (String or Symbol) to a codec Symbol. See class docs.
retention - Hash{ store_name => { strategy:, duration: } }
Strategies:
  :permanent - never purge (default when no policy set)
  :rolling_window - purge sealed segments where max_timestamp < (now - duration); duration is a Float in seconds
  :ephemeral - keep only the single newest sealed segment per store
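The three strategies can be read as a selection rule over a store's sealed segments. The sketch below is one possible reading, not the backend's `purge_store!` implementation, which is not shown in this page. The method name `purge_candidates`, the manifest shape (`:path`, `:max_timestamp`), and the use of `max_timestamp` to decide which segment is "newest" are all assumptions.

```ruby
# Hypothetical selection rule over sealed-segment manifests.
# Each manifest is assumed to be a Hash with :path and :max_timestamp.
def purge_candidates(sealed, policy, now: Time.now.to_f)
  case policy.fetch(:strategy)
  when :permanent
    []                                          # never purge
  when :rolling_window
    cutoff = now - policy.fetch(:duration)      # duration in seconds
    sealed.select { |m| m[:max_timestamp] < cutoff }
  when :ephemeral
    # Keep the newest sealed segment, purge the rest.
    sorted = sealed.sort_by { |m| m[:max_timestamp] }
    sorted.take([sorted.size - 1, 0].max)
  else
    raise ArgumentError, "unknown strategy: #{policy[:strategy].inspect}"
  end
end

sealed = [
  { path: "segment-000001.wal", max_timestamp: 100.0 },
  { path: "segment-000002.wal", max_timestamp: 200.0 },
  { path: "segment-000003.wal", max_timestamp: 300.0 }
]
p purge_candidates(sealed, { strategy: :rolling_window, duration: 150.0 }, now: 400.0).map { |m| m[:path] }
```

Live (unsealed) segments are deliberately absent from the input, matching the documented guarantee that purge! never touches them.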
flush - durability policy applied after every write_fact:
  :batch - (default) flush only at BATCH_SIZE, close, or checkpoint. With compact_delta, buffered facts that have not yet reached BATCH_SIZE are lost on a crash.
  :on_write - flush after every single fact (safest, smallest window for data loss).
  { every_n: N } - flush after every N facts per store.
json_crc32 writes every fact immediately regardless of this setting.
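The flush policies above amount to a small decision made after each write. A minimal sketch of that decision follows. The method name `flush_due?`, the `pending` counter (facts buffered since the last flush), and the `batch_size` value of 100 are assumptions for illustration; the real BATCH_SIZE and the body of `apply_flush_policy` are not shown in this page.

```ruby
# Hypothetical decision function, not the backend's apply_flush_policy.
# pending    - facts buffered for one store since its last flush
# batch_size - stand-in for BATCH_SIZE; the value here is invented
def flush_due?(policy, pending, batch_size: 100)
  case policy
  when :on_write then true                          # flush after every fact
  when :batch    then pending >= batch_size         # flush only when the batch fills
  when Hash      then pending >= policy.fetch(:every_n)
  else raise ArgumentError, "unknown flush policy: #{policy.inspect}"
  end
end

puts flush_due?(:batch, 99)            # false: up to 99 facts remain at risk
puts flush_due?({ every_n: 5 }, 5)     # true
```

Under this reading, the number of facts exposed to a crash is bounded by the threshold minus one, which is why :on_write gives the smallest exposure.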
# File 'lib/igniter/store/segmented_file_backend.rb', line 73
def initialize(root_dir, max_bytes: DEFAULT_MAX_BYTES, time_bucket: :day, codec: DEFAULT_CODEC, retention: {}, flush: :batch)
  @root_dir = root_dir.to_s
  @max_bytes = max_bytes
  @time_bucket = time_bucket
  @codec_spec = codec # Symbol or Hash
  @flush_policy = flush
  @segments = {} # store_name (String) → segment state Hash
  @retention_policies = {}
  @mutex = Mutex.new
  FileUtils.mkdir_p(File.join(@root_dir, "wal"))
  retention.each { |store, policy| set_retention(store, **policy) }
  recover_orphaned_segments!
end
Instance Attribute Details
#root_dir ⇒ Object (readonly)
Returns the value of attribute root_dir.
# File 'lib/igniter/store/segmented_file_backend.rb', line 55
def root_dir
  @root_dir
end
Instance Method Details
#checkpoint! ⇒ Object
Seal every open segment and open a fresh one per store.
# File 'lib/igniter/store/segmented_file_backend.rb', line 113
def checkpoint!
  @mutex.synchronize do
    old = @segments.dup
    @segments.clear
    old.each do |store, seg|
      seal_segment!(seg)
      @segments[store] = open_new_segment(store)
    end
  end
end
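The class overview notes that a segment's manifest is written atomically on seal, but `seal_segment!` itself is not shown here. One common way to get that property is to write to a temporary file and rename it into place. The sketch below assumes that technique; the method name `write_manifest_atomically`, the temporary file naming, and the manifest fields are all hypothetical.

```ruby
require "json"

# Hypothetical sketch of an atomic manifest write (temp file + rename).
# This is an assumed technique, not the library's confirmed implementation.
def write_manifest_atomically(segment_path, manifest)
  final = "#{segment_path}.manifest.json"
  tmp   = "#{final}.tmp.#{Process.pid}"

  File.open(tmp, "w") do |f|
    f.write(JSON.generate(manifest))
    f.flush
    f.fsync            # push the bytes to disk before the rename
  end

  # On POSIX filesystems a rename within one directory replaces the target
  # atomically, so readers see either no manifest or a complete one.
  File.rename(tmp, final)
  final
end
```

With this approach, a crash before the rename leaves only a stray temporary file, and the segment still looks unsealed on the next startup.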
#close ⇒ Object
# File 'lib/igniter/store/segmented_file_backend.rb', line 124
def close
  @mutex.synchronize do
    @segments.values.each { |seg| seal_segment!(seg) }
    @segments.clear
  end
end
#durability_snapshot ⇒ Object
Returns the current durability posture: configured policy plus a per-store breakdown showing how many facts are buffered in memory vs. on disk.
Buffered facts are at risk of loss on a process crash. A “flushed” store has all accepted facts on disk; a “buffered” store has unflushed in-memory facts that would be lost if the process were killed right now.
# File 'lib/igniter/store/segmented_file_backend.rb', line 208
def durability_snapshot
  @mutex.synchronize do
    stores_snap = @segments.to_h do |name, seg|
      buffered = seg[:codec].buffered_count
      [name, {
        "codec" => seg[:codec_name].to_s,
        "buffered_count" => buffered,
        "facts_on_disk" => seg[:count] - buffered,
        "durability" => buffered > 0 ? "buffered" : "flushed"
      }]
    end
    { "policy" => flush_policy_name, "stores" => stores_snap }
  end
end
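A caller, such as a health check, might use the snapshot to find stores that currently have facts at risk. The sketch below works on a plain Hash shaped like the return value in the listing. The helper name `at_risk_stores` and the sample data, including the "batch" policy string, are invented for illustration.

```ruby
# Hypothetical helper: maps each "buffered" store to its buffered fact count.
def at_risk_stores(snapshot)
  snapshot.fetch("stores")
          .select { |_name, info| info["durability"] == "buffered" }
          .to_h { |name, info| [name, info["buffered_count"]] }
end

# Sample data in the documented shape; the values are made up.
snapshot = {
  "policy" => "batch",
  "stores" => {
    "technician_locations" => { "codec" => "compact_delta", "buffered_count" => 7,
                                "facts_on_disk" => 120, "durability" => "buffered" },
    "crm_records"          => { "codec" => "json_crc32", "buffered_count" => 0,
                                "facts_on_disk" => 45, "durability" => "flushed" }
  }
}
p at_risk_stores(snapshot)
```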
#purge!(store: nil) ⇒ Object
Delete eligible sealed segments for stores that have a policy. Returns an Array of receipt hashes (one per deleted segment). Live (unsealed) segments are never touched.
store - restrict purge to one store; nil = all stores with a policy.
# File 'lib/igniter/store/segmented_file_backend.rb', line 152
def purge!(store: nil)
  @mutex.synchronize do
    targets = store ? [store.to_s] : @retention_policies.keys
    targets.flat_map { |s| purge_store!(s) }
  end
end
#purge_receipts(store: nil, since: nil, until_: nil, limit: nil) ⇒ Object
List purge receipts written by previous purge! calls.
store - restrict to one store; nil = all stores.
since - only receipts where purged_at >= since (Float unix sec).
until_ - only receipts where purged_at <= until_ (Float unix sec).
limit - return at most this many, ordered by purged_at ascending.
# File 'lib/igniter/store/segmented_file_backend.rb', line 164
def purge_receipts(store: nil, since: nil, until_: nil, limit: nil)
  glob = store ? "store=#{store}" : "store=*"
  receipts = Dir[File.join(@root_dir, "wal", glob, "**", "*#{PURGED_SUFFIX}")]
    .map { |p| JSON.parse(File.read(p)) rescue nil }
    .compact
    .sort_by { |r| r["purged_at"] || 0 }
  receipts = receipts.select { |r| (r["purged_at"] || 0) >= since } if since
  receipts = receipts.select { |r| (r["purged_at"] || 0) <= until_ } if until_
  receipts = receipts.first(limit) if limit
  receipts
end
#quarantine_receipts(store: nil) ⇒ Object
List quarantine receipts for segments that could not be decoded.
store - restrict to one store; nil = all stores.
# File 'lib/igniter/store/segmented_file_backend.rb', line 178
def quarantine_receipts(store: nil)
  glob = store ? "store=#{store}" : "store=*"
  Dir[File.join(@root_dir, "wal", glob, "**", "*#{QUARANTINE_SUFFIX}")]
    .map { |p| JSON.parse(File.read(p)) rescue nil }
    .compact
end
#replay(store: nil, since: nil, as_of: nil) ⇒ Object
Returns all facts from matching segments sorted by timestamp.
store - restrict to one store name (Symbol or String); nil = all stores.
since - skip sealed segments with max_timestamp < since (Float unix sec).
as_of - skip sealed segments with min_timestamp > as_of (Float unix sec).
# File 'lib/igniter/store/segmented_file_backend.rb', line 106
def replay(store: nil, since: nil, as_of: nil)
  segment_paths_for(store: store ? store.to_s : nil, since: since, as_of: as_of)
    .flat_map { |path| read_segment(path) }
    .sort_by(&:transaction_time)
end
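The since and as_of filters are described as skipping whole sealed segments. A minimal sketch of that pruning rule follows, assuming each sealed segment's manifest exposes `min_timestamp` and `max_timestamp`. The method name `prune_segments` and the Hash shape are hypothetical; `segment_paths_for` is not shown in this page.

```ruby
# Hypothetical segment-level pruning over sealed-segment manifests.
def prune_segments(manifests, since: nil, as_of: nil)
  manifests.reject do |m|
    too_old = since && m[:max_timestamp] < since   # ends before the window
    too_new = as_of && m[:min_timestamp] > as_of   # starts after the window
    too_old || too_new
  end
end

manifests = [
  { path: "segment-000001.wal", min_timestamp: 0.0,  max_timestamp: 10.0 },
  { path: "segment-000002.wal", min_timestamp: 10.0, max_timestamp: 20.0 },
  { path: "segment-000003.wal", min_timestamp: 20.0, max_timestamp: 30.0 }
]
p prune_segments(manifests, since: 12.0, as_of: 18.0).map { |m| m[:path] }
```

One consequence, judging from the listing above: a segment that straddles a boundary is kept whole, so replay may return facts outside the requested window. Callers who need an exact window may want to filter the result again by transaction_time.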
#segment_count ⇒ Object
# File 'lib/igniter/store/segmented_file_backend.rb', line 131
def segment_count
  all_segment_paths.size
end
#segment_manifest(store: nil) ⇒ Object
Detailed per-segment manifest for one or all stores. Includes a “segments” array with one entry per segment (sealed + live). Safe to call while the backend is open.
# File 'lib/igniter/store/segmented_file_backend.rb', line 188
def segment_manifest(store: nil)
  @mutex.synchronize do
    build_storage_view(store: store ? store.to_s : nil, include_segments: true)
  end
end
#set_retention(store, strategy:, duration: nil) ⇒ Object
Register (or replace) the retention policy for a store.
# File 'lib/igniter/store/segmented_file_backend.rb', line 142
def set_retention(store, strategy:, duration: nil)
  @mutex.synchronize do
    @retention_policies[store.to_s] = { strategy: strategy.to_sym, duration: duration }
  end
end
#storage_stats(store: nil) ⇒ Object
Compact aggregate stats for one or all stores. No per-segment detail — suitable for health checks and protocol metadata.
# File 'lib/igniter/store/segmented_file_backend.rb', line 196
def storage_stats(store: nil)
  @mutex.synchronize do
    build_storage_view(store: store ? store.to_s : nil, include_segments: false)
  end
end
#stored_store_names ⇒ Object
# File 'lib/igniter/store/segmented_file_backend.rb', line 135
def stored_store_names
  Dir[File.join(@root_dir, "wal", "store=*")]
    .select { |d| File.directory?(d) }
    .map { |d| File.basename(d).sub("store=", "") }
end
#write_fact(fact) ⇒ Object
# File 'lib/igniter/store/segmented_file_backend.rb', line 89
def write_fact(fact)
  store = fact.store.to_s
  @mutex.synchronize do
    seg = active_segment_for(store)
    seg[:codec].encode_fact(seg[:file], fact)
    seg[:count] += 1
    ts = fact.transaction_time.to_f
    seg[:min_ts] = seg[:min_ts] ? [seg[:min_ts], ts].min : ts
    seg[:max_ts] = seg[:max_ts] ? [seg[:max_ts], ts].max : ts
    apply_flush_policy(seg)
  end
end