Class: CycloneLariat::Clients::Sqs
Instance Attribute Summary
Attributes inherited from Abstract
#config
Instance Method Summary
collapse
-
#add_policy(queue:, policy:) ⇒ Object
-
#create(queue) ⇒ Object
-
#delete(queue) ⇒ Object
-
#exists?(queue) ⇒ Boolean
-
#list_all ⇒ Object
-
#publish(msg, fifo:, dest: nil, queue: nil, skip_validation: false) ⇒ Object
-
#publish_command(type, fifo:, dest: nil, queue: nil, **options) ⇒ Object
-
#publish_event(type, fifo:, dest: nil, queue: nil, **options) ⇒ Object
#custom_queue, #queue
Methods inherited from Abstract
#initialize
#command, #command_v1, #command_v2
#event, #event_v1, #event_v2
Instance Method Details
#add_policy(queue:, policy:) ⇒ Object
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
# File 'lib/cyclone_lariat/clients/sqs.rb', line 25
# Appends a policy statement to the queue's access policy, unless a statement
# with the same 'Sid' is already present.
#
# The stored policy's "Statement" element is normalized to an array first:
# AWS policy JSON permits either an array of statements or a single statement
# object, and the element may also be missing altogether.
#
# @param queue [#url] queue whose 'Policy' attribute is read and rewritten
# @param policy [Hash] statement to append; matched by its 'Sid' key
# @return [nil] when a statement with the same 'Sid' already exists
# @return [Object] otherwise, whatever aws_client.set_queue_attributes returns
def add_policy(queue:, policy:)
  policy_json = aws_client.get_queue_attributes(
    queue_url: queue.url,
    attribute_names: ['Policy']
  ).attributes['Policy']

  document = policy_json ? JSON.parse(policy_json) : {}

  # Array() is deliberately avoided: it would turn a single statement Hash
  # into a list of key/value pairs.
  statements = document['Statement']
  statements = [statements].compact unless statements.is_a?(Array)
  return if statements.any? { |statement| statement['Sid'] == policy['Sid'] }

  statements << policy
  document['Statement'] = statements
  aws_client.set_queue_attributes(queue_url: queue.url, attributes: { 'Policy' => document.to_json })
end
|
#create(queue) ⇒ Object
78
79
80
81
82
83
84
|
# File 'lib/cyclone_lariat/clients/sqs.rb', line 78
# Creates the given queue through the AWS client.
#
# @param queue [Resources::Queue] queue definition (name, attributes, tags)
# @return [Resources::Queue] the same queue object that was passed in
# @raise [ArgumentError] when the argument is not a Resources::Queue
# @raise [Errors::QueueAlreadyExists] when exists?(queue) reports true
def create(queue)
  unless queue.is_a?(Resources::Queue)
    raise ArgumentError, 'Should be queue'
  end

  if exists?(queue)
    raise Errors::QueueAlreadyExists.new(expected_queue: queue.name)
  end

  # tap hands back the queue itself rather than the client response.
  queue.tap do |definition|
    aws_client.create_queue(
      queue_name: definition.name,
      attributes: definition.attributes,
      tags: definition.tags
    )
  end
end
|
#delete(queue) ⇒ Object
86
87
88
89
90
91
92
|
# File 'lib/cyclone_lariat/clients/sqs.rb', line 86
# Deletes the given queue through the AWS client.
#
# @param queue [Resources::Queue] queue to remove; its url identifies it to AWS
# @return [Resources::Queue] the same queue object that was passed in
# @raise [ArgumentError] when the argument is not a Resources::Queue
# @raise [Errors::QueueDoesNotExists] when exists?(queue) reports false
def delete(queue)
  unless queue.is_a?(Resources::Queue)
    raise ArgumentError, 'Should be queue'
  end

  unless exists?(queue)
    raise Errors::QueueDoesNotExists.new(expected_queue: queue.name)
  end

  aws_client.delete_queue(queue_url: queue.url)
  queue
end
|
#exists?(queue) ⇒ Boolean
17
18
19
20
21
22
23
|
# File 'lib/cyclone_lariat/clients/sqs.rb', line 17
# Tells whether AWS knows a queue under the given queue's string form.
#
# @param queue [Resources::Queue] queue to look up; queue.to_s is sent as the queue name
# @return [Boolean] true when get_queue_url answers, false when AWS reports
#   Aws::SQS::Errors::NonExistentQueue
# @raise [ArgumentError] when the argument is not a Resources::Queue
def exists?(queue)
  unless queue.is_a?(Resources::Queue)
    raise ArgumentError, 'Should be queue'
  end

  lookup = aws_client.get_queue_url(queue_name: queue.to_s)
  # Collapse the client response into a plain true for callers.
  lookup && true
rescue Aws::SQS::Errors::NonExistentQueue
  false
end
|
#list_all ⇒ Object
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
|
# File 'lib/cyclone_lariat/clients/sqs.rb', line 94
# Lists every queue visible to the AWS client, following next_token paging
# until a page arrives without a token.
#
# @return [Array] one Resources::Queue.from_url result per listed queue URL,
#   in the order the pages were returned
def list_all
  collected = []
  page = aws_client.list_queues

  loop do
    collected.concat(page[:queue_urls].map { |url| Resources::Queue.from_url(url) })

    token = page[:next_token]
    break if token.nil?

    page = aws_client.list_queues(next_token: token)
  end

  collected
end
|
#publish(msg, fifo:, dest: nil, queue: nil, skip_validation: false) ⇒ Object
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
|
# File 'lib/cyclone_lariat/clients/sqs.rb', line 41
# Sends a message to an SQS queue.
#
# When config.fake_publish is set, nothing is sent and the Fake result is
# returned immediately (before queue resolution, group checks or validation).
#
# @param msg [Object] message; must respond to type, kind, group_id,
#   deduplication_id, validation and to_json
# @param fifo [Boolean] whether the target queue is FIFO
# @param dest [Object, nil] passed through to the queue lookup
# @param queue [Object, nil] when given, resolved with custom_queue instead of
#   the type/kind based lookup
# @param skip_validation [Boolean] skips msg.validation.check! when true
# @return [Object] result of aws_client.send_message (or the Fake result)
# @raise [Errors::GroupIdUndefined] FIFO publish without a group id
# @raise [Errors::GroupDefined] non-FIFO publish with a group id
# @raise [Errors::DeduplicationIdDefined] non-FIFO publish with a deduplication id
def publish(msg, fifo:, dest: nil, queue: nil, skip_validation: false)
  return Fake.sqs_send_message_result(msg) if config.fake_publish

  target = if queue
             custom_queue(queue)
           else
             queue(msg.type, kind: msg.kind, fifo: fifo, dest: dest)
           end

  if fifo
    raise Errors::GroupIdUndefined.new(resource: target) if msg.group_id.nil?
  else
    raise Errors::GroupDefined.new(resource: target) if msg.group_id
    raise Errors::DeduplicationIdDefined.new(resource: target) if msg.deduplication_id
  end

  msg.validation.check! unless skip_validation

  request = {
    queue_url: target.url,
    message_body: msg.to_json,
    message_group_id: msg.group_id,
    message_deduplication_id: msg.deduplication_id
  }
  # Drop the nil-valued keys (group / deduplication ids on standard queues).
  aws_client.send_message(**request.compact)
end
|
#publish_command(type, fifo:, dest: nil, queue: nil, **options) ⇒ Object
70
71
72
73
74
75
76
|
# File 'lib/cyclone_lariat/clients/sqs.rb', line 70
# Builds a command message of the given type and publishes it.
#
# Missing :version, :data and :uuid options are filled in with
# config.version, an empty hash and a random UUID respectively.
#
# @param type [Object] command type, passed to the command builder
# @param fifo [Boolean] forwarded to publish
# @param dest [Object, nil] forwarded to publish
# @param queue [Object, nil] forwarded to publish
# @param options [Hash] message attributes forwarded to the command builder
# @return [Object] whatever publish returns
def publish_command(type, fifo:, dest: nil, queue: nil, **options)
  options[:version] ||= config.version
  options[:data]    ||= {}
  options[:uuid]    ||= SecureRandom.uuid

  # Build with `command`, not `event`. The payload already lives in
  # options[:data], so it reaches the builder through the splat alone.
  publish(command(type, **options), fifo: fifo, dest: dest, queue: queue)
end
|
#publish_event(type, fifo:, dest: nil, queue: nil, **options) ⇒ Object
62
63
64
65
66
67
68
|
# File 'lib/cyclone_lariat/clients/sqs.rb', line 62
# Builds an event message of the given type and publishes it.
#
# Missing :version, :data and :uuid options are filled in with
# config.version, an empty hash and a random UUID respectively.
#
# @param type [Object] event type, passed to the event builder
# @param fifo [Boolean] forwarded to publish
# @param dest [Object, nil] forwarded to publish
# @param queue [Object, nil] forwarded to publish
# @param options [Hash] message attributes forwarded to the event builder
# @return [Object] whatever publish returns
def publish_event(type, fifo:, dest: nil, queue: nil, **options)
  options[:version] ||= config.version
  options[:data]    ||= {}
  options[:uuid]    ||= SecureRandom.uuid

  # The payload already lives in options[:data], so it reaches the builder
  # through the splat alone; there is no separate `data` local to pass.
  publish(event(type, **options), fifo: fifo, dest: dest, queue: queue)
end
|