Class: EventQ::Amazon::EventQClient

Inherits:
Object
  • Object
show all
Defined in:
lib/eventq/eventq_aws/aws_eventq_client.rb

Overview

Implements a general interface to raise an event. EventQ::RabbitMq::EventQClient is the sister class, which does the same for RabbitMq.

Instance Method Summary collapse

Constructor Details

#initialize(options) ⇒ EventQClient

Returns a new instance of EventQClient.



6
7
8
9
10
11
12
13
14
15
16
17
18
# File 'lib/eventq/eventq_aws/aws_eventq_client.rb', line 6

# Builds a client around the queue client supplied in +options+.
#
# @param options [Hash] must hold a non-nil value under +:client+
# @raise [RuntimeError] when +options[:client]+ is nil
def initialize(options)
  queue_client = options[:client]
  raise ':client (QueueClient) must be specified.'.freeze if queue_client.nil?

  @client = queue_client
  @serialization_manager = EventQ::SerializationProviders::Manager.new
  @signature_manager = EventQ::SignatureProviders::Manager.new

  # Cache of already-registered event types; values are topic arns.
  @known_event_types = {}
end

Instance Method Details

#new_message ⇒ Object



115
116
117
# File 'lib/eventq/eventq_aws/aws_eventq_client.rb', line 115

# Creates a fresh, empty queue message.
#
# @return [EventQ::QueueMessage] a new instance constructed with no arguments
def new_message
  message_class = EventQ::QueueMessage
  message_class.new
end

#publish(topic:, event:, context: {}, region: nil) ⇒ Object



47
48
49
# File 'lib/eventq/eventq_aws/aws_eventq_client.rb', line 47

# Keyword-argument entry point for publishing a single event.
# Forwards its arguments positionally to #raise_event.
#
# @param topic forwarded as the event type
# @param event forwarded unchanged
# @param context forwarded unchanged (defaults to an empty Hash)
# @param region forwarded unchanged (defaults to nil)
# @return [Object] whatever #raise_event returns
def publish(topic:, event:, context: {}, region: nil)
  raise_event(
    topic,
    event,
    context,
    region
  )
end

#publish_batch(topic:, events:, context: {}, region: nil) ⇒ Object



51
52
53
# File 'lib/eventq/eventq_aws/aws_eventq_client.rb', line 51

# Keyword-argument entry point for publishing several events at once.
# Forwards its arguments positionally to #raise_events_batch.
#
# @param topic forwarded as the event type
# @param events forwarded unchanged
# @param context forwarded unchanged (defaults to an empty Hash)
# @param region forwarded unchanged (defaults to nil)
# @return [Object] whatever #raise_events_batch returns
def publish_batch(topic:, events:, context: {}, region: nil)
  raise_events_batch(
    topic,
    events,
    context,
    region
  )
end

#raise_event(event_type, event, context = {}, region = nil) ⇒ Object



55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/eventq/eventq_aws/aws_eventq_client.rb', line 55

# Publishes one event to the SNS topic registered for +event_type+.
#
# The topic arn comes from #register_event; the message handed to SNS is
# whatever #with_prepared_message (defined elsewhere) yields.
#
# @param event_type used to look up the topic and sent as the SNS subject
# @param event passed to #with_prepared_message
# @param context passed to #with_prepared_message (defaults to an empty Hash)
# @param region passed to #register_event and +@client.sns+ (defaults to nil)
# @return [Object] the result of #with_prepared_message; the block given to
#   it evaluates to the SNS publish response
def raise_event(event_type, event, context = {}, region = nil)
  arn = register_event(event_type, region)

  with_prepared_message(event_type, event, context) do |payload|
    # tap keeps the publish response as the block's value while logging afterwards.
    @client.sns(region).publish(
      topic_arn: arn,
      message: payload,
      subject: event_type
    ).tap do
      EventQ.logger.debug do
        "[#{self.class} #raise_event] - Published to SNS with topic_arn: #{arn}" \
          " | event_type: #{event_type} | Message: #{payload}"
      end
    end
  end
end

#raise_event_in_queue(event_type, event, queue, delay, context = {}) ⇒ Object



97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/eventq/eventq_aws/aws_eventq_client.rb', line 97

# Sends one event straight to an SQS queue.
#
# The queue url is resolved through +@client.sqs_helper+; the message body
# is built by #sqs_message_body_for from whatever #with_prepared_message
# yields (both helpers are defined elsewhere).
#
# @param event_type passed to #with_prepared_message
# @param event passed to #with_prepared_message
# @param queue passed to +get_queue_url+ to resolve the target queue url
# @param delay sent as +delay_seconds+
# @param context passed to #with_prepared_message (defaults to an empty Hash)
# @return [Object] the result of #with_prepared_message; the block given to
#   it evaluates to the SQS send_message response
def raise_event_in_queue(event_type, event, queue, delay, context = {})
  target_url = @client.sqs_helper.get_queue_url(queue)

  with_prepared_message(event_type, event, context) do |payload|
    # tap keeps the send_message response as the block's value while logging afterwards.
    @client.sqs.send_message(
      queue_url: target_url,
      message_body: sqs_message_body_for(payload),
      delay_seconds: delay
    ).tap do
      EventQ.logger.debug do
        "[#{self.class} #raise_event_in_queue] - Raised event to SQS queue: #{target_url}" \
          " | event_type: #{event_type} | Message: #{payload}"
      end
    end
  end
end

#raise_events_batch(event_type, events, context = {}, region = nil) ⇒ Object



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/eventq/eventq_aws/aws_eventq_client.rb', line 74

# Publishes several events of one type to SNS in batches.
#
# Entries are built by #prepare_batch_entries (defined elsewhere) and sent
# in slices of at most 10 via +publish_batch+. Only the ids listed in each
# response's +successful+ collection are gathered.
#
# @param event_type passed to #register_event and #prepare_batch_entries
# @param events passed to #prepare_batch_entries
# @param context passed to #prepare_batch_entries (defaults to an empty Hash)
# @param region passed to #register_event and +@client.sns+ (defaults to nil)
# @return [Array] message ids of the successfully published entries, in batch order
def raise_events_batch(event_type, events, context = {}, region = nil)
  arn = register_event(event_type, region)
  entries = prepare_batch_entries(event_type, events, context)

  # AWS SNS PublishBatch API allows a maximum of 10 messages per batch
  batch_limit = 10

  entries.each_slice(batch_limit).flat_map do |chunk|
    outcome = @client.sns(region).publish_batch(
      topic_arn: arn,
      publish_batch_request_entries: chunk
    )

    EventQ.logger.debug do
      "[#{self.class} #raise_events_batch] - Published batch to SNS with topic_arn: #{arn}" \
        " | event_type: #{event_type} | batch_size: #{chunk.length}"
    end

    outcome.successful.map(&:message_id)
  end
end

#register_event(event_type, region = nil) ⇒ String

Registers the event event_type and returns its topic arn.

Parameters:

  • event_type (String)
  • region (String) (defaults to: nil)

Returns:

  • (String)


38
39
40
41
42
43
44
45
# File 'lib/eventq/eventq_aws/aws_eventq_client.rb', line 38

# Returns the topic arn for +event_type+ in +region+, creating it on first use.
#
# Arns are cached in +@known_event_types+ under the key "region:event_type",
# so +create_topic_arn+ is only called when #registered? reports false.
#
# @param event_type [String]
# @param region [String] (defaults to nil)
# @return [String] the topic arn
def register_event(event_type, region = nil)
  cache_key = "#{region}:#{event_type}"

  unless registered?(event_type, region)
    @known_event_types[cache_key] = @client.sns_helper(region).create_topic_arn(event_type, region)
  end

  @known_event_types[cache_key]
end

#registered?(event_type, region = nil) ⇒ Boolean

Returns true if the event has already been registered, or false otherwise.

Parameters:

  • event_type (String)
  • region (String) (defaults to: nil)

Returns:

  • (Boolean)


27
28
29
30
# File 'lib/eventq/eventq_aws/aws_eventq_client.rb', line 27

# Reports whether a topic arn is already cached for +event_type+ in +region+.
#
# @param event_type [String]
# @param region [String] (defaults to nil)
# @return [Boolean] true when +@known_event_types+ has the "region:event_type" key
def registered?(event_type, region = nil)
  @known_event_types.key?("#{region}:#{event_type}")
end