Module: Legion::Ingress
- Defined in:
- lib/legion/ingress.rb
Defined Under Namespace
Classes: InvalidFunction, InvalidRunnerClass, PayloadTooLarge
Constant Summary collapse
- MAX_PAYLOAD_SIZE =
512KB serialized
524_288
- RUNNER_CLASS_PATTERN =
/\A[A-Z][A-Za-z0-9:]+\z/
- FUNCTION_PATTERN =
/\A[a-z_][a-z0-9_]*[!?]?\z/
Class Method Summary collapse
- .local_runner?(runner_class) ⇒ Boolean
-
.normalize(payload:, runner_class: nil, function: nil, source: 'unknown', **opts) ⇒ Hash
Normalize a payload from any source into a runner-compatible message hash.
-
.run(payload:, runner_class: nil, function: nil, source: 'unknown', principal: nil, **opts) ⇒ Object
Normalize and execute via Legion::Runner.run.
Class Method Details
.local_runner?(runner_class) ⇒ Boolean
127 128 129 130 131 132 133 134 |
# File 'lib/legion/ingress.rb', line 127

# Report whether +runner_class+ is registered as a local task.
#
# A runner counts as local when any entry of +Legion::Extensions.local_tasks+
# has a +:runner_module+ equal to the resolved class/module.
#
# @param runner_class [String, Module] a constant name (resolved with
#   +Kernel.const_get+) or the class/module itself
# @return [Boolean] +false+ when Legion::Extensions is not loaded, when
#   +local_tasks+ is not an Array, when the name cannot be resolved, or when
#   no entry matches
def local_runner?(runner_class)
  return false unless defined?(Legion::Extensions)

  # Read the task list once so the type check and the scan see the same object.
  tasks = Legion::Extensions.local_tasks
  return false unless tasks.is_a?(Array)

  klass = runner_class.is_a?(String) ? Kernel.const_get(runner_class) : runner_class
  tasks.any? { |task| task[:runner_module] == klass }
rescue NameError
  # Unknown or malformed constant names are simply "not local". NoMethodError
  # is a NameError subclass, so it is answered the same way.
  false
end
.normalize(payload:, runner_class: nil, function: nil, source: 'unknown', **opts) ⇒ Hash
Normalize a payload from any source into a runner-compatible message hash. This is the universal entry point — AMQP subscriptions, HTTP webhooks, CLI commands, and API endpoints all feed through here.
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
# File 'lib/legion/ingress.rb', line 24

# Normalize a payload from any source into a runner-compatible message hash.
# This is the universal entry point — AMQP subscriptions, HTTP webhooks, CLI
# commands, and API endpoints all feed through here.
#
# @param payload [Object] raw payload; handed to +parse_payload+
# @param runner_class [String, Module, nil] overrides +:runner_class+ from the payload when given
# @param function [String, Symbol, nil] overrides +:function+ from the payload when given
# @param source [String] label stored under +:source+
# @param opts [Hash] extra keys merged over the normalized message last, so they win on conflict
# @return [Hash] the message with +:runner_class+, +:function+, +:source+,
#   +:timestamp+ and +:datetime+ set, plus +opts+
# @raise [PayloadTooLarge] when Legion::JSON is loaded and the serialized
#   message is larger than MAX_PAYLOAD_SIZE bytes
def normalize(payload:, runner_class: nil, function: nil, source: 'unknown', **opts)
  message = parse_payload(payload)

  # The size guard needs a serializer; it is skipped when Legion::JSON is not loaded.
  if message.is_a?(Hash) && defined?(Legion::JSON)
    serialized_size = Legion::JSON.dump(message).bytesize
    raise PayloadTooLarge, "payload exceeds #{MAX_PAYLOAD_SIZE} bytes" if serialized_size > MAX_PAYLOAD_SIZE
  end

  # Explicit arguments take precedence over values carried inside the payload.
  message[:runner_class] = runner_class || message[:runner_class]
  message[:function] = function || message[:function]
  message[:source] = source
  message[:timestamp] ||= Time.now.to_i
  # Same text DateTime#to_s produces, without needing the `date` library for Time#to_datetime.
  message[:datetime] ||= Time.at(message[:timestamp]).strftime('%Y-%m-%dT%H:%M:%S%:z')
  message.merge(opts)
end
.run(payload:, runner_class: nil, function: nil, source: 'unknown', principal: nil, **opts) ⇒ Object
Normalize and execute via Legion::Runner.run. Returns the runner result hash.
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 |
# File 'lib/legion/ingress.rb', line 42

# Normalize and execute via Legion::Runner.run. Returns the runner result hash.
#
# Runners registered as local tasks (see +local_runner?+) are invoked directly
# on the runner module instead of going through Legion::Runner.
#
# @param payload [Object] raw payload, handed to +normalize+
# @param runner_class [String, Module, nil] target runner; may also come from the payload
# @param function [String, Symbol, nil] target function; may also come from the payload
# @param source [String] label for where the request came from
# @param principal [Object, nil] RBAC principal; defaults to
#   +Legion::Rbac::Principal.local_admin+ when Legion::Rbac is loaded
# @param opts [Hash] +:check_subtask+ and +:generate_task+ (both default +true+)
#   are forwarded to Legion::Runner.run; every other key is forwarded to +normalize+
# @return [Object] the runner's result, or a
#   <tt>{ success: false, status: 'task.blocked', error: { code:, message: } }</tt>
#   hash when the request is rejected by a payload, format or worker-registry check
# @raise [RuntimeError] when no runner_class or function can be determined
def run(payload:, runner_class: nil, function: nil, source: 'unknown', principal: nil, **opts) # rubocop:disable Metrics/ParameterLists,Metrics/MethodLength,Metrics/CyclomaticComplexity,Metrics/PerceivedComplexity,Metrics/AbcSize
  Legion::Logging.info "[Ingress] run: source=#{source} runner_class=#{runner_class} function=#{function}" if defined?(Legion::Logging)

  check_subtask = opts.fetch(:check_subtask, true)
  generate_task = opts.fetch(:generate_task, true)
  message = normalize(payload: payload, runner_class: runner_class, function: function, source: source,
                      **opts.except(:check_subtask, :generate_task, :principal))
  Legion::Logging.debug "[Ingress] payload keys: #{message.keys}" if defined?(Legion::Logging)

  # Routing keys are pulled out so they are not passed to the runner twice.
  rc = message.delete(:runner_class)
  fn = message.delete(:function)

  if rc.nil?
    Legion::Logging.warn '[Ingress] runner_class is missing' if defined?(Legion::Logging)
    raise 'runner_class is required'
  end
  raise 'function is required' if fn.nil?

  rc_str = rc.to_s
  raise InvalidRunnerClass, "invalid runner_class format: #{rc_str}" unless rc_str.match?(RUNNER_CLASS_PATTERN)

  fn_str = fn.to_s
  raise InvalidFunction, "invalid function format: #{fn_str}" unless fn_str.match?(FUNCTION_PATTERN)

  # RAI invariant #2: registration precedes permission
  if defined?(Legion::DigitalWorker::Registry) && message[:worker_id]
    Legion::DigitalWorker::Registry.validate_execution!(
      worker_id: message[:worker_id],
      required_consent: message[:required_consent]
    )
  end

  if defined?(Legion::Rbac)
    principal ||= Legion::Rbac::Principal.local_admin
    # NOTE(review): the method name after `Legion::Rbac.` is missing from this listing
    # (as written, `.()` invokes `Legion::Rbac.call`) — confirm against lib/legion/ingress.rb.
    Legion::Rbac.(principal: principal, runner_class: rc.to_s, function: fn.to_s)
  end

  Legion::Events.emit('ingress.received', runner_class: rc.to_s, function: fn, source: source)

  if local_runner?(rc)
    Legion::Logging.debug "[Ingress] local short-circuit: #{rc}.#{fn}" if defined?(Legion::Logging)
    klass = rc.is_a?(String) ? Kernel.const_get(rc) : rc
    ctx = message.merge(runner_class: rc.to_s, function: fn.to_s)
    # NOTE(review): `send` also reaches private methods and FUNCTION_PATTERN only constrains
    # the shape of the name — consider `public_send` if runner functions are always public.
    return Legion::Context.with_task_context(ctx) { klass.send(fn.to_sym, **message) }
  end

  runner_block = lambda {
    ctx = message.merge(runner_class: rc.to_s, function: fn.to_s)
    Legion::Context.with_task_context(ctx) do
      Legion::Runner.run(
        runner_class: rc,
        function: fn,
        check_subtask: check_subtask,
        generate_task: generate_task,
        **message
      )
    end
  }

  if defined?(Legion::Telemetry::OpenInference)
    Legion::Telemetry::OpenInference.tool_span(name: "#{rc}.#{fn}", parameters: message) { |_span| runner_block.call }
  else
    runner_block.call
  end
rescue PayloadTooLarge => e
  Legion::Logging.error "[Ingress] payload_too_large: #{e.message}" if defined?(Legion::Logging)
  { success: false, status: 'task.blocked', error: { code: 'payload_too_large', message: e.message } }
rescue InvalidRunnerClass => e
  Legion::Logging.error "[Ingress] invalid_runner_class: #{e.message}" if defined?(Legion::Logging)
  { success: false, status: 'task.blocked', error: { code: 'invalid_runner_class', message: e.message } }
rescue InvalidFunction => e
  Legion::Logging.error "[Ingress] invalid_function: #{e.message}" if defined?(Legion::Logging)
  { success: false, status: 'task.blocked', error: { code: 'invalid_function', message: e.message } }
rescue StandardError => e
  # The worker registry is optional. Naming its error classes directly in rescue clauses
  # would raise NameError whenever it is not loaded, replacing the error actually being
  # handled, so they are resolved here only if present.
  # Assumes the registry error classes descend from StandardError.
  raise unless defined?(Legion::DigitalWorker::Registry)

  registry = Legion::DigitalWorker::Registry
  code = {
    WorkerNotFound: 'worker_not_found',
    WorkerNotActive: 'worker_not_active',
    InsufficientConsent: 'insufficient_consent'
  }.find { |const_name, _code| registry.const_defined?(const_name) && e.is_a?(registry.const_get(const_name)) }&.last
  raise unless code # not a registry rejection: let it propagate unchanged

  Legion::Logging.error "[Ingress] #{code}: #{e.message}" if defined?(Legion::Logging)
  { success: false, status: 'task.blocked', error: { code: code, message: e.message } }
end