Class: QC::Queue
- Inherits:
-
Object
- Object
- QC::Queue
- Defined in:
- lib/queue_classic_plus/queue_classic/queue.rb
Instance Method Summary collapse
Instance Method Details
#enqueue_retry_in(seconds, method, remaining_retries, *args) ⇒ Object
4 5 6 7 8 9 10 11 |
# File 'lib/queue_classic_plus/queue_classic/queue.rb', line 4

# Enqueues a job on this queue that becomes eligible to run after a delay.
#
# A row is inserted into the table named by QC.table_name with
# scheduled_at computed by the database as now() plus the delay, so the
# schedule follows the database clock rather than the Ruby process clock.
#
# @param seconds [#to_i] delay before the job is due; coerced with to_i
#   before being interpolated into the interval literal
# @param method [Object] value stored in the method column
# @param remaining_retries [Object] value stored in the remaining_retries column
# @param args [Array] job arguments, serialized with JSON.dump
# @return [Object] the value of the QC.log_yield call wrapping the insert
def enqueue_retry_in(seconds, method, remaining_retries, *args)
  QC.log_yield(measure: 'queue.enqueue') do
    delay = seconds.to_i
    # Only the integer delay and the table name are interpolated; every
    # caller-supplied value is sent as a bind parameter.
    insert_sql = "INSERT INTO #{QC.table_name} (q_name, method, args, scheduled_at, remaining_retries) " \
                 "VALUES ($1, $2, $3, now() + interval '#{delay} seconds', $4)"
    payload = JSON.dump(args)

    conn_adapter.execute(insert_sql, name, method, payload, remaining_retries)
  end
end
#lock ⇒ Object
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/queue_classic_plus/queue_classic/queue.rb', line 13

# Claims one due, unlocked job from this queue and returns it.
#
# A single statement selects one row whose locked_at is NULL, whose q_name
# matches this queue and whose scheduled_at is not in the future, skipping
# rows already row-locked by other sessions (FOR NO KEY UPDATE SKIP LOCKED),
# then stamps it with locked_at = now() and locked_by = pg_backend_pid().
#
# The table name comes from QC.table_name so that this method reads from the
# same table that enqueue_retry_in writes to.
#
# When the row carries a scheduled_at, the delay between the scheduled time
# and the moment of locking is reported through QC.measure in milliseconds.
#
# @return [Hash, nil] a hash with :id, :q_name, :method, :args (parsed from
#   JSON), :remaining_retries (stringified unless nil) and, when present,
#   :scheduled_at (a Time); nil when conn_adapter.execute returns a falsy
#   value
def lock
  QC.log_yield(measure: 'queue.lock') do
    table = QC.table_name
    sql = <<~SQL
      WITH selected_job AS (
        SELECT id
        FROM #{table}
        WHERE
          locked_at IS NULL AND
          q_name = $1 AND
          scheduled_at <= now()
        LIMIT 1
        FOR NO KEY UPDATE SKIP LOCKED
      )
      UPDATE #{table}
      SET
        locked_at = now(),
        locked_by = pg_backend_pid()
      FROM selected_job
      WHERE #{table}.id = selected_job.id
      RETURNING *
    SQL

    row = conn_adapter.execute(sql, name)
    next unless row

    job = {
      id: row["id"],
      q_name: row["q_name"],
      method: row["method"],
      args: JSON.parse(row["args"]),
      remaining_retries: row["remaining_retries"]&.to_s
    }

    scheduled_at = row["scheduled_at"]
    if scheduled_at
      # ActiveSupport may cast time strings to Time
      job[:scheduled_at] = scheduled_at.is_a?(Time) ? scheduled_at : Time.parse(scheduled_at)
      ttl = Integer((Time.now - job[:scheduled_at]) * 1000)
      QC.measure("time-to-lock=#{ttl}ms source=#{name}")
    end

    job
  end
end