Class: Igniter::Store::SegmentedFileBackend

Inherits:
Object
  • Object
show all
Includes:
WireProtocol
Defined in:
lib/igniter/store/segmented_file_backend.rb

Overview

Partitioned, manifest-tracked WAL backend with pluggable per-store codecs.

A single instance replaces FileBackend for a whole IgniterStore — facts from every store are written into per-store, per-time-bucket segment files under a shared root directory.

Layout:

{root_dir}/
  wal/
    store={name}/
      date={bucket}/
        segment-000001.wal
        segment-000001.wal.manifest.json   ← written atomically on seal
        segment-000002.wal

Codec selection:

# All stores use the default codec (json_crc32):
SegmentedFileBackend.new(root)

# All stores use compact_delta:
SegmentedFileBackend.new(root, codec: :compact_delta)

# Per-store codec map (string or symbol keys):
SegmentedFileBackend.new(root,
  codec: { technician_locations: :compact_delta,
           vendor_leads:         :compact_delta,
           crm_records:          :json_crc32 })

compact_delta is recommended for high-frequency History stores (sensor readings, GPS tracks) and gives ~16x size reduction over json_crc32. It is NOT resumable after a crash — any live compact_delta segment is sealed on the next startup and a fresh segment is opened.

Public interface is identical to FileBackend: write_fact, replay, close.

Constant Summary collapse

MANIFEST_SUFFIX =
".manifest.json"
PURGED_SUFFIX =
".purged.json"
QUARANTINE_SUFFIX =
".quarantine.json"
DEFAULT_MAX_BYTES =
64 * 1024 * 1024   # 64 MB
DEFAULT_CODEC =
:json_crc32
SCHEMA_VERSION =
1

Constants included from WireProtocol

WireProtocol::FRAME_CRC_SIZE, WireProtocol::FRAME_HEADER_SIZE

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from WireProtocol

#encode_frame, #read_frame

Constructor Details

#initialize(root_dir, max_bytes: DEFAULT_MAX_BYTES, time_bucket: :day, codec: DEFAULT_CODEC, retention: {}, flush: :batch) ⇒ SegmentedFileBackend

root_dir — root data directory shared by all stores. max_bytes — rotate a segment once its file reaches this size (default 64 MB). time_bucket — :day (default), :hour, or :none. codec — a Symbol, or a Hash{ store_name => Symbol }; see the class docs. retention — Hash{ store_name => { strategy:, duration: } }.

Strategies:
  :permanent      — never purge (default when no policy set)
  :rolling_window — purge sealed segments where max_timestamp < now - duration (Float seconds)
  :ephemeral      — keep only the single newest sealed segment per store

flush — durability policy applied after every write_fact:

:batch      — (default) flush only when BATCH_SIZE is reached, on close, or on checkpoint.
              compact_delta facts still buffered (fewer than BATCH_SIZE since the last flush) are lost on a crash.
:on_write   — flush after every single fact (safest, smallest write window).
{ every_n: N } — flush after every N facts per store.

json_crc32 writes every fact immediately regardless of this setting.



73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/igniter/store/segmented_file_backend.rb', line 73

# Builds a backend rooted at +root_dir+. The constructor:
# - ensures the shared wal/ directory exists,
# - registers each policy given in +retention+ through #set_retention,
# - runs orphaned-segment recovery.
#
# @param root_dir [#to_s] shared data directory (stored as a String)
# @param max_bytes [Integer] segment rotation threshold in bytes
# @param time_bucket [Symbol] segment time bucketing (see class docs)
# @param codec [Symbol, Hash] one codec for every store, or a per-store map
# @param retention [Hash] store name => keyword arguments for #set_retention
# @param flush [Symbol, Hash] flush policy applied after each write_fact
def initialize(root_dir, max_bytes: DEFAULT_MAX_BYTES, time_bucket: :day,
               codec: DEFAULT_CODEC, retention: {}, flush: :batch)
  # Configuration captured as given (root_dir normalized to a String).
  @root_dir     = root_dir.to_s
  @max_bytes    = max_bytes
  @time_bucket  = time_bucket
  @codec_spec   = codec # a single Symbol or a per-store Hash
  @flush_policy = flush

  # Runtime state guarded by @mutex.
  @segments           = {} # store name (String) => live segment state Hash
  @retention_policies = {}
  @mutex              = Mutex.new

  FileUtils.mkdir_p(File.join(@root_dir, "wal"))
  retention.each { |store_name, policy| set_retention(store_name, **policy) }
  recover_orphaned_segments!
end

Instance Attribute Details

#root_dir ⇒ Object (readonly)

Returns the value of attribute root_dir.



55
56
57
# File 'lib/igniter/store/segmented_file_backend.rb', line 55

# Root data directory shared by every store; #initialize stores it as a String.
def root_dir; @root_dir; end

Instance Method Details

#checkpoint! ⇒ Object

Seal every open segment and open a fresh one per store.



113
114
115
116
117
118
119
120
121
122
# File 'lib/igniter/store/segmented_file_backend.rb', line 113

# Seals every currently open segment and opens a fresh one for the same
# store, all while holding the backend mutex. Stores keep their original
# order in the segment table.
#
# NOTE(review): the table is emptied before sealing starts. If
# seal_segment! raises partway through, stores not yet processed are no
# longer tracked in @segments.
def checkpoint!
  @mutex.synchronize do
    sealed_batch = Hash[@segments]
    @segments.clear
    sealed_batch.each_pair do |store_name, segment|
      seal_segment!(segment)
      @segments[store_name] = open_new_segment(store_name)
    end
  end
end

#close ⇒ Object



124
125
126
127
128
129
# File 'lib/igniter/store/segmented_file_backend.rb', line 124

# Seals every open segment, then empties the segment table, all while
# holding the backend mutex. Sealing iterates a snapshot of the table's
# values.
def close
  @mutex.synchronize do
    open_segments = @segments.values
    open_segments.each { |segment| seal_segment!(segment) }
    @segments.clear
  end
end

#durability_snapshot ⇒ Object

Returns the current durability posture: configured policy plus a per-store breakdown showing how many facts are buffered in memory vs. on disk.

Buffered facts are at risk of loss on a process crash. A “flushed” store has all accepted facts on disk; a “buffered” store has unflushed in-memory facts that would be lost if the process were killed right now.



208
209
210
211
212
213
214
215
216
217
218
219
220
221
# File 'lib/igniter/store/segmented_file_backend.rb', line 208

# Reports the configured flush policy plus, for each open store:
# - its codec name,
# - the number of facts buffered in memory,
# - the number of facts already on disk (segment count minus buffered),
# - a "buffered"/"flushed" label.
# Taken under the backend mutex.
def durability_snapshot
  @mutex.synchronize do
    per_store = {}
    @segments.each do |store_name, segment|
      in_memory = segment[:codec].buffered_count
      per_store[store_name] = {
        "codec"          => segment[:codec_name].to_s,
        "buffered_count" => in_memory,
        "facts_on_disk"  => segment[:count] - in_memory,
        # Any buffered fact means the store is not fully on disk yet.
        "durability"     => in_memory.positive? ? "buffered" : "flushed"
      }
    end
    { "policy" => flush_policy_name, "stores" => per_store }
  end
end

#purge!(store: nil) ⇒ Object

Deletes eligible sealed segments for stores that have a retention policy and returns an Array of receipt hashes, one per deleted segment. Live (unsealed) segments are never touched. store — restrict the purge to one store; nil = all stores with a policy.



152
153
154
155
156
157
# File 'lib/igniter/store/segmented_file_backend.rb', line 152

# Runs purge_store! for the selected stores under the backend mutex and
# returns all resulting receipts as one flat Array.
#
# @param store [Symbol, String, nil] a single store to purge; when nil,
#   every store with a registered retention policy is purged
def purge!(store: nil)
  @mutex.synchronize do
    names = @retention_policies.keys
    names = [store.to_s] if store
    names.flat_map { |name| purge_store!(name) }
  end
end

#purge_receipts(store: nil, since: nil, until_: nil, limit: nil) ⇒ Object

List purge receipts written by previous purge! calls. store — restrict to one store; nil = all stores. since — only receipts where purged_at >= since (Float unix sec). until_ — only receipts where purged_at <= until_ (Float unix sec). limit — return at most this many, ordered by purged_at ascending.



164
165
166
167
168
169
170
171
172
173
174
# File 'lib/igniter/store/segmented_file_backend.rb', line 164

# Lists purge receipts written by previous purge! calls.
#
# Some receipt files are skipped instead of raising:
# - files that cannot be read (e.g. removed between the glob and the read),
# - files that cannot be parsed (e.g. one that is mid-write),
# - files that parse to something other than a JSON object.
#
# @param store [Symbol, String, nil] restrict to one store; nil = all stores
# @param since [Float, nil] keep receipts with purged_at >= since (unix seconds)
# @param until_ [Float, nil] keep receipts with purged_at <= until_ (unix seconds)
# @param limit [Integer, nil] return at most this many receipts
# @return [Array<Hash>] receipts ordered by purged_at ascending
#   (a missing purged_at sorts as 0)
def purge_receipts(store: nil, since: nil, until_: nil, limit: nil)
  glob  = store ? "store=#{store}" : "store=*"
  paths = Dir[File.join(@root_dir, "wal", glob, "**", "*#{PURGED_SUFFIX}")]

  receipts = paths.map do |path|
    JSON.parse(File.read(path))
  rescue JSON::ParserError, EncodingError, SystemCallError
    nil # unreadable or partially written receipt — best-effort skip
  end
  # Only JSON objects can carry a "purged_at" field.
  receipts.select! { |receipt| receipt.is_a?(Hash) }

  # Filter before sorting so only surviving receipts are sorted.
  receipts.select! { |r| (r["purged_at"] || 0) >= since }  if since
  receipts.select! { |r| (r["purged_at"] || 0) <= until_ } if until_
  receipts.sort_by! { |r| r["purged_at"] || 0 }
  limit ? receipts.first(limit) : receipts
end

#quarantine_receipts(store: nil) ⇒ Object

List quarantine receipts for segments that could not be decoded. store — restrict to one store; nil = all stores.



178
179
180
181
182
183
# File 'lib/igniter/store/segmented_file_backend.rb', line 178

# Lists quarantine receipts for segments that could not be decoded.
#
# Some receipt files are skipped instead of raising:
# - files that cannot be read,
# - files that cannot be parsed (e.g. one that is mid-write),
# - files that parse to something other than a JSON object.
#
# @param store [Symbol, String, nil] restrict to one store; nil = all stores
# @return [Array<Hash>] parsed receipts, in glob order
def quarantine_receipts(store: nil)
  glob  = store ? "store=#{store}" : "store=*"
  paths = Dir[File.join(@root_dir, "wal", glob, "**", "*#{QUARANTINE_SUFFIX}")]

  receipts = paths.map do |path|
    JSON.parse(File.read(path))
  rescue JSON::ParserError, EncodingError, SystemCallError
    nil # unreadable or partially written receipt — best-effort skip
  end
  receipts.select { |receipt| receipt.is_a?(Hash) }
end

#replay(store: nil, since: nil, as_of: nil) ⇒ Object

Returns all facts from matching segments sorted by timestamp. store — restrict to one store name (Symbol or String); nil = all stores. since — skip sealed segments with max_timestamp < since (Float unix sec). as_of — skip sealed segments with min_timestamp > as_of (Float unix sec).



106
107
108
109
110
# File 'lib/igniter/store/segmented_file_backend.rb', line 106

# Returns every fact from the segments selected by +store+, +since+ and
# +as_of+, ordered by transaction_time.
#
# The sort is stable. Enumerable#sort_by alone does not guarantee the
# relative order of equal keys, so read order is used as a tie-breaker.
# Facts that share a transaction_time therefore keep the order in which
# they were read (segment path order, then position within the segment).
# This keeps replay deterministic for same-timestamp writes.
#
# @param store [Symbol, String, nil] restrict to one store; nil = all stores
# @param since [Float, nil] skip sealed segments whose max_timestamp < since
# @param as_of [Float, nil] skip sealed segments whose min_timestamp > as_of
# @return [Array] facts sorted by transaction_time
def replay(store: nil, since: nil, as_of: nil)
  segment_paths_for(store: store ? store.to_s : nil, since: since, as_of: as_of)
    .flat_map { |path| read_segment(path) }
    .each_with_index
    .sort_by { |fact, read_index| [fact.transaction_time, read_index] }
    .map(&:first)
end

#segment_count ⇒ Object



131
132
133
# File 'lib/igniter/store/segmented_file_backend.rb', line 131

# Number of segment paths reported by all_segment_paths (read without
# taking the backend mutex).
def segment_count
  paths = all_segment_paths
  paths.size
end

#segment_manifest(store: nil) ⇒ Object

Detailed per-segment manifest for one or all stores. Includes a “segments” array with one entry per segment (sealed + live). Safe to call while the backend is open.



188
189
190
191
192
# File 'lib/igniter/store/segmented_file_backend.rb', line 188

# Detailed storage view including the per-segment array, built under the
# backend mutex.
#
# @param store [Symbol, String, nil] restrict to one store; nil = all stores
def segment_manifest(store: nil)
  scope = (store.to_s if store)
  @mutex.synchronize { build_storage_view(store: scope, include_segments: true) }
end

#set_retention(store, strategy:, duration: nil) ⇒ Object

Register (or replace) the retention policy for a store.



142
143
144
145
146
# File 'lib/igniter/store/segmented_file_backend.rb', line 142

# Registers (or replaces) the retention policy for +store+ under the
# backend mutex. The store name is stored as a String and the strategy as
# a Symbol; +duration+ is kept as given.
#
# @return [Hash] the stored policy ({ strategy:, duration: })
def set_retention(store, strategy:, duration: nil)
  key    = store.to_s
  policy = { strategy: strategy.to_sym, duration: duration }
  @mutex.synchronize { @retention_policies[key] = policy }
end

#storage_stats(store: nil) ⇒ Object

Compact aggregate stats for one or all stores. No per-segment detail — suitable for health checks and protocol metadata.



196
197
198
199
200
# File 'lib/igniter/store/segmented_file_backend.rb', line 196

# Aggregate storage view without the per-segment array, built under the
# backend mutex.
#
# @param store [Symbol, String, nil] restrict to one store; nil = all stores
def storage_stats(store: nil)
  scope = (store.to_s if store)
  @mutex.synchronize { build_storage_view(store: scope, include_segments: false) }
end

#stored_store_names ⇒ Object



135
136
137
138
139
# File 'lib/igniter/store/segmented_file_backend.rb', line 135

# Names of every store that has a "store=<name>" directory under wal/.
# Plain files matching the pattern are ignored. The names are read from
# the filesystem, not from the in-memory segment table.
def stored_store_names
  wal_dir = File.join(@root_dir, "wal")
  Dir[File.join(wal_dir, "store=*")]
    .select(&File.method(:directory?))
    .map { |store_dir| File.basename(store_dir).delete_prefix("store=") }
end

#write_fact(fact) ⇒ Object



89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/igniter/store/segmented_file_backend.rb', line 89

# Appends +fact+ to the live segment of its store, while holding the
# backend mutex. It then bumps the segment's fact count and widens its
# min/max transaction-time window. Finally it applies the configured flush
# policy, whose result is returned.
def write_fact(fact)
  store_name = fact.store.to_s
  @mutex.synchronize do
    segment = active_segment_for(store_name)
    segment[:codec].encode_fact(segment[:file], fact)
    segment[:count] += 1

    # The first fact seeds the window; later facts only widen it.
    stamp = fact.transaction_time.to_f
    segment[:min_ts] = [segment[:min_ts], stamp].compact.min
    segment[:max_ts] = [segment[:max_ts], stamp].compact.max

    apply_flush_policy(segment)
  end
end