Class: Igniter::Store::Codecs::CompactDelta
- Inherits: Object
- Includes: WireProtocol
- Defined in: lib/igniter/store/codecs.rb
Overview
Structural compression optimised for high-frequency History stores (sensor readings, GPS tracks, telemetry).
What is removed or relocated relative to the full Fact representation:
id → not stored; synthetic id assigned on decode
store → in segment header (once per segment)
value_hash → not stored; recomputed from value on decode
causation → always nil for History; omitted
term → in segment header (the #encode_fact source on this page writes this field as valid_time; #decode accepts either name)
schema_version → in segment header
value keys → not stored per fact; values are written positionally against the field list in the header frame
key string → index into the per-segment key dictionary (each batch carries only newly added keys)
timestamp → absolute ms for the first entry of a segment; signed delta-ms thereafter
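The timestamp rule in the list above can be illustrated with a minimal sketch. The helper names `delta_encode_ms` and `delta_decode_ms` are hypothetical and are not part of Igniter; the decode side mirrors the expression `last_ts_ms ? last_ts_ms + delta_ms : delta_ms` from #decode, and the encode side is an assumed inverse.

```ruby
# Hypothetical helpers (not part of Igniter): they illustrate only the
# "absolute first, signed delta thereafter" timestamp scheme.

# Turn absolute millisecond timestamps into [first_absolute, delta, delta, ...].
def delta_encode_ms(timestamps_ms)
  last = nil
  timestamps_ms.map do |ts|
    encoded = last ? ts - last : ts
    last = ts
    encoded
  end
end

# Inverse operation: the first value is taken as absolute, later values are
# added to the running timestamp. Deltas may be negative (out-of-order facts).
def delta_decode_ms(encoded)
  last = nil
  encoded.map do |delta|
    last = last ? last + delta : delta
  end
end

readings = [1_700_000_000_000, 1_700_000_000_250, 1_700_000_000_240]
encoded  = delta_encode_ms(readings) # => [1_700_000_000_000, 250, -10]
decoded  = delta_decode_ms(encoded)  # => same values as readings
```

Small deltas serialise to far fewer bytes than 13-digit absolute values, which is presumably where much of the per-fact saving comes from.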
Segment layout (all frames use WireProtocol CRC32 framing):
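[header_frame] MessagePack { store, fields:[...], term, schema_version }
               (the source on this page writes term under the key valid_time)
[batch_frame] 4-byte big-endian count prefix, followed by Zlib-compressed
              MessagePack { km:{idx=>key,...}, e:[[ki,Δms,[v0,v1…]],…] }
              (#decode reads the count prefix but does not use it)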
...
The key map (km) in each batch carries only NEW keys added since the previous batch, so readers accumulate it incrementally.
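The incremental key map can be sketched as follows. This is an illustration under stated assumptions, not the codec's code: JSON stands in for MessagePack so that only the standard library is needed, the WireProtocol CRC32 framing is left out, and `build_batch_body` and `read_batches` are hypothetical names. The 4-byte big-endian prefix and the `idx.to_i` lookups follow what #decode does on this page.

```ruby
require "json"
require "zlib"

# Hypothetical sketch: JSON replaces MessagePack, and frames are passed
# around as plain strings without WireProtocol CRC32 framing.

# Build one batch body: 4-byte big-endian entry count, then deflated payload.
def build_batch_body(new_keys, entries)
  payload = JSON.generate("km" => new_keys, "e" => entries)
  [entries.size].pack("N") + Zlib::Deflate.deflate(payload)
end

# Read batch bodies in order, accumulating the key map across batches.
def read_batches(bodies)
  key_map   = {} # Integer index => key string
  keys_seen = []
  bodies.each do |body|
    _count = body[0, 4].unpack1("N")
    batch  = JSON.parse(Zlib::Inflate.inflate(body[4..]))
    (batch["km"] || {}).each { |idx, key| key_map[idx.to_i] = key }
    (batch["e"] || []).each do |key_idx, _delta_ms, _vals|
      keys_seen << key_map[key_idx.to_i]
    end
  end
  [key_map, keys_seen]
end

# The second batch introduces only "truck-3" but still refers to index 0,
# which was defined in the first batch.
first  = build_batch_body({ 0 => "truck-1", 1 => "truck-2" }, [[0, 1_000, [1.5]], [1, 5, [2.5]]])
second = build_batch_body({ 2 => "truck-3" }, [[2, 5, [3.5]], [0, 5, [4.5]]])
key_map, keys_seen = read_batches([first, second])
# keys_seen => ["truck-1", "truck-2", "truck-3", "truck-1"]
```

One consequence of this design is that batches cannot be decoded in isolation: a reader that skips an earlier batch may be missing dictionary entries that later batches depend on.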
Benchmark result (GPS stream, 5,000 facts):
json_crc32 → 380 bytes/fact
compact_delta → 23 bytes/fact (about 16x smaller)
Limitation (native mode): decoded Facts receive synthetic ids. In the Rust native extension, timestamps are also reset to Time.now, the same known gap as json_crc32 in native mode. Pure-Ruby mode restores timestamps correctly.
Constant Summary
- NAME = "compact_delta_zlib"
- BATCH_SIZE = 64
Constants included from WireProtocol
WireProtocol::FRAME_CRC_SIZE, WireProtocol::FRAME_HEADER_SIZE
Instance Method Summary
- #buffered_count ⇒ Object
- #decode(io) ⇒ Object
  Read side. Decodes a segment from io into an Array of Facts.
- #encode_fact(io, fact) ⇒ Object
  Returns bytes written to io (0 while batch is buffered).
- #flush(io) ⇒ Object
  Flush any remaining buffered facts.
- #initialize ⇒ CompactDelta (constructor)
  Write side. Initialises empty encoder state.
- #name ⇒ Object
- #start_segment(_io, store: nil) ⇒ Object
Methods included from WireProtocol
Constructor Details
#initialize ⇒ CompactDelta
Write side.
# File 'lib/igniter/store/codecs.rb', line 120
def initialize
  @fields         = nil
  @key_map        = {}    # key_string → Integer index
  @km_flushed     = 0     # keys already sent to disk
  @last_ts_ms     = nil
  @batch_buf      = []
  @header_written = false
  @store          = nil
end
Instance Method Details
#buffered_count ⇒ Object
# File 'lib/igniter/store/codecs.rb', line 155
def buffered_count = @batch_buf.size
#decode(io) ⇒ Object
Read side.
# File 'lib/igniter/store/codecs.rb', line 159
def decode(io)
  header_body = read_frame(io)
  return [] unless header_body

  header     = MessagePack.unpack(header_body)
  fields     = header["fields"] || []
  store      = (header["store"] || "").to_sym
  valid_time = (header["valid_time"] || header["term"])&.to_f
  sv         = (header["schema_version"] || 1).to_i

  key_map    = {} # Integer index → key_string
  last_ts_ms = nil
  facts      = []

  while (body = read_frame(io))
    _count = body[0, 4].unpack1("N")
    raw    = Zlib::Inflate.inflate(body[4..])
    batch  = MessagePack.unpack(raw)

    (batch["km"] || {}).each { |idx, key| key_map[idx.to_i] = key }

    (batch["e"] || []).each do |key_idx, delta_ms, vals|
      ts_ms      = last_ts_ms ? last_ts_ms + delta_ms : delta_ms
      last_ts_ms = ts_ms

      value = fields.each_with_index.to_h { |fn, i| [fn.to_sym, vals[i]] }
      vh    = Digest::SHA256.hexdigest(JSON.generate(stable_sort(value)))

      fact_hash = {
        id:               SecureRandom.uuid,
        store:            store,
        key:              key_map[key_idx.to_i],
        value:            value,
        value_hash:       vh,
        causation:        nil,
        transaction_time: ts_ms / 1_000.0,
        valid_time:       valid_time,
        schema_version:   sv
      }
      fact = Fact.from_h(fact_hash) rescue nil
      facts << fact if fact
    end
  end

  facts
end
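The value reconstruction and value_hash recomputation inside #decode can be tried in isolation with the sketch below. The `stable_sort` helper is not shown on this page, so `stable_sort_sketch` is a hypothetical stand-in that assumes it orders hash keys recursively; the real helper may differ, and hashes produced here should not be expected to match the library's.

```ruby
require "digest"
require "json"

# Hypothetical stand-in for the codec's stable_sort helper (not shown on this
# page). Assumption: it orders hash keys recursively so that equal values
# always serialise to the same JSON string.
def stable_sort_sketch(obj)
  case obj
  when Hash  then obj.sort_by { |k, _| k.to_s }.to_h { |k, v| [k, stable_sort_sketch(v)] }
  when Array then obj.map { |v| stable_sort_sketch(v) }
  else obj
  end
end

# Rebuild a value hash from the header's field list and one positional entry,
# using the same expression as #decode.
def rebuild_value(fields, vals)
  fields.each_with_index.to_h { |fn, i| [fn.to_sym, vals[i]] }
end

# Recompute a content hash the way #decode does, with the stand-in sorter.
def value_hash_sketch(value)
  Digest::SHA256.hexdigest(JSON.generate(stable_sort_sketch(value)))
end

fields = %w[lat lon speed]          # as stored once in the header frame
value  = rebuild_value(fields, [51.5, -0.12, 13.4])
# value => { lat: 51.5, lon: -0.12, speed: 13.4 }
digest = value_hash_sketch(value)   # 64-character hex string
```

Because the field list is fixed when the header is written, an entry with fewer values than fields yields nil for the missing positions rather than raising.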
#encode_fact(io, fact) ⇒ Object
Returns bytes written to io for the batch (0 while the batch is buffered). On the first call the header frame is also written to io, but its size is not included in the return value.
# File 'lib/igniter/store/codecs.rb', line 136
def encode_fact(io, fact)
  unless @header_written
    @fields = (fact.value || {}).keys.map(&:to_s).sort
    header = {
      store:          fact.store.to_s,
      fields:         @fields,
      valid_time:     fact.valid_time,
      schema_version: fact.schema_version
    }
    body = MessagePack.pack(stringify(header))
    io.write(encode_frame(body))
    @header_written = true
  end
  @batch_buf << fact
  @batch_buf.size >= BATCH_SIZE ? write_batch(io) : 0
end
#flush(io) ⇒ Object
Flush any remaining buffered facts. Returns bytes written.
# File 'lib/igniter/store/codecs.rb', line 151
def flush(io)
  @batch_buf.empty? ? 0 : write_batch(io)
end
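The buffering contract of #encode_fact, #flush, and #buffered_count can be illustrated with a toy class. `ToyBatchWriter` and `toy_write_sizes` are hypothetical and not part of Igniter: the toy uses a batch size of 3 instead of 64, writes comma-joined text instead of compressed MessagePack frames, and its `write_batch` is an assumed stand-in for the private method that this page does not show.

```ruby
require "stringio"

# Hypothetical toy class (not the real codec): it reproduces only the
# "return 0 while buffering, return bytes when a batch is written" contract.
class ToyBatchWriter
  BATCH_SIZE = 3 # the real codec uses 64

  def initialize
    @batch_buf = []
  end

  def buffered_count
    @batch_buf.size
  end

  def encode_fact(io, fact)
    @batch_buf << fact
    @batch_buf.size >= BATCH_SIZE ? write_batch(io) : 0
  end

  def flush(io)
    @batch_buf.empty? ? 0 : write_batch(io)
  end

  private

  # Assumed behaviour: write the buffered facts, clear the buffer, and
  # return the number of bytes written.
  def write_batch(io)
    bytes = io.write(@batch_buf.join(",") + "\n")
    @batch_buf.clear
    bytes
  end
end

# Feed facts through the writer and collect each call's return value,
# ending with the return value of flush.
def toy_write_sizes(facts)
  io     = StringIO.new
  writer = ToyBatchWriter.new
  sizes  = facts.map { |f| writer.encode_fact(io, f) }
  sizes << writer.flush(io)
  [sizes, io.string]
end

sizes, written = toy_write_sizes(%w[a b c d])
# sizes   => [0, 0, 6, 0, 2]
# written => "a,b,c\nd\n"
```

If the real codec behaves like the toy, a caller that never calls #flush at the end of a segment would leave up to BATCH_SIZE - 1 facts unwritten, so checking #buffered_count before closing the io seems prudent.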
#name ⇒ Object
# File 'lib/igniter/store/codecs.rb', line 116
def name = NAME
#start_segment(_io, store: nil) ⇒ Object
# File 'lib/igniter/store/codecs.rb', line 130
def start_segment(_io, store: nil)
  @store = store.to_s
  0 # header is written lazily on first encode_fact call
end