Module: Zizq::ActiveJobConfig

Includes:
JobConfig
Defined in:
lib/zizq/active_job_config.rb

Overview

Zizq configuration DSL for ActiveJob classes.

Extend this module in an ActiveJob subclass to allow enqueueing jobs via ‘Zizq.enqueue` and to gain access to Zizq features like unique jobs, backoff, and retention:

class SendEmailJob < ApplicationJob
  extend Zizq::ActiveJobConfig

  zizq_unique true, scope: :active
  zizq_backoff exponent: 4.0, base: 15, jitter: 30

  def perform(user_id, template:)
    # ...
  end
end

Serialization uses ActiveJob’s own format so that GlobalID, Time, and other ActiveJob-supported types are handled correctly. The Zizq worker must use the ActiveJob dispatcher:

Zizq.configure do |c|
  c.dispatcher = ActiveJob::QueueAdapters::ZizqAdapter::Dispatcher
end

Instance Method Summary collapse

Methods included from JobConfig

#zizq_backoff, #zizq_enqueue_request, #zizq_priority, #zizq_retention, #zizq_retry_limit, #zizq_unique, #zizq_unique_scope

Instance Method Details

#zizq_deserialize(_payload) ⇒ Object

Deserialization is handled by ActiveJob::Base.execute on the worker side. This method is not used in the ActiveJob dispatch path.

Raises:

  • (NotImplementedError)


66
67
68
69
# File 'lib/zizq/active_job_config.rb', line 66

def zizq_deserialize(_payload) #: (untyped) -> [Array[untyped], Hash[Symbol, untyped]]
  raise NotImplementedError,
    "ActiveJob handles deserialization via ActiveJob::Base.execute"
end

#zizq_payload_filter(*args, **kwargs) ⇒ Object

Generate a jq expression that exactly matches payloads with the given arguments.

This is used for filtering in Zizq::Query.

Generates an expression of the form:

.arguments == ["a","b",{"example":true,"_aj_ruby2_keywords":["example"]}]


88
89
90
91
# File 'lib/zizq/active_job_config.rb', line 88

def zizq_payload_filter(*args, **kwargs) #: (*untyped, **untyped) -> String
  arguments = zizq_serialize(*args, **kwargs)["arguments"]
  ".arguments == #{JSON.generate(arguments)}"
end

#zizq_payload_subset_filter(*args, **kwargs) ⇒ Object

Generate a jq expression that matches jobs whose positional args start with the given values and whose kwargs contain the given key/value pairs.

This is used for filtering in Zizq::Query.

Generates expressions of the form:

(.arguments[0:2] == ["a","b"])

or

(.arguments[0:2] == ["a","b"]) and
(.arguments[-1] | has("_aj_ruby2_keywords")) and
(.arguments[-1] | contains({"example":true}))


108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# File 'lib/zizq/active_job_config.rb', line 108

def zizq_payload_subset_filter(*args, **kwargs) #: (*untyped, **untyped) -> String
  arguments = zizq_serialize(*args, **kwargs)["arguments"]

  # ActiveJob flattens arguments into a single array, but marks kwargs with
  # "_aj_ruby2_keywords" => ["key1", "key2", ...] in the last element of
  # the array where kwargs are present. We need to detect this to generate
  # a suitable expression.
  serialized_args, serialized_kwargs =
    if arguments.size > 0
      # See what the last argument looks like. It might be kwargs.
      maybe_kwargs = arguments.pop

      # If it's got "_aj_ruby2_keywords" then it is kwargs.
      if maybe_kwargs.is_a?(Hash) && maybe_kwargs["_aj_ruby2_keywords"]
        # We only want the actual kwargs, not the marker.
        [arguments, maybe_kwargs.except("_aj_ruby2_keywords")]
      else
        # It wasn't kwargs, so put it back.
        [arguments.push(maybe_kwargs), nil]
      end
    else
      [arguments, nil]
    end

  parts = [] #: Array[String]
  parts << %Q<(.arguments[0:#{serialized_args.size}] == #{JSON.generate(serialized_args)})>

  if serialized_kwargs
    parts << %Q<(.arguments[-1] | has("_aj_ruby2_keywords"))>
    parts << %Q<(.arguments[-1] | contains(#{JSON.generate(serialized_kwargs)}))>
  end

  parts.join(" and ")
end

#zizq_queue(name = nil) ⇒ Object

Use ActiveJob’s ‘queue_name` as the default queue, falling back to any explicit `zizq_queue` setting, then “default”.



46
47
48
49
50
51
52
# File 'lib/zizq/active_job_config.rb', line 46

def zizq_queue(name = nil) #: (?String?) -> String
  if name
    super
  else
    @zizq_queue || queue_name || "default"
  end
end

#zizq_serialize(*args, **kwargs) ⇒ Object

Serialize using ActiveJob’s own format.

Creates a temporary ActiveJob instance to produce the canonical serialized form. Returns the full serialized hash (including ‘job_class`, `arguments`, `queue_name`, etc.) so that the payload stored in Zizq matches what `ActiveJob::Base.execute` expects.



60
61
62
# File 'lib/zizq/active_job_config.rb', line 60

def zizq_serialize(*args, **kwargs) #: (*untyped, **untyped) -> Hash[String, untyped]
  new(*args, **kwargs).serialize
end

#zizq_unique_key(*args, **kwargs) ⇒ Object

Override unique key generation to hash only the arguments portion of the serialized payload. The full payload contains volatile fields (job_id, enqueued_at, etc.) that change per instance.



74
75
76
77
78
# File 'lib/zizq/active_job_config.rb', line 74

def zizq_unique_key(*args, **kwargs) #: (*untyped, **untyped) -> String
  arguments = new(*args, **kwargs).serialize["arguments"]
  payload = normalize_payload(arguments)
  "#{name}:#{Digest::SHA256.hexdigest(JSON.generate(payload))}"
end