Module: ActiveJob::Temporal::PayloadStorage

Extended by:
PayloadStorage
Included in:
PayloadStorage
Defined in:
lib/activejob/temporal/payload_storage.rb

Constant Summary

VERSION = 1
REFERENCE_KEY = :external_payload_reference

Instance Method Summary

Instance Method Details

#delete(payload, config:) ⇒ Object



44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/activejob/temporal/payload_storage.rb', line 44

# Best-effort delete of the data referenced by +payload+ through the storage
# adapter.
#
# Nothing happens when +external?+ is false for the payload, or when the
# adapter returned by +storage_adapter+ does not respond to +delete+. Any
# StandardError raised along the way is caught and reported through
# ActiveJob::Temporal::Logger.warn rather than re-raised.
#
# @param payload [Object] value checked with +external?+ and handed to
#   +payload_reference+
# @param config [Object] passed to +storage_adapter+
# @return [Object, nil] the adapter's +delete+ result, nil when the call is
#   skipped, or the logger's return value when an error was caught
def delete(payload, config:)
  if external?(payload)
    store = storage_adapter(config)
    # Adapters without a delete method are skipped silently.
    store.delete(payload_reference(payload)) if store.respond_to?(:delete)
  end
rescue StandardError => failure
  ActiveJob::Temporal::Logger.warn(
    "payload_storage_delete_failed",
    error_class: failure.class.name,
    error_message: failure.message
  )
end

#external?(payload) ⇒ Boolean

Returns:

  • (Boolean)


15
16
17
# File 'lib/activejob/temporal/payload_storage.rb', line 15

# Tells whether +payload+ carries the external-payload flag.
#
# The flag is looked up under the symbol key first, then the string key, and
# must compare equal to +true+; other truthy values do not count.
#
# @param payload [#[]] object indexed with :external_payload and
#   "external_payload"
# @return [Boolean]
def external?(payload)
  [:external_payload, "external_payload"].any? { |flag_key| payload[flag_key] == true }
end

#load(payload, config:, workflow_control_fields:) ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
# File 'lib/activejob/temporal/payload_storage.rb', line 32

# Resolves +payload+ into the value produced by +load_payload+ when it is
# flagged as external; otherwise hands the payload back untouched.
#
# For an external payload the envelope version (symbol key first, then string
# key) must equal VERSION. The loaded value is then passed, together with the
# original payload and +workflow_control_fields+, to
# +preserve_workflow_control_fields+, whose result is returned.
#
# @param payload [Object] value checked with +external?+
# @param config [Object] passed to +load_payload+
# @param workflow_control_fields [Object] passed to
#   +preserve_workflow_control_fields+
# @return [Object] +payload+ itself when not external, otherwise the result
#   of +preserve_workflow_control_fields+
# @raise [ActiveJob::SerializationError] when the envelope version differs
#   from VERSION
def load(payload, config:, workflow_control_fields:)
  if external?(payload)
    envelope_version = payload[:external_payload_version] || payload["external_payload_version"]
    version_supported = envelope_version == VERSION
    unless version_supported
      raise ActiveJob::SerializationError,
            "Unsupported external payload version: #{envelope_version.inspect}"
    end

    restored = load_payload(payload, config)
    preserve_workflow_control_fields(payload, restored, workflow_control_fields)
  else
    payload
  end
end

#offload_if_needed(payload, config:, metadata:, workflow_control_fields:) ⇒ Object



19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/activejob/temporal/payload_storage.rb', line 19

# Swaps +payload+ for an external-payload envelope when offloading applies.
#
# The payload is returned unchanged unless both +configured?(config)+ and
# +payload_exceeds_threshold?(payload, config)+ hold. Otherwise the payload is
# handed to +dump_payload+ and an envelope hash is built containing
# +external_payload: true+, +external_payload_version: VERSION+ and the
# returned reference under REFERENCE_KEY. The envelope is then passed through
# +preserve_workflow_control_fields+, whose result is returned.
#
# @param payload [Object] the payload to inspect and possibly offload
# @param config [Object] passed to +configured?+,
#   +payload_exceeds_threshold?+ and +dump_payload+
# @param metadata [Object] forwarded to +dump_payload+
# @param workflow_control_fields [Object] passed to
#   +preserve_workflow_control_fields+
# @return [Object] +payload+ itself when no offload happens, otherwise the
#   result of +preserve_workflow_control_fields+
def offload_if_needed(payload, config:, metadata:, workflow_control_fields:)
  return payload unless configured?(config)
  return payload unless payload_exceeds_threshold?(payload, config)

  # Pass the caller's metadata through to the dump step. The call previously
  # ended in a dangling comma, leaving the required +metadata:+ argument
  # unused.
  # NOTE(review): assumes dump_payload takes metadata as its third positional
  # argument - confirm against its definition.
  reference = dump_payload(payload, config, metadata)
  envelope = {
    external_payload: true,
    external_payload_version: VERSION,
    REFERENCE_KEY => reference
  }
  preserve_workflow_control_fields(payload, envelope, workflow_control_fields)
end