Class: Textus::Ports::ProduceOnWriteSubscriber
- Inherits:
-
Object
- Object
- Textus::Ports::ProduceOnWriteSubscriber
- Defined in:
- lib/textus/ports/produce_on_write_subscriber.rb
Overview
ADR 0093 / job-queue model: on a canon write, enqueue a ‘materialize` job for each derived entry that depends on the written key (rdeps ∩ producible). Async-only — the write returns immediately; a worker (drain/serve) converges the jobs. There is no inline `sync` path and no in-process thread: freshness is re-homed to drain (at the commit/CI gate) and the daemon. A write INTO a derived entry does not fan out (recursion guard). Produce self-elevates, so the job is stamped automation. Attached at Store boot, alongside AuditSubscriber.
Instance Method Summary collapse
- #attach(bus) ⇒ Object
-
#initialize(container) ⇒ ProduceOnWriteSubscriber
constructor
A new instance of ProduceOnWriteSubscriber.
- #on_write(key:) ⇒ Object
Constructor Details
#initialize(container) ⇒ ProduceOnWriteSubscriber
Returns a new instance of ProduceOnWriteSubscriber.
14 15 16 |
# File 'lib/textus/ports/produce_on_write_subscriber.rb', line 14 def initialize(container) @container = container end |
Instance Method Details
#attach(bus) ⇒ Object
18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
# File 'lib/textus/ports/produce_on_write_subscriber.rb', line 18 def attach(bus) bus.on(:entry_written, :produce_on_write) do |key:, **| on_write(key: key) end # Closes the ADR 0087 gap: a delete/rename of a source must re-materialize # its orphaned dependents too, not just a write. These fire distinct # events (:entry_deleted / :entry_renamed), so subscribe to each. bus.on(:entry_deleted, :produce_on_delete) do |key:, **| on_write(key: key) end bus.on(:entry_renamed, :produce_on_rename) do |from_key:, to_key:, **| on_write(key: from_key) on_write(key: to_key) end self end |
#on_write(key:) ⇒ Object
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
# File 'lib/textus/ports/produce_on_write_subscriber.rb', line 35 def on_write(key:) return if derived_write?(key) # recursion guard: produce output is not a source change affected = Textus::Read::Rdeps.new(container: @container).call(key)["rdeps"] producible = affected.select { |k| producible?(k) } return if producible.empty? queue = Textus::Ports::Queue.new(root: @container.root) producible.each do |k| queue.enqueue( Textus::Domain::Jobs::Job.new( type: "materialize", args: { "key" => k }, enqueued_by: Textus::Role::AUTOMATION, ), ) end end |