Class: Zizq::Client
- Inherits: Object
  - Object
  - Zizq::Client
- Defined in: lib/zizq/client.rb
Overview
Low-level HTTP wrapper for the Zizq job queue server API.
Supports both JSON and MessagePack serialization formats, determined at construction time.
HTTP requests are dispatched through a persistent background IO thread when called from non-Async contexts, keeping the HTTP/2 connection alive across calls and avoiding ephemeral port exhaustion. When called from within an existing Async reactor, the shared HTTP client is used directly.
Defined Under Namespace
Classes: RawResponse
Constant Summary
- CONTENT_TYPES = #: Hash[Zizq::format, String]
  { msgpack: "application/msgpack", json: "application/json" }.freeze
- STREAM_ACCEPT = #: Hash[Zizq::format, String]
  { msgpack: "application/vnd.zizq.msgpack-stream", json: "application/x-ndjson" }.freeze
Instance Attribute Summary
-
#format ⇒ Object
readonly
The message format to use for all communication between the client and the server (default = `:msgpack`).
-
#url ⇒ Object
readonly
The base URL of the Zizq server (e.g. "localhost:7890").
Class Method Summary
- .make_finalizer(io_queue, http_clients) ⇒ Object
-
.parse_msgpack_stream(chunks) ⇒ Object
Parse a length-prefixed MessagePack stream from an enumerable of byte chunks.
-
.parse_ndjson(chunks) ⇒ Object
Parse an NDJSON stream from an enumerable of byte chunks.
Instance Method Summary
-
#add_cron_group_entry(group, name:, expression:, job:, timezone: nil, paused: nil) ⇒ Object
Add a single entry to a cron group (creates the group if needed).
-
#cleanup_internal_clients ⇒ Object
Close and clear all internally tracked per-thread HTTP clients.
-
#close ⇒ Object
Close all thread-local HTTP clients and release connections.
-
#count_jobs(id: nil, status: nil, queue: nil, type: nil, filter: nil) ⇒ Object
Count jobs matching the given filters.
-
#delete_all_jobs(where: {}) ⇒ Object
Delete jobs matching the given filters.
-
#delete_cron_group(name) ⇒ Object
Delete a cron group and all its entries.
-
#delete_cron_group_entry(group, entry) ⇒ Object
Delete a single cron entry.
-
#delete_job(id) ⇒ Object
Delete a single job by ID.
-
#enqueue(queue:, type:, payload:, priority: nil, ready_at: nil, retry_limit: nil, backoff: nil, retention: nil, unique_key: nil, unique_while: nil) ⇒ Object
Enqueue a new job.
-
#enqueue_bulk(jobs:) ⇒ Object
Enqueue multiple jobs atomically in a single bulk request.
-
#get_cron_group(name) ⇒ Object
Fetch a cron group and all its entries.
-
#get_cron_group_entry(group, entry) ⇒ Object
Fetch a single cron entry.
-
#get_error(id, attempt:) ⇒ Object
Get a single error record by job ID and attempt number.
-
#get_job(id) ⇒ Object
Get a single job by ID.
-
#get_path(path) ⇒ Object
GET a path on the server and return the decoded response body.
-
#get_queues ⇒ Object
List all distinct queue names on the server.
-
#health ⇒ Object
Health check.
-
#initialize(url:, format: :msgpack, ssl_context: nil) ⇒ Client
constructor
Initialize a new instance of the client with the given base URL and optional format options.
-
#list_cron_groups ⇒ Object
List all cron group names.
-
#list_errors(id, from: nil, order: nil, limit: nil) ⇒ Object
List error records for a job.
-
#list_jobs(id: nil, status: nil, queue: nil, type: nil, filter: nil, from: nil, order: nil, limit: nil) ⇒ Object
List jobs with optional filters.
-
#replace_cron_group(name, paused: nil, entries: []) ⇒ Object
Create or replace an entire cron group.
-
#replace_cron_group_entry(group, entry, expression:, job:, timezone: nil, paused: nil) ⇒ Object
Create or replace a single cron entry.
-
#report_failure(id, message:, error_type: nil, backtrace: nil, retry_at: nil, kill: false) ⇒ Object
(also: #nack)
Report a job failure (nack).
-
#report_success(id) ⇒ Object
(also: #ack)
Mark a job as successfully completed (ack).
-
#report_success_bulk(ids) ⇒ Object
(also: #ack_bulk)
Bulk-mark jobs as successfully completed (bulk ack).
-
#server_version ⇒ Object
Server version string.
-
#take_jobs(prefetch: 1, queues: [], worker_id: nil, on_connect: nil, on_response: nil, &block) ⇒ Object
Stream jobs from the server.
-
#update_all_jobs(where: {}, apply: {}) ⇒ Object
Update all jobs matching the given filters.
-
#update_cron_group(name, paused: nil) ⇒ Object
Update group-level fields (currently just pause/unpause).
-
#update_cron_group_entry(group, entry, paused:) ⇒ Object
Update entry-level fields (currently just pause/unpause).
-
#update_job(id, queue: UNCHANGED, priority: UNCHANGED, ready_at: UNCHANGED, retry_limit: UNCHANGED, backoff: UNCHANGED, retention: UNCHANGED) ⇒ Object
Update a single job’s mutable fields.
Constructor Details
#initialize(url:, format: :msgpack, ssl_context: nil) ⇒ Client
Initialize a new instance of the client with the given base URL and optional format options.
# File 'lib/zizq/client.rb', line 57

def initialize(url:, format: :msgpack, ssl_context: nil)
  @url = url.chomp("/")
  @format = format

  options = { protocol: Async::HTTP::Protocol::HTTP2 } #: Hash[Symbol, untyped]
  options[:ssl_context] = ssl_context if ssl_context
  @endpoint = Async::HTTP::Endpoint.parse(@url, **options)

  # Streaming take uses a dedicated HTTP/1.1 endpoint. The take
  # connection is long-lived and carries only one request, so HTTP/2's
  # multiplexing, stream IDs, and frame headers add overhead with no
  # benefit — there's nothing to multiplex against. Acks/enqueues run
  # on separate threads with their own HTTP/2 clients, so they're
  # unaffected either way. HTTP/1.1 gives the stream a plain TCP
  # socket with no framing tax and measurably better throughput.
  stream_options = options.merge(protocol: Async::HTTP::Protocol::HTTP11)
  @stream_endpoint = Async::HTTP::Endpoint.parse(@url, **stream_options)

  @mutex = Mutex.new
  @io_thread = nil #: Thread?
  @io_queue = nil #: Thread::Queue?

  # Each thread gets its own Async::HTTP::Client bound to its own
  # reactor — one for regular request/response traffic (HTTP/2) and
  # a separate one lazily created on the first take_jobs call
  # (HTTP/1.1). Both kinds of clients are tracked in a single array
  # so `close` can shut them all down together.
  @http_clients = [] #: Array[Async::HTTP::Client]
  @http_key = :"zizq_http_#{object_id}"
  @stream_http_key = :"zizq_stream_http_#{object_id}"
  @content_type = CONTENT_TYPES.fetch(format)
  @stream_accept = STREAM_ACCEPT.fetch(format)
end
Instance Attribute Details
#format ⇒ Object (readonly)
The message format to use for all communication between the client and the server (default = `:msgpack`).
# File 'lib/zizq/client.rb', line 48

def format
  @format
end
#url ⇒ Object (readonly)
The base URL of the Zizq server (e.g. "localhost:7890").
# File 'lib/zizq/client.rb', line 44

def url
  @url
end
Class Method Details
.make_finalizer(io_queue, http_clients) ⇒ Object
# File 'lib/zizq/client.rb', line 1194

def self.make_finalizer(io_queue, http_clients)
  -> do
    io_queue&.close
    http_clients.each do |ref|
      ref.close
    rescue WeakRef::RefError
      # Client already GC'd (owning thread exited).
    rescue NoMethodError
      # The async connection pool may hold references to tasks whose
      # fibers were already reclaimed when their owning Sync reactor
      # exited. Stopping those dead tasks raises NoMethodError; safe
      # to ignore.
    end
    http_clients.clear
  end
end
.parse_msgpack_stream(chunks) ⇒ Object
Parse a length-prefixed MessagePack stream from an enumerable of byte chunks.
Format: [4-byte big-endian length][MsgPack payload]. A zero-length frame is a heartbeat and is silently skipped.
Uses StringIO for efficient position-based reading rather than repeatedly slicing from the front of a String (which copies all remaining bytes on every extraction).
# File 'lib/zizq/client.rb', line 744

def self.parse_msgpack_stream(chunks) #: (Enumerable[String]) { (Hash[String, untyped]) -> void } -> void
  io = StringIO.new("".b)
  chunks.each do |chunk|
    # Append new data at the end, then return to the read position.
    read_pos = io.pos
    io.seek(0, IO::SEEK_END)
    io.write(chunk.b)
    io.seek(read_pos)

    # Extract complete frames.
    while io.size - io.pos >= 4
      len_bytes = io.read(4) #: String
      len = len_bytes.unpack1("N") #: Integer
      if len == 0 # heartbeat
        next
      end
      if io.size - io.pos < len
        # Incomplete frame — rewind past the length header and wait
        # for more data.
        io.seek(-4, IO::SEEK_CUR)
        break
      end
      yield MessagePack.unpack(io.read(len))
    end

    # Compact: discard already-consumed bytes so the StringIO doesn't
    # grow without bound over the life of the stream.
    remaining = io.read
    io = StringIO.new(remaining || "".b)
  end
end
.parse_ndjson(chunks) ⇒ Object
Parse an NDJSON stream from an enumerable of byte chunks.
Buffers chunks and splits on newline boundaries. The buffer only ever holds one partial line between extractions, so the `slice!` cost is trivial. Empty lines (heartbeats) are silently skipped.
# File 'lib/zizq/client.rb', line 722

def self.parse_ndjson(chunks) #: (Enumerable[String]) { (Hash[String, untyped]) -> void } -> void
  buffer = +""
  chunks.each do |chunk|
    buffer << chunk
    while (idx = buffer.index("\n"))
      line = buffer.slice!(0, idx + 1) #: String
      line.strip!
      next if line.empty?
      yield JSON.parse(line)
    end
  end
end
Instance Method Details
#add_cron_group_entry(group, name:, expression:, job:, timezone: nil, paused: nil) ⇒ Object
Add a single entry to a cron group (creates the group if needed).
Raises a ClientError (409 Conflict) if an entry with the same name already exists.
# File 'lib/zizq/client.rb', line 507

def add_cron_group_entry(group, name:, expression:, job:, timezone: nil, paused: nil)
  body = build_cron_entry(name:, expression:, job:, timezone:, paused:)
  response = post("/crons/#{enc(group)}/entries", body)
  data = handle_response!(response, expected: 201)
  Resources::CronEntry.new(self, data)
end
#cleanup_internal_clients ⇒ Object
: () -> void
# File 'lib/zizq/client.rb', line 114

def cleanup_internal_clients #: () -> void
  @mutex.synchronize do
    @http_clients.each do |ref|
      ref.close
    rescue WeakRef::RefError
      # Client already GC'd (owning thread exited).
    rescue NoMethodError
      # The async connection pool may hold references to tasks whose
      # fibers were already reclaimed when their owning Sync reactor
      # exited. Stopping those dead tasks raises NoMethodError; safe
      # to ignore.
    end
    @http_clients.clear
  end
end
#close ⇒ Object
Close all thread-local HTTP clients and release connections.
# File 'lib/zizq/client.rb', line 103

def close #: () -> void
  if @io_thread&.alive?
    @mutex.synchronize do
      @io_queue&.close
      @io_thread&.join
    end
  end
  self.class.make_finalizer(@io_queue, @http_clients).call
end
#count_jobs(id: nil, status: nil, queue: nil, type: nil, filter: nil) ⇒ Object
Count jobs matching the given filters.
Accepts the same filter arguments as `list_jobs` (minus pagination). Returns the count as an integer.
# File 'lib/zizq/client.rb', line 263

def count_jobs(id: nil, status: nil, queue: nil, type: nil, filter: nil)
  where = { id:, status:, queue:, type:, filter: }.compact #: Hash[Symbol, untyped]
  multi_keys = %i[id status queue type]
  params = build_where_params(where, multi_keys:)

  # An empty filter ([] or "") matches nothing — short-circuit.
  multi_keys.each do |key|
    return 0 if params[key] == ""
  end

  response = get("/jobs/count", params:)
  data = handle_response!(response, expected: 200)
  data.fetch("count")
end
#delete_all_jobs(where: {}) ⇒ Object
Delete jobs matching the given filters.
Filters in the `where:` argument use the same keys as `list_jobs`. An empty `where:` hash deletes all jobs.
Returns the number of deleted jobs.
# File 'lib/zizq/client.rb', line 302

def delete_all_jobs(where: {})
  filter_params = validate_where(**where)
  multi_keys = %i[id status queue type]
  params = build_where_params(filter_params, multi_keys:)

  # An empty multi-value filter matches nothing — short-circuit.
  multi_keys.each do |key|
    return 0 if params[key] == ""
  end

  response = delete("/jobs", params:)
  data = handle_response!(response, expected: 200)
  data.fetch("deleted")
end
#delete_cron_group(name) ⇒ Object
Delete a cron group and all its entries.
# File 'lib/zizq/client.rb', line 478

def delete_cron_group(name)
  response = delete("/crons/#{enc(name)}")
  handle_response!(response, expected: 204)
  nil
end
#delete_cron_group_entry(group, entry) ⇒ Object
Delete a single cron entry.
# File 'lib/zizq/client.rb', line 549

def delete_cron_group_entry(group, entry)
  response = delete("/crons/#{enc(group)}/entries/#{enc(entry)}")
  handle_response!(response, expected: 204)
  nil
end
#delete_job(id) ⇒ Object
Delete a single job by ID.
# File 'lib/zizq/client.rb', line 287

def delete_job(id)
  response = delete("/jobs/#{enc(id)}")
  handle_response!(response, expected: [200, 204])
  nil
end
#enqueue(queue:, type:, payload:, priority: nil, ready_at: nil, retry_limit: nil, backoff: nil, retention: nil, unique_key: nil, unique_while: nil) ⇒ Object
Enqueue a new job.
This is a low-level primitive that makes a direct API call to the server using the Zizq API's expected inputs. Callers should generally use `Zizq::enqueue` instead.
Returns a resource instance of the new job wrapping the API response.
# File 'lib/zizq/client.rb', line 149

def enqueue(queue:, type:, payload:, priority: nil, ready_at: nil, retry_limit: nil, backoff: nil, retention: nil, unique_key: nil, unique_while: nil)
  body = { queue:, type:, payload: } #: Hash[Symbol, untyped]
  body[:priority] = priority if priority
  # ready_at is fractional seconds in Ruby; the server expects ms.
  body[:ready_at] = (ready_at.to_f * 1000).to_i if ready_at
  body[:retry_limit] = retry_limit if retry_limit
  body[:backoff] = backoff if backoff
  body[:retention] = retention if retention
  body[:unique_key] = unique_key if unique_key
  body[:unique_while] = unique_while.to_s if unique_while

  response = post("/jobs", body)
  data = handle_response!(response, expected: [200, 201])
  Resources::Job.new(self, data)
end
#enqueue_bulk(jobs:) ⇒ Object
Enqueue multiple jobs atomically in a single bulk request.
This is a low-level primitive that makes a direct API call to the server using the Zizq API's expected inputs. Callers should generally use `Zizq::enqueue_bulk` instead.
Returns an array of resource instances wrapping the API response.
# File 'lib/zizq/client.rb', line 184

def enqueue_bulk(jobs:)
  body = {
    jobs: jobs.map do |job|
      wire = { type: job[:type], queue: job[:queue], payload: job[:payload] } #: Hash[Symbol, untyped]
      wire[:priority] = job[:priority] if job[:priority]
      # ready_at is fractional seconds in Ruby; the server expects ms.
      wire[:ready_at] = (job[:ready_at].to_f * 1000).to_i if job[:ready_at]
      wire[:retry_limit] = job[:retry_limit] if job[:retry_limit]
      wire[:backoff] = job[:backoff] if job[:backoff]
      wire[:retention] = job[:retention] if job[:retention]
      wire[:unique_key] = job[:unique_key] if job[:unique_key]
      wire[:unique_while] = job[:unique_while].to_s if job[:unique_while]
      wire
    end
  }
  response = post("/jobs/bulk", body)
  data = handle_response!(response, expected: [200, 201])
  data["jobs"].map { |j| Resources::Job.new(self, j) }
end
#get_cron_group(name) ⇒ Object
Fetch a cron group and all its entries.
# File 'lib/zizq/client.rb', line 438

def get_cron_group(name)
  response = get("/crons/#{enc(name)}")
  data = handle_response!(response, expected: 200)
  Resources::CronGroup.new(self, data)
end
#get_cron_group_entry(group, entry) ⇒ Object
Fetch a single cron entry.
# File 'lib/zizq/client.rb', line 489

def get_cron_group_entry(group, entry)
  response = get("/crons/#{enc(group)}/entries/#{enc(entry)}")
  data = handle_response!(response, expected: 200)
  Resources::CronEntry.new(self, data)
end
#get_error(id, attempt:) ⇒ Object
Get a single error record by job ID and attempt number.
# File 'lib/zizq/client.rb', line 385

def get_error(id, attempt:)
  response = get("/jobs/#{enc(id)}/errors/#{enc(attempt.to_s)}")
  data = handle_response!(response, expected: 200)
  Resources::ErrorRecord.new(self, data)
end
#get_job(id) ⇒ Object
Get a single job by ID.
# File 'lib/zizq/client.rb', line 206

def get_job(id) #: (String) -> Resources::Job
  response = get("/jobs/#{enc(id)}")
  data = handle_response!(response, expected: 200)
  Resources::Job.new(self, data)
end
#get_path(path) ⇒ Object
GET a path on the server and return the decoded response body.
The path should include any query parameters already (e.g. pagination links from the server's `pages` object). This is intentionally public so that resource objects like Page can follow links without resorting to `.send`.
# File 'lib/zizq/client.rb', line 786

def get_path(path) #: (String) -> Hash[String, untyped]
  response = request { |http| consume_response(http.get(path, { "accept" => @content_type })) }
  handle_response!(response, expected: 200)
end
#get_queues ⇒ Object
List all distinct queue names on the server.
# File 'lib/zizq/client.rb', line 419

def get_queues #: () -> Array[String]
  response = get("/queues")
  data = handle_response!(response, expected: 200)
  data["queues"]
end
#health ⇒ Object
Health check.
# File 'lib/zizq/client.rb', line 406

def health #: () -> Hash[String, untyped]
  response = get("/health")
  handle_response!(response, expected: 200)
end
#list_cron_groups ⇒ Object
List all cron group names.
# File 'lib/zizq/client.rb', line 428

def list_cron_groups
  response = get("/crons")
  data = handle_response!(response, expected: 200)
  data["crons"]
end
#list_errors(id, from: nil, order: nil, limit: nil) ⇒ Object
List error records for a job.
# File 'lib/zizq/client.rb', line 398

def list_errors(id, from: nil, order: nil, limit: nil)
  params = { from:, order:, limit: }.compact #: Hash[Symbol, untyped]
  response = get("/jobs/#{enc(id)}/errors", params:)
  data = handle_response!(response, expected: 200)
  Resources::ErrorPage.new(self, data)
end
#list_jobs(id: nil, status: nil, queue: nil, type: nil, filter: nil, from: nil, order: nil, limit: nil) ⇒ Object
List jobs with optional filters.
Multi-value filters (`status`, `queue`, `type`, `id`) accept arrays — they are joined with commas as the server expects.
The `filter` parameter accepts a jq expression for filtering jobs by payload content (e.g. `.user_id == 42`).
# File 'lib/zizq/client.rb', line 229

def list_jobs(id: nil, status: nil, queue: nil, type: nil, filter: nil, from: nil, order: nil, limit: nil)
  where = { id:, status:, queue:, type:, filter:, from:, order:, limit: }.compact #: Hash[Symbol, untyped]
  multi_keys = %i[id status queue type]
  params = build_where_params(where, multi_keys:)

  # An empty filter ([] or "") matches nothing — short-circuit.
  multi_keys.each do |key|
    return Resources::JobPage.new(self, { "jobs" => [], "pages" => {} }) if params[key] == ""
  end

  response = get("/jobs", params:)
  data = handle_response!(response, expected: 200)
  Resources::JobPage.new(self, data)
end
#replace_cron_group(name, paused: nil, entries: []) ⇒ Object
Create or replace an entire cron group.
Entries not present in the request are removed. Entries with unchanged expressions preserve their scheduling state.
# File 'lib/zizq/client.rb', line 453

def replace_cron_group(name, paused: nil, entries: [])
  body = {
    paused:,
    entries: entries.map { |entry| build_cron_entry(**entry) }
  }.compact
  response = put("/crons/#{enc(name)}", body)
  data = handle_response!(response, expected: 200)
  Resources::CronGroup.new(self, data)
end
#replace_cron_group_entry(group, entry, expression:, job:, timezone: nil, paused: nil) ⇒ Object
Create or replace a single cron entry.
Preserves scheduling state if the expression is unchanged.
# File 'lib/zizq/client.rb', line 525

def replace_cron_group_entry(group, entry, expression:, job:, timezone: nil, paused: nil)
  body = build_cron_entry(name: entry, expression:, job:, timezone:, paused:)
  response = put("/crons/#{enc(group)}/entries/#{enc(entry)}", body)
  data = handle_response!(response, expected: 200)
  Resources::CronEntry.new(self, data)
end
#report_failure(id, message:, error_type: nil, backtrace: nil, retry_at: nil, kill: false) ⇒ Object Also known as: nack
Report a job failure (nack).
Returns the updated job metadata.
If this method is not called when errors occur processing jobs, the Zizq server will consider it in-flight and will not send any more jobs if the prefetch limit has been reached, or the server’s global in-flight limit has been reached. Jobs must be either acknowledged or failed before new jobs are sent.
Jobs are durable and “at least once” delivery is guaranteed. If the client disconnects before it is able to report success or failure the server automatically moves the job back to the queue where it will be provided to another worker. Clients should be prepared to see the same job more than once for this reason.
The Zizq server sends heartbeat messages to connected workers so that it can quickly detect and handle disconnected clients.
# File 'lib/zizq/client.rb', line 628

def report_failure(id, message:, error_type: nil, backtrace: nil, retry_at: nil, kill: false)
  body = { message: } #: Hash[Symbol, untyped]
  body[:error_type] = error_type if error_type
  body[:backtrace] = backtrace if backtrace
  # retry_at is fractional seconds in Ruby; the server expects ms.
  body[:retry_at] = (retry_at * 1000).to_i if retry_at
  body[:kill] = kill if kill
  response = post("/jobs/#{enc(id)}/failure", body)
  data = handle_response!(response, expected: 200)
  Resources::Job.new(self, data)
end
#report_success(id) ⇒ Object Also known as: ack
Mark a job as successfully completed (ack).
If this method (or [`#report_failure`]) is not called upon job completion, the Zizq server will consider it in-flight and will not send any more jobs if the prefetch limit has been reached, or the server's global in-flight limit has been reached. Jobs must be either acknowledged or failed before new jobs are sent.
Jobs are durable and “at least once” delivery is guaranteed. If the client disconnects before it is able to report success or failure the server automatically moves the job back to the queue where it will be provided to another worker. Clients should be prepared to see the same job more than once for this reason.
The Zizq server sends heartbeat messages to connected workers so that it can quickly detect and handle disconnected clients.
# File 'lib/zizq/client.rb', line 571

def report_success(id) #: (String) -> nil
  response = raw_post("/jobs/#{enc(id)}/success")
  handle_response!(response, expected: 204)
  nil
end
#report_success_bulk(ids) ⇒ Object Also known as: ack_bulk
Bulk-mark jobs as successfully completed (bulk ack).
See [`#report_success`] for full details of how acknowledgement works.
There are two ways in which the server can respond successfully:
- 204 No Content (all jobs acknowledged)
- 422 Unprocessable Entity (some jobs were not found)
Both of these statuses are in reality treated as success because missing jobs have either been previously acknowledged and purged, or moved to some other status that cannot be acknowledged.
Other error response types will still raise.
# File 'lib/zizq/client.rb', line 594

def report_success_bulk(ids)
  response = post("/jobs/success", { ids: ids })
  return nil if response.status == 422
  handle_response!(response, expected: 204)
end
#server_version ⇒ Object
Server version string.
# File 'lib/zizq/client.rb', line 412

def server_version #: () -> String
  response = get("/version")
  data = handle_response!(response, expected: 200)
  data["version"]
end
#take_jobs(prefetch: 1, queues: [], worker_id: nil, on_connect: nil, on_response: nil, &block) ⇒ Object
Stream jobs from the server. Yields parsed job hashes.
This method does not return unless the server closes the connection or the connection is otherwise interrupted. Jobs are continuously streamed to the client, and when no jobs are available the client waits for new jobs to become ready.
If the client does not acknowledge or fail jobs with [`#report_success`] or [`#report_failure`] the server will stop sending new jobs to the client as it hits its prefetch limit.
Jobs are durable and “at least once” delivery is guaranteed. If the client disconnects before it is able to report success or failure the server automatically moves the job back to the queue where it will be provided to another worker. Clients should be prepared to see the same job more than once for this reason.
The Zizq server sends periodic heartbeat messages to the client which are silently consumed.
Example:
client.take_jobs(prefetch: 5) do |job|
  puts "Got job: #{job.inspect}"
  client.ack(job.id) # mark the job completed
end
# File 'lib/zizq/client.rb', line 677

def take_jobs(prefetch: 1, queues: [], worker_id: nil, on_connect: nil, on_response: nil, &block)
  raise ArgumentError, "take_jobs requires a block" unless block

  params = { prefetch: } #: Hash[Symbol, untyped]
  params[:queue] = queues.join(",") unless queues.empty?
  path = build_path("/jobs/take", params:)
  headers = { "accept" => @stream_accept }
  headers["worker-id"] = worker_id if worker_id

  Sync do
    response = stream_http.get(path, headers)
    begin
      raise StreamError, "take jobs stream returned HTTP #{response.status}" unless response.status == 200

      on_connect&.call
      on_response&.call(response)

      # Wrap each parsed hash in a Resources::Job before yielding.
      wrapper = proc { |data| block.call(Resources::Job.new(self, data)) }

      # async-http returns `nil` for empty response bodies over HTTP/1.1
      # (e.g. a 200 with content-length: 0 from the server closing the
      # stream immediately). Treat that as "no chunks" rather than
      # crashing in the parser.
      body = response.body || []

      case @format
      when :json then self.class.parse_ndjson(body, &wrapper)
      when :msgpack then self.class.parse_msgpack_stream(body, &wrapper)
      end
    ensure
      response.close rescue nil
    end
  end
rescue SocketError, IOError, EOFError, Errno::ECONNRESET, Errno::EPIPE, OpenSSL::SSL::SSLError => e
  raise ConnectionError, e.message
end
#update_all_jobs(where: {}, apply: {}) ⇒ Object
Update all jobs matching the given filters.
Filters in the `where:` argument use the same keys as `list_jobs`. Fields in the `apply:` argument use the same keys as `update_job`.
Terminal jobs (completed/dead) are silently skipped unless explicitly requested via `status:` in `where:`, which returns 422.
Returns the number of updated jobs.
# File 'lib/zizq/client.rb', line 363

def update_all_jobs(where: {}, apply: {})
  filter_params = validate_where(**where)
  multi_keys = %i[id status queue type]
  params = build_where_params(filter_params, multi_keys:)

  # An empty multi-value filter matches nothing — short-circuit.
  multi_keys.each do |key|
    return 0 if params[key] == ""
  end

  body = validate_and_build_set(**apply)
  response = patch("/jobs", body, params:)
  data = handle_response!(response, expected: 200)
  data.fetch("patched")
end
#update_cron_group(name, paused: nil) ⇒ Object
Update group-level fields (currently just pause/unpause).
# File 'lib/zizq/client.rb', line 468

def update_cron_group(name, paused: nil)
  response = patch("/crons/#{enc(name)}", { paused: }.compact)
  data = handle_response!(response, expected: 200)
  Resources::CronGroup.new(self, data)
end
#update_cron_group_entry(group, entry, paused:) ⇒ Object
Update entry-level fields (currently just pause/unpause).
# File 'lib/zizq/client.rb', line 538

def update_cron_group_entry(group, entry, paused:)
  response = patch("/crons/#{enc(group)}/entries/#{enc(entry)}", { paused: })
  data = handle_response!(response, expected: 200)
  Resources::CronEntry.new(self, data)
end
#update_job(id, queue: UNCHANGED, priority: UNCHANGED, ready_at: UNCHANGED, retry_limit: UNCHANGED, backoff: UNCHANGED, retention: UNCHANGED) ⇒ Object
Update a single job’s mutable fields.
Fields not provided are left unchanged. Use `Zizq::RESET` to clear a nullable field back to the server default.
Raises `Zizq::NotFoundError` if the job does not exist. Raises `Zizq::ClientError` (422) if the job is in a terminal state.
# File 'lib/zizq/client.rb', line 334

def update_job(id, queue: UNCHANGED, priority: UNCHANGED, ready_at: UNCHANGED, retry_limit: UNCHANGED, backoff: UNCHANGED, retention: UNCHANGED)
  body = build_set_body(
    queue:,
    priority:,
    ready_at:,
    retry_limit:,
    backoff:,
    retention:
  )
  response = patch("/jobs/#{enc(id)}", body)
  data = handle_response!(response, expected: 200)
  Resources::Job.new(self, data)
end