Class: Rubino::Jobs::Runner

Inherits:
Object
Defined in:
lib/rubino/jobs/runner.rb

Overview

Executes individual jobs by looking up handlers in the Registry.

Constant Summary

TERMINAL_STATUSES =

Statuses from which a job can never be re-run, because it has already reached a terminal outcome. run_job refuses these (#346) so that a double call (two processes reaping the same orphan, or a stale retry) can never execute, and re-bill, an already-finished job a second time.

%w[completed failed dead].freeze
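
The effect of the terminal-status guard can be illustrated with a minimal sketch. It is not the library's code: `DEMO_TERMINAL_STATUSES`, `runnable?` and `run_once` are hypothetical names, and the job row is a plain Hash standing in for a database row.

```ruby
# Hypothetical stand-in for the guard described above (not Rubino's code).
DEMO_TERMINAL_STATUSES = %w[completed failed dead].freeze

# True when the row exists and has not yet reached a terminal status.
def runnable?(job)
  !job.nil? && !DEMO_TERMINAL_STATUSES.include?(job[:status])
end

# Executes the job at most once: the first call moves the row to a
# terminal status, so any later call becomes a no-op.
def run_once(job, log)
  return :skipped unless runnable?(job)

  log << job[:id]            # stands in for the billed work
  job[:status] = "completed"
  :ran
end

job = { id: 1, status: "pending" }
log = []
run_once(job, log) # first call does the work
run_once(job, log) # second call is refused by the guard
```

After both calls, `log` holds the job ID exactly once, which is the property the constant exists to protect.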

Instance Method Summary

Constructor Details

#initialize(db: nil) ⇒ Runner

Returns a new instance of Runner.



# File 'lib/rubino/jobs/runner.rb', line 9

def initialize(db: nil)
  @db = db || Rubino.database.db
  @queue = Queue.new(db: @db)
end

Instance Method Details

#run_job(job_id) ⇒ Object

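Runs a specific job by ID. Returns nil without executing anything if no row matches the ID or the job is already in a terminal status. Errors raised while resolving the handler, parsing the payload, or performing the job are rescued and recorded as a failed run instead of propagating to the caller.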



# File 'lib/rubino/jobs/runner.rb', line 21

def run_job(job_id)
  job = @db[:jobs].where(id: job_id).first
  return unless job

  # Defence-in-depth re-check (#346): refuse a row that already reached a
  # terminal status. The CAS claim in Queue#reap_inline_orphans/#dequeue is
  # the primary guard against two processes double-running an orphan; this
  # second check means even a direct run_job on an already-completed row is
  # a harmless no-op rather than a second (billed) execution.
  return if TERMINAL_STATUSES.include?(job[:status])

  run_id = record_run_start(job_id)

  # Handler resolution and payload parsing live INSIDE the rescue so a
  # bad row (unknown type, or non-JSON payload_json written by an older
  # build / a corrupt write) is failure-isolated exactly like a handler
  # exception: it reaches fail! (terminal in inline mode) instead of
  # escaping. In inline mode run_job is driven directly by enqueue/
  # reap_inline_orphans on a live turn, so an escaping JSON::ParserError
  # would otherwise take down the whole interaction (#J1).
  begin
    handler = Registry.handler_for(job[:type])
    raise "No handler registered for: #{job[:type]}" unless handler

    payload = JSON.parse(job[:payload_json], symbolize_names: true)

    Rubino.event_bus.emit(Interaction::Events::JOB_STARTED, type: job[:type])
    handler.new.perform(payload)
    @queue.complete!(job_id)
    record_run_finish(run_id, status: "completed")
    Rubino.event_bus.emit(Interaction::Events::JOB_FINISHED, type: job[:type])
  rescue StandardError => e
    @queue.fail!(job_id, error: e.message)
    record_run_finish(run_id, status: "failed", error: e.message)
    Rubino.event_bus.emit(Interaction::Events::JOB_FAILED, type: job[:type], error: e.message)
  end
end
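
The failure isolation described in the comments above can be sketched in a few self-contained lines. This is an illustration under stated assumptions, not the actual implementation: `DEMO_HANDLERS` and `isolated_run` are hypothetical, handlers are plain lambdas rather than classes with `perform`, and the result is returned as a pair instead of being written to a queue or event bus.

```ruby
require "json"

# Hypothetical in-memory registry; the real Registry is only known here
# through its handler_for call in the listing above.
DEMO_HANDLERS = {
  "greet" => ->(payload) { "hello #{payload[:name]}" }
}.freeze

# Returns [:completed, result] or [:failed, message].
# Handler lookup and JSON parsing sit inside the rescued method body, so
# an unknown type or a corrupt payload fails the job instead of raising.
def isolated_run(row)
  handler = DEMO_HANDLERS[row[:type]]
  raise "No handler registered for: #{row[:type]}" unless handler

  payload = JSON.parse(row[:payload_json], symbolize_names: true)
  [:completed, handler.call(payload)]
rescue StandardError => e
  [:failed, e.message]
end

isolated_run({ type: "greet", payload_json: '{"name":"Ada"}' }) # completes
isolated_run({ type: "greet", payload_json: "not json" })       # fails, no raise
```

Had the lookup and the parse been placed before the rescued region, the second call would raise `JSON::ParserError` into the caller, which is the situation the source comment (#J1) describes for inline mode.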

#run_pending(limit: 10) ⇒ Object

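Dequeues and runs pending jobs one at a time, up to limit, stopping early as soon as dequeue returns no job. Returns the number of jobs processed.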



# File 'lib/rubino/jobs/runner.rb', line 60

def run_pending(limit: 10)
  worker_id = "runner-#{Process.pid}"
  processed = 0

  limit.times do
    job = @queue.dequeue(worker_id: worker_id)
    break unless job

    run_job(job[:id])
    processed += 1
  end

  processed
end
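
The bounded drain loop can be tried in isolation with a toy queue. The sketch below is an assumption-laden illustration: `DemoQueue` and `drain` are hypothetical, the queue is an in-memory array rather than the database-backed Queue, and the per-job work is passed in as a block instead of calling run_job.

```ruby
# Hypothetical in-memory queue; dequeue returns nil once nothing is pending.
class DemoQueue
  def initialize(ids)
    @pending = ids.map { |id| { id: id } }
  end

  # worker_id is accepted to mirror the call shape above but is unused here.
  def dequeue(worker_id:)
    @pending.shift
  end
end

# Processes at most `limit` jobs and returns how many were handled.
def drain(queue, limit: 10)
  worker_id = "runner-#{Process.pid}"
  processed = 0

  limit.times do
    job = queue.dequeue(worker_id: worker_id)
    break unless job # queue exhausted before the limit was reached

    yield job[:id]
    processed += 1
  end

  processed
end

seen = []
count = drain(DemoQueue.new([1, 2, 3]), limit: 2) { |id| seen << id }
# count is 2 and seen is [1, 2]; job 3 stays queued for a later call.
```

Because the loop counts attempts rather than successes, a job whose handler fails still counts toward the returned total, which matches the listing above where `processed` is incremented after every run_job call.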