Class: Textus::Ports::ProduceOnWriteSubscriber

Inherits:
Object
  • Object
show all
Defined in:
lib/textus/ports/produce_on_write_subscriber.rb

Overview

ADR 0093 / job-queue model: on a canon write, enqueue a ‘materialize` job for each derived entry that depends on the written key (rdeps ∩ producible). Async-only — the write returns immediately; a worker (drain/serve) converges the jobs. There is no inline `sync` path and no in-process thread: freshness is re-homed to drain (at the commit/CI gate) and the daemon. A write INTO a derived entry does not fan out (recursion guard). Produce self-elevates, so the job is stamped automation. Attached at Store boot, alongside AuditSubscriber.

Instance Method Summary collapse

Constructor Details

#initialize(container) ⇒ ProduceOnWriteSubscriber

Returns a new instance of ProduceOnWriteSubscriber.



14
15
16
# File 'lib/textus/ports/produce_on_write_subscriber.rb', line 14

def initialize(container)
  @container = container
end

Instance Method Details

#attach(bus) ⇒ Object



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/textus/ports/produce_on_write_subscriber.rb', line 18

def attach(bus)
  bus.on(:entry_written, :produce_on_write) do |key:, **|
    on_write(key: key)
  end
  # Closes the ADR 0087 gap: a delete/rename of a source must re-materialize
  # its orphaned dependents too, not just a write. These fire distinct
  # events (:entry_deleted / :entry_renamed), so subscribe to each.
  bus.on(:entry_deleted, :produce_on_delete) do |key:, **|
    on_write(key: key)
  end
  bus.on(:entry_renamed, :produce_on_rename) do |from_key:, to_key:, **|
    on_write(key: from_key)
    on_write(key: to_key)
  end
  self
end

#on_write(key:) ⇒ Object



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/textus/ports/produce_on_write_subscriber.rb', line 35

def on_write(key:)
  return if derived_write?(key) # recursion guard: produce output is not a source change

  affected = Textus::Read::Rdeps.new(container: @container).call(key)["rdeps"]
  producible = affected.select { |k| producible?(k) }
  return if producible.empty?

  queue = Textus::Ports::Queue.new(root: @container.root)
  producible.each do |k|
    queue.enqueue(
      Textus::Domain::Jobs::Job.new(
        type: "materialize", args: { "key" => k }, enqueued_by: Textus::Role::AUTOMATION,
      ),
    )
  end
end