Class: Journaled::DeliveryJob
Defined Under Namespace
Classes: KinesisTemporaryFailure
Constant Summary
collapse
- DEFAULT_REGION =
'us-east-1'.freeze
Class Method Summary
collapse
Instance Method Summary
collapse
Class Method Details
.legacy_computed_stream_name(app_name:) ⇒ Object
32
33
34
35
|
# File 'app/jobs/journaled/delivery_job.rb', line 32
def self.legacy_computed_stream_name(app_name:)
env_var_name = [app_name&.upcase, 'JOURNALED_STREAM_NAME'].compact.join('_')
ENV.fetch(env_var_name)
end
|
Instance Method Details
#kinesis_client_config ⇒ Object
37
38
39
40
41
42
43
44
45
|
# File 'app/jobs/journaled/delivery_job.rb', line 37
def kinesis_client_config
{
region: ENV.fetch('AWS_DEFAULT_REGION', DEFAULT_REGION),
retry_limit: 0,
http_idle_timeout: Journaled.http_idle_timeout,
http_open_timeout: Journaled.http_open_timeout,
http_read_timeout: Journaled.http_read_timeout,
}.merge(credentials)
end
|
18
19
20
21
22
23
24
25
26
27
28
29
30
|
# File 'app/jobs/journaled/delivery_job.rb', line 18
def perform(serialized_event:, partition_key:, stream_name: UNSPECIFIED, app_name: UNSPECIFIED)
@serialized_event = serialized_event
@partition_key = partition_key
if app_name != UNSPECIFIED
@stream_name = self.class.legacy_computed_stream_name(app_name: app_name)
elsif stream_name != UNSPECIFIED && !stream_name.nil?
@stream_name = stream_name
else
raise(ArgumentError, 'missing keyword: stream_name')
end
journal!
end
|