Class: Zizq::Test::Client
Overview
A `Zizq::Client` stand-in for use in test suites.
Buffers `enqueue` / `enqueue_bulk` calls in memory and returns synthetic `Resources::Job` instances (with generated ids) so that callers depending on the regular client’s return contract don’t need to special-case test mode.
Read operations (`get_queues`, `list_jobs`, `count_jobs`, …) are explicitly not supported in test mode and raise `NotSupported`. Tests that need those should either run against a real server or stub at a higher level.
Activated indirectly via `Zizq.configuration.test_mode = true`: `Zizq.client` then lazily builds a `Test::Client` instead of a real `Client`.
Defined Under Namespace
Classes: Entry, NotSupported
Constant Summary
- ID_LENGTH = 25
  Length of a real scru128 id in its base-32 representation. Synthetic test ids are sized to match (`test` prefix + zero-padded counter) so they fit anywhere a real id would.
- ID_PREFIX = "test"
- STATUS_SCHEDULED = "scheduled"
  The canonical Zizq lifecycle states. We mirror these so the `status` on a buffered job reflects what the real server would report. Test mode never retries: `in_flight` only ever transitions to `completed` or `dead`.
- STATUS_READY = "ready"
- STATUS_IN_FLIGHT = "in_flight"
- STATUS_COMPLETED = "completed"
- STATUS_DEAD = "dead"
- PASS_ALL_FILTER = ->(_job) { true }
  Default `filter:` lambda; passes every job. Named so the filter pipeline is always callable without nil-checking.
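The id sizing described for `ID_LENGTH` and `ID_PREFIX` can be sketched as follows. `synthetic_id` is a hypothetical helper introduced only for illustration; the library's own generator is not shown on this page and may differ.

```ruby
ID_LENGTH = 25
ID_PREFIX = "test"

# Hypothetical helper (not part of the documented API): the prefix followed
# by a zero-padded counter, so the whole id is exactly ID_LENGTH characters.
def synthetic_id(counter)
  width = ID_LENGTH - ID_PREFIX.length
  format("%s%0#{width}d", ID_PREFIX, counter)
end

id = synthetic_id(42)
puts id
puts id.length
```

Padding to a fixed width keeps synthetic ids usable anywhere a length check or fixed-width column expects a real id.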
Constants inherited from Client
Client::CONTENT_TYPES, Client::STREAM_ACCEPT
Instance Attribute Summary
Attributes inherited from Client
Class Method Summary
-
.normalize_payload(payload) ⇒ Object
Round-trip the payload through JSON so the in-memory representation matches what a consumer would receive over the wire: symbol keys / Symbol values become strings, nested hashes and arrays are normalized recursively, and non-JSON-safe values (BigDecimal, custom objects) raise here rather than surviving in test mode only to break in production.
Instance Method Summary
-
#apply_filters(entries, only_queues: nil, except_queues: nil, only_types: nil, except_types: nil, filter: PASS_ALL_FILTER) ⇒ Object
Returns entries matching every named filter AND the predicate.
-
#clear! ⇒ Object
Reset the buffer.
-
#close ⇒ Object
: () -> void.
-
#completed_jobs(**filters) ⇒ Object
: (**untyped) -> Array.
-
#dead_jobs(**filters) ⇒ Object
: (**untyped) -> Array.
-
#dispatch_entry(entry) ⇒ Object
Mark the entry in_flight, dispatch through the full dequeue middleware chain (same path the real worker uses, so any registered middlewares run in tests too), settle to completed or dead.
-
#drain(**filters) ⇒ Object
Dispatch every runnable entry (status `ready`, or `scheduled` with `ready_at` already elapsed) through the configured dequeue middleware chain (same path the real worker uses, so any registered middlewares run in tests too), looping until no more match the filters.
- #enqueue(queue:, type:, payload:, priority: nil, ready_at: nil, retry_limit: nil, backoff: nil, retention: nil, unique_key: nil, unique_while: nil) ⇒ Object
- #enqueue_bulk(jobs:) ⇒ Object
-
#enqueued_jobs(**filters) ⇒ Object
All buffered jobs, in submission order, optionally filtered.
-
#enqueued_requests(**filters) ⇒ Object
Original `EnqueueRequest`s in submission order.
- #filter_by_status(statuses, **filters) ⇒ Object
-
#in_flight_jobs(**filters) ⇒ Object
: (**untyped) -> Array.
-
#initialize ⇒ Client
constructor
: () -> void.
-
#normalize_filter(value) ⇒ Object
Accept a Class, String, or Array of those; emit an Array of Strings.
-
#pending_jobs(**filters) ⇒ Object
Jobs awaiting dispatch: `ready` and `scheduled` entries.
-
#runnable?(entry, now_ms) ⇒ Boolean
: (Entry, Integer) -> bool.
-
#take_runnable_snapshot(**filters) ⇒ Object
Lock, find runnable entries matching the filters, flip each to in_flight, return the snapshot.
Methods inherited from Client
#add_cron_group_entry, #cleanup_internal_clients, #count_jobs, #delete_all_jobs, #delete_cron_group, #delete_cron_group_entry, #delete_job, #get_cron_group, #get_cron_group_entry, #get_error, #get_job, #get_path, #get_queues, #health, #list_cron_groups, #list_errors, #list_jobs, make_finalizer, parse_msgpack_stream, parse_ndjson, #replace_cron_group, #replace_cron_group_entry, #report_failure, #report_success, #report_success_bulk, #server_version, #take_jobs, #update_all_jobs, #update_cron_group, #update_cron_group_entry, #update_job
Constructor Details
#initialize ⇒ Client
: () -> void
    # File 'lib/zizq/test/client.rb', line 58

    def initialize #: () -> void
      # Skip the parent's HTTP setup; we don't open connections in
      # test mode. The parent's @http and friends stay nil; methods
      # that would touch them are overridden below.
      @entries = [] #: Array[Entry]
      @mutex = Mutex.new
    end
Class Method Details
.normalize_payload(payload) ⇒ Object
Round-trip the payload through JSON so the in-memory representation matches what a consumer would receive over the wire: symbol keys / Symbol values become strings, nested hashes and arrays are normalized recursively, and non-JSON-safe values (BigDecimal, custom objects) raise here rather than surviving in test mode only to break in production.
Also used by `Zizq::Test.enqueued_raw?` / `enqueued_raw_count` to normalize the query side so symbol-keyed assertion payloads still match the (now string-keyed) buffer.
    # File 'lib/zizq/test/client.rb', line 253

    def self.normalize_payload(payload) #: (untyped) -> untyped
      JSON.parse(JSON.generate(payload))
    end
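To see the effect of the round-trip on symbol keys and Symbol values, here is a minimal standalone sketch. It redefines the same one-liner as a top-level method so it runs without the library; the sample payload is made up for illustration.

```ruby
require "json"

# Standalone copy of the documented one-liner, defined at top level so the
# example runs without Zizq loaded.
def normalize_payload(payload)
  JSON.parse(JSON.generate(payload))
end

# Made-up payload with symbol keys, a Symbol value, and a nested hash.
original = { user_id: 7, tags: [:urgent], meta: { retry: true } }
normalized = normalize_payload(original)

p normalized.keys          # every key is now a String
p normalized["tags"]       # the Symbol value has become a String
p normalized["meta"].keys  # nested hashes are normalized too
```

This is why an assertion written with symbol keys has to be normalized the same way before it is compared against the buffer.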
Instance Method Details
#apply_filters(entries, only_queues: nil, except_queues: nil, only_types: nil, except_types: nil, filter: PASS_ALL_FILTER) ⇒ Object
Returns entries matching every named filter AND the predicate. All filter kwargs are optional; unset means “don’t filter on this axis.” Callers must hold `@mutex`; public accessors do so via `synchronize` before calling.
- `only_queues:` / `except_queues:` take a String or an Array of Strings.
- `only_types:` / `except_types:` take a String, Class, or Array of those. Class names are matched against the wire-format `type` string via `.to_s`.
- `filter:` is a lambda receiving a `Resources::Job`, returning truthy to keep. Defaults to `PASS_ALL_FILTER`.
`only_*` and `except_*` AND together with the predicate.
    # File 'lib/zizq/test/client.rb', line 270

    def apply_filters(entries,
                      only_queues: nil,
                      except_queues: nil,
                      only_types: nil,
                      except_types: nil,
                      filter: PASS_ALL_FILTER) #: (Array[Entry], **untyped) -> Array[Entry]
      only_queues = normalize_filter(only_queues)
      except_queues = normalize_filter(except_queues)
      only_types = normalize_filter(only_types)
      except_types = normalize_filter(except_types)

      entries.select do |entry|
        queue = entry.data["queue"] #: String
        type = entry.data["type"] #: String

        (only_queues.empty? || only_queues.include?(queue)) &&
          (except_queues.empty? || !except_queues.include?(queue)) &&
          (only_types.empty? || only_types.include?(type)) &&
          (except_types.empty? || !except_types.include?(type)) &&
          filter.call(entry.job)
      end
    end
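The AND semantics can be exercised in isolation with a small sketch. `SketchEntry`, `SendEmail`, and `sample_entries` are hypothetical stand-ins, and the predicate here receives the raw data hash rather than a `Resources::Job`, so this approximates the documented method rather than copying it.

```ruby
# Hypothetical stand-ins: an entry that only carries a data hash, and a job
# class used to show Class-to-String matching.
SketchEntry = Struct.new(:data)
class SendEmail; end

def normalize_filter(value)
  case value
  when nil then []
  when Array then value.map(&:to_s)
  else [value.to_s]
  end
end

# Same selection logic as the documented method, except the predicate is
# called with the data hash (assumption: no Resources::Job available here).
def apply_filters(entries, only_queues: nil, except_queues: nil,
                  only_types: nil, except_types: nil, filter: ->(_data) { true })
  only_queues = normalize_filter(only_queues)
  except_queues = normalize_filter(except_queues)
  only_types = normalize_filter(only_types)
  except_types = normalize_filter(except_types)

  entries.select do |entry|
    queue = entry.data["queue"]
    type = entry.data["type"]

    (only_queues.empty? || only_queues.include?(queue)) &&
      (except_queues.empty? || !except_queues.include?(queue)) &&
      (only_types.empty? || only_types.include?(type)) &&
      (except_types.empty? || !except_types.include?(type)) &&
      filter.call(entry.data)
  end
end

def sample_entries
  [
    SketchEntry.new({ "queue" => "mail", "type" => "SendEmail" }),
    SketchEntry.new({ "queue" => "mail", "type" => "SendDigest" }),
    SketchEntry.new({ "queue" => "billing", "type" => "Charge" }),
  ]
end

# Keep the "mail" queue, drop SendEmail (passed as a Class, matched by name).
matched = apply_filters(sample_entries, only_queues: "mail", except_types: [SendEmail])
p matched.map { |e| e.data["type"] }
```

Because an unset axis normalizes to an empty array, each clause short-circuits to true and only the axes a test actually names narrow the result.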
#clear! ⇒ Object
Reset the buffer. Called between tests via `Zizq::Test.reset!`.

    # File 'lib/zizq/test/client.rb', line 101

    def clear! #: () -> void
      @mutex.synchronize { @entries.clear }
    end
#close ⇒ Object
: () -> void
    # File 'lib/zizq/test/client.rb', line 105

    def close #: () -> void
    end
#completed_jobs(**filters) ⇒ Object
: (**untyped) -> Array
    # File 'lib/zizq/test/client.rb', line 92

    def completed_jobs(**filters) #: (**untyped) -> Array[Resources::Job]
      filter_by_status([STATUS_COMPLETED], **filters)
    end
#dead_jobs(**filters) ⇒ Object
: (**untyped) -> Array
    # File 'lib/zizq/test/client.rb', line 96

    def dead_jobs(**filters) #: (**untyped) -> Array[Resources::Job]
      filter_by_status([STATUS_DEAD], **filters)
    end
#dispatch_entry(entry) ⇒ Object
Mark the entry in_flight, dispatch through the full dequeue middleware chain (same path the real worker uses, so any registered middlewares run in tests too), settle to completed or dead. A raised exception re-raises after recording, which gives the same observable behaviour as Rails’ `perform_enqueued_jobs`.

    # File 'lib/zizq/test/client.rb', line 338

    def dispatch_entry(entry) #: (Entry) -> void
      entry.data["status"] = STATUS_IN_FLIGHT
      begin
        Zizq.configuration.dequeue_middleware.call(entry.job)
        entry.data["status"] = STATUS_COMPLETED
      rescue
        entry.data["status"] = STATUS_DEAD
        raise
      end
    end
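The record-then-re-raise behaviour can be observed with a small standalone sketch. `dispatch_sketch` is a hypothetical stand-in: the entry is a plain hash and the handler is any callable, whereas the real method goes through the configured dequeue middleware chain.

```ruby
# Hypothetical stand-in for dispatch_entry: same status transitions, but the
# handler is passed in directly instead of coming from Zizq's configuration.
def dispatch_sketch(entry, handler)
  entry["status"] = "in_flight"
  begin
    handler.call(entry)
    entry["status"] = "completed"
  rescue
    entry["status"] = "dead" # record the failure first...
    raise                    # ...then let the test see the exception
  end
end

ok = { "type" => "Ok" }
dispatch_sketch(ok, ->(_entry) {})
puts ok["status"]

bad = { "type" => "Boom" }
begin
  dispatch_sketch(bad, ->(_entry) { raise ArgumentError, "boom" })
rescue ArgumentError => e
  puts "re-raised: #{e.message}"
end
puts bad["status"]
```

A test can therefore assert on the exception and still inspect `dead_jobs` afterwards, since the status was written before the exception propagated.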
#drain(**filters) ⇒ Object
Dispatch every runnable entry (status `ready`, or `scheduled` with `ready_at` already elapsed) through the configured dequeue middleware chain (same path the real worker uses, so any registered middlewares run in tests too), looping until no more match the filters. Re-enqueues during dispatch fall through the loop naturally: they get drained too unless they fall outside the filter set.
The per-iteration snapshot is taken under the mutex and marked `in_flight` atomically. Dispatch happens outside the mutex so handlers can re-enter the client without deadlocking. On success the entry moves to `completed`; on a raised exception it moves to `dead` and the exception re-raises (matching ActiveJob’s `perform_enqueued_jobs` and Sidekiq’s `drain`).

    # File 'lib/zizq/test/client.rb', line 122

    def drain(**filters) #: (**untyped) -> Integer
      total = 0
      loop do
        snapshot = take_runnable_snapshot(**filters)
        break if snapshot.empty?

        snapshot.each { |entry| dispatch_entry(entry) }
        total += snapshot.size
      end
      total
    end
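The loop-until-empty behaviour, including jobs enqueued by a running handler, can be sketched with a miniature buffer. `MiniBuffer` is hypothetical and much simpler than the real client (no filters, no scheduling, no failure handling); it only illustrates the snapshot loop.

```ruby
# Hypothetical miniature of the buffer, for illustration only.
class MiniBuffer
  attr_reader :entries

  def initialize
    @entries = []
    @mutex = Mutex.new
  end

  # The optional block plays the role of the job's handler.
  def enqueue(type, &handler)
    @mutex.synchronize do
      @entries << { "type" => type, "status" => "ready", "handler" => handler }
    end
  end

  def drain
    total = 0
    loop do
      # Snapshot and flip to in_flight while holding the lock.
      snapshot = @mutex.synchronize do
        @entries.select { |e| e["status"] == "ready" }
                .each { |e| e["status"] = "in_flight" }
      end
      break if snapshot.empty?

      # Handlers run outside the lock, so they may call enqueue again.
      snapshot.each do |e|
        e["handler"]&.call
        e["status"] = "completed"
      end
      total += snapshot.size
    end
    total
  end
end

buffer = MiniBuffer.new
buffer.enqueue("Parent") { buffer.enqueue("Child") }
puts buffer.drain
```

The first pass runs `Parent`, whose handler enqueues `Child`; the second pass picks up `Child`, and the third finds nothing runnable and exits, so the returned count covers both jobs.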
#enqueue(queue:, type:, payload:, priority: nil, ready_at: nil, retry_limit: nil, backoff: nil, retention: nil, unique_key: nil, unique_while: nil) ⇒ Object
    # File 'lib/zizq/test/client.rb', line 135

    def enqueue(queue:,
                type:,
                payload:,
                priority: nil,
                ready_at: nil,
                retry_limit: nil,
                backoff: nil,
                retention: nil,
                unique_key: nil,
                unique_while: nil)
      req = EnqueueRequest.new(
        queue:,
        type:,
        payload: self.class.normalize_payload(payload),
        priority:,
        ready_at:,
        retry_limit:,
        backoff:,
        retention:,
        unique_key:,
        unique_while:,
      )
      @mutex.synchronize { record_unsynchronized(req) }.job
    end
#enqueue_bulk(jobs:) ⇒ Object
    # File 'lib/zizq/test/client.rb', line 161

    def enqueue_bulk(jobs:)
      @mutex.synchronize do
        jobs.map do |params|
          req = EnqueueRequest.new(
            queue: params[:queue],
            type: params[:type],
            payload: self.class.normalize_payload(params[:payload]),
            priority: params[:priority],
            ready_at: params[:ready_at],
            retry_limit: params[:retry_limit],
            backoff: params[:backoff],
            retention: params[:retention],
            unique_key: params[:unique_key],
            unique_while: params[:unique_while],
          )
          record_unsynchronized(req).job
        end
      end
    end
#enqueued_jobs(**filters) ⇒ Object
All buffered jobs, in submission order, optionally filtered. See `apply_filters` for the filter kwargs.

    # File 'lib/zizq/test/client.rb', line 69

    def enqueued_jobs(**filters) #: (**untyped) -> Array[Resources::Job]
      @mutex.synchronize { apply_filters(@entries, **filters).map(&:job) }
    end
#enqueued_requests(**filters) ⇒ Object
Original `EnqueueRequest`s in submission order. Useful when a test needs metadata that doesn’t survive onto `Resources::Job` (`unique_key`, `unique_while`, `delay` before `ready_at` resolution, etc.). Same filter kwargs as `enqueued_jobs`.

    # File 'lib/zizq/test/client.rb', line 77

    def enqueued_requests(**filters) #: (**untyped) -> Array[EnqueueRequest]
      @mutex.synchronize { apply_filters(@entries, **filters).map(&:request) }
    end
#filter_by_status(statuses, **filters) ⇒ Object
    # File 'lib/zizq/test/client.rb', line 293

    def filter_by_status(statuses, **filters) #: (Array[String], **untyped) -> Array[Resources::Job]
      @mutex.synchronize do
        apply_filters(@entries, **filters)
          .select { |e| statuses.include?(e.data["status"]) }
          .map(&:job)
      end
    end
#in_flight_jobs(**filters) ⇒ Object
: (**untyped) -> Array
    # File 'lib/zizq/test/client.rb', line 88

    def in_flight_jobs(**filters) #: (**untyped) -> Array[Resources::Job]
      filter_by_status([STATUS_IN_FLIGHT], **filters)
    end
#normalize_filter(value) ⇒ Object
Accept a Class, String, or Array of those; emit an Array of Strings. Class names match the API’s `type` string.

    # File 'lib/zizq/test/client.rb', line 324

    def normalize_filter(value) #: ((String | Class | Array[String | Class])?) -> Array[String]
      case value
      when nil then []
      when Array then value.map { |x| x.to_s }
      else [value.to_s]
      end
    end
#pending_jobs(**filters) ⇒ Object
Jobs awaiting dispatch: `ready` and `scheduled` entries. `pending_jobs` is the set `drain` would attempt to run on its next call (modulo `ready_at` for `scheduled` entries).

    # File 'lib/zizq/test/client.rb', line 84

    def pending_jobs(**filters) #: (**untyped) -> Array[Resources::Job]
      filter_by_status([STATUS_READY, STATUS_SCHEDULED], **filters)
    end
#runnable?(entry, now_ms) ⇒ Boolean
: (Entry, Integer) -> bool
    # File 'lib/zizq/test/client.rb', line 314

    def runnable?(entry, now_ms) #: (Entry, Integer) -> bool
      case entry.data["status"]
      when STATUS_READY then true
      when STATUS_SCHEDULED then entry.data["ready_at"] <= now_ms
      else false
      end
    end
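The runnable check reduces to a comparison between `ready_at` and the current time in milliseconds. The sketch below restates it over plain values; `runnable_sketch?` is a hypothetical name, and it takes the status and timestamp directly instead of an `Entry`.

```ruby
# Hypothetical restatement of the check over plain values.
def runnable_sketch?(status, ready_at_ms, now_ms)
  case status
  when "ready" then true
  when "scheduled" then ready_at_ms <= now_ms
  else false
  end
end

# Same clock conversion as take_runnable_snapshot: seconds to milliseconds.
now_ms = (Time.now.to_f * 1000).to_i

puts runnable_sketch?("ready", nil, now_ms)                 # ready: timestamp ignored
puts runnable_sketch?("scheduled", now_ms - 1, now_ms)      # already elapsed
puts runnable_sketch?("scheduled", now_ms + 60_000, now_ms) # one minute in the future
puts runnable_sketch?("completed", nil, now_ms)             # settled entries never rerun
```

A `scheduled` job whose `ready_at` is still in the future stays pending, so `drain` skips it until the time has passed.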
#take_runnable_snapshot(**filters) ⇒ Object
Lock, find runnable entries matching the filters, flip each to in_flight, return the snapshot. Holding the mutex through this is fine because we don’t call user code — dispatch happens outside.
    # File 'lib/zizq/test/client.rb', line 305

    def take_runnable_snapshot(**filters) #: (**untyped) -> Array[Entry]
      now_ms = (Time.now.to_f * 1000).to_i
      @mutex.synchronize do
        apply_filters(@entries, **filters)
          .select { |e| runnable?(e, now_ms) }
          .each { |entry| entry.data["status"] = STATUS_IN_FLIGHT }
      end
    end
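Dispatch has to happen outside the lock because Ruby's `Mutex` is not re-entrant: a thread that calls `synchronize` on a mutex it already holds gets a `ThreadError`. The sketch below demonstrates both cases; `reentry_error` and `dispatch_outside_lock` are hypothetical helper names used only here.

```ruby
# Returns the error raised when the same thread re-enters a held Mutex.
def reentry_error(mutex)
  mutex.synchronize { mutex.synchronize { :unreachable } }
  nil
rescue ThreadError => e
  e
end

# Take the snapshot under the lock, release it, then run "handlers" that
# themselves need the lock (as a job calling enqueue would).
def dispatch_outside_lock(mutex, jobs)
  snapshot = mutex.synchronize { jobs.dup }
  snapshot.map { |job| mutex.synchronize { "#{job} handled" } }
end

mutex = Mutex.new
puts reentry_error(mutex).class
p dispatch_outside_lock(mutex, [:job_a, :job_b])
```

Keeping only the bookkeeping inside `synchronize` means the lock is held briefly and never while user code runs.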