Module: Wurk::Queues
- Defined in:
- lib/wurk/queues.rb
Overview
In-memory job store for ‘:fake` test mode (aliased to Sidekiq::Queues). Wurk::Client#raw_push routes payloads here instead of Redis when `Wurk::Testing.fake?`. Thread-safe so parallel test threads / a job that enqueues more jobs during `drain` don’t corrupt the store.
Spec: docs/target/sidekiq-free.md §24.2.
Class Method Summary collapse
-
.[](queue) ⇒ Object
Live array reference (not a copy), so the Sidekiq idiom ‘Sidekiq::Queues.clear` mutates the underlying store like stock.
- .clear_all ⇒ Object
-
.clear_class(klass) ⇒ Object
— drain helpers (lock released before the job runs, so a job that enqueues more work doesn’t deadlock on the same Mutex) —————.
- .clear_for(queue, klass) ⇒ Object
- .delete_for(jid, queue, _klass) ⇒ Object
-
.jobs ⇒ Object
Every enqueued payload across all queues, flattened.
- .jobs_by_class ⇒ Object (also: jobs_by_worker)
-
.jobs_by_queue ⇒ Object
Live hash of queue => [jobs], matching Sidekiq::Queues.jobs_by_queue.
- .push(queue, _klass, job) ⇒ Object
- .shift_any ⇒ Object
- .shift_class(klass) ⇒ Object
Class Method Details
.[](queue) ⇒ Object
Live array reference (not a copy), so the Sidekiq idiom ‘Sidekiq::Queues.clear` mutates the underlying store like stock.
17 18 19 |
# File 'lib/wurk/queues.rb', line 17 def [](queue) @lock.synchronize { @by_queue[queue.to_s] } end |
.clear_all ⇒ Object
49 50 51 |
# File 'lib/wurk/queues.rb', line 49 def clear_all @lock.synchronize { @by_queue.clear } end |
.clear_class(klass) ⇒ Object
— drain helpers (lock released before the job runs, so a job that enqueues more work doesn’t deadlock on the same Mutex) —————
56 57 58 59 |
# File 'lib/wurk/queues.rb', line 56 def clear_class(klass) klass = klass.to_s @lock.synchronize { @by_queue.each_value { |jobs| jobs.reject! { |j| j['class'].to_s == klass } } } end |
.clear_for(queue, klass) ⇒ Object
44 45 46 47 |
# File 'lib/wurk/queues.rb', line 44 def clear_for(queue, klass) klass = klass.to_s @lock.synchronize { @by_queue[queue.to_s].reject! { |j| j['class'].to_s == klass } } end |
.delete_for(jid, queue, _klass) ⇒ Object
40 41 42 |
# File 'lib/wurk/queues.rb', line 40 def delete_for(jid, queue, _klass) @lock.synchronize { @by_queue[queue.to_s].reject! { |j| j['jid'] == jid } } end |
.jobs ⇒ Object
Every enqueued payload across all queues, flattened.
36 37 38 |
# File 'lib/wurk/queues.rb', line 36 def jobs @lock.synchronize { @by_queue.values.flatten } end |
.jobs_by_class ⇒ Object Also known as: jobs_by_worker
30 31 32 |
# File 'lib/wurk/queues.rb', line 30 def jobs_by_class @lock.synchronize { @by_queue.values.flatten.group_by { |j| j['class'].to_s } } end |
.jobs_by_queue ⇒ Object
Live hash of queue => [jobs], matching Sidekiq::Queues.jobs_by_queue.
26 27 28 |
# File 'lib/wurk/queues.rb', line 26 def jobs_by_queue @lock.synchronize { @by_queue } end |
.push(queue, _klass, job) ⇒ Object
21 22 23 |
# File 'lib/wurk/queues.rb', line 21 def push(queue, _klass, job) @lock.synchronize { @by_queue[queue.to_s] << job } end |
.shift_any ⇒ Object
72 73 74 75 76 77 |
# File 'lib/wurk/queues.rb', line 72 def shift_any @lock.synchronize do @by_queue.each_value { |jobs| return jobs.shift unless jobs.empty? } nil end end |
.shift_class(klass) ⇒ Object
61 62 63 64 65 66 67 68 69 70 |
# File 'lib/wurk/queues.rb', line 61 def shift_class(klass) klass = klass.to_s @lock.synchronize do @by_queue.each_value do |jobs| idx = jobs.index { |j| j['class'].to_s == klass } return jobs.delete_at(idx) if idx end nil end end |