Class: Legion::LLM::Transport::Message
- Inherits:
-
Transport::Message
- Object
- Transport::Message
- Legion::LLM::Transport::Message
show all
- Includes:
- Legion::Logging::Helper
- Defined in:
- lib/legion/llm/transport/message.rb
Constant Summary
collapse
- LLM_ENVELOPE_KEYS =
Keys stripped from the JSON body (in addition to base ENVELOPE_KEYS). Do NOT add keys already in ENVELOPE_KEYS (:routing_key, :reply_to, etc.). Do NOT add :request_type — metering/audit need it in the body. Do NOT add :message_context — it MUST appear in the body of all 6 messages.
%i[
fleet_correlation_id ttl
].freeze
Instance Method Summary
collapse
Instance Method Details
#app_id ⇒ Object
40
41
42
|
# File 'lib/legion/llm/transport/message.rb', line 40
def app_id
@options[:app_id] || 'legion-llm'
end
|
#content_encoding ⇒ Object
64
65
66
|
# File 'lib/legion/llm/transport/message.rb', line 64
def content_encoding
@options[:content_encoding] || super
end
|
#correlation_id ⇒ Object
Fleet messages use :fleet_correlation_id to avoid collision with the base class’s :correlation_id (which falls through to :parent_id/:task_id).
36
37
38
|
# File 'lib/legion/llm/transport/message.rb', line 36
def correlation_id
@options[:fleet_correlation_id] || super
end
|
#encode_message ⇒ Object
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
|
# File 'lib/legion/llm/transport/message.rb', line 48
def encode_message
payload = message
payload = Legion::JSON.dump(payload) unless payload.is_a?(String)
if encrypt? && defined?(Legion::Crypt) && Legion::Crypt.respond_to?(:encrypt)
encrypted = Legion::Crypt.encrypt(payload)
@encrypted_iv = encrypted[:iv]
@options[:content_encoding] = 'encrypted/cs'
log.debug "[llm][transport] encode_message action=encrypt class=#{self.class.name}"
return encrypted[:enciphered_message]
end
@options[:content_encoding] = 'identity'
payload
end
|
44
45
46
|
# File 'lib/legion/llm/transport/message.rb', line 44
def
super.merge().merge().merge().merge()
end
|
#install_return_listener(exchange_dest, options, return_state) ⇒ Object
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
|
# File 'lib/legion/llm/transport/message.rb', line 88
def install_return_listener(exchange_dest, options, return_state)
return unless options[:mandatory] == true
return_channel = publish_channel(exchange_dest)
return unless return_channel.respond_to?(:on_return)
expected_correlation_id = correlation_id
expected_message_id = message_id
return_channel.on_return do |return_info, properties, _content|
next if properties.respond_to?(:correlation_id) && properties.correlation_id &&
expected_correlation_id && properties.correlation_id != expected_correlation_id
next if properties.respond_to?(:message_id) && properties.message_id &&
expected_message_id && properties.message_id != expected_message_id
return_state[:returned] = true
return_state[:reply_code] = return_info.reply_code if return_info.respond_to?(:reply_code)
return_state[:reply_text] = return_info.reply_text if return_info.respond_to?(:reply_text)
end
end
|
#message ⇒ Object
26
27
28
|
# File 'lib/legion/llm/transport/message.rb', line 26
def message
@options.except(*ENVELOPE_KEYS, *LLM_ENVELOPE_KEYS)
end
|
#message_context ⇒ Object
22
23
24
|
# File 'lib/legion/llm/transport/message.rb', line 22
def message_context
@options[:message_context] || {}
end
|
#message_id ⇒ Object
30
31
32
|
# File 'lib/legion/llm/transport/message.rb', line 30
def message_id
@message_id ||= @options[:message_id] || "#{message_id_prefix}_#{SecureRandom.uuid}"
end
|
#prepare_publisher_confirms(exchange_dest, options) ⇒ Object
108
109
110
111
112
113
|
# File 'lib/legion/llm/transport/message.rb', line 108
def prepare_publisher_confirms(exchange_dest, options)
return unless options[:publisher_confirm] == true
confirm_channel = publish_channel(exchange_dest)
confirm_channel.confirm_select if confirm_channel.respond_to?(:confirm_select)
end
|
#publish_envelope_options(options) ⇒ Object
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
|
# File 'lib/legion/llm/transport/message.rb', line 68
def publish_envelope_options(options)
{
routing_key: options[:routing_key] || routing_key || '',
content_type: options[:content_type] || content_type,
content_encoding: options[:content_encoding] || content_encoding,
type: options[:type] || type,
priority: options[:priority] || priority,
expiration: options[:expiration] || expiration,
headers: ,
persistent: options.key?(:persistent) ? options[:persistent] : persistent,
message_id: message_id,
correlation_id: correlation_id,
reply_to: reply_to,
app_id: app_id,
timestamp: timestamp
}.tap do |envelope|
envelope[:mandatory] = true if options[:mandatory] == true
end
end
|
#publish_failure_result(status, error, options = @options) ⇒ Object
131
132
133
134
135
136
137
138
139
140
141
|
# File 'lib/legion/llm/transport/message.rb', line 131
def publish_failure_result(status, error, options = @options)
{
status: status,
accepted: false,
error_class: error.class.name,
error: error.message,
routing_key: options[:routing_key] || routing_key || '',
message_id: message_id,
correlation_id: correlation_id
}
end
|
#publish_result(exchange_dest, options, return_state) ⇒ Object
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
|
# File 'lib/legion/llm/transport/message.rb', line 115
def publish_result(exchange_dest, options, return_state)
status = confirm_publish(exchange_dest, options)
status = :unroutable if return_state[:returned]
ex_name = exchange_dest.respond_to?(:name) ? exchange_dest.name : exchange_dest.to_s
{
status: status,
accepted: status == :accepted,
exchange: ex_name,
routing_key: options[:routing_key] || routing_key || '',
message_id: message_id,
return_reply_code: return_state[:reply_code],
return_reply_text: return_state[:reply_text],
correlation_id: correlation_id
}.compact
end
|
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
|
# File 'lib/legion/llm/transport/message.rb', line 143
def
tracing = @options[:tracing] || context_value(message_context, :tracing)
return {} unless tracing.is_a?(Hash)
trace_id = context_value(tracing, :trace_id)
span_id = context_value(tracing, :span_id)
parent_span_id = context_value(tracing, :parent_span_id)
correlation_id = context_value(tracing, :correlation_id)
baggage = (context_value(tracing, :baggage))
h = {}
h['traceparent'] = "00-#{trace_id}-#{span_id}-01" if w3c_trace_id?(trace_id) && w3c_span_id?(span_id)
h['baggage'] = baggage if baggage
h['x-legion-trace-id'] = trace_id.to_s if trace_id
h['x-legion-span-id'] = span_id.to_s if span_id
h['x-legion-parent-span-id'] = parent_span_id.to_s if parent_span_id
h['x-legion-correlation-id'] = correlation_id.to_s if correlation_id
h
end
|