Module: Legion::Extensions::Actors::AbsorberDispatch
- Defined in:
- lib/legion/extensions/actors/absorber_dispatch.rb
Class Method Summary collapse
- .dispatch(input:, job_id: nil, context: {}) ⇒ Object
- .publish_event(routing_key, **payload) ⇒ Object
Class Method Details
.dispatch(input:, job_id: nil, context: {}) ⇒ Object
11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
# File 'lib/legion/extensions/actors/absorber_dispatch.rb', line 11 def dispatch(input:, job_id: nil, context: {}) job_id ||= SecureRandom.hex(8) absorber_class = Absorbers::PatternMatcher.resolve(input) unless absorber_class publish_event("absorb.failed.#{job_id}", job_id: job_id, error: 'no handler found for input') return { success: false, error: 'no handler found for input', job_id: job_id } end absorber = absorber_class.new absorber.job_id = job_id result = absorber.absorb(url: input, content: context[:content], metadata: context[:metadata] || {}, context: context) publish_event("absorb.complete.#{job_id}", job_id: job_id, absorber: absorber_class.name, result: result) { success: true, job_id: job_id, absorber: absorber_class.name, result: result } rescue StandardError => e Legion::Logging.error("AbsorberDispatch failed: #{e.}") if defined?(Legion::Logging) publish_event("absorb.failed.#{job_id}", job_id: job_id, error: e.) { success: false, job_id: job_id, error: e. } end |
.publish_event(routing_key, **payload) ⇒ Object
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/legion/extensions/actors/absorber_dispatch.rb', line 33 def publish_event(routing_key, **payload) return unless defined?(Legion::Transport) session = Legion::Transport.respond_to?(:session) ? Legion::Transport.session : nil if session.respond_to?(:open?) return unless session.open? elsif session.nil? return end = if defined?(Legion::Transport::Messages::Dynamic) Legion::Transport::Messages::Dynamic elsif defined?(Legion::Transport::Message) Legion::Transport::Message end return unless .new(routing_key: routing_key, **payload).publish rescue StandardError => e Legion::Logging.warn("AbsorberDispatch publish failed: #{e.}") if defined?(Legion::Logging) end |