Class: Igniter::Store::Protocol::Interpreter

Inherits:
Object
  • Object
show all
Defined in:
lib/igniter/store/protocol/interpreter.rb

Constant Summary collapse

HANDLERS =
{
  store:        Handlers::StoreHandler,
  history:      Handlers::HistoryHandler,
  access_path:  Handlers::AccessPathHandler,
  relation:     Handlers::RelationHandler,
  projection:   Handlers::ProjectionHandler,
  derivation:   Handlers::DerivationHandler,
  command:      Handlers::CommandHandler,
  effect:       Handlers::EffectHandler,
  subscription: Handlers::SubscriptionHandler,
}.freeze
DEFAULT_STORAGE_ALERT_THRESHOLDS =

Default thresholds for storage-level alerts in observability_snapshot. Override per-interpreter via alert_thresholds: at construction time.

{
  quarantine_receipt_count: 10,
  storage_byte_size:        1_073_741_824   # 1 GiB
}.freeze

Instance Method Summary collapse

Constructor Details

#initialize(store, alert_thresholds: {}) ⇒ Interpreter

Returns a new instance of Interpreter.



29
30
31
32
33
# File 'lib/igniter/store/protocol/interpreter.rb', line 29

def initialize(store, alert_thresholds: {})
  @store            = store
  @registry         = {}  # content fingerprint → Receipt (dedup)
  @alert_thresholds = DEFAULT_STORAGE_ALERT_THRESHOLDS.merge(alert_thresholds)
end

Instance Method Details

#append(history:, event:, key: nil, partition_key: nil, schema_version: 1, valid_time: nil, term: nil, producer: nil, derivation: nil) ⇒ Object

Append an event to a history. Returns an append Receipt carrying the generated fact key, fact_id, and value_hash.



83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/igniter/store/protocol/interpreter.rb', line 83

def append(history:, event:, key: nil, partition_key: nil, schema_version: 1,
           valid_time: nil, term: nil, producer: nil, derivation: nil)
  fact = @store.append(
    history:        history.to_sym,
    event:          event,
    schema_version: schema_version,
    valid_time:     valid_time,
    term:           term,
    partition_key:  partition_key&.to_sym,
    producer:       producer,
    derivation:     derivation
  )
  Receipt.append_accepted(history: history.to_sym, fact: fact, requested_key: key)
end

#causation_chain(store:, key:) ⇒ Object

Read-only provenance: compact causation chain for one store/key.



164
165
166
# File 'lib/igniter/store/protocol/interpreter.rb', line 164

def causation_chain(store:, key:)
  @store.causation_chain(store: store.to_sym, key: key)
end

#compaction_activity(store: nil, kind: nil, since: nil, limit: nil) ⇒ Object

Normalized compaction lifecycle activity.

Returns a response envelope with schema_version, generated_at, filters, activity (normalized entries), and count.

Filtering:

store:  delegate to IgniterStore#compaction_activity(store:)
kind:   filter entries by :kind
since:  keep entries where occurred_at >= since
limit:  cap result count after all other filters


284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
# File 'lib/igniter/store/protocol/interpreter.rb', line 284

def compaction_activity(store: nil, kind: nil, since: nil, limit: nil)
  store_sym = store&.to_sym
  entries = @store.compaction_activity(store: store_sym)

  entries = entries.select { |e| e[:kind].to_s == kind.to_s } if kind
  entries = entries.select { |e| e[:occurred_at] >= since.to_f } if since
  entries = entries.first(limit.to_i) if limit

  {
    schema_version: 1,
    generated_at:   Time.now.iso8601(3),
    filters: {
      store: store&.to_s,
      kind:  kind&.to_s,
      since: since&.to_f,
      limit: limit&.to_i
    },
    activity: entries,
    count:    entries.size
  }
end

#descriptor_snapshotObject

Raw descriptor-only snapshot (store/history/subscription). Use metadata_snapshot for the full picture; this is a lower-level accessor.



236
237
238
# File 'lib/igniter/store/protocol/interpreter.rb', line 236

def descriptor_snapshot
  @store.schema_graph.descriptor_snapshot
end

#dispatch(envelope) ⇒ Object

OP3: convenience shorthand — dispatch one wire envelope hash.



350
351
352
# File 'lib/igniter/store/protocol/interpreter.rb', line 350

def dispatch(envelope)
  wire.dispatch(envelope)
end

#fact_ref(fact_id) ⇒ Object

Read-only provenance: compact fact reference, without fact value.



174
175
176
# File 'lib/igniter/store/protocol/interpreter.rb', line 174

def fact_ref(fact_id)
  @store.fact_ref(fact_id)
end

#lineage(store:, key:) ⇒ Object

Read-only provenance: causal proof and downstream derivation metadata.



169
170
171
# File 'lib/igniter/store/protocol/interpreter.rb', line 169

def lineage(store:, key:)
  @store.lineage(store: store.to_sym, key: key)
end

#metadata_snapshotObject

OP2: unified protocol metadata snapshot. Combines raw descriptor registry (store/history/subscription), engine routing metadata (access paths), and all derived graph artifacts (relations, projections, derivations, scatters, retention) into one canonical introspection response. Used by Companion, StoreServer, visual tools, and compliance test kits.



200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
# File 'lib/igniter/store/protocol/interpreter.rb', line 200

def 
  g = @store.schema_graph
  ds = g.descriptor_snapshot
  snap = {
    schema_version: 1,
    stores:        ds[:stores],
    histories:     ds[:histories],
    access_paths:  g.,
    relations:     g.relation_snapshot,
    projections:   g.projection_snapshot,
    commands:      g.command_snapshot,
    effects:       g.effect_snapshot,
    derivations:   g.derivation_snapshot,
    scatters:      g.scatter_snapshot,
    subscriptions: ds[:subscriptions],
    retention:     g.retention_snapshot
  }
  stats = @store.storage_stats
  snap[:storage] = stats if stats
  snap
end

#observability_snapshotObject

Returns the canonical storage-level observability snapshot.

Canonical shape (same top-level keys at every layer; server-only fields are nil at the protocol level):

schema_version, generated_at, status, uptime_ms (nil),
metrics (nil), alerts, storage, server (nil)

storage-level alerts (quarantine_receipt_count, storage_byte_size) are checked against alert_thresholds configured at construction time.



365
366
367
368
369
370
371
372
373
374
375
376
377
378
# File 'lib/igniter/store/protocol/interpreter.rb', line 365

def observability_snapshot
  storage = @store.storage_stats rescue nil
  alerts  = check_storage_alerts(storage)
  {
    schema_version: 1,
    generated_at:   Time.now.iso8601(3),
    status:         :ready,
    uptime_ms:      nil,
    metrics:        nil,
    alerts:         alerts,
    storage:        storage,
    server:         nil
  }
end

#query(store:, where: {}, order: nil, limit: nil, as_of: nil) ⇒ Object

Query facts matching all where: conditions. Performs a latest-per-key scan; access paths provide introspection metadata but index-accelerated query planning is a future engine concern.



135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
# File 'lib/igniter/store/protocol/interpreter.rb', line 135

def query(store:, where: {}, order: nil, limit: nil, as_of: nil)
  store_sym = store.to_sym
  facts = @store.history(store: store_sym, as_of: as_of)

  # Reduce to latest fact per key.
  latest = {}
  facts.each do |f|
    existing = latest[f.key]
    latest[f.key] = f if existing.nil? || f.transaction_time > existing.transaction_time
  end

  rows = latest.values

  where.each do |field, val|
    sym = field.to_sym
    rows = rows.select { |fact| fact.value[sym] == val }
  end

  rows = rows.sort_by { |fact| fact.value[order.to_sym] } if order
  rows = rows.first(limit) if limit
  rows.map { |fact| { key: fact.key, value: fact.value } }
end

#read(store:, key:, as_of: nil) ⇒ Object

Read the current value for a key (or nil).



128
129
130
# File 'lib/igniter/store/protocol/interpreter.rb', line 128

def read(store:, key:, as_of: nil)
  @store.read(store: store.to_sym, key: key, as_of: as_of)
end

#register(descriptor) ⇒ Object

Generic descriptor registration — dispatches by kind:. Returns a Receipt with status :accepted, :rejected, or :deduplicated.



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/igniter/store/protocol/interpreter.rb', line 37

def register(descriptor)
  descriptor = descriptor.transform_keys(&:to_sym)
  kind = descriptor[:kind]&.to_sym

  return Receipt.rejection("Missing required field: kind") unless kind

  handler_class = HANDLERS[kind]
  return Receipt.rejection("Unknown descriptor kind: #{kind.inspect}", kind: kind) unless handler_class

  fp = fingerprint(descriptor)
  return Receipt.deduplicated(kind: kind, name: descriptor[:name]&.to_sym) if @registry.key?(fp)

  receipt = handler_class.new(@store).call(descriptor)
  @registry[fp] = receipt if receipt.accepted?
  receipt
end

#register_access_path(descriptor) ⇒ Object



57
# File 'lib/igniter/store/protocol/interpreter.rb', line 57

def register_access_path(descriptor)  = register(descriptor)

#register_command(descriptor) ⇒ Object



61
# File 'lib/igniter/store/protocol/interpreter.rb', line 61

def register_command(descriptor)      = register(descriptor)

#register_derivation(descriptor) ⇒ Object



60
# File 'lib/igniter/store/protocol/interpreter.rb', line 60

def register_derivation(descriptor)   = register(descriptor)

#register_effect(descriptor) ⇒ Object



62
# File 'lib/igniter/store/protocol/interpreter.rb', line 62

def register_effect(descriptor)       = register(descriptor)

#register_history(descriptor) ⇒ Object



56
# File 'lib/igniter/store/protocol/interpreter.rb', line 56

def register_history(descriptor)      = register(descriptor)

#register_projection(descriptor) ⇒ Object



59
# File 'lib/igniter/store/protocol/interpreter.rb', line 59

def register_projection(descriptor)   = register(descriptor)

#register_relation(descriptor) ⇒ Object



58
# File 'lib/igniter/store/protocol/interpreter.rb', line 58

def register_relation(descriptor)     = register(descriptor)

#register_store(descriptor) ⇒ Object

Named registration helpers — vocabulary aliases for register.



55
# File 'lib/igniter/store/protocol/interpreter.rb', line 55

def register_store(descriptor)        = register(descriptor)

#register_subscription(descriptor) ⇒ Object



63
# File 'lib/igniter/store/protocol/interpreter.rb', line 63

def register_subscription(descriptor) = register(descriptor)

#replay(from: nil, to: nil, filter: nil) ⇒ Object

OP4: return all (or range-filtered) facts as serialized fact packets. Suitable for WAL replay to a cold hub or test double.

Filter forms:

{ store: :name }
{ store: :name, key: "event-key" }
{ store: :name, partition_key: :tracker_id, partition_value: "sleep" }


313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
# File 'lib/igniter/store/protocol/interpreter.rb', line 313

def replay(from: nil, to: nil, filter: nil)
  if filter
    filter = filter.transform_keys(&:to_sym)
    store_sym = filter[:store]&.to_sym

    if store_sym && filter.key?(:key)
      return @store.history(
        store: store_sym,
        key:   filter[:key],
        since: from,
        as_of: to
      ).map { |f| serialize_fact(f) }
    end

    if store_sym && filter[:partition_key] && filter.key?(:partition_value)
      return @store.history_partition(
        store:           store_sym,
        partition_key:   filter[:partition_key].to_sym,
        partition_value: filter[:partition_value],
        since:           from,
        as_of:           to
      ).map { |f| serialize_fact(f) }
    end
  end

  raw_facts = @store.fact_log_all(since: from, as_of: to)
  raw_facts = raw_facts.select { |f| f.store == filter[:store]&.to_sym } if filter && filter[:store]
  raw_facts.map { |f| serialize_fact(f) }
end

#resolve(relation_name, from:, as_of: nil) ⇒ Object

Resolve a named relation (delegates to IgniterStore#resolve).



159
160
161
# File 'lib/igniter/store/protocol/interpreter.rb', line 159

def resolve(relation_name, from:, as_of: nil)
  @store.resolve(relation_name, from: from, as_of: as_of)
end

#resolve_items(relation_name, from:, as_of: nil) ⇒ Object

Resolve a named relation with source keys preserved for client-side typed record reconstruction. The value-only #resolve API remains stable for existing protocol callers.

Raises:

  • (ArgumentError)


181
182
183
184
185
186
187
188
189
190
191
192
# File 'lib/igniter/store/protocol/interpreter.rb', line 181

def resolve_items(relation_name, from:, as_of: nil)
  rule = @store.schema_graph.relation_for(name: relation_name)
  raise ArgumentError, "No relation registered: #{relation_name.inspect}" unless rule

  index_entry = @store.read(store: :"__rel_#{relation_name}", key: from.to_s, as_of: as_of)
  return [] unless index_entry

  index_entry[:keys].filter_map do |key|
    value = @store.read(store: rule.source, key: key, as_of: as_of)
    { key: key, value: value } if value
  end
end

#segment_manifest(store: nil) ⇒ Object

Detailed per-segment manifest from the backend. Returns nil when the backend does not support it.



230
231
232
# File 'lib/igniter/store/protocol/interpreter.rb', line 230

def segment_manifest(store: nil)
  @store.segment_manifest(store: store)
end

#storage_stats(store: nil) ⇒ Object

Physical storage stats from the backend (SegmentedFileBackend). Returns nil when the backend does not support it.



224
225
226
# File 'lib/igniter/store/protocol/interpreter.rb', line 224

def storage_stats(store: nil)
  @store.storage_stats(store: store)
end

#sync_hub_profile(as_of: nil, cursor: nil, stores: nil) ⇒ Object

OP4: generates a SyncProfile for a cold hub or incremental update.

Full sync (cursor: nil): all facts + full descriptor snapshot Incremental (cursor: given): facts since cursor timestamp + snapshot stores: Array<Symbol> optional store filter (nil = all stores)

The returned SyncProfile#next_cursor should be persisted by the hub and sent back as cursor: on the next call to receive only new facts.



248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
# File 'lib/igniter/store/protocol/interpreter.rb', line 248

def sync_hub_profile(as_of: nil, cursor: nil, stores: nil)
  from = cursor&.dig(:value)

  raw_facts = @store.fact_log_all(since: from, as_of: as_of)

  if stores
    allowed = Array(stores).map(&:to_sym).to_set
    raw_facts = raw_facts.select { |f| allowed.include?(f.store) }
  end

  fact_packets = raw_facts.map { |f| serialize_fact(f) }

  SyncProfile.new(
    schema_version:           1,
    kind:                     :sync_hub_profile,
    generated_at:             Process.clock_gettime(Process::CLOCK_REALTIME),
    cursor:                   cursor,
    descriptors:              ,
    facts:                    fact_packets,
    retention:                @store.schema_graph.retention_snapshot,
    compaction_receipts:      compaction_receipt_summaries,
    compaction_activity:      compaction_activity,
    subscription_checkpoints: {}
  )
end

#wireObject

OP3: returns the WireEnvelope router for this interpreter. Accepts process-boundary envelope hashes and returns response envelopes.



345
346
347
# File 'lib/igniter/store/protocol/interpreter.rb', line 345

def wire
  @wire ||= WireEnvelope.new(self)
end

#write(store:, key:, value:, causation: nil, valid_time: nil, term: nil, producer: nil, derivation: nil) ⇒ Object

Write a fact. Returns a write Receipt carrying fact_id and value_hash.



66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/igniter/store/protocol/interpreter.rb', line 66

def write(store:, key:, value:, causation: nil, valid_time: nil, term: nil,
          producer: nil, derivation: nil)
  fact = @store.write(
    store:      store.to_sym,
    key:        key,
    value:      value,
    causation:  causation,
    valid_time: valid_time,
    term:       term,
    producer:   producer,
    derivation: derivation
  )
  Receipt.write_accepted(store: store.to_sym, key: key, fact: fact)
end

#write_fact(packet) ⇒ Object

Accept a full fact packet hash (kind: :fact) and write it to the store. Designed for wire replay, server ingestion, and protocol-native clients. Note: at: is recorded in the packet but cannot override the engine timestamp —the engine assigns monotonic timestamps on write.



102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/igniter/store/protocol/interpreter.rb', line 102

def write_fact(packet)
  packet = packet.transform_keys(&:to_sym)
  kind = packet[:kind]&.to_sym
  return Receipt.rejection("write_fact: expected kind: :fact, got #{kind.inspect}", kind: :fact) unless kind == :fact

  store = packet[:store]
  key   = packet[:key]
  value = packet[:value]
  return Receipt.rejection("write_fact: missing store:",  kind: :fact) unless store
  return Receipt.rejection("write_fact: missing key:",    kind: :fact) unless key
  return Receipt.rejection("write_fact: missing value:",  kind: :fact) unless value

  fact = @store.write(
    store:      store.to_sym,
    key:        key.to_s,
    value:      value,
    causation:  packet[:causation],
    valid_time: packet[:valid_time],
    term:       packet[:term],
    producer:   packet[:producer],
    derivation: packet[:derivation]
  )
  Receipt.write_accepted(store: store.to_sym, key: key, fact: fact)
end