Module: Legion::Extensions::Actors::AbsorberDispatch

Defined in:
lib/legion/extensions/actors/absorber_dispatch.rb

Class Method Summary collapse

Class Method Details

.dispatch(input:, job_id: nil, context: {}) ⇒ Object



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/legion/extensions/actors/absorber_dispatch.rb', line 11

def dispatch(input:, job_id: nil, context: {})
  job_id ||= SecureRandom.hex(8)
  absorber_class = Absorbers::PatternMatcher.resolve(input)

  unless absorber_class
    publish_event("absorb.failed.#{job_id}", job_id: job_id, error: 'no handler found for input')
    return { success: false, error: 'no handler found for input', job_id: job_id }
  end

  absorber = absorber_class.new
  absorber.job_id = job_id
  result = absorber.absorb(url: input, content: context[:content],
                           metadata: context[:metadata] || {}, context: context)
  publish_event("absorb.complete.#{job_id}", job_id: job_id, absorber: absorber_class.name,
                                             result: result)
  { success: true, job_id: job_id, absorber: absorber_class.name, result: result }
rescue StandardError => e
  Legion::Logging.error("AbsorberDispatch failed: #{e.message}") if defined?(Legion::Logging)
  publish_event("absorb.failed.#{job_id}", job_id: job_id, error: e.message)
  { success: false, job_id: job_id, error: e.message }
end

.publish_event(routing_key, **payload) ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/legion/extensions/actors/absorber_dispatch.rb', line 33

def publish_event(routing_key, **payload)
  return unless defined?(Legion::Transport)

  session = Legion::Transport.respond_to?(:session) ? Legion::Transport.session : nil
  if session.respond_to?(:open?)
    return unless session.open?
  elsif session.nil?
    return
  end

  message_class =
    if defined?(Legion::Transport::Messages::Dynamic)
      Legion::Transport::Messages::Dynamic
    elsif defined?(Legion::Transport::Message)
      Legion::Transport::Message
    end
  return unless message_class

  message_class.new(routing_key: routing_key, **payload).publish
rescue StandardError => e
  Legion::Logging.warn("AbsorberDispatch publish failed: #{e.message}") if defined?(Legion::Logging)
end