Class: Zizq::Client
- Inherits:
-
Object
- Object
- Zizq::Client
- Defined in:
- lib/zizq/client.rb
Overview
Low-level HTTP wrapper for the Zizq job queue server API.
Supports both JSON and MessagePack serialization formats, determined at construction time.
HTTP requests are dispatched through a persistent background IO thread when called from non-Async contexts, keeping the HTTP/2 connection alive across calls and avoiding ephemeral port exhaustion. When called from within an existing Async reactor, the shared HTTP client is used directly.
Direct Known Subclasses
Defined Under Namespace
Classes: RawResponse
Constant Summary collapse
- CONTENT_TYPES =
: Hash[Zizq::format, String]
{ #: Hash[Zizq::format, String] msgpack: "application/msgpack", json: "application/json" }.freeze
- STREAM_ACCEPT =
: Hash[Zizq::format, String]
{ #: Hash[Zizq::format, String] msgpack: "application/vnd.zizq.msgpack-stream", json: "application/x-ndjson" }.freeze
Instance Attribute Summary collapse
-
#format ⇒ Object
readonly
The message format to use for all communication between the client and the server (default = ‘:msgpack`).
-
#url ⇒ Object
readonly
The base URL of the Zizq server (e.g. “localhost:7890”).
Class Method Summary collapse
- .make_finalizer(io_queue, http_clients) ⇒ Object
-
.parse_msgpack_stream(chunks) ⇒ Object
Parse a length-prefixed MessagePack stream from an enumerable of byte chunks.
-
.parse_ndjson(chunks) ⇒ Object
Parse an NDJSON stream from an enumerable of byte chunks.
Instance Method Summary collapse
-
#add_cron_group_entry(group, name:, expression:, job:, timezone: nil, paused: nil) ⇒ Object
Add a single entry to a cron group (creates the group if needed).
-
#cleanup_internal_clients ⇒ Object
: () -> void.
-
#close ⇒ Object
Close all thread-local HTTP clients and release connections.
-
#count_jobs(id: nil, status: nil, queue: nil, type: nil, filter: nil) ⇒ Object
Count jobs matching the given filters.
-
#delete_all_crons ⇒ Object
Delete every cron group on the server in a single call.
-
#delete_all_jobs(where: {}) ⇒ Object
Delete jobs matching the given filters.
-
#delete_cron_group(name) ⇒ Object
Delete a cron group and all its entries.
-
#delete_cron_group_entry(group, entry) ⇒ Object
Delete a single cron entry.
-
#delete_job(id) ⇒ Object
Delete a single job by ID.
-
#enqueue(queue:, type:, payload:, priority: nil, ready_at: nil, retry_limit: nil, backoff: nil, retention: nil, unique_key: nil, unique_while: nil) ⇒ Object
Enqueue a new job.
-
#enqueue_bulk(jobs:) ⇒ Object
Enqueue multiple jobs atomically in a single bulk request.
-
#erase_all_data ⇒ Object
Wipe every cron group and every job on the server.
-
#get_cron_group(name) ⇒ Object
Fetch a cron group and all its entries.
-
#get_cron_group_entry(group, entry) ⇒ Object
Fetch a single cron entry.
-
#get_error(id, attempt:) ⇒ Object
Get a single error record by job ID and attempt number.
-
#get_job(id) ⇒ Object
Get a single job by ID.
-
#get_path(path) ⇒ Object
GET a path on the server and return the decoded response body.
-
#get_queues ⇒ Object
List all distinct queue names on the server.
-
#health ⇒ Object
Health check.
-
#initialize(url:, format: :msgpack, ssl_context: nil, read_timeout: 30, stream_idle_timeout: 30) ⇒ Client
constructor
Initialize a new instance of the client with the given base URL and optional format options.
-
#list_cron_groups ⇒ Object
List all cron group names.
-
#list_errors(id, from: nil, order: nil, limit: nil) ⇒ Object
List error records for a job.
-
#list_jobs(id: nil, status: nil, queue: nil, type: nil, filter: nil, from: nil, order: nil, limit: nil) ⇒ Object
List jobs with optional filters.
-
#replace_cron_group(name, paused: nil, entries: []) ⇒ Object
Create or replace an entire cron group.
-
#replace_cron_group_entry(group, entry, expression:, job:, timezone: nil, paused: nil) ⇒ Object
Create or replace a single cron entry.
-
#report_failure(id, message:, error_type: nil, backtrace: nil, retry_at: nil, kill: false) ⇒ Object
(also: #nack)
Report a job failure (nack).
-
#report_success(id) ⇒ Object
(also: #ack)
Mark a job as successfully completed (ack).
-
#report_success_bulk(ids) ⇒ Object
(also: #ack_bulk)
Bulk-mark jobs as successfully completed (bulk ack).
-
#server_version ⇒ Object
Server version string.
-
#take_jobs(prefetch: 1, queues: [], worker_id: nil, on_connect: nil, on_response: nil, &block) ⇒ Object
Stream jobs from the server.
-
#update_all_jobs(where: {}, apply: {}) ⇒ Object
Update all jobs matching the given filters.
-
#update_cron_group(name, paused: nil) ⇒ Object
Update group-level fields (currently just pause/unpause).
-
#update_cron_group_entry(group, entry, paused:) ⇒ Object
Update entry-level fields (currently just pause/unpause).
-
#update_job(id, queue: UNCHANGED, priority: UNCHANGED, ready_at: UNCHANGED, retry_limit: UNCHANGED, backoff: UNCHANGED, retention: UNCHANGED) ⇒ Object
Update a single job’s mutable fields.
Constructor Details
#initialize(url:, format: :msgpack, ssl_context: nil, read_timeout: 30, stream_idle_timeout: 30) ⇒ Client
Initialize a new instance of the client with the given base URL and optional format options.
‘read_timeout` and `stream_idle_timeout` are per-operation socket I/O timeouts (seconds). Each individual socket read/write is bounded by the timeout. The streaming `#take_jobs` endpoint uses `stream_idle_timeout` because the server sends heartbeats at periodic intervals which keeps the connection alive.
65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 |
# File 'lib/zizq/client.rb', line 65 def initialize(url:, format: :msgpack, ssl_context: nil, read_timeout: 30, stream_idle_timeout: 30) @url = url.chomp("/") @format = format = { protocol: Async::HTTP::Protocol::HTTP2, timeout: read_timeout, } #: Hash[Symbol, untyped] [:ssl_context] = ssl_context if ssl_context @endpoint = Async::HTTP::Endpoint.parse( @url, **, ) # Streaming take uses a dedicated HTTP/1.1 endpoint. The take # connection is long-lived and carries only one request, so HTTP/2's # multiplexing, stream IDs, and frame headers add overhead with no # benefit — there's nothing to multiplex against. Acks/enqueues run # on separate threads with their own HTTP/2 clients, so they're # unaffected either way. HTTP/1.1 gives the stream a plain TCP # socket with no framing tax and measurably better throughput. # # The stream endpoint uses `stream_idle_timeout` for its socket # timeout so server heartbeats (~3s) keep it alive while only # genuinely dead connections (no data for the full window) # trigger a reconnect. = .merge( protocol: Async::HTTP::Protocol::HTTP11, timeout: stream_idle_timeout, ) @stream_endpoint = Async::HTTP::Endpoint.parse( @url, **, ) @mutex = Mutex.new @io_thread = nil #: Thread? @io_queue = nil #: Thread::Queue? # Each thread gets its own Async::HTTP::Client bound to its own # reactor — one for regular request/response traffic (HTTP/2) and # a separate one lazily created on the first take_jobs call # (HTTP/1.1). Both kinds of clients are tracked in a single array # so `close` can shut them all down together. @http_clients = [] #: Array[Async::HTTP::Client] @http_key = :"zizq_http_#{object_id}" @stream_http_key = :"zizq_stream_http_#{object_id}" @content_type = CONTENT_TYPES.fetch(format) @stream_accept = STREAM_ACCEPT.fetch(format) end |
Instance Attribute Details
#format ⇒ Object (readonly)
The message format to use for all communication between the client and the server (default = ‘:msgpack`).
48 49 50 |
# File 'lib/zizq/client.rb', line 48 def format @format end |
#url ⇒ Object (readonly)
The base URL of the Zizq server (e.g. “localhost:7890”)
44 45 46 |
# File 'lib/zizq/client.rb', line 44 def url @url end |
Class Method Details
.make_finalizer(io_queue, http_clients) ⇒ Object
1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 |
# File 'lib/zizq/client.rb', line 1257 def self.make_finalizer(io_queue, http_clients) -> do io_queue&.close http_clients.each do |ref| ref.close rescue WeakRef::RefError # Client already GC'd (owning thread exited). rescue NoMethodError # The async connection pool may hold references to tasks whose # fibers were already reclaimed when their owning Sync reactor # exited. Stopping those dead tasks raises NoMethodError; safe # to ignore. end http_clients.clear end end |
.parse_msgpack_stream(chunks) ⇒ Object
Parse a length-prefixed MessagePack stream from an enumerable of byte chunks.
Format: [4-byte big-endian length][MsgPack payload]. A zero-length frame is a heartbeat and is silently skipped.
Uses StringIO for efficient position-based reading rather than repeatedly slicing from the front of a String (which copies all remaining bytes on every extraction).
807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 |
# File 'lib/zizq/client.rb', line 807 def self.parse_msgpack_stream(chunks) #: (Enumerable[String]) { (Hash[String, untyped]) -> void } -> void io = StringIO.new("".b) chunks.each do |chunk| # Append new data at the end, then return to the read position. read_pos = io.pos io.seek(0, IO::SEEK_END) io.write(chunk.b) io.seek(read_pos) # Extract complete frames. while io.size - io.pos >= 4 len_bytes = io.read(4) #: String len = len_bytes.unpack1("N") #: Integer if len == 0 # heartbeat next end if io.size - io.pos < len # Incomplete frame — rewind past the length header and wait # for more data. io.seek(-4, IO::SEEK_CUR) break end yield MessagePack.unpack(io.read(len)) end # Compact: discard already-consumed bytes so the StringIO doesn't # grow without bound over the life of the stream. remaining = io.read io = StringIO.new(remaining || "".b) end end |
.parse_ndjson(chunks) ⇒ Object
Parse an NDJSON stream from an enumerable of byte chunks.
Buffers chunks and splits on newline boundaries. The buffer only ever holds one partial line between extractions, so the ‘slice!` cost is trivial. Empty lines (heartbeats) are silently skipped.
785 786 787 788 789 790 791 792 793 794 795 796 |
# File 'lib/zizq/client.rb', line 785 def self.parse_ndjson(chunks) #: (Enumerable[String]) { (Hash[String, untyped]) -> void } -> void buffer = +"" chunks.each do |chunk| buffer << chunk while (idx = buffer.index("\n")) line = buffer.slice!(0, idx + 1) #: String line.strip! next if line.empty? yield JSON.parse(line) end end end |
Instance Method Details
#add_cron_group_entry(group, name:, expression:, job:, timezone: nil, paused: nil) ⇒ Object
Add a single entry to a cron group (creates the group if needed).
Raises a ClientError (409 Conflict) if an entry with the same name already exists.
565 566 567 568 569 570 |
# File 'lib/zizq/client.rb', line 565 def add_cron_group_entry(group, name:, expression:, job:, timezone: nil, paused: nil) body = build_cron_entry(name:, expression:, job:, timezone:, paused:) response = post("/crons/#{enc(group)}/entries", body) data = handle_response!(response, expected: 201) Resources::CronEntry.new(self, data) end |
#cleanup_internal_clients ⇒ Object
: () -> void
135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 |
# File 'lib/zizq/client.rb', line 135 def cleanup_internal_clients #: () -> void @mutex.synchronize do @http_clients.each do |ref| ref.close rescue WeakRef::RefError # Client already GC'd (owning thread exited). rescue NoMethodError # The async connection pool may hold references to tasks whose # fibers were already reclaimed when their owning Sync reactor # exited. Stopping those dead tasks raises NoMethodError; safe # to ignore. end @http_clients.clear end end |
#close ⇒ Object
Close all thread-local HTTP clients and release connections.
124 125 126 127 128 129 130 131 132 133 |
# File 'lib/zizq/client.rb', line 124 def close #: () -> void if @io_thread&.alive? @mutex.synchronize do @io_queue&.close @io_thread&.join end end self.class.make_finalizer(@io_queue, @http_clients).call end |
#count_jobs(id: nil, status: nil, queue: nil, type: nil, filter: nil) ⇒ Object
Count jobs matching the given filters.
Accepts the same filter arguments as ‘list_jobs` (minus pagination). Returns the count as an integer.
284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 |
# File 'lib/zizq/client.rb', line 284 def count_jobs(id: nil, status: nil, queue: nil, type: nil, filter: nil) = { id:, status:, queue:, type:, filter: }.compact #: Hash[Symbol, untyped] multi_keys = %i[id status queue type] params = build_where_params(, multi_keys:) # An empty filter ([] or "") matches nothing — short-circuit. multi_keys.each do |key| return 0 if params[key] == "" end response = get("/jobs/count", params:) data = handle_response!(response, expected: 200) data.fetch("count") end |
#delete_all_crons ⇒ Object
Delete every cron group on the server in a single call.
Returns the number of cron groups removed.
Destructive. This deletes *every cron group on the server*. For granular deletes, use ‘delete_cron_group` with a specific name.
Requires a Pro license on the server.
536 537 538 539 540 |
# File 'lib/zizq/client.rb', line 536 def delete_all_crons response = delete("/crons") data = handle_response!(response, expected: 200) data.fetch("deleted") end |
#delete_all_jobs(where: {}) ⇒ Object
Delete jobs matching the given filters.
Filters in the ‘where:` argument use the same keys as `list_jobs`. An empty `where:` hash deletes all jobs.
Returns the number of deleted jobs.
323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 |
# File 'lib/zizq/client.rb', line 323 def delete_all_jobs(where: {}) filter_params = validate_where(**where) multi_keys = %i[id status queue type] params = build_where_params(filter_params, multi_keys:) # An empty multi-value filter matches nothing — short-circuit. multi_keys.each do |key| return 0 if params[key] == "" end response = delete("/jobs", params:) data = handle_response!(response, expected: 200) data.fetch("deleted") end |
#delete_cron_group(name) ⇒ Object
Delete a cron group and all its entries.
519 520 521 522 523 |
# File 'lib/zizq/client.rb', line 519 def delete_cron_group(name) response = delete("/crons/#{enc(name)}") handle_response!(response, expected: 204) nil end |
#delete_cron_group_entry(group, entry) ⇒ Object
Delete a single cron entry.
607 608 609 610 611 |
# File 'lib/zizq/client.rb', line 607 def delete_cron_group_entry(group, entry) response = delete("/crons/#{enc(group)}/entries/#{enc(entry)}") handle_response!(response, expected: 204) nil end |
#delete_job(id) ⇒ Object
Delete a single job by ID.
308 309 310 311 312 |
# File 'lib/zizq/client.rb', line 308 def delete_job(id) response = delete("/jobs/#{enc(id)}") handle_response!(response, expected: [200, 204]) nil end |
#enqueue(queue:, type:, payload:, priority: nil, ready_at: nil, retry_limit: nil, backoff: nil, retention: nil, unique_key: nil, unique_while: nil) ⇒ Object
Enqueue a new job.
This is a low-level primitive that makes a direct API call to the server using the Zizq API’s expected inputs. Callers should generally use
- ‘Zizq::enqueue`
-
instead.
Returns a resource instance of the new job wrapping the API response.
170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 |
# File 'lib/zizq/client.rb', line 170 def enqueue(queue:, type:, payload:, priority: nil, ready_at: nil, retry_limit: nil, backoff: nil, retention: nil, unique_key: nil, unique_while: nil) body = { queue:, type:, payload: } #: Hash[Symbol, untyped] body[:priority] = priority if priority # ready_at is fractional seconds in Ruby; the server expects ms. body[:ready_at] = (ready_at.to_f * 1000).to_i if ready_at body[:retry_limit] = retry_limit if retry_limit body[:backoff] = backoff if backoff body[:retention] = retention if retention body[:unique_key] = unique_key if unique_key body[:unique_while] = unique_while.to_s if unique_while response = post("/jobs", body) data = handle_response!(response, expected: [200, 201]) Resources::Job.new(self, data) end |
#enqueue_bulk(jobs:) ⇒ Object
Enqueue multiple jobs atomically in a single bulk request.
This is a low-level primitive that makes a direct API call to the server using the Zizq API’s expected inputs. Callers should generally use
- ‘Zizq::enqueue_bulk`
-
instead.
Returns an array of resource instances wrapping the API response.
205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 |
# File 'lib/zizq/client.rb', line 205 def enqueue_bulk(jobs:) body = { jobs: jobs.map do |job| wire = { type: job[:type], queue: job[:queue], payload: job[:payload] } #: Hash[Symbol, untyped] wire[:priority] = job[:priority] if job[:priority] # ready_at is fractional seconds in Ruby; the server expects ms. wire[:ready_at] = (job[:ready_at].to_f * 1000).to_i if job[:ready_at] wire[:retry_limit] = job[:retry_limit] if job[:retry_limit] wire[:backoff] = job[:backoff] if job[:backoff] wire[:retention] = job[:retention] if job[:retention] wire[:unique_key] = job[:unique_key] if job[:unique_key] wire[:unique_while] = job[:unique_while].to_s if job[:unique_while] wire end } response = post("/jobs/bulk", body) data = handle_response!(response, expected: [200, 201]) data["jobs"].map { |j| Resources::Job.new(self, j) } end |
#erase_all_data ⇒ Object
Wipe every cron group and every job on the server.
Equivalent to calling ‘delete_all_crons` followed by `delete_all_jobs` (no filter), but in a single request. Useful primarily as a setup/teardown step in tests where you want a known-empty server between scenarios.
Destructive. No filters, no escape hatch, no confirmation —the server-side operation simply returns once everything is gone.
Named ‘erase_all_data` rather than `reset` because `Zizq.reset!` already exists at the module level for client-side SDK state.
353 354 355 356 357 |
# File 'lib/zizq/client.rb', line 353 def erase_all_data response = raw_post("/reset") handle_response!(response, expected: 204) nil end |
#get_cron_group(name) ⇒ Object
Fetch a cron group and all its entries.
479 480 481 482 483 |
# File 'lib/zizq/client.rb', line 479 def get_cron_group(name) response = get("/crons/#{enc(name)}") data = handle_response!(response, expected: 200) Resources::CronGroup.new(self, data) end |
#get_cron_group_entry(group, entry) ⇒ Object
Fetch a single cron entry.
547 548 549 550 551 |
# File 'lib/zizq/client.rb', line 547 def get_cron_group_entry(group, entry) response = get("/crons/#{enc(group)}/entries/#{enc(entry)}") data = handle_response!(response, expected: 200) Resources::CronEntry.new(self, data) end |
#get_error(id, attempt:) ⇒ Object
Get a single error record by job ID and attempt number.
426 427 428 429 430 |
# File 'lib/zizq/client.rb', line 426 def get_error(id, attempt:) response = get("/jobs/#{enc(id)}/errors/#{enc(attempt.to_s)}") data = handle_response!(response, expected: 200) Resources::ErrorRecord.new(self, data) end |
#get_job(id) ⇒ Object
Get a single job by ID.
227 228 229 230 231 |
# File 'lib/zizq/client.rb', line 227 def get_job(id) #: (String) -> Resources::Job response = get("/jobs/#{enc(id)}") data = handle_response!(response, expected: 200) Resources::Job.new(self, data) end |
#get_path(path) ⇒ Object
GET a path on the server and return the decoded response body.
The path should include any query parameters already (e.g. pagination links from the server’s ‘pages` object). This is intentionally public so that resource objects like Page can follow links without resorting to `.send`.
849 850 851 852 |
# File 'lib/zizq/client.rb', line 849 def get_path(path) #: (String) -> Hash[String, untyped] response = request { |http| consume_response(http.get(path, {"accept" => @content_type})) } handle_response!(response, expected: 200) end |
#get_queues ⇒ Object
List all distinct queue names on the server.
460 461 462 463 464 |
# File 'lib/zizq/client.rb', line 460 def get_queues #: () -> Array[String] response = get("/queues") data = handle_response!(response, expected: 200) data["queues"] end |
#health ⇒ Object
Health check.
447 448 449 450 |
# File 'lib/zizq/client.rb', line 447 def health #: () -> Hash[String, untyped] response = get("/health") handle_response!(response, expected: 200) end |
#list_cron_groups ⇒ Object
List all cron group names.
469 470 471 472 473 |
# File 'lib/zizq/client.rb', line 469 def list_cron_groups response = get("/crons") data = handle_response!(response, expected: 200) data["crons"] end |
#list_errors(id, from: nil, order: nil, limit: nil) ⇒ Object
List error records for a job.
439 440 441 442 443 444 |
# File 'lib/zizq/client.rb', line 439 def list_errors(id, from: nil, order: nil, limit: nil) params = { from:, order:, limit: }.compact #: Hash[Symbol, untyped] response = get("/jobs/#{enc(id)}/errors", params:) data = handle_response!(response, expected: 200) Resources::ErrorPage.new(self, data) end |
#list_jobs(id: nil, status: nil, queue: nil, type: nil, filter: nil, from: nil, order: nil, limit: nil) ⇒ Object
List jobs with optional filters.
Multi-value filters (‘status`, `queue`, `type`, `id`) accept arrays —they are joined with commas as the server expects.
The ‘filter` parameter accepts a jq expression for filtering jobs by payload content (e.g. `.user_id == 42`).
250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 |
# File 'lib/zizq/client.rb', line 250 def list_jobs(id: nil, status: nil, queue: nil, type: nil, filter: nil, from: nil, order: nil, limit: nil) = { id:, status:, queue:, type:, filter:, from:, order:, limit: }.compact #: Hash[Symbol, untyped] multi_keys = %i[id status queue type] params = build_where_params(, multi_keys:) # An empty filter ([] or "") matches nothing — short-circuit. multi_keys.each do |key| return Resources::JobPage.new(self, { "jobs" => [], "pages" => {} }) if params[key] == "" end response = get("/jobs", params:) data = handle_response!(response, expected: 200) Resources::JobPage.new(self, data) end |
#replace_cron_group(name, paused: nil, entries: []) ⇒ Object
Create or replace an entire cron group.
Entries not present in the request are removed. Entries with unchanged expressions preserve their scheduling state.
494 495 496 497 498 499 500 501 502 |
# File 'lib/zizq/client.rb', line 494 def replace_cron_group(name, paused: nil, entries: []) body = { paused:, entries: entries.map { |entry| build_cron_entry(**entry) } }.compact response = put("/crons/#{enc(name)}", body) data = handle_response!(response, expected: 200) Resources::CronGroup.new(self, data) end |
#replace_cron_group_entry(group, entry, expression:, job:, timezone: nil, paused: nil) ⇒ Object
Create or replace a single cron entry.
Preserves scheduling state if the expression is unchanged.
583 584 585 586 587 588 |
# File 'lib/zizq/client.rb', line 583 def replace_cron_group_entry(group, entry, expression:, job:, timezone: nil, paused: nil) body = build_cron_entry(name: entry, expression:, job:, timezone:, paused:) response = put("/crons/#{enc(group)}/entries/#{enc(entry)}", body) data = handle_response!(response, expected: 200) Resources::CronEntry.new(self, data) end |
#report_failure(id, message:, error_type: nil, backtrace: nil, retry_at: nil, kill: false) ⇒ Object Also known as: nack
Report a job failure (nack).
Returns the updated job metadata.
If this method is not called when errors occur processing jobs, the Zizq server will consider it in-flight and will not send any more jobs if the prefetch limit has been reached, or the server’s global in-flight limit has been reached. Jobs must be either acknowledged or failed before new jobs are sent.
Jobs are durable and “at least once” delivery is guaranteed. If the client disconnects before it is able to report success or failure the server automatically moves the job back to the queue where it will be provided to another worker. Clients should be prepared to see the same job more than once for this reason.
The Zizq server sends heartbeat messages to connected workers so that it can quickly detect and handle disconnected clients.
686 687 688 689 690 691 692 693 694 695 696 697 |
# File 'lib/zizq/client.rb', line 686 def report_failure(id, message:, error_type: nil, backtrace: nil, retry_at: nil, kill: false) body = { message: } #: Hash[Symbol, untyped] body[:error_type] = error_type if error_type body[:backtrace] = backtrace if backtrace # retry_at is fractional seconds in Ruby; the server expects ms. body[:retry_at] = (retry_at * 1000).to_i if retry_at body[:kill] = kill if kill response = post("/jobs/#{enc(id)}/failure", body) data = handle_response!(response, expected: 200) Resources::Job.new(self, data) end |
#report_success(id) ⇒ Object Also known as: ack
Mark a job as successfully completed (ack).
If this method (or [‘#report_failure`]) is not called upon job completion, the Zizq server will consider it in-flight and will not send any more jobs if the prefetch limit has been reached, or the server’s global in-flight limit has been reached. Jobs must be either acknowledged or failed before new jobs are sent.
Jobs are durable and “at least once” delivery is guaranteed. If the client disconnects before it is able to report success or failure the server automatically moves the job back to the queue where it will be provided to another worker. Clients should be prepared to see the same job more than once for this reason.
The Zizq server sends heartbeat messages to connected workers so that it can quickly detect and handle disconnected clients.
629 630 631 632 633 |
# File 'lib/zizq/client.rb', line 629 def report_success(id) #: (String) -> nil response = raw_post("/jobs/#{enc(id)}/success") handle_response!(response, expected: 204) nil end |
#report_success_bulk(ids) ⇒ Object Also known as: ack_bulk
Bulk-mark jobs as successfully completed (bulk ack).
See [‘#report_success`] for full details of how acknowledgemen works.
There are two ways in which the server can respond successfully:
-
204 - No Content (All jobs acknowledged)
-
422 - Unprocessible Entity (Some jobs were not found)
Both of these statuses are in reality treated as success because missing jobs have either been previously acknowledged and purged, or moved to some other status that cannot be acknowledged.
Other error response types will still raise.
652 653 654 655 656 |
# File 'lib/zizq/client.rb', line 652 def report_success_bulk(ids) response = post("/jobs/success", { ids: ids }) return nil if response.status == 422 handle_response!(response, expected: 204) end |
#server_version ⇒ Object
Server version string.
453 454 455 456 457 |
# File 'lib/zizq/client.rb', line 453 def server_version #: () -> String response = get("/version") data = handle_response!(response, expected: 200) data["version"] end |
#take_jobs(prefetch: 1, queues: [], worker_id: nil, on_connect: nil, on_response: nil, &block) ⇒ Object
Stream jobs from the server. Yields parsed job hashes.
This method does not return unless the server closes the connection or the connection is otherwise interrupted. Jobs are continuously streamed to the client, and when no jobs are available the client waits for new jobs to become ready.
If the client does not acknowledge or fail jobs with ‘[#report_success`] or [`#report_failure`] the server will stop sending new jobs to the client as it hits its prefetch limit.
Jobs are durable and “at least once” delivery is guaranteed. If the client disconnects before it is able to report success or failure the server automatically moves the job back to the queue where it will be provided to another worker. Clients should be prepared to see the same job more than once for this reason.
The Zizq server sends periodic heartbeat messages to the client which are silently consumed.
Example:
client.take_jobs(prefetch: 5) do |job|
puts "Got job: #{job.inspect}"
client.ack(job.id) # mark the job completed
end
735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 |
# File 'lib/zizq/client.rb', line 735 def take_jobs(prefetch: 1, queues: [], worker_id: nil, on_connect: nil, on_response: nil, &block) raise ArgumentError, "take_jobs requires a block" unless block params = { prefetch: } #: Hash[Symbol, untyped] params[:queue] = queues.join(",") unless queues.empty? path = build_path("/jobs/take", params:) headers = { "accept" => @stream_accept } headers["worker-id"] = worker_id if worker_id Sync do response = stream_http.get(path, headers) begin raise StreamError, "take jobs stream returned HTTP #{response.status}" unless response.status == 200 on_connect&.call on_response&.call(response) # Wrap each parsed hash in a Resources::Job before yielding. wrapper = proc { |data| block.call(Resources::Job.new(self, data)) } # async-http returns `nil` for empty response bodies over HTTP/1.1 # (e.g. a 200 with content-length: 0 from the server closing the # stream immediately). Treat that as "no chunks" rather than # crashing in the parser. body = response.body || [] case @format when :json then self.class.parse_ndjson(body, &wrapper) when :msgpack then self.class.parse_msgpack_stream(body, &wrapper) end ensure response.close rescue nil end end rescue SocketError, IOError, EOFError, Errno::ECONNRESET, Errno::EPIPE, IO::TimeoutError, OpenSSL::SSL::SSLError => e raise ConnectionError, e. end |
#update_all_jobs(where: {}, apply: {}) ⇒ Object
Update all jobs matching the given filters.
Filters in the ‘where:` argument use the same keys as `list_jobs`. Fields in the `apply:` argument use the same keys as `update_job`.
Terminal jobs (completed/dead) are silently skipped unless explicitly requested via ‘status:` in `where:`, which returns 422.
Returns the number of updated jobs.
404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 |
# File 'lib/zizq/client.rb', line 404 def update_all_jobs(where: {}, apply: {}) filter_params = validate_where(**where) multi_keys = %i[id status queue type] params = build_where_params(filter_params, multi_keys:) # An empty multi-value filter matches nothing — short-circuit. multi_keys.each do |key| return 0 if params[key] == "" end body = validate_and_build_set(**apply) response = patch("/jobs", body, params:) data = handle_response!(response, expected: 200) data.fetch("patched") end |
#update_cron_group(name, paused: nil) ⇒ Object
Update group-level fields (currently just pause/unpause).
509 510 511 512 513 |
# File 'lib/zizq/client.rb', line 509 def update_cron_group(name, paused: nil) response = patch("/crons/#{enc(name)}", { paused: }.compact) data = handle_response!(response, expected: 200) Resources::CronGroup.new(self, data) end |
#update_cron_group_entry(group, entry, paused:) ⇒ Object
Update entry-level fields (currently just pause/unpause).
596 597 598 599 600 |
# File 'lib/zizq/client.rb', line 596 def update_cron_group_entry(group, entry, paused:) response = patch("/crons/#{enc(group)}/entries/#{enc(entry)}", { paused: }) data = handle_response!(response, expected: 200) Resources::CronEntry.new(self, data) end |
#update_job(id, queue: UNCHANGED, priority: UNCHANGED, ready_at: UNCHANGED, retry_limit: UNCHANGED, backoff: UNCHANGED, retention: UNCHANGED) ⇒ Object
Update a single job’s mutable fields.
Fields not provided are left unchanged. Use ‘Zizq::RESET` to clear a nullable field back to the server default.
Raises ‘Zizq::NotFoundError` if the job does not exist. Raises `Zizq::ClientError` (422) if the job is in a terminal state.
375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 |
# File 'lib/zizq/client.rb', line 375 def update_job(id, queue: UNCHANGED, priority: UNCHANGED, ready_at: UNCHANGED, retry_limit: UNCHANGED, backoff: UNCHANGED, retention: UNCHANGED) body = build_set_body( queue:, priority:, ready_at:, retry_limit:, backoff:, retention: ) response = patch("/jobs/#{enc(id)}", body) data = handle_response!(response, expected: 200) Resources::Job.new(self, data) end |