Module: Textus::Jobs::Handlers
Defined in: lib/textus/jobs/handlers.rb
Overview
Wires the closed allow-list of convergence job types to the existing convergence code. Authority is read from the job's frozen `enqueued_by` and turned into the Call the handler runs under. produce self-elevates inside Produce::Engine regardless of that role; the destructive sweep runs AS the caller.
Class Method Summary
- .call_for(job) ⇒ Object
- .produce(job:, container:) ⇒ Object
  produce: render derived / re-pull intake for a single key.
- .registry ⇒ Object
- .scope_prefix(scope) ⇒ Object
  A scope is `{ "prefix" => …, "zone" => … }` or nil (whole store).
- .scope_zone(scope) ⇒ Object
- .sweep(job:, container:) ⇒ Object
  sweep: compute retention rows for the scope, then apply destructively AS the job's role (no self-elevation).
Class Method Details
.call_for(job) ⇒ Object
    # File 'lib/textus/jobs/handlers.rb', line 53
    def call_for(job)
      Textus::Call.build(role: job.enqueued_by || Textus::Role::AUTOMATION)
    end
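The role fallback in call_for can be tried in isolation. The sketch below is illustrative only: `Job`, `AUTOMATION`, and `role_for` are hypothetical stand-ins, not the real Textus types. Only the `enqueued_by || default` expression is taken from the source.

```ruby
# Hypothetical stand-ins; the real Textus::Call and Textus::Role are not loaded here.
Job = Struct.new(:enqueued_by, :args, keyword_init: true)
AUTOMATION = :automation # placeholder for Textus::Role::AUTOMATION

# Mirrors the role-selection expression in call_for: a nil enqueued_by
# falls back to the automation role.
def role_for(job)
  job.enqueued_by || AUTOMATION
end

stamped   = Job.new(enqueued_by: :editor, args: {})
unstamped = Job.new(enqueued_by: nil, args: {})

puts role_for(stamped).inspect   # => :editor
puts role_for(unstamped).inspect # => :automation
```

A job stamped at enqueue time keeps its own role; only an unstamped job is treated as automation.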
.produce(job:, container:) ⇒ Object
produce: render derived / re-pull intake for a single key. Engine self-elevates to the build actor internally; the passed call carries only correlation/dry_run plus the stamped role for audit. Engine#call isolates per-key produce errors into its result hash rather than raising, so this handler surfaces them as :produce_failed events. The converge result hash used to carry them to the caller, but the worker drops the handler's return value, so they are re-published here.
    # File 'lib/textus/jobs/handlers.rb', line 26
    def produce(job:, container:)
      call = call_for(job)
      result = Textus::Produce::Engine.converge(container: container, call: call, keys: [job.args["key"]])
      return unless result.is_a?(Hash)
      Array(result[:failed]).each do |failure|
        container.events.publish(
          :produce_failed,
          ctx: Textus::Hooks::Context.for(container: container, call: call),
          keys: [failure["key"]],
          error: failure["error"]
        )
      end
    end
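The re-publish step can be sketched without the engine. The example assumes the result shape implied by the handler (a Hash with a `:failed` array of string-keyed hashes). `RecordingEvents` and `republish_failures` are hypothetical names introduced here, and the `ctx:` argument is omitted for brevity.

```ruby
# Hypothetical in-memory event bus; the real container.events API is only
# known from the publish call in the handler.
class RecordingEvents
  attr_reader :published

  def initialize
    @published = []
  end

  def publish(name, **payload)
    @published << [name, payload]
  end
end

# Re-publishes each entry under :failed as a :produce_failed event.
# Non-Hash results are ignored, matching the early return in produce.
def republish_failures(result, events)
  return unless result.is_a?(Hash)

  Array(result[:failed]).each do |failure|
    events.publish(:produce_failed, keys: [failure["key"]], error: failure["error"])
  end
end

events = RecordingEvents.new
# Assumed result shape: a :failed array of string-keyed hashes.
result = { failed: [{ "key" => "notes/a", "error" => "boom" }] }
republish_failures(result, events)
republish_failures(nil, events) # no-op: not a Hash

p events.published
```

Wrapping `result[:failed]` in `Array(...)` means a result with no `:failed` key publishes nothing instead of raising.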
.registry ⇒ Object
    # File 'lib/textus/jobs/handlers.rb', line 10
    def registry
      reg = Textus::Domain::Jobs::Registry.new
      # produce is pure (self-elevates); any caller may request a rematerialize.
      reg.register("materialize", handler: method(:produce))
      reg.register("re-pull", handler: method(:produce))
      # sweep is destructive; gate ad-hoc enqueue to the automation authority.
      reg.register("sweep", handler: method(:sweep), required_role: Textus::Role::AUTOMATION)
      reg
    end
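One way to picture a closed allow-list with an optional role gate is shown below. `MiniRegistry` is a hypothetical stand-in, and Textus::Domain::Jobs::Registry may behave differently. In particular, `allowed?` and the rule that a nil `required_role` admits any caller are assumptions, not documented behavior.

```ruby
# Hypothetical minimal registry; not the Textus implementation.
class MiniRegistry
  Entry = Struct.new(:handler, :required_role)

  def initialize
    @entries = {}
  end

  def register(type, handler:, required_role: nil)
    @entries[type] = Entry.new(handler, required_role)
  end

  # Closed allow-list: unknown types are rejected rather than ignored.
  def fetch(type)
    @entries.fetch(type) { raise ArgumentError, "unknown job type: #{type}" }
  end

  # Assumed gate semantics: a nil required_role admits any caller.
  def allowed?(type, role)
    required = fetch(type).required_role
    required.nil? || required == role
  end
end

reg = MiniRegistry.new
reg.register("materialize", handler: ->(job:, container:) { :produced })
reg.register("sweep", handler: ->(job:, container:) { :swept }, required_role: :automation)

puts reg.allowed?("materialize", :editor) # => true
puts reg.allowed?("sweep", :editor)       # => false
puts reg.allowed?("sweep", :automation)   # => true
```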
.scope_prefix(scope) ⇒ Object
A scope is `{ "prefix" => …, "zone" => … }` or nil (whole store).
    # File 'lib/textus/jobs/handlers.rb', line 58
    def scope_prefix(scope) = scope.is_a?(Hash) ? scope["prefix"] : nil
.scope_zone(scope) ⇒ Object
    # File 'lib/textus/jobs/handlers.rb', line 59
    def scope_zone(scope) = scope.is_a?(Hash) ? scope["zone"] : nil
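The two scope helpers are small enough to run standalone. The snippet lifts them out of the module unchanged (endless method definitions need Ruby 3.0 or later). The scope values `"notes/"` and `"intake"` are made up for illustration.

```ruby
# Same one-liners as the source, defined at top level so they run standalone.
def scope_prefix(scope) = scope.is_a?(Hash) ? scope["prefix"] : nil
def scope_zone(scope)   = scope.is_a?(Hash) ? scope["zone"] : nil

scoped = { "prefix" => "notes/", "zone" => "intake" } # illustrative values

p scope_prefix(scoped)                 # => "notes/"
p scope_zone(scoped)                   # => "intake"
p scope_prefix(nil)                    # => nil (whole store)
p scope_zone({ "prefix" => "notes/" }) # => nil (key absent)
p scope_prefix({ prefix: "notes/" })   # => nil (symbol keys are not read)
```

The helpers read string keys only, so a scope built with symbol keys is treated as unscoped.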
.sweep(job:, container:) ⇒ Object
sweep: compute retention rows for the scope, then apply destructively AS the job’s role (no self-elevation).
    # File 'lib/textus/jobs/handlers.rb', line 42
    def sweep(job:, container:)
      call = call_for(job)
      scope = job.args["scope"]
      rows = Textus::Domain::Retention::Sweep.new(
        manifest: container.manifest,
        file_stat: Textus::Ports::Storage::FileStat.new,
        clock: Textus::Ports::Clock.new,
      ).call(prefix: scope_prefix(scope), zone: scope_zone(scope))
      Textus::Maintenance::Retention::Apply.new(container: container, call: call).call(rows)
    end
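The compute-then-apply split can be illustrated with a toy model. Everything below is a hypothetical stand-in: `Row`, `plan`, `apply`, `NotPermitted`, the age-based rule, and the explicit role check are assumptions made for the sketch, not the behavior of the Textus retention classes.

```ruby
# Hypothetical stand-ins; not the Textus retention classes.
Row = Struct.new(:key, :action)
class NotPermitted < StandardError; end

# Phase 1 (pure): pick entries under the prefix that are older than max_age days.
# A nil prefix means the whole store.
def plan(ages, prefix:, max_age:)
  ages.select { |key, age| (prefix.nil? || key.start_with?(prefix)) && age > max_age }
      .map { |key, _age| Row.new(key, :delete) }
end

# Phase 2 (destructive): runs under the caller's own role, with no elevation.
def apply(rows, store:, role:, allowed_roles:)
  raise NotPermitted, "#{role} may not sweep" unless allowed_roles.include?(role)

  rows.each { |row| store.delete(row.key) }
  store
end

store = { "notes/old" => 30, "notes/new" => 1, "logs/old" => 90 } # key => age in days
rows = plan(store, prefix: "notes/", max_age: 7)
apply(rows, store: store, role: :automation, allowed_roles: [:automation])
p store.keys # => ["notes/new", "logs/old"]
```

Keeping the planning phase free of side effects means the rows can be inspected or logged before anything is deleted.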