Class: Textus::Ports::ProduceOnWriteSubscriber

Inherits:
Object
Defined in:
lib/textus/ports/produce_on_write_subscriber.rb

Overview

ADR 0093: on a canon write, converge the derived entries that depend on the written key (rdeps ∩ derived) by running Produce. The run is scoped and non-destructive. This is reconcile narrowed to a write’s blast radius; there is no separate “reactive materialize” subsystem. Each entry’s source.on_write setting (sync|async) selects between running inline under the lock and deferring the run. A write into a derived entry does not fan out (recursion guard). Failures never reach the writer (Produce.converge isolates them). The subscriber is attached at Store boot, alongside AuditSubscriber.
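The selection rule in the overview (rdeps ∩ derived, plus the recursion guard) can be illustrated with a toy model. This is only a sketch under stated assumptions: ToyGraph, its methods, and the keys are hypothetical and are not part of Textus, and the sketch considers direct reverse dependencies only.

```ruby
# Hypothetical toy model; none of these names come from Textus.
class ToyGraph
  # sources: { derived_key => [keys it is built from] }
  def initialize(sources)
    @sources = sources
  end

  # A key counts as derived when it has declared sources.
  def derived?(key)
    @sources.key?(key)
  end

  # Direct reverse dependencies: every key that lists `key` as a source.
  def rdeps(key)
    @sources.select { |_, deps| deps.include?(key) }.keys
  end

  # Keys to converge after a write to `written_key`.
  def blast_radius(written_key)
    return [] if derived?(written_key) # recursion guard: no fan-out

    rdeps(written_key).select { |k| derived?(k) } # rdeps ∩ derived
  end
end

graph = ToyGraph.new({
  "report/summary" => %w[notes/a notes/b],
  "report/index"   => %w[notes/a],
  "report/digest"  => %w[report/summary]
})

graph.blast_radius("notes/a")        # => ["report/summary", "report/index"]
graph.blast_radius("notes/b")        # => ["report/summary"]
graph.blast_radius("report/summary") # => []
```

The last call shows the guard: "report/digest" depends on "report/summary", but a write into a derived entry returns an empty set instead of fanning out.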

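Instance Method Summary

  • #attach(bus) ⇒ Object
  • #initialize(container) ⇒ ProduceOnWriteSubscriber (constructor)
  • #on_write(key:, call:) ⇒ Object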

Constructor Details

#initialize(container) ⇒ ProduceOnWriteSubscriber

Returns a new instance of ProduceOnWriteSubscriber.



# File 'lib/textus/ports/produce_on_write_subscriber.rb', line 13

def initialize(container)
  @container = container
end

Instance Method Details

#attach(bus) ⇒ Object



# File 'lib/textus/ports/produce_on_write_subscriber.rb', line 17

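def attach(bus)
  # Subscribe under the name :produce_on_write; `**` ignores any other payload keys.
  bus.on(:entry_written, :produce_on_write) do |ctx:, key:, **|
    # Carry the writer's role and correlation id over to the produce run.
    call = Textus::Call.build(role: ctx.role, correlation_id: ctx.correlation_id)
    on_write(key: key, call: call)
  end
  self
end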
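The subscription pattern used by #attach can be tried in isolation with a stand-in bus. This is a minimal sketch, assuming a bus that stores named handlers per event and delivers the payload as keyword arguments. ToyBus, its emit method, and the :etag payload key are hypothetical; only the on(event, name) { ... } call shape is taken from the listing above.

```ruby
# Hypothetical stand-in for the event bus; only the `on(event, name, &block)`
# call shape comes from the listing above.
class ToyBus
  def initialize
    @handlers = Hash.new { |hash, event| hash[event] = {} }
  end

  # Register a named handler for an event.
  def on(event, name, &block)
    @handlers[event][name] = block
  end

  # Deliver the payload to every handler of the event as keyword arguments.
  def emit(event, **payload)
    @handlers[event].each_value { |handler| handler.call(**payload) }
  end
end

seen = []
bus = ToyBus.new

# `**` discards payload keys this subscriber does not need (here :etag).
bus.on(:entry_written, :recorder) { |ctx:, key:, **| seen << [ctx, key] }
bus.emit(:entry_written, ctx: :ctx_stub, key: "notes/a", etag: "abc")

seen # => [[:ctx_stub, "notes/a"]]
```

Accepting `**` in the block lets the publisher add payload keys later without breaking existing subscribers.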

#on_write(key:, call:) ⇒ Object



# File 'lib/textus/ports/produce_on_write_subscriber.rb', line 25

def on_write(key:, call:)
  return if derived_write?(key) # recursion guard: produce output is not a source change

  # Reverse dependencies of the written key, narrowed to entries Produce can build.
  affected = Textus::Read::Rdeps.new(container: @container).call(key)["rdeps"]
  producible = affected.select { |k| producible?(k) }
  return if producible.empty?

  # One sync entry is enough to converge the whole batch inline; otherwise defer it.
  if any_sync?(producible)
    Textus::Produce::Engine.converge(container: @container, call: call, keys: producible)
  else
    Textus::Produce::Engine::AsyncRunner.enqueue(container: @container, call: call, keys: producible)
  end
end
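The sync/async branch and the claim that failures never reach the writer can be sketched together. This is an illustration under stated assumptions, not the Textus implementation: converge, dispatch, the modes hash, and the array used as a queue are all hypothetical stand-ins for Produce::Engine.converge, the any_sync? check, per-entry source.on_write, and AsyncRunner.

```ruby
# Hypothetical sketch; these helpers only mirror the shape of #on_write.

# Run the producer for each key, recording errors instead of raising them,
# so one failing entry neither stops the batch nor propagates to the caller.
def converge(keys, producer)
  keys.each_with_object({ ok: [], failed: {} }) do |key, result|
    producer.call(key)
    result[:ok] << key
  rescue StandardError => e
    result[:failed][key] = e.message
  end
end

# Converge inline when any affected entry is :sync, otherwise queue the batch.
def dispatch(keys, modes, producer, queue)
  if keys.any? { |key| modes[key] == :sync }
    converge(keys, producer)
  else
    queue << keys
    nil
  end
end

producer = ->(key) { raise "boom" if key == "report/index" }
modes = { "report/summary" => :sync, "report/index" => :async }
queue = []

result = dispatch(%w[report/summary report/index], modes, producer, queue)
result # => {ok: ["report/summary"], failed: {"report/index" => "boom"}}

deferred = dispatch(%w[report/index], modes, producer, queue)
deferred # => nil, and queue now holds [["report/index"]]
```

In the first call a single sync entry pulls the whole batch inline, matching the any_sync? branch above, and the producer error is captured in the result rather than raised.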