Class: Zizq::Test::Client

Inherits:
Client
  • Object
Defined in:
lib/zizq/test/client.rb,
sig/generated/zizq/test/client.rbs

Overview

A Zizq::Client stand-in for use in test suites.

Buffers enqueue / enqueue_bulk calls in memory and returns synthetic Resources::Job instances (with generated ids) so that callers depending on the regular client's return contract don't need to special-case test mode.

Read operations (get_queues, list_jobs, count_jobs, …) are explicitly not supported in test mode and raise NotSupported. Tests that need those should either run against a real server or stub at a higher level.

Activated indirectly via Zizq.configuration.test_mode = true. Zizq.client then lazily builds a Test::Client instead of a real Client.

Defined Under Namespace

Classes: Entry, NotSupported

Constant Summary

ID_LENGTH =

Length of a real scru128 id in its base-32 representation. Synthetic test ids are sized to match (test prefix + zero padded counter) so they fit anywhere a real id would.

Returns:

  • (::Integer)
25
ID_PREFIX =

Returns:

  • (::String)
"test"
STATUS_SCHEDULED =

The canonical Zizq lifecycle states. We mirror these so the status on a buffered job reflects what the real server would report. Test mode never retries — in_flight only ever transitions to completed or dead.

Returns:

  • (::String)
"scheduled"
STATUS_READY =

Returns:

  • (::String)
"ready"
STATUS_IN_FLIGHT =

Returns:

  • (::String)
"in_flight"
STATUS_COMPLETED =

Returns:

  • (::String)
"completed"
STATUS_DEAD =

Returns:

  • (::String)
"dead"
PASS_ALL_FILTER =

Default filter: lambda — passes every job. Named so the filter pipeline is always callable without nil-checking.

Returns:

->(_job) { true }

Constants inherited from Client

Client::CONTENT_TYPES, Client::STREAM_ACCEPT

Instance Attribute Summary

Attributes inherited from Client

#format, #url

Class Method Summary

Instance Method Summary

Methods inherited from Client

#add_cron_group_entry, #build_cron_entry, #build_cron_job, #build_path, #build_set_body, #build_where_params, #cleanup_internal_clients, #consume_response, #count_jobs, #decode_body, #delete, #delete_all_crons, #delete_all_jobs, #delete_cron_group, #delete_cron_group_entry, #delete_job, #enc, #encode_body, #encode_range, #ensure_io_thread, #erase_all_data, #get, #get_cron_group, #get_cron_group_entry, #get_error, #get_job, #get_path, #get_queues, #handle_response!, #health, #http, #io_thread_run, #list_cron_groups, #list_errors, #list_jobs, make_finalizer, parse_msgpack_stream, parse_ndjson, #patch, #post, #put, #raw_post, #replace_cron_group, #replace_cron_group_entry, #report_failure, #report_success, #report_success_bulk, #request, #server_version, #stream_http, #sync_call, #take_jobs, #thread_local_http, #update_all_jobs, #update_cron_group, #update_cron_group_entry, #update_job, #validate_and_build_set, #validate_where

Constructor Details

#initialize ⇒ Client

: () -> void



# File 'lib/zizq/test/client.rb', line 58

def initialize #: () -> void
  # Skip the parent's HTTP setup — we don't open connections in
  # test mode. The parent's @http and friends stay nil; methods
  # that would touch them are overridden below.

  @entries = [] #: Array[Entry]
  @mutex = Mutex.new
end

Class Method Details

.normalize_payload(payload) ⇒ Object

Round-trip the payload through JSON so the in-memory representation matches what a consumer would receive over the wire: Symbol keys and Symbol values become Strings, nested hashes and arrays are normalized recursively, and non-JSON-safe values (BigDecimal, custom objects) raise here rather than surviving in test mode only to break in production.

Also used by Zizq::Test.enqueued_raw? / enqueued_raw_count to normalize the query side so symbol-keyed assertion payloads still match the (now string-keyed) buffer.

Parameters:

  • payload (Object)

Returns:

  • (Object)


# File 'lib/zizq/test/client.rb', line 253

def self.normalize_payload(payload) #: (untyped) -> untyped
  JSON.parse(JSON.generate(payload))
end
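
As a rough illustration of the round-trip described above, the sketch below reproduces the one-line method body as a standalone function (the name sketch_normalize_payload is hypothetical, not part of the library) and feeds it a Symbol-keyed payload. The Float::NAN case is an assumption chosen because Ruby's stdlib JSON generator is known to reject it; it is not one of the examples named in the docstring.

```ruby
require "json"

# Standalone copy of the documented one-liner; the name is hypothetical.
def sketch_normalize_payload(payload)
  JSON.parse(JSON.generate(payload))
end

# Symbol keys and Symbol values come back as Strings at every nesting level.
sketch_normalize_payload({ user_id: 42, tags: [:vip, "new"], meta: { source: :web } })
# => { "user_id" => 42, "tags" => ["vip", "new"], "meta" => { "source" => "web" } }

# A value the JSON generator cannot represent fails at enqueue time.
begin
  sketch_normalize_payload({ score: Float::NAN })
rescue JSON::GeneratorError => e
  warn "rejected: #{e.class}"
end
```

Because the query side is normalized the same way, an assertion written with { user_id: 42 } can still match a buffered payload stored as { "user_id" => 42 }.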

Instance Method Details

#apply_filters(entries, only_queues: nil, except_queues: nil, only_types: nil, except_types: nil, filter: PASS_ALL_FILTER) ⇒ Object

Returns entries matching every named filter AND the predicate. All filter kwargs are optional; unset means "don't filter on this axis." Callers must hold @mutex — public accessors do so via synchronize before calling.

  • only_queues: / except_queues: accept a String or an Array of Strings.
  • only_types: / except_types: — String, Class, or Array of those. Class names are matched against the wire-format type string via .to_s.
  • filter: — a lambda receiving a Resources::Job, returning truthy to keep. Defaults to PASS_ALL_FILTER.

only_* and except_* AND together with the predicate.

Parameters:

  • entries (Object)
  • only_queues: (Object) (defaults to: nil)
  • except_queues: (Object) (defaults to: nil)
  • only_types: (Object) (defaults to: nil)
  • except_types: (Object) (defaults to: nil)
  • filter: (Object) (defaults to: PASS_ALL_FILTER)

Returns:

  • (Object)


# File 'lib/zizq/test/client.rb', line 270

def apply_filters(entries,
                  only_queues: nil,
                  except_queues: nil,
                  only_types: nil,
                  except_types: nil,
                  filter: PASS_ALL_FILTER) #: (Array[Entry], **untyped) -> Array[Entry]
  only_queues   = normalize_filter(only_queues)
  except_queues = normalize_filter(except_queues)
  only_types    = normalize_filter(only_types)
  except_types  = normalize_filter(except_types)

  entries.select do |entry|
    queue = entry.data["queue"] #: String
    type  = entry.data["type"]  #: String

    (only_queues.empty? || only_queues.include?(queue)) &&
      (except_queues.empty? || !except_queues.include?(queue)) &&
      (only_types.empty? || only_types.include?(type)) &&
      (except_types.empty? || !except_types.include?(type)) &&
      filter.call(entry.job)
  end
end
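
The filter semantics can be tried out without a client. The sketch below is a simplified stand-in, not the library's code: SketchEntry and sketch_apply_filters are hypothetical names, Array(...) replaces normalize_filter, and the predicate receives the entry itself rather than a Resources::Job.

```ruby
# Hypothetical stand-in for Entry: just the two fields the named filters read.
SketchEntry = Struct.new(:queue, :type)

def sketch_apply_filters(entries, only_queues: nil, except_queues: nil,
                         only_types: nil, except_types: nil,
                         filter: ->(_entry) { true })
  # Array(...) is shorthand for normalize_filter: nil => [], scalar => [scalar].
  only_queues   = Array(only_queues).map(&:to_s)
  except_queues = Array(except_queues).map(&:to_s)
  only_types    = Array(only_types).map(&:to_s)
  except_types  = Array(except_types).map(&:to_s)

  entries.select do |entry|
    (only_queues.empty? || only_queues.include?(entry.queue)) &&
      (except_queues.empty? || !except_queues.include?(entry.queue)) &&
      (only_types.empty? || only_types.include?(entry.type)) &&
      (except_types.empty? || !except_types.include?(entry.type)) &&
      filter.call(entry)
  end
end

def sketch_entries
  [
    SketchEntry.new("emails", "SendEmailJob"),
    SketchEntry.new("emails", "DigestJob"),
    SketchEntry.new("reports", "BuildReportJob"),
  ]
end

# Named filters AND together: emails queue, minus DigestJob.
sketch_apply_filters(sketch_entries, only_queues: "emails", except_types: "DigestJob").map(&:type)
# => ["SendEmailJob"]

# The predicate is one more AND clause on top of the named filters.
digest_only = ->(entry) { entry.type.start_with?("Digest") }
sketch_apply_filters(sketch_entries, only_queues: "emails", filter: digest_only).map(&:type)
# => ["DigestJob"]
```

Calling it with no keyword arguments returns every entry, which is the "unset means don't filter on this axis" rule in practice.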

#clear! ⇒ Object

Reset the buffer. Called between tests via Zizq::Test.reset!.

Returns:

  • (Object)


# File 'lib/zizq/test/client.rb', line 101

def clear! #: () -> void
  @mutex.synchronize { @entries.clear }
end

#close ⇒ Object

: () -> void

Returns:

  • (Object)


# File 'lib/zizq/test/client.rb', line 105

def close #: () -> void
end

#completed_jobs(**filters) ⇒ Object

: (**untyped) -> Array[Resources::Job]

Parameters:

  • filters (Object)

Returns:

  • (Object)


# File 'lib/zizq/test/client.rb', line 92

def completed_jobs(**filters) #: (**untyped) -> Array[Resources::Job]
  filter_by_status([STATUS_COMPLETED], **filters)
end

#dead_jobs(**filters) ⇒ Object

: (**untyped) -> Array[Resources::Job]

Parameters:

  • filters (Object)

Returns:

  • (Object)


# File 'lib/zizq/test/client.rb', line 96

def dead_jobs(**filters) #: (**untyped) -> Array[Resources::Job]
  filter_by_status([STATUS_DEAD], **filters)
end

#dispatch_entry(entry) ⇒ Object

Mark the entry in_flight, dispatch through the full dequeue middleware chain (same path the real worker uses, so any registered middlewares run in tests too), settle to completed or dead. A raised exception re-raises after recording — same observable behaviour as Rails' perform_enqueued_jobs.

Parameters:

  • entry (Object)

Returns:

  • (Object)


# File 'lib/zizq/test/client.rb', line 338

def dispatch_entry(entry) #: (Entry) -> void
  entry.data["status"] = STATUS_IN_FLIGHT
  begin
    Zizq.configuration.dequeue_middleware.call(entry.job)
    entry.data["status"] = STATUS_COMPLETED
  rescue
    entry.data["status"] = STATUS_DEAD
    raise
  end
end
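
The record-then-re-raise behaviour can be seen in isolation with a minimal sketch. Here sketch_dispatch is a hypothetical name, a plain Hash stands in for entry.data, and a lambda stands in for Zizq.configuration.dequeue_middleware.

```ruby
# data stands in for entry.data; handler stands in for the middleware chain.
def sketch_dispatch(data, handler)
  data["status"] = "in_flight"
  begin
    handler.call(data)
    data["status"] = "completed"
  rescue
    # Record the terminal state first, then let the caller see the failure.
    data["status"] = "dead"
    raise
  end
end

ok = { "status" => "ready" }
sketch_dispatch(ok, ->(_data) {})
ok["status"] # => "completed"

bad = { "status" => "ready" }
begin
  sketch_dispatch(bad, ->(_data) { raise ArgumentError, "boom" })
rescue ArgumentError
  bad["status"] # => "dead"
end
```

A test can therefore assert on the raised exception and on dead_jobs in the same example. Note that the bare rescue only covers StandardError and its subclasses.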

#drain(**filters) ⇒ Object

Dispatch every runnable entry (status ready, or scheduled with ready_at already elapsed) through the configured dequeue middleware chain (same path the real worker uses, so any registered middlewares run in tests too), looping until no more match the filters. Re-enqueues during dispatch fall through the loop naturally — they get drained too unless they fall outside the filter set.

The per-iteration snapshot is taken under the mutex and marked in_flight atomically. Dispatch happens outside the mutex so handlers can re-enter the client without deadlocking. On success the entry moves to completed; on a raised exception it moves to dead and the exception re-raises (matching ActiveJob's perform_enqueued_jobs + Sidekiq's drain).

Parameters:

  • filters (Object)

Returns:

  • (Object)


# File 'lib/zizq/test/client.rb', line 122

def drain(**filters) #: (**untyped) -> Integer
  total = 0
  loop do
    snapshot = take_runnable_snapshot(**filters)
    break if snapshot.empty?

    snapshot.each { |entry| dispatch_entry(entry) }
    total += snapshot.size
  end
  total
end
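
To make the "re-enqueues fall through the loop" behaviour concrete, here is a self-contained model of the same loop. SketchBuffer and the handlers Hash are hypothetical; the real client dispatches through the dequeue middleware chain and also handles scheduled entries and failures, which this sketch leaves out.

```ruby
# Hypothetical miniature of the buffer: ready entries only, no filters.
class SketchBuffer
  def initialize
    @entries = []
    @mutex = Mutex.new
  end

  def enqueue(type)
    @mutex.synchronize { @entries << { "type" => type, "status" => "ready" } }
  end

  def statuses
    @mutex.synchronize { @entries.map { |e| e["status"] } }
  end

  def drain(handlers)
    total = 0
    loop do
      # Snapshot and mark in_flight under the lock ...
      snapshot = @mutex.synchronize do
        @entries.select { |e| e["status"] == "ready" }
                .each { |e| e["status"] = "in_flight" }
      end
      break if snapshot.empty?

      # ... then run handlers outside it, so they may call enqueue again.
      snapshot.each do |entry|
        handlers.fetch(entry["type"]).call(self)
        entry["status"] = "completed"
      end
      total += snapshot.size
    end
    total
  end
end

buffer = SketchBuffer.new
handlers = {
  "import" => ->(buf) { buf.enqueue("notify") }, # re-enqueues during dispatch
  "notify" => ->(_buf) {},
}
buffer.enqueue("import")
buffer.drain(handlers) # => 2 (pass one runs import, pass two runs the notify it enqueued)
```

The return value counts every dispatched entry across all passes, not just the jobs that were buffered when drain was called.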

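#enqueue(queue:, type:, payload:, priority: nil, ready_at: nil, retry_limit: nil, backoff: nil, retention: nil, unique_key: nil, unique_while: nil) ⇒ Resources::Job

Normalizes the payload via normalize_payload, buffers the request under the mutex, and returns the synthetic Resources::Job recorded for it.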



# File 'lib/zizq/test/client.rb', line 135

def enqueue(queue:,
            type:,
            payload:,
            priority: nil,
            ready_at: nil,
            retry_limit: nil,
            backoff: nil,
            retention: nil,
            unique_key: nil,
            unique_while: nil)
  req = EnqueueRequest.new(
    queue:,
    type:,
    payload: self.class.normalize_payload(payload),
    priority:,
    ready_at:,
    retry_limit:,
    backoff:,
    retention:,
    unique_key:,
    unique_while:,
  )
  @mutex.synchronize { record_unsynchronized(req) }.job
end

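#enqueue_bulk(jobs:) ⇒ Array<Resources::Job>

Buffers every element of jobs under a single mutex acquisition and returns the synthetic Resources::Job instances in the same order. Each element is a Hash read with Symbol keys (:queue, :type, :payload, and the optional enqueue options).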



# File 'lib/zizq/test/client.rb', line 161

def enqueue_bulk(jobs:)
  @mutex.synchronize do
    jobs.map do |params|
      req = EnqueueRequest.new(
        queue:        params[:queue],
        type:         params[:type],
        payload:      self.class.normalize_payload(params[:payload]),
        priority:     params[:priority],
        ready_at:     params[:ready_at],
        retry_limit:  params[:retry_limit],
        backoff:      params[:backoff],
        retention:    params[:retention],
        unique_key:   params[:unique_key],
        unique_while: params[:unique_while],
      )
      record_unsynchronized(req).job
    end
  end
end

#enqueued_jobs(**filters) ⇒ Object

All buffered jobs, in submission order, optionally filtered. See apply_filters for the filter kwargs.

Parameters:

  • filters (Object)

Returns:

  • (Object)


# File 'lib/zizq/test/client.rb', line 69

def enqueued_jobs(**filters) #: (**untyped) -> Array[Resources::Job]
  @mutex.synchronize { apply_filters(@entries, **filters).map(&:job) }
end

#enqueued_requests(**filters) ⇒ Object

Original EnqueueRequests in submission order. Useful when a test needs metadata that doesn't survive onto Resources::Job (unique_key, unique_while, delay before ready_at resolution, etc.). Same filter kwargs as enqueued_jobs.

Parameters:

  • filters (Object)

Returns:

  • (Object)


# File 'lib/zizq/test/client.rb', line 77

def enqueued_requests(**filters) #: (**untyped) -> Array[EnqueueRequest]
  @mutex.synchronize { apply_filters(@entries, **filters).map(&:request) }
end

#filter_by_status(statuses, **filters) ⇒ Object

: (Array[String], **untyped) -> Array[Resources::Job]

Parameters:

  • statuses (Object)
  • filters (Object)

Returns:

  • (Object)


# File 'lib/zizq/test/client.rb', line 293

def filter_by_status(statuses, **filters) #: (Array[String], **untyped) -> Array[Resources::Job]
  @mutex.synchronize do
    apply_filters(@entries, **filters)
      .select { |e| statuses.include?(e.data["status"]) }
      .map(&:job)
  end
end

#in_flight_jobs(**filters) ⇒ Object

: (**untyped) -> Array[Resources::Job]

Parameters:

  • filters (Object)

Returns:

  • (Object)


# File 'lib/zizq/test/client.rb', line 88

def in_flight_jobs(**filters) #: (**untyped) -> Array[Resources::Job]
  filter_by_status([STATUS_IN_FLIGHT], **filters)
end

#normalize_filter(value) ⇒ Object

Accept a Class, String, or Array of those; emit an Array of Strings. Class names match the API's type string.

Parameters:

  • value (Object)

Returns:

  • (Object)


# File 'lib/zizq/test/client.rb', line 324

def normalize_filter(value) #: ((String | Class | Array[String | Class])?) -> Array[String]
  case value
  when nil then []
  when Array then value.map { |x| x.to_s }
  else [value.to_s]
  end
end
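
A quick way to see the three input shapes is to run a standalone copy of the method. In the sketch below, sketch_normalize_filter and SendEmailJob are hypothetical names used only for illustration.

```ruby
class SendEmailJob; end # hypothetical job class

# Standalone copy of the documented case expression.
def sketch_normalize_filter(value)
  case value
  when nil then []
  when Array then value.map { |x| x.to_s }
  else [value.to_s]
  end
end

sketch_normalize_filter(nil)                              # => []
sketch_normalize_filter("emails")                         # => ["emails"]
sketch_normalize_filter(SendEmailJob)                     # => ["SendEmailJob"]
sketch_normalize_filter([SendEmailJob, "Reports::Build"]) # => ["SendEmailJob", "Reports::Build"]
```

Returning an empty Array for nil is what lets apply_filters treat "no filter given" and "empty filter" identically with a single empty? check.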

#pending_jobs(**filters) ⇒ Object

Jobs awaiting dispatch — ready and scheduled entries. pending_jobs is the set drain would attempt to run on its next call (modulo ready_at for scheduled entries).

Parameters:

  • filters (Object)

Returns:

  • (Object)


# File 'lib/zizq/test/client.rb', line 84

def pending_jobs(**filters) #: (**untyped) -> Array[Resources::Job]
  filter_by_status([STATUS_READY, STATUS_SCHEDULED], **filters)
end

#record_unsynchronized(req) ⇒ Object

: (EnqueueRequest) -> Entry

Parameters:

  • req (Object)

Returns:

  • (Object)


# File 'lib/zizq/test/client.rb', line 212

def record_unsynchronized(req) #: (EnqueueRequest) -> Entry
  # Serialized format: ready_at is integer milliseconds.
  # `req.ready_at` (from `to_enqueue_params`) is fractional
  # seconds; convert here so `Resources::Job#ready_at`'s
  # ms -> seconds round-trip produces the same value the
  # caller passed in. When the client omits ready_at the server
  # assigns `now`, so we do the same.
  now_ms = (Time.now.to_f * 1000).to_i
  ready_at_ms = req.ready_at ? (req.ready_at.to_f * 1000).to_i : now_ms

  data = {
    "id" => synthetic_id(@entries.size + 1),
    "queue" => req.queue,
    "type" => req.type,
    "payload" => req.payload,
    "priority" => req.priority,
    "ready_at" => ready_at_ms,
    "retry_limit" => req.retry_limit,
    "status" => ready_at_ms > now_ms ? STATUS_SCHEDULED : STATUS_READY,
  }
  entry = Entry.new(request: req, job: Resources::Job.new(self, data), data: data)
  @entries << entry
  entry
end
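
The seconds-to-milliseconds conversion and the initial status choice can be checked with fixed timestamps. The helper below is a sketch under one assumption that differs from the source: it takes the current time as a now: argument (the documented method calls Time.now directly) so the example is deterministic. The name sketch_ready_at_and_status is hypothetical.

```ruby
# Returns [ready_at in integer milliseconds, initial status].
def sketch_ready_at_and_status(ready_at, now: Time.now)
  now_ms = (now.to_f * 1000).to_i
  # Omitted ready_at means "now", matching what the server would assign.
  ready_at_ms = ready_at ? (ready_at.to_f * 1000).to_i : now_ms
  status = ready_at_ms > now_ms ? "scheduled" : "ready"
  [ready_at_ms, status]
end

now = Time.at(1_700_000_000)

sketch_ready_at_and_status(nil, now: now)      # => [1_700_000_000_000, "ready"]
sketch_ready_at_and_status(now + 60, now: now) # => [1_700_000_060_000, "scheduled"]
sketch_ready_at_and_status(now - 5, now: now)  # => [1_699_999_995_000, "ready"]
```

A ready_at in the past is stored as given but starts out ready, so drain picks it up on the first pass.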

#runnable?(entry, now_ms) ⇒ Boolean

: (Entry, Integer) -> bool

Parameters:

  • entry (Object)
  • now_ms (Object)

Returns:

  • (Boolean)


# File 'lib/zizq/test/client.rb', line 314

def runnable?(entry, now_ms) #: (Entry, Integer) -> bool
  case entry.data["status"]
  when STATUS_READY     then true
  when STATUS_SCHEDULED then entry.data["ready_at"] <= now_ms
  else false
  end
end
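
The check is small, but the boundary is worth spelling out: a scheduled entry becomes runnable as soon as ready_at is less than or equal to the current time in milliseconds. The sketch below uses a hypothetical sketch_runnable? that takes the data Hash directly instead of an Entry.

```ruby
def sketch_runnable?(data, now_ms)
  case data["status"]
  when "ready"     then true
  when "scheduled" then data["ready_at"] <= now_ms
  else false
  end
end

now_ms = 1_700_000_000_000

sketch_runnable?({ "status" => "ready",     "ready_at" => now_ms + 1 }, now_ms) # => true
sketch_runnable?({ "status" => "scheduled", "ready_at" => now_ms },     now_ms) # => true (inclusive)
sketch_runnable?({ "status" => "scheduled", "ready_at" => now_ms + 1 }, now_ms) # => false
sketch_runnable?({ "status" => "completed", "ready_at" => now_ms - 1 }, now_ms) # => false
```

Entries that are in_flight, completed, or dead are never runnable, so a second drain does not re-run them.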

#synthetic_id(counter) ⇒ Object

: (Integer) -> String

Parameters:

  • counter (Object)

Returns:

  • (Object)


# File 'lib/zizq/test/client.rb', line 237

def synthetic_id(counter) #: (Integer) -> String
  "#{ID_PREFIX}#{counter.to_s.rjust(ID_LENGTH - ID_PREFIX.length, '0')}"
end
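
For a concrete picture of the id layout, the sketch below copies the documented expression with the constant values listed in the Constant Summary (25 and "test"). The SKETCH_ prefixed names are hypothetical, used only to keep the example standalone.

```ruby
SKETCH_ID_LENGTH = 25     # value documented for ID_LENGTH
SKETCH_ID_PREFIX = "test" # value documented for ID_PREFIX

def sketch_synthetic_id(counter)
  width = SKETCH_ID_LENGTH - SKETCH_ID_PREFIX.length # 21 digits after the prefix
  "#{SKETCH_ID_PREFIX}#{counter.to_s.rjust(width, '0')}"
end

first = sketch_synthetic_id(1)   # "test", then 20 zeros, then "1"
tenth = sketch_synthetic_id(10)

first.length  # => 25
first < tenth # => true
```

Because the counter is zero padded to a fixed width, the ids sort lexicographically in submission order. record_unsynchronized derives the counter from @entries.size + 1, so ids start over after clear!.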

#take_runnable_snapshot(**filters) ⇒ Object

Lock, find runnable entries matching the filters, flip each to in_flight, return the snapshot. Holding the mutex through this is fine because we don't call user code — dispatch happens outside.

Parameters:

  • filters (Object)

Returns:

  • (Object)


# File 'lib/zizq/test/client.rb', line 305

def take_runnable_snapshot(**filters) #: (**untyped) -> Array[Entry]
  now_ms = (Time.now.to_f * 1000).to_i
  @mutex.synchronize do
    apply_filters(@entries, **filters)
      .select { |e| runnable?(e, now_ms) }
      .each { |entry| entry.data["status"] = STATUS_IN_FLIGHT }
  end
end
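
The reason user code must not run while the lock is held is that Ruby's Mutex is not reentrant. The sketch below (sketch_recursive_lock_error is a hypothetical helper) shows what would happen if a handler called back into a method that takes the same mutex from inside synchronize.

```ruby
# Attempts to take the same Mutex twice on one thread and returns the error.
def sketch_recursive_lock_error
  mutex = Mutex.new
  mutex.synchronize do
    mutex.synchronize { :never_reached } # same thread, same lock
  end
  nil
rescue ThreadError => e
  e
end

sketch_recursive_lock_error.class # => ThreadError
```

Taking the snapshot under the lock and dispatching afterwards avoids this: a handler that calls enqueue during drain acquires a mutex nobody is holding.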