Class: Legion::Transport::Message

Inherits:
Object
  • Object
show all
Includes:
Common
Defined in:
lib/legion/transport/message.rb

Constant Summary collapse

ENVELOPE_KEYS =
%i[
  headers content_type content_encoding persistent expiration
  priority app_id user_id reply_to correlation_id message_id
  routing_key exchange type
].freeze

Instance Method Summary collapse

Methods included from Common

#channel_open?, #close, #close!, #deep_merge, #generate_consumer_tag, #open_channel, #options_builder

Constructor Details

#initialize(**options) ⇒ Message

Returns a new instance of Message.



8
9
10
11
# File 'lib/legion/transport/message.rb', line 8

def initialize(**options)
  @options = options
  validate
end

Instance Method Details

#app_idObject



35
36
37
# File 'lib/legion/transport/message.rb', line 35

def app_id
  @options[:app_id] || 'legion'
end

#channelObject



168
169
170
# File 'lib/legion/transport/message.rb', line 168

def channel
  Legion::Transport::Connection.channel
end

#content_encodingObject



152
153
154
# File 'lib/legion/transport/message.rb', line 152

def content_encoding
  'identity'
end

#content_typeObject



148
149
150
# File 'lib/legion/transport/message.rb', line 148

def content_type
  'application/json'
end

#correlation_idObject

ID of the message that this message is a reply to. Links subtasks back to the parent task.



54
55
56
# File 'lib/legion/transport/message.rb', line 54

def correlation_id
  @options[:correlation_id] || @options[:parent_id] || @options[:task_id]
end

#encode_messageObject



86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/legion/transport/message.rb', line 86

def encode_message
  message_payload = message
  message_payload = Legion::JSON.dump(message_payload) unless message_payload.is_a? String

  if encrypt?
    encrypted = Legion::Crypt.encrypt(message_payload)
    headers[:iv] = encrypted[:iv]
    @options[:content_encoding] = 'encrypted/cs'
    return encrypted[:enciphered_message]
  else
    @options[:content_encoding] = 'identity'
  end

  message_payload
end

#encrypt?Boolean

Returns:

  • (Boolean)


106
107
108
109
110
111
112
113
# File 'lib/legion/transport/message.rb', line 106

def encrypt?
  should_encrypt = if @options.key?(:encrypt)
                     @options[:encrypt]
                   else
                     Legion::Settings[:transport][:messages][:encrypt]
                   end
  should_encrypt && Legion::Settings[:crypt][:cs_encrypt_ready]
end

#encrypt_message(message, _type = 'cs') ⇒ Object



102
103
104
# File 'lib/legion/transport/message.rb', line 102

def encrypt_message(message, _type = 'cs')
  Legion::Crypt.encrypt(message)
end

#exchangeObject



120
121
122
# File 'lib/legion/transport/message.rb', line 120

def exchange
  Kernel.const_get(exchange_name)
end

#exchange_nameObject



115
116
117
118
# File 'lib/legion/transport/message.rb', line 115

def exchange_name
  lex = self.class.ancestors.first.to_s.split('::')[2].downcase
  "Legion::Extensions::#{lex.capitalize}::Transport::Exchanges::#{lex.capitalize}"
end

#expirationObject



62
63
64
65
66
67
68
69
70
# File 'lib/legion/transport/message.rb', line 62

def expiration
  if @options.key? :expiration
    @options[:expiration]
  elsif @options.key? :ttl
    @options[:ttl]
  elsif Legion::Transport.settings[:messages].key? :expiration
    Legion::Transport.settings[:messages][:expiration]
  end
end

#headersObject



124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# File 'lib/legion/transport/message.rb', line 124

def headers
  @options[:headers] ||= Concurrent::Hash.new
  %i[task_id relationship_id trigger_namespace_id trigger_function_id parent_id master_id runner_namespace runner_class namespace_id function_id function
     chain_id debug].each do |header|
    next unless @options.key? header

    value = @options[header]
    @options[:headers][header] = case value
                                 when Integer, Float, TrueClass, FalseClass
                                   value
                                 else
                                   value.to_s
                                 end
  end
  @options[:headers]
rescue StandardError => e
  Legion::Transport.logger.error e.message
  Legion::Transport.logger.error e.backtrace
end

#messageObject



78
79
80
# File 'lib/legion/transport/message.rb', line 78

def message
  @options.except(*ENVELOPE_KEYS)
end

#message_idObject



39
40
41
# File 'lib/legion/transport/message.rb', line 39

def message_id
  @options[:message_id] || @options[:task_id]
end

#persistentObject



58
59
60
# File 'lib/legion/transport/message.rb', line 58

def persistent
  @options[:persistent] || Legion::Transport.settings[:messages][:persistent]
end

#priorityObject



144
145
146
# File 'lib/legion/transport/message.rb', line 144

def priority
  @options[:priority] || Legion::Transport.settings[:messages][:priority] || 0
end

#publish(options = @options) ⇒ Object



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/legion/transport/message.rb', line 13

def publish(options = @options)
  raise unless @valid

  exchange_dest = exchange.respond_to?(:new) ? exchange.new : exchange
  exchange_dest.publish(encode_message,
                        routing_key:      routing_key || '',
                        content_type:     options[:content_type] || content_type,
                        content_encoding: options[:content_encoding] || content_encoding,
                        type:             options[:type] || type,
                        priority:         options[:priority] || priority,
                        expiration:       options[:expiration] || expiration,
                        headers:          headers,
                        persistent:       persistent,
                        message_id:       message_id,
                        correlation_id:   correlation_id,
                        app_id:           app_id,
                        timestamp:        timestamp)
rescue Bunny::ConnectionClosedError, Bunny::ChannelAlreadyClosed, Bunny::ChannelError,
       Bunny::NetworkErrorWrapper, IOError => e
  spool_message(e)
end

#reply_toObject



48
49
50
# File 'lib/legion/transport/message.rb', line 48

def reply_to
  @options[:reply_to]
end

#routing_keyObject



82
83
84
# File 'lib/legion/transport/message.rb', line 82

def routing_key
  @options[:routing_key] if @options.key? :routing_key
end

#timestampObject



160
161
162
# File 'lib/legion/transport/message.rb', line 160

def timestamp
  Time.now.to_i
end

#typeObject



156
157
158
# File 'lib/legion/transport/message.rb', line 156

def type
  'task'
end

#user_idObject

user_id Sender’s identifier. www.rabbitmq.com/extensions.html#validated-user-id



44
45
46
# File 'lib/legion/transport/message.rb', line 44

def user_id
  @options[:user_id] || Legion::Transport.settings[:connection][:user]
end

#validateObject



164
165
166
# File 'lib/legion/transport/message.rb', line 164

def validate
  @valid = true
end