Class: Igniter::Store::Protocol::Interpreter
- Inherits:
-
Object
- Object
- Igniter::Store::Protocol::Interpreter
- Defined in:
- lib/igniter/store/protocol/interpreter.rb
Constant Summary collapse
- HANDLERS =
{ store: Handlers::StoreHandler, history: Handlers::HistoryHandler, access_path: Handlers::AccessPathHandler, relation: Handlers::RelationHandler, projection: Handlers::ProjectionHandler, derivation: Handlers::DerivationHandler, command: Handlers::CommandHandler, effect: Handlers::EffectHandler, subscription: Handlers::SubscriptionHandler, }.freeze
- DEFAULT_STORAGE_ALERT_THRESHOLDS =
Default thresholds for storage-level alerts in observability_snapshot. Override per-interpreter via alert_thresholds: at construction time.
{ quarantine_receipt_count: 10, storage_byte_size: 1_073_741_824 # 1 GiB }.freeze
Instance Method Summary collapse
-
#append(history:, event:, key: nil, partition_key: nil, schema_version: 1, valid_time: nil, term: nil, producer: nil, derivation: nil) ⇒ Object
Append an event to a history.
-
#causation_chain(store:, key:) ⇒ Object
Read-only provenance: compact causation chain for one store/key.
-
#compaction_activity(store: nil, kind: nil, since: nil, limit: nil) ⇒ Object
Normalized compaction lifecycle activity.
-
#descriptor_snapshot ⇒ Object
Raw descriptor-only snapshot (store/history/subscription).
-
#dispatch(envelope) ⇒ Object
OP3: convenience shorthand — dispatch one wire envelope hash.
-
#fact_ref(fact_id) ⇒ Object
Read-only provenance: compact fact reference, without fact value.
-
#initialize(store, alert_thresholds: {}) ⇒ Interpreter
constructor
A new instance of Interpreter.
-
#lineage(store:, key:) ⇒ Object
Read-only provenance: causal proof and downstream derivation metadata.
-
#metadata_snapshot ⇒ Object
OP2: unified protocol metadata snapshot.
-
#observability_snapshot ⇒ Object
Returns the canonical storage-level observability snapshot.
-
#query(store:, where: {}, order: nil, limit: nil, as_of: nil) ⇒ Object
Query facts matching all where: conditions.
-
#read(store:, key:, as_of: nil) ⇒ Object
Read the current value for a key (or nil).
-
#register(descriptor) ⇒ Object
Generic descriptor registration — dispatches by kind:.
- #register_access_path(descriptor) ⇒ Object
- #register_command(descriptor) ⇒ Object
- #register_derivation(descriptor) ⇒ Object
- #register_effect(descriptor) ⇒ Object
- #register_history(descriptor) ⇒ Object
- #register_projection(descriptor) ⇒ Object
- #register_relation(descriptor) ⇒ Object
-
#register_store(descriptor) ⇒ Object
Named registration helpers — vocabulary aliases for register.
- #register_subscription(descriptor) ⇒ Object
-
#replay(from: nil, to: nil, filter: nil) ⇒ Object
OP4: return all (or range-filtered) facts as serialized fact packets.
-
#resolve(relation_name, from:, as_of: nil) ⇒ Object
Resolve a named relation (delegates to IgniterStore#resolve).
-
#resolve_items(relation_name, from:, as_of: nil) ⇒ Object
Resolve a named relation with source keys preserved for client-side typed record reconstruction.
-
#segment_manifest(store: nil) ⇒ Object
Detailed per-segment manifest from the backend.
-
#storage_stats(store: nil) ⇒ Object
Physical storage stats from the backend (SegmentedFileBackend).
-
#sync_hub_profile(as_of: nil, cursor: nil, stores: nil) ⇒ Object
OP4: generates a SyncProfile for a cold hub or incremental update.
-
#wire ⇒ Object
OP3: returns the WireEnvelope router for this interpreter.
-
#write(store:, key:, value:, causation: nil, valid_time: nil, term: nil, producer: nil, derivation: nil) ⇒ Object
Write a fact.
-
#write_fact(packet) ⇒ Object
Accept a full fact packet hash (kind: :fact) and write it to the store.
Constructor Details
#initialize(store, alert_thresholds: {}) ⇒ Interpreter
Returns a new instance of Interpreter.
29 30 31 32 33 |
# File 'lib/igniter/store/protocol/interpreter.rb', line 29 def initialize(store, alert_thresholds: {}) @store = store @registry = {} # content fingerprint → Receipt (dedup) @alert_thresholds = DEFAULT_STORAGE_ALERT_THRESHOLDS.merge(alert_thresholds) end |
Instance Method Details
#append(history:, event:, key: nil, partition_key: nil, schema_version: 1, valid_time: nil, term: nil, producer: nil, derivation: nil) ⇒ Object
Append an event to a history. Returns an append Receipt carrying the generated fact key, fact_id, and value_hash.
83 84 85 86 87 88 89 90 91 92 93 94 95 96 |
# File 'lib/igniter/store/protocol/interpreter.rb', line 83 def append(history:, event:, key: nil, partition_key: nil, schema_version: 1, valid_time: nil, term: nil, producer: nil, derivation: nil) fact = @store.append( history: history.to_sym, event: event, schema_version: schema_version, valid_time: valid_time, term: term, partition_key: partition_key&.to_sym, producer: producer, derivation: derivation ) Receipt.append_accepted(history: history.to_sym, fact: fact, requested_key: key) end |
#causation_chain(store:, key:) ⇒ Object
Read-only provenance: compact causation chain for one store/key.
164 165 166 |
# File 'lib/igniter/store/protocol/interpreter.rb', line 164 def causation_chain(store:, key:) @store.causation_chain(store: store.to_sym, key: key) end |
#compaction_activity(store: nil, kind: nil, since: nil, limit: nil) ⇒ Object
Normalized compaction lifecycle activity.
Returns a response envelope with schema_version, generated_at, filters, activity (normalized entries), and count.
Filtering:
store: delegate to IgniterStore#compaction_activity(store:)
kind: filter entries by :kind
since: keep entries where occurred_at >= since
limit: cap result count after all other filters
284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 |
# File 'lib/igniter/store/protocol/interpreter.rb', line 284 def compaction_activity(store: nil, kind: nil, since: nil, limit: nil) store_sym = store&.to_sym entries = @store.compaction_activity(store: store_sym) entries = entries.select { |e| e[:kind].to_s == kind.to_s } if kind entries = entries.select { |e| e[:occurred_at] >= since.to_f } if since entries = entries.first(limit.to_i) if limit { schema_version: 1, generated_at: Time.now.iso8601(3), filters: { store: store&.to_s, kind: kind&.to_s, since: since&.to_f, limit: limit&.to_i }, activity: entries, count: entries.size } end |
#descriptor_snapshot ⇒ Object
Raw descriptor-only snapshot (store/history/subscription). Use metadata_snapshot for the full picture; this is a lower-level accessor.
236 237 238 |
# File 'lib/igniter/store/protocol/interpreter.rb', line 236 def descriptor_snapshot @store.schema_graph.descriptor_snapshot end |
#dispatch(envelope) ⇒ Object
OP3: convenience shorthand — dispatch one wire envelope hash.
350 351 352 |
# File 'lib/igniter/store/protocol/interpreter.rb', line 350 def dispatch(envelope) wire.dispatch(envelope) end |
#fact_ref(fact_id) ⇒ Object
Read-only provenance: compact fact reference, without fact value.
174 175 176 |
# File 'lib/igniter/store/protocol/interpreter.rb', line 174 def fact_ref(fact_id) @store.fact_ref(fact_id) end |
#lineage(store:, key:) ⇒ Object
Read-only provenance: causal proof and downstream derivation metadata.
169 170 171 |
# File 'lib/igniter/store/protocol/interpreter.rb', line 169 def lineage(store:, key:) @store.lineage(store: store.to_sym, key: key) end |
#metadata_snapshot ⇒ Object
OP2: unified protocol metadata snapshot. Combines raw descriptor registry (store/history/subscription), engine routing metadata (access paths), and all derived graph artifacts (relations, projections, derivations, scatters, retention) into one canonical introspection response. Used by Companion, StoreServer, visual tools, and compliance test kits.
200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 |
# File 'lib/igniter/store/protocol/interpreter.rb', line 200 def g = @store.schema_graph ds = g.descriptor_snapshot snap = { schema_version: 1, stores: ds[:stores], histories: ds[:histories], access_paths: g., relations: g.relation_snapshot, projections: g.projection_snapshot, commands: g.command_snapshot, effects: g.effect_snapshot, derivations: g.derivation_snapshot, scatters: g.scatter_snapshot, subscriptions: ds[:subscriptions], retention: g.retention_snapshot } stats = @store.storage_stats snap[:storage] = stats if stats snap end |
#observability_snapshot ⇒ Object
Returns the canonical storage-level observability snapshot.
Canonical shape (same top-level keys at every layer; server-only fields are nil at the protocol level):
schema_version, generated_at, status, uptime_ms (nil),
metrics (nil), alerts, storage, server (nil)
storage-level alerts (quarantine_receipt_count, storage_byte_size) are checked against alert_thresholds configured at construction time.
365 366 367 368 369 370 371 372 373 374 375 376 377 378 |
# File 'lib/igniter/store/protocol/interpreter.rb', line 365 def observability_snapshot storage = @store.storage_stats rescue nil alerts = check_storage_alerts(storage) { schema_version: 1, generated_at: Time.now.iso8601(3), status: :ready, uptime_ms: nil, metrics: nil, alerts: alerts, storage: storage, server: nil } end |
#query(store:, where: {}, order: nil, limit: nil, as_of: nil) ⇒ Object
Query facts matching all where: conditions. Performs a latest-per-key scan; access paths provide introspection metadata but index-accelerated query planning is a future engine concern.
135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 |
# File 'lib/igniter/store/protocol/interpreter.rb', line 135 def query(store:, where: {}, order: nil, limit: nil, as_of: nil) store_sym = store.to_sym facts = @store.history(store: store_sym, as_of: as_of) # Reduce to latest fact per key. latest = {} facts.each do |f| existing = latest[f.key] latest[f.key] = f if existing.nil? || f.transaction_time > existing.transaction_time end rows = latest.values where.each do |field, val| sym = field.to_sym rows = rows.select { |fact| fact.value[sym] == val } end rows = rows.sort_by { |fact| fact.value[order.to_sym] } if order rows = rows.first(limit) if limit rows.map { |fact| { key: fact.key, value: fact.value } } end |
#read(store:, key:, as_of: nil) ⇒ Object
Read the current value for a key (or nil).
128 129 130 |
# File 'lib/igniter/store/protocol/interpreter.rb', line 128 def read(store:, key:, as_of: nil) @store.read(store: store.to_sym, key: key, as_of: as_of) end |
#register(descriptor) ⇒ Object
Generic descriptor registration — dispatches by kind:. Returns a Receipt with status :accepted, :rejected, or :deduplicated.
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/igniter/store/protocol/interpreter.rb', line 37 def register(descriptor) descriptor = descriptor.transform_keys(&:to_sym) kind = descriptor[:kind]&.to_sym return Receipt.rejection("Missing required field: kind") unless kind handler_class = HANDLERS[kind] return Receipt.rejection("Unknown descriptor kind: #{kind.inspect}", kind: kind) unless handler_class fp = fingerprint(descriptor) return Receipt.deduplicated(kind: kind, name: descriptor[:name]&.to_sym) if @registry.key?(fp) receipt = handler_class.new(@store).call(descriptor) @registry[fp] = receipt if receipt.accepted? receipt end |
#register_access_path(descriptor) ⇒ Object
57 |
# File 'lib/igniter/store/protocol/interpreter.rb', line 57 def register_access_path(descriptor) = register(descriptor) |
#register_command(descriptor) ⇒ Object
61 |
# File 'lib/igniter/store/protocol/interpreter.rb', line 61 def register_command(descriptor) = register(descriptor) |
#register_derivation(descriptor) ⇒ Object
60 |
# File 'lib/igniter/store/protocol/interpreter.rb', line 60 def register_derivation(descriptor) = register(descriptor) |
#register_effect(descriptor) ⇒ Object
62 |
# File 'lib/igniter/store/protocol/interpreter.rb', line 62 def register_effect(descriptor) = register(descriptor) |
#register_history(descriptor) ⇒ Object
56 |
# File 'lib/igniter/store/protocol/interpreter.rb', line 56 def register_history(descriptor) = register(descriptor) |
#register_projection(descriptor) ⇒ Object
59 |
# File 'lib/igniter/store/protocol/interpreter.rb', line 59 def register_projection(descriptor) = register(descriptor) |
#register_relation(descriptor) ⇒ Object
58 |
# File 'lib/igniter/store/protocol/interpreter.rb', line 58 def register_relation(descriptor) = register(descriptor) |
#register_store(descriptor) ⇒ Object
Named registration helpers — vocabulary aliases for register.
55 |
# File 'lib/igniter/store/protocol/interpreter.rb', line 55 def register_store(descriptor) = register(descriptor) |
#register_subscription(descriptor) ⇒ Object
63 |
# File 'lib/igniter/store/protocol/interpreter.rb', line 63 def register_subscription(descriptor) = register(descriptor) |
#replay(from: nil, to: nil, filter: nil) ⇒ Object
OP4: return all (or range-filtered) facts as serialized fact packets. Suitable for WAL replay to a cold hub or test double.
Filter forms:
{ store: :name }
{ store: :name, key: "event-key" }
{ store: :name, partition_key: :tracker_id, partition_value: "sleep" }
313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 |
# File 'lib/igniter/store/protocol/interpreter.rb', line 313 def replay(from: nil, to: nil, filter: nil) if filter filter = filter.transform_keys(&:to_sym) store_sym = filter[:store]&.to_sym if store_sym && filter.key?(:key) return @store.history( store: store_sym, key: filter[:key], since: from, as_of: to ).map { |f| serialize_fact(f) } end if store_sym && filter[:partition_key] && filter.key?(:partition_value) return @store.history_partition( store: store_sym, partition_key: filter[:partition_key].to_sym, partition_value: filter[:partition_value], since: from, as_of: to ).map { |f| serialize_fact(f) } end end raw_facts = @store.fact_log_all(since: from, as_of: to) raw_facts = raw_facts.select { |f| f.store == filter[:store]&.to_sym } if filter && filter[:store] raw_facts.map { |f| serialize_fact(f) } end |
#resolve(relation_name, from:, as_of: nil) ⇒ Object
Resolve a named relation (delegates to IgniterStore#resolve).
159 160 161 |
# File 'lib/igniter/store/protocol/interpreter.rb', line 159 def resolve(relation_name, from:, as_of: nil) @store.resolve(relation_name, from: from, as_of: as_of) end |
#resolve_items(relation_name, from:, as_of: nil) ⇒ Object
Resolve a named relation with source keys preserved for client-side typed record reconstruction. The value-only #resolve API remains stable for existing protocol callers.
181 182 183 184 185 186 187 188 189 190 191 192 |
# File 'lib/igniter/store/protocol/interpreter.rb', line 181 def resolve_items(relation_name, from:, as_of: nil) rule = @store.schema_graph.relation_for(name: relation_name) raise ArgumentError, "No relation registered: #{relation_name.inspect}" unless rule index_entry = @store.read(store: :"__rel_#{relation_name}", key: from.to_s, as_of: as_of) return [] unless index_entry index_entry[:keys].filter_map do |key| value = @store.read(store: rule.source, key: key, as_of: as_of) { key: key, value: value } if value end end |
#segment_manifest(store: nil) ⇒ Object
Detailed per-segment manifest from the backend. Returns nil when the backend does not support it.
230 231 232 |
# File 'lib/igniter/store/protocol/interpreter.rb', line 230 def segment_manifest(store: nil) @store.segment_manifest(store: store) end |
#storage_stats(store: nil) ⇒ Object
Physical storage stats from the backend (SegmentedFileBackend). Returns nil when the backend does not support it.
224 225 226 |
# File 'lib/igniter/store/protocol/interpreter.rb', line 224 def storage_stats(store: nil) @store.storage_stats(store: store) end |
#sync_hub_profile(as_of: nil, cursor: nil, stores: nil) ⇒ Object
OP4: generates a SyncProfile for a cold hub or incremental update.
Full sync (cursor: nil): all facts + full descriptor snapshot Incremental (cursor: given): facts since cursor timestamp + snapshot stores: Array<Symbol> optional store filter (nil = all stores)
The returned SyncProfile#next_cursor should be persisted by the hub and sent back as cursor: on the next call to receive only new facts.
248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 |
# File 'lib/igniter/store/protocol/interpreter.rb', line 248 def sync_hub_profile(as_of: nil, cursor: nil, stores: nil) from = cursor&.dig(:value) raw_facts = @store.fact_log_all(since: from, as_of: as_of) if stores allowed = Array(stores).map(&:to_sym).to_set raw_facts = raw_facts.select { |f| allowed.include?(f.store) } end fact_packets = raw_facts.map { |f| serialize_fact(f) } SyncProfile.new( schema_version: 1, kind: :sync_hub_profile, generated_at: Process.clock_gettime(Process::CLOCK_REALTIME), cursor: cursor, descriptors: , facts: fact_packets, retention: @store.schema_graph.retention_snapshot, compaction_receipts: compaction_receipt_summaries, compaction_activity: compaction_activity, subscription_checkpoints: {} ) end |
#wire ⇒ Object
OP3: returns the WireEnvelope router for this interpreter. Accepts process-boundary envelope hashes and returns response envelopes.
345 346 347 |
# File 'lib/igniter/store/protocol/interpreter.rb', line 345 def wire @wire ||= WireEnvelope.new(self) end |
#write(store:, key:, value:, causation: nil, valid_time: nil, term: nil, producer: nil, derivation: nil) ⇒ Object
Write a fact. Returns a write Receipt carrying fact_id and value_hash.
66 67 68 69 70 71 72 73 74 75 76 77 78 79 |
# File 'lib/igniter/store/protocol/interpreter.rb', line 66 def write(store:, key:, value:, causation: nil, valid_time: nil, term: nil, producer: nil, derivation: nil) fact = @store.write( store: store.to_sym, key: key, value: value, causation: causation, valid_time: valid_time, term: term, producer: producer, derivation: derivation ) Receipt.write_accepted(store: store.to_sym, key: key, fact: fact) end |
#write_fact(packet) ⇒ Object
Accept a full fact packet hash (kind: :fact) and write it to the store. Designed for wire replay, server ingestion, and protocol-native clients. Note: at: is recorded in the packet but cannot override the engine timestamp —the engine assigns monotonic timestamps on write.
102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 |
# File 'lib/igniter/store/protocol/interpreter.rb', line 102 def write_fact(packet) packet = packet.transform_keys(&:to_sym) kind = packet[:kind]&.to_sym return Receipt.rejection("write_fact: expected kind: :fact, got #{kind.inspect}", kind: :fact) unless kind == :fact store = packet[:store] key = packet[:key] value = packet[:value] return Receipt.rejection("write_fact: missing store:", kind: :fact) unless store return Receipt.rejection("write_fact: missing key:", kind: :fact) unless key return Receipt.rejection("write_fact: missing value:", kind: :fact) unless value fact = @store.write( store: store.to_sym, key: key.to_s, value: value, causation: packet[:causation], valid_time: packet[:valid_time], term: packet[:term], producer: packet[:producer], derivation: packet[:derivation] ) Receipt.write_accepted(store: store.to_sym, key: key, fact: fact) end |