Class: Zizq::Query

Inherits:
Object
  • Object
show all
Includes:
Enumerable, Enumerable[Zizq::Resources::Job]
Defined in:
lib/zizq/query.rb,
sig/generated/zizq/query.rbs

Overview

Composable query builder for jobs in Zizq.

Provides a chainable, immutable API for filtering, iterating, updating, and deleting jobs. Each filter method returns a new Query instance, leaving the original unchanged.

Query is Enumerable— it lazily paginates through results, so standard Ruby methods like count, map, select, first, etc. work out of the box.

Examples:

# Count ready jobs on a queue
Zizq::Query.new.by_queue("emails").by_status("ready").count

# Move all jobs from one queue to another
Zizq::Query.new.by_queue("old").update_all(queue: "new")

# Delete dead jobs matching a payload filter
Zizq::Query.new.by_status("dead").add_jq_filter(".user_id == 42").delete_all

# Iterate in batches
Zizq::Query.new.in_pages_of(100).each { |job| puts job.id }

# Move all jobs from one queue to another in batches.
Zizq::Query.new.by_queue("old").in_pages_of(100).update_all(queue: "new")

# Find jobs by class and arguments
Zizq::Query.new.by_job_class_and_args(SendEmailJob, 42, template: "welcome")

# Find jobs by class and arguments (subset)
Zizq::Query.new.by_job_class_and_args_subset(SendEmailJob, 42)

Constant Summary collapse

MAX_PAGE_SIZE =

Maximum page size the server can handle.

Returns:

  • (Integer)
2000

Instance Method Summary collapse

Constructor Details

#initialize(id: nil, queue: nil, type: nil, status: nil, jq_filter: nil, priority: nil, ready_at: nil, attempts: nil, order: nil, limit: nil, page_size: nil) ⇒ Query

Initialize the query with some initial parameters.

Parameters:

  • id: (String, Array[String], nil) (defaults to: nil)
  • queue: (String, Array[String], nil) (defaults to: nil)
  • type: (String, Array[String], nil) (defaults to: nil)
  • status: (String, Array[String], nil) (defaults to: nil)
  • jq_filter: (String, nil) (defaults to: nil)
  • priority: (Integer, Range[Integer?], nil) (defaults to: nil)
  • ready_at: (Zizq::to_f, Range[Zizq::to_f?], nil) (defaults to: nil)
  • attempts: (Integer, Range[Integer?], nil) (defaults to: nil)
  • order: (Zizq::sort_direction, nil) (defaults to: nil)
  • limit: (Integer, nil) (defaults to: nil)
  • page_size: (Integer, nil) (defaults to: nil)


65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/zizq/query.rb', line 65

def initialize(id: nil,
               queue: nil,
               type: nil,
               status: nil,
               jq_filter: nil,
               priority: nil,
               ready_at: nil,
               attempts: nil,
               order: nil,
               limit: nil,
               page_size: nil)
  @id = id
  @queue = queue
  @type = type
  @status = status
  @jq_filter = jq_filter
  @priority = priority
  @ready_at = ready_at
  @attempts = attempts
  @order = order
  @limit = limit
  @page_size = page_size
end

Instance Method Details

#add_id(id) ⇒ Query

Add a job ID to the existing ID filter.

Parameters:

  • id (String, Array[String])

Returns:



113
114
115
# File 'lib/zizq/query.rb', line 113

def add_id(id)
  rebuild(id: Array(@id) + Array(id))
end

#add_jq_filter(jq_filter) ⇒ Query

Add a jq payload filter, logically combines with any existing filter via "and".

Parameters:

  • jq_filter (String)

Returns:



270
271
272
# File 'lib/zizq/query.rb', line 270

def add_jq_filter(jq_filter)
  rebuild(jq_filter: [@jq_filter, "(#{jq_filter})"].compact.join(" and "))
end

#add_queue(queue) ⇒ Query

Add a queue to the existing queue filter.

Parameters:

  • queue (String, Array[String])

Returns:



129
130
131
# File 'lib/zizq/query.rb', line 129

def add_queue(queue)
  rebuild(queue: Array(@queue) + Array(queue))
end

#add_status(status) ⇒ Query

Add a status to the existing status filter.

Parameters:

  • status (String, Array[String])

Returns:



161
162
163
# File 'lib/zizq/query.rb', line 161

def add_status(status)
  rebuild(status: Array(@status) + Array(status))
end

#add_type(type) ⇒ Query

Add a type to the existing type filter.

Parameters:

  • type (String, Array[String])

Returns:



145
146
147
# File 'lib/zizq/query.rb', line 145

def add_type(type)
  rebuild(type: Array(@type) + Array(type))
end

#any?void

This method returns an undefined value.

Returns true if there are any matching jobs.

Without a block, optimised to fetch a single job. With a block, falls back to Enumerable (tests each job against the block).



319
320
321
322
323
# File 'lib/zizq/query.rb', line 319

def any?
  return super if block_given?

  !first.nil?
end

#by_attempts(attempts) ⇒ Query

Filter by attempts range (replaces any existing attempts filter).

Accepts an Integer (exact match) or an inclusive Range. attempts is the number of times the job has failed. Exclusive ranges raise ArgumentError.

Examples:

.by_attempts(0)         # never failed
.by_attempts(1..)       # has failed at least once
.by_attempts(1..3)      # has failed 1, 2, or 3 times

Parameters:

  • attempts (Integer, Range[Integer?], nil)

Returns:



217
218
219
# File 'lib/zizq/query.rb', line 217

def by_attempts(attempts)
  rebuild(attempts:)
end

#by_id(id) ⇒ Query

Filter by job ID (replaces any existing ID filter).

Parameters:

  • id (String, Array[String], nil)

Returns:



105
106
107
# File 'lib/zizq/query.rb', line 105

def by_id(id)
  rebuild(id:)
end

#by_job_class_and_args(job_class, *args, **kwargs) ⇒ Query

Filter by job class and exact arguments.

The job class must include Zizq::Job or for Active Job classes must extend Zizq::ActiveJobConfig.

Sets the type filter to the class name and adds a jq payload filter for an exact match of the serialized arguments.

Parameters:

  • job_class (Object)
  • args (Object)
  • kwargs (Object)

Returns:



233
234
235
236
237
# File 'lib/zizq/query.rb', line 233

def by_job_class_and_args(job_class, *args, **kwargs)
  validate_job_class!(job_class)
  name = job_class.name or raise ArgumentError, "anonymous classes are not supported"
  by_type(name).add_jq_filter(job_class.zizq_payload_filter(*args, **kwargs))
end

#by_job_class_and_args_subset(job_class, *args, **kwargs) ⇒ Query

Filter by job class and a subset of arguments.

Matches jobs whose positional args start with the given values and whose kwargs contain (at minimum) the given key/value pairs.

The job class must include Zizq::Job or for Active Job classes must extend Zizq::ActiveJobConfig.

Parameters:

Returns:



251
252
253
254
255
# File 'lib/zizq/query.rb', line 251

def by_job_class_and_args_subset(job_class, *args, **kwargs)
  validate_job_class!(job_class)
  name = job_class.name or raise ArgumentError, "anonymous classes are not supported"
  by_type(name).add_jq_filter(job_class.zizq_payload_subset_filter(*args, **kwargs))
end

#by_jq_filter(jq_filter) ⇒ Query

Replace the jq payload filter expression.

Parameters:

  • jq_filter (String, nil)

Returns:



261
262
263
# File 'lib/zizq/query.rb', line 261

def by_jq_filter(jq_filter)
  rebuild(jq_filter:)
end

#by_priority(priority) ⇒ Query

Filter by priority range (replaces any existing priority filter).

Accepts an Integer (exact match) or an inclusive Range. Lower numbers are higher priority. Exclusive ranges (e.g. 0...100) raise ArgumentError — the server only supports inclusive bounds.

Examples:

.by_priority(50)        # exactly priority 50
.by_priority(0..100)    # between 0 and 100 inclusive
.by_priority(100..)     # 100 or greater
.by_priority(..100)     # 100 or less

Parameters:

  • priority (Integer, Range[Integer?], nil)

Returns:



180
181
182
# File 'lib/zizq/query.rb', line 180

def by_priority(priority)
  rebuild(priority:)
end

#by_queue(queue) ⇒ Query

Filter by queue name (replaces any existing queue filter).

Parameters:

  • queue (String, Array[String], nil)

Returns:



121
122
123
# File 'lib/zizq/query.rb', line 121

def by_queue(queue)
  rebuild(queue:)
end

#by_ready_at(ready_at) ⇒ Query

Filter by ready_at range (replaces any existing ready_at filter).

Accepts a value (Time, Numeric, anything that responds to to_f) for an exact match, or an inclusive Range. Values are interpreted as fractional seconds on the Ruby side and converted to milliseconds for the server. Exclusive ranges raise ArgumentError.

Examples:

.by_ready_at(Time.now..)         # ready to run now or later
.by_ready_at(..Time.now)         # was ready to run by now
.by_ready_at(t1..t2)             # ready between t1 and t2

Parameters:

  • ready_at (Zizq::to_f, Range[Zizq::to_f?], nil)

Returns:



199
200
201
# File 'lib/zizq/query.rb', line 199

def by_ready_at(ready_at)
  rebuild(ready_at:)
end

#by_status(status) ⇒ Query

Filter by status (replaces any existing status filter).

Parameters:

  • status (String, Array[String])

Returns:



153
154
155
# File 'lib/zizq/query.rb', line 153

def by_status(status)
  rebuild(status:)
end

#by_type(type) ⇒ Query

Filter by job type (replaces any existing type filter).

Parameters:

  • type (String, Array[String], nil)

Returns:



137
138
139
# File 'lib/zizq/query.rb', line 137

def by_type(type)
  rebuild(type:)
end

#count(*args, &block) ⇒ void

This method returns an undefined value.

Count matching jobs via the server-side count endpoint.

Without a block or argument, uses GET /jobs/count for an efficient server-side count. When a limit is set, caps the result locally with [total, limit].min.

With a block or argument, falls back to Enumerable (iterates and counts matching jobs).

Parameters:

  • args (Object)


363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
# File 'lib/zizq/query.rb', line 363

def count(*args, &block)
  return super if block || !args.empty?

  total = Zizq.client.count_jobs(
    id: @id,
    queue: @queue,
    type: @type,
    status: @status,
    filter: @jq_filter,
    priority: @priority,
    ready_at: @ready_at,
    attempts: @attempts,
  )

  @limit ? [total, @limit].min : total
end

#delete_allInteger

Delete all matching jobs.

When page_size or limit is set, iterates page by page and issues a bulk delete per page using the job IDs on that page. For safety query parameters are included in the scope along with all IDs. Otherwise, issues a single bulk delete with the query filters.

When called in a bare query, this deletes all jobs from the server, which is useful in tests.

Returns the total number of deleted jobs.

Returns:

  • (Integer)


597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
# File 'lib/zizq/query.rb', line 597

def delete_all
  where = {
    id: @id,
    queue: @queue,
    type: @type,
    status: @status,
    filter: @jq_filter,
    priority: @priority,
    ready_at: @ready_at,
    attempts: @attempts,
  }

  if @limit || @page_size
    remaining = @limit
    deleted = 0

    each_page do |page|
      if remaining
        break if remaining <= 0
      end

      ids_on_page = page.jobs.map(&:id)
      ids_on_page = ids_on_page.take(remaining) if remaining

      deleted += Zizq.client.delete_all_jobs(
        where: where.merge(id: ids_on_page),
      )

      remaining -= ids_on_page.size if remaining
    end

    deleted
  else
    Zizq.client.delete_all_jobs(where:)
  end
end

#delete_oneInteger

Delete the first matching job.

Returns 1 if a job was deleted, 0 if no jobs matched.

Returns:

  • (Integer)


440
441
442
# File 'lib/zizq/query.rb', line 440

def delete_one
  limit(1).delete_all
end

#each(&block) ⇒ void

This method returns an undefined value.

Iterate over matching jobs, lazily paginating through results.

Respects limit if set. Without a block, returns an Enumerator.



450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
# File 'lib/zizq/query.rb', line 450

def each(&block)
  enumerator = enum_for(:each)

  if block_given?
    remaining = @limit

    each_page do |page|
      page.jobs.each do |job|
        if remaining
          break if remaining <= 0
        end

        yield job

        remaining -= 1 if remaining
      end
    end
  end

  enumerator
end

#each_page(&block) ⇒ void

This method returns an undefined value.

Iterate over pages of matching jobs.

Each page is a Resources::JobPage. Without a block, returns an Enumerator.

If limit is set, terminates after the last page is reached that exceeds the limit, but does not truncate the page.



482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
# File 'lib/zizq/query.rb', line 482

def each_page(&block)
  enumerator = enum_for(:each_page)

  if block_given?
    page = Zizq.client.list_jobs(
      id: @id,
      queue: @queue,
      type: @type,
      status: @status,
      filter: @jq_filter,
      priority: @priority,
      ready_at: @ready_at,
      attempts: @attempts,
      limit: [@page_size, @limit, (@page_size || @limit) && MAX_PAGE_SIZE].compact.min,
      order: @order,
    )

    remaining = @limit

    while page
      yield page

      if remaining
        remaining -= page.jobs.size
        break if remaining <= 0
      end

      page = page.next_page
    end
  end

  enumerator
end

#empty?Boolean

Returns true if there are no matching jobs.

Optimised: fetches a single job to check.

Returns:

  • (Boolean)


308
309
310
# File 'lib/zizq/query.rb', line 308

def empty?
  first.nil?
end

#firstResources::Job?

Return the first matching job, or nil if none match.

Optimised: fetches a single job from the server (?limit=1).

Returns:



396
397
398
# File 'lib/zizq/query.rb', line 396

def first
  limit(1).each.first
end

#in_pages_of(page_size) ⇒ Query

Set the page size for paginated iteration.

When set, each_page fetches pages of this size, and each fetches jobs in pages of this size. Also used by update_all and delete_all to batch operations by page.

Parameters:

  • page_size (Integer, nil)

Returns:



97
98
99
# File 'lib/zizq/query.rb', line 97

def in_pages_of(page_size)
  rebuild(page_size:)
end

#lastResources::Job?

Return the last matching job, or nil if none match.

Optimised: reverses the order and fetches a single job.

Returns:



405
406
407
# File 'lib/zizq/query.rb', line 405

def last
  reverse_order.first
end

#limit(limit) ⇒ Query

Limit the total number of jobs returned.

This is a total limit, imposed across potentially multiple page fetches. This limit also applies to update_all and delete_all operations.

Parameters:

  • limit (Integer, nil)

Returns:



289
290
291
# File 'lib/zizq/query.rb', line 289

def limit(limit)
  rebuild(limit:)
end

#none?void

This method returns an undefined value.

Returns true if there are no matching jobs.

Without a block, optimised to fetch a single job. With a block, falls back to Enumerable (tests each job against the block).



332
333
334
335
336
# File 'lib/zizq/query.rb', line 332

def none?
  return super if block_given?

  first.nil?
end

#one?void

This method returns an undefined value.

Returns true if there is exactly one matching job.

Without a block, optimised to fetch at most two jobs. With a block, falls back to Enumerable.



345
346
347
348
349
# File 'lib/zizq/query.rb', line 345

def one?
  return super if block_given?

  limit(2).to_a.size == 1
end

#order(order) ⇒ Query

Set the sort order for iteration.

Parameters:

  • order (Zizq::sort_direction, nil)

Returns:



278
279
280
# File 'lib/zizq/query.rb', line 278

def order(order)
  rebuild(order:)
end

#rebuild(id: @id, queue: @queue, type: @type, status: @status, jq_filter: @jq_filter, priority: @priority, ready_at: @ready_at, attempts: @attempts, order: @order, limit: @limit, page_size: @page_size) ⇒ Query

Build a new Query with the given overrides, preserving all other fields.

Parameters:

  • id: (Object) (defaults to: @id)
  • queue: (Object) (defaults to: @queue)
  • type: (Object) (defaults to: @type)
  • status: (Object) (defaults to: @status)
  • jq_filter: (Object) (defaults to: @jq_filter)
  • priority: (Object) (defaults to: @priority)
  • ready_at: (Object) (defaults to: @ready_at)
  • attempts: (Object) (defaults to: @attempts)
  • order: (Object) (defaults to: @order)
  • limit: (Object) (defaults to: @limit)
  • page_size: (Object) (defaults to: @page_size)

Returns:



639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
# File 'lib/zizq/query.rb', line 639

def rebuild(id: @id,
            queue: @queue,
            type: @type,
            status: @status,
            jq_filter: @jq_filter,
            priority: @priority,
            ready_at: @ready_at,
            attempts: @attempts,
            order: @order,
            limit: @limit,
            page_size: @page_size)
  self.class.new(
    id:,
    queue:,
    type:,
    status:,
    jq_filter:,
    priority:,
    ready_at:,
    attempts:,
    limit:,
    order:,
    page_size:,
  )
end

#reverse_each(&block) ⇒ void

This method returns an undefined value.

Iterate over matching jobs in reverse order.

Optimised: pushes the reverse ordering to the server instead of fetching all jobs into memory and reversing.



387
388
389
# File 'lib/zizq/query.rb', line 387

def reverse_each(&block)
  reverse_order.each(&block)
end

#reverse_orderQuery

Reverse the sort order.

Returns a new query with the opposite order. If no order was set, defaults to descending (the server default is ascending).

Returns:



299
300
301
# File 'lib/zizq/query.rb', line 299

def reverse_order
  rebuild(order: @order == :desc ? :asc : :desc)
end

#take(n) ⇒ Array[Resources::Job]

Return the first n matching jobs.

Optimised: sets the limit to n so the server only returns what's needed.

Parameters:

  • n (Integer)

Returns:



416
417
418
# File 'lib/zizq/query.rb', line 416

def take(n)
  limit(n).to_a
end

#update_all(queue: Zizq::UNCHANGED, priority: Zizq::UNCHANGED, ready_at: Zizq::UNCHANGED, retry_limit: Zizq::UNCHANGED, backoff: Zizq::UNCHANGED, retention: Zizq::UNCHANGED) ⇒ Integer

Update all matching jobs with the given field values.

When page_size or limit is set, iterates page by page and issues a bulk update per page using the job IDs on that page. For safety query parameters are included in the scope along with all IDs. Otherwise, issues a single bulk update with the query parameters.

Returns the total number of updated jobs.

Parameters:

Returns:

  • (Integer)


532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
# File 'lib/zizq/query.rb', line 532

def update_all(queue: Zizq::UNCHANGED,
               priority: Zizq::UNCHANGED,
               ready_at: Zizq::UNCHANGED,
               retry_limit: Zizq::UNCHANGED,
               backoff: Zizq::UNCHANGED,
               retention: Zizq::UNCHANGED)
  where = {
    id: @id,
    queue: @queue,
    type: @type,
    status: @status,
    filter: @jq_filter,
    priority: @priority,
    ready_at: @ready_at,
    attempts: @attempts,
  }

  apply = {
    queue:,
    priority:,
    ready_at:,
    retry_limit:,
    backoff:,
    retention:,
  }

  if @limit || @page_size
    remaining = @limit
    updated = 0

    each_page do |page|
      if remaining
        break if remaining <= 0
      end

      ids_on_page = page.jobs.map(&:id)
      ids_on_page = ids_on_page.take(remaining) if remaining

      updated += Zizq.client.update_all_jobs(
        where: where.merge(id: ids_on_page),
        apply:,
      )

      remaining -= ids_on_page.size if remaining
    end

    updated
  else
    Zizq.client.update_all_jobs(where:, apply:)
  end
end

#update_oneInteger

Update the first matching job.

Returns 1 if a job was updated, 0 if no jobs matched.

Returns:

  • (Integer)


431
432
433
# File 'lib/zizq/query.rb', line 431

def update_one(...)
  limit(1).update_all(...)
end

#validate_job_class!(job_class) ⇒ void

This method returns an undefined value.

Parameters:

  • job_class (Object)


667
668
669
670
671
672
673
# File 'lib/zizq/query.rb', line 667

def validate_job_class!(job_class)
  unless job_class.is_a?(JobConfig)
    raise ArgumentError,
      "#{job_class} does not include Zizq::JobConfig " \
      "(include Zizq::Job or extend Zizq::ActiveJobConfig)"
  end
end