Module: ActiveJob::Temporal::PayloadStorage
- Extended by:
- PayloadStorage
- Included in:
- PayloadStorage
- Defined in:
- lib/activejob/temporal/payload_storage.rb
Constant Summary collapse
- VERSION =
1
- REFERENCE_KEY =
:external_payload_reference
Instance Method Summary collapse
- #delete(payload, config:) ⇒ Object
- #external?(payload) ⇒ Boolean
- #load(payload, config:, workflow_control_fields:) ⇒ Object
- #offload_if_needed(payload, config:, metadata:, workflow_control_fields:) ⇒ Object
Instance Method Details
#delete(payload, config:) ⇒ Object
44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
# File 'lib/activejob/temporal/payload_storage.rb', line 44

# Best-effort removal of an externally stored payload.
#
# Does nothing when +payload+ is not an external-payload envelope (per
# #external?) or when the adapter returned by storage_adapter(config) does
# not respond to +delete+. Any StandardError raised along the way is logged
# as a warning and swallowed rather than propagated.
#
# @param payload [Hash] envelope indexed by :external_payload /
#   "external_payload" (presumably a Hash; confirm against callers)
# @param config [Object] configuration handed to storage_adapter
# @return [Object, nil] whatever adapter.delete returns; nil when skipped;
#   the logger's return value when an error was rescued
def delete(payload, config:)
  return unless external?(payload)

  adapter = storage_adapter(config)
  # Deletion is optional for adapters, so its absence is not an error.
  return unless adapter.respond_to?(:delete)

  adapter.delete(payload_reference(payload))
rescue StandardError => e
  # Deliberately best-effort: report the failure instead of raising.
  ActiveJob::Temporal::Logger.warn(
    "payload_storage_delete_failed",
    error_class: e.class.name,
    error_message: e.message
  )
end
#external?(payload) ⇒ Boolean
15 16 17 |
# File 'lib/activejob/temporal/payload_storage.rb', line 15

# Tells whether +payload+ is flagged as an external-payload envelope.
#
# The flag is looked up under the symbol key first, then the string key.
# Only the literal +true+ counts; other truthy values (for example the
# string "true") do not mark the payload as external.
#
# @param payload [#[]] object indexable by :external_payload and
#   "external_payload" (presumably a Hash; confirm against callers)
# @return [Boolean] true when either key holds exactly +true+
def external?(payload)
  # Strict comparison on purpose: truthiness alone must not qualify.
  payload[:external_payload] == true || payload["external_payload"] == true
end
#load(payload, config:, workflow_control_fields:) ⇒ Object
32 33 34 35 36 37 38 39 40 41 42 |
# File 'lib/activejob/temporal/payload_storage.rb', line 32

# Resolves an external-payload envelope back into the stored payload.
#
# A payload that is not external (per #external?) is returned unchanged.
# Otherwise the envelope's version, read from :external_payload_version or
# "external_payload_version", must equal VERSION. The stored payload is
# then fetched via load_payload, and the result of
# preserve_workflow_control_fields(payload, loaded_payload,
# workflow_control_fields) is returned.
#
# @param payload [Hash] inline payload or external envelope (presumably a
#   Hash; confirm against callers)
# @param config [Object] configuration handed to load_payload
# @param workflow_control_fields [Object] passed through unchanged to
#   preserve_workflow_control_fields
# @return [Object] the original payload, or the value returned by
#   preserve_workflow_control_fields
# @raise [ActiveJob::SerializationError] when the envelope version differs
#   from VERSION
def load(payload, config:, workflow_control_fields:)
  return payload unless external?(payload)

  version = payload[:external_payload_version] || payload["external_payload_version"]
  unless version == VERSION
    raise ActiveJob::SerializationError, "Unsupported external payload version: #{version.inspect}"
  end

  loaded_payload = load_payload(payload, config)
  preserve_workflow_control_fields(payload, loaded_payload, workflow_control_fields)
end
#offload_if_needed(payload, config:, metadata:, workflow_control_fields:) ⇒ Object
19 20 21 22 23 24 25 26 27 28 29 30 |
# File 'lib/activejob/temporal/payload_storage.rb', line 19

# Replaces +payload+ with an external-payload envelope when required.
#
# The payload is returned unchanged unless configured?(config) and
# payload_exceeds_threshold?(payload, config) both hold. Otherwise it is
# written out through dump_payload and an envelope is built that carries the
# external flag, VERSION and the returned reference under REFERENCE_KEY.
# The result of preserve_workflow_control_fields(payload, envelope,
# workflow_control_fields) is returned.
#
# @param payload [Object] payload to inspect and possibly offload
# @param config [Object] configuration handed to the helper methods
# @param metadata [Object] forwarded to dump_payload
# @param workflow_control_fields [Object] passed through unchanged to
#   preserve_workflow_control_fields
# @return [Object] the original payload, or the value returned by
#   preserve_workflow_control_fields
def offload_if_needed(payload, config:, metadata:, workflow_control_fields:)
  return payload unless configured?(config)
  return payload unless payload_exceeds_threshold?(payload, config)

  # NOTE(review): metadata is assumed to be dump_payload's third positional
  # argument; confirm against its signature.
  reference = dump_payload(payload, config, metadata)
  envelope = {
    external_payload: true,
    external_payload_version: VERSION,
    REFERENCE_KEY => reference
  }
  preserve_workflow_control_fields(payload, envelope, workflow_control_fields)
end