Class: Zizq::Query
- Inherits:
-
Object
- Object
- Zizq::Query
- Includes:
- Enumerable, Enumerable[Zizq::Resources::Job]
- Defined in:
- lib/zizq/query.rb,
sig/generated/zizq/query.rbs
Overview
Composable query builder for jobs in Zizq.
Provides a chainable, immutable API for filtering, iterating, updating,
and deleting jobs. Each filter method returns a new Query instance,
leaving the original unchanged.
Query is Enumerable— it lazily paginates through results, so
standard Ruby methods like count, map, select, first, etc.
work out of the box.
Examples:
# Count ready jobs on a queue
Zizq::Query.new.by_queue("emails").by_status("ready").count
# Move all jobs from one queue to another
Zizq::Query.new.by_queue("old").update_all(queue: "new")
# Delete dead jobs matching a payload filter
Zizq::Query.new.by_status("dead").add_jq_filter(".user_id == 42").delete_all
# Iterate in batches
Zizq::Query.new.in_pages_of(100).each { |job| puts job.id }
# Move all jobs from one queue to another in batches.
Zizq::Query.new.by_queue("old").in_pages_of(100).update_all(queue: "new")
# Find jobs by class and arguments
Zizq::Query.new.by_job_class_and_args(SendEmailJob, 42, template: "welcome")
# Find jobs by class and arguments (subset)
Zizq::Query.new.by_job_class_and_args_subset(SendEmailJob, 42)
Constant Summary collapse
- MAX_PAGE_SIZE =
Maximum page size the server can handle.
2000
Instance Method Summary collapse
-
#add_id(id) ⇒ Query
Add a job ID to the existing ID filter.
-
#add_jq_filter(jq_filter) ⇒ Query
Add a jq payload filter, logically combines with any existing filter via "and".
-
#add_queue(queue) ⇒ Query
Add a queue to the existing queue filter.
-
#add_status(status) ⇒ Query
Add a status to the existing status filter.
-
#add_type(type) ⇒ Query
Add a type to the existing type filter.
-
#any? ⇒ void
Returns true if there are any matching jobs.
-
#by_attempts(attempts) ⇒ Query
Filter by
attemptsrange (replaces any existingattemptsfilter). -
#by_id(id) ⇒ Query
Filter by job ID (replaces any existing ID filter).
-
#by_job_class_and_args(job_class, *args, **kwargs) ⇒ Query
Filter by job class and exact arguments.
-
#by_job_class_and_args_subset(job_class, *args, **kwargs) ⇒ Query
Filter by job class and a subset of arguments.
-
#by_jq_filter(jq_filter) ⇒ Query
Replace the jq payload filter expression.
-
#by_priority(priority) ⇒ Query
Filter by priority range (replaces any existing priority filter).
-
#by_queue(queue) ⇒ Query
Filter by queue name (replaces any existing queue filter).
-
#by_ready_at(ready_at) ⇒ Query
Filter by
ready_atrange (replaces any existingready_atfilter). -
#by_status(status) ⇒ Query
Filter by status (replaces any existing status filter).
-
#by_type(type) ⇒ Query
Filter by job type (replaces any existing type filter).
-
#count(*args, &block) ⇒ void
Count matching jobs via the server-side count endpoint.
-
#delete_all ⇒ Integer
Delete all matching jobs.
-
#delete_one ⇒ Integer
Delete the first matching job.
-
#each(&block) ⇒ void
Iterate over matching jobs, lazily paginating through results.
-
#each_page(&block) ⇒ void
Iterate over pages of matching jobs.
-
#empty? ⇒ Boolean
Returns true if there are no matching jobs.
-
#first ⇒ Resources::Job?
Return the first matching job, or nil if none match.
-
#in_pages_of(page_size) ⇒ Query
Set the page size for paginated iteration.
-
#initialize(id: nil, queue: nil, type: nil, status: nil, jq_filter: nil, priority: nil, ready_at: nil, attempts: nil, order: nil, limit: nil, page_size: nil) ⇒ Query
constructor
Initialize the query with some initial parameters.
-
#last ⇒ Resources::Job?
Return the last matching job, or nil if none match.
-
#limit(limit) ⇒ Query
Limit the total number of jobs returned.
-
#none? ⇒ void
Returns true if there are no matching jobs.
-
#one? ⇒ void
Returns true if there is exactly one matching job.
-
#order(order) ⇒ Query
Set the sort order for iteration.
-
#rebuild(id: @id, queue: @queue, type: @type, status: @status, jq_filter: @jq_filter, priority: @priority, ready_at: @ready_at, attempts: @attempts, order: @order, limit: @limit, page_size: @page_size) ⇒ Query
Build a new Query with the given overrides, preserving all other fields.
-
#reverse_each(&block) ⇒ void
Iterate over matching jobs in reverse order.
-
#reverse_order ⇒ Query
Reverse the sort order.
-
#take(n) ⇒ Array[Resources::Job]
Return the first
nmatching jobs. -
#update_all(queue: Zizq::UNCHANGED, priority: Zizq::UNCHANGED, ready_at: Zizq::UNCHANGED, retry_limit: Zizq::UNCHANGED, backoff: Zizq::UNCHANGED, retention: Zizq::UNCHANGED) ⇒ Integer
Update all matching jobs with the given field values.
-
#update_one ⇒ Integer
Update the first matching job.
- #validate_job_class!(job_class) ⇒ void
Constructor Details
#initialize(id: nil, queue: nil, type: nil, status: nil, jq_filter: nil, priority: nil, ready_at: nil, attempts: nil, order: nil, limit: nil, page_size: nil) ⇒ Query
Initialize the query with some initial parameters.
65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 |
# File 'lib/zizq/query.rb', line 65 def initialize(id: nil, queue: nil, type: nil, status: nil, jq_filter: nil, priority: nil, ready_at: nil, attempts: nil, order: nil, limit: nil, page_size: nil) @id = id @queue = queue @type = type @status = status @jq_filter = jq_filter @priority = priority @ready_at = ready_at @attempts = attempts @order = order @limit = limit @page_size = page_size end |
Instance Method Details
#add_id(id) ⇒ Query
Add a job ID to the existing ID filter.
113 114 115 |
# File 'lib/zizq/query.rb', line 113 def add_id(id) rebuild(id: Array(@id) + Array(id)) end |
#add_jq_filter(jq_filter) ⇒ Query
Add a jq payload filter, logically combines with any existing filter via "and".
270 271 272 |
# File 'lib/zizq/query.rb', line 270 def add_jq_filter(jq_filter) rebuild(jq_filter: [@jq_filter, "(#{jq_filter})"].compact.join(" and ")) end |
#add_queue(queue) ⇒ Query
Add a queue to the existing queue filter.
129 130 131 |
# File 'lib/zizq/query.rb', line 129 def add_queue(queue) rebuild(queue: Array(@queue) + Array(queue)) end |
#add_status(status) ⇒ Query
Add a status to the existing status filter.
161 162 163 |
# File 'lib/zizq/query.rb', line 161 def add_status(status) rebuild(status: Array(@status) + Array(status)) end |
#add_type(type) ⇒ Query
Add a type to the existing type filter.
145 146 147 |
# File 'lib/zizq/query.rb', line 145 def add_type(type) rebuild(type: Array(@type) + Array(type)) end |
#any? ⇒ void
This method returns an undefined value.
Returns true if there are any matching jobs.
Without a block, optimised to fetch a single job. With a block, falls back to Enumerable (tests each job against the block).
319 320 321 322 323 |
# File 'lib/zizq/query.rb', line 319 def any? return super if block_given? !first.nil? end |
#by_attempts(attempts) ⇒ Query
Filter by attempts range (replaces any existing attempts filter).
Accepts an Integer (exact match) or an inclusive Range. attempts is
the number of times the job has failed. Exclusive ranges raise
ArgumentError.
Examples:
.by_attempts(0) # never failed
.by_attempts(1..) # has failed at least once
.by_attempts(1..3) # has failed 1, 2, or 3 times
217 218 219 |
# File 'lib/zizq/query.rb', line 217 def by_attempts(attempts) rebuild(attempts:) end |
#by_id(id) ⇒ Query
Filter by job ID (replaces any existing ID filter).
105 106 107 |
# File 'lib/zizq/query.rb', line 105 def by_id(id) rebuild(id:) end |
#by_job_class_and_args(job_class, *args, **kwargs) ⇒ Query
Filter by job class and exact arguments.
The job class must include Zizq::Job or for Active Job classes must
extend Zizq::ActiveJobConfig.
Sets the type filter to the class name and adds a jq payload filter for an exact match of the serialized arguments.
233 234 235 236 237 |
# File 'lib/zizq/query.rb', line 233 def by_job_class_and_args(job_class, *args, **kwargs) validate_job_class!(job_class) name = job_class.name or raise ArgumentError, "anonymous classes are not supported" by_type(name).add_jq_filter(job_class.zizq_payload_filter(*args, **kwargs)) end |
#by_job_class_and_args_subset(job_class, *args, **kwargs) ⇒ Query
Filter by job class and a subset of arguments.
Matches jobs whose positional args start with the given values and whose kwargs contain (at minimum) the given key/value pairs.
The job class must include Zizq::Job or for Active Job classes must
extend Zizq::ActiveJobConfig.
251 252 253 254 255 |
# File 'lib/zizq/query.rb', line 251 def by_job_class_and_args_subset(job_class, *args, **kwargs) validate_job_class!(job_class) name = job_class.name or raise ArgumentError, "anonymous classes are not supported" by_type(name).add_jq_filter(job_class.zizq_payload_subset_filter(*args, **kwargs)) end |
#by_jq_filter(jq_filter) ⇒ Query
Replace the jq payload filter expression.
261 262 263 |
# File 'lib/zizq/query.rb', line 261 def by_jq_filter(jq_filter) rebuild(jq_filter:) end |
#by_priority(priority) ⇒ Query
Filter by priority range (replaces any existing priority filter).
Accepts an Integer (exact match) or an inclusive Range. Lower numbers
are higher priority. Exclusive ranges (e.g. 0...100) raise
ArgumentError — the server only supports inclusive bounds.
Examples:
.by_priority(50) # exactly priority 50
.by_priority(0..100) # between 0 and 100 inclusive
.by_priority(100..) # 100 or greater
.by_priority(..100) # 100 or less
180 181 182 |
# File 'lib/zizq/query.rb', line 180 def by_priority(priority) rebuild(priority:) end |
#by_queue(queue) ⇒ Query
Filter by queue name (replaces any existing queue filter).
121 122 123 |
# File 'lib/zizq/query.rb', line 121 def by_queue(queue) rebuild(queue:) end |
#by_ready_at(ready_at) ⇒ Query
Filter by ready_at range (replaces any existing ready_at filter).
Accepts a value (Time, Numeric, anything that responds to to_f)
for an exact match, or an inclusive Range. Values are interpreted as
fractional seconds on the Ruby side and converted to milliseconds for
the server. Exclusive ranges raise ArgumentError.
Examples:
.by_ready_at(Time.now..) # ready to run now or later
.by_ready_at(..Time.now) # was ready to run by now
.by_ready_at(t1..t2) # ready between t1 and t2
199 200 201 |
# File 'lib/zizq/query.rb', line 199 def by_ready_at(ready_at) rebuild(ready_at:) end |
#by_status(status) ⇒ Query
Filter by status (replaces any existing status filter).
153 154 155 |
# File 'lib/zizq/query.rb', line 153 def by_status(status) rebuild(status:) end |
#by_type(type) ⇒ Query
Filter by job type (replaces any existing type filter).
137 138 139 |
# File 'lib/zizq/query.rb', line 137 def by_type(type) rebuild(type:) end |
#count(*args, &block) ⇒ void
This method returns an undefined value.
Count matching jobs via the server-side count endpoint.
Without a block or argument, uses GET /jobs/count for an efficient
server-side count. When a limit is set, caps the result locally with
[total, limit].min.
With a block or argument, falls back to Enumerable (iterates and counts matching jobs).
363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 |
# File 'lib/zizq/query.rb', line 363 def count(*args, &block) return super if block || !args.empty? total = Zizq.client.count_jobs( id: @id, queue: @queue, type: @type, status: @status, filter: @jq_filter, priority: @priority, ready_at: @ready_at, attempts: @attempts, ) @limit ? [total, @limit].min : total end |
#delete_all ⇒ Integer
Delete all matching jobs.
When page_size or limit is set, iterates page by page and
issues a bulk delete per page using the job IDs on that page. For safety
query parameters are included in the scope along with all IDs. Otherwise,
issues a single bulk delete with the query filters.
When called in a bare query, this deletes all jobs from the server, which is useful in tests.
Returns the total number of deleted jobs.
597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 |
# File 'lib/zizq/query.rb', line 597 def delete_all where = { id: @id, queue: @queue, type: @type, status: @status, filter: @jq_filter, priority: @priority, ready_at: @ready_at, attempts: @attempts, } if @limit || @page_size remaining = @limit deleted = 0 each_page do |page| if remaining break if remaining <= 0 end ids_on_page = page.jobs.map(&:id) ids_on_page = ids_on_page.take(remaining) if remaining deleted += Zizq.client.delete_all_jobs( where: where.merge(id: ids_on_page), ) remaining -= ids_on_page.size if remaining end deleted else Zizq.client.delete_all_jobs(where:) end end |
#delete_one ⇒ Integer
Delete the first matching job.
Returns 1 if a job was deleted, 0 if no jobs matched.
440 441 442 |
# File 'lib/zizq/query.rb', line 440 def delete_one limit(1).delete_all end |
#each(&block) ⇒ void
This method returns an undefined value.
Iterate over matching jobs, lazily paginating through results.
Respects limit if set. Without a block, returns an Enumerator.
450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 |
# File 'lib/zizq/query.rb', line 450 def each(&block) enumerator = enum_for(:each) if block_given? remaining = @limit each_page do |page| page.jobs.each do |job| if remaining break if remaining <= 0 end yield job remaining -= 1 if remaining end end end enumerator end |
#each_page(&block) ⇒ void
This method returns an undefined value.
Iterate over pages of matching jobs.
Each page is a Resources::JobPage. Without a block, returns an
Enumerator.
If limit is set, terminates after the last page is reached that exceeds
the limit, but does not truncate the page.
482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 |
# File 'lib/zizq/query.rb', line 482 def each_page(&block) enumerator = enum_for(:each_page) if block_given? page = Zizq.client.list_jobs( id: @id, queue: @queue, type: @type, status: @status, filter: @jq_filter, priority: @priority, ready_at: @ready_at, attempts: @attempts, limit: [@page_size, @limit, (@page_size || @limit) && MAX_PAGE_SIZE].compact.min, order: @order, ) remaining = @limit while page yield page if remaining remaining -= page.jobs.size break if remaining <= 0 end page = page.next_page end end enumerator end |
#empty? ⇒ Boolean
Returns true if there are no matching jobs.
Optimised: fetches a single job to check.
308 309 310 |
# File 'lib/zizq/query.rb', line 308 def empty? first.nil? end |
#first ⇒ Resources::Job?
Return the first matching job, or nil if none match.
Optimised: fetches a single job from the server (?limit=1).
396 397 398 |
# File 'lib/zizq/query.rb', line 396 def first limit(1).each.first end |
#in_pages_of(page_size) ⇒ Query
Set the page size for paginated iteration.
When set, each_page fetches pages of this size, and each fetches jobs
in pages of this size. Also used by update_all and delete_all to
batch operations by page.
97 98 99 |
# File 'lib/zizq/query.rb', line 97 def in_pages_of(page_size) rebuild(page_size:) end |
#last ⇒ Resources::Job?
Return the last matching job, or nil if none match.
Optimised: reverses the order and fetches a single job.
405 406 407 |
# File 'lib/zizq/query.rb', line 405 def last reverse_order.first end |
#limit(limit) ⇒ Query
Limit the total number of jobs returned.
This is a total limit, imposed across potentially multiple page fetches.
This limit also applies to update_all and delete_all operations.
289 290 291 |
# File 'lib/zizq/query.rb', line 289 def limit(limit) rebuild(limit:) end |
#none? ⇒ void
This method returns an undefined value.
Returns true if there are no matching jobs.
Without a block, optimised to fetch a single job. With a block, falls back to Enumerable (tests each job against the block).
332 333 334 335 336 |
# File 'lib/zizq/query.rb', line 332 def none? return super if block_given? first.nil? end |
#one? ⇒ void
This method returns an undefined value.
Returns true if there is exactly one matching job.
Without a block, optimised to fetch at most two jobs. With a block, falls back to Enumerable.
345 346 347 348 349 |
# File 'lib/zizq/query.rb', line 345 def one? return super if block_given? limit(2).to_a.size == 1 end |
#order(order) ⇒ Query
Set the sort order for iteration.
278 279 280 |
# File 'lib/zizq/query.rb', line 278 def order(order) rebuild(order:) end |
#rebuild(id: @id, queue: @queue, type: @type, status: @status, jq_filter: @jq_filter, priority: @priority, ready_at: @ready_at, attempts: @attempts, order: @order, limit: @limit, page_size: @page_size) ⇒ Query
Build a new Query with the given overrides, preserving all other fields.
639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 |
# File 'lib/zizq/query.rb', line 639 def rebuild(id: @id, queue: @queue, type: @type, status: @status, jq_filter: @jq_filter, priority: @priority, ready_at: @ready_at, attempts: @attempts, order: @order, limit: @limit, page_size: @page_size) self.class.new( id:, queue:, type:, status:, jq_filter:, priority:, ready_at:, attempts:, limit:, order:, page_size:, ) end |
#reverse_each(&block) ⇒ void
This method returns an undefined value.
Iterate over matching jobs in reverse order.
Optimised: pushes the reverse ordering to the server instead of fetching all jobs into memory and reversing.
387 388 389 |
# File 'lib/zizq/query.rb', line 387 def reverse_each(&block) reverse_order.each(&block) end |
#reverse_order ⇒ Query
Reverse the sort order.
Returns a new query with the opposite order. If no order was set, defaults to descending (the server default is ascending).
299 300 301 |
# File 'lib/zizq/query.rb', line 299 def reverse_order rebuild(order: @order == :desc ? :asc : :desc) end |
#take(n) ⇒ Array[Resources::Job]
Return the first n matching jobs.
Optimised: sets the limit to n so the server only returns what's
needed.
416 417 418 |
# File 'lib/zizq/query.rb', line 416 def take(n) limit(n).to_a end |
#update_all(queue: Zizq::UNCHANGED, priority: Zizq::UNCHANGED, ready_at: Zizq::UNCHANGED, retry_limit: Zizq::UNCHANGED, backoff: Zizq::UNCHANGED, retention: Zizq::UNCHANGED) ⇒ Integer
Update all matching jobs with the given field values.
When page_size or limit is set, iterates page by page and
issues a bulk update per page using the job IDs on that page. For safety
query parameters are included in the scope along with all IDs. Otherwise,
issues a single bulk update with the query parameters.
Returns the total number of updated jobs.
532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 |
# File 'lib/zizq/query.rb', line 532 def update_all(queue: Zizq::UNCHANGED, priority: Zizq::UNCHANGED, ready_at: Zizq::UNCHANGED, retry_limit: Zizq::UNCHANGED, backoff: Zizq::UNCHANGED, retention: Zizq::UNCHANGED) where = { id: @id, queue: @queue, type: @type, status: @status, filter: @jq_filter, priority: @priority, ready_at: @ready_at, attempts: @attempts, } apply = { queue:, priority:, ready_at:, retry_limit:, backoff:, retention:, } if @limit || @page_size remaining = @limit updated = 0 each_page do |page| if remaining break if remaining <= 0 end ids_on_page = page.jobs.map(&:id) ids_on_page = ids_on_page.take(remaining) if remaining updated += Zizq.client.update_all_jobs( where: where.merge(id: ids_on_page), apply:, ) remaining -= ids_on_page.size if remaining end updated else Zizq.client.update_all_jobs(where:, apply:) end end |
#update_one ⇒ Integer
Update the first matching job.
Returns 1 if a job was updated, 0 if no jobs matched.
431 432 433 |
# File 'lib/zizq/query.rb', line 431 def update_one(...) limit(1).update_all(...) end |
#validate_job_class!(job_class) ⇒ void
This method returns an undefined value.
667 668 669 670 671 672 673 |
# File 'lib/zizq/query.rb', line 667 def validate_job_class!(job_class) unless job_class.is_a?(JobConfig) raise ArgumentError, "#{job_class} does not include Zizq::JobConfig " \ "(include Zizq::Job or extend Zizq::ActiveJobConfig)" end end |