Module: Wurk::Queues

Defined in:
lib/wurk/queues.rb

Overview

In-memory job store for ‘:fake` test mode (aliased to Sidekiq::Queues). Wurk::Client#raw_push routes payloads here instead of Redis when `Wurk::Testing.fake?`. Thread-safe so parallel test threads / a job that enqueues more jobs during `drain` don’t corrupt the store.

Spec: docs/target/sidekiq-free.md §24.2.

Class Method Summary collapse

Class Method Details

.[](queue) ⇒ Object

Live array reference (not a copy), so the Sidekiq idiom ‘Sidekiq::Queues.clear` mutates the underlying store like stock.



17
18
19
# File 'lib/wurk/queues.rb', line 17

def [](queue)
  @lock.synchronize { @by_queue[queue.to_s] }
end

.clear_allObject



49
50
51
# File 'lib/wurk/queues.rb', line 49

def clear_all
  @lock.synchronize { @by_queue.clear }
end

.clear_class(klass) ⇒ Object

— drain helpers (lock released before the job runs, so a job that enqueues more work doesn’t deadlock on the same Mutex) —————



56
57
58
59
# File 'lib/wurk/queues.rb', line 56

def clear_class(klass)
  klass = klass.to_s
  @lock.synchronize { @by_queue.each_value { |jobs| jobs.reject! { |j| j['class'].to_s == klass } } }
end

.clear_for(queue, klass) ⇒ Object



44
45
46
47
# File 'lib/wurk/queues.rb', line 44

def clear_for(queue, klass)
  klass = klass.to_s
  @lock.synchronize { @by_queue[queue.to_s].reject! { |j| j['class'].to_s == klass } }
end

.delete_for(jid, queue, _klass) ⇒ Object



40
41
42
# File 'lib/wurk/queues.rb', line 40

def delete_for(jid, queue, _klass)
  @lock.synchronize { @by_queue[queue.to_s].reject! { |j| j['jid'] == jid } }
end

.jobsObject

Every enqueued payload across all queues, flattened.



36
37
38
# File 'lib/wurk/queues.rb', line 36

def jobs
  @lock.synchronize { @by_queue.values.flatten }
end

.jobs_by_classObject Also known as: jobs_by_worker



30
31
32
# File 'lib/wurk/queues.rb', line 30

def jobs_by_class
  @lock.synchronize { @by_queue.values.flatten.group_by { |j| j['class'].to_s } }
end

.jobs_by_queueObject

Live hash of queue => [jobs], matching Sidekiq::Queues.jobs_by_queue.



26
27
28
# File 'lib/wurk/queues.rb', line 26

def jobs_by_queue
  @lock.synchronize { @by_queue }
end

.push(queue, _klass, job) ⇒ Object



21
22
23
# File 'lib/wurk/queues.rb', line 21

def push(queue, _klass, job)
  @lock.synchronize { @by_queue[queue.to_s] << job }
end

.shift_anyObject



72
73
74
75
76
77
# File 'lib/wurk/queues.rb', line 72

def shift_any
  @lock.synchronize do
    @by_queue.each_value { |jobs| return jobs.shift unless jobs.empty? }
    nil
  end
end

.shift_class(klass) ⇒ Object



61
62
63
64
65
66
67
68
69
70
# File 'lib/wurk/queues.rb', line 61

def shift_class(klass)
  klass = klass.to_s
  @lock.synchronize do
    @by_queue.each_value do |jobs|
      idx = jobs.index { |j| j['class'].to_s == klass }
      return jobs.delete_at(idx) if idx
    end
    nil
  end
end