Class: Zizq::Test::Client
Overview
A ‘Zizq::Client` stand-in for use in test suites.
Buffers ‘enqueue` / `enqueue_bulk` calls in memory and returns synthetic `Resources::Job` instances (with generated ids) so that callers depending on the regular client’s return contract don’t need to special-case test mode.
Read operations (‘get_queues`, `list_jobs`, `count_jobs`, …) are explicitly not supported in test mode and raise `NotSupported`. Tests that need those should either run against a real server or stub at a higher level.
Activated indirectly via ‘Zizq.configuration.test_mode = true` —`Zizq.client` then lazily builds a `Test::Client` instead of a real `Client`.
Defined Under Namespace
Classes: Entry, NotSupported
Constant Summary collapse
- ID_LENGTH =
Length of a real scru128 id in its base-32 representation. Synthetic test ids are sized to match (‘test` prefix + zero padded counter) so they fit anywhere a real id would.
25- ID_PREFIX =
"test"- STATUS_SCHEDULED =
The canonical Zizq lifecycle states. We mirror these so the ‘status` on a buffered job reflects what the real server would report. Test mode never retries — `in_flight` only ever transitions to `completed` or `dead`.
"scheduled"- STATUS_READY =
"ready"- STATUS_IN_FLIGHT =
"in_flight"- STATUS_COMPLETED =
"completed"- STATUS_DEAD =
"dead"- PASS_ALL_FILTER =
Default ‘filter:` lambda — passes every job. Named so the filter pipeline is always callable without nil-checking.
->(_job) { true }
Constants inherited from Client
Client::CONTENT_TYPES, Client::STREAM_ACCEPT
Instance Attribute Summary
Attributes inherited from Client
Instance Method Summary collapse
-
#clear! ⇒ Object
Reset the buffer.
-
#close ⇒ Object
: () -> void.
-
#completed_jobs(**filters) ⇒ Object
: (**untyped) -> Array.
-
#dead_jobs(**filters) ⇒ Object
: (**untyped) -> Array.
-
#drain(**filters) ⇒ Object
Dispatch every runnable entry (status ‘ready`, or `scheduled` with `ready_at` already elapsed) through the configured dequeue middleware chain (same path the real worker uses, so any registered middlewares run in tests too), looping until no more match the filters.
- #enqueue(queue:, type:, payload:, priority: nil, ready_at: nil, retry_limit: nil, backoff: nil, retention: nil, unique_key: nil, unique_while: nil) ⇒ Object
- #enqueue_bulk(jobs:) ⇒ Object
-
#enqueued_jobs(**filters) ⇒ Object
All buffered jobs, in submission order, optionally filtered.
-
#enqueued_requests(**filters) ⇒ Object
Original ‘EnqueueRequest`s in submission order.
-
#in_flight_jobs(**filters) ⇒ Object
: (**untyped) -> Array.
-
#initialize ⇒ Client
constructor
: () -> void.
-
#pending_jobs(**filters) ⇒ Object
Jobs awaiting dispatch — ‘ready` and `scheduled` entries.
Methods inherited from Client
#add_cron_group_entry, #cleanup_internal_clients, #count_jobs, #delete_all_jobs, #delete_cron_group, #delete_cron_group_entry, #delete_job, #get_cron_group, #get_cron_group_entry, #get_error, #get_job, #get_path, #get_queues, #health, #list_cron_groups, #list_errors, #list_jobs, make_finalizer, parse_msgpack_stream, parse_ndjson, #replace_cron_group, #replace_cron_group_entry, #report_failure, #report_success, #report_success_bulk, #server_version, #take_jobs, #update_all_jobs, #update_cron_group, #update_cron_group_entry, #update_job
Constructor Details
#initialize ⇒ Client
: () -> void
56 57 58 59 60 61 62 63 |
# File 'lib/zizq/test/client.rb', line 56 def initialize #: () -> void # Skip the parent's HTTP setup — we don't open connections in # test mode. The parent's @http and friends stay nil; methods # that would touch them are overridden below. @entries = [] #: Array[Entry] @mutex = Mutex.new end |
Instance Method Details
#clear! ⇒ Object
Reset the buffer. Called between tests via ‘Zizq::Test.reset!`.
99 100 101 |
# File 'lib/zizq/test/client.rb', line 99 def clear! #: () -> void @mutex.synchronize { @entries.clear } end |
#close ⇒ Object
: () -> void
103 104 |
# File 'lib/zizq/test/client.rb', line 103 def close #: () -> void end |
#completed_jobs(**filters) ⇒ Object
: (**untyped) -> Array
90 91 92 |
# File 'lib/zizq/test/client.rb', line 90 def completed_jobs(**filters) #: (**untyped) -> Array[Resources::Job] filter_by_status([STATUS_COMPLETED], **filters) end |
#dead_jobs(**filters) ⇒ Object
: (**untyped) -> Array
94 95 96 |
# File 'lib/zizq/test/client.rb', line 94 def dead_jobs(**filters) #: (**untyped) -> Array[Resources::Job] filter_by_status([STATUS_DEAD], **filters) end |
#drain(**filters) ⇒ Object
Dispatch every runnable entry (status ‘ready`, or `scheduled` with `ready_at` already elapsed) through the configured dequeue middleware chain (same path the real worker uses, so any registered middlewares run in tests too), looping until no more match the filters. Re-enqueues during dispatch fall through the loop naturally — they get drained too unless they fall outside the filter set.
The per-iteration snapshot is taken under the mutex and marked ‘in_flight` atomically. Dispatch happens outside the mutex so handlers can re-enter the client without deadlocking. On success the entry moves to `completed`; on a raised exception it moves to `dead` and the exception re-raises (matching ActiveJob’s ‘perform_enqueued_jobs` + Sidekiq’s ‘drain`).
120 121 122 123 124 125 126 127 128 129 130 |
# File 'lib/zizq/test/client.rb', line 120 def drain(**filters) #: (**untyped) -> Integer total = 0 loop do snapshot = take_runnable_snapshot(**filters) break if snapshot.empty? snapshot.each { |entry| dispatch_entry(entry) } total += snapshot.size end total end |
#enqueue(queue:, type:, payload:, priority: nil, ready_at: nil, retry_limit: nil, backoff: nil, retention: nil, unique_key: nil, unique_while: nil) ⇒ Object
133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 |
# File 'lib/zizq/test/client.rb', line 133 def enqueue(queue:, type:, payload:, priority: nil, ready_at: nil, retry_limit: nil, backoff: nil, retention: nil, unique_key: nil, unique_while: nil) req = EnqueueRequest.new( queue:, type:, payload:, priority:, ready_at:, retry_limit:, backoff:, retention:, unique_key:, unique_while:, ) @mutex.synchronize { record_unsynchronized(req) }.job end |
#enqueue_bulk(jobs:) ⇒ Object
159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 |
# File 'lib/zizq/test/client.rb', line 159 def enqueue_bulk(jobs:) @mutex.synchronize do jobs.map do |params| req = EnqueueRequest.new( queue: params[:queue], type: params[:type], payload: params[:payload], priority: params[:priority], ready_at: params[:ready_at], retry_limit: params[:retry_limit], backoff: params[:backoff], retention: params[:retention], unique_key: params[:unique_key], unique_while: params[:unique_while], ) record_unsynchronized(req).job end end end |
#enqueued_jobs(**filters) ⇒ Object
All buffered jobs, in submission order, optionally filtered. See ‘apply_filters` for the filter kwargs.
67 68 69 |
# File 'lib/zizq/test/client.rb', line 67 def enqueued_jobs(**filters) #: (**untyped) -> Array[Resources::Job] @mutex.synchronize { apply_filters(@entries, **filters).map(&:job) } end |
#enqueued_requests(**filters) ⇒ Object
Original ‘EnqueueRequest`s in submission order. Useful when a test needs metadata that doesn’t survive onto ‘Resources::Job` (`unique_key`, `unique_while`, `delay` before `ready_at` resolution, etc.). Same filter kwargs as `enqueued_jobs`.
75 76 77 |
# File 'lib/zizq/test/client.rb', line 75 def enqueued_requests(**filters) #: (**untyped) -> Array[EnqueueRequest] @mutex.synchronize { apply_filters(@entries, **filters).map(&:request) } end |
#in_flight_jobs(**filters) ⇒ Object
: (**untyped) -> Array
86 87 88 |
# File 'lib/zizq/test/client.rb', line 86 def in_flight_jobs(**filters) #: (**untyped) -> Array[Resources::Job] filter_by_status([STATUS_IN_FLIGHT], **filters) end |
#pending_jobs(**filters) ⇒ Object
Jobs awaiting dispatch — ‘ready` and `scheduled` entries. `pending_jobs` is the set `drain` would attempt to run on its next call (modulo `ready_at` for `scheduled` entries).
82 83 84 |
# File 'lib/zizq/test/client.rb', line 82 def pending_jobs(**filters) #: (**untyped) -> Array[Resources::Job] filter_by_status([STATUS_READY, STATUS_SCHEDULED], **filters) end |