Class: Zizq::Query

Inherits:
Object
  • Object
Includes:
Enumerable
Defined in:
lib/zizq/query.rb

Overview

Composable query builder for jobs in Zizq.

Provides a chainable, immutable API for filtering, iterating, updating, and deleting jobs. Each filter method returns a new `Query` instance, leaving the original unchanged.

`Query` is `Enumerable`: it lazily paginates through results, so standard Ruby methods like `count`, `map`, `select`, `first`, etc. work out of the box.
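The immutable chaining described above can be illustrated with a minimal sketch. `SketchQuery` and its `rebuild` body are hypothetical stand-ins (the private `rebuild` used by `Zizq::Query` is not shown on this page), so treat this as one plausible shape rather than the library's implementation.

```ruby
# Hypothetical stand-in for an immutable, chainable builder.
# This is NOT Zizq's code; it only illustrates the pattern.
class SketchQuery
  attr_reader :params

  def initialize(**params)
    @params = params.freeze
  end

  # Replaces the queue filter and returns a new object.
  def by_queue(queue)
    rebuild(queue: queue)
  end

  # Replaces the status filter and returns a new object.
  def by_status(status)
    rebuild(status: status)
  end

  private

  # Assumed behaviour: copy the current parameters, apply the
  # overrides, and build a fresh instance.
  def rebuild(**overrides)
    self.class.new(**@params.merge(overrides))
  end
end

base   = SketchQuery.new
emails = base.by_queue("emails")
ready  = emails.by_status("ready")

p base.params.empty?   # => true
p emails.params.keys   # => [:queue]
p ready.params.keys    # => [:queue, :status]
p base.equal?(emails)  # => false
```

Because every step returns a new object, a partially built query can be kept and reused as the base for several more specific queries.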

Examples:

# Count ready jobs on a queue
Zizq::Query.new.by_queue("emails").by_status("ready").count

# Move all jobs from one queue to another
Zizq::Query.new.by_queue("old").update_all(queue: "new")

# Delete dead jobs matching a payload filter
Zizq::Query.new.by_status("dead").add_jq_filter(".user_id == 42").delete_all

# Iterate in batches
Zizq::Query.new.in_pages_of(100).each { |job| puts job.id }

# Move all jobs from one queue to another in batches.
Zizq::Query.new.by_queue("old").in_pages_of(100).update_all(queue: "new")

# Find jobs by class and arguments
Zizq::Query.new.by_job_class_and_args(SendEmailJob, 42, template: "welcome")

# Find jobs by class and arguments (subset)
Zizq::Query.new.by_job_class_and_args_subset(SendEmailJob, 42)

Constant Summary

MAX_PAGE_SIZE = 2000

Maximum page size the server can handle.

Instance Method Summary

Constructor Details

#initialize(id: nil, queue: nil, type: nil, status: nil, jq_filter: nil, order: nil, limit: nil, page_size: nil) ⇒ Query

Initialize the query with some initial parameters.



# File 'lib/zizq/query.rb', line 62

def initialize(id: nil,
               queue: nil,
               type: nil,
               status: nil,
               jq_filter: nil,
               order: nil,
               limit: nil,
               page_size: nil)
  @id = id
  @queue = queue
  @type = type
  @status = status
  @jq_filter = jq_filter
  @order = order
  @limit = limit
  @page_size = page_size
end

Instance Method Details

#add_id(id) ⇒ Object

Add a job ID to the existing ID filter.



# File 'lib/zizq/query.rb', line 104

def add_id(id)
  rebuild(id: Array(@id) + Array(id))
end
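The `Array(@id) + Array(id)` expression accepts a single value, an array, or `nil` on either side. The sketch below reproduces just that expression in a standalone helper (`merge_filter` is a name chosen for illustration); the same shape is used by `add_queue`, `add_status`, and `add_type`.

```ruby
# Mirrors the expression used by add_id: Array(existing) + Array(added).
def merge_filter(existing, added)
  Array(existing) + Array(added)
end

p merge_filter(nil, "job_1")                   # => ["job_1"]
p merge_filter("job_1", "job_2")               # => ["job_1", "job_2"]
p merge_filter(["job_1"], ["job_2", "job_3"])  # => ["job_1", "job_2", "job_3"]
p merge_filter(["job_1"], "job_1")             # => ["job_1", "job_1"]
```

Note that the values are concatenated, not deduplicated, as the last line shows.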

#add_jq_filter(jq_filter) ⇒ Object

Add a jq payload filter, combining it with any existing filter via a logical “and”.



# File 'lib/zizq/query.rb', line 205

def add_jq_filter(jq_filter)
  rebuild(jq_filter: [@jq_filter, "(#{jq_filter})"].compact.join(" and "))
end
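To see what string this produces, the sketch below copies the joining expression into a standalone helper (`combine_jq` is a hypothetical name used only here).

```ruby
# Same expression as add_jq_filter, with the existing filter passed in.
def combine_jq(existing, added)
  [existing, "(#{added})"].compact.join(" and ")
end

first  = combine_jq(nil, ".user_id == 42")
second = combine_jq(first, ".a or .b")

puts first   # => (.user_id == 42)
puts second  # => (.user_id == 42) and (.a or .b)
```

Only the newly added expression is wrapped in parentheses. A filter that was set directly with `by_jq_filter` is joined as-is, so if it contains a top-level `or` it may be worth parenthesising it yourself before chaining `add_jq_filter`.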

#add_queue(queue) ⇒ Object

Add a queue to the existing queue filter.



# File 'lib/zizq/query.rb', line 120

def add_queue(queue)
  rebuild(queue: Array(@queue) + Array(queue))
end

#add_status(status) ⇒ Object

Add a status to the existing status filter.



# File 'lib/zizq/query.rb', line 152

def add_status(status)
  rebuild(status: Array(@status) + Array(status))
end

#add_type(type) ⇒ Object

Add a type to the existing type filter.



# File 'lib/zizq/query.rb', line 136

def add_type(type)
  rebuild(type: Array(@type) + Array(type))
end

#any? ⇒ Boolean

Returns true if there are any matching jobs.

Without a block, optimised to fetch a single job. With a block, falls back to Enumerable (tests each job against the block).

Returns:

  • (Boolean)


# File 'lib/zizq/query.rb', line 254

def any?
  return super if block_given?

  !first.nil?
end

#by_id(id) ⇒ Object

Filter by job ID (replaces any existing ID filter).



# File 'lib/zizq/query.rb', line 96

def by_id(id)
  rebuild(id:)
end

#by_job_class_and_args(job_class, *args, **kwargs) ⇒ Object

Filter by job class and exact arguments.

The job class must include `Zizq::Job` or, for Active Job classes, must extend `Zizq::ActiveJobConfig`.

Sets the type filter to the class name and adds a jq payload filter for an exact match of the serialized arguments.



# File 'lib/zizq/query.rb', line 168

def by_job_class_and_args(job_class, *args, **kwargs)
  validate_job_class!(job_class)
  name = job_class.name or raise ArgumentError, "anonymous classes are not supported"
  by_type(name).add_jq_filter(job_class.zizq_payload_filter(*args, **kwargs))
end
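The `job_class.name or raise` guard exists because anonymous classes have no name in Ruby. A small standalone sketch of that guard (`type_name_for` and `NamedJob` are illustrative names, not part of Zizq):

```ruby
# Same guard as in by_job_class_and_args, extracted for illustration.
def type_name_for(job_class)
  job_class.name or raise ArgumentError, "anonymous classes are not supported"
end

class NamedJob; end

puts type_name_for(NamedJob)  # => NamedJob

begin
  type_name_for(Class.new)    # Class.new.name is nil
rescue ArgumentError => e
  puts e.message              # => anonymous classes are not supported
end
```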

#by_job_class_and_args_subset(job_class, *args, **kwargs) ⇒ Object

Filter by job class and a subset of arguments.

Matches jobs whose positional args start with the given values and whose kwargs contain (at minimum) the given key/value pairs.

The job class must include `Zizq::Job` or, for Active Job classes, must extend `Zizq::ActiveJobConfig`.



# File 'lib/zizq/query.rb', line 186

def by_job_class_and_args_subset(job_class, *args, **kwargs)
  validate_job_class!(job_class)
  name = job_class.name or raise ArgumentError, "anonymous classes are not supported"
  by_type(name).add_jq_filter(job_class.zizq_payload_subset_filter(*args, **kwargs))
end
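The subset rule (positional args match as a prefix, kwargs match as a subset) can be expressed in plain Ruby. This is only an analogue of the matching semantics described above: the real check is a jq filter built by `zizq_payload_subset_filter`, whose output is not shown on this page, and `subset_match?` is a hypothetical helper.

```ruby
# Plain-Ruby analogue of the documented subset semantics.
def subset_match?(job_args, job_kwargs, args, kwargs)
  # Positional args must start with the given values.
  prefix_ok = job_args.first(args.size) == args
  # Kwargs must contain at least the given key/value pairs.
  kwargs_ok = kwargs.all? { |key, value| job_kwargs.key?(key) && job_kwargs[key] == value }
  prefix_ok && kwargs_ok
end

p subset_match?([42, "x"], { template: "welcome" }, [42], {})            # => true
p subset_match?([42], { template: "welcome", locale: "en" },
                [42], { template: "welcome" })                           # => true
p subset_match?([7, 42], {}, [42], {})                                   # => false
p subset_match?([42], { template: "goodbye" }, [42], { template: "welcome" })  # => false
```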

#by_jq_filter(jq_filter) ⇒ Object

Replace the jq payload filter expression.



# File 'lib/zizq/query.rb', line 196

def by_jq_filter(jq_filter)
  rebuild(jq_filter:)
end

#by_queue(queue) ⇒ Object

Filter by queue name (replaces any existing queue filter).



# File 'lib/zizq/query.rb', line 112

def by_queue(queue)
  rebuild(queue:)
end

#by_status(status) ⇒ Object

Filter by status (replaces any existing status filter).



# File 'lib/zizq/query.rb', line 144

def by_status(status)
  rebuild(status:)
end

#by_type(type) ⇒ Object

Filter by job type (replaces any existing type filter).



# File 'lib/zizq/query.rb', line 128

def by_type(type)
  rebuild(type:)
end

#delete_all ⇒ Object

Delete all matching jobs.

When `page_size` or `limit` is set, iterates page by page and issues a bulk delete per page using the job IDs on that page. For safety, the query filters are included in the scope along with the page's IDs. Otherwise, issues a single bulk delete with the query filters.

When called on a bare query (one with no filters), this deletes every job on the server, which is useful in tests.

Returns the total number of deleted jobs.



# File 'lib/zizq/query.rb', line 497

def delete_all
  where = {
    id: @id,
    queue: @queue,
    type: @type,
    status: @status,
    filter: @jq_filter,
  }

  if @limit || @page_size
    remaining = @limit
    deleted = 0

    each_page do |page|
      if remaining
        break if remaining <= 0
      end

      ids_on_page = page.jobs.map(&:id)
      ids_on_page = ids_on_page.take(remaining) if remaining

      deleted += Zizq.client.delete_all_jobs(
        where: where.merge(id: ids_on_page),
      )

      remaining -= ids_on_page.size if remaining
    end

    deleted
  else
    Zizq.client.delete_all_jobs(where:)
  end
end
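The batching logic is easier to follow without the client calls. The sketch below replays the `remaining` bookkeeping over in-memory pages of IDs; `plan_batches` is a hypothetical helper that only records which IDs each bulk request would carry.

```ruby
# Replays the per-page batching of delete_all over in-memory pages.
# Returns the ID batches that would be sent, one per bulk request.
def plan_batches(pages, limit)
  remaining = limit
  batches = []

  pages.each do |ids_on_page|
    break if remaining && remaining <= 0

    # Trim the final page so the total never exceeds the limit.
    ids = remaining ? ids_on_page.take(remaining) : ids_on_page
    batches << ids

    remaining -= ids.size if remaining
  end

  batches
end

pages = [%w[a b c], %w[d e f], %w[g]]

p plan_batches(pages, 4)    # => [["a", "b", "c"], ["d"]]
p plan_batches(pages, nil)  # => [["a", "b", "c"], ["d", "e", "f"], ["g"]]
```

With a limit of 4 and pages of 3, the second request carries a single ID, so the total stays at the limit even though whole pages are fetched.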

#delete_one ⇒ Object

Delete the first matching job.

Returns 1 if a job was deleted, 0 if no jobs matched.



# File 'lib/zizq/query.rb', line 346

def delete_one
  limit(1).delete_all
end

#each(&block) ⇒ Object

Iterate over matching jobs, lazily paginating through results.

Respects `limit` if set. Without a block, returns an `Enumerator`.



# File 'lib/zizq/query.rb', line 356

def each(&block)
  enumerator = enum_for(:each)

  if block_given?
    remaining = @limit

    each_page do |page|
      page.jobs.each do |job|
        if remaining
          break if remaining <= 0
        end

        yield job

        remaining -= 1 if remaining
      end
    end
  end

  enumerator
end
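The difference between how `each` and `each_page` treat `limit` can be shown with stand-ins over fixed in-memory pages. Everything here is hypothetical scaffolding (`SKETCH_PAGES`, `sketch_each_page`, `sketch_each`); it models a query like `in_pages_of(3).limit(4)` without contacting a server.

```ruby
SKETCH_PAGES = [[1, 2, 3], [4, 5, 6], [7, 8, 9]].freeze

# Stand-in for each_page: yields whole pages and stops once the
# limit has been reached or exceeded.
def sketch_each_page(limit)
  remaining = limit
  SKETCH_PAGES.each do |page|
    yield page
    if remaining
      remaining -= page.size
      break if remaining <= 0
    end
  end
end

# Stand-in for each: yields individual jobs, never more than the limit.
def sketch_each(limit)
  remaining = limit
  sketch_each_page(limit) do |page|
    page.each do |job|
      break if remaining && remaining <= 0
      yield job
      remaining -= 1 if remaining
    end
  end
end

pages = []
sketch_each_page(4) { |page| pages << page }

jobs = []
sketch_each(4) { |job| jobs << job }

p pages  # => [[1, 2, 3], [4, 5, 6]]
p jobs   # => [1, 2, 3, 4]
```

Page-level iteration may therefore expose more jobs than the limit, while job-level iteration stops exactly at it.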

#each_page(&block) ⇒ Object

Iterate over pages of matching jobs.

Each page is a `Resources::JobPage`. Without a block, returns an `Enumerator`.

If `limit` is set, iteration stops after the page on which the limit is reached or exceeded. That page is yielded whole; it is not truncated to fit the limit.



# File 'lib/zizq/query.rb', line 388

def each_page(&block)
  enumerator = enum_for(:each_page)

  if block_given?
    page = Zizq.client.list_jobs(
      id: @id,
      queue: @queue,
      type: @type,
      status: @status,
      filter: @jq_filter,
      limit: [@page_size, @limit, (@page_size || @limit) && MAX_PAGE_SIZE].compact.min,
      order: @order,
    )

    remaining = @limit

    while page
      yield page

      if remaining
        remaining -= page.jobs.size
        break if remaining <= 0
      end

      page = page.next_page
    end
  end

  enumerator
end
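The `limit:` expression passed to `list_jobs` is dense, so here it is isolated in a standalone helper (`request_limit` is a name chosen for illustration; `MAX_PAGE_SIZE` is redefined locally with the value documented above).

```ruby
MAX_PAGE_SIZE = 2000 # value taken from the constant documented on this page

# Same expression as in each_page: the smallest of page size, total
# limit, and the server maximum. The maximum only participates when
# at least one of the other two is set.
def request_limit(page_size, limit)
  [page_size, limit, (page_size || limit) && MAX_PAGE_SIZE].compact.min
end

p request_limit(nil, nil)   # => nil
p request_limit(100, nil)   # => 100
p request_limit(nil, 5)     # => 5
p request_limit(100, 5)     # => 5
p request_limit(5000, nil)  # => 2000
```

When neither `page_size` nor `limit` is set the result is `nil`, which presumably leaves the page size to the server's default.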

#empty? ⇒ Boolean

Returns true if there are no matching jobs.

Optimised: fetches a single job to check.

Returns:

  • (Boolean)


# File 'lib/zizq/query.rb', line 243

def empty?
  first.nil?
end

#first ⇒ Object

Return the first matching job, or nil if none match.

Optimised: fetches a single job from the server (`?limit=1`).



# File 'lib/zizq/query.rb', line 302

def first
  limit(1).each.first
end

#in_pages_of(page_size) ⇒ Object

Set the page size for paginated iteration.

When set, `each_page` fetches pages of this size, and `each` fetches jobs in pages of this size. Also used by `update_all` and `delete_all` to batch operations by page.



# File 'lib/zizq/query.rb', line 88

def in_pages_of(page_size)
  rebuild(page_size:)
end

#last ⇒ Object

Return the last matching job, or nil if none match.

Optimised: reverses the order and fetches a single job.



# File 'lib/zizq/query.rb', line 311

def last
  reverse_order.first
end

#limit(limit) ⇒ Object

Limit the total number of jobs returned.

This is a total limit, imposed across potentially multiple page fetches. This limit also applies to `update_all` and `delete_all` operations.



# File 'lib/zizq/query.rb', line 224

def limit(limit)
  rebuild(limit:)
end

#none? ⇒ Boolean

Returns true if there are no matching jobs.

Without a block, optimised to fetch a single job. With a block, falls back to Enumerable (tests each job against the block).

Returns:

  • (Boolean)


# File 'lib/zizq/query.rb', line 267

def none?
  return super if block_given?

  first.nil?
end

#one? ⇒ Boolean

Returns true if there is exactly one matching job.

Without a block, optimised to fetch at most two jobs. With a block, falls back to Enumerable.

Returns:

  • (Boolean)


# File 'lib/zizq/query.rb', line 280

def one?
  return super if block_given?

  limit(2).to_a.size == 1
end
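Fetching at most two jobs is enough to distinguish "none", "exactly one", and "more than one". A sketch of that reasoning over plain arrays, where `fetch_up_to` is a hypothetical stand-in for `limit(n).to_a`:

```ruby
# Stand-in for limit(n).to_a: returns at most n matching jobs.
def fetch_up_to(jobs, n)
  jobs.first(n)
end

# Same test as one? without a block.
def exactly_one?(jobs)
  fetch_up_to(jobs, 2).size == 1
end

p exactly_one?([])                 # => false
p exactly_one?([:job_a])           # => true
p exactly_one?([:job_a, :job_b])   # => false
p exactly_one?((1..10_000).to_a)   # => false
```

However many jobs match, only two are ever fetched, which is why the last case costs no more than the third.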

#order(order) ⇒ Object

Set the sort order for iteration.



# File 'lib/zizq/query.rb', line 213

def order(order)
  rebuild(order:)
end

#reverse_each(&block) ⇒ Object

Iterate over matching jobs in reverse order.

Optimised: pushes the reverse ordering to the server instead of fetching all jobs into memory and reversing.



# File 'lib/zizq/query.rb', line 293

def reverse_each(&block)
  reverse_order.each(&block)
end

#reverse_order ⇒ Object

Reverse the sort order.

Returns a new query with the opposite order. If no order was set, defaults to descending (the server default is ascending).



# File 'lib/zizq/query.rb', line 234

def reverse_order
  rebuild(order: @order == :desc ? :asc : :desc)
end
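The flip is a single comparison against the symbol `:desc`. The sketch below isolates it (`flip` is an illustrative name) and shows the outcome for each starting value.

```ruby
# Same expression as reverse_order, with the current order passed in.
def flip(order)
  order == :desc ? :asc : :desc
end

p flip(nil)     # => :desc
p flip(:asc)    # => :desc
p flip(:desc)   # => :asc
p flip("desc")  # => :desc
```

As the last line shows, the comparison is against a symbol, so a string such as `"desc"` is not recognised as descending. Whether `order` accepts strings at all is not stated on this page; passing `:asc` or `:desc` appears to be the safe choice if you also rely on `reverse_order`, `last`, or `reverse_each`.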

#take(n) ⇒ Object

Return the first `n` matching jobs.

Optimised: sets the limit to `n` so the server only returns what’s needed.



# File 'lib/zizq/query.rb', line 322

def take(n)
  limit(n).to_a
end

#update_all(queue: Zizq::UNCHANGED, priority: Zizq::UNCHANGED, ready_at: Zizq::UNCHANGED, retry_limit: Zizq::UNCHANGED, backoff: Zizq::UNCHANGED, retention: Zizq::UNCHANGED) ⇒ Object

Update all matching jobs with the given field values.

When `page_size` or `limit` is set, iterates page by page and issues a bulk update per page using the job IDs on that page. For safety, the query filters are included in the scope along with the page's IDs. Otherwise, issues a single bulk update with the query filters.

Returns the total number of updated jobs.



# File 'lib/zizq/query.rb', line 435

def update_all(queue: Zizq::UNCHANGED,
               priority: Zizq::UNCHANGED,
               ready_at: Zizq::UNCHANGED,
               retry_limit: Zizq::UNCHANGED,
               backoff: Zizq::UNCHANGED,
               retention: Zizq::UNCHANGED)
  where = {
    id: @id,
    queue: @queue,
    type: @type,
    status: @status,
    filter: @jq_filter,
  }

  apply = {
    queue:,
    priority:,
    ready_at:,
    retry_limit:,
    backoff:,
    retention:,
  }

  if @limit || @page_size
    remaining = @limit
    updated = 0

    each_page do |page|
      if remaining
        break if remaining <= 0
      end

      ids_on_page = page.jobs.map(&:id)
      ids_on_page = ids_on_page.take(remaining) if remaining

      updated += Zizq.client.update_all_jobs(
        where: where.merge(id: ids_on_page),
        apply:,
      )

      remaining -= ids_on_page.size if remaining
    end

    updated
  else
    Zizq.client.update_all_jobs(where:, apply:)
  end
end
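Every keyword defaults to `Zizq::UNCHANGED` rather than `nil`, which suggests a sentinel that lets "not provided" be told apart from an explicit `nil`. How Zizq defines and consumes that sentinel is not shown on this page, so the following is only a generic sketch of the pattern; `SKETCH_UNCHANGED` and `changes_for` are hypothetical.

```ruby
# Hypothetical sentinel object; Zizq::UNCHANGED itself is not defined here.
SKETCH_UNCHANGED = Object.new.freeze

# Collects only the fields the caller actually passed.
def changes_for(queue: SKETCH_UNCHANGED, priority: SKETCH_UNCHANGED, ready_at: SKETCH_UNCHANGED)
  fields = { queue: queue, priority: priority, ready_at: ready_at }
  # Identity comparison: only the sentinel itself is dropped, nil is kept.
  fields.reject { |_field, value| value.equal?(SKETCH_UNCHANGED) }
end

p changes_for(queue: "new").keys              # => [:queue]
p changes_for(ready_at: nil).key?(:ready_at)  # => true
p changes_for.empty?                          # => true
```

With a `nil` default the second call would be indistinguishable from the third, so a field could not be explicitly set to `nil`.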

#update_one(...) ⇒ Object

Update the first matching job.

Returns 1 if a job was updated, 0 if no jobs matched.



# File 'lib/zizq/query.rb', line 337

def update_one(...)
  limit(1).update_all(...)
end