Class: Journaled::DeliveryJob

Inherits:
ApplicationJob show all
Defined in:
app/jobs/journaled/delivery_job.rb

Defined Under Namespace

Classes: KinesisTemporaryFailure

Constant Summary collapse

DEFAULT_REGION =
'us-east-1'.freeze

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.legacy_computed_stream_name(app_name:) ⇒ Object



32
33
34
35
# File 'app/jobs/journaled/delivery_job.rb', line 32

def self.legacy_computed_stream_name(app_name:)
  env_var_name = [app_name&.upcase, 'JOURNALED_STREAM_NAME'].compact.join('_')
  ENV.fetch(env_var_name)
end

Instance Method Details

#kinesis_client_configObject



37
38
39
40
41
42
43
44
45
# File 'app/jobs/journaled/delivery_job.rb', line 37

def kinesis_client_config
  {
    region: ENV.fetch('AWS_DEFAULT_REGION', DEFAULT_REGION),
    retry_limit: 0,
    http_idle_timeout: Journaled.http_idle_timeout,
    http_open_timeout: Journaled.http_open_timeout,
    http_read_timeout: Journaled.http_read_timeout,
  }.merge(credentials)
end

#perform(serialized_event:, partition_key:, stream_name: UNSPECIFIED, app_name: UNSPECIFIED) ⇒ Object



18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'app/jobs/journaled/delivery_job.rb', line 18

def perform(serialized_event:, partition_key:, stream_name: UNSPECIFIED, app_name: UNSPECIFIED)
  @serialized_event = serialized_event
  @partition_key = partition_key
  if app_name != UNSPECIFIED
    @stream_name = self.class.legacy_computed_stream_name(app_name: app_name)
  elsif stream_name != UNSPECIFIED && !stream_name.nil?
    @stream_name = stream_name
  else
    raise(ArgumentError, 'missing keyword: stream_name')
  end

  journal!
end