Class: Zizq::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/zizq/client.rb,
sig/generated/zizq/client.rbs

Overview

Low-level HTTP wrapper for the Zizq job queue server API.

Supports both JSON and MessagePack serialization formats, determined at construction time.

HTTP requests are dispatched through a persistent background IO thread when called from non-Async contexts, keeping the HTTP/2 connection alive across calls and avoiding ephemeral port exhaustion. When called from within an existing Async reactor, the shared HTTP client is used directly.

Direct Known Subclasses

Test::Client

Defined Under Namespace

Classes: RawResponse

Constant Summary collapse

CONTENT_TYPES =

: Hash[Zizq::format, String]

Returns:

  • (Object)
{ #: Hash[Zizq::format, String]
  msgpack: "application/msgpack",
  json: "application/json"
}.freeze
STREAM_ACCEPT =

: Hash[Zizq::format, String]

Returns:

  • (Object)
{ #: Hash[Zizq::format, String]
  msgpack: "application/vnd.zizq.msgpack-stream",
  json: "application/x-ndjson"
}.freeze

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(url:, format: :msgpack, ssl_context: nil, read_timeout: 30, stream_idle_timeout: 30) ⇒ Client

Initialize a new instance of the client with the given base URL and optional format options.

read_timeout and stream_idle_timeout are per-operation socket I/O timeouts (seconds). Each individual socket read/write is bounded by the timeout. The streaming #take_jobs endpoint uses stream_idle_timeout because the server sends heartbeats at periodic intervals which keeps the connection alive.

Parameters:

  • url: (String)
  • format: (Zizq::format) (defaults to: :msgpack)
  • ssl_context: (OpenSSL::SSL::SSLContext, nil) (defaults to: nil)
  • read_timeout: (Numeric) (defaults to: 30)
  • stream_idle_timeout: (Numeric) (defaults to: 30)


65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# File 'lib/zizq/client.rb', line 65

def initialize(url:,
               format: :msgpack,
               ssl_context: nil,
               read_timeout: 30,
               stream_idle_timeout: 30)
  @url = url.chomp("/")
  @format = format

  endpoint_options = {
    protocol: Async::HTTP::Protocol::HTTP2,
    timeout: read_timeout,
  } #: Hash[Symbol, untyped]
  endpoint_options[:ssl_context] = ssl_context if ssl_context

  @endpoint = Async::HTTP::Endpoint.parse(
    @url,
    **endpoint_options,
  )

  # Streaming take uses a dedicated HTTP/1.1 endpoint. The take
  # connection is long-lived and carries only one request, so HTTP/2's
  # multiplexing, stream IDs, and frame headers add overhead with no
  # benefit — there's nothing to multiplex against. Acks/enqueues run
  # on separate threads with their own HTTP/2 clients, so they're
  # unaffected either way. HTTP/1.1 gives the stream a plain TCP
  # socket with no framing tax and measurably better throughput.
  #
  # The stream endpoint uses `stream_idle_timeout` for its socket
  # timeout so server heartbeats (~3s) keep it alive while only
  # genuinely dead connections (no data for the full window)
  # trigger a reconnect.
  stream_endpoint_options = endpoint_options.merge(
    protocol: Async::HTTP::Protocol::HTTP11,
    timeout: stream_idle_timeout,
  )
  @stream_endpoint = Async::HTTP::Endpoint.parse(
    @url,
    **stream_endpoint_options,
  )

  @mutex = Mutex.new

  @io_thread = nil #: Thread?
  @io_queue = nil #: Thread::Queue?

  # Each thread gets its own Async::HTTP::Client bound to its own
  # reactor — one for regular request/response traffic (HTTP/2) and
  # a separate one lazily created on the first take_jobs call
  # (HTTP/1.1). Both kinds of clients are tracked in a single array
  # so `close` can shut them all down together.
  @http_clients = [] #: Array[Async::HTTP::Client]
  @http_key = :"zizq_http_#{object_id}"
  @stream_http_key = :"zizq_stream_http_#{object_id}"

  @content_type = CONTENT_TYPES.fetch(format)
  @stream_accept = STREAM_ACCEPT.fetch(format)
end

Instance Attribute Details

#formatZizq::format (readonly)

The message format to use for all communication between the client and the server (default = :msgpack).

Returns:

  • (Zizq::format)


48
49
50
# File 'lib/zizq/client.rb', line 48

def format
  @format
end

#urlString (readonly)

The base URL of the Zizq server (e.g. "https://localhost:7890")

Returns:

  • (String)


44
45
46
# File 'lib/zizq/client.rb', line 44

def url
  @url
end

Class Method Details

.make_finalizer(io_queue, http_clients) ⇒ Object

Parameters:

  • io_queue (Object)
  • http_clients (Object)

Returns:

  • (Object)


1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
# File 'lib/zizq/client.rb', line 1350

def self.make_finalizer(io_queue, http_clients)
  -> do
    io_queue&.close
    http_clients.each do |ref|
      ref.close
    rescue WeakRef::RefError
      # Client already GC'd (owning thread exited).
    rescue NoMethodError
      # The async connection pool may hold references to tasks whose
      # fibers were already reclaimed when their owning Sync reactor
      # exited. Stopping those dead tasks raises NoMethodError; safe
      # to ignore.
    end
    http_clients.clear
  end
end

.parse_msgpack_stream(chunks) ⇒ Object

Parse a length-prefixed MessagePack stream from an enumerable of byte chunks.

Format: [4-byte big-endian length][MsgPack payload]. A zero-length frame is a heartbeat and is silently skipped.

Uses StringIO for efficient position-based reading rather than repeatedly slicing from the front of a String (which copies all remaining bytes on every extraction).

Parameters:

  • chunks (Object)

Returns:

  • (Object)


847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
# File 'lib/zizq/client.rb', line 847

def self.parse_msgpack_stream(chunks) #: (Enumerable[String]) { (Hash[String, untyped]) -> void } -> void
  io = StringIO.new("".b)

  chunks.each do |chunk|
    # Append new data at the end, then return to the read position.
    read_pos = io.pos
    io.seek(0, IO::SEEK_END)
    io.write(chunk.b)
    io.seek(read_pos)

    # Extract complete frames.
    while io.size - io.pos >= 4
      len_bytes = io.read(4) #: String
      len = len_bytes.unpack1("N") #: Integer

      if len == 0 # heartbeat
        next
      end

      if io.size - io.pos < len
        # Incomplete frame — rewind past the length header and wait
        # for more data.
        io.seek(-4, IO::SEEK_CUR)
        break
      end

      yield MessagePack.unpack(io.read(len))
    end

    # Compact: discard already-consumed bytes so the StringIO doesn't
    # grow without bound over the life of the stream.
    remaining = io.read
    io = StringIO.new(remaining || "".b)
  end
end

.parse_ndjson(chunks) ⇒ Object

Parse an NDJSON stream from an enumerable of byte chunks.

Buffers chunks and splits on newline boundaries. The buffer only ever holds one partial line between extractions, so the slice! cost is trivial. Empty lines (heartbeats) are silently skipped.

Parameters:

  • chunks (Object)

Returns:

  • (Object)


825
826
827
828
829
830
831
832
833
834
835
836
# File 'lib/zizq/client.rb', line 825

def self.parse_ndjson(chunks) #: (Enumerable[String]) { (Hash[String, untyped]) -> void } -> void
  buffer = +""
  chunks.each do |chunk|
    buffer << chunk
    while (idx = buffer.index("\n"))
      line = buffer.slice!(0, idx + 1) #: String
      line.strip!
      next if line.empty?
      yield JSON.parse(line)
    end
  end
end

Instance Method Details

#add_cron_group_entry(group, name:, expression:, job:, timezone: nil, paused: nil) ⇒ Resources::CronEntry

Add a single entry to a cron group (creates the group if needed).

Raises a ClientError (409 Conflict) if an entry with the same name already exists.

Parameters:

  • group (String)
  • name: (String)
  • expression: (String)
  • job: (Zizq::cron_job_params)
  • timezone: (String, nil) (defaults to: nil)
  • paused: (Boolean, nil) (defaults to: nil)

Returns:



605
606
607
608
609
610
# File 'lib/zizq/client.rb', line 605

def add_cron_group_entry(group, name:, expression:, job:, timezone: nil, paused: nil)
  body = build_cron_entry(name:, expression:, job:, timezone:, paused:)
  response = post("/crons/#{enc(group)}/entries", body)
  data = handle_response!(response, expected: 201)
  Resources::CronEntry.new(self, data)
end

#build_cron_entry(name: nil, expression: nil, job: nil, timezone: nil, paused: nil) ⇒ Hash[Symbol, untyped]

Validate and build a cron entry body from keyword arguments.

Parameters:

  • name: (String) (defaults to: nil)
  • expression: (String) (defaults to: nil)
  • job: (Zizq::cron_job_params) (defaults to: nil)
  • timezone: (String, nil) (defaults to: nil)
  • paused: (Boolean, nil) (defaults to: nil)

Returns:

  • (Hash[Symbol, untyped])


917
918
919
920
921
922
923
924
925
# File 'lib/zizq/client.rb', line 917

def build_cron_entry(name: nil, expression: nil, job: nil, timezone: nil, paused: nil)
  {
    name:,
    expression:,
    timezone:,
    paused:,
    job: build_cron_job(**job),
  }.compact
end

#build_cron_job(type: nil, queue: nil, payload: nil, priority: nil, retry_limit: nil, backoff: nil, retention: nil, unique_key: nil, unique_while: nil) ⇒ Hash[Symbol, untyped]

Validate and build a cron job template from keyword arguments.

Uses keyword args so that unknown keys raise ArgumentError.

Parameters:

  • type: (String) (defaults to: nil)
  • queue: (String) (defaults to: nil)
  • payload: (Object) (defaults to: nil)
  • priority: (Integer, nil) (defaults to: nil)
  • retry_limit: (Integer, nil) (defaults to: nil)
  • backoff: (Zizq::backoff, nil) (defaults to: nil)
  • retention: (Zizq::retention, nil) (defaults to: nil)
  • unique_key: (String, nil) (defaults to: nil)
  • unique_while: (Zizq::unique_scope, nil) (defaults to: nil)

Returns:

  • (Hash[Symbol, untyped])


941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
# File 'lib/zizq/client.rb', line 941

def build_cron_job(type: nil,
                   queue: nil,
                   payload: nil,
                   priority: nil,
                   retry_limit: nil,
                   backoff: nil,
                   retention: nil,
                   unique_key: nil,
                   unique_while: nil)
  job = { type:, queue:, payload: } #: Hash[Symbol, untyped]
  job[:priority] = priority if priority
  job[:retry_limit] = retry_limit if retry_limit
  job[:backoff] = backoff if backoff
  job[:retention] = retention if retention
  job[:unique_key] = unique_key if unique_key
  job[:unique_while] = unique_while.to_s if unique_while
  job
end

#build_path(path, params: {}) ⇒ Object

Build a relative path with optional query parameters.

Parameters:

  • path (Object)
  • params: (Object) (defaults to: {})

Returns:

  • (Object)


902
903
904
905
906
907
# File 'lib/zizq/client.rb', line 902

def build_path(path, params: {}) #: (String, ?params: Hash[Symbol, untyped]) -> String
  unless params.empty?
    path = "#{path}?#{URI.encode_www_form(params)}"
  end
  path
end

#build_set_body(queue: UNCHANGED, priority: UNCHANGED, ready_at: UNCHANGED, retry_limit: UNCHANGED, backoff: UNCHANGED, retention: UNCHANGED) ⇒ Hash[Symbol, untyped]

Build the JSON body hash for a PATCH request from set parameters.

  • UNCHANGED values are omitted (field not sent).
  • RESET values are sent as nil (JSON null).
  • nil is rejected — use RESET to clear a field.
  • Other values are converted to their wire format.

Parameters:

  • queue: (Object) (defaults to: UNCHANGED)
  • priority: (Object) (defaults to: UNCHANGED)
  • ready_at: (Object) (defaults to: UNCHANGED)
  • retry_limit: (Object) (defaults to: UNCHANGED)
  • backoff: (Object) (defaults to: UNCHANGED)
  • retention: (Object) (defaults to: UNCHANGED)

Returns:

  • (Hash[Symbol, untyped])


1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
# File 'lib/zizq/client.rb', line 1054

def build_set_body(queue: UNCHANGED,
                   priority: UNCHANGED,
                   ready_at: UNCHANGED,
                   retry_limit: UNCHANGED,
                   backoff: UNCHANGED,
                   retention: UNCHANGED)
  body = {} #: Hash[Symbol, untyped]

  unless queue.equal?(UNCHANGED)
    raise ArgumentError, "queue cannot be nil; use Zizq::RESET to clear or Zizq::UNCHANGED to leave as-is" if queue.nil?
    body[:queue] = queue
  end

  unless priority.equal?(UNCHANGED)
    raise ArgumentError, "priority cannot be nil; use Zizq::RESET to clear or Zizq::UNCHANGED to leave as-is" if priority.nil?
    body[:priority] = priority
  end

  unless ready_at.equal?(UNCHANGED)
    body[:ready_at] = ready_at.equal?(RESET) ? nil : (ready_at.to_f * 1000).to_i
  end

  unless retry_limit.equal?(UNCHANGED)
    body[:retry_limit] = retry_limit.equal?(RESET) ? nil : retry_limit
  end

  unless backoff.equal?(UNCHANGED)
    body[:backoff] = if backoff.equal?(RESET)
      nil
    else
      {
        exponent: backoff[:exponent].to_f,
        base_ms: (backoff[:base].to_f * 1000).to_i,
        jitter_ms: (backoff[:jitter].to_f * 1000).to_i
      }
    end
  end

  unless retention.equal?(UNCHANGED)
    body[:retention] = if retention.equal?(RESET)
      nil
    else
      ret = {} #: Hash[Symbol, Integer]
      ret[:completed_ms] = (retention[:completed].to_f * 1000).to_i if retention[:completed]
      ret[:dead_ms] = (retention[:dead].to_f * 1000).to_i if retention[:dead]
      ret
    end
  end

  body
end

#build_where_params(options, multi_keys: []) ⇒ Object

Build query params for list endpoints, joining multi-value keys with ",".

Parameters:

  • options (Object)
  • multi_keys: (Object) (defaults to: [])

Returns:

  • (Object)


1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
# File 'lib/zizq/client.rb', line 1107

def build_where_params(options, multi_keys: []) #: (Hash[Symbol, untyped], ?multi_keys: Array[Symbol]) -> Hash[Symbol, untyped]
  params = {} #: Hash[Symbol, untyped]
  options.each do |key, value|
    if multi_keys.include?(key) && value.is_a?(Array)
      params[key] = value.join(",")
    else
      params[key] = value
    end
  end
  params
end

#cleanup_internal_clientsObject

: () -> void

Returns:

  • (Object)


135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
# File 'lib/zizq/client.rb', line 135

def cleanup_internal_clients #: () -> void
  @mutex.synchronize do
    @http_clients.each do |ref|
      ref.close
    rescue WeakRef::RefError
      # Client already GC'd (owning thread exited).
    rescue NoMethodError
      # The async connection pool may hold references to tasks whose
      # fibers were already reclaimed when their owning Sync reactor
      # exited. Stopping those dead tasks raises NoMethodError; safe
      # to ignore.
    end
    @http_clients.clear
  end
end

#closeObject

Close all thread-local HTTP clients and release connections.

Returns:

  • (Object)


124
125
126
127
128
129
130
131
132
133
# File 'lib/zizq/client.rb', line 124

def close #: () -> void
  if @io_thread&.alive?
    @mutex.synchronize do
      @io_queue&.close
      @io_thread&.join
    end
  end

  self.class.make_finalizer(@io_queue, @http_clients).call
end

#consume_response(response) ⇒ Object

Read the response body and close it, returning a RawResponse that is safe to use outside the reactor.

Parameters:

  • response (Object)

Returns:

  • (Object)


1155
1156
1157
1158
1159
# File 'lib/zizq/client.rb', line 1155

def consume_response(response) #: (untyped) -> RawResponse
  RawResponse.new(status: response.status, body: response.read, content_type: response.headers["content-type"])
ensure
  response.close
end

#count_jobs(id: nil, status: nil, queue: nil, type: nil, filter: nil, priority: nil, ready_at: nil, attempts: nil) ⇒ Integer

Count jobs matching the given filters.

Accepts the same filter arguments as list_jobs (minus pagination). Returns the count as an integer.

Parameters:

  • id: (String, Array[String], nil) (defaults to: nil)
  • status: (String, Array[String], nil) (defaults to: nil)
  • queue: (String, Array[String], nil) (defaults to: nil)
  • type: (String, Array[String], nil) (defaults to: nil)
  • filter: (String, nil) (defaults to: nil)
  • priority: (Integer, Range[Integer?], nil) (defaults to: nil)
  • ready_at: (Zizq::to_f, Range[Zizq::to_f?], nil) (defaults to: nil)
  • attempts: (Integer, Range[Integer?], nil) (defaults to: nil)

Returns:

  • (Integer)


308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
# File 'lib/zizq/client.rb', line 308

def count_jobs(id: nil,
               status: nil,
               queue: nil,
               type: nil,
               filter: nil,
               priority: nil,
               ready_at: nil,
               attempts: nil)
  options = {
    id:,
    status:,
    queue:,
    type:,
    filter:,
    priority: encode_range(priority),
    ready_at: encode_range(ready_at) { |v| (v.to_f * 1000).to_i },
    attempts: encode_range(attempts),
  }.compact #: Hash[Symbol, untyped]

  multi_keys = %i[id status queue type]
  params = build_where_params(options, multi_keys:)

  # An empty filter ([] or "") matches nothing — short-circuit.
  multi_keys.each do |key|
    return 0 if params[key] == ""
  end

  response = get("/jobs/count", params:)
  data = handle_response!(response, expected: 200)
  data.fetch("count")
end

#decode_body(data, content_type: nil) ⇒ Object

: (String, ?content_type: String?) -> Hash[String, untyped]

Parameters:

  • data (Object)
  • content_type: (Object) (defaults to: nil)

Returns:

  • (Object)


1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
# File 'lib/zizq/client.rb', line 1127

def decode_body(data, content_type: nil) #: (String, ?content_type: String?) -> Hash[String, untyped]
  format = case content_type
           when /msgpack/ then :msgpack
           when /json/ then :json
           else @format
           end
  case format
  when :msgpack then MessagePack.unpack(data)
  when :json then JSON.parse(data)
  else raise ArgumentError, "Unknown format: #{format}"
  end
end

#delete(path, params: {}) ⇒ Object

: (String, ?params: Hash[Symbol, untyped]) -> RawResponse

Parameters:

  • path (Object)
  • params: (Object) (defaults to: {})

Returns:

  • (Object)


1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
# File 'lib/zizq/client.rb', line 1299

def delete(path, params: {}) #: (String, ?params: Hash[Symbol, untyped]) -> RawResponse
  request do |http|
    consume_response(
      http.delete(
        build_path(path, params:),
        {"accept" => @content_type}
      )
    )
  end
end

#delete_all_cronsInteger

Delete every cron group on the server in a single call.

Returns the number of cron groups removed.

Destructive. This deletes every cron group on the server. For granular deletes, use delete_cron_group with a specific name.

Requires a Pro license on the server.

Returns:

  • (Integer)


576
577
578
579
580
# File 'lib/zizq/client.rb', line 576

def delete_all_crons
  response = delete("/crons")
  data = handle_response!(response, expected: 200)
  data.fetch("deleted")
end

#delete_all_jobs(where: {}) ⇒ Integer

Delete jobs matching the given filters.

Filters in the where: argument use the same keys as list_jobs. An empty where: hash deletes all jobs.

Returns the number of deleted jobs.

Parameters:

  • where: (Zizq::where_params) (defaults to: {})

Returns:

  • (Integer)


359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
# File 'lib/zizq/client.rb', line 359

def delete_all_jobs(where: {})
  filter_params = validate_where(**where)

  multi_keys = %i[id status queue type]
  params = build_where_params(filter_params, multi_keys:)

  # An empty multi-value filter matches nothing — short-circuit.
  multi_keys.each do |key|
    return 0 if params[key] == ""
  end

  response = delete("/jobs", params:)
  data = handle_response!(response, expected: 200)
  data.fetch("deleted")
end

#delete_cron_group(name) ⇒ void

This method returns an undefined value.

Delete a cron group and all its entries.

Parameters:

  • name (String)


559
560
561
562
563
# File 'lib/zizq/client.rb', line 559

def delete_cron_group(name)
  response = delete("/crons/#{enc(name)}")
  handle_response!(response, expected: 204)
  nil
end

#delete_cron_group_entry(group, entry) ⇒ void

This method returns an undefined value.

Delete a single cron entry.

Parameters:

  • group (String)
  • entry (String)


647
648
649
650
651
# File 'lib/zizq/client.rb', line 647

def delete_cron_group_entry(group, entry)
  response = delete("/crons/#{enc(group)}/entries/#{enc(entry)}")
  handle_response!(response, expected: 204)
  nil
end

#delete_job(id) ⇒ void

This method returns an undefined value.

Delete a single job by ID.

Parameters:

  • id (String)


344
345
346
347
348
# File 'lib/zizq/client.rb', line 344

def delete_job(id)
  response = delete("/jobs/#{enc(id)}")
  handle_response!(response, expected: [200, 204])
  nil
end

#enc(value) ⇒ Object

URL-encode a single path segment.

Parameters:

  • value (Object)

Returns:

  • (Object)


897
898
899
# File 'lib/zizq/client.rb', line 897

def enc(value) #: (String) -> String
  URI.encode_uri_component(value)
end

#encode_body(body) ⇒ Object

: (Hash[Symbol, untyped]) -> String

Parameters:

  • body (Object)

Returns:

  • (Object)


1119
1120
1121
1122
1123
1124
1125
# File 'lib/zizq/client.rb', line 1119

def encode_body(body) #: (Hash[Symbol, untyped]) -> String
  case @format
  when :msgpack then MessagePack.pack(body)
  when :json then JSON.generate(body)
  else raise ArgumentError, "Unknown format: #{@format}"
  end
end

#encode_range(value, &block) ⇒ void

This method returns an undefined value.

Encode an Integer or Range filter into the server's query format.

Accepted shapes:

  • nil -> nil (no filter sent)
  • Integer -> "N" (single value)
  • (a..b) -> "A..B" (inclusive both ends)
  • (a..) -> "A.." (lower bound only)
  • (..b) -> "..B" (upper bound only)

Exclusive ranges (a...b, a..., ...b) raise ArgumentError — the server only supports inclusive bounds. A bound transformation block can convert values before formatting (e.g. seconds -> ms for ready_at).

Parameters:

  • value (Object)


1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
# File 'lib/zizq/client.rb', line 1010

def encode_range(value, &block)
  block ||= ->(v) { v.to_i }

  case value
  when nil
    nil
  when Range
    if value.exclude_end?
      raise ArgumentError,
        "exclusive ranges are not supported by the server; use an inclusive range (a..b) instead"
    end
    "#{value.begin&.then(&block)}..#{value.end&.then(&block)}"
  else
    block.call(value).to_s
  end
end

#enqueue(queue:, type:, payload:, priority: nil, ready_at: nil, retry_limit: nil, backoff: nil, retention: nil, unique_key: nil, unique_while: nil) ⇒ Resources::Job

Enqueue a new job.

This is a low-level primitive that makes a direct API call to the server using the Zizq API's expected inputs. Callers should generally use [Zizq::enqueue] instead.

Returns a resource instance of the new job wrapping the API response.

Parameters:

  • queue: (String)
  • type: (String)
  • payload: (Hash[String | Symbol, untyped])
  • priority: (Integer, nil) (defaults to: nil)
  • ready_at: (Zizq::to_f, nil) (defaults to: nil)
  • retry_limit: (Integer, nil) (defaults to: nil)
  • backoff: (Zizq::backoff, nil) (defaults to: nil)
  • retention: (Zizq::retention, nil) (defaults to: nil)
  • unique_key: (String, nil) (defaults to: nil)
  • unique_while: (Zizq::unique_scope, nil) (defaults to: nil)

Returns:



170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
# File 'lib/zizq/client.rb', line 170

def enqueue(queue:,
            type:,
            payload:,
            priority: nil,
            ready_at: nil,
            retry_limit: nil,
            backoff: nil,
            retention: nil,
            unique_key: nil,
            unique_while: nil)
  body = { queue:, type:, payload: } #: Hash[Symbol, untyped]
  body[:priority] = priority if priority
  # ready_at is fractional seconds in Ruby; the server expects ms.
  body[:ready_at] = (ready_at.to_f * 1000).to_i if ready_at
  body[:retry_limit] = retry_limit if retry_limit
  body[:backoff] = backoff if backoff
  body[:retention] = retention if retention
  body[:unique_key] = unique_key if unique_key
  body[:unique_while] = unique_while.to_s if unique_while

  response = post("/jobs", body)
  data = handle_response!(response, expected: [200, 201])
  Resources::Job.new(self, data)
end

#enqueue_bulk(jobs:) ⇒ Array[Resources::Job]

Enqueue multiple jobs atomically in a single bulk request.

This is a low-level primitive that makes a direct API call to the server using the Zizq API's expected inputs. Callers should generally use [Zizq::enqueue_bulk] instead.

Returns an array of resource instances wrapping the API response.

Parameters:

  • jobs: (Array[Hash[Symbol, untyped]])

Returns:



205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
# File 'lib/zizq/client.rb', line 205

def enqueue_bulk(jobs:)
  body = {
    jobs: jobs.map do |job|
      wire = { type: job[:type], queue: job[:queue], payload: job[:payload] } #: Hash[Symbol, untyped]
      wire[:priority] = job[:priority] if job[:priority]
      # ready_at is fractional seconds in Ruby; the server expects ms.
      wire[:ready_at] = (job[:ready_at].to_f * 1000).to_i if job[:ready_at]
      wire[:retry_limit] = job[:retry_limit] if job[:retry_limit]
      wire[:backoff] = job[:backoff] if job[:backoff]
      wire[:retention] = job[:retention] if job[:retention]
      wire[:unique_key] = job[:unique_key] if job[:unique_key]
      wire[:unique_while] = job[:unique_while].to_s if job[:unique_while]
      wire
    end
  }

  response = post("/jobs/bulk", body)
  data = handle_response!(response, expected: [200, 201])
  data["jobs"].map { |j| Resources::Job.new(self, j) }
end

#ensure_io_threadObject

Lazily start the background IO thread (double-checked locking).

Returns:

  • (Object)


1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
# File 'lib/zizq/client.rb', line 1180

def ensure_io_thread #: () -> void
  return if @io_thread&.alive?

  @mutex.synchronize do
    return if @io_thread&.alive?

    @io_queue = Thread::Queue.new
    @io_thread = Thread.new { io_thread_run }
    @io_thread.name = "zizq-io"
  end
end

#erase_all_datavoid

This method returns an undefined value.

Wipe every cron group and every job on the server.

Equivalent to calling delete_all_crons followed by delete_all_jobs (no filter), but in a single request. Useful primarily as a setup/teardown step in tests where you want a known-empty server between scenarios.

Destructive. No filters, no escape hatch, no confirmation — the server-side operation simply returns once everything is gone.

Named erase_all_data rather than reset because Zizq.reset! already exists at the module level for client-side SDK state.



389
390
391
392
393
# File 'lib/zizq/client.rb', line 389

def erase_all_data
  response = raw_post("/reset")
  handle_response!(response, expected: 204)
  nil
end

#get(path, params: {}) ⇒ Object

: (String, ?params: Hash[Symbol, untyped]) -> RawResponse

Parameters:

  • path (Object)
  • params: (Object) (defaults to: {})

Returns:

  • (Object)


1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
# File 'lib/zizq/client.rb', line 1253

def get(path, params: {}) #: (String, ?params: Hash[Symbol, untyped]) -> RawResponse
  request do |http|
    consume_response(
      http.get(
        build_path(path, params:),
        {"accept" => @content_type}
      )
    )
  end
end

#get_cron_group(name) ⇒ Resources::CronGroup

Fetch a cron group and all its entries.

Parameters:

  • name (String)

Returns:



519
520
521
522
523
# File 'lib/zizq/client.rb', line 519

def get_cron_group(name)
  response = get("/crons/#{enc(name)}")
  data = handle_response!(response, expected: 200)
  Resources::CronGroup.new(self, data)
end

#get_cron_group_entry(group, entry) ⇒ Resources::CronEntry

Fetch a single cron entry.

Parameters:

  • group (String)
  • entry (String)

Returns:



587
588
589
590
591
# File 'lib/zizq/client.rb', line 587

def get_cron_group_entry(group, entry)
  response = get("/crons/#{enc(group)}/entries/#{enc(entry)}")
  data = handle_response!(response, expected: 200)
  Resources::CronEntry.new(self, data)
end

#get_error(id, attempt:) ⇒ Resources::ErrorRecord

Get a single error record by job ID and attempt number.

Parameters:

  • id (String)
  • attempt: (Integer)

Returns:



466
467
468
469
470
# File 'lib/zizq/client.rb', line 466

def get_error(id, attempt:)
  response = get("/jobs/#{enc(id)}/errors/#{enc(attempt.to_s)}")
  data = handle_response!(response, expected: 200)
  Resources::ErrorRecord.new(self, data)
end

#get_job(id) ⇒ Object

Get a single job by ID.

Parameters:

  • id (Object)

Returns:

  • (Object)


227
228
229
230
231
# File 'lib/zizq/client.rb', line 227

def get_job(id) #: (String) -> Resources::Job
  response = get("/jobs/#{enc(id)}")
  data = handle_response!(response, expected: 200)
  Resources::Job.new(self, data)
end

#get_path(path) ⇒ Object

GET a path on the server and return the decoded response body.

The path should include any query parameters already (e.g. pagination links from the server's pages object). This is intentionally public so that resource objects like Page can follow links without resorting to .send.

Parameters:

  • path (Object)

Returns:

  • (Object)


889
890
891
892
# File 'lib/zizq/client.rb', line 889

def get_path(path) #: (String) -> Hash[String, untyped]
  response = request { |http| consume_response(http.get(path, {"accept" => @content_type})) }
  handle_response!(response, expected: 200)
end

#get_queuesObject

List all distinct queue names on the server.

Returns:

  • (Object)


500
501
502
503
504
# File 'lib/zizq/client.rb', line 500

def get_queues #: () -> Array[String]
  response = get("/queues")
  data = handle_response!(response, expected: 200)
  data["queues"]
end

#handle_response!(response, expected:) ⇒ Object

Check response status and decode body, raising on errors.

Parameters:

  • response (Object)
  • expected: (Object)

Returns:

  • (Object)


1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
# File 'lib/zizq/client.rb', line 1323

def handle_response!(response, expected:) #: (RawResponse, expected: Integer | Array[Integer]) -> Hash[String, untyped]?
  status = response.status
  expected_statuses = Array(expected)

  ct = response.content_type

  if expected_statuses.include?(status)
    return nil if status == 204
    decode_body(response.body, content_type: ct)
  else
    body = begin
      decode_body(response.body, content_type: ct)
    rescue
      nil
    end
    message = body&.fetch("error", nil) || "HTTP #{status}"
    error_class = case status
                  when 404 then NotFoundError
                  when 400..499 then ClientError
                  when 500..599 then ServerError
                  else ResponseError
                  end
    raise error_class.new(message, status: status, body: body)
  end
end

#healthObject

Health check.

Returns:

  • (Object)


487
488
489
490
# File 'lib/zizq/client.rb', line 487

def health #: () -> Hash[String, untyped]
  response = get("/health")
  handle_response!(response, expected: 200)
end

#httpObject

Return the calling thread's HTTP client, creating one if needed. Uses thread_variable_get/set (not Thread.current) because the latter is fiber-local — each Async fiber would get its own client. The tracking array holds WeakRefs so clients from exited threads can be garbage-collected.

Returns:

  • (Object)


1228
1229
1230
# File 'lib/zizq/client.rb', line 1228

def http #: () -> Async::HTTP::Client
  thread_local_http(@http_key, @endpoint)
end

#io_thread_runObject

Main loop for the background IO thread. Mirrors AckProcessor: runs an Async reactor, pops work from the queue (fiber-scheduler-aware), and dispatches each call as a concurrent fiber via a barrier.

Returns:

  • (Object)


1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
# File 'lib/zizq/client.rb', line 1195

def io_thread_run #: () -> void
  ObjectSpace.define_finalizer(
    self,
    self.class.make_finalizer(@io_queue, @http_clients)
  )

  Sync do
    barrier = Async::Barrier.new

    while (item = @io_queue.pop)
      block, result_queue = item
      barrier.async do
        result_queue.push([:ok, block.call(http)])
      rescue Exception => e # rubocop:disable Lint/RescueException
        # Must catch Exception (not just StandardError) to ensure the
        # caller is always unblocked. Without this, errors like
        # NoMemoryError or library-level Exceptions would kill the IO
        # thread and leave callers blocking on result_queue.pop forever.
        result_queue.push([:error, e])
      end
    end

    barrier.wait
  end
ensure
  ObjectSpace.undefine_finalizer(self)
end

#list_cron_groupsArray[String]

List all cron group names.

Returns:

  • (Array[String])


509
510
511
512
513
# File 'lib/zizq/client.rb', line 509

def list_cron_groups
  response = get("/crons")
  data = handle_response!(response, expected: 200)
  data["crons"]
end

#list_errors(id, from: nil, order: nil, limit: nil) ⇒ Resources::ErrorPage

List error records for a job.

Parameters:

  • id (String)
  • from: (String, nil) (defaults to: nil)
  • order: (Zizq::sort_direction, nil) (defaults to: nil)
  • limit: (Integer, nil) (defaults to: nil)

Returns:



479
480
481
482
483
484
# File 'lib/zizq/client.rb', line 479

def list_errors(id, from: nil, order: nil, limit: nil)
  params = { from:, order:, limit: }.compact #: Hash[Symbol, untyped]
  response = get("/jobs/#{enc(id)}/errors", params:)
  data = handle_response!(response, expected: 200)
  Resources::ErrorPage.new(self, data)
end

#list_jobs(id: nil, status: nil, queue: nil, type: nil, filter: nil, priority: nil, ready_at: nil, attempts: nil, from: nil, order: nil, limit: nil) ⇒ Resources::JobPage

List jobs with optional filters.

Multi-value filters (status, queue, type, id) accept arrays — they are joined with commas as the server expects.

The filter parameter accepts a jq expression for filtering jobs by payload content (e.g. .user_id == 42).

Range filters (priority, attempts, ready_at) accept exact values or inclusive ranges.

Parameters:

  • id: (String, Array[String], nil) (defaults to: nil)
  • status: (String, Array[String], nil) (defaults to: nil)
  • queue: (String, Array[String], nil) (defaults to: nil)
  • type: (String, Array[String], nil) (defaults to: nil)
  • filter: (String, nil) (defaults to: nil)
  • priority: (Integer, Range[Integer?], nil) (defaults to: nil)
  • ready_at: (Zizq::to_f, Range[Zizq::to_f?], nil) (defaults to: nil)
  • attempts: (Integer, Range[Integer?], nil) (defaults to: nil)
  • from: (String, nil) (defaults to: nil)
  • order: (Zizq::sort_direction, nil) (defaults to: nil)
  • limit: (Integer, nil) (defaults to: nil)

Returns:



256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
# File 'lib/zizq/client.rb', line 256

def list_jobs(id: nil,
              status: nil,
              queue: nil,
              type: nil,
              filter: nil,
              priority: nil,
              ready_at: nil,
              attempts: nil,
              from: nil,
              order: nil,
              limit: nil)
  options = {
    id:,
    status:,
    queue:,
    type:,
    filter:,
    priority: encode_range(priority),
    ready_at: encode_range(ready_at) { |v| (v.to_f * 1000).to_i },
    attempts: encode_range(attempts),
    from:,
    order:,
    limit:,
  }.compact #: Hash[Symbol, untyped]

  multi_keys = %i[id status queue type]
  params = build_where_params(options, multi_keys:)

  # An empty filter ([] or "") matches nothing — short-circuit.
  multi_keys.each do |key|
    return Resources::JobPage.new(self, { "jobs" => [], "pages" => {} }) if params[key] == ""
  end

  response = get("/jobs", params:)
  data = handle_response!(response, expected: 200)
  Resources::JobPage.new(self, data)
end

#patch(path, body, params: {}) ⇒ Object

: (String, Hash[Symbol, untyped], ?params: Hash[Symbol, untyped]) -> RawResponse

Parameters:

  • path (Object)
  • body (Object)
  • params: (Object) (defaults to: {})

Returns:

  • (Object)


1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
# File 'lib/zizq/client.rb', line 1310

def patch(path, body, params: {}) #: (String, Hash[Symbol, untyped], ?params: Hash[Symbol, untyped]) -> RawResponse
  request do |http|
    consume_response(
      http.patch(
        build_path(path, params:),
        {"content-type" => @content_type, "accept" => @content_type},
        Protocol::HTTP::Body::Buffered.wrap(encode_body(body))
      )
    )
  end
end

#post(path, body) ⇒ Object

: (String, Hash[Symbol, untyped]) -> RawResponse

Parameters:

  • path (Object)
  • body (Object)

Returns:

  • (Object)


1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
# File 'lib/zizq/client.rb', line 1264

def post(path, body) #: (String, Hash[Symbol, untyped]) -> RawResponse
  request do |http|
    consume_response(
      http.post(
        build_path(path),
        {"content-type" => @content_type, "accept" => @content_type},
        Protocol::HTTP::Body::Buffered.wrap(encode_body(body))
      )
    )
  end
end

#put(path, body) ⇒ Object

: (String, Hash[Symbol, untyped]) -> RawResponse

Parameters:

  • path (Object)
  • body (Object)

Returns:

  • (Object)


1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
# File 'lib/zizq/client.rb', line 1287

def put(path, body) #: (String, Hash[Symbol, untyped]) -> RawResponse
  request do |http|
    consume_response(
      http.put(
        build_path(path),
        {"content-type" => @content_type, "accept" => @content_type},
        Protocol::HTTP::Body::Buffered.wrap(encode_body(body))
      )
    )
  end
end

#raw_post(path) ⇒ Object

: (String) -> RawResponse

Parameters:

  • path (Object)

Returns:

  • (Object)


1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
# File 'lib/zizq/client.rb', line 1276

def raw_post(path) #: (String) -> RawResponse
  request do |http|
    consume_response(
      http.post(
        build_path(path),
        {"accept" => @content_type}
      )
    )
  end
end

#replace_cron_group(name, paused: nil, entries: []) ⇒ Resources::CronGroup

Create or replace an entire cron group.

Entries not present in the request are removed. Entries with unchanged expressions preserve their scheduling state.

Parameters:

  • name (String)
  • paused: (Boolean, nil) (defaults to: nil)
  • entries: (Array[Zizq::cron_entry_params]) (defaults to: [])

Returns:



534
535
536
537
538
539
540
541
542
# File 'lib/zizq/client.rb', line 534

def replace_cron_group(name, paused: nil, entries: [])
  body = {
    paused:,
    entries: entries.map { |entry| build_cron_entry(**entry) }
  }.compact
  response = put("/crons/#{enc(name)}", body)
  data = handle_response!(response, expected: 200)
  Resources::CronGroup.new(self, data)
end

#replace_cron_group_entry(group, entry, expression:, job:, timezone: nil, paused: nil) ⇒ Resources::CronEntry

Create or replace a single cron entry.

Preserves scheduling state if the expression is unchanged.

Parameters:

  • group (String)
  • entry (String)
  • expression: (String)
  • job: (Zizq::cron_job_params)
  • timezone: (String, nil) (defaults to: nil)
  • paused: (Boolean, nil) (defaults to: nil)

Returns:



623
624
625
626
627
628
# File 'lib/zizq/client.rb', line 623

def replace_cron_group_entry(group, entry, expression:, job:, timezone: nil, paused: nil)
  body = build_cron_entry(name: entry, expression:, job:, timezone:, paused:)
  response = put("/crons/#{enc(group)}/entries/#{enc(entry)}", body)
  data = handle_response!(response, expected: 200)
  Resources::CronEntry.new(self, data)
end

#report_failure(id, message:, error_type: nil, backtrace: nil, retry_at: nil, kill: false) ⇒ Resources::Job Also known as: nack

Report a job failure (nack).

Returns the updated job metadata.

If this method is not called when errors occur processing jobs, the Zizq server will consider it in-flight and will not send any more jobs if the prefetch limit has been reached, or the server's global in-flight limit has been reached. Jobs must be either acknowledged or failed before new jobs are sent.

Jobs are durable and "at least once" delivery is guaranteed. If the client disconnects before it is able to report success or failure the server automatically moves the job back to the queue where it will be provided to another worker. Clients should be prepared to see the same job more than once for this reason.

The Zizq server sends heartbeat messages to connected workers so that it can quickly detect and handle disconnected clients.

Parameters:

  • id (String)
  • message: (String)
  • error_type: (String, nil) (defaults to: nil)
  • backtrace: (String, nil) (defaults to: nil)
  • retry_at: (Float, nil) (defaults to: nil)
  • kill: (Boolean) (defaults to: false)

Returns:



726
727
728
729
730
731
732
733
734
735
736
737
# File 'lib/zizq/client.rb', line 726

def report_failure(id, message:, error_type: nil, backtrace: nil, retry_at: nil, kill: false)
  body = { message: } #: Hash[Symbol, untyped]
  body[:error_type] = error_type if error_type
  body[:backtrace] = backtrace if backtrace
  # retry_at is fractional seconds in Ruby; the server expects ms.
  body[:retry_at] = (retry_at * 1000).to_i if retry_at
  body[:kill] = kill if kill

  response = post("/jobs/#{enc(id)}/failure", body)
  data = handle_response!(response, expected: 200)
  Resources::Job.new(self, data)
end

#report_success(id) ⇒ Object Also known as: ack

Mark a job as successfully completed (ack).

If this method (or [#report_failure]) is not called upon job completion, the Zizq server will consider it in-flight and will not send any more jobs if the prefetch limit has been reached, or the server's global in-flight limit has been reached. Jobs must be either acknowledged or failed before new jobs are sent.

Jobs are durable and "at least once" delivery is guaranteed. If the client disconnects before it is able to report success or failure the server automatically moves the job back to the queue where it will be provided to another worker. Clients should be prepared to see the same job more than once for this reason.

The Zizq server sends heartbeat messages to connected workers so that it can quickly detect and handle disconnected clients.

Parameters:

  • id (Object)

Returns:

  • (Object)


669
670
671
672
673
# File 'lib/zizq/client.rb', line 669

def report_success(id) #: (String) -> nil
  response = raw_post("/jobs/#{enc(id)}/success")
  handle_response!(response, expected: 204)
  nil
end

#report_success_bulk(ids) ⇒ nil Also known as: ack_bulk

Bulk-mark jobs as successfully completed (bulk ack).

See [#report_success] for full details of how acknowledgemen works.

There are two ways in which the server can respond successfully:

  1. 204 - No Content (All jobs acknowledged)
  2. 422 - Unprocessible Entity (Some jobs were not found)

Both of these statuses are in reality treated as success because missing jobs have either been previously acknowledged and purged, or moved to some other status that cannot be acknowledged.

Other error response types will still raise.

Parameters:

  • ids (Array[String])

Returns:

  • (nil)


692
693
694
695
696
# File 'lib/zizq/client.rb', line 692

def report_success_bulk(ids)
  response = post("/jobs/success", { ids: ids })
  return nil if response.status == 422
  handle_response!(response, expected: 204)
end

#request(&block) ⇒ void

This method returns an undefined value.

Dispatch a block to the appropriate execution context.

If already inside an Async reactor (e.g. AckProcessor, producer), yields the calling thread's HTTP client directly. Otherwise, dispatches via the persistent background IO thread.



1145
1146
1147
1148
1149
1150
1151
# File 'lib/zizq/client.rb', line 1145

def request(&block) #: () { (Async::HTTP::Client) -> RawResponse } -> RawResponse
  if Async::Task.current?
    yield http
  else
    sync_call(&block)
  end
end

#server_versionObject

Server version string.

Returns:

  • (Object)


493
494
495
496
497
# File 'lib/zizq/client.rb', line 493

def server_version #: () -> String
  response = get("/version")
  data = handle_response!(response, expected: 200)
  data["version"]
end

#stream_httpObject

Return the calling thread's streaming HTTP client (HTTP/1.1), creating one if needed. See #http for the thread-local locking rationale. Kept separate from the main client so the long-lived /jobs/take connection doesn't share an HTTP/2 session with ack/enqueue traffic.

Returns:

  • (Object)


1237
1238
1239
# File 'lib/zizq/client.rb', line 1237

def stream_http #: () -> Async::HTTP::Client
  thread_local_http(@stream_http_key, @stream_endpoint)
end

#sync_call(&block) ⇒ void

This method returns an undefined value.

Push a work block to the background IO thread and block until it completes, returning the result or re-raising any exception.



1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
# File 'lib/zizq/client.rb', line 1163

def sync_call(&block) #: () { (Async::HTTP::Client) -> RawResponse } -> RawResponse
  ensure_io_thread

  result_queue = Thread::Queue.new
  @io_queue.push([block, result_queue])

  tag, value = result_queue.pop
  if tag == :ok
    value
  else
    raise value
  end
rescue ClosedQueueError
  raise ConnectionError, "client is closed"
end

#take_jobs(prefetch: 1, queues: [], worker_id: nil, on_connect: nil, on_response: nil) {|arg0| ... } ⇒ void

This method returns an undefined value.

Stream jobs from the server. Yields parsed job hashes.

This method does not return unless the server closes the connection or the connection is otherwise interrupted. Jobs are continuously streamed to the client, and when no jobs are available the client waits for new jobs to become ready.

If the client does not acknowledge or fail jobs with [#report_success] or [#report_failure] the server will stop sending new jobs to the client as it hits its prefetch limit.

Jobs are durable and "at least once" delivery is guaranteed. If the client disconnects before it is able to report success or failure the server automatically moves the job back to the queue where it will be provided to another worker. Clients should be prepared to see the same job more than once for this reason.

The Zizq server sends periodic heartbeat messages to the client which are silently consumed.

Example:

client.take_jobs(prefetch: 5) do |job|
 puts "Got job: #{job.inspect}"
 client.ack(job.id) # mark the job completed
end

Parameters:

  • prefetch: (Integer) (defaults to: 1)
  • queues: (Array[String]) (defaults to: [])
  • worker_id: (String, nil) (defaults to: nil)
  • on_connect: (Object) (defaults to: nil)
  • on_response: (Object) (defaults to: nil)

Yields:

Yield Parameters:

Yield Returns:

  • (void)


775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
# File 'lib/zizq/client.rb', line 775

def take_jobs(prefetch: 1, queues: [], worker_id: nil, on_connect: nil, on_response: nil, &block)
  raise ArgumentError, "take_jobs requires a block" unless block

  params = { prefetch: } #: Hash[Symbol, untyped]
  params[:queue] = queues.join(",") unless queues.empty?

  path = build_path("/jobs/take", params:)
  headers = { "accept" => @stream_accept }
  headers["worker-id"] = worker_id if worker_id

  Sync do
    response = stream_http.get(path, headers)

    begin
      raise StreamError, "take jobs stream returned HTTP #{response.status}" unless response.status == 200
      on_connect&.call
      on_response&.call(response)

      # Wrap each parsed hash in a Resources::Job before yielding.
      wrapper = proc { |data| block.call(Resources::Job.new(self, data)) }

      # async-http returns `nil` for empty response bodies over HTTP/1.1
      # (e.g. a 200 with content-length: 0 from the server closing the
      # stream immediately). Treat that as "no chunks" rather than
      # crashing in the parser.
      body = response.body || []

      case @format
      when :json then self.class.parse_ndjson(body, &wrapper)
      when :msgpack then self.class.parse_msgpack_stream(body, &wrapper)
      end
    ensure
      response.close rescue nil
    end
  end
rescue SocketError,
       IOError,
       EOFError,
       Errno::ECONNRESET,
       Errno::EPIPE,
       IO::TimeoutError,
       OpenSSL::SSL::SSLError => e
  raise ConnectionError, e.message
end

#thread_local_http(key, endpoint) ⇒ Object

: (Symbol, Async::HTTP::Endpoint) -> Async::HTTP::Client

Parameters:

  • key (Object)
  • endpoint (Object)

Returns:

  • (Object)


1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
# File 'lib/zizq/client.rb', line 1241

def thread_local_http(key, endpoint) #: (Symbol, Async::HTTP::Endpoint) -> Async::HTTP::Client
  Thread.current.thread_variable_get(key) || begin
    client = Async::HTTP::Client.new(endpoint)
    @mutex.synchronize do
      @http_clients.reject! { |ref| !ref.weakref_alive? }
      @http_clients << WeakRef.new(client)
    end
    Thread.current.thread_variable_set(key, client)
    client
  end
end

#update_all_jobs(where: {}, apply: {}) ⇒ Integer

Update all jobs matching the given filters.

Filters in the where: argument use the same keys as list_jobs. Fields in the apply: argument use the same keys as update_job.

Terminal jobs (completed/dead) are silently skipped unless explicitly requested via status: in where:, which returns 422.

Returns the number of updated jobs.

Parameters:

  • where: (Zizq::where_params) (defaults to: {})
  • apply: (Zizq::apply_params) (defaults to: {})

Returns:

  • (Integer)


444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
# File 'lib/zizq/client.rb', line 444

def update_all_jobs(where: {}, apply: {})
  filter_params = validate_where(**where)

  multi_keys = %i[id status queue type]
  params = build_where_params(filter_params, multi_keys:)

  # An empty multi-value filter matches nothing — short-circuit.
  multi_keys.each do |key|
    return 0 if params[key] == ""
  end

  body = validate_and_build_set(**apply)
  response = patch("/jobs", body, params:)
  data = handle_response!(response, expected: 200)
  data.fetch("patched")
end

#update_cron_group(name, paused: nil) ⇒ Resources::CronGroup

Update group-level fields (currently just pause/unpause).

Parameters:

  • name (String)
  • paused: (Boolean, nil) (defaults to: nil)

Returns:



549
550
551
552
553
# File 'lib/zizq/client.rb', line 549

def update_cron_group(name, paused: nil)
  response = patch("/crons/#{enc(name)}", { paused: }.compact)
  data = handle_response!(response, expected: 200)
  Resources::CronGroup.new(self, data)
end

#update_cron_group_entry(group, entry, paused:) ⇒ Resources::CronEntry

Update entry-level fields (currently just pause/unpause).

Parameters:

  • group (String)
  • entry (String)
  • paused: (Boolean)

Returns:



636
637
638
639
640
# File 'lib/zizq/client.rb', line 636

def update_cron_group_entry(group, entry, paused:)
  response = patch("/crons/#{enc(group)}/entries/#{enc(entry)}", { paused: })
  data = handle_response!(response, expected: 200)
  Resources::CronEntry.new(self, data)
end

#update_job(id, queue: UNCHANGED, priority: UNCHANGED, ready_at: UNCHANGED, retry_limit: UNCHANGED, backoff: UNCHANGED, retention: UNCHANGED) ⇒ Resources::Job

Update a single job's mutable fields.

Fields not provided are left unchanged. Use Zizq::RESET to clear a nullable field back to the server default.

Raises Zizq::NotFoundError if the job does not exist. Raises Zizq::ClientError (422) if the job is in a terminal state.

Parameters:

Returns:



411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
# File 'lib/zizq/client.rb', line 411

def update_job(id,
               queue: UNCHANGED,
               priority: UNCHANGED,
               ready_at: UNCHANGED,
               retry_limit: UNCHANGED,
               backoff: UNCHANGED,
               retention: UNCHANGED)
  body = build_set_body(
    queue:,
    priority:,
    ready_at:,
    retry_limit:,
    backoff:,
    retention:
  )
  response = patch("/jobs/#{enc(id)}", body)
  data = handle_response!(response, expected: 200)
  Resources::Job.new(self, data)
end

#validate_and_build_set(queue: UNCHANGED, priority: UNCHANGED, ready_at: UNCHANGED, retry_limit: UNCHANGED, backoff: UNCHANGED, retention: UNCHANGED) ⇒ Hash[Symbol, untyped]

Validate set parameters via keyword args (rejects unknown keys) and build the JSON body. Used by update_all_jobs.

Parameters:

Returns:

  • (Hash[Symbol, untyped])


1037
1038
1039
1040
1041
1042
1043
1044
# File 'lib/zizq/client.rb', line 1037

def validate_and_build_set(queue: UNCHANGED,
                           priority: UNCHANGED,
                           ready_at: UNCHANGED,
                           retry_limit: UNCHANGED,
                           backoff: UNCHANGED,
                           retention: UNCHANGED)
  build_set_body(queue:, priority:, ready_at:, retry_limit:, backoff:, retention:)
end

#validate_where(id: nil, status: nil, queue: nil, type: nil, filter: nil, priority: nil, ready_at: nil, attempts: nil) ⇒ Hash[Symbol, untyped]

Validate and normalize filter parameters for bulk operations.

Uses keyword arguments so that unknown keys raise ArgumentError.

Parameters:

  • id: (String, Array[String], nil) (defaults to: nil)
  • status: (String, Array[String], nil) (defaults to: nil)
  • queue: (String, Array[String], nil) (defaults to: nil)
  • type: (String, Array[String], nil) (defaults to: nil)
  • filter: (String, nil) (defaults to: nil)
  • priority: (Integer, Range[Integer?], nil) (defaults to: nil)
  • ready_at: (Zizq::to_f, Range[Zizq::to_f?], nil) (defaults to: nil)
  • attempts: (Integer, Range[Integer?], nil) (defaults to: nil)

Returns:

  • (Hash[Symbol, untyped])


973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
# File 'lib/zizq/client.rb', line 973

def validate_where(id: nil,
                   status: nil,
                   queue: nil,
                   type: nil,
                   filter: nil,
                   priority: nil,
                   ready_at: nil,
                   attempts: nil)
  {
    id:,
    status:,
    queue:,
    type:,
    filter:,
    priority: encode_range(priority),
    ready_at: encode_range(ready_at) { |v| (v.to_f * 1000).to_i },
    attempts: encode_range(attempts),
  }.compact
end