Module: Zizq
- Defined in:
- lib/zizq/error.rb,
lib/zizq.rb,
lib/zizq/job.rb,
lib/zizq/query.rb,
lib/zizq/client.rb,
lib/zizq/worker.rb,
lib/zizq/backoff.rb,
lib/zizq/version.rb,
lib/zizq/lifecycle.rb,
lib/zizq/resources.rb,
lib/zizq/job_config.rb,
lib/zizq/middleware.rb,
lib/zizq/bulk_enqueue.rb,
lib/zizq/enqueue_with.rb,
lib/zizq/ack_processor.rb,
lib/zizq/configuration.rb,
lib/zizq/resources/job.rb,
lib/zizq/resources/page.rb,
lib/zizq/enqueue_request.rb,
lib/zizq/active_job_config.rb,
lib/zizq/resources/job_page.rb,
lib/zizq/resources/resource.rb,
lib/zizq/resources/error_page.rb,
lib/zizq/resources/error_record.rb,
lib/zizq/resources/error_enumerator.rb
Defined Under Namespace
Modules: ActiveJobConfig, Job, JobConfig, Middleware, RESET, Resources, UNCHANGED
Classes: AckProcessor, Backoff, BulkEnqueue, Client, ClientError, Configuration, ConnectionError, EnqueueRequest, EnqueueWith, Error, Lifecycle, NotFoundError, Query, ResponseError, ServerError, StreamError, Worker
Constant Summary
- VERSION = "0.1.0" (String)
Class Method Summary
- .build_enqueue_request(job_class, *args, **kwargs) {|req| ... } ⇒ Object (private)
  Build an EnqueueRequest for a single job class enqueue.
- .client ⇒ Object
  Returns a shared client instance built from the global configuration.
- .configuration ⇒ Object
  Returns the client configuration.
- .configure ⇒ Object
  Yields the global configuration ready for updates, which should be done during application initialization, before any jobs are enqueued or worked.
- .enqueue(job_class, *args, **kwargs, &block) ⇒ Object
  Enqueue a job by class with positional and keyword arguments.
- .enqueue_bulk {|builder| ... } ⇒ Object
  Enqueue multiple jobs atomically in a single bulk request.
- .enqueue_raw(queue:, type:, payload:, **opts) ⇒ Object
  Enqueue a job by providing raw inputs to the Zizq server.
- .enqueue_with(**overrides) ⇒ Object
  Build a scoped enqueue helper that applies the given option overrides to every enqueue routed through it.
- .query ⇒ Object
  Start a query to retrieve or modify job data.
- .queues ⇒ Object
  List all distinct queue names on the server.
- .reset! ⇒ Object
  Resets all global state: configuration and shared client.
- .server_version ⇒ Object
  Server version string.
Class Method Details
.build_enqueue_request(job_class, *args, **kwargs) {|req| ... } ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Build an EnqueueRequest for a single job class enqueue.
# File 'lib/zizq.rb', line 255
def build_enqueue_request(job_class, *args, **kwargs, &block)
  unless job_class.is_a?(Class) && job_class < Zizq::Job
    raise ArgumentError, "#{job_class.inspect} must include Zizq::Job"
  end
  zizq_job_class = job_class #: Zizq::job_class
  req = zizq_job_class.zizq_enqueue_request(*args, **kwargs)
  yield req if block_given?
  req
end
.client ⇒ Object
Returns a shared client instance built from the global configuration.
The client is memoized so that persistent HTTP connections are reused across calls, reducing TCP connection overhead.
# File 'lib/zizq.rb', line 69
def client #: () -> Client
  @client ||= begin
    @client_mutex.synchronize do
      break @client if @client
      configuration.validate!
      @client = Client.new(
        url: configuration.url,
        format: configuration.format,
        ssl_context: configuration.ssl_context
      )
    end
  end
end
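The memoization described above can be illustrated in isolation. The following is a minimal sketch of a mutex-guarded, build-once accessor; `FakeClient` and `Registry` are hypothetical stand-ins and are not part of Zizq.

```ruby
# Hypothetical stand-in for an expensive resource such as an HTTP client.
class FakeClient
  attr_reader :url

  def initialize(url:)
    @url = url
  end
end

# Hypothetical module showing a thread-safe, build-once accessor.
module Registry
  @client = nil
  @client_mutex = Mutex.new
  @builds = 0

  class << self
    attr_reader :builds

    # Returns the shared instance, building it at most once.
    def client
      # Fast path: skip the lock once the instance exists.
      return @client if @client

      @client_mutex.synchronize do
        # Re-check inside the lock: another thread may have built it already.
        @client ||= begin
          @builds += 1
          FakeClient.new(url: "http://localhost:7890")
        end
      end
    end
  end
end

# Several threads ask for the client concurrently; all receive the same object.
clients = Array.new(8) { Thread.new { Registry.client } }.map(&:value)
```

Because every caller receives the same object, any connection state held by that object is shared rather than rebuilt per call.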
.configuration ⇒ Object
Returns the client configuration.
The configuration can be updated by calling `Zizq.configure`.
This configuration is for the client only. Worker parameters are configured on a per-run basis for flexibility.
# File 'lib/zizq.rb', line 46
def configuration #: () -> Configuration
  @configuration ||= Configuration.new
end
.configure ⇒ Object
Yields the global configuration ready for updates, which should be done during application initialization, before any jobs are enqueued or worked.
Zizq.configure do |c|
  c.url = "http://localhost:7890"
  c.format = :msgpack
  c.dequeue_middleware.use(MyDequeueMiddleware.new)
end
# File 'lib/zizq.rb', line 59
def configure #: () { (Configuration) -> void } -> void
  yield configuration
ensure
  @client = nil # shared client is potentially stale
end
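The listing shows that `configure` discards the memoized client in an `ensure` clause, so the next call to `client` is built from the updated settings. A minimal sketch of that pattern follows; the `Settings` module, its `Config` struct, and the string used as a "client" are hypothetical stand-ins, not Zizq's classes.

```ruby
# Hypothetical module mimicking a configure-then-rebuild pattern.
module Settings
  Config = Struct.new(:url)

  class << self
    def configuration
      @configuration ||= Config.new("http://localhost:7890")
    end

    # Stand-in "client": a string derived from the current configuration.
    def client
      @client ||= "client for #{configuration.url}"
    end

    def configure
      yield configuration
    ensure
      # Drop the memoized client so it is rebuilt with the new settings.
      @client = nil
    end
  end
end

before = Settings.client
Settings.configure { |c| c.url = "http://example.test:7890" }
after = Settings.client
```

Because the reset sits in `ensure`, it runs even if the block raises, which is presumably why reconfiguration is best done once at application start rather than while the client is in use.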
.enqueue(job_class, *args, **kwargs, &block) ⇒ Object
Enqueue a job by class with positional and keyword arguments.
By default all arguments are serialized as JSON, which means hashes with symbol keys will become hashes with string keys. The serialization behaviour can be changed by implementing `::zizq_serialize` and `::zizq_deserialize` as class methods on the job class.
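The effect of JSON serialization on symbols can be seen with Ruby's standard `json` library alone. This sketch only demonstrates general JSON round-trip behaviour; it does not call Zizq and makes no claim about how Zizq invokes its serializer internally.

```ruby
require "json"

# Arguments as they might be passed at enqueue time.
args = [42, { template: "welcome", tags: [:a, :b] }]

# Round-trip through JSON: symbol keys and symbol values come back as strings.
round_tripped = JSON.parse(JSON.generate(args))

# symbolize_names restores symbol keys, but symbol values remain strings.
symbolized = JSON.parse(JSON.generate(args), symbolize_names: true)
```

A job's `#perform` method should therefore expect string keys inside hash arguments unless custom serialization is implemented.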
Default job options can be overridden at enqueue-time by providing a block which receives a mutable `Zizq::EnqueueRequest` instance.
Zizq.enqueue(SendEmailJob, 42, template: "welcome")
Zizq.enqueue(SendEmailJob, 42) { |o| o.queue = "priority" }
Job classes may also override `::zizq_enqueue_options` to implement dynamically computed options, such as dynamic prioritisation. This class method accepts the same arguments as the `#perform` method and returns an instance of `Zizq::EnqueueRequest`. Any overrides may call `super` and modify the result.
class SendEmailJob
  include Zizq::Job
  zizq_priority 1000
  def self.(user_id, template:)
    opts = super
    opts.priority /= 2 if template == "welcome"
    opts
  end
  def perform(user_id, template:)
    # ...
  end
end
# File 'lib/zizq.rb', line 157
def enqueue(job_class, *args, **kwargs, &block)
  req = build_enqueue_request(job_class, *args, **kwargs, &block)
  req = configuration.enqueue_middleware.call(req)
  client.enqueue(**req.to_enqueue_params)
end
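The "call `super` and modify the result" pattern for dynamically computed options can be sketched without the library. Everything here is a hypothetical stand-in: `Options`, `DefaultOptions`, `SendEmail`, and the method name `enqueue_options` are illustrative only and are not Zizq's actual hook names.

```ruby
# Hypothetical options object with mutable queue and priority.
Options = Struct.new(:queue, :priority)

# Hypothetical module supplying default options as a class-level method.
module DefaultOptions
  def enqueue_options(*_args, **_kwargs)
    Options.new("default", 1000)
  end
end

class SendEmail
  extend DefaultOptions

  # Override the class-level hook: start from the defaults, then adjust.
  def self.enqueue_options(user_id, template:)
    opts = super
    opts.priority /= 2 if template == "welcome"
    opts
  end
end

welcome = SendEmail.enqueue_options(42, template: "welcome")
digest  = SendEmail.enqueue_options(42, template: "digest")
```

Calling `super` first keeps the class-level defaults as the single source of truth, so the override only expresses the difference.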
.enqueue_bulk {|builder| ... } ⇒ Object
Enqueue multiple jobs atomically in a single bulk request.
This can significantly improve throughput when many jobs need to be enqueued together. There is no upper limit on the number of jobs in a request, though it is generally wise to keep it below 1000 jobs unless you have strong atomicity requirements for a larger batch.
Yields a builder object whose `#enqueue` method accepts the same arguments as `Zizq.enqueue`. All collected jobs are sent as a single `POST /jobs/bulk` request and an array of jobs is returned in the same order as the inputs.
Zizq.enqueue_bulk do |b|
  b.enqueue(ProcessPaymentJob, 7)
  b.enqueue(SendEmailJob, 42, template: "welcome")
  b.enqueue(SendEmailJob, 42) { |o| o.queue = "priority" }
end
# File 'lib/zizq.rb', line 235
def enqueue_bulk(&block)
  builder = BulkEnqueue.new
  yield builder
  return [] if builder.requests.empty?
  jobs = builder.requests.map do |req|
    configuration.enqueue_middleware.call(req).to_enqueue_params
  end
  client.enqueue_bulk(jobs:)
end
.enqueue_raw(queue:, type:, payload:, **opts) ⇒ Object
Enqueue a job by providing raw inputs to the Zizq server.
This is for advanced use cases such as enqueueing jobs for consumption in other programming languages.
Zizq.enqueue_raw(
  queue: "emails",
  type: "send_email",
  payload: {user_id: 42, template: "welcome"}
)
If this method is used to enqueue a job that is intended for consumption by the Ruby client itself, a custom dispatcher implementation is likely required:
Zizq.configure do |c|
  c.dispatcher = MyDispatcher.new
end
# File 'lib/zizq.rb', line 193
def enqueue_raw(queue:, type:, payload:, **opts)
  req = EnqueueRequest.new(queue:, type:, payload:, **opts)
  req = configuration.enqueue_middleware.call(req)
  client.enqueue(**req.to_enqueue_params)
end
.enqueue_with(**overrides) ⇒ Object
Build a scoped enqueue helper that applies the given option overrides to every enqueue routed through it. Equivalent to using the block form of `Zizq.enqueue`, but composable and reusable.
Zizq.enqueue_with(ready_at: Time.now + 3600).enqueue(MyJob, 42)
Zizq.enqueue_with(priority: 0).enqueue_bulk { |b| ... }
See `Zizq::EnqueueWith` for details.
# File 'lib/zizq.rb', line 229
def enqueue_with(**overrides)
  EnqueueWith.new(self, overrides)
end
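The guidance on keeping bulk requests below roughly 1000 jobs can be followed by slicing the work into batches. This is a sketch under stated assumptions: the `enqueue_bulk` lambda is a hypothetical stand-in that only records batch sizes, and the batch size of 1000 is taken from the recommendation in this document, not from a server limit.

```ruby
# Hypothetical stand-in for a bulk enqueue call: records each batch size
# and returns its input, mimicking "one result per job, in input order".
batches = []
enqueue_bulk = lambda do |jobs|
  batches << jobs.size
  jobs
end

user_ids = (1..2500).to_a
batch_size = 1000

# Send one bulk request per slice and collect all results in order.
results = user_ids.each_slice(batch_size).flat_map do |slice|
  # With the real client, each slice might instead be sent as:
  #   Zizq.enqueue_bulk { |b| slice.each { |id| b.enqueue(SendEmailJob, id) } }
  enqueue_bulk.call(slice)
end
```

Note that atomicity then holds per batch only: if a later batch fails, earlier batches have already been enqueued.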
.query ⇒ Object
Start a query to retrieve or modify job data.
# File 'lib/zizq.rb', line 113
def query(...)
  Query.new(...)
end
.queues ⇒ Object
List all distinct queue names on the server.
# File 'lib/zizq.rb', line 98
def queues #: () -> Array[String]
  client.get_queues
end
.reset! ⇒ Object
Resets all global state: configuration and shared client. Intended for use in tests.
# File 'lib/zizq.rb', line 86
def reset! #: () -> void
  @client&.close
  @client = nil
  @configuration = nil
end
.server_version ⇒ Object
Server version string.
# File 'lib/zizq.rb', line 93
def server_version #: () -> String
  client.server_version
end