Class: Zizq::Test::Client
- Defined in:
- lib/zizq/test/client.rb,
sig/generated/zizq/test/client.rbs
Overview
A Zizq::Client stand-in for use in test suites.
Buffers enqueue / enqueue_bulk calls in memory and returns
synthetic Resources::Job instances (with generated ids) so that
callers depending on the regular client's return contract don't
need to special-case test mode.
Read operations (get_queues, list_jobs, count_jobs, …) are
explicitly not supported in test mode and raise NotSupported.
Tests that need those should either run against a real server or
stub at a higher level.
Activated indirectly via Zizq.configuration.test_mode = true —
Zizq.client then lazily builds a Test::Client instead of a
real Client.
Defined Under Namespace
Classes: Entry, NotSupported
Constant Summary collapse
- ID_LENGTH =
Length of a real scru128 id in its base-32 representation. Synthetic test ids are sized to match (
testprefix + zero padded counter) so they fit anywhere a real id would. 25- ID_PREFIX =
"test"- STATUS_SCHEDULED =
The canonical Zizq lifecycle states. We mirror these so the
statuson a buffered job reflects what the real server would report. Test mode never retries —in_flightonly ever transitions tocompletedordead. "scheduled"- STATUS_READY =
"ready"- STATUS_IN_FLIGHT =
"in_flight"- STATUS_COMPLETED =
"completed"- STATUS_DEAD =
"dead"- PASS_ALL_FILTER =
Default
filter:lambda — passes every job. Named so the filter pipeline is always callable without nil-checking. ->(_job) { true }
Constants inherited from Client
Client::CONTENT_TYPES, Client::STREAM_ACCEPT
Instance Attribute Summary
Attributes inherited from Client
Class Method Summary collapse
-
.normalize_payload(payload) ⇒ Object
Round-trip the payload through JSON so the in-memory representation matches what a consumer would receive over the wire: symbol keys / Symbol values become strings, nested hashes and arrays are normalized recursively, and non-JSON- safe values (BigDecimal, custom objects) raise here rather than surviving in test mode only to break in production.
Instance Method Summary collapse
-
#apply_filters(entries, only_queues: nil, except_queues: nil, only_types: nil, except_types: nil, filter: PASS_ALL_FILTER) ⇒ Object
Returns entries matching every named filter AND the predicate.
-
#clear! ⇒ Object
Reset the buffer.
-
#close ⇒ Object
: () -> void.
-
#completed_jobs(**filters) ⇒ Object
: (**untyped) -> Array.
-
#dead_jobs(**filters) ⇒ Object
: (**untyped) -> Array.
-
#dispatch_entry(entry) ⇒ Object
Mark the entry in_flight, dispatch through the full dequeue middleware chain (same path the real worker uses, so any registered middlewares run in tests too), settle to completed or dead.
-
#drain(**filters) ⇒ Object
Dispatch every runnable entry (status
ready, orscheduledwithready_atalready elapsed) through the configured dequeue middleware chain (same path the real worker uses, so any registered middlewares run in tests too), looping until no more match the filters. - #enqueue(queue:, type:, payload:, priority: nil, ready_at: nil, retry_limit: nil, backoff: nil, retention: nil, unique_key: nil, unique_while: nil) ⇒ void
- #enqueue_bulk(jobs:) ⇒ void
-
#enqueued_jobs(**filters) ⇒ Object
All buffered jobs, in submission order, optionally filtered.
-
#enqueued_requests(**filters) ⇒ Object
Original
EnqueueRequests in submission order. - #filter_by_status(statuses, **filters) ⇒ Object
-
#in_flight_jobs(**filters) ⇒ Object
: (**untyped) -> Array.
-
#initialize ⇒ Client
constructor
: () -> void.
-
#normalize_filter(value) ⇒ Object
Accept a Class, String, or Array of those; emit an Array of Strings.
-
#pending_jobs(**filters) ⇒ Object
Jobs awaiting dispatch —
readyandscheduledentries. -
#record_unsynchronized(req) ⇒ Object
: (EnqueueRequest) -> Entry.
-
#runnable?(entry, now_ms) ⇒ Boolean
: (Entry, Integer) -> bool.
-
#synthetic_id(counter) ⇒ Object
: (Integer) -> String.
-
#take_runnable_snapshot(**filters) ⇒ Object
Lock, find runnable entries matching the filters, flip each to in_flight, return the snapshot.
Methods inherited from Client
#add_cron_group_entry, #build_cron_entry, #build_cron_job, #build_path, #build_set_body, #build_where_params, #cleanup_internal_clients, #consume_response, #count_jobs, #decode_body, #delete, #delete_all_crons, #delete_all_jobs, #delete_cron_group, #delete_cron_group_entry, #delete_job, #enc, #encode_body, #encode_range, #ensure_io_thread, #erase_all_data, #get, #get_cron_group, #get_cron_group_entry, #get_error, #get_job, #get_path, #get_queues, #handle_response!, #health, #http, #io_thread_run, #list_cron_groups, #list_errors, #list_jobs, make_finalizer, parse_msgpack_stream, parse_ndjson, #patch, #post, #put, #raw_post, #replace_cron_group, #replace_cron_group_entry, #report_failure, #report_success, #report_success_bulk, #request, #server_version, #stream_http, #sync_call, #take_jobs, #thread_local_http, #update_all_jobs, #update_cron_group, #update_cron_group_entry, #update_job, #validate_and_build_set, #validate_where
Constructor Details
#initialize ⇒ Client
: () -> void
58 59 60 61 62 63 64 65 |
# File 'lib/zizq/test/client.rb', line 58 def initialize #: () -> void # Skip the parent's HTTP setup — we don't open connections in # test mode. The parent's @http and friends stay nil; methods # that would touch them are overridden below. @entries = [] #: Array[Entry] @mutex = Mutex.new end |
Class Method Details
.normalize_payload(payload) ⇒ Object
Round-trip the payload through JSON so the in-memory representation matches what a consumer would receive over the wire: symbol keys / Symbol values become strings, nested hashes and arrays are normalized recursively, and non-JSON- safe values (BigDecimal, custom objects) raise here rather than surviving in test mode only to break in production.
Also used by Zizq::Test.enqueued_raw? / enqueued_raw_count
to normalize the query side so symbol-keyed assertion payloads
still match the (now string-keyed) buffer.
253 254 255 |
# File 'lib/zizq/test/client.rb', line 253 def self.normalize_payload(payload) #: (untyped) -> untyped JSON.parse(JSON.generate(payload)) end |
Instance Method Details
#apply_filters(entries, only_queues: nil, except_queues: nil, only_types: nil, except_types: nil, filter: PASS_ALL_FILTER) ⇒ Object
Returns entries matching every named filter AND the predicate.
All filter kwargs are optional; unset means "don't filter on
this axis." Callers must hold @mutex — public accessors do
so via synchronize before calling.
only_queues:/except_queues:— String, Array of Strings.only_types:/except_types:— String, Class, or Array of those. Class names are matched against the wire-formattypestring via.to_s.filter:— a lambda receiving aResources::Job, returning truthy to keep. Defaults toPASS_ALL_FILTER.
only_* and except_* AND together with the predicate.
270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 |
# File 'lib/zizq/test/client.rb', line 270 def apply_filters(entries, only_queues: nil, except_queues: nil, only_types: nil, except_types: nil, filter: PASS_ALL_FILTER) #: (Array[Entry], **untyped) -> Array[Entry] only_queues = normalize_filter(only_queues) except_queues = normalize_filter(except_queues) only_types = normalize_filter(only_types) except_types = normalize_filter(except_types) entries.select do |entry| queue = entry.data["queue"] #: String type = entry.data["type"] #: String (only_queues.empty? || only_queues.include?(queue)) && (except_queues.empty? || !except_queues.include?(queue)) && (only_types.empty? || only_types.include?(type)) && (except_types.empty? || !except_types.include?(type)) && filter.call(entry.job) end end |
#clear! ⇒ Object
Reset the buffer. Called between tests via Zizq::Test.reset!.
101 102 103 |
# File 'lib/zizq/test/client.rb', line 101 def clear! #: () -> void @mutex.synchronize { @entries.clear } end |
#close ⇒ Object
: () -> void
105 106 |
# File 'lib/zizq/test/client.rb', line 105 def close #: () -> void end |
#completed_jobs(**filters) ⇒ Object
: (**untyped) -> Array
92 93 94 |
# File 'lib/zizq/test/client.rb', line 92 def completed_jobs(**filters) #: (**untyped) -> Array[Resources::Job] filter_by_status([STATUS_COMPLETED], **filters) end |
#dead_jobs(**filters) ⇒ Object
: (**untyped) -> Array
96 97 98 |
# File 'lib/zizq/test/client.rb', line 96 def dead_jobs(**filters) #: (**untyped) -> Array[Resources::Job] filter_by_status([STATUS_DEAD], **filters) end |
#dispatch_entry(entry) ⇒ Object
Mark the entry in_flight, dispatch through the full dequeue
middleware chain (same path the real worker uses, so any
registered middlewares run in tests too), settle to
completed or dead. A raised exception re-raises after
recording — same observable behaviour as Rails'
perform_enqueued_jobs.
338 339 340 341 342 343 344 345 346 347 |
# File 'lib/zizq/test/client.rb', line 338 def dispatch_entry(entry) #: (Entry) -> void entry.data["status"] = STATUS_IN_FLIGHT begin Zizq.configuration.dequeue_middleware.call(entry.job) entry.data["status"] = STATUS_COMPLETED rescue entry.data["status"] = STATUS_DEAD raise end end |
#drain(**filters) ⇒ Object
Dispatch every runnable entry (status ready, or scheduled
with ready_at already elapsed) through the configured
dequeue middleware chain (same path the real worker uses, so
any registered middlewares run in tests too), looping until no
more match the filters. Re-enqueues during dispatch fall
through the loop naturally — they get drained too unless they
fall outside the filter set.
The per-iteration snapshot is taken under the mutex and marked
in_flight atomically. Dispatch happens outside the mutex so
handlers can re-enter the client without deadlocking. On
success the entry moves to completed; on a raised exception
it moves to dead and the exception re-raises (matching
ActiveJob's perform_enqueued_jobs + Sidekiq's drain).
122 123 124 125 126 127 128 129 130 131 132 |
# File 'lib/zizq/test/client.rb', line 122 def drain(**filters) #: (**untyped) -> Integer total = 0 loop do snapshot = take_runnable_snapshot(**filters) break if snapshot.empty? snapshot.each { |entry| dispatch_entry(entry) } total += snapshot.size end total end |
#enqueue(queue:, type:, payload:, priority: nil, ready_at: nil, retry_limit: nil, backoff: nil, retention: nil, unique_key: nil, unique_while: nil) ⇒ void
This method returns an undefined value.
135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 |
# File 'lib/zizq/test/client.rb', line 135 def enqueue(queue:, type:, payload:, priority: nil, ready_at: nil, retry_limit: nil, backoff: nil, retention: nil, unique_key: nil, unique_while: nil) req = EnqueueRequest.new( queue:, type:, payload: self.class.normalize_payload(payload), priority:, ready_at:, retry_limit:, backoff:, retention:, unique_key:, unique_while:, ) @mutex.synchronize { record_unsynchronized(req) }.job end |
#enqueue_bulk(jobs:) ⇒ void
This method returns an undefined value.
161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 |
# File 'lib/zizq/test/client.rb', line 161 def enqueue_bulk(jobs:) @mutex.synchronize do jobs.map do |params| req = EnqueueRequest.new( queue: params[:queue], type: params[:type], payload: self.class.normalize_payload(params[:payload]), priority: params[:priority], ready_at: params[:ready_at], retry_limit: params[:retry_limit], backoff: params[:backoff], retention: params[:retention], unique_key: params[:unique_key], unique_while: params[:unique_while], ) record_unsynchronized(req).job end end end |
#enqueued_jobs(**filters) ⇒ Object
All buffered jobs, in submission order, optionally filtered.
See apply_filters for the filter kwargs.
69 70 71 |
# File 'lib/zizq/test/client.rb', line 69 def enqueued_jobs(**filters) #: (**untyped) -> Array[Resources::Job] @mutex.synchronize { apply_filters(@entries, **filters).map(&:job) } end |
#enqueued_requests(**filters) ⇒ Object
Original EnqueueRequests in submission order. Useful when a
test needs metadata that doesn't survive onto Resources::Job
(unique_key, unique_while, delay before ready_at
resolution, etc.). Same filter kwargs as enqueued_jobs.
77 78 79 |
# File 'lib/zizq/test/client.rb', line 77 def enqueued_requests(**filters) #: (**untyped) -> Array[EnqueueRequest] @mutex.synchronize { apply_filters(@entries, **filters).map(&:request) } end |
#filter_by_status(statuses, **filters) ⇒ Object
293 294 295 296 297 298 299 |
# File 'lib/zizq/test/client.rb', line 293 def filter_by_status(statuses, **filters) #: (Array[String], **untyped) -> Array[Resources::Job] @mutex.synchronize do apply_filters(@entries, **filters) .select { |e| statuses.include?(e.data["status"]) } .map(&:job) end end |
#in_flight_jobs(**filters) ⇒ Object
: (**untyped) -> Array
88 89 90 |
# File 'lib/zizq/test/client.rb', line 88 def in_flight_jobs(**filters) #: (**untyped) -> Array[Resources::Job] filter_by_status([STATUS_IN_FLIGHT], **filters) end |
#normalize_filter(value) ⇒ Object
Accept a Class, String, or Array of those; emit an Array of
Strings. Class names match the API's type string.
324 325 326 327 328 329 330 |
# File 'lib/zizq/test/client.rb', line 324 def normalize_filter(value) #: ((String | Class | Array[String | Class])?) -> Array[String] case value when nil then [] when Array then value.map { |x| x.to_s } else [value.to_s] end end |
#pending_jobs(**filters) ⇒ Object
Jobs awaiting dispatch — ready and scheduled entries.
pending_jobs is the set drain would attempt to run on its
next call (modulo ready_at for scheduled entries).
84 85 86 |
# File 'lib/zizq/test/client.rb', line 84 def pending_jobs(**filters) #: (**untyped) -> Array[Resources::Job] filter_by_status([STATUS_READY, STATUS_SCHEDULED], **filters) end |
#record_unsynchronized(req) ⇒ Object
: (EnqueueRequest) -> Entry
212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 |
# File 'lib/zizq/test/client.rb', line 212 def record_unsynchronized(req) #: (EnqueueRequest) -> Entry # Serialized format: ready_at is integer milliseconds. # `req.ready_at` (from `to_enqueue_params`) is fractional # seconds; convert here so `Resources::Job#ready_at`'s # ms -> seconds round-trip produces the same value the # caller passed in. When the client omits ready_at the server # assigns `now`, so we do the same. now_ms = (Time.now.to_f * 1000).to_i ready_at_ms = req.ready_at ? (req.ready_at.to_f * 1000).to_i : now_ms data = { "id" => synthetic_id(@entries.size + 1), "queue" => req.queue, "type" => req.type, "payload" => req.payload, "priority" => req.priority, "ready_at" => ready_at_ms, "retry_limit" => req.retry_limit, "status" => ready_at_ms > now_ms ? STATUS_SCHEDULED : STATUS_READY, } entry = Entry.new(request: req, job: Resources::Job.new(self, data), data: data) @entries << entry entry end |
#runnable?(entry, now_ms) ⇒ Boolean
: (Entry, Integer) -> bool
314 315 316 317 318 319 320 |
# File 'lib/zizq/test/client.rb', line 314 def runnable?(entry, now_ms) #: (Entry, Integer) -> bool case entry.data["status"] when STATUS_READY then true when STATUS_SCHEDULED then entry.data["ready_at"] <= now_ms else false end end |
#synthetic_id(counter) ⇒ Object
: (Integer) -> String
237 238 239 |
# File 'lib/zizq/test/client.rb', line 237 def synthetic_id(counter) #: (Integer) -> String "#{ID_PREFIX}#{counter.to_s.rjust(ID_LENGTH - ID_PREFIX.length, '0')}" end |
#take_runnable_snapshot(**filters) ⇒ Object
Lock, find runnable entries matching the filters, flip each to in_flight, return the snapshot. Holding the mutex through this is fine because we don't call user code — dispatch happens outside.
305 306 307 308 309 310 311 312 |
# File 'lib/zizq/test/client.rb', line 305 def take_runnable_snapshot(**filters) #: (**untyped) -> Array[Entry] now_ms = (Time.now.to_f * 1000).to_i @mutex.synchronize do apply_filters(@entries, **filters) .select { |e| runnable?(e, now_ms) } .each { |entry| entry.data["status"] = STATUS_IN_FLIGHT } end end |