Class: Wurk::Queue
- Inherits:
- Object
- Includes:
- Enumerable, API::Fast::QueueExt
- Defined in:
- lib/wurk/queue.rb
Overview
Inspects and iterates one named queue against the canonical Sidekiq schema: a LIST at `queue:<name>`, plus membership in the `queues` SET. Wire compatibility is sacred: every Redis call below matches OSS exactly.
Pause/unpause is a Pro feature upstream; Wurk ships it free. Membership of the `paused` SET drives both `paused?` and the fetcher’s queue filter (see `Wurk::Fetcher::Reliable#queues_cmd`). In-flight jobs continue to completion; pausing only stops new fetches.
Spec: docs/target/sidekiq-free.md §19.2, sidekiq-pro.md §6.
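The pause semantics described above can be illustrated with a minimal in-memory sketch. It is not the actual `Wurk::Fetcher::Reliable#queues_cmd` logic: the two sets stand in for the Redis `queues` and `paused` SETs, and `fetchable_queues` is a hypothetical helper name introduced only for illustration.

```ruby
require 'set'

# Hypothetical helper (not part of Wurk): given the known queue names and
# the paused names, return the queues a fetcher would still poll.
def fetchable_queues(known, paused)
  known.to_a.reject { |name| paused.include?(name) }.sort
end

# In-memory stand-ins for the Redis `queues` and `paused` SETs.
known_queues  = Set['critical', 'default', 'low']
paused_queues = Set['low']

puts fetchable_queues(known_queues, paused_queues).inspect
```

A paused queue simply drops out of the list that is polled; nothing here touches jobs that have already been fetched.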
Constant Summary
- PAGE_SIZE = 50
  Page size for LRANGE traversal. Matches upstream so dashboards observing Redis traffic see the same pattern.
Instance Attribute Summary
- #name ⇒ Object (also: #id) (readonly)
  Returns the value of attribute name.
Class Method Summary
- .all ⇒ Array<Queue>
  One Queue per known queue, sorted by name.
Instance Method Summary
- #as_json(_options = nil) ⇒ Object
- #clear ⇒ Object
  UNLINK the list + drop the queue from the `queues` set.
- #each ⇒ Object
  Paged LRANGE traversal.
- #find_job(jid) ⇒ Object
  O(n) scan.
- #initialize(name = 'default') ⇒ Queue (constructor)
  A new instance of Queue.
- #latency ⇒ Object
  Seconds since the oldest job (tail of LIST) was enqueued.
- #pause! ⇒ Object
  Pause new fetches against this queue.
- #paused? ⇒ Boolean
  True iff this queue’s name is a member of the `paused` SET.
- #size ⇒ Object
- #unpause! ⇒ Object
  Resume fetches.
Methods included from API::Fast::QueueExt
Constructor Details
Instance Attribute Details
#name ⇒ Object (readonly) Also known as: id
Returns the value of attribute name.
    # File 'lib/wurk/queue.rb', line 23
    def name
      @name
    end
Class Method Details
.all ⇒ Array<Queue>
Returns one Queue per known queue, sorted by name.
    # File 'lib/wurk/queue.rb', line 27
    def self.all
      names = Wurk.redis { |conn| conn.call('SMEMBERS', Keys::QUEUES_SET) }
      names.sort.map { |n| new(n) }
    end
Instance Method Details
#as_json(_options = nil) ⇒ Object
    # File 'lib/wurk/queue.rb', line 104
    def as_json(_options = nil)
      { name: @name }
    end
#clear ⇒ Object
UNLINK the list + drop the queue from the `queues` set. Both commands are pipelined into one round trip; a pipeline is not a transaction, so a partial failure can leave only one of the two ops applied. The method name is kept for Sidekiq wire-compat: renaming it to `clear?` would break the alias.
    # File 'lib/wurk/queue.rb', line 94
    def clear # rubocop:disable Naming/PredicateMethod
      Wurk.redis do |conn|
        conn.pipelined do |pipe|
          pipe.call('UNLINK', @rname)
          pipe.call('SREM', Keys::QUEUES_SET, @name)
        end
      end
      true
    end
#each ⇒ Object
Paged LRANGE traversal. Yields one JobRecord per payload. Continues paging until Redis returns fewer than PAGE_SIZE rows.
    # File 'lib/wurk/queue.rb', line 72
    def each
      page = 0
      loop do
        start = page * PAGE_SIZE
        stop = start + PAGE_SIZE - 1
        slice = Wurk.redis { |conn| conn.call('LRANGE', @rname, start, stop) }
        slice.each { |value| yield JobRecord.new(value, @name) }
        break if slice.size < PAGE_SIZE

        page += 1
      end
    end
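The paging loop can be tried without Redis. The sketch below is an illustration only: `lrange` is a hypothetical stand-in that mimics LRANGE's inclusive indices on a plain Array, and `each_paged` is a hypothetical name for the same loop shape as `#each`.

```ruby
PAGE_SIZE = 50

# Hypothetical stand-in for Redis LRANGE: start and stop are both inclusive,
# and a start past the end of the list yields an empty page.
def lrange(list, start, stop)
  list[start..stop] || []
end

# Same loop shape as Queue#each: fetch a page, yield its rows, and stop at
# the first page that comes back short.
def each_paged(list)
  page = 0
  loop do
    start = page * PAGE_SIZE
    stop = start + PAGE_SIZE - 1
    slice = lrange(list, start, stop)
    slice.each { |value| yield value }
    break if slice.size < PAGE_SIZE

    page += 1
  end
end

jobs = (1..120).map { |i| "job-#{i}" }
seen = []
each_paged(jobs) { |value| seen << value }
puts seen.size
```

When the list length is an exact multiple of PAGE_SIZE, the loop issues one extra request that returns an empty page before it stops. Because paging is by offset, entries pushed or popped during a traversal of a live queue can shift positions, so a concurrent scan may skip or repeat rows.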
#find_job(jid) ⇒ Object
O(n) scan. Returns nil if no job matches.
    # File 'lib/wurk/queue.rb', line 86
    def find_job(jid)
      each { |record| return record if record.jid == jid }
      nil
    end
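The early-return scan can be sketched on a toy class. `TinyQueue` and `Record` are hypothetical names standing in for `Wurk::Queue` and `JobRecord`; the only assumption carried over from the source is that records expose `jid` and that the class defines `each` and includes Enumerable.

```ruby
# Hypothetical stand-in for JobRecord: only the jid matters for the scan.
Record = Struct.new(:jid, :klass)

# Hypothetical stand-in for Wurk::Queue, backed by an Array instead of Redis.
class TinyQueue
  include Enumerable

  def initialize(records)
    @records = records
  end

  def each(&block)
    @records.each(&block)
  end

  # Linear scan: `return` inside the block exits find_job at the first match,
  # so later records are never visited.
  def find_job(jid)
    each { |record| return record if record.jid == jid }
    nil
  end
end

queue = TinyQueue.new([Record.new('a1', 'MailJob'), Record.new('b2', 'SyncJob')])
puts queue.find_job('b2').klass
puts queue.find_job('missing').inspect
```

Because the cost grows with queue length, a lookup like this is better suited to inspection and tooling than to a hot path.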
#latency ⇒ Object
Seconds since the oldest job (tail of LIST) was enqueued. 0.0 when empty.
    # File 'lib/wurk/queue.rb', line 42
    def latency
      payload = Wurk.redis { |conn| conn.call('LRANGE', @rname, -1, -1).first }
      return 0.0 if payload.nil?

      JobRecord.latency_from(Wurk.load_json(payload)['enqueued_at'])
    rescue ::JSON::ParserError
      0.0
    end
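As a rough sketch of the latency calculation, the helper below parses a payload and subtracts its `enqueued_at` from the current time. `latency_from_payload` is a hypothetical name, and the code assumes `enqueued_at` is epoch seconds stored as a number; the real `JobRecord.latency_from` is not shown in the source and may handle other encodings.

```ruby
require 'json'

# Hypothetical helper (not Wurk's API). Assumes `enqueued_at` is epoch
# seconds; returns 0.0 for a missing payload, missing field, or bad JSON.
def latency_from_payload(payload, now: Time.now.to_f)
  return 0.0 if payload.nil?

  enqueued_at = JSON.parse(payload)['enqueued_at']
  return 0.0 if enqueued_at.nil?

  # Clamp at zero so clock skew never reports a negative latency.
  [now - enqueued_at.to_f, 0.0].max
rescue JSON::ParserError
  0.0
end

payload = JSON.generate('jid' => 'a1', 'enqueued_at' => 100.0)
puts latency_from_payload(payload, now: 102.5)
```

Passing `now:` explicitly keeps the calculation deterministic, which makes the empty-queue and malformed-payload branches easy to check.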
#pause! ⇒ Object
Pause new fetches against this queue. Idempotent: `SADD` returns 0 when the name was already present. In-flight jobs are untouched.
    # File 'lib/wurk/queue.rb', line 59
    def pause! # rubocop:disable Naming/PredicateMethod
      Wurk.redis { |conn| conn.call('SADD', Keys::PAUSED_SET, @name) }
      true
    end
#paused? ⇒ Boolean
True iff this queue’s name is a member of the `paused` SET. Wurk implements the Pro contract for free; fetchers consult the same set.
    # File 'lib/wurk/queue.rb', line 53
    def paused?
      Wurk.redis { |conn| conn.call('SISMEMBER', Keys::PAUSED_SET, @name) } == 1
    end
#size ⇒ Object
    # File 'lib/wurk/queue.rb', line 37
    def size
      Wurk.redis { |conn| conn.call('LLEN', @rname) }
    end
#unpause! ⇒ Object
Resume fetches. Idempotent.
    # File 'lib/wurk/queue.rb', line 65
    def unpause! # rubocop:disable Naming/PredicateMethod
      Wurk.redis { |conn| conn.call('SREM', Keys::PAUSED_SET, @name) }
      true
    end
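The idempotency of pausing and unpausing follows from set semantics, which a small in-memory model can show. `PausedSet` is a hypothetical class, not part of Wurk; its methods only mirror the integer replies that SADD, SREM, and SISMEMBER give in Redis.

```ruby
require 'set'

# Hypothetical in-memory model of the `paused` SET (not part of Wurk).
class PausedSet
  def initialize
    @members = Set.new
  end

  # Mirrors SADD's reply: 1 if the name was newly added, 0 if already present.
  def sadd(name)
    @members.add?(name) ? 1 : 0
  end

  # Mirrors SREM's reply: 1 if the name was removed, 0 if it was absent.
  def srem(name)
    @members.delete?(name) ? 1 : 0
  end

  # Mirrors SISMEMBER's reply: 1 for a member, 0 otherwise.
  def sismember(name)
    @members.include?(name) ? 1 : 0
  end
end

paused = PausedSet.new
puts paused.sadd('default')      # first pause adds the name
puts paused.sadd('default')      # second pause is a no-op
puts paused.sismember('default')
puts paused.srem('default')      # unpause removes the name
puts paused.srem('default')      # second unpause is a no-op
```

Repeating either call leaves the set in the same state, which is why `pause!` and `unpause!` can return `true` regardless of the reply.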