Class: Zizq::Test::Client

Inherits:
Client
  • Object
show all
Defined in:
lib/zizq/test/client.rb

Overview

A ‘Zizq::Client` stand-in for use in test suites.

Buffers ‘enqueue` / `enqueue_bulk` calls in memory and returns synthetic `Resources::Job` instances (with generated ids) so that callers depending on the regular client’s return contract don’t need to special-case test mode.

Read operations (‘get_queues`, `list_jobs`, `count_jobs`, …) are explicitly not supported in test mode and raise `NotSupported`. Tests that need those should either run against a real server or stub at a higher level.

Activated indirectly via ‘Zizq.configuration.test_mode = true` —`Zizq.client` then lazily builds a `Test::Client` instead of a real `Client`.

Defined Under Namespace

Classes: Entry, NotSupported

Constant Summary collapse

ID_LENGTH =

Length of a real scru128 id in its base-32 representation. Synthetic test ids are sized to match (‘test` prefix + zero padded counter) so they fit anywhere a real id would.

25
ID_PREFIX =
"test"
STATUS_SCHEDULED =

The canonical Zizq lifecycle states. We mirror these so the ‘status` on a buffered job reflects what the real server would report. Test mode never retries — `in_flight` only ever transitions to `completed` or `dead`.

"scheduled"
STATUS_READY =
"ready"
STATUS_IN_FLIGHT =
"in_flight"
STATUS_COMPLETED =
"completed"
STATUS_DEAD =
"dead"
PASS_ALL_FILTER =

Default ‘filter:` lambda — passes every job. Named so the filter pipeline is always callable without nil-checking.

->(_job) { true }

Constants inherited from Client

Client::CONTENT_TYPES, Client::STREAM_ACCEPT

Instance Attribute Summary

Attributes inherited from Client

#format, #url

Instance Method Summary collapse

Methods inherited from Client

#add_cron_group_entry, #cleanup_internal_clients, #count_jobs, #delete_all_jobs, #delete_cron_group, #delete_cron_group_entry, #delete_job, #get_cron_group, #get_cron_group_entry, #get_error, #get_job, #get_path, #get_queues, #health, #list_cron_groups, #list_errors, #list_jobs, make_finalizer, parse_msgpack_stream, parse_ndjson, #replace_cron_group, #replace_cron_group_entry, #report_failure, #report_success, #report_success_bulk, #server_version, #take_jobs, #update_all_jobs, #update_cron_group, #update_cron_group_entry, #update_job

Constructor Details

#initializeClient

: () -> void



56
57
58
59
60
61
62
63
# File 'lib/zizq/test/client.rb', line 56

def initialize #: () -> void
  # Skip the parent's HTTP setup — we don't open connections in
  # test mode. The parent's @http and friends stay nil; methods
  # that would touch them are overridden below.

  @entries = [] #: Array[Entry]
  @mutex = Mutex.new
end

Instance Method Details

#clear!Object

Reset the buffer. Called between tests via ‘Zizq::Test.reset!`.



99
100
101
# File 'lib/zizq/test/client.rb', line 99

def clear! #: () -> void
  @mutex.synchronize { @entries.clear }
end

#closeObject

: () -> void



103
104
# File 'lib/zizq/test/client.rb', line 103

def close #: () -> void
end

#completed_jobs(**filters) ⇒ Object

: (**untyped) -> Array



90
91
92
# File 'lib/zizq/test/client.rb', line 90

def completed_jobs(**filters) #: (**untyped) -> Array[Resources::Job]
  filter_by_status([STATUS_COMPLETED], **filters)
end

#dead_jobs(**filters) ⇒ Object

: (**untyped) -> Array



94
95
96
# File 'lib/zizq/test/client.rb', line 94

def dead_jobs(**filters) #: (**untyped) -> Array[Resources::Job]
  filter_by_status([STATUS_DEAD], **filters)
end

#drain(**filters) ⇒ Object

Dispatch every runnable entry (status ‘ready`, or `scheduled` with `ready_at` already elapsed) through the configured dequeue middleware chain (same path the real worker uses, so any registered middlewares run in tests too), looping until no more match the filters. Re-enqueues during dispatch fall through the loop naturally — they get drained too unless they fall outside the filter set.

The per-iteration snapshot is taken under the mutex and marked ‘in_flight` atomically. Dispatch happens outside the mutex so handlers can re-enter the client without deadlocking. On success the entry moves to `completed`; on a raised exception it moves to `dead` and the exception re-raises (matching ActiveJob’s ‘perform_enqueued_jobs` + Sidekiq’s ‘drain`).



120
121
122
123
124
125
126
127
128
129
130
# File 'lib/zizq/test/client.rb', line 120

def drain(**filters) #: (**untyped) -> Integer
  total = 0
  loop do
    snapshot = take_runnable_snapshot(**filters)
    break if snapshot.empty?

    snapshot.each { |entry| dispatch_entry(entry) }
    total += snapshot.size
  end
  total
end

#enqueue(queue:, type:, payload:, priority: nil, ready_at: nil, retry_limit: nil, backoff: nil, retention: nil, unique_key: nil, unique_while: nil) ⇒ Object



133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
# File 'lib/zizq/test/client.rb', line 133

def enqueue(queue:,
            type:,
            payload:,
            priority: nil,
            ready_at: nil,
            retry_limit: nil,
            backoff: nil,
            retention: nil,
            unique_key: nil,
            unique_while: nil)
  req = EnqueueRequest.new(
    queue:,
    type:,
    payload:,
    priority:,
    ready_at:,
    retry_limit:,
    backoff:,
    retention:,
    unique_key:,
    unique_while:,
  )
  @mutex.synchronize { record_unsynchronized(req) }.job
end

#enqueue_bulk(jobs:) ⇒ Object



159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
# File 'lib/zizq/test/client.rb', line 159

def enqueue_bulk(jobs:)
  @mutex.synchronize do
    jobs.map do |params|
      req = EnqueueRequest.new(
        queue:        params[:queue],
        type:         params[:type],
        payload:      params[:payload],
        priority:     params[:priority],
        ready_at:     params[:ready_at],
        retry_limit:  params[:retry_limit],
        backoff:      params[:backoff],
        retention:    params[:retention],
        unique_key:   params[:unique_key],
        unique_while: params[:unique_while],
      )
      record_unsynchronized(req).job
    end
  end
end

#enqueued_jobs(**filters) ⇒ Object

All buffered jobs, in submission order, optionally filtered. See ‘apply_filters` for the filter kwargs.



67
68
69
# File 'lib/zizq/test/client.rb', line 67

def enqueued_jobs(**filters) #: (**untyped) -> Array[Resources::Job]
  @mutex.synchronize { apply_filters(@entries, **filters).map(&:job) }
end

#enqueued_requests(**filters) ⇒ Object

Original ‘EnqueueRequest`s in submission order. Useful when a test needs metadata that doesn’t survive onto ‘Resources::Job` (`unique_key`, `unique_while`, `delay` before `ready_at` resolution, etc.). Same filter kwargs as `enqueued_jobs`.



75
76
77
# File 'lib/zizq/test/client.rb', line 75

def enqueued_requests(**filters) #: (**untyped) -> Array[EnqueueRequest]
  @mutex.synchronize { apply_filters(@entries, **filters).map(&:request) }
end

#in_flight_jobs(**filters) ⇒ Object

: (**untyped) -> Array



86
87
88
# File 'lib/zizq/test/client.rb', line 86

def in_flight_jobs(**filters) #: (**untyped) -> Array[Resources::Job]
  filter_by_status([STATUS_IN_FLIGHT], **filters)
end

#pending_jobs(**filters) ⇒ Object

Jobs awaiting dispatch — ‘ready` and `scheduled` entries. `pending_jobs` is the set `drain` would attempt to run on its next call (modulo `ready_at` for `scheduled` entries).



82
83
84
# File 'lib/zizq/test/client.rb', line 82

def pending_jobs(**filters) #: (**untyped) -> Array[Resources::Job]
  filter_by_status([STATUS_READY, STATUS_SCHEDULED], **filters)
end