Class: Rubino::Jobs::Runner

Inherits:
Object
  • Object
show all
Defined in:
lib/rubino/jobs/runner.rb

Overview

Executes individual jobs by looking up handlers in the Registry.

Instance Method Summary collapse

Constructor Details

#initialize(db: nil) ⇒ Runner

Returns a new instance of Runner.



9
10
11
12
# File 'lib/rubino/jobs/runner.rb', line 9

def initialize(db: nil)
  @db = db || Rubino.database.db
  @queue = Queue.new(db: @db)
end

Instance Method Details

#run_job(job_id) ⇒ Object

Runs a specific job by ID



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/rubino/jobs/runner.rb', line 15

def run_job(job_id)
  job = @db[:jobs].where(id: job_id).first
  return unless job

  handler = Registry.handler_for(job[:type])
  unless handler
    @queue.fail!(job_id, error: "No handler registered for: #{job[:type]}")
    return
  end

  payload = JSON.parse(job[:payload_json], symbolize_names: true)
  run_id = record_run_start(job_id)

  begin
    Rubino.event_bus.emit(Interaction::Events::JOB_STARTED, type: job[:type])
    handler.new.perform(payload)
    @queue.complete!(job_id)
    record_run_finish(run_id, status: "completed")
    Rubino.event_bus.emit(Interaction::Events::JOB_FINISHED, type: job[:type])
  rescue StandardError => e
    @queue.fail!(job_id, error: e.message)
    record_run_finish(run_id, status: "failed", error: e.message)
    Rubino.event_bus.emit(Interaction::Events::JOB_FAILED, type: job[:type], error: e.message)
  end
end

#run_pending(limit: 10) ⇒ Object

Runs all pending jobs up to limit



42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/rubino/jobs/runner.rb', line 42

def run_pending(limit: 10)
  worker_id = "runner-#{Process.pid}"
  processed = 0

  limit.times do
    job = @queue.dequeue(worker_id: worker_id)
    break unless job

    run_job(job[:id])
    processed += 1
  end

  processed
end