Class: Igniter::Store::Codecs::CompactDelta

Inherits:
Object
Includes:
WireProtocol
Defined in:
lib/igniter/store/codecs.rb

Overview

── CompactDelta ────────────────────────────────────────────────────────

Structural compression optimised for high-frequency History stores (sensor readings, GPS tracks, telemetry).

What is removed vs the full Fact representation:

id            → not stored; synthetic id assigned on decode
store         → in segment header (once per segment)
value_hash    → not stored; recomputed from value on decode
causation     → always nil for History, so omitted
valid_time    → in segment header (the decoder also accepts the key "term")
schema_version→ in segment header
value keys    → not stored per fact; values are stored positionally, in the order of the sorted field list in the header frame
key string    → per-segment key dictionary index (delta updates per batch)
timestamp     → absolute ms for the first entry in the segment; signed delta-ms from the previous entry thereafter
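The timestamp rule can be sketched in plain Ruby. The method names below are hypothetical and not part of CompactDelta; only the decode-side accumulation mirrors the loop in the `decode` method further down this page. Deltas are signed, so an out-of-order reading still round-trips.

```ruby
# Hypothetical helpers illustrating the timestamp scheme:
# first entry = absolute ms, later entries = signed delta from the previous one.

# Turn absolute ms timestamps into [absolute, delta, delta, ...].
def delta_encode_ms(timestamps_ms)
  prev = nil
  timestamps_ms.map do |ts|
    out  = prev ? ts - prev : ts
    prev = ts
    out
  end
end

# Inverse transform; same accumulation as CompactDelta#decode.
def delta_decode_ms(deltas)
  last = nil
  deltas.map { |d| last = last ? last + d : d }
end

ts      = [1_700_000_000_000, 1_700_000_000_250, 1_700_000_000_240, 1_700_000_001_000]
encoded = delta_encode_ms(ts)   # => [1700000000000, 250, -10, 760]
decoded = delta_decode_ms(encoded)
```

Under MessagePack's variable-width integers, a small delta such as -10 or 250 packs into one or two bytes, whereas an absolute millisecond timestamp of this size needs nine.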

Segment layout (all frames use WireProtocol CRC32 framing):

[header_frame]  MessagePack { store, fields:[...], valid_time, schema_version }
[batch_frame]   uint32 count (big-endian, "N"), followed by Zlib-deflated
                MessagePack { km:{idx=>key,...}, e:[[ki,Δms,[v0,v1…]],…] }
...
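The batch-frame body layout (a 4-byte count prefix, then a Zlib-deflated map) can be sketched as below. This is an illustration, not the library's code: JSON stands in for MessagePack so the sketch runs on the standard library alone, the helper names are hypothetical, we assume the prefix counts entries (`decode` reads it into `_count` but does not use it), and the outer WireProtocol CRC32 frame is not modelled.

```ruby
require "json"
require "zlib"

# Hypothetical: build a batch body as [uint32 BE count] + deflate(payload).
# JSON replaces MessagePack here purely to avoid a gem dependency.
def pack_batch_body(new_keys, entries)
  payload = JSON.generate({ "km" => new_keys, "e" => entries })
  [entries.size].pack("N") + Zlib::Deflate.deflate(payload)
end

# Hypothetical inverse, following the same steps as CompactDelta#decode.
def unpack_batch_body(body)
  count = body[0, 4].unpack1("N")                         # 4-byte big-endian count
  batch = JSON.parse(Zlib::Inflate.inflate(body[4..]))    # rest is zlib data
  [count, batch]
end

entries = [[0, 1_700_000_000_000, [52.52, 13.40]],   # first entry: absolute ms
           [1, 250, [48.85, 2.35]]]                  # then: delta ms
body = pack_batch_body({ 0 => "truck-7", 1 => "truck-9" }, entries)
count, batch = unpack_batch_body(body)
```

JSON turns the integer `km` indices into strings, while MessagePack can keep them as integers; the `idx.to_i` in `decode` accepts either form.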

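The key map (km) in each batch carries only the keys added since the previous batch, so readers accumulate it incrementally. Because key indices and timestamp deltas both refer back to earlier entries, a batch cannot be decoded in isolation; the segment must be read from its header onward.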
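A minimal sketch of the incremental key map, assuming the writer hands out indices in first-seen order and tracks how many it has already shipped (the constructor's `@key_map` and `@km_flushed` suggest this, but `write_batch` is not shown on this page). `KeyDictWriter` and `KeyDictReader` are hypothetical names.

```ruby
# Hypothetical writer side: assigns indices and emits only unsent ones per batch.
class KeyDictWriter
  def initialize
    @key_map = {}   # key string -> Integer index
    @flushed = 0    # number of indices already sent in earlier batches
  end

  # Index for a key, assigning the next free index the first time it is seen.
  def index_for(key)
    @key_map[key] ||= @key_map.size
  end

  # Keys added since the last call, as { index => key } (the batch's "km").
  def take_new_keys
    fresh = @key_map.select { |_key, idx| idx >= @flushed }
                    .to_h { |key, idx| [idx, key] }
    @flushed = @key_map.size
    fresh
  end
end

# Hypothetical reader side: merges each batch's km, as decode does.
class KeyDictReader
  attr_reader :key_map

  def initialize
    @key_map = {}   # Integer index -> key string
  end

  def merge(km)
    km.each { |idx, key| @key_map[idx.to_i] = key }
  end
end

writer = KeyDictWriter.new
reader = KeyDictReader.new

batch1 = %w[truck-7 truck-9 truck-7].map { |k| writer.index_for(k) }  # => [0, 1, 0]
km1    = writer.take_new_keys   # => {0=>"truck-7", 1=>"truck-9"}
reader.merge(km1)

batch2 = %w[truck-9 truck-3].map { |k| writer.index_for(k) }           # => [1, 2]
km2    = writer.take_new_keys   # => {2=>"truck-3"}
reader.merge(km2)
```

The second batch ships a single key even though it references two, which is where the savings come from on streams with a small, stable set of devices.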

Benchmark result (GPS stream, 5k facts):

json_crc32   → 380 bytes/fact
compact_delta→  23 bytes/fact   (16x smaller)

Limitation: decoded Facts always receive synthetic ids. In native mode (the Rust extension) their timestamps are additionally reset to Time.now, the same known gap that json_crc32 has in native mode. Pure-Ruby mode restores timestamps correctly.

Constant Summary

NAME = "compact_delta_zlib"
BATCH_SIZE = 64

Constants included from WireProtocol

WireProtocol::FRAME_CRC_SIZE, WireProtocol::FRAME_HEADER_SIZE

Instance Method Summary

Methods included from WireProtocol

#encode_frame, #read_frame

Constructor Details

#initialize ⇒ CompactDelta

── Write side ──────────────────────────────────────────────────────

# File 'lib/igniter/store/codecs.rb', line 120

def initialize
  @fields          = nil
  @key_map         = {}   # key_string → Integer index
  @km_flushed      = 0    # keys already sent to disk
  @last_ts_ms      = nil
  @batch_buf       = []
  @header_written  = false
  @store           = nil
end

Instance Method Details

#buffered_count ⇒ Object

# File 'lib/igniter/store/codecs.rb', line 155

def buffered_count = @batch_buf.size

#decode(io) ⇒ Object

── Read side ────────────────────────────────────────────────────────

# File 'lib/igniter/store/codecs.rb', line 159

def decode(io)
  header_body = read_frame(io)
  return [] unless header_body
  header = MessagePack.unpack(header_body)
  fields = header["fields"] || []
  store      = (header["store"] || "").to_sym
  valid_time = (header["valid_time"] || header["term"])&.to_f
  sv         = (header["schema_version"] || 1).to_i

  key_map    = {}    # Integer index → key_string
  last_ts_ms = nil
  facts      = []

  while (body = read_frame(io))
    _count = body[0, 4].unpack1("N")
    raw    = Zlib::Inflate.inflate(body[4..])
    batch  = MessagePack.unpack(raw)

    (batch["km"] || {}).each { |idx, key| key_map[idx.to_i] = key }

    (batch["e"] || []).each do |key_idx, delta_ms, vals|
      ts_ms      = last_ts_ms ? last_ts_ms + delta_ms : delta_ms
      last_ts_ms = ts_ms

      value = fields.each_with_index.to_h { |fn, i| [fn.to_sym, vals[i]] }
      vh    = Digest::SHA256.hexdigest(JSON.generate(stable_sort(value)))

      fact_hash = {
        id:               SecureRandom.uuid,
        store:            store,
        key:              key_map[key_idx.to_i],
        value:            value,
        value_hash:       vh,
        causation:        nil,
        transaction_time: ts_ms / 1_000.0,
        valid_time:       valid_time,
        schema_version:   sv
      }
      fact = Fact.from_h(fact_hash) rescue nil
      facts << fact if fact
    end
  end

  facts
end
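Dropping value_hash on disk works only because it can be recomputed deterministically from the decoded value. The sketch below reproduces the reconstruction and hashing steps of `decode` under stated assumptions: `stable_sort` is a private helper not shown on this page, so the version here is a hypothetical stand-in that sorts hash keys recursively, and `encode_values` is an assumed write side that stores values in header field order.

```ruby
require "digest"
require "json"

# Hypothetical stand-in for the private stable_sort helper:
# sorts hash keys recursively so the JSON text is canonical.
def stable_sort(obj)
  case obj
  when Hash  then obj.sort_by { |k, _| k.to_s }.to_h { |k, v| [k, stable_sort(v)] }
  when Array then obj.map { |v| stable_sort(v) }
  else obj
  end
end

# Assumed write side: values stored positionally in header field order.
def encode_values(value, fields)
  fields.map { |f| value[f.to_sym] }
end

# Read side, as in decode: zip field names back onto the positional values,
# then recompute value_hash over the canonical JSON form.
def decode_value(fields, vals)
  value = fields.each_with_index.to_h { |fn, i| [fn.to_sym, vals[i]] }
  [value, Digest::SHA256.hexdigest(JSON.generate(stable_sort(value)))]
end

reading = { lon: 13.40, lat: 52.52 }
fields  = reading.keys.map(&:to_s).sort     # => ["lat", "lon"]
vals    = encode_values(reading, fields)    # => [52.52, 13.4]
value, vh = decode_value(fields, vals)
```

The recomputed hash matches the original fact's value_hash only if the writer hashed with the same canonicalization; any divergence in `stable_sort` or JSON number formatting would change it.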

#encode_fact(io, fact) ⇒ Object

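Buffers fact and returns the number of bytes written to io: 0 while the current batch is still filling, or the size of the written batch once BATCH_SIZE facts are buffered. On the first call the header frame is also written, but its bytes are not included in the return value.
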
# File 'lib/igniter/store/codecs.rb', line 136

def encode_fact(io, fact)
  unless @header_written
    @fields = (fact.value || {}).keys.map(&:to_s).sort
    header  = { store: fact.store.to_s, fields: @fields,
                valid_time: fact.valid_time, schema_version: fact.schema_version }
    body = MessagePack.pack(stringify(header))
    io.write(encode_frame(body))
    @header_written = true
  end

  @batch_buf << fact
  @batch_buf.size >= BATCH_SIZE ? write_batch(io) : 0
end
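The buffering contract of `encode_fact` and `flush` can be modelled in isolation. `ToyBatcher` is hypothetical: it keeps the same `BATCH_SIZE` and return-value logic as the methods on this page, but its `write_batch` writes a placeholder line instead of a real frame (the actual `write_batch` is not shown here) and it skips the header frame.

```ruby
require "stringio"

# Hypothetical model of the encode_fact / flush contract.
class ToyBatcher
  BATCH_SIZE = 64

  def initialize
    @batch_buf = []
  end

  # Returns bytes written: 0 while buffering, the batch's byte size when full.
  def encode_fact(io, fact)
    @batch_buf << fact
    @batch_buf.size >= BATCH_SIZE ? write_batch(io) : 0
  end

  # Writes whatever is left as a final, possibly partial, batch.
  def flush(io)
    @batch_buf.empty? ? 0 : write_batch(io)
  end

  private

  # Placeholder batch: one text line; StringIO#write returns the byte count.
  def write_batch(io)
    bytes = io.write("batch of #{@batch_buf.size}\n")
    @batch_buf.clear
    bytes
  end
end

io      = StringIO.new
batcher = ToyBatcher.new
written = (1..150).map { |i| batcher.encode_fact(io, i) }
tail    = batcher.flush(io)
```

With 150 facts, only the 64th and 128th calls report bytes written; the remaining 22 facts reach io only through `flush`. A caller that forgets to flush at the end of a segment therefore loses up to BATCH_SIZE - 1 facts.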

#flush(io) ⇒ Object

Writes any remaining buffered facts as a final, possibly partial, batch. Returns the bytes written, or 0 if the buffer is empty.

# File 'lib/igniter/store/codecs.rb', line 151

def flush(io)
  @batch_buf.empty? ? 0 : write_batch(io)
end

#name ⇒ Object

# File 'lib/igniter/store/codecs.rb', line 116

def name = NAME

#start_segment(_io, store: nil) ⇒ Object

# File 'lib/igniter/store/codecs.rb', line 130

def start_segment(_io, store: nil)
  @store = store.to_s
  0  # header is written lazily on first encode_fact call
end