Class: CycloneLariat::Clients::Sns

Inherits:
Abstract
  • Object
show all
Includes:
Generators::Topic, LunaPark::Extensions::Injector
Defined in:
lib/cyclone_lariat/clients/sns.rb

Instance Attribute Summary

Attributes inherited from Abstract

#config

Instance Method Summary collapse

Methods included from Generators::Topic

#custom_topic, #topic

Methods inherited from Abstract

#initialize

Methods included from Generators::Command

#command, #command_v1, #command_v2

Methods included from Generators::Event

#event, #event_v1, #event_v2

Constructor Details

This class inherits a constructor from CycloneLariat::Clients::Abstract

Instance Method Details

#create(topic) ⇒ Object

Raises:

  • (ArgumentError)


62
63
64
65
66
67
68
# File 'lib/cyclone_lariat/clients/sns.rb', line 62

# Creates the given topic through the AWS client.
#
# @param topic [Resources::Topic] topic to create
# @return [Resources::Topic] the same object that was passed in
# @raise [ArgumentError] when +topic+ is not a Resources::Topic
# @raise [Errors::TopicAlreadyExists] when +exists?(topic)+ is truthy
def create(topic)
  unless topic.is_a?(Resources::Topic)
    raise ArgumentError, 'Should be Resources::Topic'
  end

  if exists?(topic)
    raise Errors::TopicAlreadyExists.new(expected_topic: topic.name)
  end

  aws_client.create_topic(
    name: topic.name,
    attributes: topic.attributes,
    tags: topic.tags
  )

  # The AWS response is discarded; callers get the topic back.
  topic
end

#delete(topic) ⇒ Object

Raises:

  • (ArgumentError)


70
71
72
73
74
75
76
# File 'lib/cyclone_lariat/clients/sns.rb', line 70

# Deletes the given topic through the AWS client.
#
# @param topic [Resources::Topic] topic to delete
# @return [Resources::Topic] the same object that was passed in
# @raise [ArgumentError] when +topic+ is not a Resources::Topic
# @raise [Errors::TopicDoesNotExists] when +exists?(topic)+ is falsy
def delete(topic)
  unless topic.is_a?(Resources::Topic)
    raise ArgumentError, 'Should be Resources::Topic'
  end

  unless exists?(topic)
    raise Errors::TopicDoesNotExists.new(expected_topic: topic.name)
  end

  aws_client.delete_topic(topic_arn: topic.arn)

  # The AWS response is discarded; callers get the topic back.
  topic
end

#exists?(topic) ⇒ Boolean

Returns:

  • (Boolean)


38
39
40
41
42
43
44
# File 'lib/cyclone_lariat/clients/sns.rb', line 38

# Checks whether the topic is known to AWS by requesting its attributes.
#
# @param topic [Resources::Topic] topic to look up by ARN
# @return [Boolean] false when the client raises Aws::SNS::Errors::NotFound;
#   otherwise the client response combined with +&& true+
# @raise [ArgumentError] when +topic+ is not a Resources::Topic
def exists?(topic)
  unless topic.is_a?(Resources::Topic)
    raise ArgumentError, 'Should be Topic'
  end

  begin
    response = aws_client.get_topic_attributes({ topic_arn: topic.arn })
    response && true
  rescue Aws::SNS::Errors::NotFound
    # A missing topic is the expected negative answer, not an error.
    false
  end
end

#find_subscription_arn(topic:, endpoint:) ⇒ Object

Raises:

  • (ArgumentError)


163
164
165
166
167
168
169
170
171
172
173
174
# File 'lib/cyclone_lariat/clients/sns.rb', line 163

# Looks up the ARN of the subscription that links +topic+ to +endpoint+.
#
# The first subscription of the topic whose endpoint equals +endpoint.arn+
# wins; the search stops as soon as it is found.
#
# @param topic [Resources::Topic] topic whose subscriptions are searched
# @param endpoint [Resources::Topic, Resources::Queue] subscribed resource;
#   its class must be exactly one of these two
# @return [Object, nil] +subscription_arn+ of the first matching
#   subscription, or nil when none matches
# @raise [ArgumentError] when +topic+ is not a Resources::Topic or the
#   endpoint class is neither Resources::Topic nor Resources::Queue
def find_subscription_arn(topic:, endpoint:)
  raise ArgumentError, 'Should be Topic' unless topic.is_a? Resources::Topic
  unless [Resources::Topic, Resources::Queue].include? endpoint.class
    raise ArgumentError, 'Endpoint should be Topic or Queue'
  end

  # `find` short-circuits on the first hit instead of filtering the whole list.
  found_subscription = topic_subscriptions(topic).find do |subscription|
    subscription.endpoint == endpoint.arn
  end

  found_subscription&.subscription_arn
end

#list_all ⇒ Object



103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# File 'lib/cyclone_lariat/clients/sns.rb', line 103

# Lists every topic returned by the AWS client, following pagination.
#
# Pages are requested until a response carries a nil +:next_token+.
#
# @return [Array] one +Resources::Topic.from_arn+ result per listed topic,
#   in the order the client returned them
def list_all
  topics = []
  resp = aws_client.list_topics

  loop do
    # `each`, not `map`: the block is run only for its side effect.
    resp[:topics].each do |t|
      topics << Resources::Topic.from_arn(t[:topic_arn])
    end

    break if resp[:next_token].nil?

    resp = aws_client.list_topics(next_token: resp[:next_token])
  end
  topics
end

#list_subscriptions ⇒ Object



119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# File 'lib/cyclone_lariat/clients/sns.rb', line 119

# Lists subscriptions returned by the AWS client, following pagination.
#
# Only subscriptions whose protocol is 'sqs' or 'sns' are kept; any other
# protocol is skipped.
#
# @return [Array<Hash>] hashes with +:topic+, +:endpoint+ and +:arn+ keys,
#   where the topic and endpoint are built with +from_arn+
def list_subscriptions
  collected = []
  page = aws_client.list_subscriptions

  loop do
    page[:subscriptions].each do |entry|
      resource_class =
        case entry.protocol
        when 'sqs' then Resources::Queue
        when 'sns' then Resources::Topic
        end
      # Protocols other than SQS and SNS have no resource representation here.
      next unless resource_class

      target = resource_class.from_arn(entry.endpoint)
      source = Resources::Topic.from_arn(entry.topic_arn)
      record = { topic: source, endpoint: target, arn: entry.subscription_arn }
      collected << record
    end

    break if page[:next_token].nil?

    page = aws_client.list_subscriptions(next_token: page[:next_token])
  end

  collected
end

#publish(msg, fifo:, topic: nil, skip_validation: false) ⇒ Object



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/cyclone_lariat/clients/sns.rb', line 17

# Publishes a message to an SNS topic through the AWS client.
#
# When +config.fake_publish+ is truthy nothing is sent and
# +Fake.sns_publish_response(msg)+ is returned instead.
#
# @param msg [Object] message responding to +type+, +kind+, +group_id+,
#   +deduplication_id+, +validation+ and +to_json+
# @param fifo [Boolean] whether the target is treated as a FIFO topic
# @param topic [Object, nil] when given, resolved with +custom_topic+;
#   otherwise the topic is derived from the message type and kind
# @param skip_validation [Boolean] when truthy, +msg.validation.check!+ is
#   not called
# @return [Object] result of +aws_client.publish+ (or the fake response)
# @raise [Errors::GroupIdUndefined] FIFO publish without a group id
# @raise [Errors::GroupDefined] non-FIFO publish with a group id
# @raise [Errors::DeduplicationIdDefined] non-FIFO publish with a
#   deduplication id
def publish(msg, fifo:, topic: nil, skip_validation: false)
  return Fake.sns_publish_response(msg) if config.fake_publish

  target =
    if topic
      custom_topic(topic)
    else
      # Parentheses force the generator method call; `topic` alone is the argument.
      topic(msg.type, kind: msg.kind, fifo: fifo)
    end

  if fifo
    raise Errors::GroupIdUndefined.new(resource: target) if msg.group_id.nil?
  else
    raise Errors::GroupDefined.new(resource: target) if msg.group_id
    raise Errors::DeduplicationIdDefined.new(resource: target) if msg.deduplication_id
  end

  msg.validation.check! unless skip_validation

  # `compact` drops the FIFO-only keys when their values are nil.
  request = {
    topic_arn: target.arn,
    message: msg.to_json,
    message_group_id: msg.group_id,
    message_deduplication_id: msg.deduplication_id
  }.compact

  aws_client.publish(**request)
end

#publish_command(type, fifo:, topic: nil, **options) ⇒ Object



54
55
56
57
58
59
60
# File 'lib/cyclone_lariat/clients/sns.rb', line 54

# Builds a command message and publishes it.
#
# Missing (or falsy) +:version+, +:data+ and +:uuid+ options are filled in
# with +config.version+, an empty hash and a random UUID respectively.
#
# @param type [Object] command type passed to +command+
# @param fifo [Boolean] forwarded to +publish+
# @param topic [Object, nil] forwarded to +publish+
# @param options [Hash] remaining keyword arguments passed to +command+
# @return [Object] result of +publish+
def publish_command(type, fifo:, topic: nil, **options)
  options[:version] = config.version unless options[:version]
  options[:data] = {} unless options[:data]
  options[:uuid] = SecureRandom.uuid unless options[:uuid]

  message = command(type, **options)
  publish(message, fifo: fifo, topic: topic)
end

#publish_event(type, fifo:, topic: nil, **options) ⇒ Object



46
47
48
49
50
51
52
# File 'lib/cyclone_lariat/clients/sns.rb', line 46

# Builds an event message and publishes it.
#
# Missing (or falsy) +:version+, +:data+ and +:uuid+ options are filled in
# with +config.version+, an empty hash and a random UUID respectively.
#
# @param type [Object] event type passed to +event+
# @param fifo [Boolean] forwarded to +publish+
# @param topic [Object, nil] forwarded to +publish+
# @param options [Hash] remaining keyword arguments passed to +event+
# @return [Object] result of +publish+
def publish_event(type, fifo:, topic: nil, **options)
  options[:version] = config.version unless options[:version]
  options[:data] = {} unless options[:data]
  options[:uuid] = SecureRandom.uuid unless options[:uuid]

  message = event(type, **options)
  publish(message, fifo: fifo, topic: topic)
end

#subscribe(topic:, endpoint:) ⇒ Object



83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/cyclone_lariat/clients/sns.rb', line 83

# Subscribes +endpoint+ to +topic+ through the AWS client.
#
# @param topic [Object] topic to subscribe to; validated by
#   +find_subscription_arn+
# @param endpoint [Object] resource to subscribe; must respond to
#   +protocol+ and +arn+
# @return [Object] result of +aws_client.subscribe+
# @raise [Errors::SubscriptionAlreadyExists] when a subscription ARN is
#   already found for this topic/endpoint pair
def subscribe(topic:, endpoint:)
  existing_arn = find_subscription_arn(topic: topic, endpoint: endpoint)
  if existing_arn
    raise Errors::SubscriptionAlreadyExists.new(topic: topic, endpoint: endpoint)
  end

  request = {
    topic_arn: topic.arn,
    protocol: endpoint.protocol,
    endpoint: endpoint.arn
  }
  aws_client.subscribe(request)
end

#subscribed?(topic:, endpoint:) ⇒ Boolean

Returns:

  • (Boolean)


78
79
80
81
# File 'lib/cyclone_lariat/clients/sns.rb', line 78

# Tells whether +endpoint+ is subscribed to +topic+.
#
# @param topic [Object] topic to inspect; validated by
#   +find_subscription_arn+
# @param endpoint [Object] resource whose subscription is looked up
# @return [Boolean] true when +find_subscription_arn+ returns a non-nil
#   value, false otherwise
def subscribed?(topic:, endpoint:)
  !find_subscription_arn(topic: topic, endpoint: endpoint).nil?
end

#topic_subscriptions(topic) ⇒ Object

Raises:

  • (ArgumentError)


145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
# File 'lib/cyclone_lariat/clients/sns.rb', line 145

# Collects every subscription of the given topic, following pagination.
#
# Pages are requested until a response carries a nil +:next_token+.
#
# @param topic [Resources::Topic] topic whose subscriptions are listed
# @return [Array] subscription entries exactly as returned by the client,
#   in page order
# @raise [ArgumentError] when +topic+ is not a Resources::Topic
def topic_subscriptions(topic)
  raise ArgumentError, 'Should be Topic' unless topic.is_a? Resources::Topic

  subscriptions = []

  resp = aws_client.list_subscriptions_by_topic(topic_arn: topic.arn)

  loop do
    next_token = resp[:next_token]
    # Append in place; `+=` would copy the whole accumulator on every page.
    subscriptions.concat(resp[:subscriptions])

    break if next_token.nil?

    resp = aws_client.list_subscriptions_by_topic(topic_arn: topic.arn, next_token: next_token)
  end
  subscriptions
end

#unsubscribe(topic:, endpoint:) ⇒ Object



96
97
98
99
100
101
# File 'lib/cyclone_lariat/clients/sns.rb', line 96

# Removes the subscription that links +endpoint+ to +topic+.
#
# @param topic [Object] topic side of the subscription; validated by
#   +find_subscription_arn+
# @param endpoint [Object] subscribed resource
# @return [Object] result of +aws_client.unsubscribe+
# @raise [Errors::SubscriptionDoesNotExists] when no subscription ARN is
#   found for this topic/endpoint pair
def unsubscribe(topic:, endpoint:)
  arn = find_subscription_arn(topic: topic, endpoint: endpoint)

  if arn
    aws_client.unsubscribe(subscription_arn: arn)
  else
    raise Errors::SubscriptionDoesNotExists.new(topic: topic, endpoint: endpoint)
  end
end