Class: Zizq::Test::Client

Inherits:
Client
  • Object
Defined in:
lib/zizq/test/client.rb

Overview

A `Zizq::Client` stand-in for use in test suites.

Buffers `enqueue` / `enqueue_bulk` calls in memory and returns synthetic `Resources::Job` instances (with generated ids) so that callers depending on the regular client’s return contract don’t need to special-case test mode.

Read operations (`get_queues`, `list_jobs`, `count_jobs`, …) are explicitly not supported in test mode and raise `NotSupported`. Tests that need those should either run against a real server or stub at a higher level.

Activated indirectly via `Zizq.configuration.test_mode = true`; `Zizq.client` then lazily builds a `Test::Client` instead of a real `Client`.

Defined Under Namespace

Classes: Entry, NotSupported

Constant Summary

ID_LENGTH =

Length of a real scru128 id in its base-36 representation. Synthetic test ids are sized to match (`test` prefix + zero-padded counter) so they fit anywhere a real id would.

25
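
The sizing rule can be illustrated with a small sketch. The `synthetic_id` helper is hypothetical (the source does not show how the client actually builds its ids); it only assumes the layout stated above, a `test` prefix followed by a zero-padded counter, 25 characters in total.

```ruby
# Hypothetical helper, not part of Zizq: pad the counter so that
# prefix + digits always adds up to the full id length.
def synthetic_id(counter, prefix: "test", length: 25)
  prefix + counter.to_s.rjust(length - prefix.length, "0")
end

first_id = synthetic_id(1)
later_id = synthetic_id(42)

puts first_id
puts later_id
puts first_id.length
```

Because the counter is padded on the left, ids generated this way also sort in submission order, which is convenient when comparing buffered jobs in assertions.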
ID_PREFIX =
"test"
STATUS_SCHEDULED =

The canonical Zizq lifecycle states. We mirror these so the `status` on a buffered job reflects what the real server would report. Test mode never retries: `in_flight` only ever transitions to `completed` or `dead`.

"scheduled"
STATUS_READY =
"ready"
STATUS_IN_FLIGHT =
"in_flight"
STATUS_COMPLETED =
"completed"
STATUS_DEAD =
"dead"
PASS_ALL_FILTER =

Default `filter:` lambda that passes every job. Named so the filter pipeline is always callable without nil-checking.

->(_job) { true }

Constants inherited from Client

Client::CONTENT_TYPES, Client::STREAM_ACCEPT

Instance Attribute Summary

Attributes inherited from Client

#format, #url

Class Method Summary

Instance Method Summary

Methods inherited from Client

#add_cron_group_entry, #cleanup_internal_clients, #count_jobs, #delete_all_jobs, #delete_cron_group, #delete_cron_group_entry, #delete_job, #get_cron_group, #get_cron_group_entry, #get_error, #get_job, #get_path, #get_queues, #health, #list_cron_groups, #list_errors, #list_jobs, make_finalizer, parse_msgpack_stream, parse_ndjson, #replace_cron_group, #replace_cron_group_entry, #report_failure, #report_success, #report_success_bulk, #server_version, #take_jobs, #update_all_jobs, #update_cron_group, #update_cron_group_entry, #update_job

Constructor Details

#initialize ⇒ Client

: () -> void



# File 'lib/zizq/test/client.rb', line 58

def initialize #: () -> void
  # Skip the parent's HTTP setup — we don't open connections in
  # test mode. The parent's @http and friends stay nil; methods
  # that would touch them are overridden below.

  @entries = [] #: Array[Entry]
  @mutex = Mutex.new
end

Class Method Details

.normalize_payload(payload) ⇒ Object

Round-trip the payload through JSON so the in-memory representation matches what a consumer would receive over the wire. Symbol keys and Symbol values become strings, nested hashes and arrays are normalized recursively, and non-JSON-safe values (BigDecimal, custom objects) raise here rather than surviving in test mode only to break in production.

Also used by `Zizq::Test.enqueued_raw?` / `enqueued_raw_count` to normalize the query side so symbol-keyed assertion payloads still match the (now string-keyed) buffer.



# File 'lib/zizq/test/client.rb', line 253

def self.normalize_payload(payload) #: (untyped) -> untyped
  JSON.parse(JSON.generate(payload))
end
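
As a quick illustration of the round trip, the sketch below inlines the same `JSON.parse(JSON.generate(...))` call so it runs without Zizq loaded. The payload contents are made up for the example.

```ruby
require "json"

# Same round trip as normalize_payload, inlined so the sketch is standalone.
def normalize_payload(payload)
  JSON.parse(JSON.generate(payload))
end

# Symbol keys, Symbol values, and nested structures all come back
# in their wire-format (string-keyed) shape.
normalized = normalize_payload({ user_id: 7, tags: [:vip, "beta"], meta: { plan: :pro } })
p normalized

# A symbol-keyed assertion payload normalizes to the same structure.
same = normalized == normalize_payload({ "user_id" => 7, tags: ["vip", "beta"], meta: { "plan" => "pro" } })
p same
```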

Instance Method Details

#apply_filters(entries, only_queues: nil, except_queues: nil, only_types: nil, except_types: nil, filter: PASS_ALL_FILTER) ⇒ Object

Returns entries matching every named filter AND the predicate. All filter kwargs are optional; unset means “don’t filter on this axis.” Callers must hold `@mutex`; public accessors do so via `synchronize` before calling.

  • `only_queues:` / `except_queues:` accept a String or an Array of Strings.

  • `only_types:` / `except_types:` accept a String, a Class, or an Array of those. Class names are matched against the wire-format `type` string via `.to_s`.

  • `filter:` is a lambda that receives a `Resources::Job` and returns truthy to keep it. Defaults to `PASS_ALL_FILTER`.

The `only_*` and `except_*` filters are ANDed together with the predicate.



# File 'lib/zizq/test/client.rb', line 270

def apply_filters(entries,
                  only_queues: nil,
                  except_queues: nil,
                  only_types: nil,
                  except_types: nil,
                  filter: PASS_ALL_FILTER) #: (Array[Entry], **untyped) -> Array[Entry]
  only_queues   = normalize_filter(only_queues)
  except_queues = normalize_filter(except_queues)
  only_types    = normalize_filter(only_types)
  except_types  = normalize_filter(except_types)

  entries.select do |entry|
    queue = entry.data["queue"] #: String
    type  = entry.data["type"]  #: String

    (only_queues.empty? || only_queues.include?(queue)) &&
      (except_queues.empty? || !except_queues.include?(queue)) &&
      (only_types.empty? || only_types.include?(type)) &&
      (except_types.empty? || !except_types.include?(type)) &&
      filter.call(entry.job)
  end
end
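
The way the axes combine can be seen in a reduced sketch. It uses plain hashes in place of `Entry` objects and only two of the four named axes; `select_entries` and the sample queue and type names are hypothetical stand-ins, not part of Zizq.

```ruby
entries = [
  { "queue" => "mail",    "type" => "WelcomeEmail" },
  { "queue" => "mail",    "type" => "DigestEmail" },
  { "queue" => "billing", "type" => "ChargeCard" },
]

# Simplified filter pipeline: an empty list means "don't filter on this
# axis", and every active axis must pass together with the predicate.
def select_entries(entries, only_queues: [], except_types: [], filter: ->(_entry) { true })
  entries.select do |entry|
    (only_queues.empty? || only_queues.include?(entry["queue"])) &&
      (except_types.empty? || !except_types.include?(entry["type"])) &&
      filter.call(entry)
  end
end

mail_without_digest = select_entries(entries, only_queues: ["mail"], except_types: ["DigestEmail"])
p mail_without_digest.map { |e| e["type"] }

# With no kwargs, nothing is filtered out.
everything = select_entries(entries)
p everything.size
```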

#clear! ⇒ Object

Reset the buffer. Called between tests via `Zizq::Test.reset!`.



# File 'lib/zizq/test/client.rb', line 101

def clear! #: () -> void
  @mutex.synchronize { @entries.clear }
end

#close ⇒ Object

: () -> void



# File 'lib/zizq/test/client.rb', line 105

def close #: () -> void
end

#completed_jobs(**filters) ⇒ Object

: (**untyped) -> Array[Resources::Job]



# File 'lib/zizq/test/client.rb', line 92

def completed_jobs(**filters) #: (**untyped) -> Array[Resources::Job]
  filter_by_status([STATUS_COMPLETED], **filters)
end

#dead_jobs(**filters) ⇒ Object

: (**untyped) -> Array[Resources::Job]



# File 'lib/zizq/test/client.rb', line 96

def dead_jobs(**filters) #: (**untyped) -> Array[Resources::Job]
  filter_by_status([STATUS_DEAD], **filters)
end

#dispatch_entry(entry) ⇒ Object

Mark the entry `in_flight`, dispatch it through the full dequeue middleware chain (the same path the real worker uses, so any registered middlewares run in tests too), then settle it to `completed` or `dead`. A raised exception is re-raised after the status is recorded, which is the same observable behaviour as Rails’ `perform_enqueued_jobs`.



# File 'lib/zizq/test/client.rb', line 338

def dispatch_entry(entry) #: (Entry) -> void
  entry.data["status"] = STATUS_IN_FLIGHT
  begin
    Zizq.configuration.dequeue_middleware.call(entry.job)
    entry.data["status"] = STATUS_COMPLETED
  rescue
    entry.data["status"] = STATUS_DEAD
    raise
  end
end
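
The status transitions can be traced with a reduced sketch. Here the handler is a plain lambda and the entry is a hash; both are stand-ins chosen for illustration, whereas the real client dispatches through `Zizq.configuration.dequeue_middleware`.

```ruby
# Minimal stand-in for dispatch_entry: record the outcome, then let any
# exception propagate to the caller.
def dispatch(entry, handler)
  entry["status"] = "in_flight"
  begin
    handler.call(entry)
    entry["status"] = "completed"
  rescue
    entry["status"] = "dead"
    raise
  end
end

ok  = { "type" => "SendEmail", "status" => "ready" }
bad = { "type" => "Explode",   "status" => "ready" }

dispatch(ok, ->(_entry) { :done })

# The failure is recorded on the entry and still reaches the test.
error = begin
  dispatch(bad, ->(_entry) { raise ArgumentError, "boom" })
  nil
rescue ArgumentError => e
  e
end

p ok["status"], bad["status"], error.message
```

A test can therefore assert on the raised exception and on the recorded `dead` status in the same example.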

#drain(**filters) ⇒ Object

Dispatch every runnable entry (status `ready`, or `scheduled` with `ready_at` already elapsed) through the configured dequeue middleware chain (the same path the real worker uses, so any registered middlewares run in tests too), looping until no more entries match the filters. Re-enqueues during dispatch fall through the loop naturally: they get drained too unless they fall outside the filter set.

The per-iteration snapshot is taken under the mutex and marked `in_flight` atomically. Dispatch happens outside the mutex so handlers can re-enter the client without deadlocking. On success the entry moves to `completed`; on a raised exception it moves to `dead` and the exception re-raises (matching ActiveJob’s `perform_enqueued_jobs` and Sidekiq’s `drain`).



# File 'lib/zizq/test/client.rb', line 122

def drain(**filters) #: (**untyped) -> Integer
  total = 0
  loop do
    snapshot = take_runnable_snapshot(**filters)
    break if snapshot.empty?

    snapshot.each { |entry| dispatch_entry(entry) }
    total += snapshot.size
  end
  total
end
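
To show why re-enqueued work is picked up, here is a toy model of the loop. `ToyBuffer` and its block handlers are hypothetical and leave out the mutex, filters, and middleware chain; only the snapshot-then-dispatch shape is taken from the method above.

```ruby
# Toy model of the drain loop, not the Zizq implementation.
class ToyBuffer
  def initialize
    @entries = []
  end

  def enqueue(name, &handler)
    @entries << { name: name, status: "ready", handler: handler }
  end

  # Claim everything currently ready, in the spirit of take_runnable_snapshot.
  def take_snapshot
    @entries.select { |e| e[:status] == "ready" }
            .each { |e| e[:status] = "in_flight" }
  end

  def drain
    total = 0
    loop do
      snapshot = take_snapshot
      break if snapshot.empty?

      snapshot.each do |entry|
        entry[:handler].call
        entry[:status] = "completed"
      end
      total += snapshot.size
    end
    total
  end
end

buffer = ToyBuffer.new
order = []
buffer.enqueue("parent") do
  order << "parent"
  # Enqueued during dispatch, so the next loop iteration picks it up.
  buffer.enqueue("child") { order << "child" }
end

drained = buffer.drain
p drained, order
```

The returned count covers both the original job and the one it enqueued, since each loop iteration takes a fresh snapshot.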

#enqueue(queue:, type:, payload:, priority: nil, ready_at: nil, retry_limit: nil, backoff: nil, retention: nil, unique_key: nil, unique_while: nil) ⇒ Object



# File 'lib/zizq/test/client.rb', line 135

def enqueue(queue:,
            type:,
            payload:,
            priority: nil,
            ready_at: nil,
            retry_limit: nil,
            backoff: nil,
            retention: nil,
            unique_key: nil,
            unique_while: nil)
  req = EnqueueRequest.new(
    queue:,
    type:,
    payload: self.class.normalize_payload(payload),
    priority:,
    ready_at:,
    retry_limit:,
    backoff:,
    retention:,
    unique_key:,
    unique_while:,
  )
  @mutex.synchronize { record_unsynchronized(req) }.job
end

#enqueue_bulk(jobs:) ⇒ Object



# File 'lib/zizq/test/client.rb', line 161

def enqueue_bulk(jobs:)
  @mutex.synchronize do
    jobs.map do |params|
      req = EnqueueRequest.new(
        queue:        params[:queue],
        type:         params[:type],
        payload:      self.class.normalize_payload(params[:payload]),
        priority:     params[:priority],
        ready_at:     params[:ready_at],
        retry_limit:  params[:retry_limit],
        backoff:      params[:backoff],
        retention:    params[:retention],
        unique_key:   params[:unique_key],
        unique_while: params[:unique_while],
      )
      record_unsynchronized(req).job
    end
  end
end

#enqueued_jobs(**filters) ⇒ Object

All buffered jobs, in submission order, optionally filtered. See `apply_filters` for the filter kwargs.



# File 'lib/zizq/test/client.rb', line 69

def enqueued_jobs(**filters) #: (**untyped) -> Array[Resources::Job]
  @mutex.synchronize { apply_filters(@entries, **filters).map(&:job) }
end

#enqueued_requests(**filters) ⇒ Object

Original `EnqueueRequest`s in submission order. Useful when a test needs metadata that doesn’t survive onto `Resources::Job` (`unique_key`, `unique_while`, `delay` before `ready_at` resolution, etc.). Same filter kwargs as `enqueued_jobs`.



# File 'lib/zizq/test/client.rb', line 77

def enqueued_requests(**filters) #: (**untyped) -> Array[EnqueueRequest]
  @mutex.synchronize { apply_filters(@entries, **filters).map(&:request) }
end

#filter_by_status(statuses, **filters) ⇒ Object

: (Array[String], **untyped) -> Array[Resources::Job]



# File 'lib/zizq/test/client.rb', line 293

def filter_by_status(statuses, **filters) #: (Array[String], **untyped) -> Array[Resources::Job]
  @mutex.synchronize do
    apply_filters(@entries, **filters)
      .select { |e| statuses.include?(e.data["status"]) }
      .map(&:job)
  end
end

#in_flight_jobs(**filters) ⇒ Object

: (**untyped) -> Array[Resources::Job]



# File 'lib/zizq/test/client.rb', line 88

def in_flight_jobs(**filters) #: (**untyped) -> Array[Resources::Job]
  filter_by_status([STATUS_IN_FLIGHT], **filters)
end

#normalize_filter(value) ⇒ Object

Accept a Class, String, or Array of those; emit an Array of Strings. Class names match the API’s `type` string.



# File 'lib/zizq/test/client.rb', line 324

def normalize_filter(value) #: ((String | Class | Array[String | Class])?) -> Array[String]
  case value
  when nil then []
  when Array then value.map { |x| x.to_s }
  else [value.to_s]
  end
end
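
A short sketch of the three input shapes follows. The method body is copied from the listing above so the example runs without Zizq; `WelcomeEmail` is a hypothetical job class used only for illustration.

```ruby
# Copy of the normalization logic, so the sketch is standalone.
def normalize_filter(value)
  case value
  when nil then []
  when Array then value.map { |x| x.to_s }
  else [value.to_s]
  end
end

# Hypothetical job class; Class#to_s yields its name.
class WelcomeEmail; end

unset = normalize_filter(nil)
single = normalize_filter("mail")
mixed = normalize_filter([WelcomeEmail, "DigestEmail"])

p unset, single, mixed
```

An unset filter becomes an empty array, which is what lets `apply_filters` treat `empty?` as "skip this axis".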

#pending_jobs(**filters) ⇒ Object

Jobs awaiting dispatch: `ready` and `scheduled` entries. This is the set `drain` would attempt to run on its next call (modulo `ready_at` for `scheduled` entries).



# File 'lib/zizq/test/client.rb', line 84

def pending_jobs(**filters) #: (**untyped) -> Array[Resources::Job]
  filter_by_status([STATUS_READY, STATUS_SCHEDULED], **filters)
end

#runnable?(entry, now_ms) ⇒ Boolean

: (Entry, Integer) -> bool

Returns:

  • (Boolean)


# File 'lib/zizq/test/client.rb', line 314

def runnable?(entry, now_ms) #: (Entry, Integer) -> bool
  case entry.data["status"]
  when STATUS_READY     then true
  when STATUS_SCHEDULED then entry.data["ready_at"] <= now_ms
  else false
  end
end
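
The check compares `ready_at` against a millisecond timestamp. The sketch below reproduces the same logic over plain hashes (an assumption made so it runs standalone) and uses string literals in place of the status constants.

```ruby
# Same decision table as runnable?, over plain hashes.
def runnable?(entry, now_ms)
  case entry["status"]
  when "ready"     then true
  when "scheduled" then entry["ready_at"] <= now_ms
  else false
  end
end

# Milliseconds since the epoch, computed the same way the client does.
now_ms = (Time.now.to_f * 1000).to_i

ready    = { "status" => "ready" }
due      = { "status" => "scheduled", "ready_at" => now_ms - 1_000 }
future   = { "status" => "scheduled", "ready_at" => now_ms + 60_000 }
finished = { "status" => "completed" }

p [ready, due, future, finished].map { |entry| runnable?(entry, now_ms) }
```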

#take_runnable_snapshot(**filters) ⇒ Object

Lock, find runnable entries matching the filters, flip each to in_flight, return the snapshot. Holding the mutex through this is fine because we don’t call user code — dispatch happens outside.



# File 'lib/zizq/test/client.rb', line 305

def take_runnable_snapshot(**filters) #: (**untyped) -> Array[Entry]
  now_ms = (Time.now.to_f * 1000).to_i
  @mutex.synchronize do
    apply_filters(@entries, **filters)
      .select { |e| runnable?(e, now_ms) }
      .each { |entry| entry.data["status"] = STATUS_IN_FLIGHT }
  end
end
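
The reason dispatch must happen outside the lock is that Ruby's `Mutex` is not reentrant. The sketch below demonstrates this with a bare `Mutex`; the job names are placeholders, and the `map` stands in for handlers that call back into a mutex-guarded client.

```ruby
mutex = Mutex.new

# Re-locking a Mutex from the thread that already holds it raises ThreadError.
reentry_error = begin
  mutex.synchronize { mutex.synchronize { :unreachable } }
  nil
rescue ThreadError => e
  e
end

# Taking the snapshot under the lock and releasing it before running
# handlers lets those handlers lock the mutex again safely.
claimed = mutex.synchronize { [:job_a, :job_b] }
results = claimed.map { |job| mutex.synchronize { "ran #{job}" } }

p reentry_error.class, results
```

A handler that enqueues a follow-up job goes through `@mutex.synchronize` in `enqueue`, so running it while the snapshot lock was still held would hit the first case.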