Class: Wurk::Queue

Inherits:
Object
Includes:
Enumerable, API::Fast::QueueExt
Defined in:
lib/wurk/queue.rb

Overview

Inspects and iterates one named queue against the canonical Sidekiq schema: a LIST at `queue:<name>`, with membership in the `queues` SET. Wire-compat is sacred: every Redis call below matches OSS exactly.

Pause/unpause is a Pro feature upstream; Wurk ships it free. Membership of the `paused` SET drives both `paused?` and the fetcher’s queue filter (see `Wurk::Fetcher::Reliable#queues_cmd`). In-flight jobs continue to completion; pausing only stops new fetches.

Spec: docs/target/sidekiq-free.md §19.2, sidekiq-pro.md §6.

Constant Summary

PAGE_SIZE = 50

Page size for LRANGE traversal. Matches upstream so dashboards observing Redis traffic see the same pattern.

Methods included from API::Fast::QueueExt

#delete_by_class, #delete_job

Constructor Details

#initialize(name = 'default') ⇒ Queue

Returns a new instance of Queue.



# File 'lib/wurk/queue.rb', line 32

def initialize(name = 'default')
  @name = name.to_s
  @rname = Keys.queue(@name)
end

Instance Attribute Details

#name ⇒ Object (readonly) Also known as: id

Returns the value of attribute name.



# File 'lib/wurk/queue.rb', line 23

def name
  @name
end

Class Method Details

.all ⇒ Array<Queue>

Returns one per known queue, sorted by name.

Returns:

  • (Array<Queue>)

    one per known queue, sorted by name.



# File 'lib/wurk/queue.rb', line 27

def self.all
  names = Wurk.redis { |conn| conn.call('SMEMBERS', Keys::QUEUES_SET) }
  names.sort.map { |n| new(n) }
end

Instance Method Details

#as_json(_options = nil) ⇒ Object



# File 'lib/wurk/queue.rb', line 104

def as_json(_options = nil)
  { name: @name }
end

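#clear ⇒ Object

UNLINK the list and drop the queue from the `queues` set. The two commands are pipelined into one round trip; a pipeline is not a transaction, so a partial failure can still leave only one of the two ops applied. The method name is kept for Sidekiq wire-compat: renaming it to `clear?` would break the alias.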



# File 'lib/wurk/queue.rb', line 94

def clear # rubocop:disable Naming/PredicateMethod
  Wurk.redis do |conn|
    conn.pipelined do |pipe|
      pipe.call('UNLINK', @rname)
      pipe.call('SREM', Keys::QUEUES_SET, @name)
    end
  end
  true
end

#each ⇒ Object

Paged LRANGE traversal. Yields one JobRecord per payload. Continues paging until Redis returns fewer than PAGE_SIZE rows.



# File 'lib/wurk/queue.rb', line 72

def each
  page = 0
  loop do
    start  = page * PAGE_SIZE
    stop   = start + PAGE_SIZE - 1
    slice  = Wurk.redis { |conn| conn.call('LRANGE', @rname, start, stop) }
    slice.each { |value| yield JobRecord.new(value, @name) }
    break if slice.size < PAGE_SIZE

    page += 1
  end
end
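
The paging loop can be exercised without Redis. The following is a minimal sketch, not Wurk's code: `lrange` is a hypothetical stand-in that mimics the inclusive `start`/`stop` bounds of Redis `LRANGE` over a plain Ruby array, and `each_paged` is an invented name that returns the number of round trips so the stop condition is visible.

```ruby
# Hypothetical stand-in for Redis LRANGE: both bounds are inclusive,
# and a read past the end yields an empty array.
def lrange(list, start, stop)
  list[start..stop] || []
end

# Mirrors the loop in #each: fetch one page, yield its rows, and stop
# once a page comes back shorter than page_size.
# Returns the number of simulated LRANGE calls (illustrative only).
def each_paged(list, page_size = 50)
  page = 0
  calls = 0
  loop do
    start = page * page_size
    stop  = start + page_size - 1
    slice = lrange(list, start, stop)
    calls += 1
    slice.each { |value| yield value }
    break if slice.size < page_size

    page += 1
  end
  calls
end

seen = []
calls = each_paged((1..120).to_a) { |value| seen << value }
# seen holds all 120 values in order; calls is 3 (pages of 50, 50, 20)
```

Under this stop condition, a list whose length is an exact multiple of the page size costs one extra round trip that returns an empty page.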

#find_job(jid) ⇒ Object

O(n) scan. Returns nil if no job matches.



# File 'lib/wurk/queue.rb', line 86

def find_job(jid)
  each { |record| return record if record.jid == jid }
  nil
end
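
Because the class includes Enumerable, defining `each` is enough to get `find`, `select`, `count`, and the rest. The sketch below illustrates that relationship with an in-memory class; `TinyQueue` and `Record` are hypothetical names invented for the example and are not part of Wurk.

```ruby
# Hypothetical record type standing in for JobRecord.
Record = Struct.new(:jid, :klass)

# Hypothetical in-memory queue: only #each is defined by hand.
class TinyQueue
  include Enumerable

  def initialize(records)
    @records = records
  end

  def each(&block)
    @records.each(&block)
  end

  # Same shape as Queue#find_job: linear scan, nil when nothing matches.
  def find_job(jid)
    each { |record| return record if record.jid == jid }
    nil
  end
end

queue = TinyQueue.new([Record.new('a1', 'MailJob'), Record.new('b2', 'ReportJob')])
queue.find_job('b2')                        # the ReportJob record
queue.find_job('zz')                        # nil
queue.count { |r| r.klass == 'MailJob' }    # Enumerable method, built on #each
```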

#latency ⇒ Object

Seconds since the oldest job (tail of the LIST) was enqueued. Returns 0.0 when the queue is empty or the tail payload is not valid JSON.



# File 'lib/wurk/queue.rb', line 42

def latency
  payload = Wurk.redis { |conn| conn.call('LRANGE', @rname, -1, -1).first }
  return 0.0 if payload.nil?

  JobRecord.latency_from(Wurk.load_json(payload)['enqueued_at'])
rescue ::JSON::ParserError
  0.0
end
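
`JobRecord.latency_from` is not shown on this page, so the following is only a sketch of the overall calculation under a stated assumption: that `enqueued_at` is stored as epoch seconds. Real payloads may use a different unit, in which case the subtraction would need scaling. `latency_from_payload` is a hypothetical name.

```ruby
require 'json'

# Hypothetical helper. ASSUMPTION: `enqueued_at` is epoch seconds.
# `now` is injectable so the arithmetic can be checked deterministically.
def latency_from_payload(payload, now = Time.now.to_f)
  return 0.0 if payload.nil?            # empty queue

  enqueued_at = JSON.parse(payload)['enqueued_at']
  return 0.0 if enqueued_at.nil?        # payload carries no timestamp

  now - enqueued_at.to_f
rescue JSON::ParserError
  0.0                                   # malformed payload
end

latency_from_payload('{"enqueued_at": 100.0}', 130.5) # => 30.5
latency_from_payload(nil)                             # => 0.0
latency_from_payload('not json')                      # => 0.0
```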

#pause! ⇒ Object

Pause new fetches against this queue. Idempotent: `SADD` returns 0 when the name was already present, and the method returns true either way. In-flight jobs are untouched.



# File 'lib/wurk/queue.rb', line 59

def pause! # rubocop:disable Naming/PredicateMethod
  Wurk.redis { |conn| conn.call('SADD', Keys::PAUSED_SET, @name) }
  true
end

#paused? ⇒ Boolean

True iff this queue’s name is a member of the `paused` SET. Wurk implements the Pro contract for free; fetchers consult the same set.

Returns:

  • (Boolean)


# File 'lib/wurk/queue.rb', line 53

def paused?
  Wurk.redis { |conn| conn.call('SISMEMBER', Keys::PAUSED_SET, @name) } == 1
end
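
The relationship between the `paused` SET and the fetcher can be sketched in memory. This is an illustration only: a Ruby `Set` stands in for the Redis SET, and `fetchable_queues` is a hypothetical filter, not the logic of `Wurk::Fetcher::Reliable#queues_cmd`, which this page does not show.

```ruby
require 'set'

# Hypothetical filter: drop paused queue names before a fetch.
def fetchable_queues(queues, paused)
  queues.reject { |name| paused.include?(name) }
end

paused = Set.new            # stand-in for the `paused` SET
paused.add('bulk')          # analogous to SADD paused bulk
paused.add('bulk')          # repeating is harmless: a set holds each member once

fetchable_queues(%w[critical default bulk], paused) # => ["critical", "default"]

paused.delete('bulk')       # analogous to SREM paused bulk
fetchable_queues(%w[critical default bulk], paused) # => ["critical", "default", "bulk"]
```

The filter only affects which queues are offered to new fetches, which matches the statement that in-flight jobs are untouched.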

#size ⇒ Object



# File 'lib/wurk/queue.rb', line 37

def size
  Wurk.redis { |conn| conn.call('LLEN', @rname) }
end

#unpause! ⇒ Object

Resume fetches. Idempotent.



# File 'lib/wurk/queue.rb', line 65

def unpause! # rubocop:disable Naming/PredicateMethod
  Wurk.redis { |conn| conn.call('SREM', Keys::PAUSED_SET, @name) }
  true
end