Class: CycloneLariat::Clients::Sns
Instance Attribute Summary
Attributes inherited from Abstract
#config
Instance Method Summary
collapse
-
#create(topic) ⇒ Object
-
#delete(topic) ⇒ Object
-
#exists?(topic) ⇒ Boolean
-
#find_subscription_arn(topic:, endpoint:) ⇒ Object
-
#list_all ⇒ Object
-
#list_subscriptions ⇒ Object
-
#publish(msg, fifo:, topic: nil, skip_validation: false) ⇒ Object
-
#publish_command(type, fifo:, topic: nil, **options) ⇒ Object
-
#publish_event(type, fifo:, topic: nil, **options) ⇒ Object
-
#subscribe(topic:, endpoint:) ⇒ Object
-
#subscribed?(topic:, endpoint:) ⇒ Boolean
-
#topic_subscriptions(topic) ⇒ Object
-
#unsubscribe(topic:, endpoint:) ⇒ Object
#custom_topic, #topic
Methods inherited from Abstract
#initialize
#command, #command_v1, #command_v2
#event, #event_v1, #event_v2
Instance Method Details
#create(topic) ⇒ Object
62
63
64
65
66
67
68
|
# File 'lib/cyclone_lariat/clients/sns.rb', line 62
def create(topic)
raise ArgumentError, 'Should be Resources::Topic' unless topic.is_a? Resources::Topic
raise Errors::TopicAlreadyExists.new(expected_topic: topic.name) if exists?(topic)
aws_client.create_topic(name: topic.name, attributes: topic.attributes, tags: topic.tags)
topic
end
|
#delete(topic) ⇒ Object
70
71
72
73
74
75
76
|
# File 'lib/cyclone_lariat/clients/sns.rb', line 70
def delete(topic)
raise ArgumentError, 'Should be Resources::Topic' unless topic.is_a? Resources::Topic
raise Errors::TopicDoesNotExists.new(expected_topic: topic.name) unless exists?(topic)
aws_client.delete_topic topic_arn: topic.arn
topic
end
|
#exists?(topic) ⇒ Boolean
38
39
40
41
42
43
44
|
# File 'lib/cyclone_lariat/clients/sns.rb', line 38
def exists?(topic)
raise ArgumentError, 'Should be Topic' unless topic.is_a? Resources::Topic
aws_client.get_topic_attributes({ topic_arn: topic.arn }) && true
rescue Aws::SNS::Errors::NotFound
false
end
|
#find_subscription_arn(topic:, endpoint:) ⇒ Object
163
164
165
166
167
168
169
170
171
172
173
174
|
# File 'lib/cyclone_lariat/clients/sns.rb', line 163
def find_subscription_arn(topic:, endpoint:)
raise ArgumentError, 'Should be Topic' unless topic.is_a? Resources::Topic
unless [Resources::Topic, Resources::Queue].include? endpoint.class
raise ArgumentError, 'Endpoint should be Topic or Queue'
end
found_subscription = topic_subscriptions(topic).select do |subscription|
subscription.endpoint == endpoint.arn
end.first
found_subscription ? found_subscription.subscription_arn : nil
end
|
#list_all ⇒ Object
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
|
# File 'lib/cyclone_lariat/clients/sns.rb', line 103
def list_all
topics = []
resp = aws_client.list_topics
loop do
resp[:topics].map do |t|
topics << Resources::Topic.from_arn(t[:topic_arn])
end
break if resp[:next_token].nil?
resp = aws_client.list_topics(next_token: resp[:next_token])
end
topics
end
|
#list_subscriptions ⇒ Object
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
|
# File 'lib/cyclone_lariat/clients/sns.rb', line 119
def list_subscriptions
subscriptions = []
resp = aws_client.list_subscriptions
loop do
resp[:subscriptions].each do |s|
endpoint =
case s.protocol
when 'sqs'
Resources::Queue.from_arn(s.endpoint)
when 'sns'
Resources::Topic.from_arn(s.endpoint)
else
next
end
subscriptions << { topic: Resources::Topic.from_arn(s.topic_arn), endpoint: endpoint, arn: s.subscription_arn }
end
break if resp[:next_token].nil?
resp = aws_client.list_subscriptions(next_token: resp[:next_token])
end
subscriptions
end
|
#publish(msg, fifo:, topic: nil, skip_validation: false) ⇒ Object
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
# File 'lib/cyclone_lariat/clients/sns.rb', line 17
def publish(msg, fifo:, topic: nil, skip_validation: false)
return Fake.sns_publish_response(msg) if config.fake_publish
topic = topic ? custom_topic(topic) : topic(msg.type, kind: msg.kind, fifo: fifo)
raise Errors::GroupIdUndefined.new(resource: topic) if fifo && msg.group_id.nil?
raise Errors::GroupDefined.new(resource: topic) if !fifo && msg.group_id
raise Errors::DeduplicationIdDefined.new(resource: topic) if !fifo && msg.deduplication_id
msg.validation.check! unless skip_validation
params = {
topic_arn: topic.arn,
message: msg.to_json,
message_group_id: msg.group_id,
message_deduplication_id: msg.deduplication_id
}.compact
aws_client.publish(**params)
end
|
#publish_command(type, fifo:, topic: nil, **options) ⇒ Object
54
55
56
57
58
59
60
|
# File 'lib/cyclone_lariat/clients/sns.rb', line 54
def publish_command(type, fifo:, topic: nil, **options)
options[:version] ||= config.version
options[:data] ||= {}
options[:uuid] ||= SecureRandom.uuid
publish command(type, **options), fifo: fifo, topic: topic
end
|
#publish_event(type, fifo:, topic: nil, **options) ⇒ Object
46
47
48
49
50
51
52
|
# File 'lib/cyclone_lariat/clients/sns.rb', line 46
def publish_event(type, fifo:, topic: nil, **options)
options[:version] ||= config.version
options[:data] ||= {}
options[:uuid] ||= SecureRandom.uuid
publish event(type, **options), fifo: fifo, topic: topic
end
|
#subscribe(topic:, endpoint:) ⇒ Object
83
84
85
86
87
88
89
90
91
92
93
94
|
# File 'lib/cyclone_lariat/clients/sns.rb', line 83
def subscribe(topic:, endpoint:)
subscription_arn = find_subscription_arn(topic: topic, endpoint: endpoint)
raise Errors::SubscriptionAlreadyExists.new(topic: topic, endpoint: endpoint) if subscription_arn
aws_client.subscribe(
{
topic_arn: topic.arn,
protocol: endpoint.protocol,
endpoint: endpoint.arn
}
)
end
|
#subscribed?(topic:, endpoint:) ⇒ Boolean
78
79
80
81
|
# File 'lib/cyclone_lariat/clients/sns.rb', line 78
def subscribed?(topic:, endpoint:)
subscription_arn = find_subscription_arn(topic: topic, endpoint: endpoint)
subscription_arn.nil? ? false : true
end
|
#topic_subscriptions(topic) ⇒ Object
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
|
# File 'lib/cyclone_lariat/clients/sns.rb', line 145
def topic_subscriptions(topic)
raise ArgumentError, 'Should be Topic' unless topic.is_a? Resources::Topic
subscriptions = []
resp = aws_client.list_subscriptions_by_topic(topic_arn: topic.arn)
loop do
next_token = resp[:next_token]
subscriptions += resp[:subscriptions]
break if next_token.nil?
resp = aws_client.list_subscriptions_by_topic(topic_arn: topic.arn, next_token: next_token)
end
subscriptions
end
|
#unsubscribe(topic:, endpoint:) ⇒ Object
96
97
98
99
100
101
|
# File 'lib/cyclone_lariat/clients/sns.rb', line 96
def unsubscribe(topic:, endpoint:)
subscription_arn = find_subscription_arn(topic: topic, endpoint: endpoint)
raise Errors::SubscriptionDoesNotExists.new(topic: topic, endpoint: endpoint) unless subscription_arn
aws_client.unsubscribe(subscription_arn: subscription_arn)
end
|