Class: Legion::Transport::Message

Inherits:
Object
  • Object
show all
Extended by:
Logging::Helper
Includes:
Common
Defined in:
lib/legion/transport/message.rb

Constant Summary collapse

# Option names that #message removes from @options before the remaining
# entries are used as the message payload.
ENVELOPE_KEYS =
%i[
  headers content_type content_encoding persistent expiration
  priority app_id user_id reply_to correlation_id message_id
  routing_key exchange type
].freeze

Constants included from Common

Common::NAMESPACE_BOUNDARIES

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Common

#channel_open?, #close, #close!, #deep_merge, #generate_consumer_tag, #open_channel, #options_builder

Constructor Details

#initialize(**options) ⇒ Message

Returns a new instance of Message.



14
15
16
17
# File 'lib/legion/transport/message.rb', line 14

# Stores the given keyword options in @options and then runs #validate.
#
# @param options [Hash] arbitrary keyword options describing the message
def initialize(**options)
  @options = options
  validate
end

Class Method Details

.max_payload_bytes ⇒ Object



19
20
21
22
23
24
# File 'lib/legion/transport/message.rb', line 19

# Reads the :max_payload_bytes entry of the :transport settings.
#
# @return [Object] the configured value (nil when the key is absent), or
#   1_048_576 when the settings lookup raises a StandardError
def self.max_payload_bytes
  transport_settings = Legion::Settings[:transport]
  transport_settings[:max_payload_bytes]
rescue StandardError => e
  # The failure is reported as handled and the built-in fallback is returned.
  handle_exception(e, level: :warn, handled: true, operation: 'transport.message.max_payload_bytes')
  1_048_576
end

Instance Method Details

#app_id ⇒ Object



59
60
61
# File 'lib/legion/transport/message.rb', line 59

# Application identifier attached to the published message.
#
# @return [Object] @options[:app_id] when it is truthy, otherwise 'legion'
def app_id
  configured = @options[:app_id]
  return configured if configured

  'legion'
end

#channel ⇒ Object



197
198
199
# File 'lib/legion/transport/message.rb', line 197

# Returns whatever Legion::Transport::Connection.channel provides.
def channel
  Legion::Transport::Connection.channel
end

#content_encoding ⇒ Object



181
182
183
# File 'lib/legion/transport/message.rb', line 181

# Default content encoding; #publish falls back to it when the options it is
# given carry no :content_encoding.
#
# @return [String] always 'identity'
def content_encoding
  'identity'
end

#content_type ⇒ Object



177
178
179
# File 'lib/legion/transport/message.rb', line 177

# Default content type; #publish falls back to it when the options it is
# given carry no :content_type.
#
# @return [String] always 'application/json'
def content_type
  'application/json'
end

#correlation_id ⇒ Object

ID of the message that this message is a reply to. Links subtasks back to the parent task.



78
79
80
# File 'lib/legion/transport/message.rb', line 78

# Correlation identifier for the message.
#
# Preference order: :correlation_id, then :parent_id, then :task_id.
#
# @return [Object] the first truthy candidate, otherwise the value stored
#   under :task_id (nil when absent)
def correlation_id
  explicit = @options[:correlation_id]
  return explicit if explicit

  @options[:parent_id] || @options[:task_id]
end

#encode_message ⇒ Object



110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/legion/transport/message.rb', line 110

# Serialises the payload returned by #message and encrypts it when #encrypt?
# says so.
#
# Side effects: records the resulting encoding in @options[:content_encoding]
# ('identity' or 'encrypted/cs') and, when encrypting, stores the IV returned
# by Legion::Crypt.encrypt in headers[:iv].
#
# @return [Object] the payload string, or the :enciphered_message entry of the
#   Legion::Crypt.encrypt result when encryption is active
def encode_message
  body = message
  # Strings are sent as-is; anything else goes through Legion::JSON.dump.
  body = Legion::JSON.dump(body) unless body.is_a?(String)

  unless encrypt?
    @options[:content_encoding] = 'identity'
    return body
  end

  cipher = Legion::Crypt.encrypt(body)
  headers[:iv] = cipher[:iv]
  @options[:content_encoding] = 'encrypted/cs'
  log.debug "Message encrypted with content_encoding=encrypted/cs class=#{self.class.name}"
  cipher[:enciphered_message]
end

#encrypt? ⇒ Boolean

Returns:

  • (Boolean)


131
132
133
134
135
136
137
138
# File 'lib/legion/transport/message.rb', line 131

# Decides whether the payload should be encrypted.
#
# An :encrypt entry in @options (even a falsy one) takes precedence over the
# transport-level messages setting; the transport setting is only consulted
# when the key is absent. Encryption additionally requires the
# :cs_encrypt_ready flag of the :crypt settings.
#
# @return [Object] truthy when encryption is requested and the crypt flag is
#   truthy; otherwise the falsy value that blocked it
def encrypt?
  requested = @options.fetch(:encrypt) { Legion::Settings[:transport][:messages][:encrypt] }
  requested && Legion::Settings[:crypt][:cs_encrypt_ready]
end

#encrypt_message(message, _type = 'cs') ⇒ Object



127
128
129
# File 'lib/legion/transport/message.rb', line 127

# Passes +message+ to Legion::Crypt.encrypt and returns its result.
#
# @param message [Object] value handed to Legion::Crypt.encrypt
# @param _type [String] accepted but not used by this method
def encrypt_message(message, _type = 'cs')
  Legion::Crypt.encrypt(message)
end

#exchange ⇒ Object



145
146
147
# File 'lib/legion/transport/message.rb', line 145

# Resolves the constant named by #exchange_name.
#
# @return [Object] the constant found by Kernel.const_get
# @raise [NameError] when no constant with that name is defined
def exchange
  Kernel.const_get(exchange_name)
end

#exchange_name ⇒ Object



140
141
142
143
# File 'lib/legion/transport/message.rb', line 140

# Builds the fully qualified constant name of this message's exchange from
# the segments returned by derive_extension_parts.
#
# For segments ['Foo', 'Bar'] the result is
# 'Legion::Extensions::Foo::Bar::Transport::Exchanges::Foo'.
#
# @return [String]
def exchange_name
  segments = derive_extension_parts
  extension_path = segments.join('::')
  exchange_const = segments.first
  "Legion::Extensions::#{extension_path}::Transport::Exchanges::#{exchange_const}"
end

#expiration ⇒ Object



86
87
88
89
90
91
92
93
94
# File 'lib/legion/transport/message.rb', line 86

# Expiration value for the message.
#
# Lookup order, decided by key presence rather than truthiness:
# @options[:expiration], then @options[:ttl], then the :expiration entry of
# the transport messages settings.
#
# @return [Object, nil] the first value whose key is present, or nil
def expiration
  return @options[:expiration] if @options.key?(:expiration)
  return @options[:ttl] if @options.key?(:ttl)

  defaults = Legion::Transport.settings[:messages]
  defaults[:expiration] if defaults.key?(:expiration)
end

#headers ⇒ Object



149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
# File 'lib/legion/transport/message.rb', line 149

# Builds (and caches in @options[:headers]) the header table for the message.
#
# Steps, in order: ensure a header hash exists, default
# 'legion_protocol_version' to '2.0', run the region header injectors, copy a
# fixed list of options into the headers, then run inject_identity_headers.
# Integer, Float, true and false values are copied unchanged; every other
# value is stored as its #to_s form.
#
# @return [Hash] @options[:headers], or an empty hash when any step raises a
#   StandardError (the error is reported through handle_exception)
def headers
  @options[:headers] ||= Concurrent::Hash.new
  @options[:headers]['legion_protocol_version'] ||= '2.0'
  inject_region_header
  inject_legion_region_header

  promoted = %i[task_id relationship_id trigger_namespace_id trigger_function_id parent_id master_id
                runner_namespace runner_class namespace_id function_id function chain_id debug]
  promoted.each do |name|
    next unless @options.key?(name)

    raw = @options[name]
    # Same membership test the `case ... when` form performs (Class#===).
    keep_as_is = [Integer, Float, TrueClass, FalseClass].any? { |klass| klass === raw }
    @options[:headers][name] = keep_as_is ? raw : raw.to_s
  end

  inject_identity_headers
  @options[:headers]
rescue StandardError => e
  handle_exception(e, level: :error, handled: true, operation: 'transport.message.headers')
  {}
end

#message ⇒ Object



102
103
104
# File 'lib/legion/transport/message.rb', line 102

# Payload of the message: a copy of @options without the entries whose keys
# are listed in ENVELOPE_KEYS.
#
# @return [Hash] a new hash; @options itself is left untouched
def message
  @options.reject { |key, _value| ENVELOPE_KEYS.include?(key) }
end

#message_id ⇒ Object



63
64
65
# File 'lib/legion/transport/message.rb', line 63

# Identifier of this message.
#
# @return [Object] @options[:message_id] when truthy, otherwise the value
#   stored under :task_id (nil when absent)
def message_id
  explicit = @options[:message_id]
  return explicit if explicit

  @options[:task_id]
end

#persistent ⇒ Object



82
83
84
# File 'lib/legion/transport/message.rb', line 82

# Whether the message should be published as persistent.
#
# An explicit :persistent option — including +false+ — takes precedence over
# the transport-level default. The previous `option || default` form could not
# express "not persistent" whenever the configured default was truthy, because
# +false+ fell through to the default. A nil (or absent) option still defers
# to the :persistent entry of the transport messages settings.
#
# @return [Object] the explicit option when it is not nil, otherwise the
#   configured default
def persistent
  explicit = @options[:persistent]
  return explicit unless explicit.nil?

  Legion::Transport.settings[:messages][:persistent]
end

#priority ⇒ Object



173
174
175
# File 'lib/legion/transport/message.rb', line 173

# Priority of the message.
#
# @return [Object] @options[:priority] when truthy, else the :priority entry
#   of the transport messages settings when truthy, else 0
def priority
  explicit = @options[:priority]
  return explicit if explicit

  Legion::Transport.settings[:messages][:priority] || 0
end

#publish(options = @options) ⇒ Object



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/legion/transport/message.rb', line 26

# Publishes the encoded message to the exchange resolved by #exchange.
#
# The destination is obtained from the exchange constant: its cached_instance
# (or a new instance) when it offers one, a new instance when it responds to
# +new+, otherwise the object itself.
#
# Per-call +options+ may override content_type, content_encoding, type,
# priority and expiration; all other properties come from this message's own
# accessors. When +options+ carries no :content_encoding, the encoding that
# #encode_message recorded in @options[:content_encoding] is used, so an
# encrypted payload is never announced as 'identity' just because the caller
# passed its own options hash.
#
# The Bunny connection/channel errors, IOError and Timeout::Error named in the
# rescue clause are reported through handle_exception and the message is
# handed to spool_message instead of propagating.
#
# @param options [Hash] property overrides; defaults to the message's options
# @raise [RuntimeError] when the message has not been marked valid
def publish(options = @options)
  # An explicit message avoids the bare-raise pitfalls: "unhandled exception"
  # as the only diagnostic, and re-raising an unrelated in-flight exception
  # when publish is called from inside someone else's rescue block.
  raise 'cannot publish a message that has not passed validation' unless @valid

  validate_payload_size
  ex_class = exchange
  exchange_dest = if ex_class.respond_to?(:cached_instance)
                    ex_class.cached_instance || ex_class.new
                  elsif ex_class.respond_to?(:new)
                    ex_class.new
                  else
                    ex_class
                  end
  # Encode first: encoding records the content encoding and may add the IV
  # header, both of which are read while building the properties below.
  payload = encode_message
  exchange_dest.publish(payload,
                        routing_key:      routing_key || '',
                        content_type:     options[:content_type] || content_type,
                        content_encoding: options[:content_encoding] || @options[:content_encoding] || content_encoding,
                        type:             options[:type] || type,
                        priority:         options[:priority] || priority,
                        expiration:       options[:expiration] || expiration,
                        headers:          headers,
                        persistent:       persistent,
                        message_id:       message_id,
                        correlation_id:   correlation_id,
                        app_id:           app_id,
                        timestamp:        timestamp)
  ex_name = exchange_dest.respond_to?(:name) ? exchange_dest.name : exchange_dest.to_s
  log.debug "Published to exchange=#{ex_name} routing_key=#{routing_key || ''} class=#{self.class.name}"
rescue Bunny::ConnectionClosedError, Bunny::ChannelAlreadyClosed, Bunny::ChannelError,
       Bunny::NetworkErrorWrapper, IOError, Timeout::Error => e
  handle_exception(e, level: :warn, handled: true, operation: 'transport.message.publish', spooled: true)
  spool_message(e)
end

#reply_to ⇒ Object



72
73
74
# File 'lib/legion/transport/message.rb', line 72

# Returns the value stored under :reply_to in @options (nil when absent).
def reply_to
  @options[:reply_to]
end

#routing_key ⇒ Object



106
107
108
# File 'lib/legion/transport/message.rb', line 106

# Routing key supplied through the options.
#
# @return [Object, nil] the value stored under :routing_key, or nil when the
#   key is absent
def routing_key
  @options.fetch(:routing_key, nil)
end

#timestamp ⇒ Object



189
190
191
# File 'lib/legion/transport/message.rb', line 189

# Current wall-clock time as whole seconds since the Unix epoch.
#
# @return [Integer]
def timestamp
  Time.now.to_i
end

#type ⇒ Object



185
186
187
# File 'lib/legion/transport/message.rb', line 185

# Default message type; #publish falls back to it when the options it is
# given carry no :type.
#
# @return [String] always 'task'
def type
  'task'
end

#user_id ⇒ Object

user_id: the sender's identifier. See www.rabbitmq.com/extensions.html#validated-user-id



68
69
70
# File 'lib/legion/transport/message.rb', line 68

# Sender's identifier.
#
# @return [Object] @options[:user_id] when truthy, otherwise the :user entry
#   of the transport connection settings
def user_id
  explicit = @options[:user_id]
  return explicit if explicit

  Legion::Transport.settings[:connection][:user]
end

#validate ⇒ Object



193
194
195
# File 'lib/legion/transport/message.rb', line 193

# Marks the message as valid by setting @valid to true; no checks are
# performed here. #publish refuses to run unless @valid is truthy.
#
# @return [true]
def validate
  @valid = true
end