Class: Zizq::Client
- Inherits: Object
- Defined in: lib/zizq/client.rb
Overview
Low-level HTTP wrapper for the Zizq job queue server API.
Supports both JSON and MessagePack serialization formats, determined at construction time.
HTTP requests are dispatched through a persistent background IO thread when called from non-Async contexts, keeping the HTTP/2 connection alive across calls and avoiding ephemeral port exhaustion. When called from within an existing Async reactor, the shared HTTP client is used directly.
Defined Under Namespace
Classes: RawResponse
Constant Summary
- CONTENT_TYPES = { msgpack: "application/msgpack", json: "application/json" }.freeze #: Hash[Zizq::format, String]
- STREAM_ACCEPT = { msgpack: "application/vnd.zizq.msgpack-stream", json: "application/x-ndjson" }.freeze #: Hash[Zizq::format, String]
Instance Attribute Summary
- #format ⇒ Object (readonly)
  The message format to use for all communication between the client and the server (default: `:msgpack`).
- #url ⇒ Object (readonly)
  The base URL of the Zizq server (e.g. “localhost:7890”).
Class Method Summary
- .make_finalizer(io_queue, http_clients) ⇒ Object
- .parse_msgpack_stream(chunks) ⇒ Object
  Parse a length-prefixed MessagePack stream from an enumerable of byte chunks.
- .parse_ndjson(chunks) ⇒ Object
  Parse an NDJSON stream from an enumerable of byte chunks.
Instance Method Summary
- #add_cron_group_entry(group, name:, expression:, job:, timezone: nil, paused: nil) ⇒ Object
  Add a single entry to a cron group (creates the group if needed).
- #cleanup_internal_clients ⇒ Object
  Close every tracked internal HTTP client and clear the list.
- #close ⇒ Object
  Close all thread-local HTTP clients and release connections.
- #count_jobs(id: nil, status: nil, queue: nil, type: nil, filter: nil) ⇒ Object
  Count jobs matching the given filters.
- #delete_all_jobs(where: {}) ⇒ Object
  Delete jobs matching the given filters.
- #delete_cron_group(name) ⇒ Object
  Delete a cron group and all its entries.
- #delete_cron_group_entry(group, entry) ⇒ Object
  Delete a single cron entry.
- #delete_job(id) ⇒ Object
  Delete a single job by ID.
- #enqueue(queue:, type:, payload:, priority: nil, ready_at: nil, retry_limit: nil, backoff: nil, retention: nil, unique_key: nil, unique_while: nil) ⇒ Object
  Enqueue a new job.
- #enqueue_bulk(jobs:) ⇒ Object
  Enqueue multiple jobs atomically in a single bulk request.
- #get_cron_group(name) ⇒ Object
  Fetch a cron group and all its entries.
- #get_cron_group_entry(group, entry) ⇒ Object
  Fetch a single cron entry.
- #get_error(id, attempt:) ⇒ Object
  Get a single error record by job ID and attempt number.
- #get_job(id) ⇒ Object
  Get a single job by ID.
- #get_path(path) ⇒ Object
  GET a path on the server and return the decoded response body.
- #get_queues ⇒ Object
  List all distinct queue names on the server.
- #health ⇒ Object
  Health check.
- #initialize(url:, format: :msgpack, ssl_context: nil, read_timeout: 30, stream_idle_timeout: 30) ⇒ Client (constructor)
  Initialize a new instance of the client with the given base URL and optional format options.
- #list_cron_groups ⇒ Object
  List all cron group names.
- #list_errors(id, from: nil, order: nil, limit: nil) ⇒ Object
  List error records for a job.
- #list_jobs(id: nil, status: nil, queue: nil, type: nil, filter: nil, from: nil, order: nil, limit: nil) ⇒ Object
  List jobs with optional filters.
- #replace_cron_group(name, paused: nil, entries: []) ⇒ Object
  Create or replace an entire cron group.
- #replace_cron_group_entry(group, entry, expression:, job:, timezone: nil, paused: nil) ⇒ Object
  Create or replace a single cron entry.
- #report_failure(id, message:, error_type: nil, backtrace: nil, retry_at: nil, kill: false) ⇒ Object (also: #nack)
  Report a job failure (nack).
- #report_success(id) ⇒ Object (also: #ack)
  Mark a job as successfully completed (ack).
- #report_success_bulk(ids) ⇒ Object (also: #ack_bulk)
  Bulk-mark jobs as successfully completed (bulk ack).
- #server_version ⇒ Object
  Server version string.
- #take_jobs(prefetch: 1, queues: [], worker_id: nil, on_connect: nil, on_response: nil, &block) ⇒ Object
  Stream jobs from the server.
- #update_all_jobs(where: {}, apply: {}) ⇒ Object
  Update all jobs matching the given filters.
- #update_cron_group(name, paused: nil) ⇒ Object
  Update group-level fields (currently just pause/unpause).
- #update_cron_group_entry(group, entry, paused:) ⇒ Object
  Update entry-level fields (currently just pause/unpause).
- #update_job(id, queue: UNCHANGED, priority: UNCHANGED, ready_at: UNCHANGED, retry_limit: UNCHANGED, backoff: UNCHANGED, retention: UNCHANGED) ⇒ Object
  Update a single job’s mutable fields.
Constructor Details
#initialize(url:, format: :msgpack, ssl_context: nil, read_timeout: 30, stream_idle_timeout: 30) ⇒ Client
Initialize a new instance of the client with the given base URL and optional format options.
`read_timeout` and `stream_idle_timeout` are per-operation socket I/O timeouts, in seconds: each individual socket read or write is bounded by the timeout. The streaming `#take_jobs` endpoint uses `stream_idle_timeout`. Because the server sends periodic heartbeats, a healthy stream stays alive, and only a connection that delivers no data for the full window times out.
# File 'lib/zizq/client.rb', line 65
def initialize(url:, format: :msgpack, ssl_context: nil, read_timeout: 30, stream_idle_timeout: 30)
  @url = url.chomp("/")
  @format = format

  = {
    protocol: Async::HTTP::Protocol::HTTP2,
    timeout: read_timeout,
  } #: Hash[Symbol, untyped]
  [:ssl_context] = ssl_context if ssl_context

  @endpoint = Async::HTTP::Endpoint.parse(
    @url,
    **,
  )

  # Streaming take uses a dedicated HTTP/1.1 endpoint. The take
  # connection is long-lived and carries only one request, so HTTP/2's
  # multiplexing, stream IDs, and frame headers add overhead with no
  # benefit: there's nothing to multiplex against. Acks/enqueues run
  # on separate threads with their own HTTP/2 clients, so they're
  # unaffected either way. HTTP/1.1 gives the stream a plain TCP
  # socket with no framing tax and measurably better throughput.
  #
  # The stream endpoint uses `stream_idle_timeout` for its socket
  # timeout so server heartbeats (~3s) keep it alive while only
  # genuinely dead connections (no data for the full window)
  # trigger a reconnect.
  = .merge(
    protocol: Async::HTTP::Protocol::HTTP11,
    timeout: stream_idle_timeout,
  )
  @stream_endpoint = Async::HTTP::Endpoint.parse(
    @url,
    **,
  )

  @mutex = Mutex.new
  @io_thread = nil #: Thread?
  @io_queue = nil #: Thread::Queue?

  # Each thread gets its own Async::HTTP::Client bound to its own
  # reactor: one for regular request/response traffic (HTTP/2) and
  # a separate one lazily created on the first take_jobs call
  # (HTTP/1.1). Both kinds of clients are tracked in a single array
  # so `close` can shut them all down together.
  @http_clients = [] #: Array[Async::HTTP::Client]
  @http_key = :"zizq_http_#{object_id}"
  @stream_http_key = :"zizq_stream_http_#{object_id}"

  @content_type = CONTENT_TYPES.fetch(format)
  @stream_accept = STREAM_ACCEPT.fetch(format)
end
Instance Attribute Details
#format ⇒ Object (readonly)
The message format to use for all communication between the client and the server (default: `:msgpack`).
# File 'lib/zizq/client.rb', line 48
def format
  @format
end
#url ⇒ Object (readonly)
The base URL of the Zizq server (e.g. “localhost:7890”).
# File 'lib/zizq/client.rb', line 44
def url
  @url
end
Class Method Details
.make_finalizer(io_queue, http_clients) ⇒ Object
# File 'lib/zizq/client.rb', line 1220
def self.make_finalizer(io_queue, http_clients)
  -> do
    io_queue&.close
    http_clients.each do |ref|
      ref.close
    rescue WeakRef::RefError
      # Client already GC'd (owning thread exited).
    rescue NoMethodError
      # The async connection pool may hold references to tasks whose
      # fibers were already reclaimed when their owning Sync reactor
      # exited. Stopping those dead tasks raises NoMethodError; safe
      # to ignore.
    end
    http_clients.clear
  end
end
.parse_msgpack_stream(chunks) ⇒ Object
Parse a length-prefixed MessagePack stream from an enumerable of byte chunks.
Format: [4-byte big-endian length][MsgPack payload]. A zero-length frame is a heartbeat and is silently skipped.
Uses StringIO for efficient position-based reading rather than repeatedly slicing from the front of a String (which copies all remaining bytes on every extraction).
# File 'lib/zizq/client.rb', line 770
def self.parse_msgpack_stream(chunks) #: (Enumerable[String]) { (Hash[String, untyped]) -> void } -> void
  io = StringIO.new("".b)

  chunks.each do |chunk|
    # Append new data at the end, then return to the read position.
    read_pos = io.pos
    io.seek(0, IO::SEEK_END)
    io.write(chunk.b)
    io.seek(read_pos)

    # Extract complete frames.
    while io.size - io.pos >= 4
      len_bytes = io.read(4) #: String
      len = len_bytes.unpack1("N") #: Integer

      if len == 0 # heartbeat
        next
      end

      if io.size - io.pos < len
        # Incomplete frame: rewind past the length header and wait
        # for more data.
        io.seek(-4, IO::SEEK_CUR)
        break
      end

      yield MessagePack.unpack(io.read(len))
    end

    # Compact: discard already-consumed bytes so the StringIO doesn't
    # grow without bound over the life of the stream.
    remaining = io.read
    io = StringIO.new(remaining || "".b)
  end
end
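The framing described above can be tried in isolation. The following is a minimal sketch, not the library's implementation: `frame` and `each_frame` are hypothetical helper names, payloads are yielded as raw bytes rather than decoded with MessagePack, and a plain String buffer with `byteslice` stands in for the StringIO approach to keep the example short.

```ruby
# Hypothetical helper: prefix a payload with its 4-byte big-endian length.
def frame(payload)
  [payload.bytesize].pack("N") + payload.b
end

# Hypothetical helper: yield every complete, non-empty payload found in the
# chunks. Zero-length frames (heartbeats) are skipped.
def each_frame(chunks)
  buffer = "".b
  chunks.each do |chunk|
    buffer << chunk.b
    loop do
      break if buffer.bytesize < 4          # length header not complete yet
      len = buffer.unpack1("N")
      break if buffer.bytesize < 4 + len    # payload not complete yet
      payload = buffer.byteslice(4, len)
      buffer = buffer.byteslice(4 + len, buffer.bytesize - 4 - len)
      yield payload unless len.zero?
    end
  end
end

# Two data frames with a heartbeat in between, split mid-frame into two chunks.
stream = frame("hello") + frame("") + frame("world")
chunks = [stream.byteslice(0, 6), stream.byteslice(6, stream.bytesize - 6)]

frames = []
each_frame(chunks) { |payload| frames << payload }
```

The first chunk ends partway through the first payload, so nothing is yielded until the second chunk arrives.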
.parse_ndjson(chunks) ⇒ Object
Parse an NDJSON stream from an enumerable of byte chunks.
Buffers chunks and splits on newline boundaries. The buffer only ever holds one partial line between extractions, so the `slice!` cost is trivial. Empty lines (heartbeats) are silently skipped.
# File 'lib/zizq/client.rb', line 748
def self.parse_ndjson(chunks) #: (Enumerable[String]) { (Hash[String, untyped]) -> void } -> void
  buffer = +""
  chunks.each do |chunk|
    buffer << chunk
    while (idx = buffer.index("\n"))
      line = buffer.slice!(0, idx + 1) #: String
      line.strip!
      next if line.empty?
      yield JSON.parse(line)
    end
  end
end
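To see how the buffering copes with chunk boundaries that fall mid-line, here is a standalone sketch of the same approach. The method name `each_ndjson_record` and the sample records are illustrative assumptions, not part of the Zizq API.

```ruby
require "json"

# Standalone version of the newline-buffering approach (hypothetical name).
def each_ndjson_record(chunks)
  buffer = +""
  chunks.each do |chunk|
    buffer << chunk
    while (idx = buffer.index("\n"))
      line = buffer.slice!(0, idx + 1).strip
      next if line.empty? # an empty line is a heartbeat
      yield JSON.parse(line)
    end
  end
end

# The second record is split across two chunks; the third chunk is a heartbeat.
chunks = ["{\"id\":\"a\"}\n{\"id\"", ":\"b\"}\n", "\n", "{\"id\":\"c\"}\n"]

records = []
each_ndjson_record(chunks) { |record| records << record }
```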
Instance Method Details
#add_cron_group_entry(group, name:, expression:, job:, timezone: nil, paused: nil) ⇒ Object
Add a single entry to a cron group (creates the group if needed).
Raises a ClientError (409 Conflict) if an entry with the same name already exists.
# File 'lib/zizq/client.rb', line 528
def add_cron_group_entry(group, name:, expression:, job:, timezone: nil, paused: nil)
  body = build_cron_entry(name:, expression:, job:, timezone:, paused:)
  response = post("/crons/#{enc(group)}/entries", body)
  data = handle_response!(response, expected: 201)
  Resources::CronEntry.new(self, data)
end
#cleanup_internal_clients ⇒ Object
Close every tracked internal HTTP client and clear the list (signature: `() -> void`).
# File 'lib/zizq/client.rb', line 135
def cleanup_internal_clients #: () -> void
  @mutex.synchronize do
    @http_clients.each do |ref|
      ref.close
    rescue WeakRef::RefError
      # Client already GC'd (owning thread exited).
    rescue NoMethodError
      # The async connection pool may hold references to tasks whose
      # fibers were already reclaimed when their owning Sync reactor
      # exited. Stopping those dead tasks raises NoMethodError; safe
      # to ignore.
    end
    @http_clients.clear
  end
end
#close ⇒ Object
Close all thread-local HTTP clients and release connections.
# File 'lib/zizq/client.rb', line 124
def close #: () -> void
  if @io_thread&.alive?
    @mutex.synchronize do
      @io_queue&.close
      @io_thread&.join
    end
  end

  self.class.make_finalizer(@io_queue, @http_clients).call
end
#count_jobs(id: nil, status: nil, queue: nil, type: nil, filter: nil) ⇒ Object
Count jobs matching the given filters.
Accepts the same filter arguments as `list_jobs` (minus pagination). Returns the count as an integer.
# File 'lib/zizq/client.rb', line 284
def count_jobs(id: nil, status: nil, queue: nil, type: nil, filter: nil)
  = { id:, status:, queue:, type:, filter: }.compact #: Hash[Symbol, untyped]
  multi_keys = %i[id status queue type]
  params = build_where_params(, multi_keys:)

  # An empty filter ([] or "") matches nothing, so short-circuit.
  multi_keys.each do |key|
    return 0 if params[key] == ""
  end

  response = get("/jobs/count", params:)
  data = handle_response!(response, expected: 200)
  data.fetch("count")
end
#delete_all_jobs(where: {}) ⇒ Object
Delete jobs matching the given filters.
Filters in the `where:` argument use the same keys as `list_jobs`. An empty `where:` hash deletes all jobs.
Returns the number of deleted jobs.
# File 'lib/zizq/client.rb', line 323
def delete_all_jobs(where: {})
  filter_params = validate_where(**where)
  multi_keys = %i[id status queue type]
  params = build_where_params(filter_params, multi_keys:)

  # An empty multi-value filter matches nothing, so short-circuit.
  multi_keys.each do |key|
    return 0 if params[key] == ""
  end

  response = delete("/jobs", params:)
  data = handle_response!(response, expected: 200)
  data.fetch("deleted")
end
#delete_cron_group(name) ⇒ Object
Delete a cron group and all its entries.
# File 'lib/zizq/client.rb', line 499
def delete_cron_group(name)
  response = delete("/crons/#{enc(name)}")
  handle_response!(response, expected: 204)
  nil
end
#delete_cron_group_entry(group, entry) ⇒ Object
Delete a single cron entry.
# File 'lib/zizq/client.rb', line 570
def delete_cron_group_entry(group, entry)
  response = delete("/crons/#{enc(group)}/entries/#{enc(entry)}")
  handle_response!(response, expected: 204)
  nil
end
#delete_job(id) ⇒ Object
Delete a single job by ID.
# File 'lib/zizq/client.rb', line 308
def delete_job(id)
  response = delete("/jobs/#{enc(id)}")
  handle_response!(response, expected: [200, 204])
  nil
end
#enqueue(queue:, type:, payload:, priority: nil, ready_at: nil, retry_limit: nil, backoff: nil, retention: nil, unique_key: nil, unique_while: nil) ⇒ Object
Enqueue a new job.
This is a low-level primitive that makes a direct API call to the server using the Zizq API’s expected inputs. Callers should generally use `Zizq::enqueue` instead.
Returns a resource instance of the new job wrapping the API response.
# File 'lib/zizq/client.rb', line 170
def enqueue(queue:,
            type:,
            payload:,
            priority: nil,
            ready_at: nil,
            retry_limit: nil,
            backoff: nil,
            retention: nil,
            unique_key: nil,
            unique_while: nil)
  body = { queue:, type:, payload: } #: Hash[Symbol, untyped]
  body[:priority] = priority if priority
  # ready_at is fractional seconds in Ruby; the server expects ms.
  body[:ready_at] = (ready_at.to_f * 1000).to_i if ready_at
  body[:retry_limit] = retry_limit if retry_limit
  body[:backoff] = backoff if backoff
  body[:retention] = retention if retention
  body[:unique_key] = unique_key if unique_key
  body[:unique_while] = unique_while.to_s if unique_while

  response = post("/jobs", body)
  data = handle_response!(response, expected: [200, 201])
  Resources::Job.new(self, data)
end
#enqueue_bulk(jobs:) ⇒ Object
Enqueue multiple jobs atomically in a single bulk request.
This is a low-level primitive that makes a direct API call to the server using the Zizq API’s expected inputs. Callers should generally use `Zizq::enqueue_bulk` instead.
Returns an array of resource instances wrapping the API response.
# File 'lib/zizq/client.rb', line 205
def enqueue_bulk(jobs:)
  body = {
    jobs: jobs.map do |job|
      wire = { type: job[:type], queue: job[:queue], payload: job[:payload] } #: Hash[Symbol, untyped]
      wire[:priority] = job[:priority] if job[:priority]
      # ready_at is fractional seconds in Ruby; the server expects ms.
      wire[:ready_at] = (job[:ready_at].to_f * 1000).to_i if job[:ready_at]
      wire[:retry_limit] = job[:retry_limit] if job[:retry_limit]
      wire[:backoff] = job[:backoff] if job[:backoff]
      wire[:retention] = job[:retention] if job[:retention]
      wire[:unique_key] = job[:unique_key] if job[:unique_key]
      wire[:unique_while] = job[:unique_while].to_s if job[:unique_while]
      wire
    end
  }

  response = post("/jobs/bulk", body)
  data = handle_response!(response, expected: [200, 201])
  data["jobs"].map { |j| Resources::Job.new(self, j) }
end
#get_cron_group(name) ⇒ Object
Fetch a cron group and all its entries.
# File 'lib/zizq/client.rb', line 459
def get_cron_group(name)
  response = get("/crons/#{enc(name)}")
  data = handle_response!(response, expected: 200)
  Resources::CronGroup.new(self, data)
end
#get_cron_group_entry(group, entry) ⇒ Object
Fetch a single cron entry.
# File 'lib/zizq/client.rb', line 510
def get_cron_group_entry(group, entry)
  response = get("/crons/#{enc(group)}/entries/#{enc(entry)}")
  data = handle_response!(response, expected: 200)
  Resources::CronEntry.new(self, data)
end
#get_error(id, attempt:) ⇒ Object
Get a single error record by job ID and attempt number.
# File 'lib/zizq/client.rb', line 406
def get_error(id, attempt:)
  response = get("/jobs/#{enc(id)}/errors/#{enc(attempt.to_s)}")
  data = handle_response!(response, expected: 200)
  Resources::ErrorRecord.new(self, data)
end
#get_job(id) ⇒ Object
Get a single job by ID.
# File 'lib/zizq/client.rb', line 227
def get_job(id) #: (String) -> Resources::Job
  response = get("/jobs/#{enc(id)}")
  data = handle_response!(response, expected: 200)
  Resources::Job.new(self, data)
end
#get_path(path) ⇒ Object
GET a path on the server and return the decoded response body.
The path should include any query parameters already (e.g. pagination links from the server’s `pages` object). This is intentionally public so that resource objects like Page can follow links without resorting to `.send`.
# File 'lib/zizq/client.rb', line 812
def get_path(path) #: (String) -> Hash[String, untyped]
  response = request { |http| consume_response(http.get(path, {"accept" => @content_type})) }
  handle_response!(response, expected: 200)
end
#get_queues ⇒ Object
List all distinct queue names on the server.
# File 'lib/zizq/client.rb', line 440
def get_queues #: () -> Array[String]
  response = get("/queues")
  data = handle_response!(response, expected: 200)
  data["queues"]
end
#health ⇒ Object
Health check.
# File 'lib/zizq/client.rb', line 427
def health #: () -> Hash[String, untyped]
  response = get("/health")
  handle_response!(response, expected: 200)
end
#list_cron_groups ⇒ Object
List all cron group names.
# File 'lib/zizq/client.rb', line 449
def list_cron_groups
  response = get("/crons")
  data = handle_response!(response, expected: 200)
  data["crons"]
end
#list_errors(id, from: nil, order: nil, limit: nil) ⇒ Object
List error records for a job.
# File 'lib/zizq/client.rb', line 419
def list_errors(id, from: nil, order: nil, limit: nil)
  params = { from:, order:, limit: }.compact #: Hash[Symbol, untyped]
  response = get("/jobs/#{enc(id)}/errors", params:)
  data = handle_response!(response, expected: 200)
  Resources::ErrorPage.new(self, data)
end
#list_jobs(id: nil, status: nil, queue: nil, type: nil, filter: nil, from: nil, order: nil, limit: nil) ⇒ Object
List jobs with optional filters.
Multi-value filters (`status`, `queue`, `type`, `id`) accept arrays, which are joined with commas as the server expects.
The `filter` parameter accepts a jq expression for filtering jobs by payload content (e.g. `.user_id == 42`).
# File 'lib/zizq/client.rb', line 250
def list_jobs(id: nil,
              status: nil,
              queue: nil,
              type: nil,
              filter: nil,
              from: nil,
              order: nil,
              limit: nil)
  = { id:, status:, queue:, type:, filter:, from:, order:, limit: }.compact #: Hash[Symbol, untyped]
  multi_keys = %i[id status queue type]
  params = build_where_params(, multi_keys:)

  # An empty filter ([] or "") matches nothing, so short-circuit.
  multi_keys.each do |key|
    return Resources::JobPage.new(self, { "jobs" => [], "pages" => {} }) if params[key] == ""
  end

  response = get("/jobs", params:)
  data = handle_response!(response, expected: 200)
  Resources::JobPage.new(self, data)
end
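The comma-joining of multi-value filters, and the empty-string case that triggers the short-circuit, can be sketched as follows. `join_multi_values` is a hypothetical stand-in: the private `build_where_params` is not shown on this page, so this only illustrates the behavior the description implies. The queue name is made up; the status values are the terminal states named elsewhere on this page.

```ruby
# Hypothetical stand-in for the private parameter builder; not the library's code.
# Array values under a multi-value key are joined with commas; other values pass through.
def join_multi_values(filters, multi_keys:)
  filters.to_h do |key, value|
    joined = multi_keys.include?(key) ? Array(value).join(",") : value
    [key, joined]
  end
end

multi_keys = %i[id status queue type]

params = join_multi_values(
  { status: %w[completed dead], queue: "emails", limit: 10 },
  multi_keys: multi_keys
)

# An empty array joins to "", which is the value the short-circuit checks for.
empty = join_multi_values({ queue: [] }, multi_keys: multi_keys)
matches_nothing = multi_keys.any? { |key| empty[key] == "" }
```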
#replace_cron_group(name, paused: nil, entries: []) ⇒ Object
Create or replace an entire cron group.
Entries not present in the request are removed. Entries with unchanged expressions preserve their scheduling state.
# File 'lib/zizq/client.rb', line 474
def replace_cron_group(name, paused: nil, entries: [])
  body = {
    paused:,
    entries: entries.map { |entry| build_cron_entry(**entry) }
  }.compact

  response = put("/crons/#{enc(name)}", body)
  data = handle_response!(response, expected: 200)
  Resources::CronGroup.new(self, data)
end
#replace_cron_group_entry(group, entry, expression:, job:, timezone: nil, paused: nil) ⇒ Object
Create or replace a single cron entry.
Preserves scheduling state if the expression is unchanged.
# File 'lib/zizq/client.rb', line 546
def replace_cron_group_entry(group, entry, expression:, job:, timezone: nil, paused: nil)
  body = build_cron_entry(name: entry, expression:, job:, timezone:, paused:)
  response = put("/crons/#{enc(group)}/entries/#{enc(entry)}", body)
  data = handle_response!(response, expected: 200)
  Resources::CronEntry.new(self, data)
end
#report_failure(id, message:, error_type: nil, backtrace: nil, retry_at: nil, kill: false) ⇒ Object Also known as: nack
Report a job failure (nack).
Returns the updated job metadata.
If this method is not called when an error occurs while processing a job, the Zizq server continues to treat that job as in-flight and stops sending new jobs once the prefetch limit or the server’s global in-flight limit has been reached. Jobs must be either acknowledged or failed before new jobs are sent.
Jobs are durable and “at least once” delivery is guaranteed. If the client disconnects before it is able to report success or failure the server automatically moves the job back to the queue where it will be provided to another worker. Clients should be prepared to see the same job more than once for this reason.
The Zizq server sends heartbeat messages to connected workers so that it can quickly detect and handle disconnected clients.
# File 'lib/zizq/client.rb', line 649
def report_failure(id, message:, error_type: nil, backtrace: nil, retry_at: nil, kill: false)
  body = { message: } #: Hash[Symbol, untyped]
  body[:error_type] = error_type if error_type
  body[:backtrace] = backtrace if backtrace
  # retry_at is fractional seconds in Ruby; the server expects ms.
  body[:retry_at] = (retry_at * 1000).to_i if retry_at
  body[:kill] = kill if kill

  response = post("/jobs/#{enc(id)}/failure", body)
  data = handle_response!(response, expected: 200)
  Resources::Job.new(self, data)
end
#report_success(id) ⇒ Object Also known as: ack
Mark a job as successfully completed (ack).
If this method (or `#report_failure`) is not called when a job finishes, the Zizq server continues to treat that job as in-flight and stops sending new jobs once the prefetch limit or the server’s global in-flight limit has been reached. Jobs must be either acknowledged or failed before new jobs are sent.
Jobs are durable and “at least once” delivery is guaranteed. If the client disconnects before it is able to report success or failure the server automatically moves the job back to the queue where it will be provided to another worker. Clients should be prepared to see the same job more than once for this reason.
The Zizq server sends heartbeat messages to connected workers so that it can quickly detect and handle disconnected clients.
# File 'lib/zizq/client.rb', line 592
def report_success(id) #: (String) -> nil
  response = raw_post("/jobs/#{enc(id)}/success")
  handle_response!(response, expected: 204)
  nil
end
#report_success_bulk(ids) ⇒ Object Also known as: ack_bulk
Bulk-mark jobs as successfully completed (bulk ack).
See `#report_success` for full details of how acknowledgement works.
There are two ways in which the server can respond successfully:
- 204 - No Content (All jobs acknowledged)
- 422 - Unprocessable Entity (Some jobs were not found)
Both of these statuses are in reality treated as success because missing jobs have either been previously acknowledged and purged, or moved to some other status that cannot be acknowledged.
Other error response types will still raise.
# File 'lib/zizq/client.rb', line 615
def report_success_bulk(ids)
  response = post("/jobs/success", { ids: ids })
  return nil if response.status == 422
  handle_response!(response, expected: 204)
end
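The two-status success rule can be illustrated without a server. In this sketch `FakeResponse` and `bulk_ack_outcome` are hypothetical names introduced only for illustration; the real method returns `nil` for 422 and delegates everything else to `handle_response!`.

```ruby
# Stand-in for the HTTP response object; only `status` matters here.
FakeResponse = Struct.new(:status)

# Hypothetical helper: classify a bulk-ack response the way the text describes.
def bulk_ack_outcome(response)
  case response.status
  when 204 then :all_acknowledged
  when 422 then :some_not_found # still treated as success
  else raise "unexpected status #{response.status}"
  end
end

outcomes = [204, 422].map { |status| bulk_ack_outcome(FakeResponse.new(status)) }

# Any other status is an error.
failed = begin
  bulk_ack_outcome(FakeResponse.new(500))
  false
rescue RuntimeError
  true
end
```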
#server_version ⇒ Object
Server version string.
# File 'lib/zizq/client.rb', line 433
def server_version #: () -> String
  response = get("/version")
  data = handle_response!(response, expected: 200)
  data["version"]
end
#take_jobs(prefetch: 1, queues: [], worker_id: nil, on_connect: nil, on_response: nil, &block) ⇒ Object
Stream jobs from the server. Yields each job to the block, wrapped in a `Resources::Job`.
This method does not return unless the server closes the connection or the connection is otherwise interrupted. Jobs are continuously streamed to the client, and when no jobs are available the client waits for new jobs to become ready.
If the client does not acknowledge or fail jobs with `#report_success` or `#report_failure`, the server will stop sending new jobs to the client once it hits its prefetch limit.
Jobs are durable and “at least once” delivery is guaranteed. If the client disconnects before it is able to report success or failure the server automatically moves the job back to the queue where it will be provided to another worker. Clients should be prepared to see the same job more than once for this reason.
The Zizq server sends periodic heartbeat messages to the client which are silently consumed.
Example:
client.take_jobs(prefetch: 5) do |job|
  puts "Got job: #{job.inspect}"
  client.ack(job.id) # mark the job completed
end
# File 'lib/zizq/client.rb', line 698
def take_jobs(prefetch: 1, queues: [], worker_id: nil, on_connect: nil, on_response: nil, &block)
  raise ArgumentError, "take_jobs requires a block" unless block

  params = { prefetch: } #: Hash[Symbol, untyped]
  params[:queue] = queues.join(",") unless queues.empty?

  path = build_path("/jobs/take", params:)
  headers = { "accept" => @stream_accept }
  headers["worker-id"] = worker_id if worker_id

  Sync do
    response = stream_http.get(path, headers)

    begin
      raise StreamError, "take jobs stream returned HTTP #{response.status}" unless response.status == 200

      on_connect&.call
      on_response&.call(response)

      # Wrap each parsed hash in a Resources::Job before yielding.
      wrapper = proc { |data| block.call(Resources::Job.new(self, data)) }

      # async-http returns `nil` for empty response bodies over HTTP/1.1
      # (e.g. a 200 with content-length: 0 from the server closing the
      # stream immediately). Treat that as "no chunks" rather than
      # crashing in the parser.
      body = response.body || []

      case @format
      when :json then self.class.parse_ndjson(body, &wrapper)
      when :msgpack then self.class.parse_msgpack_stream(body, &wrapper)
      end
    ensure
      response.close rescue nil
    end
  end
rescue SocketError, IOError, EOFError, Errno::ECONNRESET, Errno::EPIPE, IO::TimeoutError, OpenSSL::SSL::SSLError => e
  raise ConnectionError, e.
end
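Since every delivered job must be either acknowledged or failed, a worker loop usually pairs `ack` with a `rescue` that calls `nack`. The sketch below uses a `FakeClient` stand-in (an assumption, so that it runs without a server) exposing only the method shapes documented on this page; the job IDs and payloads are made up.

```ruby
# Stand-in for Zizq::Client so the sketch runs without a server.
class FakeClient
  Job = Struct.new(:id, :payload)
  attr_reader :acked, :nacked

  def initialize(jobs)
    @jobs = jobs
    @acked = []
    @nacked = []
  end

  # The real method streams until the connection closes; this one just
  # yields a fixed list.
  def take_jobs(prefetch: 1)
    @jobs.each { |job| yield job }
  end

  def ack(id)
    @acked << id
  end

  def nack(id, message:, error_type: nil)
    @nacked << [id, message, error_type]
  end
end

client = FakeClient.new([
  FakeClient::Job.new("job-1", { "n" => 1 }),
  FakeClient::Job.new("job-2", { "n" => nil }),
])

client.take_jobs(prefetch: 2) do |job|
  Integer(job.payload.fetch("n")) # stand-in for real work; raises on nil
  client.ack(job.id)
rescue StandardError => e
  # Report the failure so the job does not stay in-flight.
  client.nack(job.id, message: e.message, error_type: e.class.name)
end
```

With this shape, neither outcome leaves a job counted against the prefetch limit.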
#update_all_jobs(where: {}, apply: {}) ⇒ Object
Update all jobs matching the given filters.
Filters in the `where:` argument use the same keys as `list_jobs`. Fields in the `apply:` argument use the same keys as `update_job`.
Terminal jobs (completed/dead) are silently skipped. Explicitly requesting them via `status:` in `where:` returns 422 instead.
Returns the number of updated jobs.
# File 'lib/zizq/client.rb', line 384
def update_all_jobs(where: {}, apply: {})
  filter_params = validate_where(**where)
  multi_keys = %i[id status queue type]
  params = build_where_params(filter_params, multi_keys:)

  # An empty multi-value filter matches nothing, so short-circuit.
  multi_keys.each do |key|
    return 0 if params[key] == ""
  end

  body = validate_and_build_set(**apply)
  response = patch("/jobs", body, params:)
  data = handle_response!(response, expected: 200)
  data.fetch("patched")
end
#update_cron_group(name, paused: nil) ⇒ Object
Update group-level fields (currently just pause/unpause).
# File 'lib/zizq/client.rb', line 489
def update_cron_group(name, paused: nil)
  response = patch("/crons/#{enc(name)}", { paused: }.compact)
  data = handle_response!(response, expected: 200)
  Resources::CronGroup.new(self, data)
end
#update_cron_group_entry(group, entry, paused:) ⇒ Object
Update entry-level fields (currently just pause/unpause).
# File 'lib/zizq/client.rb', line 559
def update_cron_group_entry(group, entry, paused:)
  response = patch("/crons/#{enc(group)}/entries/#{enc(entry)}", { paused: })
  data = handle_response!(response, expected: 200)
  Resources::CronEntry.new(self, data)
end
#update_job(id, queue: UNCHANGED, priority: UNCHANGED, ready_at: UNCHANGED, retry_limit: UNCHANGED, backoff: UNCHANGED, retention: UNCHANGED) ⇒ Object
Update a single job’s mutable fields.
Fields not provided are left unchanged. Use `Zizq::RESET` to clear a nullable field back to the server default.
Raises `Zizq::NotFoundError` if the job does not exist. Raises `Zizq::ClientError` (422) if the job is in a terminal state.
# File 'lib/zizq/client.rb', line 355
def update_job(id,
               queue: UNCHANGED,
               priority: UNCHANGED,
               ready_at: UNCHANGED,
               retry_limit: UNCHANGED,
               backoff: UNCHANGED,
               retention: UNCHANGED)
  body = build_set_body(
    queue:,
    priority:,
    ready_at:,
    retry_limit:,
    backoff:,
    retention:
  )

  response = patch("/jobs/#{enc(id)}", body)
  data = handle_response!(response, expected: 200)
  Resources::Job.new(self, data)
end
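The difference between "leave unchanged" and "reset" is typically implemented with sentinel objects, so that `nil` is not overloaded. The sketch below is an assumption about how such a body could be built: the private `build_set_body` is not shown on this page, the constants here are local stand-ins for the ones in the Zizq namespace, and sending `nil` for a reset field is a guess at the wire format.

```ruby
# Local stand-ins for the library's sentinels (hypothetical definitions).
UNCHANGED = Object.new.freeze
RESET = Object.new.freeze

# Hypothetical body builder: omit untouched fields, and (by assumption)
# send nil for fields that should return to the server default.
def build_patch_body(**fields)
  fields.each_with_object({}) do |(key, value), body|
    next if value.equal?(UNCHANGED)
    body[key] = value.equal?(RESET) ? nil : value
  end
end

body = build_patch_body(queue: UNCHANGED, priority: 10, retry_limit: RESET)
```

Comparing with `equal?` checks object identity, so no ordinary value a caller passes can be mistaken for a sentinel.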