Module: Textus::Jobs::Handlers

Defined in:
lib/textus/jobs/handlers.rb

Overview

Wires the closed allow-list of convergence job types to the existing convergence code. Authority is read from the job’s frozen ‘enqueued_by` and turned into the Call the handler runs under: produce self-elevates inside Produce::Engine regardless; destructive sweep runs AS the caller.

Class Method Summary collapse

Class Method Details

.call_for(job) ⇒ Object



53
54
55
# File 'lib/textus/jobs/handlers.rb', line 53

def call_for(job)
  Textus::Call.build(role: job.enqueued_by || Textus::Role::AUTOMATION)
end

.produce(job:, container:) ⇒ Object

produce: render derived / re-pull intake for a single key. Engine self-elevates to the build actor internally; the passed call carries only correlation/dry_run plus the stamped role for audit. Engine#call isolates per-key produce errors into its result hash rather than raising, so surface them as :produce_failed events (the converge result hash used to carry them; the worker drops the return, so re-publish here).



26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/textus/jobs/handlers.rb', line 26

def produce(job:, container:)
  call = call_for(job)
  result = Textus::Produce::Engine.converge(container: container, call: call, keys: [job.args["key"]])
  return unless result.is_a?(Hash)

  Array(result[:failed]).each do |failure|
    container.events.publish(
      :produce_failed,
      ctx: Textus::Hooks::Context.for(container: container, call: call),
      keys: [failure["key"]], error: failure["error"]
    )
  end
end

.registryObject



10
11
12
13
14
15
16
17
18
# File 'lib/textus/jobs/handlers.rb', line 10

def registry
  reg = Textus::Domain::Jobs::Registry.new
  # produce is pure (self-elevates) — any caller may request a rematerialize.
  reg.register("materialize", handler: method(:produce))
  reg.register("re-pull",     handler: method(:produce))
  # sweep is destructive — gate ad-hoc enqueue to the automation authority.
  reg.register("sweep",       handler: method(:sweep), required_role: Textus::Role::AUTOMATION)
  reg
end

.scope_prefix(scope) ⇒ Object

A scope is ‘{ “prefix” => …, “zone” => … }` or nil (whole store).



58
# File 'lib/textus/jobs/handlers.rb', line 58

def scope_prefix(scope) = scope.is_a?(Hash) ? scope["prefix"] : nil

.scope_zone(scope) ⇒ Object



59
# File 'lib/textus/jobs/handlers.rb', line 59

def scope_zone(scope)   = scope.is_a?(Hash) ? scope["zone"]   : nil

.sweep(job:, container:) ⇒ Object

sweep: compute retention rows for the scope, then apply destructively AS the job’s role (no self-elevation).



42
43
44
45
46
47
48
49
50
51
# File 'lib/textus/jobs/handlers.rb', line 42

def sweep(job:, container:)
  call = call_for(job)
  scope = job.args["scope"]
  rows = Textus::Domain::Retention::Sweep.new(
    manifest: container.manifest,
    file_stat: Textus::Ports::Storage::FileStat.new,
    clock: Textus::Ports::Clock.new,
  ).call(prefix: scope_prefix(scope), zone: scope_zone(scope))
  Textus::Maintenance::Retention::Apply.new(container: container, call: call).call(rows)
end