Module: Zizq::ActiveJobConfig
- Includes:
- JobConfig
- Defined in:
- lib/zizq/active_job_config.rb
Overview
Zizq configuration DSL for ActiveJob classes.
Extend this module in an ActiveJob subclass to allow enqueueing jobs via ‘Zizq.enqueue` and to gain access to Zizq features like unique jobs, backoff, and retention:
class SendEmailJob < ApplicationJob
extend Zizq::ActiveJobConfig
zizq_unique true, scope: :active
zizq_backoff exponent: 4.0, base: 15, jitter: 30
def perform(user_id, template:)
# ...
end
end
Serialization uses ActiveJob’s own format so that GlobalID, Time, and other ActiveJob-supported types are handled correctly. The Zizq worker must use the ActiveJob dispatcher:
Zizq.configure do |c|
c.dispatcher = ActiveJob::QueueAdapters::ZizqAdapter::Dispatcher
end
Instance Method Summary collapse
-
#zizq_deserialize(_payload) ⇒ Object
Deserialization is handled by ActiveJob::Base.execute on the worker side.
-
#zizq_payload_filter(*args, **kwargs) ⇒ Object
Generate a jq expression that exactly matches payloads with the given arguments.
-
#zizq_payload_subset_filter(*args, **kwargs) ⇒ Object
Generate a jq expression that matches jobs whose positional args start with the given values and whose kwargs contain the given key/value pairs.
-
#zizq_queue(name = nil) ⇒ Object
Use ActiveJob’s ‘queue_name` as the default queue, falling back to any explicit `zizq_queue` setting, then “default”.
-
#zizq_serialize(*args, **kwargs) ⇒ Object
Serialize using ActiveJob’s own format.
-
#zizq_unique_key(*args, **kwargs) ⇒ Object
Override unique key generation to hash only the arguments portion of the serialized payload.
Methods included from JobConfig
#zizq_backoff, #zizq_enqueue_request, #zizq_priority, #zizq_retention, #zizq_retry_limit, #zizq_unique, #zizq_unique_scope
Instance Method Details
#zizq_deserialize(_payload) ⇒ Object
Deserialization is handled by ActiveJob::Base.execute on the worker side. This method is not used in the ActiveJob dispatch path.
66 67 68 69 |
# File 'lib/zizq/active_job_config.rb', line 66 def zizq_deserialize(_payload) #: (untyped) -> [Array[untyped], Hash[Symbol, untyped]] raise NotImplementedError, "ActiveJob handles deserialization via ActiveJob::Base.execute" end |
#zizq_payload_filter(*args, **kwargs) ⇒ Object
Generate a jq expression that exactly matches payloads with the given arguments.
This is used for filtering in Zizq::Query.
Generates an expression of the form:
.arguments == ["a","b",{"example":true,"_aj_ruby2_keywords":["example"]}]
88 89 90 91 |
# File 'lib/zizq/active_job_config.rb', line 88 def zizq_payload_filter(*args, **kwargs) #: (*untyped, **untyped) -> String arguments = zizq_serialize(*args, **kwargs)["arguments"] ".arguments == #{JSON.generate(arguments)}" end |
#zizq_payload_subset_filter(*args, **kwargs) ⇒ Object
Generate a jq expression that matches jobs whose positional args start with the given values and whose kwargs contain the given key/value pairs.
This is used for filtering in Zizq::Query.
Generates expressions of the form:
(.arguments[0:2] == ["a","b"])
or
(.arguments[0:2] == ["a","b"]) and
(.arguments[-1] | has("_aj_ruby2_keywords")) and
(.arguments[-1] | contains({"example":true}))
108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 |
# File 'lib/zizq/active_job_config.rb', line 108 def zizq_payload_subset_filter(*args, **kwargs) #: (*untyped, **untyped) -> String arguments = zizq_serialize(*args, **kwargs)["arguments"] # ActiveJob flattens arguments into a single array, but marks kwargs with # "_aj_ruby2_keywords" => ["key1", "key2", ...] in the last element of # the array where kwargs are present. We need to detect this to generate # a suitable expression. serialized_args, serialized_kwargs = if arguments.size > 0 # See what the last argument looks like. It might be kwargs. maybe_kwargs = arguments.pop # If it's got "_aj_ruby2_keywords" then it is kwargs. if maybe_kwargs.is_a?(Hash) && maybe_kwargs["_aj_ruby2_keywords"] # We only want the actual kwargs, not the marker. [arguments, maybe_kwargs.except("_aj_ruby2_keywords")] else # It wasn't kwargs, so put it back. [arguments.push(maybe_kwargs), nil] end else [arguments, nil] end parts = [] #: Array[String] parts << %Q<(.arguments[0:#{serialized_args.size}] == #{JSON.generate(serialized_args)})> if serialized_kwargs parts << %Q<(.arguments[-1] | has("_aj_ruby2_keywords"))> parts << %Q<(.arguments[-1] | contains(#{JSON.generate(serialized_kwargs)}))> end parts.join(" and ") end |
#zizq_queue(name = nil) ⇒ Object
Use ActiveJob’s ‘queue_name` as the default queue, falling back to any explicit `zizq_queue` setting, then “default”.
46 47 48 49 50 51 52 |
# File 'lib/zizq/active_job_config.rb', line 46 def zizq_queue(name = nil) #: (?String?) -> String if name super else @zizq_queue || queue_name || "default" end end |
#zizq_serialize(*args, **kwargs) ⇒ Object
Serialize using ActiveJob’s own format.
Creates a temporary ActiveJob instance to produce the canonical serialized form. Returns the full serialized hash (including ‘job_class`, `arguments`, `queue_name`, etc.) so that the payload stored in Zizq matches what `ActiveJob::Base.execute` expects.
60 61 62 |
# File 'lib/zizq/active_job_config.rb', line 60 def zizq_serialize(*args, **kwargs) #: (*untyped, **untyped) -> Hash[String, untyped] new(*args, **kwargs).serialize end |
#zizq_unique_key(*args, **kwargs) ⇒ Object
Override unique key generation to hash only the arguments portion of the serialized payload. The full payload contains volatile fields (job_id, enqueued_at, etc.) that change per instance.
74 75 76 77 78 |
# File 'lib/zizq/active_job_config.rb', line 74 def zizq_unique_key(*args, **kwargs) #: (*untyped, **untyped) -> String arguments = new(*args, **kwargs).serialize["arguments"] payload = normalize_payload(arguments) "#{name}:#{Digest::SHA256.hexdigest(JSON.generate(payload))}" end |