Class: Textus::Ports::ProduceOnWriteSubscriber
- Inherits:
-
Object
- Object
- Textus::Ports::ProduceOnWriteSubscriber
- Defined in:
- lib/textus/ports/produce_on_write_subscriber.rb
Overview
ADR 0093: on a canon write, converge the derived entries that depend on the written key (rdeps ∩ derived) by running Produce — scoped + non-destructive. This IS reconcile narrowed to a write’s blast radius; there is no separate “reactive materialize” subsystem. Per-entry source.on_write (sync|async) picks inline-under-lock vs deferred. A write INTO a derived entry does not fan out (recursion guard). Failures never reach the writer (Produce.converge isolates them). Attached at Store boot, alongside AuditSubscriber.
Instance Method Summary collapse
- #attach(bus) ⇒ Object
-
#initialize(container) ⇒ ProduceOnWriteSubscriber
constructor
A new instance of ProduceOnWriteSubscriber.
- #on_write(key:, call:) ⇒ Object
Constructor Details
#initialize(container) ⇒ ProduceOnWriteSubscriber
Returns a new instance of ProduceOnWriteSubscriber.
13 14 15 |
# File 'lib/textus/ports/produce_on_write_subscriber.rb', line 13 def initialize(container) @container = container end |
Instance Method Details
#attach(bus) ⇒ Object
17 18 19 20 21 22 23 |
# File 'lib/textus/ports/produce_on_write_subscriber.rb', line 17 def attach(bus) bus.on(:entry_written, :produce_on_write) do |ctx:, key:, **| call = Textus::Call.build(role: ctx.role, correlation_id: ctx.correlation_id) on_write(key: key, call: call) end self end |
#on_write(key:, call:) ⇒ Object
25 26 27 28 29 30 31 32 33 34 35 36 37 |
# File 'lib/textus/ports/produce_on_write_subscriber.rb', line 25 def on_write(key:, call:) return if derived_write?(key) # recursion guard: produce output is not a source change affected = Textus::Read::Rdeps.new(container: @container).call(key)["rdeps"] producible = affected.select { |k| producible?(k) } return if producible.empty? if any_sync?(producible) Textus::Produce::Engine.converge(container: @container, call: call, keys: producible) else Textus::Produce::Engine::AsyncRunner.enqueue(container: @container, call: call, keys: producible) end end |