Class: Legion::Transport::Message
- Inherits:
-
Object
- Object
- Legion::Transport::Message
show all
- Extended by:
- Logging::Helper
- Includes:
- Common
- Defined in:
- lib/legion/transport/message.rb
Constant Summary
collapse
- ENVELOPE_KEYS =
%i[
headers content_type content_encoding persistent expiration
priority app_id user_id reply_to correlation_id message_id
routing_key exchange type
].freeze
Constants included
from Common
Common::NAMESPACE_BOUNDARIES
Class Method Summary
collapse
Instance Method Summary
collapse
Methods included from Common
#channel_open?, #close, #close!, #deep_merge, #generate_consumer_tag, #open_channel, #options_builder
Constructor Details
#initialize(**options) ⇒ Message
Returns a new instance of Message.
14
15
16
17
|
# File 'lib/legion/transport/message.rb', line 14
# Keeps the supplied keyword options untouched in @options and then calls
# #validate.
#
# @param options [Hash] arbitrary keyword options describing the message
def initialize(**options)
@options = options
validate
end
|
Class Method Details
.max_payload_bytes ⇒ Object
19
20
21
22
23
24
|
# File 'lib/legion/transport/message.rb', line 19
# Reads the payload size limit from Legion::Settings[:transport][:max_payload_bytes].
#
# Any StandardError raised by the lookup is reported through handle_exception
# (warn level, flagged as handled) and 1_048_576 is returned in its place.
#
# @return [Object] the value stored in settings, or 1_048_576 when the lookup raised
def self.max_payload_bytes
  begin
    transport_settings = Legion::Settings[:transport]
    transport_settings[:max_payload_bytes]
  rescue StandardError => e
    handle_exception(e, level: :warn, handled: true, operation: 'transport.message.max_payload_bytes')
    1_048_576
  end
end
|
Instance Method Details
#app_id ⇒ Object
167
168
169
|
# File 'lib/legion/transport/message.rb', line 167
# Application identifier used for the envelope's app_id field.
#
# @return [Object] the :app_id option when it is truthy, otherwise 'legion'
def app_id
  configured = @options[:app_id]
  return configured if configured

  'legion'
end
|
#confirm_publish(exchange_dest, options) ⇒ Object
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
|
# File 'lib/legion/transport/message.rb', line 102
# Waits for a publisher confirmation when the caller requested one.
#
# Returns :accepted immediately unless options[:publisher_confirm] is exactly
# true, or when the channel obtained from #publish_channel does not respond to
# #wait_for_confirms. When options[:publish_confirm_timeout_ms] is set it is
# converted from milliseconds to seconds and passed to #wait_for_confirms;
# otherwise #wait_for_confirms is called without arguments.
#
# @param exchange_dest [Object] destination handed to #publish_channel
# @param options [Hash] publish options
# @return [Symbol] :nacked when the wait returned false, :confirm_timeout when
#   Timeout::Error was rescued (and reported via handle_exception), else :accepted
def confirm_publish(exchange_dest, options)
  return :accepted unless options[:publisher_confirm] == true

  waiter = publish_channel(exchange_dest)
  return :accepted unless waiter.respond_to?(:wait_for_confirms)

  timeout_ms = options[:publish_confirm_timeout_ms]
  wait_args = timeout_ms ? [timeout_ms.to_f / 1000.0] : []
  outcome = waiter.wait_for_confirms(*wait_args)
  return :nacked if outcome == false

  :accepted
rescue Timeout::Error => e
  handle_exception(e, level: :warn, handled: true, operation: 'transport.message.confirm_publish')
  :confirm_timeout
end
|
#content_encoding ⇒ Object
291
292
293
|
# File 'lib/legion/transport/message.rb', line 291
# Content encoding advertised in the message envelope.
#
# #encode_message records the encoding it actually applied in
# @options[:content_encoding] ('encrypted/cs' or 'identity'). Reading that
# value here keeps the envelope accurate when #publish was given override
# options: the merged copy is built before #encode_message runs, so it does not
# carry the key and #publish_envelope_options falls back to this method.
#
# @return [Object] the :content_encoding option when truthy, otherwise 'identity'
def content_encoding
  @options[:content_encoding] || 'identity'
end
|
#content_type ⇒ Object
287
288
289
|
# File 'lib/legion/transport/message.rb', line 287
# Default content type for the envelope. #publish_envelope_options uses it when
# the publish options carry no :content_type.
#
# @return [String] always 'application/json'
def content_type
'application/json'
end
|
#correlation_id ⇒ Object
ID of the message that this message is a reply to. Links subtasks back to the parent task.
186
187
188
|
# File 'lib/legion/transport/message.rb', line 186
# ID of the message that this message is a reply to; links subtasks back to the
# parent task.
#
# Lookup order in @options: :correlation_id, then :parent_id, then :task_id.
#
# @return [Object, nil] the first truthy candidate, otherwise the :task_id value
def correlation_id
  explicit = @options[:correlation_id]
  return explicit if explicit

  @options[:parent_id] || @options[:task_id]
end
|
#encode_message ⇒ Object
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
|
# File 'lib/legion/transport/message.rb', line 220
# Produces the payload string handed to the exchange.
#
# The body from #message is dumped with Legion::JSON unless it is already a
# String. When #encrypt? is truthy the payload is passed to Legion::Crypt.encrypt
# and the enciphered text is returned; otherwise the plain payload is returned.
# In both cases the applied encoding is recorded in @options[:content_encoding].
#
# @return [Object] the plain payload, or encrypted[:enciphered_message] when encrypting
def encode_message
  payload = message
  payload = Legion::JSON.dump(payload) unless payload.is_a?(String)

  unless encrypt?
    @options[:content_encoding] = 'identity'
    return payload
  end

  encrypted = Legion::Crypt.encrypt(payload)
  # The IV is stored on the message headers (presumably so the consumer can
  # decrypt — confirm against the consumer side).
  headers[:iv] = encrypted[:iv]
  @options[:content_encoding] = 'encrypted/cs'
  log.debug "Message encrypted with content_encoding=encrypted/cs class=#{self.class.name}"
  encrypted[:enciphered_message]
end
|
#encrypt? ⇒ Boolean
241
242
243
244
245
246
247
248
|
# File 'lib/legion/transport/message.rb', line 241
# Decides whether the payload should be encrypted.
#
# An :encrypt key present in @options takes precedence (even when its value is
# false or nil); otherwise Legion::Settings[:transport][:messages][:encrypt] is
# used. The result is additionally gated on Legion::Settings[:crypt][:cs_encrypt_ready].
#
# @return [Object] falsy when encryption is not wanted, otherwise the
#   :cs_encrypt_ready setting
def encrypt?
  wanted = @options.fetch(:encrypt) { Legion::Settings[:transport][:messages][:encrypt] }
  wanted && Legion::Settings[:crypt][:cs_encrypt_ready]
end
|
#encrypt_message(message, _type = 'cs') ⇒ Object
237
238
239
|
# File 'lib/legion/transport/message.rb', line 237
# Passes message to Legion::Crypt.encrypt and returns whatever that call returns.
#
# @param message [Object] payload forwarded unchanged to Legion::Crypt.encrypt
# @param _type [String] unused by this method (defaults to 'cs')
def encrypt_message(message, _type = 'cs')
Legion::Crypt.encrypt(message)
end
|
#exchange ⇒ Object
255
256
257
|
# File 'lib/legion/transport/message.rb', line 255
# Resolves the constant named by #exchange_name.
#
# @return [Object] the constant found by Kernel.const_get
# @raise [NameError] when no constant with that name exists
def exchange
  constant_path = exchange_name
  Kernel.const_get(constant_path)
end
|
#exchange_name ⇒ Object
250
251
252
253
|
# File 'lib/legion/transport/message.rb', line 250
# Builds the fully qualified constant name of the exchange class for this
# message from the segments returned by #derive_extension_parts: all segments
# form the namespace below Legion::Extensions, and the first segment is reused
# as the exchange class name.
#
# @return [String] e.g. "Legion::Extensions::A::B::Transport::Exchanges::A"
def exchange_name
  segments = derive_extension_parts
  namespace = segments.join('::')
  "Legion::Extensions::#{namespace}::Transport::Exchanges::#{segments.first}"
end
|
#headers ⇒ Object
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
|
# File 'lib/legion/transport/message.rb', line 259
# Builds (and caches in @options[:headers]) the AMQP headers for this message.
#
# The header hash is created on first use and 'legion_protocol_version' is set
# to '2.0' unless already present. Each known task-related option present in
# @options is then copied in under its symbol key: Integer, Float, true and
# false are kept as they are, every other value is converted with #to_s.
#
# @return [Hash] the header hash stored in @options[:headers]; an empty Hash
#   when a StandardError was rescued (the error is reported via handle_exception)
def headers
  @options[:headers] ||= Concurrent::Hash.new
  @options[:headers]['legion_protocol_version'] ||= '2.0'
  %i[task_id relationship_id trigger_namespace_id trigger_function_id parent_id master_id
     runner_namespace runner_class namespace_id function_id function chain_id debug].each do |header|
    next unless @options.key?(header)

    value = @options[header]
    @options[:headers][header] = case value
                                 when Integer, Float, TrueClass, FalseClass
                                   value
                                 else
                                   value.to_s
                                 end
  end
  @options[:headers]
rescue StandardError => e
  handle_exception(e, level: :error, handled: true, operation: 'transport.message.headers')
  {}
end
|
#install_return_listener(exchange_dest, options, return_state) ⇒ Object
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
|
# File 'lib/legion/transport/message.rb', line 126
# Registers an on_return callback so returned (unroutable) publishes can be
# detected afterwards through return_state.
#
# Does nothing unless options[:mandatory] is exactly true and the channel from
# #publish_channel responds to #on_return. The callback ignores returns whose
# correlation_id or message_id is present on both sides and differs from this
# message's; otherwise it sets :returned and, when available, :reply_code and
# :reply_text on return_state.
#
# @param exchange_dest [Object] destination handed to #publish_channel
# @param options [Hash] publish options
# @param return_state [Hash] mutable hash the callback writes into
# @return [Object, nil] result of #on_return, or nil when nothing was installed
def install_return_listener(exchange_dest, options, return_state)
  return unless options[:mandatory] == true

  listening_channel = publish_channel(exchange_dest)
  return unless listening_channel.respond_to?(:on_return)

  wanted_correlation = correlation_id
  wanted_message = message_id
  listening_channel.on_return do |info, props, _body|
    other_correlation = props.respond_to?(:correlation_id) && props.correlation_id &&
                        wanted_correlation && props.correlation_id != wanted_correlation
    next if other_correlation

    other_message = props.respond_to?(:message_id) && props.message_id &&
                    wanted_message && props.message_id != wanted_message
    next if other_message

    return_state[:returned] = true
    return_state[:reply_code] = info.reply_code if info.respond_to?(:reply_code)
    return_state[:reply_text] = info.reply_text if info.respond_to?(:reply_text)
  end
end
|
#message ⇒ Object
212
213
214
|
# File 'lib/legion/transport/message.rb', line 212
# Message body: every option that is not an envelope field.
#
# @return [Hash] a new hash holding the @options entries whose key is not
#   listed in ENVELOPE_KEYS; @options itself is left untouched
def message
  @options.reject { |key, _value| ENVELOPE_KEYS.include?(key) }
end
|
#message_id ⇒ Object
171
172
173
|
# File 'lib/legion/transport/message.rb', line 171
# Identifier placed in the envelope's message_id field.
#
# @return [Object, nil] the :message_id option when truthy, otherwise the
#   :task_id option
def message_id
  explicit_id = @options[:message_id]
  explicit_id || @options[:task_id]
end
|
#persistent ⇒ Object
190
191
192
|
# File 'lib/legion/transport/message.rb', line 190
# Whether the message should be published as persistent.
#
# An explicitly supplied :persistent option wins, including an explicit false:
# a plain `||` fallback would silently replace false with the configured
# default. Only a missing or nil option falls back to
# Legion::Transport.settings[:messages][:persistent].
#
# @return [Object] the :persistent option when it is not nil, otherwise the
#   configured default
def persistent
  explicit = @options[:persistent]
  return explicit unless explicit.nil?

  Legion::Transport.settings[:messages][:persistent]
end
|
#prepare_publisher_confirms(exchange_dest, options) ⇒ Object
93
94
95
96
97
98
99
100
|
# File 'lib/legion/transport/message.rb', line 93
# Switches the publishing channel into confirm mode when the caller asked for
# publisher confirms.
#
# Nothing happens unless options[:publisher_confirm] is exactly true and the
# channel from #publish_channel responds to #confirm_select.
#
# @param exchange_dest [Object] destination handed to #publish_channel
# @param options [Hash] publish options
# @return [Object, nil] result of #confirm_select, or nil when it was not called
def prepare_publisher_confirms(exchange_dest, options)
  if options[:publisher_confirm] == true
    confirming_channel = publish_channel(exchange_dest)
    confirming_channel.confirm_select if confirming_channel.respond_to?(:confirm_select)
  end
end
|
#priority ⇒ Object
283
284
285
|
# File 'lib/legion/transport/message.rb', line 283
# Priority placed in the message envelope.
#
# @return [Object] the :priority option when truthy, otherwise
#   Legion::Transport.settings[:messages][:priority], otherwise 0
def priority
  requested = @options[:priority]
  return requested if requested

  Legion::Transport.settings[:messages][:priority] || 0
end
|
#publish(options = nil) ⇒ Object
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
|
# File 'lib/legion/transport/message.rb', line 26
# Publishes this message to its exchange.
#
# Steps, in order: merge the optional per-call options over @options, check the
# payload size, resolve the exchange (a cached instance when the class offers
# one, else a new instance, else the object itself), install the return
# listener, enable publisher confirms, publish the encoded payload with the
# envelope options, and build the publish result.
#
# The connection and channel errors listed in the rescue clause are reported via
# handle_exception; the message is spooled when #spool_enabled? allows it.
#
# @param options [Hash, nil] per-call overrides merged over @options
# @return [Hash, nil] the publish result when #return_publish_result? is truthy,
#   nil otherwise; on a rescued error the hash from #publish_failure_result
# @raise [RuntimeError] when @valid is falsy
def publish(options = nil)
  raise unless @valid

  effective_options = if options
                        @options.merge(options)
                      else
                        @options
                      end
  validate_payload_size

  ex_class = exchange
  exchange_dest = ex_class
  if ex_class.respond_to?(:cached_instance)
    exchange_dest = ex_class.cached_instance || ex_class.new
  elsif ex_class.respond_to?(:new)
    exchange_dest = ex_class.new
  end

  return_state = {}
  install_return_listener(exchange_dest, effective_options, return_state)
  prepare_publisher_confirms(exchange_dest, effective_options)
  exchange_dest.publish(encode_message, **publish_envelope_options(effective_options))

  outcome = publish_result(exchange_dest, effective_options, return_state)
  return_publish_result?(effective_options) ? outcome : nil
rescue Bunny::ConnectionClosedError, Bunny::ChannelAlreadyClosed, Bunny::ChannelError,
       Bunny::NetworkErrorWrapper, IOError, Timeout::Error => e
  handle_exception(e, level: :warn, handled: true, operation: 'transport.message.publish',
                      spooled: spool_enabled?(effective_options))
  spool_message(e, effective_options) if spool_enabled?(effective_options)
  failure_status = spool_enabled?(effective_options) ? :spooled : :failed
  publish_failure_result(failure_status, e, effective_options)
end
|
#publish_channel(exchange_dest) ⇒ Object
120
121
122
123
124
|
# File 'lib/legion/transport/message.rb', line 120
# Picks the channel used for confirms and returns.
#
# @param exchange_dest [Object] the publish destination
# @return [Object] exchange_dest.channel when the destination exposes #channel,
#   otherwise this object's own #channel
def publish_channel(exchange_dest)
  exchange_dest.respond_to?(:channel) ? exchange_dest.channel : channel
end
|
#publish_envelope_options(options) ⇒ Object
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
|
# File 'lib/legion/transport/message.rb', line 56
# Builds the keyword options passed to the exchange's #publish.
#
# For routing_key, content_type, content_encoding, type, priority and
# expiration a truthy value in options wins over the message's own accessor.
# :persistent is taken from options whenever the key is present, so an explicit
# false is respected. The remaining fields always come from the accessors.
# :mandatory is added only when options[:mandatory] is exactly true.
#
# @param options [Hash] effective publish options
# @return [Hash] envelope options
def publish_envelope_options(options)
  envelope = {
    routing_key: options[:routing_key] || routing_key || '',
    content_type: options[:content_type] || content_type,
    content_encoding: options[:content_encoding] || content_encoding,
    type: options[:type] || type,
    priority: options[:priority] || priority,
    expiration: options[:expiration] || expiration,
    headers: headers,
    persistent: options.key?(:persistent) ? options[:persistent] : persistent,
    message_id: message_id,
    correlation_id: correlation_id,
    reply_to: reply_to,
    app_id: app_id,
    timestamp: timestamp
  }
  envelope[:mandatory] = true if options[:mandatory] == true
  envelope
end
|
#publish_failure_result(status, error, options = @options) ⇒ Object
155
156
157
158
159
160
161
162
163
164
165
|
# File 'lib/legion/transport/message.rb', line 155
# Describes a publish attempt that ended with a rescued error.
#
# @param status [Symbol] status to report (#publish passes :spooled or :failed)
# @param error [Exception] the rescued error
# @param options [Hash] effective publish options (defaults to @options)
# @return [Hash] status, accepted: false, the error's class name and message,
#   the routing key (options, then #routing_key, then ''), message_id and
#   correlation_id
def publish_failure_result(status, error, options = @options)
  failure = { status: status, accepted: false }
  failure[:error_class] = error.class.name
  failure[:error] = error.message
  failure[:routing_key] = options[:routing_key] || routing_key || ''
  failure[:message_id] = message_id
  failure[:correlation_id] = correlation_id
  failure
end
|
#publish_result(exchange_dest, options, return_state) ⇒ Object
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
|
# File 'lib/legion/transport/message.rb', line 76
# Summarises a publish that did not raise.
#
# The status is :unroutable when the return listener flagged the message as
# returned, otherwise whatever #confirm_publish reports. A debug line naming
# the exchange and routing key is logged.
#
# @param exchange_dest [Object] destination the message was published to
# @param options [Hash] effective publish options
# @param return_state [Hash] state filled in by the return listener
# @return [Hash] result with nil-valued entries removed
def publish_result(exchange_dest, options, return_state)
  final_status = confirm_publish(exchange_dest, options)
  final_status = :unroutable if return_state[:returned]

  destination = exchange_dest.respond_to?(:name) ? exchange_dest.name : exchange_dest.to_s
  effective_key = options[:routing_key] || routing_key || ''
  log.debug "Published to exchange=#{destination} routing_key=#{effective_key} class=#{self.class.name}"

  summary = {
    status: final_status,
    accepted: final_status == :accepted,
    exchange: destination,
    routing_key: effective_key,
    message_id: message_id,
    return_reply_code: return_state[:reply_code],
    return_reply_text: return_state[:reply_text],
    correlation_id: correlation_id
  }
  summary.compact
end
|
#reply_to ⇒ Object
180
181
182
|
# File 'lib/legion/transport/message.rb', line 180
# Reply destination for the envelope's reply_to field.
#
# @return [Object, nil] the :reply_to option, or nil when it was not supplied
def reply_to
  @options.fetch(:reply_to, nil)
end
|
#return_publish_result?(options) ⇒ Boolean
150
151
152
153
|
# File 'lib/legion/transport/message.rb', line 150
# Tells #publish whether to hand the result hash back to the caller.
#
# @param options [Hash] effective publish options
# @return [Boolean] true when :return_result, :mandatory or :publisher_confirm
#   is exactly true, or when :spool is exactly false
def return_publish_result?(options)
  opted_in = %i[return_result mandatory publisher_confirm].any? { |flag| options[flag] == true }
  opted_in || options[:spool] == false
end
|
#routing_key ⇒ Object
216
217
218
|
# File 'lib/legion/transport/message.rb', line 216
# Routing key supplied with the message, if any.
#
# @return [Object, nil] the :routing_key option when the key is present in
#   @options, otherwise nil
def routing_key
  return unless @options.key?(:routing_key)

  @options[:routing_key]
end
|
#spool_enabled?(options) ⇒ Boolean
146
147
148
|
# File 'lib/legion/transport/message.rb', line 146
# Whether a failed publish may be spooled.
#
# @param options [Hash] effective publish options
# @return [Boolean] false only when the :spool option equals false; a missing
#   key counts as true
def spool_enabled?(options)
  spool_setting = options.fetch(:spool, true)
  spool_setting != false
end
|
#timestamp ⇒ Object
299
300
301
|
# File 'lib/legion/transport/message.rb', line 299
# Timestamp placed in the message envelope.
#
# @return [Integer] the current time as whole seconds since the Unix epoch
def timestamp
Time.now.to_i
end
|
#type ⇒ Object
295
296
297
|
# File 'lib/legion/transport/message.rb', line 295
# Default message type for the envelope. #publish_envelope_options uses it when
# the publish options carry no :type.
#
# @return [String] always 'task'
def type
'task'
end
|
#user_id ⇒ Object
176
177
178
|
# File 'lib/legion/transport/message.rb', line 176
# User identifier associated with the message.
#
# @return [Object] the :user_id option when truthy, otherwise
#   Legion::Transport.settings[:connection][:user]
def user_id
  supplied = @options[:user_id]
  return supplied if supplied

  Legion::Transport.settings[:connection][:user]
end
|
#validate ⇒ Object
303
304
305
|
# File 'lib/legion/transport/message.rb', line 303
# Marks the message as valid by setting @valid to true. #publish raises unless
# @valid is truthy.
#
# @return [Boolean] true
def validate
@valid = true
end
|