Class: Igniter::Store::IgniterStore

Inherits:
Object
  • Object
show all
Defined in:
lib/igniter/store/igniter_store.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(backend: nil, lru_cap: ReadCache::DEFAULT_LRU_CAP, changefeed: nil) ⇒ IgniterStore

Returns a new instance of IgniterStore.



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/igniter/store/igniter_store.rb', line 42

# Builds an empty store: a fresh fact log, read cache and schema graph, plus
# empty derived indexes.
#
# @param backend    kept as-is; nil leaves the store without a backend
# @param lru_cap    kept and forwarded to ReadCache.new
# @param changefeed kept as-is; nil means no changefeed
def initialize(backend: nil, lru_cap: ReadCache::DEFAULT_LRU_CAP, changefeed: nil)
  # Collaborators handed in by the caller.
  @backend = backend
  @lru_cap = lru_cap
  @changefeed = changefeed

  # Core in-memory structures owned by this instance.
  @log = FactLog.new
  @cache = ReadCache.new(lru_cap: lru_cap)
  @schema_graph = SchemaGraph.new

  # Scope index, shaped { [store, scope] => Set<key> }.
  # Filled lazily by the first query for a scope and kept current by every
  # later write. Queries with a non-nil as_of (time travel) do not use it.
  @scope_index = {}
  @scope_mutex = Mutex.new

  # Partition index, shaped
  # { [store, partition_key] => { partition_value => [fact, ...] } }.
  # Filled lazily by the first history_partition call and kept current by every
  # later append. since/as_of bounds are applied when the slice is read.
  @partition_index = {}
  @partition_mutex = Mutex.new

  # Read-path coercion hooks, shaped { store_name => callable(value, schema_version) }.
  # Hooks transform what readers see; facts in the log and cache stay untouched.
  @coercions = {}

  # Lookup by fact id, shaped { fact_id => Fact }.
  # Updated on write, append, replay and rebuild_log!. Holds live facts only:
  # facts dropped by compaction are removed from it.
  @fact_id_index = {}
end

Instance Attribute Details

#changefeed ⇒ Object (readonly)

Returns the value of attribute changefeed.



28
29
30
# File 'lib/igniter/store/igniter_store.rb', line 28

# Reader for @changefeed.
#
# @return the object stored in @changefeed (whatever was assigned to it)
def changefeed
  @changefeed
end

#schema_graph ⇒ Object (readonly)

Returns the value of attribute schema_graph.



28
29
30
# File 'lib/igniter/store/igniter_store.rb', line 28

# Reader for @schema_graph.
#
# @return the object stored in @schema_graph
def schema_graph
  @schema_graph
end

Class Method Details

.open(path, lru_cap: ReadCache::DEFAULT_LRU_CAP) ⇒ Object



68
69
70
71
72
73
# File 'lib/igniter/store/igniter_store.rb', line 68

# Opens a file-backed store at +path+ and rebuilds its in-memory state by
# feeding every fact returned by the backend's #replay into the new store.
#
# If construction or replay raises, the backend is closed before the error is
# re-raised: the caller never receives a store in that case, so nothing else
# could release it.
#
# @param path    forwarded unchanged to FileBackend.new
# @param lru_cap forwarded to the constructor
# @return the new store, after replay has completed
# @raise re-raises whatever construction or replay raised
def self.open(path, lru_cap: ReadCache::DEFAULT_LRU_CAP)
  backend = FileBackend.new(path)
  begin
    store = new(backend: backend, lru_cap: lru_cap)
    # replay is invoked through __send__ (presumably because it is not public).
    backend.replay.each { |fact| store.__send__(:replay, fact) }
    store
  rescue StandardError
    backend.close
    raise
  end
end

Instance Method Details

#append(history:, event:, schema_version: 1, valid_time: nil, term: nil, partition_key: nil, producer: nil, derivation: nil) ⇒ Object



382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
# File 'lib/igniter/store/igniter_store.rb', line 382

# Appends +event+ to the +history+ store under a freshly generated UUID key.
#
# The new fact is added to the log and the fact-id index, handed to the backend
# (when present), added to the partition index if one has already been built for
# [history, partition_key], and finally emitted on the changefeed (when present).
#
# NOTE(review): the log append happens before @partition_mutex is taken, so an
# index built by a concurrent history_partition call in between may already
# contain the fact — confirm whether double insertion is possible in practice.
#
# @param partition_key key looked up in +event+; a nil/false value skips indexing
# @return the fact that was appended
def append(history:, event:, schema_version: 1, valid_time: nil, term: nil, partition_key: nil,
           producer: nil, derivation: nil)
  attributes = {
    store: history,
    key: SecureRandom.uuid,
    value: event,
    schema_version: schema_version,
    valid_time: valid_time,
    term: term,
    producer: producer,
    derivation: derivation
  }
  fact = Fact.build(**attributes)

  @log.append(fact)
  @fact_id_index[fact.id] = fact
  @backend&.write_fact(fact)

  partition_value = partition_key && event[partition_key]
  if partition_value
    index_key = [history, partition_key]
    @partition_mutex.synchronize do
      # Only a warm index is maintained; a cold one is built on first read.
      next unless @partition_index.key?(index_key)

      (@partition_index[index_key][partition_value] ||= []) << fact
    end
  end

  @changefeed&.emit(fact)
  fact
end

#causation_chain(store:, key:) ⇒ Object



484
485
486
487
488
489
490
491
492
493
# File 'lib/igniter/store/igniter_store.rb', line 484

# Summarises the history of +store+/+key+ as one hash per fact: id, the first
# 12 characters of value_hash, the causation reference and the transaction time.
#
# @return [Array<Hash>] in the order #history returns the facts
def causation_chain(store:, key:)
  facts = history(store: store, key: key)
  facts.map do |entry|
    short_hash = entry.value_hash[0, 12]
    { id: entry.id, value_hash: short_hash, causation: entry.causation,
      transaction_time: entry.transaction_time }
  end
end

#checkpoint ⇒ Object

Write a snapshot of the current fact log to the backend’s snapshot file. After a checkpoint, startup replay only replays facts written since the snapshot — reducing startup cost from O(total_facts) to O(delta_facts).

No-op when the backend or log does not support snapshot (e.g. in-memory store or NATIVE FactLog without all_facts). Returns self.



542
543
544
545
546
547
# File 'lib/igniter/store/igniter_store.rb', line 542

# Hands the full fact list to the backend's write_snapshot.
#
# Does nothing when the backend lacks write_snapshot (this includes a nil
# backend) or when the log lacks all_facts.
#
# @return self in every case
def checkpoint
  return self unless @backend.respond_to?(:write_snapshot)
  return self unless @log.respond_to?(:all_facts)

  @backend.write_snapshot(@log.all_facts)
  self
end

#close ⇒ Object



587
588
589
# File 'lib/igniter/store/igniter_store.rb', line 587

# Closes the backend when there is one.
#
# @return the backend's close result, or nil when @backend is nil
def close
  return if @backend.nil?

  @backend.close
end

#compact(store = nil) ⇒ Object

Run compaction for store (or all stores with registered retention policies). Returns an Array of result hashes: { store:, strategy:, dropped_count:, kept_count:, receipt_id: }. Stores with no retention policy, or with a :permanent policy, are skipped and produce no entry in the result. Stores with nothing to drop return dropped_count: 0 and receipt_id: nil.



102
103
104
105
106
107
108
109
# File 'lib/igniter/store/igniter_store.rb', line 102

# Runs compact_store for +store+, or for every store the schema graph lists in
# retention_stores when +store+ is nil.
#
# A store is skipped (no entry in the result) when it has no retention policy
# or its policy strategy is :permanent. Falsy compact_store results are dropped.
#
# @return [Array] the compact_store results, in target order
def compact(store = nil)
  candidates = store ? [store] : @schema_graph.retention_stores

  candidates.filter_map do |name|
    policy = @schema_graph.retention_for(store: name)
    next unless policy
    next if policy.strategy == :permanent

    compact_store(name, policy)
  end
end

#compaction_activity(store: nil) ⇒ Object

Normalized compaction activity across all executors.

Merges entries from:

:__compaction_receipts   — retention compaction (store.compact)
:__fact_prune_receipts   — exact fact-id prune (store.prune_fact_ids)
backend.purge_receipts   — segment purge (SegmentedFileBackend.purge!)

Each entry: { kind:, executor:, store:, status:, reason:, fact_count:, receipt_id:, occurred_at: }

Boundary-specific receipts are not included here; use AvailabilityBoundaryLedger#compaction_activity for the full picture.



130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
# File 'lib/igniter/store/igniter_store.rb', line 130

# Builds one normalized activity list from three receipt sources and returns it
# sorted by :occurred_at:
#
#   * compaction_receipts(store:)                    -> kind :retention_compaction
#   * facts in the :__fact_prune_receipts log store  -> kind :exact_prune
#   * @backend.purge_receipts(store:), if supported  -> kind :segment_purge
#
# Prune receipts are read unfiltered and always carry store: nil; only the
# other two sources receive the +store+ argument.
#
# @return [Array<Hash>] entries with keys kind, executor, store, status, reason,
#   fact_count, receipt_id, occurred_at
def compaction_activity(store: nil)
  retention = compaction_receipts(store: store).map do |receipt|
    payload = receipt.value
    {
      kind: :retention_compaction,
      executor: :store_compact,
      store: payload[:compacted_store],
      status: :ok,
      reason: payload[:strategy],
      fact_count: payload[:compacted_count].to_i,
      receipt_id: receipt.id,
      occurred_at: payload[:compacted_at].to_f
    }
  end

  prunes = @log.facts_for(store: :__fact_prune_receipts).map do |receipt|
    payload = receipt.value
    {
      kind: :exact_prune,
      executor: :fact_prune,
      store: nil,
      status: :ok,
      reason: payload[:reason],
      fact_count: payload[:pruned_count].to_i,
      receipt_id: receipt.id,
      occurred_at: payload[:pruned_at].to_f
    }
  end

  # Backend purge receipts are string-keyed hashes rather than facts.
  purges =
    if @backend.respond_to?(:purge_receipts)
      @backend.purge_receipts(store: store).map do |row|
        {
          kind: :segment_purge,
          executor: :segmented_backend,
          store: row["store"]&.to_sym,
          status: :ok,
          reason: row["purge_strategy"],
          fact_count: row["fact_count"].to_i,
          receipt_id: row["segment_path"],
          occurred_at: row["purged_at"].to_f
        }
      end
    else
      []
    end

  (retention + prunes + purges).sort_by { |entry| entry[:occurred_at] }
end

#compaction_receipts(store: nil) ⇒ Object

Facts written by compaction runs for store (or all receipts when nil). Receipts live in the :__compaction_receipts meta-store.



113
114
115
116
# File 'lib/igniter/store/igniter_store.rb', line 113

# Returns the facts stored under :__compaction_receipts. When +store+ is given,
# only receipts whose value[:compacted_store] equals it are returned.
def compaction_receipts(store: nil)
  receipts = @log.facts_for(store: :__compaction_receipts)
  return receipts unless store

  receipts.select { |receipt| receipt.value[:compacted_store] == store }
end

#fact_by_id(fact_id) ⇒ Object

Returns the exact Fact object for fact_id if it is live in the store, nil otherwise. Safe to call with nil or blank id — returns nil without raising. Does not apply coercion; returns the raw Fact as written.



556
557
558
559
# File 'lib/igniter/store/igniter_store.rb', line 556

# Looks +fact_id+ up in the fact-id index.
#
# @return the indexed fact, or nil when the id is nil, empty, or not indexed
def fact_by_id(fact_id)
  # nil.to_s is "", so one emptiness check covers both nil and empty ids.
  return nil if fact_id.to_s.empty?

  @fact_id_index[fact_id]
end

#fact_count ⇒ Object



549
550
551
# File 'lib/igniter/store/igniter_store.rb', line 549

# Number of facts in the log, as reported by @log.size.
def fact_count
  @log.size
end

#fact_log_all(since: nil, as_of: nil) ⇒ Object

Return all facts from the log, optionally bounded by time range. Used by the open protocol sync hub profile and replay operations. Returns [] when the native FactLog lacks all_facts support.



579
580
581
582
583
584
585
# File 'lib/igniter/store/igniter_store.rb', line 579

# Returns every fact in the log, optionally limited to a transaction_time range.
#
# @param since lower bound, inclusive; ignored when nil
# @param as_of upper bound, inclusive; ignored when nil
# @return [Array] [] when the log does not respond to all_facts
def fact_log_all(since: nil, as_of: nil)
  return [] unless @log.respond_to?(:all_facts)

  everything = @log.all_facts
  return everything unless since || as_of

  everything.select do |fact|
    stamp = fact.transaction_time
    (!since || stamp >= since) && (!as_of || stamp <= as_of)
  end
end

#fact_ref(fact_id) ⇒ Object

Returns compact metadata for fact_id without exposing the full value payload. Returns nil when the fact is not live.



563
564
565
566
567
568
569
570
571
572
573
574
# File 'lib/igniter/store/igniter_store.rb', line 563

# Compact reference for +fact_id+: id, store, key, transaction_time, valid_time
# and value_hash of the fact found by fact_by_id, without its value.
#
# @return [Hash, nil] nil when fact_by_id finds nothing
def fact_ref(fact_id)
  found = fact_by_id(fact_id)
  return nil unless found

  %i[id store key transaction_time valid_time value_hash].to_h do |attribute|
    [attribute, found.public_send(attribute)]
  end
end

#history(store:, key: nil, since: nil, as_of: nil) ⇒ Object



456
457
458
# File 'lib/igniter/store/igniter_store.rb', line 456

# Fetches facts for +store+ from the log (all arguments are forwarded to
# facts_for) and passes them through apply_coercions.
def history(store:, key: nil, since: nil, as_of: nil)
  raw_facts = @log.facts_for(store: store, key: key, since: since, as_of: as_of)
  apply_coercions(store, raw_facts)
end

#history_partition(store:, partition_key:, partition_value:, since: nil, as_of: nil) ⇒ Object

Partition-filtered history query backed by a materialized index. First call for a (store, partition_key) pair performs a full scan and builds the index; subsequent calls are O(partition slice). as_of/since filtering is applied over the cached slice at read time.



464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
# File 'lib/igniter/store/igniter_store.rb', line 464

# Returns the facts of +store+ whose value[partition_key] equals
# +partition_value+, optionally bounded by transaction_time, after passing them
# through apply_coercions.
#
# The first call for a (store, partition_key) pair scans @log.facts_for(store:)
# and groups the facts by value[partition_key] (facts whose value there is nil
# or false are left out). Later calls reuse the grouped index.
#
# Only the index build and lookup run under @partition_mutex. Filtering and
# apply_coercions work on a private copy after the lock is released, so
# coercion hooks never execute while the mutex is held.
#
# @param since lower bound on transaction_time, inclusive; ignored when nil
# @param as_of upper bound on transaction_time, inclusive; ignored when nil
# @return whatever apply_coercions returns for the filtered slice
def history_partition(store:, partition_key:, partition_value:, since: nil, as_of: nil)
  idx_key = [store, partition_key]

  slice = @partition_mutex.synchronize do
    unless @partition_index.key?(idx_key)
      groups = Hash.new { |h, k| h[k] = [] }
      @log.facts_for(store: store).each do |f|
        pv = f.value[partition_key]
        groups[pv] << f if pv
      end
      @partition_index[idx_key] = groups
    end

    # fetch, not []: the groups hash has an inserting default block, so a plain
    # lookup of an unknown value would store a new empty bucket on every miss.
    @partition_index[idx_key].fetch(partition_value) { [] }.dup
  end

  slice = slice.select { |f| f.transaction_time >= since } if since
  slice = slice.select { |f| f.transaction_time <= as_of } if as_of
  apply_coercions(store, slice)
end

#lineage(store:, key:) ⇒ Object

Returns a causal proof for the given store/key: the full fact chain in chronological order, any registered derivation rules triggered by this store, and a Merkle proof hash over the chain.

proof_hash: SHA256 of “id:value_hash:causation” entries joined by “|”. Stable for the same chain; changes when any fact is added. nil when the chain is empty (key unknown).

derived_by: derivation rules registered for this store — what downstream stores are affected by writes here.



505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
# File 'lib/igniter/store/igniter_store.rb', line 505

# Describes the fact chain for +store+/+key+ together with the derivation rules
# the schema graph reports for +store+.
#
# @return [Hash] with keys
#   subject    — { store:, key: } as given
#   chain      — one metadata hash per fact returned by @log.facts_for
#   depth      — number of entries in chain
#   derived_by — target_store / target_key / source_filters per rule; a
#                callable target_key is reported as :callable
#   proof_hash — lineage_proof_hash(chain), or nil when the chain is empty
def lineage(store:, key:)
  links = @log.facts_for(store: store, key: key).map do |f|
    {
      id: f.id,
      store: f.store,
      key: f.key,
      causation: f.causation,
      value_hash: f.value_hash,
      transaction_time: f.transaction_time,
      valid_time: f.valid_time,
      schema_version: f.schema_version
    }
  end

  downstream = @schema_graph.derivations_for_store(store: store).map do |derivation|
    {
      target_store: derivation.target_store,
      target_key: derivation.target_key.respond_to?(:call) ? :callable : derivation.target_key,
      source_filters: derivation.source_filters
    }
  end

  report = { subject: { store: store, key: key }, chain: links, depth: links.size, derived_by: downstream }
  report[:proof_hash] = (lineage_proof_hash(links) unless links.empty?)
  report
end

#protocol ⇒ Object

Returns a Protocol::Interpreter wrapping this store. External / non-Igniter clients use this surface to register descriptors, write facts, and query via the open protocol vocabulary.



33
34
35
# File 'lib/igniter/store/igniter_store.rb', line 33

# Lazily builds a Protocol::Interpreter around this store and reuses it on
# later calls.
def protocol
  return @protocol if @protocol

  @protocol = Protocol::Interpreter.new(self)
end

#prune_fact_ids(fact_ids:, reason:, metadata: {}, receipt_store: :__fact_prune_receipts) ⇒ Object

Removes exact facts by id from the live FactLog and all derived indexes.

Requires a backend that supports replace_with_snapshot! (i.e. FileBackend in the Ruby-path proof). Returns { status: :unsupported } for backends that do not support durable fact removal (e.g. SegmentedFileBackend). In-memory stores (backend: nil) support the operation without durability.

Order of operations:

1. Write a prune receipt (compact refs, no full payloads) — survives the prune.
2. Rebuild log without the pruned facts.
3. Call backend.replace_with_snapshot! so dropped facts cannot resurface on reopen.

Missing fact ids are reported in the result but are not fatal.

Returns:

{ status: :ok, receipt_id:, pruned_count:, missing_count:,
  pruned_fact_refs:, missing_ids: }
{ status: :unsupported, reason: :backend_does_not_support_exact_prune, backend: }


197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
# File 'lib/igniter/store/igniter_store.rb', line 197

# Removes the facts named by +fact_ids+ from the live log.
#
# Steps, in order:
#   1. Return { status: :unsupported, ... } when a backend is present but does
#      not respond to replace_with_snapshot!. A nil backend is accepted.
#   2. Split the requested ids (compared as strings) into those found in the
#      fact-id index and those missing.
#   3. Write a receipt to +receipt_store+ holding compact refs of the pruned
#      facts, the missing ids, +reason+ and +metadata+.
#   4. Rebuild the log from all facts except the requested ids, then hand the
#      rebuilt fact list to backend.replace_with_snapshot! when a backend exists.
#
# NOTE(review): @log.all_facts is called without the respond_to? guard that
# checkpoint and fact_log_all use — confirm every log used here provides it.
#
# @param fact_ids      ids to remove; each is converted with to_s
# @param reason        stored in the receipt
# @param metadata      stored in the receipt under :metadata
# @param receipt_store store that receives the receipt fact
# @return [Hash] { status: :ok, receipt_id:, pruned_count:, missing_count:,
#   pruned_fact_refs:, missing_ids: } or
#   { status: :unsupported, reason:, backend: }
def prune_fact_ids(fact_ids:, reason:, metadata: {}, receipt_store: :__fact_prune_receipts)
  if @backend && !@backend.respond_to?(:replace_with_snapshot!)
    return {
      status:  :unsupported,
      reason:  :backend_does_not_support_exact_prune,
      backend: @backend.class.name
    }
  end

  ids_set = Set.new(fact_ids.map(&:to_s))

  pruned_refs = []
  missing_ids = []
  ids_set.each do |id|
    fact = @fact_id_index[id]
    if fact
      pruned_refs << {
        id:               fact.id,
        store:            fact.store,
        key:              fact.key,
        transaction_time: fact.transaction_time,
        valid_time:       fact.valid_time,
        value_hash:       fact.value_hash
      }
    else
      missing_ids << id
    end
  end

  # The receipt is written before the rebuild and its id is not in ids_set,
  # so it is among the surviving facts.
  now     = Process.clock_gettime(Process::CLOCK_REALTIME)
  receipt = write(
    store: receipt_store,
    key:   SecureRandom.hex(8),
    value: {
      type:             :fact_prune_receipt,
      reason:           reason,
      requested_count:  ids_set.size,
      pruned_count:     pruned_refs.size,
      missing_count:    missing_ids.size,
      pruned_fact_refs: pruned_refs,
      missing_ids:      missing_ids,
      # Spelled out rather than left as a bare `metadata:` label, which only
      # parses as value shorthand on Ruby 3.1+.
      metadata:         metadata,
      pruned_at:        now
    }
  )

  surviving = @log.all_facts.reject { |f| ids_set.include?(f.id.to_s) }
  rebuild_log!(surviving)

  @backend.replace_with_snapshot!(@log.all_facts) if @backend

  {
    status:           :ok,
    receipt_id:       receipt.id,
    pruned_count:     pruned_refs.size,
    missing_count:    missing_ids.size,
    pruned_fact_refs: pruned_refs,
    missing_ids:      missing_ids
  }
end

#query(store:, scope:, as_of: nil, ttl: nil) ⇒ Object

Raises:

  • (ArgumentError)


424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
# File 'lib/igniter/store/igniter_store.rb', line 424

# Returns the coerced facts for a registered (store, scope) path.
#
# Lookup order:
#   1. scope cache (TTL is +ttl+, falling back to the path's cache_ttl);
#   2. with as_of: a log scan at that time — the scope index is not consulted;
#   3. without as_of and a warm scope index: latest fact per indexed key;
#   4. without as_of and a cold index: a full log scan, whose keys seed the index.
# Results from 2-4 are stored in the scope cache before being coerced.
#
# @raise [ArgumentError] when no path is registered for store/scope
def query(store:, scope:, as_of: nil, ttl: nil)
  path = @schema_graph.path_for(store: store, scope: scope)
  unless path
    raise ArgumentError, "No registered path for store=#{store.inspect} scope=#{scope.inspect}"
  end

  hit = @cache.get_scope(store: store, scope: scope, as_of: as_of, ttl: ttl || path.cache_ttl)
  return apply_coercions(store, hit) if hit

  filters = path.filters || {}
  index_key = [store, scope]
  # The index describes current state only, so it is not even read for as_of queries.
  warm_keys = as_of ? nil : @scope_mutex.synchronize { @scope_index[index_key] }

  facts =
    if as_of
      @log.query_scope(store: store, filters: filters, as_of: as_of)
    elsif warm_keys
      warm_keys.filter_map { |indexed_key| @log.latest_for(store: store, key: indexed_key) }
    else
      scanned = @log.query_scope(store: store, filters: filters, as_of: nil)
      @scope_mutex.synchronize do
        # ||= keeps an index that was installed while the scan was running.
        @scope_index[index_key] ||= Set.new(scanned.map(&:key))
      end
      scanned
    end

  @cache.put_scope(store: store, scope: scope, facts: facts, as_of: as_of)
  apply_coercions(store, facts)
end

#read(store:, key:, as_of: nil, ttl: nil) ⇒ Object



409
410
411
412
413
414
415
416
417
418
# File 'lib/igniter/store/igniter_store.rb', line 409

# Point read for +store+/+key+: a cache hit is coerced and returned; otherwise
# the latest fact from the log (as of +as_of+, when given) is cached, coerced
# and returned.
#
# @return the result of coerce_value, or nil when the log has no such fact
def read(store:, key:, as_of: nil, ttl: nil)
  hit = @cache.get(store: store, key: key, as_of: as_of, ttl: ttl)
  return coerce_value(store, hit) if hit

  latest = @log.latest_for(store: store, key: key, as_of: as_of)
  if latest
    @cache.put(store: store, key: key, fact: latest, as_of: as_of)
    coerce_value(store, latest)
  end
end

#register_coercion(store_name, &block) ⇒ Object

Register a schema migration hook for store_name. The block receives (value, schema_version) and must return the migrated value. Applied on every read (point reads, scope queries, history); raw facts are never mutated — coercion is a read-path transform only.



350
351
352
353
# File 'lib/igniter/store/igniter_store.rb', line 350

# Stores +block+ as the coercion hook for +store_name+, replacing any hook
# registered earlier for that name.
#
# @return self
def register_coercion(store_name, &block)
  @coercions.store(store_name, block)
  self
end

#register_derivation(source_store:, source_filters: {}, target_store:, target_key:, rule:) ⇒ Object



321
322
323
324
325
326
327
328
329
330
331
332
# File 'lib/igniter/store/igniter_store.rb', line 321

# Wraps the arguments in a DerivationRule and registers it with the schema graph.
#
# @return self
def register_derivation(source_store:, source_filters: {}, target_store:, target_key:, rule:)
  derivation = DerivationRule.new(
    source_store: source_store,
    source_filters: source_filters,
    target_store: target_store,
    target_key: target_key,
    rule: rule
  )
  @schema_graph.register_derivation(derivation)
  self
end

#register_descriptor(packet) ⇒ Object

Convenience shorthand: register a protocol descriptor packet.



38
39
40
# File 'lib/igniter/store/igniter_store.rb', line 38

# Forwards +packet+ to protocol.register and returns its result.
def register_descriptor(packet)
  protocol.register(packet)
end

#register_path(path) ⇒ Object



334
335
336
337
338
339
340
341
342
343
344
# File 'lib/igniter/store/igniter_store.rb', line 334

# Registers +path+ with the schema graph and subscribes each of its consumers
# to cache events: per scope when the path has a scope, per store otherwise.
# A nil consumers list is treated as empty.
#
# @return self
def register_path(path)
  @schema_graph.register(path)

  path.consumers.to_a.each do |subscriber|
    unless path.scope
      @cache.register_consumer(path.store, subscriber)
      next
    end

    @cache.register_scope_consumer(path.store, path.scope, subscriber)
  end

  self
end

#register_projection(projection_path) ⇒ Object



75
76
77
78
# File 'lib/igniter/store/igniter_store.rb', line 75

# Registers +projection_path+ with the schema graph.
#
# @return self
def register_projection(projection_path)
  tap { @schema_graph.register_projection(projection_path) }
end

#register_relation(name, source:, partition:, target:) ⇒ Object

Declare a named cross-store relation backed by a materialized scatter index.

When any fact is written to source, the value of partition in that fact is used as a key into the index store :"__rel_<name>". The index entry accumulates the unique source keys that share that partition value.

resolve(name, from: value) reads the index and returns the current values of all matching source facts (latest per key).

This is a 1-N relation: one partition_key value → many source keys. The index is append-only (G-Set): facts are never removed from the index.



269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
# File 'lib/igniter/store/igniter_store.rb', line 269

# Registers a RelationRule for +name+ and a scatter rule that maintains the
# index store :"__rel_<name>".
#
# The scatter rule returns { keys:, count:, partition_key: }: the keys of the
# existing entry (copied, never mutated) plus the new fact's key when it is not
# already listed.
#
# @return self
def register_relation(name, source:, partition:, target:)
  relation = RelationRule.new(name: name, source: source, partition: partition, target: target)
  @schema_graph.register_relation(relation)

  collect_keys = lambda do |partition_key, existing, new_fact|
    keys = (existing ? existing[:keys] : []).dup
    keys.push(new_fact.key) unless keys.include?(new_fact.key)
    { keys: keys, count: keys.size, partition_key: partition_key }
  end

  register_scatter(
    source_store: source,
    partition_by: partition,
    target_store: :"__rel_#{name}",
    rule: collect_keys
  )
  self
end

#register_scatter(source_store:, partition_by:, target_store:, rule:) ⇒ Object

Register a scatter derivation rule. When a fact is written to source_store, the value of partition_by in that fact’s value is extracted as the target key. rule is called as:

rule.(partition_key, existing_value, new_fact) → Hash | nil

Returning nil skips the write. Scatter writes do not re-trigger scatter (cycle-safe via a separate thread-local guard).



309
310
311
312
313
314
315
316
317
318
319
# File 'lib/igniter/store/igniter_store.rb', line 309

# Wraps the arguments in a ScatterRule and registers it with the schema graph.
#
# @return self
def register_scatter(source_store:, partition_by:, target_store:, rule:)
  scatter = ScatterRule.new(
    source_store: source_store,
    partition_by: partition_by,
    target_store: target_store,
    rule: rule
  )
  @schema_graph.register_scatter(scatter)
  self
end

#resolve(relation_name, from:, as_of: nil) ⇒ Object

Resolve a named relation for a given partition value. Returns an Array of values of all source facts whose partition field equals from. Returns [] when nothing is indexed yet.

as_of: Float timestamp — when given, reads the index state AND each source value at that point in time (consistent point-in-time snapshot).

Raises:

  • (ArgumentError)


293
294
295
296
297
298
299
300
301
# File 'lib/igniter/store/igniter_store.rb', line 293

# Resolves relation +relation_name+ for the partition value +from+.
#
# Reads the index entry at key from.to_s in :"__rel_<relation_name>", then reads
# each listed key from the relation's source store. Both reads use +as_of+.
# Keys whose read returns nil/false are left out.
#
# @return [Array] [] when there is no index entry
# @raise [ArgumentError] when the relation is not registered
def resolve(relation_name, from:, as_of: nil)
  relation = @schema_graph.relation_for(name: relation_name)
  raise ArgumentError, "No relation registered: #{relation_name.inspect}" unless relation

  entry = read(store: :"__rel_#{relation_name}", key: from.to_s, as_of: as_of)
  return [] unless entry

  entry[:keys].each_with_object([]) do |source_key, values|
    value = read(store: relation.source, key: source_key, as_of: as_of)
    values << value if value
  end
end

#segment_manifest(store: nil) ⇒ Object



599
600
601
602
# File 'lib/igniter/store/igniter_store.rb', line 599

# Delegates to the backend's segment_manifest when the backend provides one.
#
# @return the backend's result, or nil when it is unsupported (or no backend)
def segment_manifest(store: nil)
  @backend.segment_manifest(store: store) if @backend.respond_to?(:segment_manifest)
end

#set_retention(store, strategy:, duration: nil) ⇒ Object

[The following paragraph describes #register_derivation, not #set_retention; it appears to have been picked up from an adjacent source comment.] Register a reactive derivation rule. When any fact is written to source_store (filtered by source_filters), rule is called with the current source facts and the result is written to target_store at target_key. Returning nil from rule skips the write. Derivations do not re-trigger on derived writes (cycle-safe).

Declare a retention policy for store. strategy: :permanent (default — never compact)

:ephemeral — keep only the latest fact per key
:rolling_window — keep latest per key + facts within duration seconds

Call compact or compact(store) to execute the policy.



90
91
92
93
94
95
96
# File 'lib/igniter/store/igniter_store.rb', line 90

# Builds a RetentionPolicy from +strategy+ and +duration+ and registers it for
# +store+ with the schema graph. Nothing is compacted here; #compact applies it.
#
# @return self
def set_retention(store, strategy:, duration: nil)
  policy = RetentionPolicy.new(strategy: strategy, duration: duration)
  @schema_graph.register_retention(store, policy)
  self
end

#storage_stats(store: nil) ⇒ Object

Returns storage metadata from the backend when it supports it. Delegates to SegmentedFileBackend#storage_stats or returns nil for backends that do not expose storage metadata (in-memory, FileBackend).



594
595
596
597
# File 'lib/igniter/store/igniter_store.rb', line 594

# Delegates to the backend's storage_stats when the backend provides one.
#
# @return the backend's result, or nil when it is unsupported (or no backend)
def storage_stats(store: nil)
  supported = @backend.respond_to?(:storage_stats)
  supported ? @backend.storage_stats(store: store) : nil
end

#time_travel(store:, key:, at:) ⇒ Object



420
421
422
# File 'lib/igniter/store/igniter_store.rb', line 420

# Point-in-time read: calls #read with as_of set to +at+ and returns its result.
def time_travel(store:, key:, at:)
  read(store: store, key: key, as_of: at)
end

#write(store:, key:, value:, schema_version: 1, causation: nil, valid_time: nil, term: nil, producer: nil, derivation: nil) ⇒ Object



355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
# File 'lib/igniter/store/igniter_store.rb', line 355

# Writes a new fact for +store+/+key+ and propagates it.
#
# Order of effects: append to the log, add to the fact-id index, hand to the
# backend (when present), update scope indices, invalidate the cache for this
# key, emit on the changefeed (when present), then run derivations and scatters.
#
# @param causation explicit causation reference; when nil, the id of the
#   previous latest fact for this store/key is used (nil if there is none)
# @return the fact that was written
def write(store:, key:, value:, schema_version: 1, causation: nil, valid_time: nil, term: nil,
          producer: nil, derivation: nil)
  # Looked up before the append so it refers to the prior fact, not the new one.
  previous = @log.latest_for(store: store, key: key)
  fact = Fact.build(
    store:          store,
    key:            key,
    value:          value,
    causation:      causation || previous&.id,
    schema_version: schema_version,
    valid_time:     valid_time,
    term:           term,
    producer:       producer,
    derivation:     derivation
  )
  @log.append(fact)
  @fact_id_index[fact.id] = fact
  @backend&.write_fact(fact)
  # The scope changes reported by the index update are passed on to cache invalidation.
  scope_changes = update_scope_indices(store, key, value)
  @cache.invalidate(store: store, key: key, scope_changes: scope_changes)
  # Emit source fact before derived/scatter writes so subscribers see
  # cause before effects (source-first emission order).
  @changefeed&.emit(fact)
  run_derivations(store: store, source_fact: fact)
  run_scatters(store: store, source_fact: fact)
  fact
end