Class: Zizq::Query
Overview
Composable query builder for jobs in Zizq.
Provides a chainable, immutable API for filtering, iterating, updating, and deleting jobs. Each filter method returns a new ‘Query` instance, leaving the original unchanged.
‘Query` is `Enumerable`— it lazily paginates through results, so standard Ruby methods like `count`, `map`, `select`, `first`, etc. work out of the box.
Examples:
# Count ready jobs on a queue
Zizq::Query.new.by_queue("emails").by_status("ready").count
# Move all jobs from one queue to another
Zizq::Query.new.by_queue("old").update_all(queue: "new")
# Delete dead jobs matching a payload filter
Zizq::Query.new.by_status("dead").add_jq_filter(".user_id == 42").delete_all
# Iterate in batches
Zizq::Query.new.in_pages_of(100).each { |job| puts job.id }
# Move all jobs from one queue to another in batches.
Zizq::Query.new.by_queue("old").in_pages_of(100).update_all(queue: "new")
# Find jobs by class and arguments
Zizq::Query.new.by_job_class_and_args(SendEmailJob, 42, template: "welcome")
# Find jobs by class and arguments (subset)
Zizq::Query.new.by_job_class_and_args_subset(SendEmailJob, 42)
Constant Summary collapse
- MAX_PAGE_SIZE =
Maximum page size the server can handle.
2000
Instance Method Summary collapse
-
#add_id(id) ⇒ Object
Add a job ID to the existing ID filter.
-
#add_jq_filter(jq_filter) ⇒ Object
Add a jq payload filter, logically combines with any existing filter via “and”.
-
#add_queue(queue) ⇒ Object
Add a queue to the existing queue filter.
-
#add_status(status) ⇒ Object
Add a status to the existing status filter.
-
#add_type(type) ⇒ Object
Add a type to the existing type filter.
-
#any? ⇒ Boolean
Returns true if there are any matching jobs.
-
#by_id(id) ⇒ Object
Filter by job ID (replaces any existing ID filter).
-
#by_job_class_and_args(job_class, *args, **kwargs) ⇒ Object
Filter by job class and exact arguments.
-
#by_job_class_and_args_subset(job_class, *args, **kwargs) ⇒ Object
Filter by job class and a subset of arguments.
-
#by_jq_filter(jq_filter) ⇒ Object
Replace the jq payload filter expression.
-
#by_queue(queue) ⇒ Object
Filter by queue name (replaces any existing queue filter).
-
#by_status(status) ⇒ Object
Filter by status (replaces any existing status filter).
-
#by_type(type) ⇒ Object
Filter by job type (replaces any existing type filter).
-
#count(*args, &block) ⇒ Object
Count matching jobs via the server-side count endpoint.
-
#delete_all ⇒ Object
Delete all matching jobs.
-
#delete_one ⇒ Object
Delete the first matching job.
-
#each(&block) ⇒ Object
Iterate over matching jobs, lazily paginating through results.
-
#each_page(&block) ⇒ Object
Iterate over pages of matching jobs.
-
#empty? ⇒ Boolean
Returns true if there are no matching jobs.
-
#first ⇒ Object
Return the first matching job, or nil if none match.
-
#in_pages_of(page_size) ⇒ Object
Set the page size for paginated iteration.
-
#initialize(id: nil, queue: nil, type: nil, status: nil, jq_filter: nil, order: nil, limit: nil, page_size: nil) ⇒ Query
constructor
Initialize the query with some initial parameters.
-
#last ⇒ Object
Return the last matching job, or nil if none match.
-
#limit(limit) ⇒ Object
Limit the total number of jobs returned.
-
#none? ⇒ Boolean
Returns true if there are no matching jobs.
-
#one? ⇒ Boolean
Returns true if there is exactly one matching job.
-
#order(order) ⇒ Object
Set the sort order for iteration.
-
#reverse_each(&block) ⇒ Object
Iterate over matching jobs in reverse order.
-
#reverse_order ⇒ Object
Reverse the sort order.
-
#take(n) ⇒ Object
Return the first ‘n` matching jobs.
-
#update_all(queue: Zizq::UNCHANGED, priority: Zizq::UNCHANGED, ready_at: Zizq::UNCHANGED, retry_limit: Zizq::UNCHANGED, backoff: Zizq::UNCHANGED, retention: Zizq::UNCHANGED) ⇒ Object
Update all matching jobs with the given field values.
-
#update_one ⇒ Object
Update the first matching job.
Constructor Details
#initialize(id: nil, queue: nil, type: nil, status: nil, jq_filter: nil, order: nil, limit: nil, page_size: nil) ⇒ Query
Initialize the query with some initial parameters.
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/zizq/query.rb', line 62 def initialize(id: nil, queue: nil, type: nil, status: nil, jq_filter: nil, order: nil, limit: nil, page_size: nil) @id = id @queue = queue @type = type @status = status @jq_filter = jq_filter @order = order @limit = limit @page_size = page_size end |
Instance Method Details
#add_id(id) ⇒ Object
Add a job ID to the existing ID filter.
104 105 106 |
# File 'lib/zizq/query.rb', line 104 def add_id(id) rebuild(id: Array(@id) + Array(id)) end |
#add_jq_filter(jq_filter) ⇒ Object
Add a jq payload filter, logically combines with any existing filter via “and”.
205 206 207 |
# File 'lib/zizq/query.rb', line 205 def add_jq_filter(jq_filter) rebuild(jq_filter: [@jq_filter, "(#{jq_filter})"].compact.join(" and ")) end |
#add_queue(queue) ⇒ Object
Add a queue to the existing queue filter.
120 121 122 |
# File 'lib/zizq/query.rb', line 120 def add_queue(queue) rebuild(queue: Array(@queue) + Array(queue)) end |
#add_status(status) ⇒ Object
Add a status to the existing status filter.
152 153 154 |
# File 'lib/zizq/query.rb', line 152 def add_status(status) rebuild(status: Array(@status) + Array(status)) end |
#add_type(type) ⇒ Object
Add a type to the existing type filter.
136 137 138 |
# File 'lib/zizq/query.rb', line 136 def add_type(type) rebuild(type: Array(@type) + Array(type)) end |
#any? ⇒ Boolean
Returns true if there are any matching jobs.
Without a block, optimised to fetch a single job. With a block, falls back to Enumerable (tests each job against the block).
254 255 256 257 258 |
# File 'lib/zizq/query.rb', line 254 def any? return super if block_given? !first.nil? end |
#by_id(id) ⇒ Object
Filter by job ID (replaces any existing ID filter).
96 97 98 |
# File 'lib/zizq/query.rb', line 96 def by_id(id) rebuild(id:) end |
#by_job_class_and_args(job_class, *args, **kwargs) ⇒ Object
Filter by job class and exact arguments.
The job class must include ‘Zizq::Job` or for Active Job classes must extend `Zizq::ActiveJobConfig`.
Sets the type filter to the class name and adds a jq payload filter for an exact match of the serialized arguments.
168 169 170 171 172 |
# File 'lib/zizq/query.rb', line 168 def by_job_class_and_args(job_class, *args, **kwargs) validate_job_class!(job_class) name = job_class.name or raise ArgumentError, "anonymous classes are not supported" by_type(name).add_jq_filter(job_class.zizq_payload_filter(*args, **kwargs)) end |
#by_job_class_and_args_subset(job_class, *args, **kwargs) ⇒ Object
Filter by job class and a subset of arguments.
Matches jobs whose positional args start with the given values and whose kwargs contain (at minimum) the given key/value pairs.
The job class must include ‘Zizq::Job` or for Active Job classes must extend `Zizq::ActiveJobConfig`.
186 187 188 189 190 |
# File 'lib/zizq/query.rb', line 186 def by_job_class_and_args_subset(job_class, *args, **kwargs) validate_job_class!(job_class) name = job_class.name or raise ArgumentError, "anonymous classes are not supported" by_type(name).add_jq_filter(job_class.zizq_payload_subset_filter(*args, **kwargs)) end |
#by_jq_filter(jq_filter) ⇒ Object
Replace the jq payload filter expression.
196 197 198 |
# File 'lib/zizq/query.rb', line 196 def by_jq_filter(jq_filter) rebuild(jq_filter:) end |
#by_queue(queue) ⇒ Object
Filter by queue name (replaces any existing queue filter).
112 113 114 |
# File 'lib/zizq/query.rb', line 112 def by_queue(queue) rebuild(queue:) end |
#by_status(status) ⇒ Object
Filter by status (replaces any existing status filter).
144 145 146 |
# File 'lib/zizq/query.rb', line 144 def by_status(status) rebuild(status:) end |
#by_type(type) ⇒ Object
Filter by job type (replaces any existing type filter).
128 129 130 |
# File 'lib/zizq/query.rb', line 128 def by_type(type) rebuild(type:) end |
#count(*args, &block) ⇒ Object
Count matching jobs via the server-side count endpoint.
Without a block or argument, uses ‘GET /jobs/count` for an efficient server-side count. When a limit is set, caps the result locally with `[total, limit].min`.
With a block or argument, falls back to Enumerable (iterates and counts matching jobs).
298 299 300 301 302 303 304 305 306 307 308 309 310 |
# File 'lib/zizq/query.rb', line 298 def count(*args, &block) return super if block || !args.empty? total = Zizq.client.count_jobs( id: @id, queue: @queue, type: @type, status: @status, filter: @jq_filter, ) @limit ? [total, @limit].min : total end |
#delete_all ⇒ Object
Delete all matching jobs.
When ‘page_size` or `limit` is set, iterates page by page and issues a bulk delete per page using the job IDs on that page. For safety query parameters are included in the scope along with all IDs. Otherwise, issues a single bulk delete with the query filters.
When called in a bare query, this deletes all jobs from the server, which is useful in tests.
Returns the total number of deleted jobs.
523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 |
# File 'lib/zizq/query.rb', line 523 def delete_all where = { id: @id, queue: @queue, type: @type, status: @status, filter: @jq_filter, } if @limit || @page_size remaining = @limit deleted = 0 each_page do |page| if remaining break if remaining <= 0 end ids_on_page = page.jobs.map(&:id) ids_on_page = ids_on_page.take(remaining) if remaining deleted += Zizq.client.delete_all_jobs( where: where.merge(id: ids_on_page), ) remaining -= ids_on_page.size if remaining end deleted else Zizq.client.delete_all_jobs(where:) end end |
#delete_one ⇒ Object
Delete the first matching job.
Returns 1 if a job was deleted, 0 if no jobs matched.
372 373 374 |
# File 'lib/zizq/query.rb', line 372 def delete_one limit(1).delete_all end |
#each(&block) ⇒ Object
Iterate over matching jobs, lazily paginating through results.
Respects ‘limit` if set. Without a block, returns an `Enumerator`.
382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 |
# File 'lib/zizq/query.rb', line 382 def each(&block) enumerator = enum_for(:each) if block_given? remaining = @limit each_page do |page| page.jobs.each do |job| if remaining break if remaining <= 0 end yield job remaining -= 1 if remaining end end end enumerator end |
#each_page(&block) ⇒ Object
Iterate over pages of matching jobs.
Each page is a ‘Resources::JobPage`. Without a block, returns an `Enumerator`.
If ‘limit` is set, terminates after the last page is reached that exceeds the limit, but does not truncate the page.
414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 |
# File 'lib/zizq/query.rb', line 414 def each_page(&block) enumerator = enum_for(:each_page) if block_given? page = Zizq.client.list_jobs( id: @id, queue: @queue, type: @type, status: @status, filter: @jq_filter, limit: [@page_size, @limit, (@page_size || @limit) && MAX_PAGE_SIZE].compact.min, order: @order, ) remaining = @limit while page yield page if remaining remaining -= page.jobs.size break if remaining <= 0 end page = page.next_page end end enumerator end |
#empty? ⇒ Boolean
Returns true if there are no matching jobs.
Optimised: fetches a single job to check.
243 244 245 |
# File 'lib/zizq/query.rb', line 243 def empty? first.nil? end |
#first ⇒ Object
Return the first matching job, or nil if none match.
Optimised: fetches a single job from the server (‘?limit=1`).
328 329 330 |
# File 'lib/zizq/query.rb', line 328 def first limit(1).each.first end |
#in_pages_of(page_size) ⇒ Object
Set the page size for paginated iteration.
When set, ‘each_page` fetches pages of this size, and `each` fetches jobs in pages of this size. Also used by `update_all` and `delete_all` to batch operations by page.
88 89 90 |
# File 'lib/zizq/query.rb', line 88 def in_pages_of(page_size) rebuild(page_size:) end |
#last ⇒ Object
Return the last matching job, or nil if none match.
Optimised: reverses the order and fetches a single job.
337 338 339 |
# File 'lib/zizq/query.rb', line 337 def last reverse_order.first end |
#limit(limit) ⇒ Object
Limit the total number of jobs returned.
This is a total limit, imposed across potentially multiple page fetches. This limit also applies to ‘update_all` and `delete_all` operations.
224 225 226 |
# File 'lib/zizq/query.rb', line 224 def limit(limit) rebuild(limit:) end |
#none? ⇒ Boolean
Returns true if there are no matching jobs.
Without a block, optimised to fetch a single job. With a block, falls back to Enumerable (tests each job against the block).
267 268 269 270 271 |
# File 'lib/zizq/query.rb', line 267 def none? return super if block_given? first.nil? end |
#one? ⇒ Boolean
Returns true if there is exactly one matching job.
Without a block, optimised to fetch at most two jobs. With a block, falls back to Enumerable.
280 281 282 283 284 |
# File 'lib/zizq/query.rb', line 280 def one? return super if block_given? limit(2).to_a.size == 1 end |
#order(order) ⇒ Object
Set the sort order for iteration.
213 214 215 |
# File 'lib/zizq/query.rb', line 213 def order(order) rebuild(order:) end |
#reverse_each(&block) ⇒ Object
Iterate over matching jobs in reverse order.
Optimised: pushes the reverse ordering to the server instead of fetching all jobs into memory and reversing.
319 320 321 |
# File 'lib/zizq/query.rb', line 319 def reverse_each(&block) reverse_order.each(&block) end |
#reverse_order ⇒ Object
Reverse the sort order.
Returns a new query with the opposite order. If no order was set, defaults to descending (the server default is ascending).
234 235 236 |
# File 'lib/zizq/query.rb', line 234 def reverse_order rebuild(order: @order == :desc ? :asc : :desc) end |
#take(n) ⇒ Object
Return the first ‘n` matching jobs.
Optimised: sets the limit to ‘n` so the server only returns what’s needed.
348 349 350 |
# File 'lib/zizq/query.rb', line 348 def take(n) limit(n).to_a end |
#update_all(queue: Zizq::UNCHANGED, priority: Zizq::UNCHANGED, ready_at: Zizq::UNCHANGED, retry_limit: Zizq::UNCHANGED, backoff: Zizq::UNCHANGED, retention: Zizq::UNCHANGED) ⇒ Object
Update all matching jobs with the given field values.
When ‘page_size` or `limit` is set, iterates page by page and issues a bulk update per page using the job IDs on that page. For safety query parameters are included in the scope along with all IDs. Otherwise, issues a single bulk update with the query parameters.
Returns the total number of updated jobs.
461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 |
# File 'lib/zizq/query.rb', line 461 def update_all(queue: Zizq::UNCHANGED, priority: Zizq::UNCHANGED, ready_at: Zizq::UNCHANGED, retry_limit: Zizq::UNCHANGED, backoff: Zizq::UNCHANGED, retention: Zizq::UNCHANGED) where = { id: @id, queue: @queue, type: @type, status: @status, filter: @jq_filter, } apply = { queue:, priority:, ready_at:, retry_limit:, backoff:, retention:, } if @limit || @page_size remaining = @limit updated = 0 each_page do |page| if remaining break if remaining <= 0 end ids_on_page = page.jobs.map(&:id) ids_on_page = ids_on_page.take(remaining) if remaining updated += Zizq.client.update_all_jobs( where: where.merge(id: ids_on_page), apply:, ) remaining -= ids_on_page.size if remaining end updated else Zizq.client.update_all_jobs(where:, apply:) end end |
#update_one ⇒ Object
Update the first matching job.
Returns 1 if a job was updated, 0 if no jobs matched.
363 364 365 |
# File 'lib/zizq/query.rb', line 363 def update_one(...) limit(1).update_all(...) end |