Class: Legion::Transport::Message
- Inherits:
-
Object
- Object
- Legion::Transport::Message
show all
- Includes:
- Common
- Defined in:
- lib/legion/transport/message.rb
Constant Summary
collapse
- ENVELOPE_KEYS =
%i[
headers content_type content_encoding persistent expiration
priority app_id user_id reply_to correlation_id message_id
routing_key exchange type
].freeze
Instance Method Summary
collapse
Methods included from Common
#channel_open?, #close, #close!, #deep_merge, #generate_consumer_tag, #open_channel, #options_builder
Constructor Details
#initialize(**options) ⇒ Message
Returns a new instance of Message.
8
9
10
11
|
# File 'lib/legion/transport/message.rb', line 8
def initialize(**options)
@options = options
validate
end
|
Instance Method Details
#app_id ⇒ Object
37
38
39
|
# File 'lib/legion/transport/message.rb', line 37
def app_id
@options[:app_id] || 'legion'
end
|
#content_encoding ⇒ Object
158
159
160
|
# File 'lib/legion/transport/message.rb', line 158
def content_encoding
'identity'
end
|
#content_type ⇒ Object
154
155
156
|
# File 'lib/legion/transport/message.rb', line 154
def content_type
'application/json'
end
|
#correlation_id ⇒ Object
ID of the message that this message is a reply to. Links subtasks back to the parent task.
56
57
58
|
# File 'lib/legion/transport/message.rb', line 56
def correlation_id
@options[:correlation_id] || @options[:parent_id] || @options[:task_id]
end
|
#encode_message ⇒ Object
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
|
# File 'lib/legion/transport/message.rb', line 88
def encode_message
message_payload = message
message_payload = Legion::JSON.dump(message_payload) unless message_payload.is_a? String
if encrypt?
encrypted = Legion::Crypt.encrypt(message_payload)
[:iv] = encrypted[:iv]
@options[:content_encoding] = 'encrypted/cs'
Legion::Logging.debug "Message encrypted with content_encoding=encrypted/cs class=#{self.class.name}" if defined?(Legion::Logging)
return encrypted[:enciphered_message]
else
@options[:content_encoding] = 'identity'
end
message_payload
end
|
#encrypt? ⇒ Boolean
109
110
111
112
113
114
115
116
|
# File 'lib/legion/transport/message.rb', line 109
def encrypt?
should_encrypt = if @options.key?(:encrypt)
@options[:encrypt]
else
Legion::Settings[:transport][:messages][:encrypt]
end
should_encrypt && Legion::Settings[:crypt][:cs_encrypt_ready]
end
|
#encrypt_message(message, _type = 'cs') ⇒ Object
105
106
107
|
# File 'lib/legion/transport/message.rb', line 105
def encrypt_message(message, _type = 'cs')
Legion::Crypt.encrypt(message)
end
|
#exchange ⇒ Object
123
124
125
|
# File 'lib/legion/transport/message.rb', line 123
def exchange
Kernel.const_get(exchange_name)
end
|
#exchange_name ⇒ Object
118
119
120
121
|
# File 'lib/legion/transport/message.rb', line 118
def exchange_name
lex = self.class.ancestors.first.to_s.split('::')[2].downcase
"Legion::Extensions::#{lex.capitalize}::Transport::Exchanges::#{lex.capitalize}"
end
|
#expiration ⇒ Object
64
65
66
67
68
69
70
71
72
|
# File 'lib/legion/transport/message.rb', line 64
def expiration
if @options.key? :expiration
@options[:expiration]
elsif @options.key? :ttl
@options[:ttl]
elsif Legion::Transport.settings[:messages].key? :expiration
Legion::Transport.settings[:messages][:expiration]
end
end
|
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
|
# File 'lib/legion/transport/message.rb', line 127
def
@options[:headers] ||= Concurrent::Hash.new
@options[:headers]['legion_protocol_version'] ||= '2.0'
%i[task_id relationship_id trigger_namespace_id trigger_function_id parent_id master_id runner_namespace runner_class namespace_id function_id function
chain_id debug].each do ||
next unless @options.key?
value = @options[]
@options[:headers][] = case value
when Integer, Float, TrueClass, FalseClass
value
else
value.to_s
end
end
@options[:headers]
rescue StandardError => e
Legion::Transport.logger.error e.message
Legion::Transport.logger.error e.backtrace
end
|
#message ⇒ Object
80
81
82
|
# File 'lib/legion/transport/message.rb', line 80
def message
@options.except(*ENVELOPE_KEYS)
end
|
#message_id ⇒ Object
41
42
43
|
# File 'lib/legion/transport/message.rb', line 41
def message_id
@options[:message_id] || @options[:task_id]
end
|
#persistent ⇒ Object
60
61
62
|
# File 'lib/legion/transport/message.rb', line 60
def persistent
@options[:persistent] || Legion::Transport.settings[:messages][:persistent]
end
|
#priority ⇒ Object
150
151
152
|
# File 'lib/legion/transport/message.rb', line 150
def priority
@options[:priority] || Legion::Transport.settings[:messages][:priority] || 0
end
|
#publish(options = @options) ⇒ Object
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
# File 'lib/legion/transport/message.rb', line 13
def publish(options = @options)
raise unless @valid
exchange_dest = exchange.respond_to?(:new) ? exchange.new : exchange
exchange_dest.publish(encode_message,
routing_key: routing_key || '',
content_type: options[:content_type] || content_type,
content_encoding: options[:content_encoding] || content_encoding,
type: options[:type] || type,
priority: options[:priority] || priority,
expiration: options[:expiration] || expiration,
headers: ,
persistent: persistent,
message_id: message_id,
correlation_id: correlation_id,
app_id: app_id,
timestamp: timestamp)
ex_name = exchange_dest.respond_to?(:name) ? exchange_dest.name : exchange_dest.to_s
Legion::Logging.debug "Published to exchange=#{ex_name} routing_key=#{routing_key || ''} class=#{self.class.name}" if defined?(Legion::Logging)
rescue Bunny::ConnectionClosedError, Bunny::ChannelAlreadyClosed, Bunny::ChannelError,
Bunny::NetworkErrorWrapper, IOError => e
spool_message(e)
end
|
#reply_to ⇒ Object
50
51
52
|
# File 'lib/legion/transport/message.rb', line 50
def reply_to
@options[:reply_to]
end
|
#routing_key ⇒ Object
84
85
86
|
# File 'lib/legion/transport/message.rb', line 84
def routing_key
@options[:routing_key] if @options.key? :routing_key
end
|
#timestamp ⇒ Object
166
167
168
|
# File 'lib/legion/transport/message.rb', line 166
def timestamp
Time.now.to_i
end
|
#type ⇒ Object
162
163
164
|
# File 'lib/legion/transport/message.rb', line 162
def type
'task'
end
|
#user_id ⇒ Object
46
47
48
|
# File 'lib/legion/transport/message.rb', line 46
def user_id
@options[:user_id] || Legion::Transport.settings[:connection][:user]
end
|
#validate ⇒ Object
170
171
172
|
# File 'lib/legion/transport/message.rb', line 170
def validate
@valid = true
end
|