Module: ActiveJob::Temporal::Payload
Overview
Payload Size Limit
Temporal enforces a maximum payload size (configurable via `max_payload_size_kb`, default 250 KB). Payloads over the limit raise ActiveJob::SerializationError with a human-readable message stating the actual size and the limit. Consider passing database IDs or S3 keys instead of large objects.
GlobalID Serialization
ActiveRecord models are automatically serialized using GlobalID. The record must exist in the database at enqueue time and still exist at execution time. If the record is deleted in between, deserialization will fail.
Payload serialization and deserialization for ActiveJob.
This module converts ActiveJob instances into JSON-serializable hash payloads for transmission to Temporal workflows and activities. It also handles argument deserialization back into Ruby objects.
Constant Summary
- PAYLOAD_WARNING_THRESHOLD = 0.8
- PAYLOAD_NEAR_LIMIT_THRESHOLD = 0.9
- WORKFLOW_CONTROL_FIELDS = %i[ scheduled_at default_activity_options retry_policy temporal_options workflow_identity dead_letter rate_limits workflow_interactions child_workflows chain dependencies dependency_failure_policy activity_task_queue continue_as_new workflow_state local_activity_helpers observability schedule_id schedule_workflow_id_prefix schedule_execution_job_id payload_encryption_context ].freeze
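The two ratio constants presumably mark points at which payload size is flagged before the hard limit is reached. The following is a minimal sketch of that idea, assuming the ratio is the payload size in bytes divided by the limit in bytes (as computed in enforce_size!). The module name PayloadSizeSketch, the method size_band, and the returned symbols are hypothetical and not part of this module.

```ruby
module PayloadSizeSketch
  # Values copied from the constant summary above.
  PAYLOAD_WARNING_THRESHOLD = 0.8
  PAYLOAD_NEAR_LIMIT_THRESHOLD = 0.9

  # Hypothetical helper: classifies a usage ratio (bytes / limit_bytes).
  def self.size_band(usage_ratio)
    if usage_ratio > 1.0
      :over_limit
    elsif usage_ratio >= PAYLOAD_NEAR_LIMIT_THRESHOLD
      :near_limit
    elsif usage_ratio >= PAYLOAD_WARNING_THRESHOLD
      :warning
    else
      :ok
    end
  end
end

limit_bytes = 250 * 1024
ratio = (210 * 1024) / limit_bytes.to_f # 210 KB of a 250 KB limit => 0.84
puts PayloadSizeSketch.size_band(ratio)
```

How the real module reacts at each threshold (for example, which log level it uses) is not shown on this page, so the bands above are illustrative only.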
Instance Method Summary
- #delete_external_payload(payload, config: ActiveJob::Temporal.config) ⇒ Object
- #deserialize_args(payload, config: ActiveJob::Temporal.config, encryption_context: nil) ⇒ Array
  Deserializes job arguments from a payload hash.
- #deserialize_payload(payload, config: ActiveJob::Temporal.config, encryption_context: nil) ⇒ Object
- #deserialize_payload_args(payload) ⇒ Object
- #encrypt_payload(payload, config: ActiveJob::Temporal.config, encryption_context: nil) ⇒ Object
- #enforce_size!(payload, metrics_payload: payload, config: ActiveJob::Temporal.config) ⇒ Object
- #from_job(job, scheduled_at: nil, enforce_size: true, config: ActiveJob::Temporal.config, **options) ⇒ Hash
  Converts an ActiveJob instance into a serializable payload hash.
- #offload_payload(payload, metadata:, config: ActiveJob::Temporal.config) ⇒ Object
Instance Method Details
#delete_external_payload(payload, config: ActiveJob::Temporal.config) ⇒ Object
# File 'lib/activejob/temporal/payload.rb', line 228
def delete_external_payload(payload, config: ActiveJob::Temporal.config)
  PayloadStorage.delete(payload, config: config)
end
#deserialize_args(payload, config: ActiveJob::Temporal.config, encryption_context: nil) ⇒ Array
Deserializes job arguments from a payload hash.
Extracts the serialized arguments array from the payload and uses ActiveJob’s built-in deserialization to reconstruct Ruby objects (including GlobalID references to ActiveRecord models).
# File 'lib/activejob/temporal/payload.rb', line 190
def deserialize_args(payload, config: ActiveJob::Temporal.config, encryption_context: nil)
  deserialize_payload_args(deserialize_payload(payload, config: config, encryption_context: encryption_context))
end
#deserialize_payload(payload, config: ActiveJob::Temporal.config, encryption_context: nil) ⇒ Object
# File 'lib/activejob/temporal/payload.rb', line 203
def deserialize_payload(payload, config: ActiveJob::Temporal.config, encryption_context: nil)
  stored_payload = PayloadStorage.load(
    payload,
    config: config,
    workflow_control_fields: WORKFLOW_CONTROL_FIELDS
  )
  transport_payload = decrypt_transport_payload(stored_payload, config, encryption_context)
  execution_payload = serializer_for_transport_payload(transport_payload, config).load(transport_payload)
  preserve_workflow_control_fields(transport_payload, execution_payload)
end
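The last step of deserialize_payload calls preserve_workflow_control_fields, whose body is not shown on this page. One plausible reading is that control fields present on the transport payload are carried over onto the deserialized execution payload. The sketch below illustrates that reading only; the method name carry_control_fields, the shortened field list, and the sample hashes are all assumptions.

```ruby
# Assumption: a shortened stand-in for WORKFLOW_CONTROL_FIELDS.
CONTROL_FIELDS_SKETCH = %i[scheduled_at retry_policy chain].freeze

# Hypothetical helper: copies only the control fields from the transport
# payload onto the execution payload, leaving every other key behind.
def carry_control_fields(transport_payload, execution_payload)
  execution_payload.merge(transport_payload.slice(*CONTROL_FIELDS_SKETCH))
end

transport = { scheduled_at: 1_700_000_000, retry_policy: { max_attempts: 3 }, ciphertext: "..." }
execution = { job_class: "WelcomeEmailJob", arguments: [42] }

merged = carry_control_fields(transport, execution)
p merged.keys
```

The real helper may differ, for instance in how it treats string keys or fields already present on the execution payload.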
#deserialize_payload_args(payload) ⇒ Object
# File 'lib/activejob/temporal/payload.rb', line 194
def deserialize_payload_args(payload)
  serialized_args = payload[:arguments] || payload["arguments"]
  ActiveJob::Arguments.deserialize(serialized_args)
rescue ActiveJob::SerializationError, ActiveJob::Temporal::ConfigurationError
  raise
rescue StandardError => e
  raise ActiveJob::SerializationError, e.message
end
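deserialize_payload_args looks up the arguments under a symbol key first and a string key second. One reason such a fallback is useful: a hash built with symbol keys comes back with string keys after a JSON round trip. A small stdlib-only illustration follows; the payload contents are made up.

```ruby
require "json"

payload = { job_class: "WelcomeEmailJob", arguments: [42, "hello"] }

# A JSON round trip turns symbol keys into string keys.
round_tripped = JSON.parse(JSON.generate(payload))

# Same lookup pattern as deserialize_payload_args: symbol key, then string key.
fetch_arguments = ->(hash) { hash[:arguments] || hash["arguments"] }

p fetch_arguments.call(payload)
p fetch_arguments.call(round_tripped)
```

Both calls return the same array, even though only the first hash has an :arguments symbol key.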
#encrypt_payload(payload, config: ActiveJob::Temporal.config, encryption_context: nil) ⇒ Object
# File 'lib/activejob/temporal/payload.rb', line 215
def encrypt_payload(payload, config: ActiveJob::Temporal.config, encryption_context: nil)
  encrypt_payload_if_configured(payload, config, encryption_context: encryption_context)
end
#enforce_size!(payload, metrics_payload: payload, config: ActiveJob::Temporal.config) ⇒ Object
# File 'lib/activejob/temporal/payload.rb', line 232
def enforce_size!(payload, metrics_payload: payload, config: ActiveJob::Temporal.config)
  json = JSON.generate(payload)
  max_size_kb = config.max_payload_size_kb || 250
  size_limit_bytes = max_size_kb * 1024
  actual_size_kb = json.bytesize / 1024.0
  usage_ratio = json.bytesize.to_f / size_limit_bytes
  Observability.emit(
    :payload_serialize,
    Observability.attributes_from_payload(metrics_payload, bytes: json.bytesize)
  )
  log_payload_size(metrics_payload, actual_size_kb, max_size_kb, usage_ratio)
  return if json.bytesize <= size_limit_bytes
  message = format(
    "Job payload size (%<actual>.1f KB) exceeds maximum allowed size (%<max>d KB). " \
    "Consider reducing argument size or using references (e.g., database IDs).",
    actual: actual_size_kb,
    max: max_size_kb
  )
  raise ActiveJob::SerializationError, message
end
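The size arithmetic in enforce_size! can be tried out without Rails or Temporal. The sketch below keeps only the JSON encoding, the KB-to-bytes conversion, and the formatted error. The method name check_payload_size is hypothetical, and it raises ArgumentError as a stand-in for ActiveJob::SerializationError so that it runs with the standard library alone.

```ruby
require "json"

# Hypothetical stand-in for enforce_size!: returns the encoded size in bytes,
# or raises ArgumentError when the JSON encoding exceeds the limit.
def check_payload_size(payload, max_size_kb: 250)
  bytes = JSON.generate(payload).bytesize
  limit_bytes = max_size_kb * 1024
  return bytes if bytes <= limit_bytes

  raise ArgumentError, format(
    "Job payload size (%<actual>.1f KB) exceeds maximum allowed size (%<max>d KB).",
    actual: bytes / 1024.0,
    max: max_size_kb
  )
end

# Well under the default 250 KB limit.
puts check_payload_size({ arguments: [1, 2, 3] })

# A 2 KB string against a 1 KB limit triggers the error path.
begin
  check_payload_size({ arguments: ["x" * 2048] }, max_size_kb: 1)
rescue ArgumentError => e
  puts e.message
end
```

Note that the limit is measured on the JSON encoding in bytes, not on the number of arguments, so multibyte strings count for more than their character length.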
#from_job(job, scheduled_at: nil, enforce_size: true, config: ActiveJob::Temporal.config, **options) ⇒ Hash
Record Lifecycle Caveat
When using GlobalID serialization for ActiveRecord models, the record MUST exist in the database at both enqueue time AND execution time. If the record is deleted before the job executes, deserialization fails because the GlobalID lookup raises ActiveRecord::RecordNotFound.
Payload Size Optimization
To reduce payload size, prefer passing database IDs instead of full objects. For example, pass user.id instead of user. This matters most for jobs with long argument lists or deeply nested objects.
Converts an ActiveJob instance into a serializable payload hash.
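To make the payload size advice for from_job concrete, the following stdlib-only comparison encodes the same job arguments twice: once with a large object embedded by value and once with only its ID. The record here is a made-up plain hash, not an ActiveRecord model, and the 250 KB figure is the default limit quoted in the overview.

```ruby
require "json"

# Made-up record with a large embedded blob.
user = { id: 42, name: "Ada", profile: "x" * 300_000 }

by_value     = JSON.generate({ arguments: [user] })
by_reference = JSON.generate({ arguments: [user[:id]] })

limit_bytes = 250 * 1024

puts "by value:     #{by_value.bytesize} bytes (over limit: #{by_value.bytesize > limit_bytes})"
puts "by reference: #{by_reference.bytesize} bytes (over limit: #{by_reference.bytesize > limit_bytes})"
```

Passing the ID keeps the payload at a few bytes, at the cost of a lookup when the job runs.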
# File 'lib/activejob/temporal/payload.rb', line 140
def from_job(
  job,
  scheduled_at: nil,
  enforce_size: true,
  config: ActiveJob::Temporal.config,
  **options
)
  encrypt = options.fetch(:encrypt, true)
  offload = options.fetch(:offload, enforce_size)
  encryption_context = options[:encryption_context]
  … = options[:storage_metadata]
  payload = payload_from_job_attributes(job)
  payload[:scheduled_at] = …(scheduled_at) if scheduled_at
  … = payload.delete(:scheduled_at)
  final_payload = serializer_for(config).dump(payload)
  final_payload[:scheduled_at] = … if …
  final_payload = encrypt_payload_for_transport(final_payload, encrypt, config, encryption_context)
  final_payload = offload_payload_for_transport(final_payload, …, config) if offload
  enforce_size!(final_payload, metrics_payload: payload, config: config) if enforce_size
  final_payload
end
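from_job reads two flags from its extra keyword options: :encrypt, which defaults to true, and :offload, which defaults to the value of enforce_size. The sketch below isolates that defaulting logic so the combinations are easy to check. The method name transport_flags is hypothetical; the fetch calls mirror the ones in the source above.

```ruby
# Hypothetical helper mirroring how from_job derives its flags from **options.
def transport_flags(enforce_size: true, **options)
  {
    encrypt: options.fetch(:encrypt, true),        # on unless explicitly disabled
    offload: options.fetch(:offload, enforce_size) # follows enforce_size by default
  }
end

p transport_flags                                      # both flags on
p transport_flags(enforce_size: false)                 # offload follows enforce_size
p transport_flags(enforce_size: false, offload: true)  # explicit option wins
p transport_flags(encrypt: false)                      # encryption opted out
```

One consequence of this design is that turning off size enforcement also turns off offloading unless :offload is passed explicitly.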
#offload_payload(payload, metadata:, config: ActiveJob::Temporal.config) ⇒ Object
# File 'lib/activejob/temporal/payload.rb', line 219
def offload_payload(payload, metadata:, config: ActiveJob::Temporal.config)
  PayloadStorage.offload_if_needed(
    payload,
    config: config,
    metadata: metadata,
    workflow_control_fields: WORKFLOW_CONTROL_FIELDS
  )
end