Class: Zizq::Client
- Inherits: Object
  - Object
  - Zizq::Client
- Defined in: lib/zizq/client.rb, sig/generated/zizq/client.rbs
Overview
Low-level HTTP wrapper for the Zizq job queue server API.
Supports both JSON and MessagePack serialization formats, determined at construction time.
HTTP requests are dispatched through a persistent background IO thread when called from non-Async contexts, keeping the HTTP/2 connection alive across calls and avoiding ephemeral port exhaustion. When called from within an existing Async reactor, the shared HTTP client is used directly.
Direct Known Subclasses
Defined Under Namespace
Classes: RawResponse
Constant Summary
- CONTENT_TYPES =
  : Hash[Zizq::format, String]
  { #: Hash[Zizq::format, String]
    msgpack: "application/msgpack",
    json: "application/json"
  }.freeze
- STREAM_ACCEPT =
  : Hash[Zizq::format, String]
  { #: Hash[Zizq::format, String]
    msgpack: "application/vnd.zizq.msgpack-stream",
    json: "application/x-ndjson"
  }.freeze
Instance Attribute Summary
-
#format ⇒ Zizq::format
readonly
The message format to use for all communication between the client and the server (default = :msgpack).
-
#url ⇒ String
readonly
The base URL of the Zizq server (e.g. "https://localhost:7890").
Class Method Summary
- .make_finalizer(io_queue, http_clients) ⇒ Object
-
.parse_msgpack_stream(chunks) ⇒ Object
Parse a length-prefixed MessagePack stream from an enumerable of byte chunks.
-
.parse_ndjson(chunks) ⇒ Object
Parse an NDJSON stream from an enumerable of byte chunks.
Instance Method Summary
-
#add_cron_group_entry(group, name:, expression:, job:, timezone: nil, paused: nil) ⇒ Resources::CronEntry
Add a single entry to a cron group (creates the group if needed).
-
#build_cron_entry(name: nil, expression: nil, job: nil, timezone: nil, paused: nil) ⇒ Hash[Symbol, untyped]
Validate and build a cron entry body from keyword arguments.
-
#build_cron_job(type: nil, queue: nil, payload: nil, priority: nil, retry_limit: nil, backoff: nil, retention: nil, unique_key: nil, unique_while: nil) ⇒ Hash[Symbol, untyped]
Validate and build a cron job template from keyword arguments.
-
#build_path(path, params: {}) ⇒ Object
Build a relative path with optional query parameters.
-
#build_set_body(queue: UNCHANGED, priority: UNCHANGED, ready_at: UNCHANGED, retry_limit: UNCHANGED, backoff: UNCHANGED, retention: UNCHANGED) ⇒ Hash[Symbol, untyped]
Build the JSON body hash for a PATCH request from set parameters.
-
#build_where_params(options, multi_keys: []) ⇒ Object
Build query params for list endpoints, joining multi-value keys with ",".
-
#cleanup_internal_clients ⇒ Object
: () -> void.
-
#close ⇒ Object
Close all thread-local HTTP clients and release connections.
-
#consume_response(response) ⇒ Object
Read the response body and close it, returning a RawResponse that is safe to use outside the reactor.
-
#count_jobs(id: nil, status: nil, queue: nil, type: nil, filter: nil, priority: nil, ready_at: nil, attempts: nil) ⇒ Integer
Count jobs matching the given filters.
-
#decode_body(data, content_type: nil) ⇒ Object
: (String, ?content_type: String?) -> Hash[String, untyped].
-
#delete(path, params: {}) ⇒ Object
: (String, ?params: Hash[Symbol, untyped]) -> RawResponse.
-
#delete_all_crons ⇒ Integer
Delete every cron group on the server in a single call.
-
#delete_all_jobs(where: {}) ⇒ Integer
Delete jobs matching the given filters.
-
#delete_cron_group(name) ⇒ void
Delete a cron group and all its entries.
-
#delete_cron_group_entry(group, entry) ⇒ void
Delete a single cron entry.
-
#delete_job(id) ⇒ void
Delete a single job by ID.
-
#enc(value) ⇒ Object
URL-encode a single path segment.
-
#encode_body(body) ⇒ Object
: (Hash[Symbol, untyped]) -> String.
-
#encode_range(value, &block) ⇒ void
Encode an Integer or Range filter into the server's query format.
-
#enqueue(queue:, type:, payload:, priority: nil, ready_at: nil, retry_limit: nil, backoff: nil, retention: nil, unique_key: nil, unique_while: nil) ⇒ Resources::Job
Enqueue a new job.
-
#enqueue_bulk(jobs:) ⇒ Array[Resources::Job]
Enqueue multiple jobs atomically in a single bulk request.
-
#ensure_io_thread ⇒ Object
Lazily start the background IO thread (double-checked locking).
-
#erase_all_data ⇒ void
Wipe every cron group and every job on the server.
-
#get(path, params: {}) ⇒ Object
: (String, ?params: Hash[Symbol, untyped]) -> RawResponse.
-
#get_cron_group(name) ⇒ Resources::CronGroup
Fetch a cron group and all its entries.
-
#get_cron_group_entry(group, entry) ⇒ Resources::CronEntry
Fetch a single cron entry.
-
#get_error(id, attempt:) ⇒ Resources::ErrorRecord
Get a single error record by job ID and attempt number.
-
#get_job(id) ⇒ Object
Get a single job by ID.
-
#get_path(path) ⇒ Object
GET a path on the server and return the decoded response body.
-
#get_queues ⇒ Object
List all distinct queue names on the server.
-
#handle_response!(response, expected:) ⇒ Object
Check response status and decode body, raising on errors.
-
#health ⇒ Object
Health check.
-
#http ⇒ Object
Return the calling thread's HTTP client, creating one if needed.
-
#initialize(url:, format: :msgpack, ssl_context: nil, read_timeout: 30, stream_idle_timeout: 30) ⇒ Client
constructor
Initialize a new instance of the client with the given base URL and optional format options.
-
#io_thread_run ⇒ Object
Main loop for the background IO thread.
-
#list_cron_groups ⇒ Array[String]
List all cron group names.
-
#list_errors(id, from: nil, order: nil, limit: nil) ⇒ Resources::ErrorPage
List error records for a job.
-
#list_jobs(id: nil, status: nil, queue: nil, type: nil, filter: nil, priority: nil, ready_at: nil, attempts: nil, from: nil, order: nil, limit: nil) ⇒ Resources::JobPage
List jobs with optional filters.
-
#patch(path, body, params: {}) ⇒ Object
: (String, Hash[Symbol, untyped], ?params: Hash[Symbol, untyped]) -> RawResponse.
-
#post(path, body) ⇒ Object
: (String, Hash[Symbol, untyped]) -> RawResponse.
-
#put(path, body) ⇒ Object
: (String, Hash[Symbol, untyped]) -> RawResponse.
-
#raw_post(path) ⇒ Object
: (String) -> RawResponse.
-
#replace_cron_group(name, paused: nil, entries: []) ⇒ Resources::CronGroup
Create or replace an entire cron group.
-
#replace_cron_group_entry(group, entry, expression:, job:, timezone: nil, paused: nil) ⇒ Resources::CronEntry
Create or replace a single cron entry.
-
#report_failure(id, message:, error_type: nil, backtrace: nil, retry_at: nil, kill: false) ⇒ Resources::Job
(also: #nack)
Report a job failure (nack).
-
#report_success(id) ⇒ Object
(also: #ack)
Mark a job as successfully completed (ack).
-
#report_success_bulk(ids) ⇒ nil
(also: #ack_bulk)
Bulk-mark jobs as successfully completed (bulk ack).
-
#request(&block) ⇒ void
Dispatch a block to the appropriate execution context.
-
#server_version ⇒ Object
Server version string.
-
#stream_http ⇒ Object
Return the calling thread's streaming HTTP client (HTTP/1.1), creating one if needed.
-
#sync_call(&block) ⇒ void
Push a work block to the background IO thread and block until it completes, returning the result or re-raising any exception.
-
#take_jobs(prefetch: 1, queues: [], worker_id: nil, on_connect: nil, on_response: nil) {|arg0| ... } ⇒ void
Stream jobs from the server.
-
#thread_local_http(key, endpoint) ⇒ Object
: (Symbol, Async::HTTP::Endpoint) -> Async::HTTP::Client.
-
#update_all_jobs(where: {}, apply: {}) ⇒ Integer
Update all jobs matching the given filters.
-
#update_cron_group(name, paused: nil) ⇒ Resources::CronGroup
Update group-level fields (currently just pause/unpause).
-
#update_cron_group_entry(group, entry, paused:) ⇒ Resources::CronEntry
Update entry-level fields (currently just pause/unpause).
-
#update_job(id, queue: UNCHANGED, priority: UNCHANGED, ready_at: UNCHANGED, retry_limit: UNCHANGED, backoff: UNCHANGED, retention: UNCHANGED) ⇒ Resources::Job
Update a single job's mutable fields.
-
#validate_and_build_set(queue: UNCHANGED, priority: UNCHANGED, ready_at: UNCHANGED, retry_limit: UNCHANGED, backoff: UNCHANGED, retention: UNCHANGED) ⇒ Hash[Symbol, untyped]
Validate set parameters via keyword args (rejects unknown keys) and build the JSON body.
-
#validate_where(id: nil, status: nil, queue: nil, type: nil, filter: nil, priority: nil, ready_at: nil, attempts: nil) ⇒ Hash[Symbol, untyped]
Validate and normalize filter parameters for bulk operations.
Constructor Details
#initialize(url:, format: :msgpack, ssl_context: nil, read_timeout: 30, stream_idle_timeout: 30) ⇒ Client
Initialize a new instance of the client with the given base URL and optional format options.
read_timeout and stream_idle_timeout are per-operation socket
I/O timeouts, in seconds: each individual socket read or write is
bounded by the timeout. The streaming #take_jobs endpoint uses
stream_idle_timeout instead of read_timeout. The server sends periodic
heartbeats that keep a healthy connection alive, so only a connection
that stays silent for the full window is treated as dead.
# File 'lib/zizq/client.rb', line 65
def initialize(url:, format: :msgpack, ssl_context: nil, read_timeout: 30, stream_idle_timeout: 30)
  @url = url.chomp("/")
  @format = format

  options = {
    protocol: Async::HTTP::Protocol::HTTP2,
    timeout: read_timeout,
  } #: Hash[Symbol, untyped]
  options[:ssl_context] = ssl_context if ssl_context

  @endpoint = Async::HTTP::Endpoint.parse(
    @url,
    **options,
  )

  # Streaming take uses a dedicated HTTP/1.1 endpoint. The take
  # connection is long-lived and carries only one request, so HTTP/2's
  # multiplexing, stream IDs, and frame headers add overhead with no
  # benefit: there's nothing to multiplex against. Acks/enqueues run
  # on separate threads with their own HTTP/2 clients, so they're
  # unaffected either way. HTTP/1.1 gives the stream a plain TCP
  # socket with no framing tax and measurably better throughput.
  #
  # The stream endpoint uses `stream_idle_timeout` for its socket
  # timeout so server heartbeats (~3s) keep it alive while only
  # genuinely dead connections (no data for the full window)
  # trigger a reconnect.
  stream_options = options.merge(
    protocol: Async::HTTP::Protocol::HTTP11,
    timeout: stream_idle_timeout,
  )

  @stream_endpoint = Async::HTTP::Endpoint.parse(
    @url,
    **stream_options,
  )

  @mutex = Mutex.new
  @io_thread = nil #: Thread?
  @io_queue = nil #: Thread::Queue?

  # Each thread gets its own Async::HTTP::Client bound to its own
  # reactor: one for regular request/response traffic (HTTP/2) and
  # a separate one lazily created on the first take_jobs call
  # (HTTP/1.1). Both kinds of clients are tracked in a single array
  # so `close` can shut them all down together.
  @http_clients = [] #: Array[Async::HTTP::Client]
  @http_key = :"zizq_http_#{object_id}"
  @stream_http_key = :"zizq_stream_http_#{object_id}"

  @content_type = CONTENT_TYPES.fetch(format)
  @stream_accept = STREAM_ACCEPT.fetch(format)
end
Instance Attribute Details
#format ⇒ Zizq::format (readonly)
The message format to use for all communication between the client and
the server (default = :msgpack).
# File 'lib/zizq/client.rb', line 48
def format
  @format
end
#url ⇒ String (readonly)
The base URL of the Zizq server (e.g. "https://localhost:7890").
# File 'lib/zizq/client.rb', line 44
def url
  @url
end
Class Method Details
.make_finalizer(io_queue, http_clients) ⇒ Object
# File 'lib/zizq/client.rb', line 1350
def self.make_finalizer(io_queue, http_clients)
  -> do
    io_queue&.close
    http_clients.each do |ref|
      ref.close
    rescue WeakRef::RefError
      # Client already GC'd (owning thread exited).
    rescue NoMethodError
      # The async connection pool may hold references to tasks whose
      # fibers were already reclaimed when their owning Sync reactor
      # exited. Stopping those dead tasks raises NoMethodError; safe
      # to ignore.
    end
    http_clients.clear
  end
end
.parse_msgpack_stream(chunks) ⇒ Object
Parse a length-prefixed MessagePack stream from an enumerable of byte chunks.
Format: [4-byte big-endian length][MsgPack payload]. A zero-length frame is a heartbeat and is silently skipped.
Uses StringIO for efficient position-based reading rather than repeatedly slicing from the front of a String (which copies all remaining bytes on every extraction).
# File 'lib/zizq/client.rb', line 847
def self.parse_msgpack_stream(chunks) #: (Enumerable[String]) { (Hash[String, untyped]) -> void } -> void
  io = StringIO.new("".b)

  chunks.each do |chunk|
    # Append new data at the end, then return to the read position.
    read_pos = io.pos
    io.seek(0, IO::SEEK_END)
    io.write(chunk.b)
    io.seek(read_pos)

    # Extract complete frames.
    while io.size - io.pos >= 4
      len_bytes = io.read(4) #: String
      len = len_bytes.unpack1("N") #: Integer

      if len == 0 # heartbeat
        next
      end

      if io.size - io.pos < len
        # Incomplete frame: rewind past the length header and wait
        # for more data.
        io.seek(-4, IO::SEEK_CUR)
        break
      end

      yield MessagePack.unpack(io.read(len))
    end

    # Compact: discard already-consumed bytes so the StringIO doesn't
    # grow without bound over the life of the stream.
    remaining = io.read
    io = StringIO.new(remaining || "".b)
  end
end
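The framing described above (a 4-byte big-endian length, then the payload, with zero-length frames acting as heartbeats) can be illustrated without the MessagePack gem. The following is a minimal sketch, not the library's implementation: `frame` and `each_frame` are hypothetical helpers, and the sketch yields raw payload bytes instead of decoded hashes.

```ruby
require "stringio"

# Hypothetical helper: prefix a payload with its 4-byte big-endian length.
def frame(payload)
  [payload.bytesize].pack("N") + payload.b
end

# Hypothetical helper: yield each complete payload found in a sequence of
# arbitrary byte chunks. Decoding is intentionally left out of the sketch.
def each_frame(chunks)
  buffer = "".b
  chunks.each do |chunk|
    buffer << chunk.b
    io = StringIO.new(buffer)

    while io.size - io.pos >= 4
      len = io.read(4).unpack1("N")
      next if len.zero? # heartbeat frame carries no payload

      if io.size - io.pos < len
        # Incomplete frame: step back over the header and wait for more data.
        io.seek(-4, IO::SEEK_CUR)
        break
      end

      yield io.read(len)
    end

    # Keep only the unread tail for the next chunk.
    buffer = (io.read || "").b
  end
end

# Two real frames with a heartbeat in between, split at awkward boundaries.
stream = frame("abc") + frame("") + frame("hello")
chunks = [stream[0, 5], stream[5, 3], stream[8..]]

payloads = []
each_frame(chunks) { |payload| payloads << payload }
```

Because the chunk boundaries fall inside a header and inside a payload, the sketch exercises both the "wait for more data" path and the heartbeat skip.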
.parse_ndjson(chunks) ⇒ Object
Parse an NDJSON stream from an enumerable of byte chunks.
Buffers chunks and splits on newline boundaries. The buffer only
ever holds one partial line between extractions, so the slice!
cost is trivial. Empty lines (heartbeats) are silently skipped.
# File 'lib/zizq/client.rb', line 825
def self.parse_ndjson(chunks) #: (Enumerable[String]) { (Hash[String, untyped]) -> void } -> void
  buffer = +""
  chunks.each do |chunk|
    buffer << chunk
    while (idx = buffer.index("\n"))
      line = buffer.slice!(0, idx + 1) #: String
      line.strip!
      next if line.empty?
      yield JSON.parse(line)
    end
  end
end
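To see how the newline buffering behaves when a JSON document is split across chunks, here is a small standalone sketch. `each_ndjson` is a hypothetical stand-in written for this example, and the `id` field in the sample data is invented.

```ruby
require "json"

# Hypothetical stand-in for the NDJSON parsing step: buffer chunks, emit one
# parsed object per complete line, and skip empty (heartbeat) lines.
def each_ndjson(chunks)
  buffer = +""
  chunks.each do |chunk|
    buffer << chunk
    while (idx = buffer.index("\n"))
      line = buffer.slice!(0, idx + 1).strip
      next if line.empty?
      yield JSON.parse(line)
    end
  end
end

# The first object is split across two chunks, an empty heartbeat line
# follows it, and the last object never receives its terminating newline.
chunks = ["{\"id\":", "1}\n\n{\"id\":2}\n", "{\"id\":3"]

records = []
each_ndjson(chunks) { |record| records << record }
```

Only newline-terminated lines are emitted, so the trailing partial document stays in the buffer until more data arrives.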
Instance Method Details
#add_cron_group_entry(group, name:, expression:, job:, timezone: nil, paused: nil) ⇒ Resources::CronEntry
Add a single entry to a cron group (creates the group if needed).
Raises a ClientError (409 Conflict) if an entry with the same name already exists.
# File 'lib/zizq/client.rb', line 605
def add_cron_group_entry(group, name:, expression:, job:, timezone: nil, paused: nil)
  body = build_cron_entry(name:, expression:, job:, timezone:, paused:)
  response = post("/crons/#{enc(group)}/entries", body)
  data = handle_response!(response, expected: 201)
  Resources::CronEntry.new(self, data)
end
#build_cron_entry(name: nil, expression: nil, job: nil, timezone: nil, paused: nil) ⇒ Hash[Symbol, untyped]
Validate and build a cron entry body from keyword arguments.
# File 'lib/zizq/client.rb', line 917
def build_cron_entry(name: nil, expression: nil, job: nil, timezone: nil, paused: nil)
  {
    name:,
    expression:,
    timezone:,
    paused:,
    job: build_cron_job(**job),
  }.compact
end
#build_cron_job(type: nil, queue: nil, payload: nil, priority: nil, retry_limit: nil, backoff: nil, retention: nil, unique_key: nil, unique_while: nil) ⇒ Hash[Symbol, untyped]
Validate and build a cron job template from keyword arguments.
Uses keyword args so that unknown keys raise ArgumentError.
# File 'lib/zizq/client.rb', line 941
def build_cron_job(type: nil,
                   queue: nil,
                   payload: nil,
                   priority: nil,
                   retry_limit: nil,
                   backoff: nil,
                   retention: nil,
                   unique_key: nil,
                   unique_while: nil)
  job = { type:, queue:, payload: } #: Hash[Symbol, untyped]
  job[:priority] = priority if priority
  job[:retry_limit] = retry_limit if retry_limit
  job[:backoff] = backoff if backoff
  job[:retention] = retention if retention
  job[:unique_key] = unique_key if unique_key
  job[:unique_while] = unique_while.to_s if unique_while
  job
end
#build_path(path, params: {}) ⇒ Object
Build a relative path with optional query parameters.
# File 'lib/zizq/client.rb', line 902
def build_path(path, params: {}) #: (String, ?params: Hash[Symbol, untyped]) -> String
  unless params.empty?
    path = "#{path}?#{URI.encode_www_form(params)}"
  end
  path
end
#build_set_body(queue: UNCHANGED, priority: UNCHANGED, ready_at: UNCHANGED, retry_limit: UNCHANGED, backoff: UNCHANGED, retention: UNCHANGED) ⇒ Hash[Symbol, untyped]
Build the JSON body hash for a PATCH request from set parameters.
- UNCHANGED values are omitted (field not sent).
- RESET values are sent as nil (JSON null).
- nil is rejected; use RESET to clear a field.
- Other values are converted to their wire format.
# File 'lib/zizq/client.rb', line 1054
def build_set_body(queue: UNCHANGED,
                   priority: UNCHANGED,
                   ready_at: UNCHANGED,
                   retry_limit: UNCHANGED,
                   backoff: UNCHANGED,
                   retention: UNCHANGED)
  body = {} #: Hash[Symbol, untyped]

  unless queue.equal?(UNCHANGED)
    raise ArgumentError, "queue cannot be nil; use Zizq::RESET to clear or Zizq::UNCHANGED to leave as-is" if queue.nil?
    body[:queue] = queue
  end

  unless priority.equal?(UNCHANGED)
    raise ArgumentError, "priority cannot be nil; use Zizq::RESET to clear or Zizq::UNCHANGED to leave as-is" if priority.nil?
    body[:priority] = priority
  end

  unless ready_at.equal?(UNCHANGED)
    body[:ready_at] = ready_at.equal?(RESET) ? nil : (ready_at.to_f * 1000).to_i
  end

  unless retry_limit.equal?(UNCHANGED)
    body[:retry_limit] = retry_limit.equal?(RESET) ? nil : retry_limit
  end

  unless backoff.equal?(UNCHANGED)
    body[:backoff] = if backoff.equal?(RESET)
      nil
    else
      {
        exponent: backoff[:exponent].to_f,
        base_ms: (backoff[:base].to_f * 1000).to_i,
        jitter_ms: (backoff[:jitter].to_f * 1000).to_i
      }
    end
  end

  unless retention.equal?(UNCHANGED)
    body[:retention] = if retention.equal?(RESET)
      nil
    else
      ret = {} #: Hash[Symbol, Integer]
      ret[:completed_ms] = (retention[:completed].to_f * 1000).to_i if retention[:completed]
      ret[:dead_ms] = (retention[:dead].to_f * 1000).to_i if retention[:dead]
      ret
    end
  end

  body
end
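The sentinel convention above distinguishes three cases that a plain nil default cannot: "leave alone", "clear", and "set to a value". The sketch below illustrates the idea in isolation. The `SetBodySketch` module and its two sentinel objects are hypothetical stand-ins for this example, not the real `Zizq::UNCHANGED` and `Zizq::RESET`, and only two of the six fields are modelled.

```ruby
module SetBodySketch
  # Hypothetical sentinels; identity (equal?) is what matters, not value.
  UNCHANGED = Object.new.freeze
  RESET = Object.new.freeze

  # Build a partial-update body: omitted fields are not sent, RESET becomes
  # nil (JSON null), and ready_at is converted from seconds to milliseconds.
  def self.build(retry_limit: UNCHANGED, ready_at: UNCHANGED)
    body = {}

    unless retry_limit.equal?(UNCHANGED)
      body[:retry_limit] = retry_limit.equal?(RESET) ? nil : retry_limit
    end

    unless ready_at.equal?(UNCHANGED)
      body[:ready_at] = ready_at.equal?(RESET) ? nil : (ready_at.to_f * 1000).to_i
    end

    body
  end
end

untouched = SetBodySketch.build
cleared   = SetBodySketch.build(retry_limit: SetBodySketch::RESET)
updated   = SetBodySketch.build(retry_limit: 5, ready_at: 1.5)
```

Comparing with `equal?` instead of `==` means no legitimate field value can be mistaken for a sentinel.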
#build_where_params(options, multi_keys: []) ⇒ Object
Build query params for list endpoints, joining multi-value keys with ",".
# File 'lib/zizq/client.rb', line 1107
def build_where_params(options, multi_keys: []) #: (Hash[Symbol, untyped], ?multi_keys: Array[Symbol]) -> Hash[Symbol, untyped]
  params = {} #: Hash[Symbol, untyped]
  options.each do |key, value|
    if multi_keys.include?(key) && value.is_a?(Array)
      params[key] = value.join(",")
    else
      params[key] = value
    end
  end
  params
end
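As a rough illustration of what the joined parameters look like once they reach the query string, the sketch below pairs a standalone copy of the joining step with `URI.encode_www_form` from the standard library. `where_params` is a hypothetical helper, and the status and queue values are made up for the example.

```ruby
require "uri"

# Hypothetical helper: join Array values for multi-value keys with ",".
def where_params(options, multi_keys: [])
  options.to_h do |key, value|
    joined = multi_keys.include?(key) && value.is_a?(Array) ? value.join(",") : value
    [key, joined]
  end
end

params = where_params(
  { status: %w[ready dead], queue: "mail" },
  multi_keys: %i[status queue]
)
query = URI.encode_www_form(params)

# An empty Array joins to "", which callers can treat as "matches nothing".
empty = where_params({ id: [] }, multi_keys: %i[id])
```

Note that the comma is percent-encoded as `%2C` in the resulting query string, so the server is expected to decode before splitting.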
#cleanup_internal_clients ⇒ Object
: () -> void
# File 'lib/zizq/client.rb', line 135
def cleanup_internal_clients #: () -> void
  @mutex.synchronize do
    @http_clients.each do |ref|
      ref.close
    rescue WeakRef::RefError
      # Client already GC'd (owning thread exited).
    rescue NoMethodError
      # The async connection pool may hold references to tasks whose
      # fibers were already reclaimed when their owning Sync reactor
      # exited. Stopping those dead tasks raises NoMethodError; safe
      # to ignore.
    end
    @http_clients.clear
  end
end
#close ⇒ Object
Close all thread-local HTTP clients and release connections.
# File 'lib/zizq/client.rb', line 124
def close #: () -> void
  if @io_thread&.alive?
    @mutex.synchronize do
      @io_queue&.close
      @io_thread&.join
    end
  end

  self.class.make_finalizer(@io_queue, @http_clients).call
end
#consume_response(response) ⇒ Object
Read the response body and close it, returning a RawResponse that is safe to use outside the reactor.
# File 'lib/zizq/client.rb', line 1155
def consume_response(response) #: (untyped) -> RawResponse
  RawResponse.new(status: response.status, body: response.read, content_type: response.headers["content-type"])
ensure
  response.close
end
#count_jobs(id: nil, status: nil, queue: nil, type: nil, filter: nil, priority: nil, ready_at: nil, attempts: nil) ⇒ Integer
Count jobs matching the given filters.
Accepts the same filter arguments as list_jobs (minus pagination).
Returns the count as an integer.
# File 'lib/zizq/client.rb', line 308
def count_jobs(id: nil,
               status: nil,
               queue: nil,
               type: nil,
               filter: nil,
               priority: nil,
               ready_at: nil,
               attempts: nil)
  options = {
    id:,
    status:,
    queue:,
    type:,
    filter:,
    priority: encode_range(priority),
    ready_at: encode_range(ready_at) { |v| (v.to_f * 1000).to_i },
    attempts: encode_range(attempts),
  }.compact #: Hash[Symbol, untyped]

  multi_keys = %i[id status queue type]
  params = build_where_params(options, multi_keys:)

  # An empty filter ([] or "") matches nothing, so short-circuit.
  multi_keys.each do |key|
    return 0 if params[key] == ""
  end

  response = get("/jobs/count", params:)
  data = handle_response!(response, expected: 200)
  data.fetch("count")
end
#decode_body(data, content_type: nil) ⇒ Object
: (String, ?content_type: String?) -> Hash[String, untyped]
# File 'lib/zizq/client.rb', line 1127
def decode_body(data, content_type: nil) #: (String, ?content_type: String?) -> Hash[String, untyped]
  format = case content_type
           when /msgpack/ then :msgpack
           when /json/ then :json
           else @format
           end
  case format
  when :msgpack then MessagePack.unpack(data)
  when :json then JSON.parse(data)
  else raise ArgumentError, "Unknown format: #{format}"
  end
end
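The response's content type takes precedence over the client's configured format, which only acts as a fallback. A minimal sketch of that selection step alone follows; `pick_format` is a hypothetical helper and no decoding is performed.

```ruby
# Hypothetical helper mirroring only the format-selection step: prefer what
# the response declares, fall back to the configured default otherwise.
def pick_format(content_type, default)
  case content_type
  when /msgpack/ then :msgpack
  when /json/ then :json
  else default
  end
end

from_header  = pick_format("application/json", :msgpack)
from_stream  = pick_format("application/vnd.zizq.msgpack-stream", :json)
from_default = pick_format(nil, :msgpack)
```

Matching on a substring means the streaming media types listed in STREAM_ACCEPT resolve to the same two formats as the plain ones.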
#delete(path, params: {}) ⇒ Object
: (String, ?params: Hash[Symbol, untyped]) -> RawResponse
# File 'lib/zizq/client.rb', line 1299
def delete(path, params: {}) #: (String, ?params: Hash[Symbol, untyped]) -> RawResponse
  request do |http|
    consume_response(
      http.delete(
        build_path(path, params:),
        {"accept" => @content_type}
      )
    )
  end
end
#delete_all_crons ⇒ Integer
Delete every cron group on the server in a single call.
Returns the number of cron groups removed.
Destructive. This deletes every cron group on the server.
For granular deletes, use delete_cron_group with a specific
name.
Requires a Pro license on the server.
# File 'lib/zizq/client.rb', line 576
def delete_all_crons
  response = delete("/crons")
  data = handle_response!(response, expected: 200)
  data.fetch("deleted")
end
#delete_all_jobs(where: {}) ⇒ Integer
Delete jobs matching the given filters.
Filters in the where: argument use the same keys as list_jobs. An
empty where: hash deletes all jobs.
Returns the number of deleted jobs.
# File 'lib/zizq/client.rb', line 359
def delete_all_jobs(where: {})
  filter_params = validate_where(**where)
  multi_keys = %i[id status queue type]
  params = build_where_params(filter_params, multi_keys:)

  # An empty multi-value filter matches nothing, so short-circuit.
  multi_keys.each do |key|
    return 0 if params[key] == ""
  end

  response = delete("/jobs", params:)
  data = handle_response!(response, expected: 200)
  data.fetch("deleted")
end
#delete_cron_group(name) ⇒ void
This method returns an undefined value.
Delete a cron group and all its entries.
# File 'lib/zizq/client.rb', line 559
def delete_cron_group(name)
  response = delete("/crons/#{enc(name)}")
  handle_response!(response, expected: 204)
  nil
end
#delete_cron_group_entry(group, entry) ⇒ void
This method returns an undefined value.
Delete a single cron entry.
# File 'lib/zizq/client.rb', line 647
def delete_cron_group_entry(group, entry)
  response = delete("/crons/#{enc(group)}/entries/#{enc(entry)}")
  handle_response!(response, expected: 204)
  nil
end
#delete_job(id) ⇒ void
This method returns an undefined value.
Delete a single job by ID.
# File 'lib/zizq/client.rb', line 344
def delete_job(id)
  response = delete("/jobs/#{enc(id)}")
  handle_response!(response, expected: [200, 204])
  nil
end
#enc(value) ⇒ Object
URL-encode a single path segment.
# File 'lib/zizq/client.rb', line 897
def enc(value) #: (String) -> String
  URI.encode_uri_component(value)
end
#encode_body(body) ⇒ Object
: (Hash[Symbol, untyped]) -> String
# File 'lib/zizq/client.rb', line 1119
def encode_body(body) #: (Hash[Symbol, untyped]) -> String
  case @format
  when :msgpack then MessagePack.pack(body)
  when :json then JSON.generate(body)
  else raise ArgumentError, "Unknown format: #{@format}"
  end
end
#encode_range(value, &block) ⇒ void
This method returns an undefined value.
Encode an Integer or Range filter into the server's query format.
Accepted shapes:
- nil -> nil (no filter sent)
- Integer -> "N" (single value)
- (a..b) -> "A..B" (inclusive both ends)
- (a..) -> "A.." (lower bound only)
- (..b) -> "..B" (upper bound only)
Exclusive ranges (a...b, a..., ...b) raise ArgumentError — the
server only supports inclusive bounds. A bound transformation block can
convert values before formatting (e.g. seconds -> ms for ready_at).
# File 'lib/zizq/client.rb', line 1010
def encode_range(value, &block)
  block ||= ->(v) { v.to_i }

  case value
  when nil
    nil
  when Range
    if value.exclude_end?
      raise ArgumentError, "exclusive ranges are not supported by the server; use an inclusive range (a..b) instead"
    end

    "#{value.begin&.then(&block)}..#{value.end&.then(&block)}"
  else
    block.call(value).to_s
  end
end
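The accepted shapes can be checked against a standalone copy of the encoding logic. This is a sketch for illustration only: `encode_range_sketch` is a hypothetical name, and the seconds-to-milliseconds block mirrors the conversion described for ready_at.

```ruby
# Hypothetical standalone copy of the range encoding described above.
def encode_range_sketch(value, &block)
  block ||= ->(v) { v.to_i }

  case value
  when nil
    nil
  when Range
    raise ArgumentError, "exclusive ranges are not supported" if value.exclude_end?

    # A missing bound (endless or beginless range) renders as an empty string.
    "#{value.begin&.then(&block)}..#{value.end&.then(&block)}"
  else
    block.call(value).to_s
  end
end

single   = encode_range_sketch(5)
closed   = encode_range_sketch(1..10)
lower    = encode_range_sketch(1..)
upper    = encode_range_sketch(..10)
millis   = encode_range_sketch(1.5..2) { |v| (v.to_f * 1000).to_i }
rejected = begin
  encode_range_sketch(1...3)
  false
rescue ArgumentError
  true
end
```

The bound transformation is applied to each present bound independently, so open-ended ranges pass through the block only once.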
#enqueue(queue:, type:, payload:, priority: nil, ready_at: nil, retry_limit: nil, backoff: nil, retention: nil, unique_key: nil, unique_while: nil) ⇒ Resources::Job
Enqueue a new job.
This is a low-level primitive that makes a direct API call to the server
using the Zizq API's expected inputs. Callers should generally use
[Zizq::enqueue] instead.
Returns a resource instance of the new job wrapping the API response.
# File 'lib/zizq/client.rb', line 170
def enqueue(queue:,
            type:,
            payload:,
            priority: nil,
            ready_at: nil,
            retry_limit: nil,
            backoff: nil,
            retention: nil,
            unique_key: nil,
            unique_while: nil)
  body = { queue:, type:, payload: } #: Hash[Symbol, untyped]
  body[:priority] = priority if priority
  # ready_at is fractional seconds in Ruby; the server expects ms.
  body[:ready_at] = (ready_at.to_f * 1000).to_i if ready_at
  body[:retry_limit] = retry_limit if retry_limit
  body[:backoff] = backoff if backoff
  body[:retention] = retention if retention
  body[:unique_key] = unique_key if unique_key
  body[:unique_while] = unique_while.to_s if unique_while

  response = post("/jobs", body)
  data = handle_response!(response, expected: [200, 201])
  Resources::Job.new(self, data)
end
#enqueue_bulk(jobs:) ⇒ Array[Resources::Job]
Enqueue multiple jobs atomically in a single bulk request.
This is a low-level primitive that makes a direct API call to the server
using the Zizq API's expected inputs. Callers should generally use
[Zizq::enqueue_bulk] instead.
Returns an array of resource instances wrapping the API response.
# File 'lib/zizq/client.rb', line 205
def enqueue_bulk(jobs:)
  body = {
    jobs: jobs.map do |job|
      wire = { type: job[:type], queue: job[:queue], payload: job[:payload] } #: Hash[Symbol, untyped]
      wire[:priority] = job[:priority] if job[:priority]
      # ready_at is fractional seconds in Ruby; the server expects ms.
      wire[:ready_at] = (job[:ready_at].to_f * 1000).to_i if job[:ready_at]
      wire[:retry_limit] = job[:retry_limit] if job[:retry_limit]
      wire[:backoff] = job[:backoff] if job[:backoff]
      wire[:retention] = job[:retention] if job[:retention]
      wire[:unique_key] = job[:unique_key] if job[:unique_key]
      wire[:unique_while] = job[:unique_while].to_s if job[:unique_while]
      wire
    end
  }

  response = post("/jobs/bulk", body)
  data = handle_response!(response, expected: [200, 201])
  data["jobs"].map { |j| Resources::Job.new(self, j) }
end
#ensure_io_thread ⇒ Object
Lazily start the background IO thread (double-checked locking).
# File 'lib/zizq/client.rb', line 1180
def ensure_io_thread #: () -> void
  return if @io_thread&.alive?

  @mutex.synchronize do
    return if @io_thread&.alive?

    @io_queue = Thread::Queue.new
    @io_thread = Thread.new { io_thread_run }
    @io_thread.name = "zizq-io"
  end
end
#erase_all_data ⇒ void
This method returns an undefined value.
Wipe every cron group and every job on the server.
Equivalent to calling delete_all_crons followed by
delete_all_jobs (no filter), but in a single request. Useful
primarily as a setup/teardown step in tests where you want a
known-empty server between scenarios.
Destructive. No filters, no escape hatch, no confirmation — the server-side operation simply returns once everything is gone.
Named erase_all_data rather than reset because Zizq.reset!
already exists at the module level for client-side SDK state.
# File 'lib/zizq/client.rb', line 389
def erase_all_data
  response = raw_post("/reset")
  handle_response!(response, expected: 204)
  nil
end
#get(path, params: {}) ⇒ Object
: (String, ?params: Hash[Symbol, untyped]) -> RawResponse
# File 'lib/zizq/client.rb', line 1253
def get(path, params: {}) #: (String, ?params: Hash[Symbol, untyped]) -> RawResponse
  request do |http|
    consume_response(
      http.get(
        build_path(path, params:),
        {"accept" => @content_type}
      )
    )
  end
end
#get_cron_group(name) ⇒ Resources::CronGroup
Fetch a cron group and all its entries.
# File 'lib/zizq/client.rb', line 519
def get_cron_group(name)
  response = get("/crons/#{enc(name)}")
  data = handle_response!(response, expected: 200)
  Resources::CronGroup.new(self, data)
end
#get_cron_group_entry(group, entry) ⇒ Resources::CronEntry
Fetch a single cron entry.
# File 'lib/zizq/client.rb', line 587
def get_cron_group_entry(group, entry)
  response = get("/crons/#{enc(group)}/entries/#{enc(entry)}")
  data = handle_response!(response, expected: 200)
  Resources::CronEntry.new(self, data)
end
#get_error(id, attempt:) ⇒ Resources::ErrorRecord
Get a single error record by job ID and attempt number.
# File 'lib/zizq/client.rb', line 466
def get_error(id, attempt:)
  response = get("/jobs/#{enc(id)}/errors/#{enc(attempt.to_s)}")
  data = handle_response!(response, expected: 200)
  Resources::ErrorRecord.new(self, data)
end
#get_job(id) ⇒ Object
Get a single job by ID.
# File 'lib/zizq/client.rb', line 227
def get_job(id) #: (String) -> Resources::Job
  response = get("/jobs/#{enc(id)}")
  data = handle_response!(response, expected: 200)
  Resources::Job.new(self, data)
end
#get_path(path) ⇒ Object
GET a path on the server and return the decoded response body.
The path should include any query parameters already (e.g. pagination
links from the server's pages object). This is intentionally public
so that resource objects like Page can follow links without resorting
to .send.
# File 'lib/zizq/client.rb', line 889
def get_path(path) #: (String) -> Hash[String, untyped]
  response = request { |http| consume_response(http.get(path, {"accept" => @content_type})) }
  handle_response!(response, expected: 200)
end
#get_queues ⇒ Object
List all distinct queue names on the server.
# File 'lib/zizq/client.rb', line 500
def get_queues #: () -> Array[String]
  response = get("/queues")
  data = handle_response!(response, expected: 200)
  data["queues"]
end
#handle_response!(response, expected:) ⇒ Object
Check response status and decode body, raising on errors.
# File 'lib/zizq/client.rb', line 1323

def handle_response!(response, expected:) #: (RawResponse, expected: Integer | Array[Integer]) -> Hash[String, untyped]?
  status = response.status
  expected_statuses = Array(expected)
  ct = response.content_type

  if expected_statuses.include?(status)
    return nil if status == 204

    decode_body(response.body, content_type: ct)
  else
    body = begin
      decode_body(response.body, content_type: ct)
    rescue
      nil
    end
    message = body&.fetch("error", nil) || "HTTP #{status}"
    error_class = case status
                  when 404 then NotFoundError
                  when 400..499 then ClientError
                  when 500..599 then ServerError
                  else ResponseError
                  end
    raise error_class.new(message, status: status, body: body)
  end
end
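The status-to-exception mapping above can be tried in isolation. The following is a minimal sketch, not the library's code: the error classes are local stand-ins (their inheritance here is an assumption), and error_class_for is a hypothetical helper name.

```ruby
# Stand-in error hierarchy. Assumption: the real classes live under Zizq
# and may be organised differently.
class ResponseError < StandardError; end
class ClientError < ResponseError; end
class NotFoundError < ClientError; end
class ServerError < ResponseError; end

# Hypothetical helper mirroring the `case status` branch shown above.
# Order matters: 404 must be tested before the broader 400..499 range,
# otherwise it would be reported as a plain ClientError.
def error_class_for(status)
  case status
  when 404 then NotFoundError
  when 400..499 then ClientError
  when 500..599 then ServerError
  else ResponseError
  end
end
```

Any status outside the 4xx and 5xx ranges that was not listed in expected: (a redirect, for example) falls through to the generic ResponseError.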
#health ⇒ Object
Health check.
# File 'lib/zizq/client.rb', line 487

def health #: () -> Hash[String, untyped]
  response = get("/health")
  handle_response!(response, expected: 200)
end
#http ⇒ Object
Return the calling thread's HTTP client, creating one if needed. Uses thread_variable_get/set (not Thread.current[]) because Thread.current[] storage is fiber-local, so each Async fiber would otherwise get its own client. The tracking array holds WeakRefs so that clients belonging to exited threads can be garbage-collected.
# File 'lib/zizq/client.rb', line 1228

def http #: () -> Async::HTTP::Client
  thread_local_http(@http_key, @endpoint)
end
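The distinction between fiber-local and thread-local storage that motivates this method can be seen with plain Ruby, without Async. This is an illustrative sketch only; the method name and keys are made up for the example.

```ruby
# Compare what a freshly created fiber sees for a value stored with
# Thread#[]= (fiber-local) and one stored with thread_variable_set
# (shared by every fiber running on the thread).
def storage_seen_from_new_fiber
  Thread.current[:fiber_local_key] = :set_in_calling_fiber
  Thread.current.thread_variable_set(:thread_local_key, :set_in_calling_fiber)

  Fiber.new do
    {
      fiber_local: Thread.current[:fiber_local_key],
      thread_local: Thread.current.thread_variable_get(:thread_local_key)
    }
  end.resume
end
```

Inside the new fiber the Thread#[] lookup yields nil while the thread variable is still visible, which is why a client cached with Thread#[] would be recreated for every fiber.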
#io_thread_run ⇒ Object
Main loop for the background IO thread. Mirrors AckProcessor: runs an Async reactor, pops work from the queue (fiber-scheduler-aware), and dispatches each call as a concurrent fiber via a barrier.
# File 'lib/zizq/client.rb', line 1195

def io_thread_run #: () -> void
  ObjectSpace.define_finalizer(
    self,
    self.class.make_finalizer(@io_queue, @http_clients)
  )

  Sync do
    barrier = Async::Barrier.new

    while (item = @io_queue.pop)
      block, result_queue = item
      barrier.async do
        result_queue.push([:ok, block.call(http)])
      rescue Exception => e # rubocop:disable Lint/RescueException
        # Must catch Exception (not just StandardError) to ensure the
        # caller is always unblocked. Without this, errors like
        # NoMemoryError or library-level Exceptions would kill the IO
        # thread and leave callers blocking on result_queue.pop forever.
        result_queue.push([:error, e])
      end
    end

    barrier.wait
  end
ensure
  ObjectSpace.undefine_finalizer(self)
end
#list_cron_groups ⇒ Array[String]
List all cron group names.
# File 'lib/zizq/client.rb', line 509

def list_cron_groups
  response = get("/crons")
  data = handle_response!(response, expected: 200)
  data["crons"]
end
#list_errors(id, from: nil, order: nil, limit: nil) ⇒ Resources::ErrorPage
List error records for a job.
# File 'lib/zizq/client.rb', line 479

def list_errors(id, from: nil, order: nil, limit: nil)
  params = { from:, order:, limit: }.compact #: Hash[Symbol, untyped]
  response = get("/jobs/#{enc(id)}/errors", params:)
  data = handle_response!(response, expected: 200)
  Resources::ErrorPage.new(self, data)
end
#list_jobs(id: nil, status: nil, queue: nil, type: nil, filter: nil, priority: nil, ready_at: nil, attempts: nil, from: nil, order: nil, limit: nil) ⇒ Resources::JobPage
List jobs with optional filters.
Multi-value filters (status, queue, type, id) accept arrays;
they are joined with commas, as the server expects.
The filter parameter accepts a jq expression for filtering jobs by
payload content (e.g. .user_id == 42).
Range filters (priority, attempts, ready_at) accept exact values or
inclusive ranges.
# File 'lib/zizq/client.rb', line 256

def list_jobs(id: nil,
              status: nil,
              queue: nil,
              type: nil,
              filter: nil,
              priority: nil,
              ready_at: nil,
              attempts: nil,
              from: nil,
              order: nil,
              limit: nil)
  options = {
    id:,
    status:,
    queue:,
    type:,
    filter:,
    priority: encode_range(priority),
    ready_at: encode_range(ready_at) { |v| (v.to_f * 1000).to_i },
    attempts: encode_range(attempts),
    from:,
    order:,
    limit:,
  }.compact #: Hash[Symbol, untyped]

  multi_keys = %i[id status queue type]
  params = build_where_params(options, multi_keys:)

  # An empty filter ([] or "") matches nothing; short-circuit.
  multi_keys.each do |key|
    return Resources::JobPage.new(self, { "jobs" => [], "pages" => {} }) if params[key] == ""
  end

  response = get("/jobs", params:)
  data = handle_response!(response, expected: 200)
  Resources::JobPage.new(self, data)
end
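The comma-joining of multi-value filters and the empty-filter short-circuit can be sketched on their own. The helpers below are hypothetical stand-ins written for illustration (build_where_params itself is not shown on this page), and the status and queue values in the usage note are made up.

```ruby
# Hypothetical stand-in for the multi-value handling described above:
# values under a multi-value key are joined with commas, everything else
# is passed through untouched.
def join_multi_values(filters, multi_keys:)
  filters.to_h do |key, value|
    value = Array(value).join(",") if multi_keys.include?(key)
    [key, value]
  end
end

# True when a multi-value filter collapsed to "", meaning it was given
# as [] or "" and therefore cannot match any job.
def matches_nothing?(params, multi_keys:)
  multi_keys.any? { |key| params[key] == "" }
end
```

With these helpers, join_multi_values({ status: %w[ready in_flight], limit: 10 }, multi_keys: %i[status]) produces a status value of "ready,in_flight", while { id: [] } collapses to an empty string and would be answered locally with an empty page instead of a server round trip.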
#patch(path, body, params: {}) ⇒ Object
: (String, Hash[Symbol, untyped], ?params: Hash[Symbol, untyped]) -> RawResponse
# File 'lib/zizq/client.rb', line 1310

def patch(path, body, params: {}) #: (String, Hash[Symbol, untyped], ?params: Hash[Symbol, untyped]) -> RawResponse
  request do |http|
    consume_response(
      http.patch(
        build_path(path, params:),
        {"content-type" => @content_type, "accept" => @content_type},
        Protocol::HTTP::Body::Buffered.wrap(encode_body(body))
      )
    )
  end
end
#post(path, body) ⇒ Object
: (String, Hash[Symbol, untyped]) -> RawResponse
# File 'lib/zizq/client.rb', line 1264

def post(path, body) #: (String, Hash[Symbol, untyped]) -> RawResponse
  request do |http|
    consume_response(
      http.post(
        build_path(path),
        {"content-type" => @content_type, "accept" => @content_type},
        Protocol::HTTP::Body::Buffered.wrap(encode_body(body))
      )
    )
  end
end
#put(path, body) ⇒ Object
: (String, Hash[Symbol, untyped]) -> RawResponse
# File 'lib/zizq/client.rb', line 1287

def put(path, body) #: (String, Hash[Symbol, untyped]) -> RawResponse
  request do |http|
    consume_response(
      http.put(
        build_path(path),
        {"content-type" => @content_type, "accept" => @content_type},
        Protocol::HTTP::Body::Buffered.wrap(encode_body(body))
      )
    )
  end
end
#raw_post(path) ⇒ Object
: (String) -> RawResponse
# File 'lib/zizq/client.rb', line 1276

def raw_post(path) #: (String) -> RawResponse
  request do |http|
    consume_response(
      http.post(
        build_path(path),
        {"accept" => @content_type}
      )
    )
  end
end
#replace_cron_group(name, paused: nil, entries: []) ⇒ Resources::CronGroup
Create or replace an entire cron group.
Entries not present in the request are removed. Entries with unchanged expressions preserve their scheduling state.
# File 'lib/zizq/client.rb', line 534

def replace_cron_group(name, paused: nil, entries: [])
  body = {
    paused:,
    entries: entries.map { |entry| build_cron_entry(**entry) }
  }.compact
  response = put("/crons/#{enc(name)}", body)
  data = handle_response!(response, expected: 200)
  Resources::CronGroup.new(self, data)
end
#replace_cron_group_entry(group, entry, expression:, job:, timezone: nil, paused: nil) ⇒ Resources::CronEntry
Create or replace a single cron entry.
Preserves scheduling state if the expression is unchanged.
# File 'lib/zizq/client.rb', line 623

def replace_cron_group_entry(group, entry, expression:, job:, timezone: nil, paused: nil)
  body = build_cron_entry(name: entry, expression:, job:, timezone:, paused:)
  response = put("/crons/#{enc(group)}/entries/#{enc(entry)}", body)
  data = handle_response!(response, expected: 200)
  Resources::CronEntry.new(self, data)
end
#report_failure(id, message:, error_type: nil, backtrace: nil, retry_at: nil, kill: false) ⇒ Resources::Job Also known as: nack
Report a job failure (nack).
Returns the updated job metadata.
If this method is not called when an error occurs while processing a job, the Zizq server continues to treat that job as in-flight and will stop sending jobs once the prefetch limit or the server's global in-flight limit has been reached. Jobs must be either acknowledged or failed before new jobs are sent.
Jobs are durable and "at least once" delivery is guaranteed. If the client disconnects before it is able to report success or failure the server automatically moves the job back to the queue where it will be provided to another worker. Clients should be prepared to see the same job more than once for this reason.
The Zizq server sends heartbeat messages to connected workers so that it can quickly detect and handle disconnected clients.
# File 'lib/zizq/client.rb', line 726

def report_failure(id, message:, error_type: nil, backtrace: nil, retry_at: nil, kill: false)
  body = { message: } #: Hash[Symbol, untyped]
  body[:error_type] = error_type if error_type
  body[:backtrace] = backtrace if backtrace
  # retry_at is fractional seconds in Ruby; the server expects ms.
  body[:retry_at] = (retry_at * 1000).to_i if retry_at
  body[:kill] = kill if kill

  response = post("/jobs/#{enc(id)}/failure", body)
  data = handle_response!(response, expected: 200)
  Resources::Job.new(self, data)
end
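The body construction above, including the conversion of retry_at from fractional seconds to integer milliseconds, can be sketched as a free-standing function. The name build_failure_body is hypothetical and the sample values are invented; only the field handling follows the source shown above.

```ruby
# Hypothetical free-standing version of the request body built by
# report_failure. Optional fields are only included when given, and
# retry_at (fractional epoch seconds) is sent as whole milliseconds.
def build_failure_body(message:, error_type: nil, backtrace: nil, retry_at: nil, kill: false)
  body = { message: message }
  body[:error_type] = error_type if error_type
  body[:backtrace] = backtrace if backtrace
  body[:retry_at] = (retry_at * 1000).to_i if retry_at
  body[:kill] = kill if kill
  body
end
```

Because retry_at is multiplied directly, it has to be numeric; a caller holding a Time would pass something like (Time.now + 30).to_f rather than the Time object itself.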
#report_success(id) ⇒ Object Also known as: ack
Mark a job as successfully completed (ack).
If neither this method nor [#report_failure] is called when a job
finishes, the Zizq server continues to treat the job as in-flight and
will stop sending jobs once the prefetch limit or the server's global
in-flight limit has been reached. Jobs must be either acknowledged or
failed before new jobs are sent.
Jobs are durable and "at least once" delivery is guaranteed. If the client disconnects before it is able to report success or failure the server automatically moves the job back to the queue where it will be provided to another worker. Clients should be prepared to see the same job more than once for this reason.
The Zizq server sends heartbeat messages to connected workers so that it can quickly detect and handle disconnected clients.
# File 'lib/zizq/client.rb', line 669

def report_success(id) #: (String) -> nil
  response = raw_post("/jobs/#{enc(id)}/success")
  handle_response!(response, expected: 204)
  nil
end
#report_success_bulk(ids) ⇒ nil Also known as: ack_bulk
Bulk-mark jobs as successfully completed (bulk ack).
See [#report_success] for full details of how acknowledgement works.
There are two ways in which the server can respond successfully:
- 204 - No Content (All jobs acknowledged)
- 422 - Unprocessable Entity (Some jobs were not found)
Both of these statuses are in reality treated as success because missing jobs have either been previously acknowledged and purged, or moved to some other status that cannot be acknowledged.
Other error response types will still raise.
# File 'lib/zizq/client.rb', line 692

def report_success_bulk(ids)
  response = post("/jobs/success", { ids: ids })
  return nil if response.status == 422
  handle_response!(response, expected: 204)
end
#request(&block) ⇒ void
This method returns an undefined value.
Dispatch a block to the appropriate execution context.
If already inside an Async reactor (e.g. AckProcessor, producer), yields the calling thread's HTTP client directly. Otherwise, dispatches via the persistent background IO thread.
# File 'lib/zizq/client.rb', line 1145

def request(&block) #: () { (Async::HTTP::Client) -> RawResponse } -> RawResponse
  if Async::Task.current?
    yield http
  else
    sync_call(&block)
  end
end
#server_version ⇒ Object
Server version string.
# File 'lib/zizq/client.rb', line 493

def server_version #: () -> String
  response = get("/version")
  data = handle_response!(response, expected: 200)
  data["version"]
end
#stream_http ⇒ Object
Return the calling thread's streaming HTTP client (HTTP/1.1),
creating one if needed. See #http for the thread-local locking
rationale. Kept separate from the main client so the long-lived
/jobs/take connection doesn't share an HTTP/2 session with
ack/enqueue traffic.
# File 'lib/zizq/client.rb', line 1237

def stream_http #: () -> Async::HTTP::Client
  thread_local_http(@stream_http_key, @stream_endpoint)
end
#sync_call(&block) ⇒ void
This method returns an undefined value.
Push a work block to the background IO thread and block until it completes, returning the result or re-raising any exception.
# File 'lib/zizq/client.rb', line 1163

def sync_call(&block) #: () { (Async::HTTP::Client) -> RawResponse } -> RawResponse
  ensure_io_thread

  result_queue = Thread::Queue.new
  @io_queue.push([block, result_queue])

  tag, value = result_queue.pop
  if tag == :ok
    value
  else
    raise value
  end
rescue ClosedQueueError
  raise ConnectionError, "client is closed"
end
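The hand-off between a caller and the background thread can be illustrated with the standard library alone. The sketch below is a simplified stand-in rather than the client's implementation: BackgroundCaller is a hypothetical class, it runs blocks one at a time on a plain thread instead of as fibers in an Async reactor, and it raises IOError where the client raises ConnectionError.

```ruby
# A single worker thread executes blocks on behalf of callers. Each call
# carries its own result queue, so the caller can block on exactly the
# reply that belongs to it.
class BackgroundCaller
  def initialize
    @queue = Thread::Queue.new
    @thread = Thread.new { run }
  end

  # Push the block to the worker and wait for a tagged result.
  def call(&block)
    result_queue = Thread::Queue.new
    @queue.push([block, result_queue])
    tag, value = result_queue.pop
    tag == :ok ? value : raise(value)
  rescue ClosedQueueError
    raise IOError, "caller is closed"
  end

  # Closing the queue makes the worker's pop return nil, ending its loop.
  def close
    @queue.close
    @thread.join
  end

  private

  def run
    while (item = @queue.pop)
      block, result_queue = item
      begin
        result_queue.push([:ok, block.call])
      rescue Exception => e # rubocop:disable Lint/RescueException
        # Always answer, so the caller is never left blocked on pop.
        result_queue.push([:error, e])
      end
    end
  end
end
```

A caller would write BackgroundCaller.new.call { 1 + 1 } and receive 2; an exception raised inside the block is re-raised in the calling thread, and calling after close surfaces as IOError.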
#take_jobs(prefetch: 1, queues: [], worker_id: nil, on_connect: nil, on_response: nil) {|arg0| ... } ⇒ void
This method returns an undefined value.
Stream jobs from the server. Yields parsed job hashes.
This method does not return unless the server closes the connection or the connection is otherwise interrupted. Jobs are continuously streamed to the client, and when no jobs are available the client waits for new jobs to become ready.
If the client does not acknowledge or fail jobs with [#report_success]
or [#report_failure] the server will stop sending new jobs to the
client as it hits its prefetch limit.
Jobs are durable and "at least once" delivery is guaranteed. If the client disconnects before it is able to report success or failure the server automatically moves the job back to the queue where it will be provided to another worker. Clients should be prepared to see the same job more than once for this reason.
The Zizq server sends periodic heartbeat messages to the client which are silently consumed.
Example:

  client.take_jobs(prefetch: 5) do |job|
    puts "Got job: #{job.inspect}"
    client.ack(job.id) # mark the job completed
  end
# File 'lib/zizq/client.rb', line 775

def take_jobs(prefetch: 1, queues: [], worker_id: nil, on_connect: nil, on_response: nil, &block)
  raise ArgumentError, "take_jobs requires a block" unless block

  params = { prefetch: } #: Hash[Symbol, untyped]
  params[:queue] = queues.join(",") unless queues.empty?

  path = build_path("/jobs/take", params:)
  headers = { "accept" => @stream_accept }
  headers["worker-id"] = worker_id if worker_id

  Sync do
    response = stream_http.get(path, headers)

    begin
      raise StreamError, "take jobs stream returned HTTP #{response.status}" unless response.status == 200

      on_connect&.call
      on_response&.call(response)

      # Wrap each parsed hash in a Resources::Job before yielding.
      wrapper = proc { |data| block.call(Resources::Job.new(self, data)) }

      # async-http returns `nil` for empty response bodies over HTTP/1.1
      # (e.g. a 200 with content-length: 0 from the server closing the
      # stream immediately). Treat that as "no chunks" rather than
      # crashing in the parser.
      body = response.body || []

      case @format
      when :json then self.class.parse_ndjson(body, &wrapper)
      when :msgpack then self.class.parse_msgpack_stream(body, &wrapper)
      end
    ensure
      response.close rescue nil
    end
  end
rescue SocketError, IOError, EOFError, Errno::ECONNRESET, Errno::EPIPE,
       IO::TimeoutError, OpenSSL::SSL::SSLError => e
  raise ConnectionError, e.message
end
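For the JSON format, the stream body arrives as arbitrary byte chunks that have to be reassembled into lines before parsing. The sketch below shows one way to do that with the standard json library. It is not the source of parse_ndjson, which is not reproduced on this page; each_ndjson is a hypothetical name, and skipping blank lines is an assumption about how keep-alive traffic might look.

```ruby
require "json"

# Minimal NDJSON reader: buffers byte chunks and yields one parsed object
# per complete line. A chunk may end in the middle of a line, so the
# unfinished tail stays in the buffer until more data arrives.
def each_ndjson(chunks)
  buffer = String.new
  chunks.each do |chunk|
    buffer << chunk
    while (newline = buffer.index("\n"))
      line = buffer.slice!(0..newline).strip
      # Assumption: blank lines carry no data and can be ignored.
      yield JSON.parse(line) unless line.empty?
    end
  end

  # Flush a final line that arrived without a trailing newline.
  rest = buffer.strip
  yield JSON.parse(rest) unless rest.empty?
end
```

Any enumerable of strings works as input, which mirrors how the response body (or the empty array used when the body is nil) is handed to the parser above.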
#thread_local_http(key, endpoint) ⇒ Object
: (Symbol, Async::HTTP::Endpoint) -> Async::HTTP::Client
# File 'lib/zizq/client.rb', line 1241

def thread_local_http(key, endpoint) #: (Symbol, Async::HTTP::Endpoint) -> Async::HTTP::Client
  Thread.current.thread_variable_get(key) || begin
    client = Async::HTTP::Client.new(endpoint)
    @mutex.synchronize do
      @http_clients.reject! { |ref| !ref.weakref_alive? }
      @http_clients << WeakRef.new(client)
    end
    Thread.current.thread_variable_set(key, client)
    client
  end
end
#update_all_jobs(where: {}, apply: {}) ⇒ Integer
Update all jobs matching the given filters.
Filters in the where: argument use the same keys as list_jobs.
Fields in the apply: argument use the same keys as update_job.
Terminal jobs (completed/dead) are silently skipped unless they are
explicitly requested via status: in where:, in which case the server returns 422.
Returns the number of updated jobs.
# File 'lib/zizq/client.rb', line 444

def update_all_jobs(where: {}, apply: {})
  filter_params = validate_where(**where)
  multi_keys = %i[id status queue type]
  params = build_where_params(filter_params, multi_keys:)

  # An empty multi-value filter matches nothing; short-circuit.
  multi_keys.each do |key|
    return 0 if params[key] == ""
  end

  body = validate_and_build_set(**apply)
  response = patch("/jobs", body, params:)
  data = handle_response!(response, expected: 200)
  data.fetch("patched")
end
#update_cron_group(name, paused: nil) ⇒ Resources::CronGroup
Update group-level fields (currently just pause/unpause).
# File 'lib/zizq/client.rb', line 549

def update_cron_group(name, paused: nil)
  response = patch("/crons/#{enc(name)}", { paused: }.compact)
  data = handle_response!(response, expected: 200)
  Resources::CronGroup.new(self, data)
end
#update_cron_group_entry(group, entry, paused:) ⇒ Resources::CronEntry
Update entry-level fields (currently just pause/unpause).
# File 'lib/zizq/client.rb', line 636

def update_cron_group_entry(group, entry, paused:)
  response = patch("/crons/#{enc(group)}/entries/#{enc(entry)}", { paused: })
  data = handle_response!(response, expected: 200)
  Resources::CronEntry.new(self, data)
end
#update_job(id, queue: UNCHANGED, priority: UNCHANGED, ready_at: UNCHANGED, retry_limit: UNCHANGED, backoff: UNCHANGED, retention: UNCHANGED) ⇒ Resources::Job
Update a single job's mutable fields.
Fields not provided are left unchanged. Use Zizq::RESET to clear
a nullable field back to the server default.
Raises Zizq::NotFoundError if the job does not exist.
Raises Zizq::ClientError (422) if the job is in a terminal state.
# File 'lib/zizq/client.rb', line 411

def update_job(id,
               queue: UNCHANGED,
               priority: UNCHANGED,
               ready_at: UNCHANGED,
               retry_limit: UNCHANGED,
               backoff: UNCHANGED,
               retention: UNCHANGED)
  body = build_set_body(
    queue:,
    priority:,
    ready_at:,
    retry_limit:,
    backoff:,
    retention:
  )
  response = patch("/jobs/#{enc(id)}", body)
  data = handle_response!(response, expected: 200)
  Resources::Job.new(self, data)
end
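Using a sentinel default lets a method tell "argument not given" apart from an explicit nil. The sketch below illustrates the pattern with local stand-ins. It makes two assumptions that the source does not confirm: that the sentinels are plain unique objects, and that a reset is transmitted as a null value. build_patch_body is a hypothetical name; build_set_body itself is not shown on this page.

```ruby
# Local stand-ins for the sentinels named above. Assumption: the real
# Zizq::UNCHANGED and Zizq::RESET may be implemented differently.
UNCHANGED = Object.new.freeze
RESET = Object.new.freeze

# Hypothetical body builder: fields left at UNCHANGED are omitted from
# the request, and RESET is translated to nil (assumed wire encoding).
def build_patch_body(**fields)
  fields.each_with_object({}) do |(key, value), body|
    next if value.equal?(UNCHANGED)

    body[key] = value.equal?(RESET) ? nil : value
  end
end
```

Comparing with equal? checks object identity, so no ordinary value a caller passes can be mistaken for either sentinel.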
#validate_and_build_set(queue: UNCHANGED, priority: UNCHANGED, ready_at: UNCHANGED, retry_limit: UNCHANGED, backoff: UNCHANGED, retention: UNCHANGED) ⇒ Hash[Symbol, untyped]
Validate set parameters via keyword args (rejects unknown keys) and
build the JSON body. Used by update_all_jobs.
# File 'lib/zizq/client.rb', line 1037

def validate_and_build_set(queue: UNCHANGED,
                           priority: UNCHANGED,
                           ready_at: UNCHANGED,
                           retry_limit: UNCHANGED,
                           backoff: UNCHANGED,
                           retention: UNCHANGED)
  build_set_body(queue:, priority:, ready_at:, retry_limit:, backoff:, retention:)
end
#validate_where(id: nil, status: nil, queue: nil, type: nil, filter: nil, priority: nil, ready_at: nil, attempts: nil) ⇒ Hash[Symbol, untyped]
Validate and normalize filter parameters for bulk operations.
Uses keyword arguments so that unknown keys raise ArgumentError.
# File 'lib/zizq/client.rb', line 973

def validate_where(id: nil,
                   status: nil,
                   queue: nil,
                   type: nil,
                   filter: nil,
                   priority: nil,
                   ready_at: nil,
                   attempts: nil)
  {
    id:,
    status:,
    queue:,
    type:,
    filter:,
    priority: encode_range(priority),
    ready_at: encode_range(ready_at) { |v| (v.to_f * 1000).to_i },
    attempts: encode_range(attempts),
  }.compact
end
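Splatting a caller-supplied hash into keyword arguments is what makes unknown keys fail fast. A reduced sketch follows, using a hypothetical two-filter method (validate_filters) in place of the full signature above.

```ruby
# Hypothetical reduced version of validate_where with two known filters.
# Ruby itself raises ArgumentError when the splatted hash contains a key
# that is not a declared keyword, so no manual key checking is needed.
def validate_filters(status: nil, queue: nil)
  { status: status, queue: queue }.compact
end

# Returns the normalized filters, or nil when the hash has unknown keys.
def filters_or_nil(where)
  validate_filters(**where)
rescue ArgumentError
  nil
end
```

A misspelled key such as stauts: is therefore rejected before any request is built, instead of being silently dropped and widening a bulk update to more jobs than intended.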