Module: ActiveJob::Temporal::Payload

Extended by:
Payload
Included in:
Payload
Defined in:
lib/activejob/temporal/payload.rb

Overview

Note:

Payload Size Limit: Temporal enforces a maximum payload size (configurable via `max_payload_size_kb`, default 250 KB). Payloads over the limit raise an ActiveJob::SerializationError whose message states the actual size and the limit. Consider passing database IDs or S3 keys instead of large objects.
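As a rough illustration of why references help, the following sketch compares the JSON size of an argument passed by value with the same argument passed by ID. It uses only the standard library; the attribute hash is hypothetical and stands in for any large object.

```ruby
require "json"

# Hypothetical attributes standing in for a large object passed by value.
attributes = {
  "id" => 123,
  "name" => "Ada",
  "bio" => "x" * 10_000
}

# The same job argument, once as the full object and once as a reference.
by_value = JSON.generate("arguments" => [attributes])
by_reference = JSON.generate("arguments" => [attributes["id"]])

puts by_value.bytesize > 10_000 # true
puts by_reference.bytesize      # 19
```

The by-reference payload stays the same size no matter how large the referenced object grows.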

Note:

GlobalID Serialization: ActiveRecord models are automatically serialized using GlobalID. The record must exist in the database at enqueue time and still exist at execution time. If the record is deleted in between, deserialization fails.

Payload serialization and deserialization for ActiveJob.

This module converts ActiveJob instances into JSON-serializable hash payloads for transmission to Temporal workflows and activities. It also handles argument deserialization back into Ruby objects.

Examples:

Payload structure

{
  job_class: "MyJob",
  job_id: "abc-123",
  queue_name: "default",
  arguments: [{"_aj_serialized"=>"ActiveJob::Serializers::ObjectSerializer", ...}],
  executions: 0,
  exception_executions: {},
  scheduled_at: "2025-10-29T12:00:00Z" # optional
}

Constant Summary

PAYLOAD_WARNING_THRESHOLD =
0.8
PAYLOAD_NEAR_LIMIT_THRESHOLD =
0.9
WORKFLOW_CONTROL_FIELDS =
%i[
  scheduled_at
  default_activity_options
  retry_policy
  temporal_options
  workflow_identity
  dead_letter
  rate_limits
  workflow_interactions
  child_workflows
  chain
  dependencies
  dependency_failure_policy
  activity_task_queue
  continue_as_new
  workflow_state
  local_activity_helpers
  observability
  schedule_id
  schedule_workflow_id_prefix
  schedule_execution_job_id
  payload_encryption_context
].freeze

Instance Method Details

#delete_external_payload(payload, config: ActiveJob::Temporal.config) ⇒ Object



# File 'lib/activejob/temporal/payload.rb', line 228

def delete_external_payload(payload, config: ActiveJob::Temporal.config)
  PayloadStorage.delete(payload, config: config)
end

#deserialize_args(payload, config: ActiveJob::Temporal.config, encryption_context: nil) ⇒ Array

Deserializes job arguments from a payload hash.

Extracts the serialized arguments array from the payload and uses ActiveJob’s built-in deserialization to reconstruct Ruby objects (including GlobalID references to ActiveRecord models).

Examples:

Deserialize arguments

payload = { arguments: [{"_aj_serialized"=>"..."}] }
args = Payload.deserialize_args(payload)
# => [actual_ruby_object]

GlobalID deserialization with deleted record

begin
  payload = { arguments: [{"_aj_globalid"=>"gid://app/User/999"}] }
  args = Payload.deserialize_args(payload)
rescue ActiveJob::SerializationError => e
  # Record was deleted between enqueue and execution; the lookup failure
  # is re-raised as ActiveJob::SerializationError
  Rails.logger.warn("Job argument no longer exists: #{e.message}")
end

Parameters:

  • payload (Hash)

    Payload hash containing serialized arguments

Options Hash (payload):

  • :arguments (Array)

    Serialized arguments (required)

Returns:

  • (Array)

    Deserialized arguments array ready for job.perform(*args)

Raises:

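  • (ActiveJob::SerializationError)

    if deserialization fails, including when a GlobalID reference points to a deleted record (other StandardError failures are re-raised as this error)

  • (ActiveJob::Temporal::ConfigurationError)

    re-raised unchanged if it occurs during deserialization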



# File 'lib/activejob/temporal/payload.rb', line 190

def deserialize_args(payload, config: ActiveJob::Temporal.config, encryption_context: nil)
  deserialize_payload_args(deserialize_payload(payload, config: config, encryption_context: encryption_context))
end

#deserialize_payload(payload, config: ActiveJob::Temporal.config, encryption_context: nil) ⇒ Object



# File 'lib/activejob/temporal/payload.rb', line 203

def deserialize_payload(payload, config: ActiveJob::Temporal.config, encryption_context: nil)
  stored_payload = PayloadStorage.load(
    payload,
    config: config,
    workflow_control_fields: WORKFLOW_CONTROL_FIELDS
  )
  transport_payload = decrypt_transport_payload(stored_payload, config, encryption_context)

  execution_payload = serializer_for_transport_payload(transport_payload, config).load(transport_payload)
  preserve_workflow_control_fields(transport_payload, execution_payload)
end

#deserialize_payload_args(payload) ⇒ Object



# File 'lib/activejob/temporal/payload.rb', line 194

def deserialize_payload_args(payload)
  serialized_args = payload[:arguments] || payload["arguments"]
  ActiveJob::Arguments.deserialize(serialized_args)
rescue ActiveJob::SerializationError, ActiveJob::Temporal::ConfigurationError
  raise
rescue StandardError => e
  raise ActiveJob::SerializationError, e.message
end
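The double lookup `payload[:arguments] || payload["arguments"]` matters because a payload that has passed through JSON comes back with string keys. A minimal sketch using only the standard library; `serialized_arguments` is a hypothetical stand-in for the first line of the method above, not part of the library:

```ruby
require "json"

# Hypothetical stand-in for the lookup in deserialize_payload_args:
# accept the arguments under either a symbol or a string key.
def serialized_arguments(payload)
  payload[:arguments] || payload["arguments"]
end

original = { job_class: "MyJob", arguments: [1, "two"] }
# JSON.parse returns string keys by default, so :arguments is gone afterwards.
round_tripped = JSON.parse(JSON.generate(original))

puts original.key?(:arguments)        # true
puts round_tripped.key?(:arguments)   # false
p serialized_arguments(original)      # [1, "two"]
p serialized_arguments(round_tripped) # [1, "two"]
```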

#encrypt_payload(payload, config: ActiveJob::Temporal.config, encryption_context: nil) ⇒ Object



# File 'lib/activejob/temporal/payload.rb', line 215

def encrypt_payload(payload, config: ActiveJob::Temporal.config, encryption_context: nil)
  encrypt_payload_if_configured(payload, config, encryption_context: encryption_context)
end

#enforce_size!(payload, metrics_payload: payload, config: ActiveJob::Temporal.config) ⇒ Object

Raises:

  • (ActiveJob::SerializationError)


# File 'lib/activejob/temporal/payload.rb', line 232

def enforce_size!(payload, metrics_payload: payload, config: ActiveJob::Temporal.config)
  json = JSON.generate(payload)
  max_size_kb = config.max_payload_size_kb || 250
  size_limit_bytes = max_size_kb * 1024
  actual_size_kb = json.bytesize / 1024.0
  usage_ratio = json.bytesize.to_f / size_limit_bytes

  Observability.emit(
    :payload_serialize,
    Observability.attributes_from_payload(metrics_payload, bytes: json.bytesize)
  )
  log_payload_size(metrics_payload, actual_size_kb, max_size_kb, usage_ratio)
  return if json.bytesize <= size_limit_bytes

  message = format(
    "Job payload size (%<actual>.1f KB) exceeds maximum allowed size (%<max>d KB). " \
    "Consider reducing argument size or using references (e.g., database IDs).",
    actual: actual_size_kb,
    max: max_size_kb
  )
  raise ActiveJob::SerializationError, message
end
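The size arithmetic above can be tried in isolation. This sketch reproduces the byte and ratio calculation with the standard library. `payload_size_status` is a hypothetical helper, and mapping the 0.8 and 0.9 ratios from the Constant Summary onto `:warning` and `:near_limit` is an assumption, since `log_payload_size` is not shown on this page.

```ruby
require "json"

# Hypothetical helper: classifies a payload by how much of the limit it uses.
# The 0.8 / 0.9 defaults mirror PAYLOAD_WARNING_THRESHOLD and
# PAYLOAD_NEAR_LIMIT_THRESHOLD; their exact use in the library is assumed.
def payload_size_status(payload, max_size_kb: 250, warning_at: 0.8, near_limit_at: 0.9)
  bytes = JSON.generate(payload).bytesize
  limit_bytes = max_size_kb * 1024
  ratio = bytes.to_f / limit_bytes

  status =
    if bytes > limit_bytes then :over_limit
    elsif ratio >= near_limit_at then :near_limit
    elsif ratio >= warning_at then :warning
    else :ok
    end

  { bytes: bytes, ratio: ratio, status: status }
end

small = payload_size_status({ "arguments" => [123] })
large = payload_size_status({ "arguments" => ["x" * (300 * 1024)] })

puts small[:status] # ok
puts large[:status] # over_limit
```

Note that the limit is measured on the generated JSON, so multi-byte characters count by their encoded byte length rather than by character count.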

#from_job(job, scheduled_at: nil, enforce_size: true, config: ActiveJob::Temporal.config, **options) ⇒ Hash

Note:

Record Lifecycle Caveat: When using GlobalID serialization for ActiveRecord models, the record must exist in the database at both enqueue time and execution time. If the record is deleted before the job executes, deserialization fails; #deserialize_args reports the failure as ActiveJob::SerializationError.

Note:

Payload Size Optimization: To reduce payload size, prefer passing database IDs instead of full ActiveRecord objects. For example, pass user.id instead of user. This is especially important for jobs with large argument lists or complex nested objects.

Converts an ActiveJob instance into a serializable payload hash.

Examples:

Basic job payload

job = MyJob.new
payload = Payload.from_job(job)
# => { job_class: "MyJob", job_id: "...", arguments: [...], ... }

Scheduled job payload

job = MyJob.new
payload = Payload.from_job(job, scheduled_at: 1.hour.from_now)
# => { ..., scheduled_at: "2025-10-29T13:00:00Z" }

Handling payload size errors

begin
  MyJob.perform_later(large_object)
rescue ActiveJob::SerializationError => e
  Rails.logger.error("Payload too large: #{e.message}")
  # Recommendation: Pass ID instead of full object
  MyJob.perform_later(large_object.id)
end

GlobalID serialization (ActiveRecord models)

user = User.find(123)
MyJob.perform_later(user)  # Serializes as GlobalID
# Payload contains: { "_aj_globalid" => "gid://app/User/123" }

Non-serializable object error

begin
  MyJob.perform_later(File.open("/tmp/file.txt"))
rescue ActiveJob::SerializationError => e
  # => "Unsupported argument type: File"
  Rails.logger.error(e.message)
end

Parameters:

  • job (ActiveJob::Base)

    The job instance to serialize

  • scheduled_at (Time, String, nil) (defaults to: nil)

    Optional scheduled execution time

Returns:

  • (Hash)

    Serialized payload with keys:

    • :job_class [String] Fully-qualified job class name

    • :job_id [String] Unique job identifier

    • :queue_name [String] Target queue name

    • :arguments [Array] Serialized job arguments (via ActiveJob::Arguments)

    • :executions [Integer] Current execution count (default 0)

    • :exception_executions [Hash] Exception execution counts (default {})

    • :scheduled_at [String] ISO8601 timestamp (optional)

Raises:

  • (ActiveJob::SerializationError)

    if arguments cannot be serialized

  • (ActiveJob::SerializationError)

    if payload exceeds max_payload_size_kb (includes actual size in message)

  • (ArgumentError)

    if scheduled_at is not convertible to Time

  • (ArgumentError)

    if job is nil

  • (NoMethodError)

    if job does not respond to required attributes

  • (JSON::GeneratorError)

    if payload cannot be JSON-serialized



# File 'lib/activejob/temporal/payload.rb', line 140

def from_job(
  job,
  scheduled_at: nil,
  enforce_size: true,
  config: ActiveJob::Temporal.config,
  **options
)
  encrypt = options.fetch(:encrypt, true)
  offload = options.fetch(:offload, enforce_size)
  encryption_context = options[:encryption_context]
   = options[:storage_metadata]
  payload = payload_from_job_attributes(job)
  payload[:scheduled_at] = iso8601_timestamp(scheduled_at) if scheduled_at

  scheduled_timestamp = payload.delete(:scheduled_at)
  final_payload = serializer_for(config).dump(payload)
  final_payload[:scheduled_at] = scheduled_timestamp if scheduled_timestamp
  final_payload = encrypt_payload_for_transport(final_payload, encrypt, config, encryption_context)
  final_payload = offload_payload_for_transport(final_payload, , config) if offload
  enforce_size!(final_payload, metrics_payload: payload, config: config) if enforce_size
  final_payload
end
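The handling of `scheduled_at` in the method above (normalize, remove before the serializer runs, reattach afterwards) can be sketched with the standard library alone. Both helpers below are hypothetical stand-ins: the real `iso8601_timestamp` and the configured serializer are not shown on this page, and the envelope format is invented for illustration. Keeping the timestamp at the top level presumably lets it stay readable regardless of what the serializer does to the rest of the payload.

```ruby
require "time"

# Hypothetical stand-in for iso8601_timestamp: accepts a Time or an
# ISO 8601 String and returns a UTC ISO 8601 string.
def iso8601_timestamp(value)
  time =
    case value
    when Time then value
    when String then Time.iso8601(value) # raises ArgumentError if malformed
    else raise ArgumentError, "scheduled_at must be a Time or ISO 8601 String"
    end
  time.getutc.iso8601
end

# Hypothetical serializer that wraps the whole payload in an envelope.
def dump_with_envelope(payload)
  { envelope: "v1", body: payload }
end

payload = { job_class: "MyJob", arguments: [42] }
payload[:scheduled_at] = iso8601_timestamp(Time.utc(2025, 10, 29, 12, 0, 0))

# Same order as from_job: pull the timestamp out, serialize, put it back.
scheduled_timestamp = payload.delete(:scheduled_at)
final_payload = dump_with_envelope(payload)
final_payload[:scheduled_at] = scheduled_timestamp if scheduled_timestamp

puts final_payload[:scheduled_at]              # 2025-10-29T12:00:00Z
puts final_payload[:body].key?(:scheduled_at)  # false
```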

#offload_payload(payload, metadata:, config: ActiveJob::Temporal.config) ⇒ Object



# File 'lib/activejob/temporal/payload.rb', line 219

def offload_payload(payload, metadata:, config: ActiveJob::Temporal.config)
  PayloadStorage.offload_if_needed(
    payload,
    config: config,
    metadata: metadata,
    workflow_control_fields: WORKFLOW_CONTROL_FIELDS
  )
end