Class: CycloneLariat::Clients::Sqs

Inherits:
Abstract
  • Object
show all
Includes:
Generators::Queue, LunaPark::Extensions::Injector
Defined in:
lib/cyclone_lariat/clients/sqs.rb

Instance Attribute Summary

Attributes inherited from Abstract

#config

Instance Method Summary collapse

Methods included from Generators::Queue

#custom_queue, #queue

Methods inherited from Abstract

#initialize

Methods included from Generators::Command

#command, #command_v1, #command_v2

Methods included from Generators::Event

#event, #event_v1, #event_v2

Constructor Details

This class inherits a constructor from CycloneLariat::Clients::Abstract

Instance Method Details

#add_policy(queue:, policy:) ⇒ Object



25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/cyclone_lariat/clients/sqs.rb', line 25

# Appends a statement to the queue's access policy unless a statement with
# the same 'Sid' is already present.
#
# The current policy is read with +get_queue_attributes+. When the queue has
# no policy yet, a new document holding only +policy+ is written.
#
# @param queue [#url] queue whose policy is updated
# @param policy [Hash] statement to append; its 'Sid' entry drives the duplicate check
# @return [nil, Object] nil when a statement with that Sid already exists,
#   otherwise whatever +set_queue_attributes+ returns
def add_policy(queue:, policy:)
  current_policy_json = aws_client.get_queue_attributes({
    queue_url: queue.url,
    attribute_names: ['Policy']
  }).attributes['Policy']

  new_policy = current_policy_json ? JSON.parse(current_policy_json) : {}

  # A stored document may lack 'Statement' or hold a single statement object
  # instead of an array; normalise both shapes to an array before inspecting it.
  statements = new_policy['Statement']
  statements = [statements] if statements.is_a?(Hash)
  statements ||= []

  return if statements.any? { |statement| statement['Sid'] == policy['Sid'] }

  new_policy['Statement'] = statements + [policy]

  aws_client.set_queue_attributes({ queue_url: queue.url, attributes: { 'Policy' => new_policy.to_json } })
end

#create(queue) ⇒ Object

Raises:

  • (ArgumentError)


78
79
80
81
82
83
84
# File 'lib/cyclone_lariat/clients/sqs.rb', line 78

# Creates the given queue through the AWS client.
#
# @param queue [Resources::Queue] queue to create; its name, attributes and tags are sent
# @return [Resources::Queue] the same queue object that was passed in
# @raise [ArgumentError] when +queue+ is not a Resources::Queue
# @raise [Errors::QueueAlreadyExists] when +exists?+ reports the queue as present
def create(queue)
  unless queue.is_a?(Resources::Queue)
    raise ArgumentError, 'Should be queue'
  end

  already_there = exists?(queue)
  raise Errors::QueueAlreadyExists.new(expected_queue: queue.name) if already_there

  aws_client.create_queue(
    queue_name: queue.name,
    attributes: queue.attributes,
    tags: queue.tags
  )

  queue
end

#delete(queue) ⇒ Object

Raises:

  • (ArgumentError)


86
87
88
89
90
91
92
# File 'lib/cyclone_lariat/clients/sqs.rb', line 86

# Deletes the given queue through the AWS client.
#
# @param queue [Resources::Queue] queue to delete; addressed by its url
# @return [Resources::Queue] the same queue object that was passed in
# @raise [ArgumentError] when +queue+ is not a Resources::Queue
# @raise [Errors::QueueDoesNotExists] when +exists?+ reports the queue as absent
def delete(queue)
  unless queue.is_a?(Resources::Queue)
    raise ArgumentError, 'Should be queue'
  end

  present = exists?(queue)
  raise Errors::QueueDoesNotExists.new(expected_queue: queue.name) unless present

  queue.tap { |target| aws_client.delete_queue(queue_url: target.url) }
end

#exists?(queue) ⇒ Boolean

Returns:

  • (Boolean)


17
18
19
20
21
22
23
# File 'lib/cyclone_lariat/clients/sqs.rb', line 17

# Checks whether the queue can be resolved to a URL by the AWS client.
#
# The lookup uses +queue.to_s+ as the queue name.
#
# @param queue [Resources::Queue] queue to look up
# @return [Boolean] true when the lookup succeeds, false when the client
#   raises Aws::SQS::Errors::NonExistentQueue
# @raise [ArgumentError] when +queue+ is not a Resources::Queue
def exists?(queue)
  unless queue.is_a?(Resources::Queue)
    raise ArgumentError, 'Should be queue'
  end

  begin
    lookup = aws_client.get_queue_url(queue_name: queue.to_s)
    lookup && true
  rescue Aws::SQS::Errors::NonExistentQueue
    false
  end
end

#list_all ⇒ Object



94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/cyclone_lariat/clients/sqs.rb', line 94

# Collects every queue reported by the AWS client, following +next_token+
# until a page arrives without one.
#
# @return [Array] one Resources::Queue.from_url result per listed queue URL,
#   in the order the pages return them
def list_all
  collected = []
  page = aws_client.list_queues

  loop do
    token = page[:next_token]

    page[:queue_urls].each { |url| collected << Resources::Queue.from_url(url) }

    # A page without a token is the last one.
    break if token.nil?

    page = aws_client.list_queues(next_token: token)
  end

  collected
end

#publish(msg, fifo:, dest: nil, queue: nil, skip_validation: false) ⇒ Object



41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/cyclone_lariat/clients/sqs.rb', line 41

# Sends a message to an SQS queue.
#
# When +config.fake_publish+ is set, nothing is sent and the result of
# Fake.sqs_send_message_result is returned instead.
#
# @param msg message object; its type, kind, group_id, deduplication_id,
#   validation and to_json are used
# @param fifo whether the target is treated as a FIFO queue
# @param dest forwarded to +queue+ when no explicit queue is given
# @param queue explicit queue, resolved through +custom_queue+ when present
# @param skip_validation when truthy, +msg.validation.check!+ is not called
# @return the result of +aws_client.send_message+ (or the fake result)
# @raise [Errors::GroupIdUndefined] fifo publish without a group id
# @raise [Errors::GroupDefined] non-fifo publish with a group id
# @raise [Errors::DeduplicationIdDefined] non-fifo publish with a deduplication id
def publish(msg, fifo:, dest: nil, queue: nil, skip_validation: false)
  return Fake.sqs_send_message_result(msg) if config.fake_publish

  target =
    if queue
      custom_queue(queue)
    else
      queue(msg.type, kind: msg.kind, fifo: fifo, dest: dest)
    end

  if fifo
    raise Errors::GroupIdUndefined.new(resource: target) if msg.group_id.nil?
  else
    raise Errors::GroupDefined.new(resource: target) if msg.group_id
    raise Errors::DeduplicationIdDefined.new(resource: target) if msg.deduplication_id
  end

  msg.validation.check! unless skip_validation

  request = {
    queue_url: target.url,
    message_body: msg.to_json,
    message_group_id: msg.group_id,
    message_deduplication_id: msg.deduplication_id
  }
  # Drop the FIFO-only keys when they are not set.
  request = request.reject { |_key, value| value.nil? }

  aws_client.send_message(**request)
end

#publish_command(type, fifo:, dest: nil, queue: nil, **options) ⇒ Object



70
71
72
73
74
75
76
# File 'lib/cyclone_lariat/clients/sqs.rb', line 70

# Builds a command message with +command+ and publishes it.
#
# Missing options are defaulted before the message is built: :version from
# +config.version+, :data to an empty hash, :uuid to a fresh SecureRandom.uuid.
#
# @param type message type, passed to +command+
# @param fifo forwarded to +publish+
# @param dest forwarded to +publish+
# @param queue forwarded to +publish+
# @param options extra message options (e.g. :data, :version, :uuid), passed to +command+
# @return the result of +publish+
def publish_command(type, fifo:, dest: nil, queue: nil, **options)
  options[:version] ||= config.version
  options[:data]    ||= {}
  options[:uuid]    ||= SecureRandom.uuid

  # The payload travels inside options[:data]; no separate data argument exists.
  publish command(type, **options), fifo: fifo, dest: dest, queue: queue
end

#publish_event(type, fifo:, dest: nil, queue: nil, **options) ⇒ Object



62
63
64
65
66
67
68
# File 'lib/cyclone_lariat/clients/sqs.rb', line 62

# Builds an event message with +event+ and publishes it.
#
# Missing options are defaulted before the message is built: :version from
# +config.version+, :data to an empty hash, :uuid to a fresh SecureRandom.uuid.
#
# @param type message type, passed to +event+
# @param fifo forwarded to +publish+
# @param dest forwarded to +publish+
# @param queue forwarded to +publish+
# @param options extra message options (e.g. :data, :version, :uuid), passed to +event+
# @return the result of +publish+
def publish_event(type, fifo:, dest: nil, queue: nil, **options)
  options[:version] ||= config.version
  options[:data]    ||= {}
  options[:uuid]    ||= SecureRandom.uuid

  # The payload travels inside options[:data]; no separate data argument exists.
  publish event(type, **options), fifo: fifo, dest: dest, queue: queue
end