Class: Zizq::Query
Overview
Composable query builder for jobs in Zizq.
Provides a chainable, immutable API for filtering, iterating, updating, and deleting jobs. Each filter method returns a new `Query` instance, leaving the original unchanged.
`Query` is `Enumerable`: it lazily paginates through results, so standard Ruby methods like `count`, `map`, `select`, `first`, etc. work out of the box.
Examples:
# Count ready jobs on a queue
Zizq::Query.new.by_queue("emails").by_status("ready").count
# Move all jobs from one queue to another
Zizq::Query.new.by_queue("old").update_all(queue: "new")
# Delete dead jobs matching a payload filter
Zizq::Query.new.by_status("dead").add_jq_filter(".user_id == 42").delete_all
# Iterate in batches
Zizq::Query.new.in_pages_of(100).each { |job| puts job.id }
# Move all jobs from one queue to another in batches
Zizq::Query.new.by_queue("old").in_pages_of(100).update_all(queue: "new")
# Find jobs by class and arguments
Zizq::Query.new.by_job_class_and_args(SendEmailJob, 42, template: "welcome")
# Find jobs by class and arguments (subset)
Zizq::Query.new.by_job_class_and_args_subset(SendEmailJob, 42)
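The immutability described in the overview can be illustrated with a minimal sketch. `SketchQuery` and its `rebuild` helper are hypothetical stand-ins written for this example. The real `rebuild` method is private and not shown on this page, so this is only one plausible way such a builder could work, not the library's implementation.

```ruby
# Hypothetical stand-in for an immutable query builder (not Zizq::Query itself).
class SketchQuery
  attr_reader :queue, :status

  def initialize(queue: nil, status: nil)
    @queue = queue
    @status = status
  end

  # Each filter method returns a new object instead of mutating the receiver.
  def by_queue(queue)
    rebuild(queue: queue)
  end

  def by_status(status)
    rebuild(status: status)
  end

  private

  # Copy the current state, apply the overrides, and build a fresh instance.
  def rebuild(**overrides)
    state = { queue: @queue, status: @status }.merge(overrides)
    self.class.new(**state)
  end
end

base = SketchQuery.new.by_queue("emails")
ready = base.by_status("ready")

# `base` is untouched, so it can be reused to derive other queries.
dead = base.by_status("dead")
```

Because every call returns a new object, a partially built query such as `base` can safely be shared and extended in several directions.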
Constant Summary
- MAX_PAGE_SIZE = 2000
  Maximum page size the server can handle.
Instance Method Summary
- #add_id(id) ⇒ Object
  Add a job ID to the existing ID filter.
- #add_jq_filter(jq_filter) ⇒ Object
  Add a jq payload filter, combined with any existing filter via a logical "and".
- #add_queue(queue) ⇒ Object
  Add a queue to the existing queue filter.
- #add_status(status) ⇒ Object
  Add a status to the existing status filter.
- #add_type(type) ⇒ Object
  Add a type to the existing type filter.
- #any? ⇒ Boolean
  Returns true if there are any matching jobs.
- #by_id(id) ⇒ Object
  Filter by job ID (replaces any existing ID filter).
- #by_job_class_and_args(job_class, *args, **kwargs) ⇒ Object
  Filter by job class and exact arguments.
- #by_job_class_and_args_subset(job_class, *args, **kwargs) ⇒ Object
  Filter by job class and a subset of arguments.
- #by_jq_filter(jq_filter) ⇒ Object
  Replace the jq payload filter expression.
- #by_queue(queue) ⇒ Object
  Filter by queue name (replaces any existing queue filter).
- #by_status(status) ⇒ Object
  Filter by status (replaces any existing status filter).
- #by_type(type) ⇒ Object
  Filter by job type (replaces any existing type filter).
- #delete_all ⇒ Object
  Delete all matching jobs.
- #delete_one ⇒ Object
  Delete the first matching job.
- #each(&block) ⇒ Object
  Iterate over matching jobs, lazily paginating through results.
- #each_page(&block) ⇒ Object
  Iterate over pages of matching jobs.
- #empty? ⇒ Boolean
  Returns true if there are no matching jobs.
- #first ⇒ Object
  Return the first matching job, or nil if none match.
- #in_pages_of(page_size) ⇒ Object
  Set the page size for paginated iteration.
- #initialize(id: nil, queue: nil, type: nil, status: nil, jq_filter: nil, order: nil, limit: nil, page_size: nil) ⇒ Query (constructor)
  Initialize the query with some initial parameters.
- #last ⇒ Object
  Return the last matching job, or nil if none match.
- #limit(limit) ⇒ Object
  Limit the total number of jobs returned.
- #none? ⇒ Boolean
  Returns true if there are no matching jobs.
- #one? ⇒ Boolean
  Returns true if there is exactly one matching job.
- #order(order) ⇒ Object
  Set the sort order for iteration.
- #reverse_each(&block) ⇒ Object
  Iterate over matching jobs in reverse order.
- #reverse_order ⇒ Object
  Reverse the sort order.
- #take(n) ⇒ Object
  Return the first `n` matching jobs.
- #update_all(queue: Zizq::UNCHANGED, priority: Zizq::UNCHANGED, ready_at: Zizq::UNCHANGED, retry_limit: Zizq::UNCHANGED, backoff: Zizq::UNCHANGED, retention: Zizq::UNCHANGED) ⇒ Object
  Update all matching jobs with the given field values.
- #update_one ⇒ Object
  Update the first matching job.
Constructor Details
#initialize(id: nil, queue: nil, type: nil, status: nil, jq_filter: nil, order: nil, limit: nil, page_size: nil) ⇒ Query
Initialize the query with some initial parameters.
# File 'lib/zizq/query.rb', line 62
def initialize(id: nil,
               queue: nil,
               type: nil,
               status: nil,
               jq_filter: nil,
               order: nil,
               limit: nil,
               page_size: nil)
  @id = id
  @queue = queue
  @type = type
  @status = status
  @jq_filter = jq_filter
  @order = order
  @limit = limit
  @page_size = page_size
end
Instance Method Details
#add_id(id) ⇒ Object
Add a job ID to the existing ID filter.
# File 'lib/zizq/query.rb', line 104
def add_id(id)
  rebuild(id: Array(@id) + Array(id))
end
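The `Array(existing) + Array(value)` idiom used by the `add_*` methods accepts `nil`, a single value, or an array on either side. The helper name `add_to_filter` below is made up for this illustration; only the expression inside it comes from the source.

```ruby
# Mirrors the expression used by add_id, add_queue, add_type and add_status.
# Kernel#Array turns nil into [], wraps a scalar, and leaves arrays as they are.
def add_to_filter(existing, value)
  Array(existing) + Array(value)
end

add_to_filter(nil, "a")            # => ["a"]
add_to_filter("a", "b")            # => ["a", "b"]
add_to_filter(["a", "b"], ["c"])   # => ["a", "b", "c"]
```

Note that the values are concatenated as-is, so adding the same value twice leaves a duplicate in the filter list.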
#add_jq_filter(jq_filter) ⇒ Object
Add a jq payload filter, logically combines with any existing filter via “and”.
# File 'lib/zizq/query.rb', line 205
def add_jq_filter(jq_filter)
  rebuild(jq_filter: [@jq_filter, "(#{jq_filter})"].compact.join(" and "))
end
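To see what filter string this produces, the same expression can be run on plain strings. The helper name `combine_jq` is hypothetical; the body is the expression from the listing above.

```ruby
# Same string composition as add_jq_filter, applied to plain values.
def combine_jq(existing, new_filter)
  [existing, "(#{new_filter})"].compact.join(" and ")
end

first = combine_jq(nil, ".user_id == 42")
# => "(.user_id == 42)"

both = combine_jq(first, '.template == "welcome"')
# => "(.user_id == 42) and (.template == \"welcome\")"

# Only the NEW filter is wrapped in parentheses. An existing filter set via
# by_jq_filter is joined as-is:
mixed = combine_jq(".a or .b", ".c")
# => ".a or .b and (.c)"
```

Since the existing expression is not parenthesised, a filter set with `by_jq_filter` that contains `or` may need its own parentheses before more filters are added to it.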
#add_queue(queue) ⇒ Object
Add a queue to the existing queue filter.
# File 'lib/zizq/query.rb', line 120
def add_queue(queue)
  rebuild(queue: Array(@queue) + Array(queue))
end
#add_status(status) ⇒ Object
Add a status to the existing status filter.
# File 'lib/zizq/query.rb', line 152
def add_status(status)
  rebuild(status: Array(@status) + Array(status))
end
#add_type(type) ⇒ Object
Add a type to the existing type filter.
# File 'lib/zizq/query.rb', line 136
def add_type(type)
  rebuild(type: Array(@type) + Array(type))
end
#any? ⇒ Boolean
Returns true if there are any matching jobs.
Without a block, optimised to fetch a single job. With a block, falls back to Enumerable (tests each job against the block).
# File 'lib/zizq/query.rb', line 254
def any?
  return super if block_given?

  !first.nil?
end
#by_id(id) ⇒ Object
Filter by job ID (replaces any existing ID filter).
# File 'lib/zizq/query.rb', line 96
def by_id(id)
  rebuild(id:)
end
#by_job_class_and_args(job_class, *args, **kwargs) ⇒ Object
Filter by job class and exact arguments.
The job class must include `Zizq::Job` or, for Active Job classes, must extend `Zizq::ActiveJobConfig`.
Sets the type filter to the class name and adds a jq payload filter for an exact match of the serialized arguments.
# File 'lib/zizq/query.rb', line 168
def by_job_class_and_args(job_class, *args, **kwargs)
  validate_job_class!(job_class)
  name = job_class.name or raise ArgumentError, "anonymous classes are not supported"
  by_type(name).add_jq_filter(job_class.zizq_payload_filter(*args, **kwargs))
end
#by_job_class_and_args_subset(job_class, *args, **kwargs) ⇒ Object
Filter by job class and a subset of arguments.
Matches jobs whose positional args start with the given values and whose kwargs contain (at minimum) the given key/value pairs.
The job class must include `Zizq::Job` or, for Active Job classes, must extend `Zizq::ActiveJobConfig`.
# File 'lib/zizq/query.rb', line 186
def by_job_class_and_args_subset(job_class, *args, **kwargs)
  validate_job_class!(job_class)
  name = job_class.name or raise ArgumentError, "anonymous classes are not supported"
  by_type(name).add_jq_filter(job_class.zizq_payload_subset_filter(*args, **kwargs))
end
#by_jq_filter(jq_filter) ⇒ Object
Replace the jq payload filter expression.
# File 'lib/zizq/query.rb', line 196
def by_jq_filter(jq_filter)
  rebuild(jq_filter:)
end
#by_queue(queue) ⇒ Object
Filter by queue name (replaces any existing queue filter).
# File 'lib/zizq/query.rb', line 112
def by_queue(queue)
  rebuild(queue:)
end
#by_status(status) ⇒ Object
Filter by status (replaces any existing status filter).
# File 'lib/zizq/query.rb', line 144
def by_status(status)
  rebuild(status:)
end
#by_type(type) ⇒ Object
Filter by job type (replaces any existing type filter).
# File 'lib/zizq/query.rb', line 128
def by_type(type)
  rebuild(type:)
end
#delete_all ⇒ Object
Delete all matching jobs.
When `page_size` or `limit` is set, iterates page by page and issues a bulk delete per page using the job IDs on that page. For safety, the query filters are included in the scope along with the page's IDs. Otherwise, issues a single bulk delete with the query filters.
When called on a bare query (no filters, limit, or page size), this deletes all jobs from the server, which is useful in tests.
Returns the total number of deleted jobs.
# File 'lib/zizq/query.rb', line 497
def delete_all
  where = {
    id: @id,
    queue: @queue,
    type: @type,
    status: @status,
    filter: @jq_filter,
  }

  if @limit || @page_size
    remaining = @limit
    deleted = 0

    each_page do |page|
      if remaining
        break if remaining <= 0
      end

      ids_on_page = page.jobs.map(&:id)
      ids_on_page = ids_on_page.take(remaining) if remaining

      deleted += Zizq.client.delete_all_jobs(
        where: where.merge(id: ids_on_page),
      )

      remaining -= ids_on_page.size if remaining
    end

    deleted
  else
    Zizq.client.delete_all_jobs(where:)
  end
end
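The interaction between paging and `limit` can be traced without a server. The sketch below replaces `each_page` with a plain array of ID pages and records which IDs each bulk request would carry. `batched_ids` is a hypothetical helper; the loop body follows the listing above.

```ruby
# pages: array of arrays of job IDs, standing in for the pages each_page yields.
# Returns the ID batch that would be sent with each bulk request.
def batched_ids(pages, limit: nil)
  remaining = limit
  batches = []

  pages.each do |ids_on_page|
    break if remaining && remaining <= 0

    # Trim the final batch so the overall limit is never exceeded.
    ids_on_page = ids_on_page.take(remaining) if remaining
    batches << ids_on_page

    remaining -= ids_on_page.size if remaining
  end

  batches
end

pages = [[1, 2, 3], [4, 5, 6], [7]]

batched_ids(pages, limit: 5)  # => [[1, 2, 3], [4, 5]]
batched_ids(pages)            # => [[1, 2, 3], [4, 5, 6], [7]]
```

With `limit: 5` the second page is trimmed to two IDs and the third page is never processed.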
#delete_one ⇒ Object
Delete the first matching job.
Returns 1 if a job was deleted, 0 if no jobs matched.
# File 'lib/zizq/query.rb', line 346
def delete_one
  limit(1).delete_all
end
#each(&block) ⇒ Object
Iterate over matching jobs, lazily paginating through results.
Respects `limit` if set. Without a block, returns an `Enumerator`.
# File 'lib/zizq/query.rb', line 356
def each(&block)
  enumerator = enum_for(:each)

  if block_given?
    remaining = @limit

    each_page do |page|
      page.jobs.each do |job|
        if remaining
          break if remaining <= 0
        end

        yield job

        remaining -= 1 if remaining
      end
    end
  end

  enumerator
end
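Defining `each` is all that `Enumerable` needs, and paginating inside `each` is what makes iteration lazy. The sketch below shows the general pattern with an in-memory array instead of a server. `PagedNumbers` and its `fetches` counter are invented for this illustration and are not part of Zizq.

```ruby
# Generic sketch of an Enumerable that reads its data one page at a time.
class PagedNumbers
  include Enumerable

  attr_reader :fetches

  def initialize(items, page_size)
    @items = items
    @page_size = page_size
    @fetches = 0
  end

  def each
    return enum_for(:each) unless block_given?

    offset = 0
    while offset < @items.size
      @fetches += 1 # stands in for one round trip to the server
      @items[offset, @page_size].each { |item| yield item }
      offset += @page_size
    end
  end
end

numbers = PagedNumbers.new((1..10).to_a, 3)
numbers.first    # => 1
numbers.fetches  # => 1, only the first page was read

all = PagedNumbers.new((1..10).to_a, 3)
all.select(&:even?)  # => [2, 4, 6, 8, 10]
all.fetches          # => 4, pages of 3, 3, 3 and 1 items
```

Methods that stop early, such as `first`, end the iteration before later pages are requested, while methods such as `select` or `count` walk every page.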
#each_page(&block) ⇒ Object
Iterate over pages of matching jobs.
Each page is a `Resources::JobPage`. Without a block, returns an `Enumerator`.
If `limit` is set, iteration stops after the first page that reaches or exceeds the limit. That final page is yielded whole; it is not truncated.
# File 'lib/zizq/query.rb', line 388
def each_page(&block)
  enumerator = enum_for(:each_page)

  if block_given?
    page = Zizq.client.list_jobs(
      id: @id,
      queue: @queue,
      type: @type,
      status: @status,
      filter: @jq_filter,
      limit: [@page_size, @limit, (@page_size || @limit) && MAX_PAGE_SIZE].compact.min,
      order: @order,
    )

    remaining = @limit

    while page
      yield page

      if remaining
        remaining -= page.jobs.size
        break if remaining <= 0
      end

      page = page.next_page
    end
  end

  enumerator
end
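The `limit:` expression passed to `list_jobs` is compact enough to be worth evaluating by hand. The sketch below wraps it in a hypothetical helper, `request_limit`, with the maximum passed as a keyword defaulting to the documented `MAX_PAGE_SIZE` of 2000.

```ruby
# Same expression as in each_page: the smallest of page_size, limit, and the
# server maximum. The maximum only applies when page_size or limit is set.
def request_limit(page_size, limit, max: 2000)
  [page_size, limit, (page_size || limit) && max].compact.min
end

request_limit(nil, nil)    # => nil (no limit sent; the server default applies)
request_limit(100, nil)    # => 100
request_limit(nil, 5000)   # => 2000 (capped at the maximum)
request_limit(100, 50)     # => 50
request_limit(3000, nil)   # => 2000
```

When neither value is set, every entry in the array is `nil`, `compact` leaves an empty array, and `min` returns `nil`.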
#empty? ⇒ Boolean
Returns true if there are no matching jobs.
Optimised: fetches a single job to check.
# File 'lib/zizq/query.rb', line 243
def empty?
  first.nil?
end
#first ⇒ Object
Return the first matching job, or nil if none match.
Optimised: fetches a single job from the server (`?limit=1`).
# File 'lib/zizq/query.rb', line 302
def first
  limit(1).each.first
end
#in_pages_of(page_size) ⇒ Object
Set the page size for paginated iteration.
When set, `each_page` fetches pages of this size, and `each` fetches jobs in pages of this size. Also used by `update_all` and `delete_all` to batch operations by page.
# File 'lib/zizq/query.rb', line 88
def in_pages_of(page_size)
  rebuild(page_size:)
end
#last ⇒ Object
Return the last matching job, or nil if none match.
Optimised: reverses the order and fetches a single job.
# File 'lib/zizq/query.rb', line 311
def last
  reverse_order.first
end
#limit(limit) ⇒ Object
Limit the total number of jobs returned.
This is a total limit, imposed across potentially multiple page fetches. This limit also applies to `update_all` and `delete_all` operations.
# File 'lib/zizq/query.rb', line 224
def limit(limit)
  rebuild(limit:)
end
#none? ⇒ Boolean
Returns true if there are no matching jobs.
Without a block, optimised to fetch a single job. With a block, falls back to Enumerable (tests each job against the block).
# File 'lib/zizq/query.rb', line 267
def none?
  return super if block_given?

  first.nil?
end
#one? ⇒ Boolean
Returns true if there is exactly one matching job.
Without a block, optimised to fetch at most two jobs. With a block, falls back to Enumerable.
# File 'lib/zizq/query.rb', line 280
def one?
  return super if block_given?

  limit(2).to_a.size == 1
end
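Fetching at most two items is enough to decide whether there is exactly one. The same idea works on any Ruby enumerable, as this small sketch shows. `exactly_one?` is a hypothetical helper, not a Zizq method.

```ruby
# Two items are enough to tell "exactly one" from "more than one".
def exactly_one?(enumerable)
  enumerable.first(2).size == 1
end

exactly_one?([])       # => false
exactly_one?([7])      # => true
exactly_one?([7, 8])   # => false
exactly_one?(1..)      # => false, and the endless range is never exhausted
```

The endless range makes the point: the check never looks past the second element, however many there are.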
#order(order) ⇒ Object
Set the sort order for iteration.
# File 'lib/zizq/query.rb', line 213
def order(order)
  rebuild(order:)
end
#reverse_each(&block) ⇒ Object
Iterate over matching jobs in reverse order.
Optimised: pushes the reverse ordering to the server instead of fetching all jobs into memory and reversing.
# File 'lib/zizq/query.rb', line 293
def reverse_each(&block)
  reverse_order.each(&block)
end
#reverse_order ⇒ Object
Reverse the sort order.
Returns a new query with the opposite order. If no order was set, defaults to descending (the server default is ascending).
# File 'lib/zizq/query.rb', line 234
def reverse_order
  rebuild(order: @order == :desc ? :asc : :desc)
end
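The toggle can be checked in isolation. The helper name `reversed` is made up here; the expression is the one from the listing above.

```ruby
# Same comparison as reverse_order: anything other than :desc becomes :desc.
def reversed(order)
  order == :desc ? :asc : :desc
end

reversed(nil)     # => :desc (unset order is treated as ascending)
reversed(:asc)    # => :desc
reversed(:desc)   # => :asc
reversed("desc")  # => :desc, because a String never equals the Symbol :desc
```

The last case is an observation about the comparison only. Whether `order` accepts strings at all is not stated on this page, so passing symbols is the safe assumption.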
#take(n) ⇒ Object
Return the first ‘n` matching jobs.
Optimised: sets the limit to `n` so the server only returns what's needed.
# File 'lib/zizq/query.rb', line 322
def take(n)
  limit(n).to_a
end
#update_all(queue: Zizq::UNCHANGED, priority: Zizq::UNCHANGED, ready_at: Zizq::UNCHANGED, retry_limit: Zizq::UNCHANGED, backoff: Zizq::UNCHANGED, retention: Zizq::UNCHANGED) ⇒ Object
Update all matching jobs with the given field values.
When `page_size` or `limit` is set, iterates page by page and issues a bulk update per page using the job IDs on that page. For safety, the query filters are included in the scope along with the page's IDs. Otherwise, issues a single bulk update with the query filters.
Returns the total number of updated jobs.
# File 'lib/zizq/query.rb', line 435
def update_all(queue: Zizq::UNCHANGED,
               priority: Zizq::UNCHANGED,
               ready_at: Zizq::UNCHANGED,
               retry_limit: Zizq::UNCHANGED,
               backoff: Zizq::UNCHANGED,
               retention: Zizq::UNCHANGED)
  where = {
    id: @id,
    queue: @queue,
    type: @type,
    status: @status,
    filter: @jq_filter,
  }

  apply = {
    queue:,
    priority:,
    ready_at:,
    retry_limit:,
    backoff:,
    retention:,
  }

  if @limit || @page_size
    remaining = @limit
    updated = 0

    each_page do |page|
      if remaining
        break if remaining <= 0
      end

      ids_on_page = page.jobs.map(&:id)
      ids_on_page = ids_on_page.take(remaining) if remaining

      updated += Zizq.client.update_all_jobs(
        where: where.merge(id: ids_on_page),
        apply:,
      )

      remaining -= ids_on_page.size if remaining
    end

    updated
  else
    Zizq.client.update_all_jobs(where:, apply:)
  end
end
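The `Zizq::UNCHANGED` defaults suggest a sentinel that separates "argument not given" from an explicit `nil`. How the client treats the sentinel is not shown on this page, so the following is only a sketch of the general pattern. `SKETCH_UNCHANGED` and `changed_fields` are hypothetical names, and the filtering step is an assumption.

```ruby
# A unique frozen object that no caller can pass by accident.
SKETCH_UNCHANGED = Object.new.freeze

# Keep only the fields the caller actually supplied (assumed behaviour).
def changed_fields(queue: SKETCH_UNCHANGED,
                   priority: SKETCH_UNCHANGED,
                   ready_at: SKETCH_UNCHANGED)
  { queue: queue, priority: priority, ready_at: ready_at }
    .reject { |_field, value| value.equal?(SKETCH_UNCHANGED) }
end

changed_fields                   # no fields supplied, so the hash is empty
changed_fields(queue: "new")     # only :queue is present
changed_fields(ready_at: nil)    # :ready_at is present with an explicit nil
```

With `nil` as the default instead, the third call could not be told apart from the first, which is the usual reason for a sentinel like this.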
#update_one ⇒ Object
Update the first matching job.
Accepts the same keyword arguments as `update_all` and forwards them unchanged.
Returns 1 if a job was updated, 0 if no jobs matched.
# File 'lib/zizq/query.rb', line 337
def update_one(...)
  limit(1).update_all(...)
end
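The `(...)` syntax (Ruby 2.7 and later) forwards every argument to another method without naming them. A minimal sketch of the pattern follows; `SketchUpdater` and its `:unchanged` defaults are hypothetical stand-ins.

```ruby
# Stand-in class showing argument forwarding with `(...)`.
class SketchUpdater
  def update_all(queue: :unchanged, priority: :unchanged)
    { queue: queue, priority: priority }
  end

  # Forwards positional, keyword, and block arguments untouched.
  def update_one(...)
    update_all(...)
  end
end

result = SketchUpdater.new.update_one(queue: "new")
result[:queue]     # => "new"
result[:priority]  # => :unchanged
```

A side effect of forwarding is that the wrapper's signature never has to be kept in sync when the target method gains new keywords.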