Class: Legion::LLM::Transport::Message

Inherits:
Transport::Message
  • Object
show all
Includes:
Legion::Logging::Helper
Defined in:
lib/legion/llm/transport/message.rb

Constant Summary collapse

LLM_ENVELOPE_KEYS =

Keys stripped from the JSON body (in addition to base ENVELOPE_KEYS). Do NOT add keys already in ENVELOPE_KEYS (:routing_key, :reply_to, etc.). Do NOT add :request_type — metering/audit need it in the body. Do NOT add :message_context — it MUST appear in the body of all 6 messages.

%i[
  fleet_correlation_id ttl
].freeze

Instance Method Summary collapse

Instance Method Details

#app_idObject



40
41
42
# File 'lib/legion/llm/transport/message.rb', line 40

def app_id
  @options[:app_id] || 'legion-llm'
end

#content_encodingObject



64
65
66
# File 'lib/legion/llm/transport/message.rb', line 64

def content_encoding
  @options[:content_encoding] || super
end

#correlation_idObject

Fleet messages use :fleet_correlation_id to avoid collision with the base class’s :correlation_id (which falls through to :parent_id/:task_id).



36
37
38
# File 'lib/legion/llm/transport/message.rb', line 36

def correlation_id
  @options[:fleet_correlation_id] || super
end

#encode_messageObject



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/legion/llm/transport/message.rb', line 48

def encode_message
  payload = message
  payload = Legion::JSON.dump(payload) unless payload.is_a?(String)

  if encrypt? && defined?(Legion::Crypt) && Legion::Crypt.respond_to?(:encrypt)
    encrypted = Legion::Crypt.encrypt(payload)
    @encrypted_iv = encrypted[:iv]
    @options[:content_encoding] = 'encrypted/cs'
    log.debug "[llm][transport] encode_message action=encrypt class=#{self.class.name}"
    return encrypted[:enciphered_message]
  end

  @options[:content_encoding] = 'identity'
  payload
end

#headersObject



44
45
46
# File 'lib/legion/llm/transport/message.rb', line 44

def headers
  super.merge(llm_headers).merge(context_headers).merge(tracing_headers).merge(encryption_headers)
end

#install_return_listener(exchange_dest, options, return_state) ⇒ Object



88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/legion/llm/transport/message.rb', line 88

def install_return_listener(exchange_dest, options, return_state)
  return unless options[:mandatory] == true

  return_channel = publish_channel(exchange_dest)
  return unless return_channel.respond_to?(:on_return)

  expected_correlation_id = correlation_id
  expected_message_id = message_id
  return_channel.on_return do |return_info, properties, _content|
    next if properties.respond_to?(:correlation_id) && properties.correlation_id &&
            expected_correlation_id && properties.correlation_id != expected_correlation_id
    next if properties.respond_to?(:message_id) && properties.message_id &&
            expected_message_id && properties.message_id != expected_message_id

    return_state[:returned] = true
    return_state[:reply_code] = return_info.reply_code if return_info.respond_to?(:reply_code)
    return_state[:reply_text] = return_info.reply_text if return_info.respond_to?(:reply_text)
  end
end

#messageObject



26
27
28
# File 'lib/legion/llm/transport/message.rb', line 26

def message
  @options.except(*ENVELOPE_KEYS, *LLM_ENVELOPE_KEYS)
end

#message_contextObject



22
23
24
# File 'lib/legion/llm/transport/message.rb', line 22

def message_context
  @options[:message_context] || {}
end

#message_idObject



30
31
32
# File 'lib/legion/llm/transport/message.rb', line 30

def message_id
  @message_id ||= @options[:message_id] || "#{message_id_prefix}_#{SecureRandom.uuid}"
end

#prepare_publisher_confirms(exchange_dest, options) ⇒ Object



108
109
110
111
112
113
# File 'lib/legion/llm/transport/message.rb', line 108

def prepare_publisher_confirms(exchange_dest, options)
  return unless options[:publisher_confirm] == true

  confirm_channel = publish_channel(exchange_dest)
  confirm_channel.confirm_select if confirm_channel.respond_to?(:confirm_select)
end

#publish_envelope_options(options) ⇒ Object



68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/legion/llm/transport/message.rb', line 68

def publish_envelope_options(options)
  {
    routing_key:      options[:routing_key] || routing_key || '',
    content_type:     options[:content_type] || content_type,
    content_encoding: options[:content_encoding] || content_encoding,
    type:             options[:type] || type,
    priority:         options[:priority] || priority,
    expiration:       options[:expiration] || expiration,
    headers:          headers,
    persistent:       options.key?(:persistent) ? options[:persistent] : persistent,
    message_id:       message_id,
    correlation_id:   correlation_id,
    reply_to:         reply_to,
    app_id:           app_id,
    timestamp:        timestamp
  }.tap do |envelope|
    envelope[:mandatory] = true if options[:mandatory] == true
  end
end

#publish_failure_result(status, error, options = @options) ⇒ Object



131
132
133
134
135
136
137
138
139
140
141
# File 'lib/legion/llm/transport/message.rb', line 131

def publish_failure_result(status, error, options = @options)
  {
    status:         status,
    accepted:       false,
    error_class:    error.class.name,
    error:          error.message,
    routing_key:    options[:routing_key] || routing_key || '',
    message_id:     message_id,
    correlation_id: correlation_id
  }
end

#publish_result(exchange_dest, options, return_state) ⇒ Object



115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/legion/llm/transport/message.rb', line 115

def publish_result(exchange_dest, options, return_state)
  status = confirm_publish(exchange_dest, options)
  status = :unroutable if return_state[:returned]
  ex_name = exchange_dest.respond_to?(:name) ? exchange_dest.name : exchange_dest.to_s
  {
    status:            status,
    accepted:          status == :accepted,
    exchange:          ex_name,
    routing_key:       options[:routing_key] || routing_key || '',
    message_id:        message_id,
    return_reply_code: return_state[:reply_code],
    return_reply_text: return_state[:reply_text],
    correlation_id:    correlation_id
  }.compact
end

#tracing_headersObject



143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
# File 'lib/legion/llm/transport/message.rb', line 143

def tracing_headers
  tracing = @options[:tracing] || context_value(message_context, :tracing)
  return {} unless tracing.is_a?(Hash)

  trace_id = context_value(tracing, :trace_id)
  span_id = context_value(tracing, :span_id)
  parent_span_id = context_value(tracing, :parent_span_id)
  correlation_id = context_value(tracing, :correlation_id)
  baggage = baggage_header(context_value(tracing, :baggage))

  h = {}
  h['traceparent'] = "00-#{trace_id}-#{span_id}-01" if w3c_trace_id?(trace_id) && w3c_span_id?(span_id)
  h['baggage'] = baggage if baggage
  h['x-legion-trace-id']       = trace_id.to_s       if trace_id
  h['x-legion-span-id']        = span_id.to_s        if span_id
  h['x-legion-parent-span-id'] = parent_span_id.to_s if parent_span_id
  h['x-legion-correlation-id'] = correlation_id.to_s if correlation_id
  h
end