Module: Legion::Extensions::Absorbers::Dispatch

Defined in:
lib/legion/extensions/absorbers/dispatch.rb

Class Method Summary collapse

Class Method Details

.default_contextObject



64
65
66
67
68
69
70
71
72
73
# File 'lib/legion/extensions/absorbers/dispatch.rb', line 64

def default_context
  {
    depth:            0,
    max_depth:        max_depth_setting,
    ancestor_chain:   [],
    conversation_id:  nil,
    requested_by:     nil,
    parent_absorb_id: nil
  }
end

.dispatch(input, context: {}) ⇒ Object



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/legion/extensions/absorbers/dispatch.rb', line 16

def dispatch(input, context: {})
  context = default_context.merge(context)

  return { status: :depth_exceeded, input: input } if context[:depth] >= context[:max_depth]

  source_key = normalize_source_key(input)
  return { status: :cycle_detected, input: input } if context[:ancestor_chain]&.any? { |a| a.include?(source_key) }

  absorber_class = PatternMatcher.resolve(input)
  return nil unless absorber_class

  absorb_id = "absorb:#{SecureRandom.uuid}"

  record = {
    absorb_id:      absorb_id,
    input:          input,
    absorber_class: absorber_class.name,
    context:        context.merge(
      ancestor_chain: (context[:ancestor_chain] || []) + [absorb_id]
    ),
    status:         :dispatched,
    dispatched_at:  Time.now.utc.iso8601
  }

  publish_to_transport(absorber_class, input, record) if transport_available?

  @mutex.synchronize { @dispatched << record }
  record
end

.dispatch_children(children, parent_context:) ⇒ Object



46
47
48
49
50
51
52
53
54
# File 'lib/legion/extensions/absorbers/dispatch.rb', line 46

def dispatch_children(children, parent_context:)
  children.map do |child|
    child_context = parent_context.merge(
      depth:            parent_context[:depth] + 1,
      parent_absorb_id: parent_context[:absorb_id]
    )
    dispatch(child[:url] || child[:file_path], context: child_context)
  end
end

.dispatchedObject



56
57
58
# File 'lib/legion/extensions/absorbers/dispatch.rb', line 56

def dispatched
  @mutex.synchronize { @dispatched.dup }
end

.extract_urls(text) ⇒ Object



96
97
98
# File 'lib/legion/extensions/absorbers/dispatch.rb', line 96

def extract_urls(text)
  URI.extract(text, %w[http https]).uniq
end

.max_depth_settingObject



75
76
77
78
79
# File 'lib/legion/extensions/absorbers/dispatch.rb', line 75

def max_depth_setting
  return 5 unless defined?(Legion::Settings)

  Legion::Settings[:absorbers]&.dig(:max_depth) || 5
end

.normalize_source_key(input) ⇒ Object



81
82
83
# File 'lib/legion/extensions/absorbers/dispatch.rb', line 81

def normalize_source_key(input)
  input.to_s.gsub(%r{^https?://}, '').gsub(/[?#].*/, '')
end

.publish_to_transport(absorber_class, _input, record) ⇒ Object



91
92
93
94
# File 'lib/legion/extensions/absorbers/dispatch.rb', line 91

def publish_to_transport(absorber_class, _input, record)
  require_relative 'transport'
  Transport.publish_absorb_request(absorber_class: absorber_class, record: record)
end

.reset_dispatched!Object



60
61
62
# File 'lib/legion/extensions/absorbers/dispatch.rb', line 60

def reset_dispatched!
  @mutex.synchronize { @dispatched.clear }
end

.transport_available?Boolean

Returns:

  • (Boolean)


85
86
87
88
89
# File 'lib/legion/extensions/absorbers/dispatch.rb', line 85

def transport_available?
  defined?(Legion::Transport) &&
    Legion::Transport.respond_to?(:connected?) &&
    Legion::Transport.connected?
end