Class: Legion::Transport::Message

Inherits:
Object
  • Object
show all
Includes:
Common
Defined in:
lib/legion/transport/message.rb

Constant Summary collapse

# Option keys that #message removes from @options before the remaining
# options are serialized as the message body by #encode_message.
ENVELOPE_KEYS =
%i[
  headers content_type content_encoding persistent expiration
  priority app_id user_id reply_to correlation_id message_id
  routing_key exchange type
].freeze

Instance Method Summary collapse

Methods included from Common

#channel_open?, #close, #close!, #deep_merge, #generate_consumer_tag, #open_channel, #options_builder

Constructor Details

#initialize(**options) ⇒ Message

Returns a new instance of Message.



8
9
10
11
# File 'lib/legion/transport/message.rb', line 8

# Stores the given keyword options in @options and then runs #validate.
#
# @param options [Hash] arbitrary keyword options, kept as given
def initialize(**options)
  @options = options
  # validate runs after @options is assigned so it can inspect the options.
  validate
end

Instance Method Details

#app_id ⇒ Object



37
38
39
# File 'lib/legion/transport/message.rb', line 37

# Application identifier sent with the message.
#
# @return [Object] the :app_id option when it is truthy, otherwise 'legion'
def app_id
  custom = @options[:app_id]
  return custom if custom

  'legion'
end

#channel ⇒ Object



174
175
176
# File 'lib/legion/transport/message.rb', line 174

# Delegates to Legion::Transport::Connection.channel.
#
# @return [Object] whatever the connection returns as its channel
def channel = Legion::Transport::Connection.channel

#content_encoding ⇒ Object



158
159
160
# File 'lib/legion/transport/message.rb', line 158

# Default content encoding, used by #publish when the options passed to it
# carry no :content_encoding.
#
# @return [String] always 'identity'
def content_encoding = 'identity'

#content_type ⇒ Object



154
155
156
# File 'lib/legion/transport/message.rb', line 154

# Default content type, used by #publish when the options passed to it carry
# no :content_type.
#
# @return [String] always 'application/json'
def content_type = 'application/json'

#correlation_id ⇒ Object

ID of the message that this message is a reply to. Links subtasks back to the parent task.



56
57
58
# File 'lib/legion/transport/message.rb', line 56

# Correlation identifier for the message.
#
# Resolution order: :correlation_id, then :parent_id, then :task_id. The
# first truthy value wins; when none is truthy the :task_id value is returned
# as-is.
#
# @return [Object, nil]
def correlation_id
  explicit = @options[:correlation_id]
  return explicit if explicit

  parent = @options[:parent_id]
  return parent if parent

  @options[:task_id]
end

#encode_message ⇒ Object



88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/legion/transport/message.rb', line 88

# Serializes #message and, when #encrypt? is truthy, encrypts it.
#
# Side effects: sets @options[:content_encoding] to 'identity' or
# 'encrypted/cs'; in the encrypted case also stores the cipher IV in
# headers[:iv].
#
# @return [Object] the serialized body, or the :enciphered_message entry of
#   Legion::Crypt.encrypt's result when encrypting
def encode_message
  body = message
  # Non-String bodies are dumped through Legion::JSON.
  body = Legion::JSON.dump(body) unless body.is_a?(String)

  unless encrypt?
    @options[:content_encoding] = 'identity'
    return body
  end

  cipher = Legion::Crypt.encrypt(body)
  headers[:iv] = cipher[:iv]
  @options[:content_encoding] = 'encrypted/cs'
  Legion::Logging.debug "Message encrypted with content_encoding=encrypted/cs class=#{self.class.name}" if defined?(Legion::Logging)
  cipher[:enciphered_message]
end

#encrypt? ⇒ Boolean

Returns:

  • (Boolean)


109
110
111
112
113
114
115
116
# File 'lib/legion/transport/message.rb', line 109

# Whether the message body should be encrypted.
#
# The :encrypt option, when the key is present (even with a nil or false
# value), takes precedence over Legion::Settings[:transport][:messages][:encrypt].
# A truthy request is then gated on Legion::Settings[:crypt][:cs_encrypt_ready].
#
# @return [Object] the falsy request value, or the :cs_encrypt_ready setting
def encrypt?
  requested = @options.fetch(:encrypt) { Legion::Settings[:transport][:messages][:encrypt] }
  return requested unless requested

  Legion::Settings[:crypt][:cs_encrypt_ready]
end

#encrypt_message(message, _type = 'cs') ⇒ Object



105
106
107
# File 'lib/legion/transport/message.rb', line 105

# Encrypts +message+ through Legion::Crypt.encrypt.
#
# @param message [Object] the payload handed to Legion::Crypt.encrypt
# @param _type [String] ignored by this implementation
# @return [Object] whatever Legion::Crypt.encrypt returns
def encrypt_message(message, _type = 'cs') = Legion::Crypt.encrypt(message)

#exchange ⇒ Object



123
124
125
# File 'lib/legion/transport/message.rb', line 123

# Resolves the constant named by #exchange_name.
#
# @return [Object] the constant found for that name
# @raise [NameError] when no such constant is defined
def exchange
  constant_path = exchange_name
  Kernel.const_get(constant_path)
end

#exchange_name ⇒ Object



118
119
120
121
# File 'lib/legion/transport/message.rb', line 118

# Builds the fully qualified exchange constant name for this message class.
#
# Takes the third '::'-separated segment of the first ancestor's name,
# normalizes it to "Xxxx" casing, and uses it both as the extension namespace
# and as the exchange constant name.
#
# @return [String]
# @raise [NoMethodError] when the class name has fewer than three segments
def exchange_name
  namespace_parts = self.class.ancestors.first.to_s.split('::')
  lex_const = namespace_parts[2].downcase.capitalize
  "Legion::Extensions::#{lex_const}::Transport::Exchanges::#{lex_const}"
end

#expiration ⇒ Object



64
65
66
67
68
69
70
71
72
# File 'lib/legion/transport/message.rb', line 64

# Per-message expiration value.
#
# Resolution order, decided by key presence rather than truthiness:
# the :expiration option, the :ttl option, then the :expiration entry of
# Legion::Transport.settings[:messages].
#
# @return [Object, nil] nil when none of the three keys is present
def expiration
  return @options[:expiration] if @options.key?(:expiration)
  return @options[:ttl] if @options.key?(:ttl)

  message_defaults = Legion::Transport.settings[:messages]
  message_defaults[:expiration] if message_defaults.key?(:expiration)
end

#headers ⇒ Object



127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# File 'lib/legion/transport/message.rb', line 127

# Builds (and caches in @options[:headers]) the header table for the message.
#
# Ensures a 'legion_protocol_version' entry exists, runs the region header
# injectors, then copies a fixed list of task/routing options into the
# headers. Integer, Float and boolean values are copied unchanged; every other
# value is stringified with #to_s.
#
# Errors raised while building are logged, not propagated. In that case the
# headers collected so far are returned (an empty Hash if none exist yet), so
# callers such as #encode_message and #publish still receive a Hash.
#
# @return [Hash] the header table stored in @options[:headers]
def headers
  @options[:headers] ||= Concurrent::Hash.new
  @options[:headers]['legion_protocol_version'] ||= '2.0'
  inject_region_header
  inject_legion_region_header
  %i[task_id relationship_id trigger_namespace_id trigger_function_id parent_id master_id runner_namespace runner_class namespace_id function_id function
     chain_id debug].each do |header|
    next unless @options.key? header

    value = @options[header]
    @options[:headers][header] = case value
                                 when Integer, Float, TrueClass, FalseClass
                                   value
                                 else
                                   value.to_s
                                 end
  end
  @options[:headers]
rescue StandardError => e
  Legion::Transport.logger.error e.message
  Legion::Transport.logger.error e.backtrace
  # Return the header table rather than the logger's return value, which is
  # what the method used to leak to callers on this path.
  @options[:headers] ||= {}
end

#message ⇒ Object



80
81
82
# File 'lib/legion/transport/message.rb', line 80

# Message body: a copy of @options without the keys listed in ENVELOPE_KEYS.
#
# @return [Hash] a new Hash; @options itself is not modified
def message
  @options.reject { |key, _value| ENVELOPE_KEYS.include?(key) }
end

#message_id ⇒ Object



41
42
43
# File 'lib/legion/transport/message.rb', line 41

# Message identifier.
#
# @return [Object, nil] the :message_id option when truthy, otherwise the
#   :task_id option
def message_id
  return @options[:message_id] if @options[:message_id]

  @options[:task_id]
end

#persistent ⇒ Object



60
61
62
# File 'lib/legion/transport/message.rb', line 60

# Whether the message should be published as persistent.
#
# An explicit :persistent option, including +false+, takes precedence; the
# configured Legion::Transport.settings[:messages][:persistent] value is used
# only when the option is absent or nil. (With a plain `||`, an explicit
# +false+ was silently replaced by a truthy configured default.)
#
# @return [Object] the option value, or the configured default
def persistent
  explicit = @options[:persistent]
  return explicit unless explicit.nil?

  Legion::Transport.settings[:messages][:persistent]
end

#priority ⇒ Object



150
151
152
# File 'lib/legion/transport/message.rb', line 150

# Message priority.
#
# @return [Object] the :priority option when truthy, else the :priority entry
#   of Legion::Transport.settings[:messages] when truthy, else 0
def priority
  requested = @options[:priority]
  return requested if requested

  configured = Legion::Transport.settings[:messages][:priority]
  configured || 0
end

#publish(options = @options) ⇒ Object



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/legion/transport/message.rb', line 13

# Publishes the encoded message to the exchange returned by #exchange.
#
# :content_type, :content_encoding, :type, :priority and :expiration are taken
# from +options+ when present there, otherwise from the instance methods of
# the same name. The remaining properties always come from the instance
# methods. The listed Bunny errors and IOError are passed to #spool_message
# instead of propagating.
#
# NOTE(review): #reply_to and #user_id are defined on this class but are not
# forwarded to the exchange here — confirm whether that is intentional.
#
# @param options [Hash] per-publish overrides (defaults to @options)
# @raise [RuntimeError] when @valid is falsy
def publish(options = @options)
  # A bare `raise` re-raises $! when publish is called while another exception
  # is being handled, and otherwise produces a message-less RuntimeError, so
  # raise an explicit, descriptive RuntimeError instead.
  raise "#{self.class.name} failed validation; refusing to publish" unless @valid

  exchange_dest = exchange.respond_to?(:new) ? exchange.new : exchange
  # encode_message is evaluated first: it sets @options[:content_encoding]
  # and, when encrypting, adds the :iv header that `headers` returns below.
  exchange_dest.publish(encode_message,
                        routing_key:      routing_key || '',
                        content_type:     options[:content_type] || content_type,
                        content_encoding: options[:content_encoding] || content_encoding,
                        type:             options[:type] || type,
                        priority:         options[:priority] || priority,
                        expiration:       options[:expiration] || expiration,
                        headers:          headers,
                        persistent:       persistent,
                        message_id:       message_id,
                        correlation_id:   correlation_id,
                        app_id:           app_id,
                        timestamp:        timestamp)
  ex_name = exchange_dest.respond_to?(:name) ? exchange_dest.name : exchange_dest.to_s
  Legion::Logging.debug "Published to exchange=#{ex_name} routing_key=#{routing_key || ''} class=#{self.class.name}" if defined?(Legion::Logging)
rescue Bunny::ConnectionClosedError, Bunny::ChannelAlreadyClosed, Bunny::ChannelError,
       Bunny::NetworkErrorWrapper, IOError => e
  spool_message(e)
end

#reply_to ⇒ Object



50
51
52
# File 'lib/legion/transport/message.rb', line 50

# The :reply_to option as given to the constructor.
#
# @return [Object, nil] nil when the option was not supplied
def reply_to = @options[:reply_to]

#routing_key ⇒ Object



84
85
86
# File 'lib/legion/transport/message.rb', line 84

# The :routing_key option, if one was supplied.
#
# @return [Object, nil] the option value, or nil when the key is absent
def routing_key
  @options.fetch(:routing_key, nil)
end

#timestamp ⇒ Object



166
167
168
# File 'lib/legion/transport/message.rb', line 166

# Current wall-clock time as whole seconds since the Unix epoch.
#
# @return [Integer]
def timestamp = Time.now.to_i

#type ⇒ Object



162
163
164
# File 'lib/legion/transport/message.rb', line 162

# Default message type, used by #publish when the options passed to it carry
# no :type.
#
# @return [String] always 'task'
def type = 'task'

#user_id ⇒ Object

The sender's identifier (`user_id`). See https://www.rabbitmq.com/extensions.html#validated-user-id



46
47
48
# File 'lib/legion/transport/message.rb', line 46

# Sender identifier for the message.
#
# @return [Object] the :user_id option when truthy, otherwise the :user entry
#   of Legion::Transport.settings[:connection]
def user_id
  sender = @options[:user_id]
  return sender if sender

  Legion::Transport.settings[:connection][:user]
end

#validate ⇒ Object



170
171
172
# File 'lib/legion/transport/message.rb', line 170

# Marks the message as valid. This implementation performs no checks; it
# unconditionally sets @valid, which #publish requires to be truthy.
#
# @return [true]
def validate
  @valid = true
end