Class: Rubino::Jobs::Runner
- Inherits:
-
Object
- Object
- Rubino::Jobs::Runner
- Defined in:
- lib/rubino/jobs/runner.rb
Overview
Executes individual jobs by looking up handlers in the Registry.
Instance Method Summary collapse
-
#initialize(db: nil) ⇒ Runner
constructor
A new instance of Runner.
-
#run_job(job_id) ⇒ Object
Runs a specific job by ID.
-
#run_pending(limit: 10) ⇒ Object
Runs all pending jobs up to limit.
Constructor Details
Instance Method Details
#run_job(job_id) ⇒ Object
Runs a specific job by ID
15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
# File 'lib/rubino/jobs/runner.rb', line 15 def run_job(job_id) job = @db[:jobs].where(id: job_id).first return unless job handler = Registry.handler_for(job[:type]) unless handler @queue.fail!(job_id, error: "No handler registered for: #{job[:type]}") return end payload = JSON.parse(job[:payload_json], symbolize_names: true) run_id = record_run_start(job_id) begin Rubino.event_bus.emit(Interaction::Events::JOB_STARTED, type: job[:type]) handler.new.perform(payload) @queue.complete!(job_id) record_run_finish(run_id, status: "completed") Rubino.event_bus.emit(Interaction::Events::JOB_FINISHED, type: job[:type]) rescue StandardError => e @queue.fail!(job_id, error: e.) record_run_finish(run_id, status: "failed", error: e.) Rubino.event_bus.emit(Interaction::Events::JOB_FAILED, type: job[:type], error: e.) end end |
#run_pending(limit: 10) ⇒ Object
Runs all pending jobs up to limit
42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/rubino/jobs/runner.rb', line 42 def run_pending(limit: 10) worker_id = "runner-#{Process.pid}" processed = 0 limit.times do job = @queue.dequeue(worker_id: worker_id) break unless job run_job(job[:id]) processed += 1 end processed end |