Module: Legion::API::Routes::LexDispatch
- Defined in:
- lib/legion/api/lex_dispatch.rb
Class Method Summary collapse
- .build_envelope(request) ⇒ Object
-
.definition_blocks_remote?(entry) ⇒ Boolean
Returns true when the definition-level flag explicitly disables remote dispatch.
-
.dispatch_async_amqp(exchange_name, routing_key, payload, envelope) ⇒ Object
Publish an async AMQP message for remote dispatch (fire-and-forget).
- .dispatch_hook(context, request, entry, payload, envelope) ⇒ Object
-
.dispatch_request(context, request, params) ⇒ Object
rubocop:disable Metrics/MethodLength,Metrics/AbcSize,Metrics/CyclomaticComplexity.
-
.extension_loaded_locally?(entry) ⇒ Boolean
Returns true when the runner class referenced by the route entry is available in the current process (i.e. the extension is loaded locally).
- .hook_base_subclass?(runner_class) ⇒ Boolean
- .parse_header_integer(value) ⇒ Object
-
.register_discovery(app) ⇒ Object
Discovery endpoints (GET).
-
.register_dispatch(app) ⇒ Object
Dispatch endpoint (POST).
- .registered(app) ⇒ Object
Class Method Details
.build_envelope(request) ⇒ Object
174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 |
# File 'lib/legion/api/lex_dispatch.rb', line 174 def self.build_envelope(request) task_id = parse_header_integer(request.env['HTTP_X_LEGION_TASK_ID']) conversation_id = request.env['HTTP_X_LEGION_CONVERSATION_ID'] || ::SecureRandom.uuid parent_id = parse_header_integer(request.env['HTTP_X_LEGION_PARENT_ID']) master_id = parse_header_integer(request.env['HTTP_X_LEGION_MASTER_ID']) chain_id = parse_header_integer(request.env['HTTP_X_LEGION_CHAIN_ID']) debug = request.env['HTTP_X_LEGION_DEBUG'] == 'true' { task_id: task_id, conversation_id: conversation_id, parent_id: parent_id, master_id: master_id || task_id, chain_id: chain_id, debug: debug }.compact end |
.definition_blocks_remote?(entry) ⇒ Boolean
Returns true when the definition-level flag explicitly disables remote dispatch. Extension-level gate (entry module) takes precedence over definition flag.
209 210 211 212 213 214 |
# File 'lib/legion/api/lex_dispatch.rb', line 209 def self.definition_blocks_remote?(entry) defn = entry[:definition] return false if defn.nil? defn[:remote_invocable] == false end |
.dispatch_async_amqp(exchange_name, routing_key, payload, envelope) ⇒ Object
Publish an async AMQP message for remote dispatch (fire-and-forget).
217 218 219 220 221 222 223 224 225 226 227 228 229 |
# File 'lib/legion/api/lex_dispatch.rb', line 217 def self.dispatch_async_amqp(exchange_name, routing_key, payload, envelope) return unless defined?(Legion::Transport) && Legion::Transport.respond_to?(:connected?) && Legion::Transport.connected? channel = Legion::Transport.channel exchange = channel.exchange(exchange_name, type: :topic, durable: true, passive: true) = Legion::JSON.dump(payload.merge(envelope)) exchange.publish(, routing_key: routing_key, content_type: 'application/json', persistent: true) rescue StandardError => e Legion::Logging.warn "[LexDispatch] async AMQP publish failed: #{e.}" if defined?(Legion::Logging) raise end |
.dispatch_hook(context, request, entry, payload, envelope) ⇒ Object
241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 |
# File 'lib/legion/api/lex_dispatch.rb', line 241 def self.dispatch_hook(context, request, entry, payload, envelope) hook = entry[:runner_class].new # Re-read body for verification (request body was already read for payload parsing) request.body.rewind body_for_verify = request.body.read request.body.rewind unless hook.verify(request.env, body_for_verify) context.halt 401, Legion::JSON.dump({ task_id: nil, conversation_id: nil, status: 'failed', error: { code: 401, message: 'hook verification failed' } }) end function = hook.route(request.env, payload) unless function context.halt 422, Legion::JSON.dump({ task_id: nil, conversation_id: nil, status: 'failed', error: { code: 422, message: 'hook could not route this event' } }) end # If the hook defines the routed function as an instance method, call it to transform if hook.class.method_defined?(function) && hook.class.instance_method(function).owner != Legion::Extensions::Hooks::Base transformed = hook.send(function, payload) payload = transformed if transformed end runner = hook.runner_class || entry[:runner_class] result = Legion::Ingress.run( payload: payload.merge(envelope.slice(:task_id, :conversation_id, :parent_id, :master_id, :chain_id)), runner_class: runner, function: function, source: 'hook', check_subtask: true, generate_task: true ) response_body = envelope.merge( status: result[:status], result: result[:result] ).compact Legion::JSON.dump(response_body) end |
.dispatch_request(context, request, params) ⇒ Object
rubocop:disable Metrics/MethodLength,Metrics/AbcSize,Metrics/CyclomaticComplexity
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 |
# File 'lib/legion/api/lex_dispatch.rb', line 67 def self.dispatch_request(context, request, params) # rubocop:disable Metrics/MethodLength,Metrics/AbcSize,Metrics/CyclomaticComplexity content_type = 'application/json' context.content_type content_type entry = Legion::API.router.find_extension_route( params[:lex_name], params[:component_type], params[:component_name], params[:method_name] ) unless entry route_key = "#{params[:lex_name]}/#{params[:component_type]}/#{params[:component_name]}/#{params[:method_name]}" context.halt 404, Legion::JSON.dump({ task_id: nil, conversation_id: nil, status: 'failed', error: { code: 404, message: "no route registered for '#{route_key}'" } }) end envelope = build_envelope(request) payload = begin body = request.body.read body.nil? || body.empty? ? {} : Legion::JSON.load(body) rescue StandardError => e Legion::Logging.warn "[LexDispatch] invalid JSON body: #{e.}" if defined?(Legion::Logging) context.halt 400, Legion::JSON.dump({ task_id: nil, conversation_id: nil, status: 'failed', error: { code: 400, message: 'request body is not valid JSON' } }) end # Remote dispatch: when the runner class is not loaded locally, forward via AMQP unless extension_loaded_locally?(entry) if definition_blocks_remote?(entry) context.halt 403, Legion::JSON.dump({ task_id: nil, conversation_id: nil, status: 'failed', error: { code: 403, message: 'Method not remotely invocable' } }) end exchange_name = entry[:amqp_prefix].to_s.then { |p| p.empty? ? "lex.#{entry[:lex_name]}" : p } routing_key = "#{exchange_name}.#{entry[:component_type]}.#{entry[:component_name]}.#{entry[:method_name]}" if request.env['HTTP_X_LEGION_SYNC'] == 'true' result = Legion::API::SyncDispatch.dispatch(exchange_name, routing_key, payload, envelope) return Legion::JSON.dump(result) else unless defined?(Legion::Transport) && Legion::Transport.respond_to?(:connected?) && Legion::Transport.connected? context.halt 503, Legion::JSON.dump({ task_id: nil, conversation_id: nil, status: 'failed', error: { code: 503, message: 'Transport not available' } }) end dispatch_async_amqp(exchange_name, routing_key, payload, envelope) context.status 202 return Legion::JSON.dump(envelope.merge(status: 'queued')) end end # Hook-aware dispatch: when component_type is 'hooks' and the runner class # is a Hooks::Base subclass, apply verify -> route -> transform -> Ingress. return dispatch_hook(context, request, entry, payload, envelope) if entry[:component_type] == 'hooks' && hook_base_subclass?(entry[:runner_class]) result = Legion::Ingress.run( payload: payload.merge(envelope.slice(:task_id, :conversation_id, :parent_id, :master_id, :chain_id)), runner_class: entry[:runner_class], function: entry[:method_name].to_sym, source: 'lex_dispatch', generate_task: true ) response_body = envelope.merge( status: result[:status], result: result[:result] ).compact Legion::JSON.dump(response_body) rescue StandardError => e route_key = "#{params[:lex_name]}/#{params[:component_type]}/#{params[:component_name]}/#{params[:method_name]}" Legion::Logging.log_exception(e, payload_summary: "LexDispatch POST #{route_key}", component_type: :api) context.status 500 Legion::JSON.dump({ task_id: nil, conversation_id: nil, status: 'failed', error: { code: 500, message: e. } }) end |
.extension_loaded_locally?(entry) ⇒ Boolean
Returns true when the runner class referenced by the route entry is available in the current process (i.e. the extension is loaded locally).
194 195 196 197 198 199 200 201 202 203 204 205 |
# File 'lib/legion/api/lex_dispatch.rb', line 194 def self.extension_loaded_locally?(entry) runner_class = entry[:runner_class] return false if runner_class.nil? || runner_class.to_s.empty? # Try constant lookup — safe because runner_class is from the route registry, # not from user input. parts = runner_class.to_s.split('::').reject(&:empty?) parts.reduce(Object) { |mod, name| mod.const_get(name, false) } true rescue NameError, TypeError false end |
.hook_base_subclass?(runner_class) ⇒ Boolean
231 232 233 234 235 236 237 238 239 |
# File 'lib/legion/api/lex_dispatch.rb', line 231 def self.hook_base_subclass?(runner_class) return false unless defined?(Legion::Extensions::Hooks::Base) return false if runner_class.nil? klass = runner_class.is_a?(Class) ? runner_class : Kernel.const_get(runner_class.to_s) klass < Legion::Extensions::Hooks::Base rescue NameError, TypeError false end |
.parse_header_integer(value) ⇒ Object
166 167 168 169 170 171 172 |
# File 'lib/legion/api/lex_dispatch.rb', line 166 def self.parse_header_integer(value) return nil if value.nil? Integer(value) rescue ArgumentError, TypeError nil end |
.register_discovery(app) ⇒ Object
Discovery endpoints (GET)
15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
# File 'lib/legion/api/lex_dispatch.rb', line 15 def self.register_discovery(app) # GET /api/extensions/index — list all extensions app.get '/api/extensions/index' do content_type :json names = Legion::API.router.extension_names Legion::JSON.dump({ extensions: names }) end # GET /api/extensions/:lex_name/:component_type/:component_name/:method_name — full contract app.get '/api/extensions/:lex_name/:component_type/:component_name/:method_name' do content_type :json entry = Legion::API.router.find_extension_route( params[:lex_name], params[:component_type], params[:component_name], params[:method_name] ) unless entry halt 404, Legion::JSON.dump({ task_id: nil, conversation_id: nil, status: 'failed', error: { code: 404, message: 'route not found' } }) end amqp_pfx = entry[:amqp_prefix].to_s.then { |p| p.empty? ? "lex.#{params[:lex_name]}" : p } response = { extension: params[:lex_name], component_type: params[:component_type], component: params[:component_name], method: params[:method_name], definition: entry[:definition], amqp: { exchange: amqp_pfx, routing_key: "#{amqp_pfx}.#{params[:component_type]}.#{params[:component_name]}.#{params[:method_name]}" } } if params[:component_type] == 'hooks' response[:hook_endpoint] = "/api/extensions/#{params[:lex_name]}/hooks/#{params[:component_name]}/#{params[:method_name]}" end Legion::JSON.dump(response) end end |
.register_dispatch(app) ⇒ Object
Dispatch endpoint (POST)
60 61 62 63 64 65 |
# File 'lib/legion/api/lex_dispatch.rb', line 60 def self.register_dispatch(app) dispatcher = method(:dispatch_request) app.post '/api/extensions/:lex_name/:component_type/:component_name/:method_name' do dispatcher.call(self, request, params) end end |
.registered(app) ⇒ Object
9 10 11 12 |
# File 'lib/legion/api/lex_dispatch.rb', line 9 def self.registered(app) register_discovery(app) register_dispatch(app) end |