Module: Zizq

Defined in:
lib/zizq/error.rb,
lib/zizq.rb,
lib/zizq/job.rb,
lib/zizq/query.rb,
lib/zizq/client.rb,
lib/zizq/worker.rb,
lib/zizq/backoff.rb,
lib/zizq/version.rb,
lib/zizq/lifecycle.rb,
lib/zizq/resources.rb,
lib/zizq/job_config.rb,
lib/zizq/middleware.rb,
lib/zizq/bulk_enqueue.rb,
lib/zizq/enqueue_with.rb,
lib/zizq/ack_processor.rb,
lib/zizq/configuration.rb,
lib/zizq/resources/job.rb,
lib/zizq/resources/page.rb,
lib/zizq/enqueue_request.rb,
lib/zizq/active_job_config.rb,
lib/zizq/resources/job_page.rb,
lib/zizq/resources/resource.rb,
lib/zizq/resources/error_page.rb,
lib/zizq/resources/error_record.rb,
lib/zizq/resources/error_enumerator.rb

Defined Under Namespace

Modules: ActiveJobConfig, Job, JobConfig, Middleware, RESET, Resources, UNCHANGED

Classes: AckProcessor, Backoff, BulkEnqueue, Client, ClientError, Configuration, ConnectionError, EnqueueRequest, EnqueueWith, Error, Lifecycle, NotFoundError, Query, ResponseError, ServerError, StreamError, Worker

Constant Summary

VERSION = "0.1.0" #: String

Class Method Details

.build_enqueue_request(job_class, *args, **kwargs) {|req| ... } ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Build an EnqueueRequest for a single job class enqueue.

Yields:

  • (req)


# File 'lib/zizq.rb', line 255

def build_enqueue_request(job_class, *args, **kwargs, &block)
  unless job_class.is_a?(Class) && job_class < Zizq::Job
    raise ArgumentError, "#{job_class.inspect} must include Zizq::Job"
  end

  zizq_job_class = job_class #: Zizq::job_class
  req = zizq_job_class.zizq_enqueue_request(*args, **kwargs)
  yield req if block_given?
  req
end
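
The guard at the top of this method relies on `Module#<`, which returns `true` when the receiver has the given module among its ancestors and `nil` when the two are unrelated. The following is a minimal standalone sketch of that check; `FakeJob`, `IncludedJob`, `PlainClass` and `job_class?` are hypothetical names used in place of `Zizq::Job` and real job classes.

```ruby
# FakeJob stands in for Zizq::Job (hypothetical name, for illustration only).
module FakeJob; end

class IncludedJob
  include FakeJob
end

class PlainClass; end

# Mirrors the guard in build_enqueue_request: the candidate must be a Class
# and must have the job module among its ancestors.
def job_class?(candidate)
  candidate.is_a?(Class) && candidate < FakeJob ? true : false
end

puts job_class?(IncludedJob)   # a class that includes the module
puts job_class?(PlainClass)    # a class that does not
puts job_class?("IncludedJob") # not a class at all
```

The `is_a?(Class)` test comes first so that strings, symbols and other non-class values are rejected before `<` is called on them.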

.client ⇒ Object

Returns a shared client instance built from the global configuration.

The client is memoized so that persistent HTTP connections are reused across calls, reducing TCP connection overhead.



# File 'lib/zizq.rb', line 69

def client #: () -> Client
  @client ||= begin
    @client_mutex.synchronize do
      break @client if @client

      configuration.validate!
      @client = Client.new(
        url: configuration.url,
        format: configuration.format,
        ssl_context: configuration.ssl_context
      )
    end
  end
end
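
The memoization above combines a lock-free fast path with a second check inside the mutex. As a rough illustration of that pattern in isolation, the sketch below uses a hypothetical `SharedClientSketch` module and a plain `Object` in place of `Zizq::Client`; it is not the library's code.

```ruby
# Hypothetical stand-in for module-level state; all names are illustrative.
module SharedClientSketch
  @mutex = Mutex.new
  @client = nil
  @builds = 0

  class << self
    attr_reader :builds

    def client
      # Fast path: skip the lock once the instance exists.
      return @client if @client

      @mutex.synchronize do
        # Re-check inside the lock so only one thread builds the instance.
        @client ||= begin
          @builds += 1
          Object.new # placeholder for an expensive client object
        end
      end
    end
  end
end

threads = 8.times.map { Thread.new { SharedClientSketch.client } }
clients = threads.map(&:value)

puts clients.uniq.size          # number of distinct instances handed out
puts SharedClientSketch.builds  # number of times the instance was built
```

Every thread receives the same object, and the build step runs once even when several threads race on the first call.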

.configuration ⇒ Object

Returns the client configuration.

The configuration can be updated by calling `Zizq.configure`.

This configuration is for the client only. Worker parameters are configured on a per-run basis for flexibility.



# File 'lib/zizq.rb', line 46

def configuration #: () -> Configuration
  @configuration ||= Configuration.new
end

.configure ⇒ Object

Yields the global configuration ready for updates, which should be done during application initialization, before any jobs are enqueued or worked.

Zizq.configure do |c|
  c.url = "http://localhost:7890"
  c.format = :msgpack
  c.dequeue_middleware.use(MyDequeueMiddleware.new)
end


# File 'lib/zizq.rb', line 59

def configure #: () { (Configuration) -> void } -> void
  yield configuration
ensure
  @client = nil # shared client is potentially stale
end
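
Because the `ensure` clause clears the memoized client, the next call to `.client` builds a fresh instance from the updated settings. The sketch below illustrates that interaction with hypothetical stand-ins (`ConfigSketch`, `Config`, `FakeClient`); the URLs are placeholders.

```ruby
# Hypothetical stand-ins for the configuration and client; illustrative only.
module ConfigSketch
  Config = Struct.new(:url)
  FakeClient = Struct.new(:url)

  class << self
    def configuration
      @configuration ||= Config.new("http://localhost:7890")
    end

    def configure
      yield configuration
    ensure
      @client = nil # drop the memoized client so it is rebuilt on next use
    end

    def client
      @client ||= FakeClient.new(configuration.url)
    end
  end
end

before = ConfigSketch.client
ConfigSketch.configure { |c| c.url = "http://localhost:9999" }
after = ConfigSketch.client

puts before.url # URL captured by the client built before reconfiguring
puts after.url  # URL picked up by the client built afterwards
```

A client obtained before reconfiguring keeps its old settings, which is one reason to run configuration during application initialization.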

.enqueue(job_class, *args, **kwargs, &block) ⇒ Object

Enqueue a job by class with positional and keyword arguments.

By default all arguments are serialized as JSON, which means hashes with symbol keys will become hashes with string keys. The serialization behaviour can be changed by implementing `::zizq_serialize` and `::zizq_deserialize` as class methods on the job class.
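
The effect of a JSON round trip on symbol keys can be seen with Ruby's standard `json` library alone. This sketch assumes the default serializer behaves like `JSON.generate` followed by `JSON.parse`; the source only states that arguments are serialized as JSON, and the argument values here are made up for illustration.

```ruby
require "json"

# Arguments as they might be passed at enqueue time (illustrative values).
args = [42, { template: "welcome", tags: [:new_user] }]

# Simulate the trip through JSON that the arguments take before #perform.
round_tripped = JSON.parse(JSON.generate(args))

options = round_tripped[1]
puts options.key?("template") # string key is present after the round trip
puts options.key?(:template)  # symbol key is gone
puts options["tags"].first.class
```

Symbol values are affected as well as symbol keys: both come back as strings, so a job that needs symbols has to convert them itself or supply custom serialization.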

Default job options can be overridden at enqueue-time by providing a block which receives a mutable `Zizq::EnqueueRequest` instance.

Zizq.enqueue(SendEmailJob, 42, template: "welcome")
Zizq.enqueue(SendEmailJob, 42) { |o| o.queue = "priority" }

Job classes may also override `::zizq_enqueue_options` to implement dynamically computed options, such as dynamic prioritisation. This class method accepts the same arguments as the `#perform` method and returns an instance of `Zizq::EnqueueRequest`. Any overrides may call `super` and modify the result.

class SendEmailJob
  include Zizq::Job

  zizq_priority 1000

  def self.zizq_enqueue_options(user_id, template:)
    opts = super
    opts.priority /= 2 if template == "welcome"
    opts
  end

  def perform(user_id, template:)
    # ...
  end
end


# File 'lib/zizq.rb', line 157

def enqueue(job_class, *args, **kwargs, &block)
  req = build_enqueue_request(job_class, *args, **kwargs, &block)
  req = configuration.enqueue_middleware.call(req)
  client.enqueue(**req.to_enqueue_params)
end

.enqueue_bulk {|builder| ... } ⇒ Object

Enqueue multiple jobs atomically in a single bulk request.

This can significantly improve throughput when many jobs need to be enqueued together. There is no upper limit on the number of jobs in a request, though it is generally wise to keep it below 1000 jobs unless you have strong atomicity requirements for a larger batch.

Yields a builder object whose `#enqueue` method accepts the same arguments as `Zizq.enqueue`. All collected jobs are sent as a single `POST /jobs/bulk` request and an array of jobs is returned in the same order as the inputs.

Zizq.enqueue_bulk do |b|
  b.enqueue(ProcessPaymentJob, 7)
  b.enqueue(SendEmailJob, 42, template: "welcome")
  b.enqueue(SendEmailJob, 42) { |o| o.queue = "priority" }
end

Yields:

  • (builder)


# File 'lib/zizq.rb', line 235

def enqueue_bulk(&block)
  builder = BulkEnqueue.new
  yield builder
  return [] if builder.requests.empty?

  jobs = builder.requests.map do |req|
    configuration.enqueue_middleware.call(req).to_enqueue_params
  end

  client.enqueue_bulk(jobs:)
end

.enqueue_raw(queue:, type:, payload:, **opts) ⇒ Object

Enqueue a job by providing raw inputs to the Zizq server.

This is for advanced use cases such as enqueueing jobs for consumption in other programming languages.

Zizq.enqueue_raw(
  queue: "emails",
  type: "send_email",
  payload: {user_id: 42, template: "welcome"}
)

If this method is used to enqueue a job that is intended for consumption by the Ruby client itself, a custom dispatcher implementation is likely required:

Zizq.configure do |c|
  c.dispatcher = MyDispatcher.new
end


# File 'lib/zizq.rb', line 193

def enqueue_raw(queue:, type:, payload:, **opts)
  req = EnqueueRequest.new(queue:, type:, payload:, **opts)
  req = configuration.enqueue_middleware.call(req)
  client.enqueue(**req.to_enqueue_params)
end

.enqueue_with(**overrides) ⇒ Object

Build a scoped enqueue helper that applies the given option overrides to every enqueue routed through it. Equivalent to using the block form of `Zizq.enqueue`, but composable and reusable.

Zizq.enqueue_with(ready_at: Time.now + 3600).enqueue(MyJob, 42)
Zizq.enqueue_with(priority: 0).enqueue_bulk { |b| ... }

See `Zizq::EnqueueWith` for details.



# File 'lib/zizq.rb', line 229

def enqueue_with(**overrides)
  EnqueueWith.new(self, overrides)
end
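
To illustrate how a scoped helper can be equivalent to the block form of enqueueing, the sketch below wraps a target and applies a fixed set of overrides before any caller-supplied block runs. It is a guess at the general shape, not the actual `Zizq::EnqueueWith` implementation; `Request`, `FakeTarget` and `ScopedEnqueue` are hypothetical names.

```ruby
# Hypothetical sketch; not the real Zizq::EnqueueWith.
Request = Struct.new(:queue, :priority, keyword_init: true)

# Stand-in for the enqueue entry point: builds a request with defaults,
# yields it for mutation, and returns what would have been sent.
module FakeTarget
  def self.enqueue(job_name)
    req = Request.new(queue: "default", priority: 100)
    yield req if block_given?
    [job_name, req]
  end
end

class ScopedEnqueue
  def initialize(target, overrides)
    @target = target
    @overrides = overrides
  end

  def enqueue(*args)
    @target.enqueue(*args) do |req|
      # Apply the scoped overrides first, then any per-call block.
      @overrides.each { |name, value| req.public_send("#{name}=", value) }
      yield req if block_given?
    end
  end
end

helper = ScopedEnqueue.new(FakeTarget, { priority: 0 })
_, scoped = helper.enqueue("MyJob")
_, both = helper.enqueue("MyJob") { |r| r.queue = "priority" }

puts scoped.priority # override applied
puts scoped.queue    # untouched default
puts both.queue      # per-call block still takes effect
```

Applying the overrides before the per-call block lets an individual enqueue still adjust a request; whether the real helper orders things this way is an assumption.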

.query ⇒ Object

Start a query to retrieve or modify job data.



# File 'lib/zizq.rb', line 113

def query(...)
  Query.new(...)
end

.queues ⇒ Object

List all distinct queue names on the server.



# File 'lib/zizq.rb', line 98

def queues #: () -> Array[String]
  client.get_queues
end

.reset! ⇒ Object

Resets all global state: configuration and shared client. Intended for use in tests.



# File 'lib/zizq.rb', line 86

def reset! #: () -> void
  @client&.close
  @client = nil
  @configuration = nil
end

.server_version ⇒ Object

Returns the server version string.



# File 'lib/zizq.rb', line 93

def server_version #: () -> String
  client.server_version
end