Class: Zizq::Client
- Inherits: Object
- Defined in: lib/zizq/client.rb
Overview
Low-level HTTP wrapper for the Zizq job queue server API.
Supports both JSON and MessagePack serialization formats, determined at construction time.
HTTP requests are dispatched through a persistent background IO thread when called from non-Async contexts, keeping the HTTP/2 connection alive across calls and avoiding ephemeral port exhaustion. When called from within an existing Async reactor, the shared HTTP client is used directly.
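The persistent background thread described above can be pictured with a small stand-alone sketch. It is not the library's implementation: `BackgroundDispatcher` and its methods are hypothetical names, and it uses only `Thread` and `Thread::Queue` from core Ruby. It shows why one long-lived thread lets successive calls share the same resources instead of setting them up per call.

```ruby
# Hypothetical sketch: one long-lived worker thread fed through a queue.
# The real client keeps its HTTP/2 connection on a thread like this one.
class BackgroundDispatcher
  def initialize
    @queue = Thread::Queue.new
    @thread = Thread.new do
      # Queue#pop returns nil once the queue is closed and drained.
      while (item = @queue.pop)
        work, reply = item
        begin
          reply << [:ok, work.call]
        rescue StandardError => e
          reply << [:error, e]
        end
      end
    end
  end

  # Run the block on the background thread and wait for its result.
  def call(&work)
    reply = Thread::Queue.new
    @queue << [work, reply]
    status, value = reply.pop
    raise value if status == :error

    value
  end

  # Stop accepting work and wait for the thread to finish.
  def close
    @queue.close
    @thread.join
  end
end

dispatcher = BackgroundDispatcher.new
caller_thread = Thread.current
first_thread = dispatcher.call { Thread.current }
second_thread = dispatcher.call { Thread.current }
sum = dispatcher.call { 1 + 1 }
dispatcher.close
```

Both calls run on the same thread, which differs from the caller's thread; anything that thread owns (a connection, in the client's case) survives between calls.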
Defined Under Namespace
Classes: RawResponse
Constant Summary
- CONTENT_TYPES = { msgpack: "application/msgpack", json: "application/json" }.freeze #: Hash[Zizq::format, String]
- STREAM_ACCEPT = { msgpack: "application/vnd.zizq.msgpack-stream", json: "application/x-ndjson" }.freeze #: Hash[Zizq::format, String]
Instance Attribute Summary
-
#format ⇒ Object
readonly
The message format to use for all communication between the client and the server (default: `:msgpack`).
-
#url ⇒ Object
readonly
The base URL of the Zizq server (e.g. “localhost:7890”).
Class Method Summary
- .make_finalizer(io_queue, http_clients) ⇒ Object
-
.parse_msgpack_stream(chunks) ⇒ Object
Parse a length-prefixed MessagePack stream from an enumerable of byte chunks.
-
.parse_ndjson(chunks) ⇒ Object
Parse an NDJSON stream from an enumerable of byte chunks.
Instance Method Summary
-
#cleanup_internal_clients ⇒ Object
: () -> void.
-
#close ⇒ Object
Close all thread-local HTTP clients and release connections.
-
#delete_all_jobs(where: {}) ⇒ Object
Delete jobs matching the given filters.
-
#delete_job(id) ⇒ Object
Delete a single job by ID.
-
#enqueue(queue:, type:, payload:, priority: nil, ready_at: nil, retry_limit: nil, backoff: nil, retention: nil, unique_key: nil, unique_while: nil) ⇒ Object
Enqueue a new job.
-
#enqueue_bulk(jobs:) ⇒ Object
Enqueue multiple jobs atomically in a single bulk request.
-
#get_error(id, attempt:) ⇒ Object
Get a single error record by job ID and attempt number.
-
#get_job(id) ⇒ Object
Get a single job by ID.
-
#get_path(path) ⇒ Object
GET a path on the server and return the decoded response body.
-
#get_queues ⇒ Object
List all distinct queue names on the server.
-
#health ⇒ Object
Health check.
-
#initialize(url:, format: :msgpack, ssl_context: nil) ⇒ Client
constructor
Initialize a new instance of the client with the given base URL and optional format options.
-
#list_errors(id, from: nil, order: nil, limit: nil) ⇒ Object
List error records for a job.
-
#list_jobs(id: nil, status: nil, queue: nil, type: nil, filter: nil, from: nil, order: nil, limit: nil) ⇒ Object
List jobs with optional filters.
-
#report_failure(id, message:, error_type: nil, backtrace: nil, retry_at: nil, kill: false) ⇒ Object
(also: #nack)
Report a job failure (nack).
-
#report_success(id) ⇒ Object
(also: #ack)
Mark a job as successfully completed (ack).
-
#report_success_bulk(ids) ⇒ Object
(also: #ack_bulk)
Bulk-mark jobs as successfully completed (bulk ack).
-
#server_version ⇒ Object
Server version string.
-
#take_jobs(prefetch: 1, queues: [], worker_id: nil, on_connect: nil, on_response: nil, &block) ⇒ Object
Stream jobs from the server.
-
#update_all_jobs(where: {}, apply: {}) ⇒ Object
Update all jobs matching the given filters.
-
#update_job(id, queue: UNCHANGED, priority: UNCHANGED, ready_at: UNCHANGED, retry_limit: UNCHANGED, backoff: UNCHANGED, retention: UNCHANGED) ⇒ Object
Update a single job’s mutable fields.
Constructor Details
#initialize(url:, format: :msgpack, ssl_context: nil) ⇒ Client
Initialize a new instance of the client with the given base URL and optional format options.
# File 'lib/zizq/client.rb', line 57
def initialize(url:, format: :msgpack, ssl_context: nil)
  @url = url.chomp("/")
  @format = format

  # NOTE: the names of the two local option hashes were lost in extraction;
  # `endpoint_options` and `stream_options` are placeholders.
  endpoint_options = { protocol: Async::HTTP::Protocol::HTTP2 } #: Hash[Symbol, untyped]
  endpoint_options[:ssl_context] = ssl_context if ssl_context

  @endpoint = Async::HTTP::Endpoint.parse(
    @url,
    **endpoint_options,
  )

  # Streaming take uses a dedicated HTTP/1.1 endpoint. The take
  # connection is long-lived and carries only one request, so HTTP/2's
  # multiplexing, stream IDs, and frame headers add overhead with no
  # benefit: there's nothing to multiplex against. Acks/enqueues run
  # on separate threads with their own HTTP/2 clients, so they're
  # unaffected either way. HTTP/1.1 gives the stream a plain TCP
  # socket with no framing tax and measurably better throughput.
  stream_options = endpoint_options.merge(
    protocol: Async::HTTP::Protocol::HTTP11,
  )
  @stream_endpoint = Async::HTTP::Endpoint.parse(
    @url,
    **stream_options,
  )

  @mutex = Mutex.new
  @io_thread = nil #: Thread?
  @io_queue = nil #: Thread::Queue?

  # Each thread gets its own Async::HTTP::Client bound to its own
  # reactor: one for regular request/response traffic (HTTP/2) and
  # a separate one lazily created on the first take_jobs call
  # (HTTP/1.1). Both kinds of clients are tracked in a single array
  # so `close` can shut them all down together.
  @http_clients = [] #: Array[Async::HTTP::Client]
  @http_key = :"zizq_http_#{object_id}"
  @stream_http_key = :"zizq_stream_http_#{object_id}"

  @content_type = CONTENT_TYPES.fetch(format)
  @stream_accept = STREAM_ACCEPT.fetch(format)
end
Instance Attribute Details
#format ⇒ Object (readonly)
The message format to use for all communication between the client and the server (default: `:msgpack`).
# File 'lib/zizq/client.rb', line 48
def format
  @format
end
#url ⇒ Object (readonly)
The base URL of the Zizq server (e.g. “localhost:7890”).
# File 'lib/zizq/client.rb', line 44
def url
  @url
end
Class Method Details
.make_finalizer(io_queue, http_clients) ⇒ Object
# File 'lib/zizq/client.rb', line 965
def self.make_finalizer(io_queue, http_clients)
  -> do
    io_queue&.close

    http_clients.each do |ref|
      ref.close
    rescue WeakRef::RefError
      # Client already GC'd (owning thread exited).
    rescue NoMethodError
      # The async connection pool may hold references to tasks whose
      # fibers were already reclaimed when their owning Sync reactor
      # exited. Stopping those dead tasks raises NoMethodError; safe
      # to ignore.
    end

    http_clients.clear
  end
end
.parse_msgpack_stream(chunks) ⇒ Object
Parse a length-prefixed MessagePack stream from an enumerable of byte chunks.
Format: [4-byte big-endian length][MsgPack payload]. A zero-length frame is a heartbeat and is silently skipped.
Uses StringIO for efficient position-based reading rather than repeatedly slicing from the front of a String (which copies all remaining bytes on every extraction).
# File 'lib/zizq/client.rb', line 583
def self.parse_msgpack_stream(chunks) #: (Enumerable[String]) { (Hash[String, untyped]) -> void } -> void
  io = StringIO.new("".b)

  chunks.each do |chunk|
    # Append new data at the end, then return to the read position.
    read_pos = io.pos
    io.seek(0, IO::SEEK_END)
    io.write(chunk.b)
    io.seek(read_pos)

    # Extract complete frames.
    while io.size - io.pos >= 4
      len_bytes = io.read(4) #: String
      len = len_bytes.unpack1("N") #: Integer

      if len == 0
        # heartbeat
        next
      end

      if io.size - io.pos < len
        # Incomplete frame: rewind past the length header and wait
        # for more data.
        io.seek(-4, IO::SEEK_CUR)
        break
      end

      yield MessagePack.unpack(io.read(len))
    end

    # Compact: discard already-consumed bytes so the StringIO doesn't
    # grow without bound over the life of the stream.
    remaining = io.read
    io = StringIO.new(remaining || "".b)
  end
end
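To make the framing concrete, here is a minimal stand-alone sketch of the same length-prefixed extraction. It is a simplified illustration, not the library's code: `each_frame` is a hypothetical helper, it yields raw payload bytes rather than decoding MessagePack (so it needs only the standard library), and it rebuilds its buffer per chunk rather than reusing one StringIO.

```ruby
require "stringio"

# Hypothetical helper: yields the raw payload of every complete frame.
# Frame layout: [4-byte big-endian length][payload].
def each_frame(chunks)
  buffer = "".b

  chunks.each do |chunk|
    buffer << chunk.b
    io = StringIO.new(buffer)

    while io.size - io.pos >= 4
      len = io.read(4).unpack1("N")
      next if len.zero? # zero-length frame: heartbeat, skip it

      if io.size - io.pos < len
        # Payload not fully received yet: rewind past the length header.
        io.seek(-4, IO::SEEK_CUR)
        break
      end

      yield io.read(len)
    end

    # Keep only the unconsumed tail for the next chunk.
    buffer = (io.read || "").b
  end
end

# Build a stream of two frames with a heartbeat in between, then split it
# at arbitrary points so that headers and payloads straddle chunks.
frame = ->(payload) { [payload.bytesize].pack("N") + payload.b }
stream = frame.call("hello") + [0].pack("N") + frame.call("world!")
chunks = [stream[0, 3], stream[3, 7], stream[10..]]

payloads = []
each_frame(chunks) { |payload| payloads << payload }
```

Chunk boundaries never need to line up with frame boundaries; a frame is only yielded once its full payload has arrived.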
.parse_ndjson(chunks) ⇒ Object
Parse an NDJSON stream from an enumerable of byte chunks.
Buffers chunks and splits on newline boundaries. The buffer only ever holds one partial line between extractions, so the `slice!` cost is trivial. Empty lines (heartbeats) are silently skipped.
# File 'lib/zizq/client.rb', line 561
def self.parse_ndjson(chunks) #: (Enumerable[String]) { (Hash[String, untyped]) -> void } -> void
  buffer = +""
  chunks.each do |chunk|
    buffer << chunk
    while (idx = buffer.index("\n"))
      line = buffer.slice!(0, idx + 1) #: String
      line.strip!
      next if line.empty?
      yield JSON.parse(line)
    end
  end
end
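The buffering behaviour is easiest to see with chunks that split a line in the middle. The sketch below restates the same algorithm as a top-level method so it can run without the gem; `each_ndjson` and the sample job hashes are illustrative assumptions, not part of the Zizq API.

```ruby
require "json"

# Hypothetical stand-alone version of the newline-splitting loop.
def each_ndjson(chunks)
  buffer = +""
  chunks.each do |chunk|
    buffer << chunk
    # Extract every complete line currently in the buffer.
    while (idx = buffer.index("\n"))
      line = buffer.slice!(0, idx + 1).strip
      next if line.empty? # blank line: heartbeat
      yield JSON.parse(line)
    end
  end
end

# The second object is split across two chunks; the third chunk is a
# bare heartbeat line.
chunks = [
  "{\"id\":\"a\"}\n{\"id\"",
  ":\"b\"}\n",
  "\n",
  "{\"id\":\"c\"}\n",
]

jobs = []
each_ndjson(chunks) { |job| jobs << job }
```

After the first chunk the buffer holds only the partial line `{"id"`, which is completed by the second chunk.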
Instance Method Details
#cleanup_internal_clients ⇒ Object
: () -> void
# File 'lib/zizq/client.rb', line 114
def cleanup_internal_clients #: () -> void
  @mutex.synchronize do
    @http_clients.each do |ref|
      ref.close
    rescue WeakRef::RefError
      # Client already GC'd (owning thread exited).
    rescue NoMethodError
      # The async connection pool may hold references to tasks whose
      # fibers were already reclaimed when their owning Sync reactor
      # exited. Stopping those dead tasks raises NoMethodError; safe
      # to ignore.
    end
    @http_clients.clear
  end
end
#close ⇒ Object
Close all thread-local HTTP clients and release connections.
# File 'lib/zizq/client.rb', line 103
def close #: () -> void
  if @io_thread&.alive?
    @mutex.synchronize do
      @io_queue&.close
      @io_thread&.join
    end
  end

  self.class.make_finalizer(@io_queue, @http_clients).call
end
#delete_all_jobs(where: {}) ⇒ Object
Delete jobs matching the given filters.
Filters in the `where:` argument use the same keys as `list_jobs`. An empty `where:` hash deletes all jobs.
Returns the number of deleted jobs.
# File 'lib/zizq/client.rb', line 271
def delete_all_jobs(where: {})
  filter_params = validate_where(**where)
  multi_keys = %i[id status queue type]
  params = build_where_params(filter_params, multi_keys:)

  # An empty multi-value filter matches nothing; short-circuit.
  multi_keys.each do |key|
    return 0 if params[key] == ""
  end

  response = delete("/jobs", params:)
  data = handle_response!(response, expected: 200)
  data.fetch("deleted")
end
#delete_job(id) ⇒ Object
Delete a single job by ID.
# File 'lib/zizq/client.rb', line 256
def delete_job(id)
  response = delete("/jobs/#{id}")
  handle_response!(response, expected: [200, 204])
  nil
end
#enqueue(queue:, type:, payload:, priority: nil, ready_at: nil, retry_limit: nil, backoff: nil, retention: nil, unique_key: nil, unique_while: nil) ⇒ Object
Enqueue a new job.
This is a low-level primitive that makes a direct API call to the server using the Zizq API’s expected inputs. Callers should generally use `Zizq::enqueue` instead.
Returns a resource instance of the new job wrapping the API response.
# File 'lib/zizq/client.rb', line 149
def enqueue(queue:,
            type:,
            payload:,
            priority: nil,
            ready_at: nil,
            retry_limit: nil,
            backoff: nil,
            retention: nil,
            unique_key: nil,
            unique_while: nil)
  body = { queue:, type:, payload: } #: Hash[Symbol, untyped]
  body[:priority] = priority if priority
  # ready_at is fractional seconds in Ruby; the server expects ms.
  body[:ready_at] = (ready_at.to_f * 1000).to_i if ready_at
  body[:retry_limit] = retry_limit if retry_limit
  body[:backoff] = backoff if backoff
  body[:retention] = retention if retention
  body[:unique_key] = unique_key if unique_key
  body[:unique_while] = unique_while.to_s if unique_while

  response = post("/jobs", body)
  data = handle_response!(response, expected: [200, 201])
  Resources::Job.new(self, data)
end
#enqueue_bulk(jobs:) ⇒ Object
Enqueue multiple jobs atomically in a single bulk request.
This is a low-level primitive that makes a direct API call to the server using the Zizq API’s expected inputs. Callers should generally use `Zizq::enqueue_bulk` instead.
Returns an array of resource instances wrapping the API response.
# File 'lib/zizq/client.rb', line 184
def enqueue_bulk(jobs:)
  body = {
    jobs: jobs.map do |job|
      wire = { type: job[:type], queue: job[:queue], payload: job[:payload] } #: Hash[Symbol, untyped]
      wire[:priority] = job[:priority] if job[:priority]
      # ready_at is fractional seconds in Ruby; the server expects ms.
      wire[:ready_at] = (job[:ready_at].to_f * 1000).to_i if job[:ready_at]
      wire[:retry_limit] = job[:retry_limit] if job[:retry_limit]
      wire[:backoff] = job[:backoff] if job[:backoff]
      wire[:retention] = job[:retention] if job[:retention]
      wire[:unique_key] = job[:unique_key] if job[:unique_key]
      wire[:unique_while] = job[:unique_while].to_s if job[:unique_while]
      wire
    end
  }

  response = post("/jobs/bulk", body)
  data = handle_response!(response, expected: [200, 201])
  data["jobs"].map { |j| Resources::Job.new(self, j) }
end
#get_error(id, attempt:) ⇒ Object
Get a single error record by job ID and attempt number.
# File 'lib/zizq/client.rb', line 354
def get_error(id, attempt:)
  response = get("/jobs/#{id}/errors/#{attempt}")
  data = handle_response!(response, expected: 200)
  Resources::ErrorRecord.new(self, data)
end
#get_job(id) ⇒ Object
Get a single job by ID.
# File 'lib/zizq/client.rb', line 206
def get_job(id) #: (String) -> Resources::Job
  response = get("/jobs/#{id}")
  data = handle_response!(response, expected: 200)
  Resources::Job.new(self, data)
end
#get_path(path) ⇒ Object
GET a path on the server and return the decoded response body.
The path should include any query parameters already (e.g. pagination links from the server’s `pages` object). This is intentionally public so that resource objects like Page can follow links without resorting to `.send`.
# File 'lib/zizq/client.rb', line 625
def get_path(path) #: (String) -> Hash[String, untyped]
  response = request { |http| consume_response(http.get(path, {"accept" => @content_type})) }
  handle_response!(response, expected: 200)
end
#get_queues ⇒ Object
List all distinct queue names on the server.
# File 'lib/zizq/client.rb', line 388
def get_queues #: () -> Array[String]
  response = get("/queues")
  data = handle_response!(response, expected: 200)
  data["queues"]
end
#health ⇒ Object
Health check.
# File 'lib/zizq/client.rb', line 375
def health #: () -> Hash[String, untyped]
  response = get("/health")
  handle_response!(response, expected: 200)
end
#list_errors(id, from: nil, order: nil, limit: nil) ⇒ Object
List error records for a job.
# File 'lib/zizq/client.rb', line 367
def list_errors(id, from: nil, order: nil, limit: nil)
  params = { from:, order:, limit: }.compact #: Hash[Symbol, untyped]
  response = get("/jobs/#{id}/errors", params:)
  data = handle_response!(response, expected: 200)
  Resources::ErrorPage.new(self, data)
end
#list_jobs(id: nil, status: nil, queue: nil, type: nil, filter: nil, from: nil, order: nil, limit: nil) ⇒ Object
List jobs with optional filters.
Multi-value filters (`status`, `queue`, `type`, `id`) accept arrays; they are joined with commas, as the server expects.
The `filter` parameter accepts a jq expression for filtering jobs by payload content (e.g. `.user_id == 42`).
# File 'lib/zizq/client.rb', line 229
def list_jobs(id: nil,
              status: nil,
              queue: nil,
              type: nil,
              filter: nil,
              from: nil,
              order: nil,
              limit: nil)
  # NOTE: the name of this local hash was lost in extraction;
  # `options` is a placeholder.
  options = { id:, status:, queue:, type:, filter:, from:, order:, limit: }.compact #: Hash[Symbol, untyped]
  multi_keys = %i[id status queue type]
  params = build_where_params(options, multi_keys:)

  # An empty filter ([] or "") matches nothing; short-circuit.
  multi_keys.each do |key|
    return Resources::JobPage.new(self, { "jobs" => [], "pages" => {} }) if params[key] == ""
  end

  response = get("/jobs", params:)
  data = handle_response!(response, expected: 200)
  Resources::JobPage.new(self, data)
end
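The comma-joining and the empty-filter short-circuit can be sketched in isolation. The private `build_where_params` helper is not shown on this page, so `join_multi_values` below is a hypothetical stand-in that only illustrates the documented behaviour: arrays for multi-value keys become comma-separated strings, and an empty array becomes `""`.

```ruby
MULTI_KEYS = %i[id status queue type].freeze

# Hypothetical stand-in for the private build_where_params helper.
# Arrays under multi-value keys are joined with commas; everything
# else passes through untouched.
def join_multi_values(options, multi_keys: MULTI_KEYS)
  options.to_h do |key, value|
    joined = multi_keys.include?(key) && value.is_a?(Array) ? value.join(",") : value
    [key, joined]
  end
end

params = join_multi_values({ status: %w[completed dead], queue: "emails", limit: 10 })

# An empty array joins to "", which is the value the short-circuit
# in list_jobs checks for.
empty = join_multi_values({ queue: [] })
matches_nothing = MULTI_KEYS.any? { |key| empty[key] == "" }
```

Checking for `""` after joining means `queue: []` and `queue: ""` are treated the same way: neither reaches the server.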
#report_failure(id, message:, error_type: nil, backtrace: nil, retry_at: nil, kill: false) ⇒ Object Also known as: nack
Report a job failure (nack).
Returns the updated job metadata.
If this method is not called when an error occurs while processing a job, the Zizq server continues to treat that job as in-flight. Once the prefetch limit or the server’s global in-flight limit is reached, it stops sending jobs. Jobs must be either acknowledged or failed before new jobs are sent.
Jobs are durable and “at least once” delivery is guaranteed. If the client disconnects before it is able to report success or failure the server automatically moves the job back to the queue where it will be provided to another worker. Clients should be prepared to see the same job more than once for this reason.
The Zizq server sends heartbeat messages to connected workers so that it can quickly detect and handle disconnected clients.
# File 'lib/zizq/client.rb', line 467
def report_failure(id, message:, error_type: nil, backtrace: nil, retry_at: nil, kill: false)
  body = { message: } #: Hash[Symbol, untyped]
  body[:error_type] = error_type if error_type
  body[:backtrace] = backtrace if backtrace
  # retry_at is fractional seconds in Ruby; the server expects ms.
  body[:retry_at] = (retry_at * 1000).to_i if retry_at
  body[:kill] = kill if kill

  response = post("/jobs/#{id}/failure", body)
  data = handle_response!(response, expected: 200)
  Resources::Job.new(self, data)
end
#report_success(id) ⇒ Object Also known as: ack
Mark a job as successfully completed (ack).
If this method (or `#report_failure`) is not called when a job completes, the Zizq server continues to treat that job as in-flight. Once the prefetch limit or the server’s global in-flight limit is reached, it stops sending jobs. Jobs must be either acknowledged or failed before new jobs are sent.
Jobs are durable and “at least once” delivery is guaranteed. If the client disconnects before it is able to report success or failure the server automatically moves the job back to the queue where it will be provided to another worker. Clients should be prepared to see the same job more than once for this reason.
The Zizq server sends heartbeat messages to connected workers so that it can quickly detect and handle disconnected clients.
# File 'lib/zizq/client.rb', line 410
def report_success(id) #: (String) -> nil
  response = raw_post("/jobs/#{id}/success")
  handle_response!(response, expected: 204)
  nil
end
#report_success_bulk(ids) ⇒ Object Also known as: ack_bulk
Bulk-mark jobs as successfully completed (bulk ack).
See `#report_success` for full details of how acknowledgement works.
There are two ways in which the server can respond successfully:
- 204 - No Content (all jobs acknowledged)
- 422 - Unprocessable Entity (some jobs were not found)
Both of these statuses are treated as success, because missing jobs have either been previously acknowledged and purged or moved to some other status that cannot be acknowledged.
Other error response types will still raise.
# File 'lib/zizq/client.rb', line 433
def report_success_bulk(ids)
  response = post("/jobs/success", { ids: ids })
  return nil if response.status == 422
  handle_response!(response, expected: 204)
end
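The status handling above can be illustrated without a server. In the sketch below, `FakeResponse`, `UnexpectedStatus`, and `bulk_ack_outcome` are hypothetical names; the real method delegates non-422 responses to `handle_response!`, whose error classes are not shown on this page. As a simplification, the sketch returns `nil` for both success statuses.

```ruby
# Hypothetical stand-ins: a response with only a status, and an error
# class representing whatever handle_response! would raise.
FakeResponse = Struct.new(:status)
class UnexpectedStatus < StandardError; end

# 204 (all acknowledged) and 422 (some jobs not found) both count as
# success for a bulk ack; anything else is an error.
def bulk_ack_outcome(response)
  case response.status
  when 204, 422 then nil
  else raise UnexpectedStatus, "unexpected HTTP #{response.status}"
  end
end

all_acked = bulk_ack_outcome(FakeResponse.new(204))
some_missing = bulk_ack_outcome(FakeResponse.new(422))

server_error =
  begin
    bulk_ack_outcome(FakeResponse.new(500))
  rescue UnexpectedStatus => e
    e
  end
```

Treating 422 as success keeps a retried bulk ack idempotent: acknowledging a job that was already acknowledged and purged does not surface as a failure.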
#server_version ⇒ Object
Server version string.
# File 'lib/zizq/client.rb', line 381
def server_version #: () -> String
  response = get("/version")
  data = handle_response!(response, expected: 200)
  data["version"]
end
#take_jobs(prefetch: 1, queues: [], worker_id: nil, on_connect: nil, on_response: nil, &block) ⇒ Object
Stream jobs from the server. Yields each job wrapped in a `Resources::Job`.
This method does not return unless the server closes the connection or the connection is otherwise interrupted. Jobs are continuously streamed to the client, and when no jobs are available the client waits for new jobs to become ready.
If the client does not acknowledge or fail jobs with `#report_success` or `#report_failure`, the server will stop sending new jobs to the client once it hits its prefetch limit.
Jobs are durable and “at least once” delivery is guaranteed. If the client disconnects before it is able to report success or failure the server automatically moves the job back to the queue where it will be provided to another worker. Clients should be prepared to see the same job more than once for this reason.
The Zizq server sends periodic heartbeat messages to the client which are silently consumed.
Example:
  client.take_jobs(prefetch: 5) do |job|
    puts "Got job: #{job.inspect}"
    client.ack(job.id) # mark the job completed
  end
# File 'lib/zizq/client.rb', line 516
def take_jobs(prefetch: 1,
              queues: [],
              worker_id: nil,
              on_connect: nil,
              on_response: nil,
              &block)
  raise ArgumentError, "take_jobs requires a block" unless block

  params = { prefetch: } #: Hash[Symbol, untyped]
  params[:queue] = queues.join(",") unless queues.empty?

  path = build_path("/jobs/take", params:)
  headers = { "accept" => @stream_accept }
  headers["worker-id"] = worker_id if worker_id

  Sync do
    response = stream_http.get(path, headers)

    begin
      raise StreamError, "take jobs stream returned HTTP #{response.status}" unless response.status == 200

      on_connect&.call
      on_response&.call(response)

      # Wrap each parsed hash in a Resources::Job before yielding.
      wrapper = proc { |data| block.call(Resources::Job.new(self, data)) }

      # async-http returns `nil` for empty response bodies over HTTP/1.1
      # (e.g. a 200 with content-length: 0 from the server closing the
      # stream immediately). Treat that as "no chunks" rather than
      # crashing in the parser.
      body = response.body || []

      case @format
      when :json then self.class.parse_ndjson(body, &wrapper)
      when :msgpack then self.class.parse_msgpack_stream(body, &wrapper)
      end
    ensure
      response.close rescue nil
    end
  end
rescue SocketError, IOError, EOFError, Errno::ECONNRESET, Errno::EPIPE, OpenSSL::SSL::SSLError => e
  # NOTE: the method called on `e` was lost in extraction; `message` is assumed.
  raise ConnectionError, e.message
end
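Because delivery is at least once, a handler may see the same job ID twice. One common way to cope, sketched below under stated assumptions, is to make the handler idempotent by remembering which IDs it has already processed. `IdempotentHandler` is a hypothetical class, not part of Zizq, and the in-memory `Set` is only for illustration; a real worker would typically record completed IDs in durable storage.

```ruby
require "set"

# Hypothetical wrapper that runs the work at most once per job ID.
class IdempotentHandler
  def initialize(&work)
    @work = work
    @done = Set.new
  end

  # Returns true if the work ran, false if the job was a redelivery.
  def call(job_id, payload)
    return false if @done.include?(job_id)

    @work.call(payload)
    @done << job_id
    true
  end
end

processed = []
handler = IdempotentHandler.new { |payload| processed << payload }

# "j1" is delivered twice, as can happen after a disconnect.
results = [
  handler.call("j1", "send email"),
  handler.call("j1", "send email"),
  handler.call("j2", "resize image"),
]
```

A redelivered job should still be acknowledged after it is skipped; otherwise it stays in-flight and counts against the prefetch limit.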
#update_all_jobs(where: {}, apply: {}) ⇒ Object
Update all jobs matching the given filters.
Filters in the `where:` argument use the same keys as `list_jobs`. Fields in the `apply:` argument use the same keys as `update_job`.
Terminal jobs (completed/dead) are silently skipped. Requesting them explicitly via `status:` in `where:` results in a 422 response.
Returns the number of updated jobs.
# File 'lib/zizq/client.rb', line 332
def update_all_jobs(where: {}, apply: {})
  filter_params = validate_where(**where)
  multi_keys = %i[id status queue type]
  params = build_where_params(filter_params, multi_keys:)

  # An empty multi-value filter matches nothing; short-circuit.
  multi_keys.each do |key|
    return 0 if params[key] == ""
  end

  body = validate_and_build_set(**apply)

  response = patch("/jobs", body, params:)
  data = handle_response!(response, expected: 200)
  data.fetch("patched")
end
#update_job(id, queue: UNCHANGED, priority: UNCHANGED, ready_at: UNCHANGED, retry_limit: UNCHANGED, backoff: UNCHANGED, retention: UNCHANGED) ⇒ Object
Update a single job’s mutable fields.
Fields not provided are left unchanged. Use `Zizq::RESET` to clear a nullable field back to the server default.
Raises `Zizq::NotFoundError` if the job does not exist. Raises `Zizq::ClientError` (422) if the job is in a terminal state.
# File 'lib/zizq/client.rb', line 303
def update_job(id,
               queue: UNCHANGED,
               priority: UNCHANGED,
               ready_at: UNCHANGED,
               retry_limit: UNCHANGED,
               backoff: UNCHANGED,
               retention: UNCHANGED)
  body = build_set_body(
    queue:,
    priority:,
    ready_at:,
    retry_limit:,
    backoff:,
    retention:
  )

  response = patch("/jobs/#{id}", body)
  data = handle_response!(response, expected: 200)
  Resources::Job.new(self, data)
end
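The `UNCHANGED` defaults follow a sentinel pattern: a unique object distinguishes "argument not given" from an explicit `nil`. The sketch below shows the general idea only. The private `build_set_body` helper is not shown on this page, so `build_patch_body` is a hypothetical stand-in, the two constants are local substitutes for `Zizq::UNCHANGED` and `Zizq::RESET`, and sending `RESET` as an explicit null is an assumption about the wire format.

```ruby
# Local stand-ins for Zizq::UNCHANGED and Zizq::RESET (hypothetical).
UNCHANGED = Object.new.freeze
RESET = Object.new.freeze

# Hypothetical stand-in for build_set_body. Fields left at UNCHANGED are
# omitted from the PATCH body entirely. ASSUMPTION: RESET is encoded as
# an explicit null; the real encoding is not documented here.
def build_patch_body(**fields)
  fields.each_with_object({}) do |(key, value), body|
    next if value.equal?(UNCHANGED)

    body[key] = value.equal?(RESET) ? nil : value
  end
end

body = build_patch_body(queue: "low", priority: UNCHANGED, retry_limit: RESET)
```

Comparing with `equal?` checks object identity, so no ordinary value a caller passes can be mistaken for either sentinel.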