Module: Legion::Extensions::Absorbers::Dispatch
- Defined in:
- lib/legion/extensions/absorbers/dispatch.rb
Class Method Summary
- .default_context ⇒ Object
- .dispatch(input, context: {}) ⇒ Object
- .dispatch_children(children, parent_context:) ⇒ Object
- .dispatched ⇒ Object
- .extract_urls(text) ⇒ Object
- .max_depth_setting ⇒ Object
- .normalize_source_key(input) ⇒ Object
- .publish_to_transport(absorber_class, _input, record) ⇒ Object
- .reset_dispatched! ⇒ Object
- .transport_available? ⇒ Boolean
Class Method Details
.default_context ⇒ Object
64 65 66 67 68 69 70 71 72 73 |
# File 'lib/legion/extensions/absorbers/dispatch.rb', line 64
# Builds the baseline context hash that dispatch layers caller-supplied
# values on top of.
#
# @return [Hash] a new hash with :depth 0, :max_depth taken from
#   max_depth_setting, an empty :ancestor_chain, and nil :conversation_id,
#   :requested_by and :parent_absorb_id
def default_context
  traversal = { depth: 0, max_depth: max_depth_setting, ancestor_chain: [] }
  provenance = { conversation_id: nil, requested_by: nil, parent_absorb_id: nil }
  traversal.merge(provenance)
end
.dispatch(input, context: {}) ⇒ Object
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/legion/extensions/absorbers/dispatch.rb', line 16
# Resolves an absorber for +input+ and records the resulting dispatch.
#
# The supplied context is merged over default_context. Three early exits
# come before any record is built: the depth limit is reached, an
# ancestor_chain entry contains the normalized source key, or
# PatternMatcher resolves no absorber.
#
# NOTE(review): the cycle check substring-matches the normalized source key
# against ancestor_chain entries, while this method only ever appends
# "absorb:<uuid>" ids to that chain. Confirm that callers seed the chain with
# source keys; otherwise the check cannot fire for chains built here.
#
# @param input [Object] the source handed to PatternMatcher.resolve
# @param context [Hash] overrides layered on top of default_context
# @return [Hash, nil] a status hash (:depth_exceeded or :cycle_detected),
#   nil when no absorber matches, or the dispatched record
def dispatch(input, context: {})
  merged = default_context.merge(context)
  if merged[:depth] >= merged[:max_depth]
    return { status: :depth_exceeded, input: input }
  end

  key = normalize_source_key(input)
  lineage = merged[:ancestor_chain]
  seen = lineage&.any? { |ancestor| ancestor.include?(key) }
  return { status: :cycle_detected, input: input } if seen

  handler = PatternMatcher.resolve(input)
  return nil unless handler

  id = "absorb:#{SecureRandom.uuid}"
  entry = {
    absorb_id: id,
    input: input,
    absorber_class: handler.name,
    # The stored context carries the chain extended with this dispatch's id.
    context: merged.merge(ancestor_chain: (lineage || []) + [id]),
    status: :dispatched,
    dispatched_at: Time.now.utc.iso8601
  }

  publish_to_transport(handler, input, entry) if transport_available?
  @mutex.synchronize { @dispatched << entry }
  entry
end
.dispatch_children(children, parent_context:) ⇒ Object
46 47 48 49 50 51 52 53 54 |
# File 'lib/legion/extensions/absorbers/dispatch.rb', line 46
# Dispatches every child one level deeper than the parent context.
#
# Each child is dispatched by its :url, falling back to :file_path, with a
# context derived from +parent_context+: :depth incremented by one and
# :parent_absorb_id set to the parent's :absorb_id.
#
# @param children [Array<Hash>] entries read via :url / :file_path
# @param parent_context [Hash] context of the parent; must provide :depth
# @return [Array] the dispatch result for each child, in order
def dispatch_children(children, parent_context:)
  children.map do |entry|
    next_depth = parent_context[:depth] + 1
    nested = parent_context.merge(
      depth: next_depth,
      parent_absorb_id: parent_context[:absorb_id]
    )
    target = entry[:url] || entry[:file_path]
    dispatch(target, context: nested)
  end
end
.dispatched ⇒ Object
56 57 58 |
# File 'lib/legion/extensions/absorbers/dispatch.rb', line 56
# Returns a shallow copy of the recorded dispatches, taken while holding
# @mutex, so callers cannot mutate the internal list.
#
# @return [Array] duplicate of @dispatched
def dispatched
  snapshot = nil
  @mutex.synchronize do
    snapshot = @dispatched.dup
  end
  snapshot
end
.extract_urls(text) ⇒ Object
96 97 98 |
# File 'lib/legion/extensions/absorbers/dispatch.rb', line 96
# Collects the distinct http/https URLs found in +text+, in order of first
# appearance.
#
# The input is coerced with to_s so that nil (or any non-String) yields an
# empty result instead of raising inside URI.extract.
#
# NOTE(review): URI.extract is flagged obsolete by the uri library and warns
# when $VERBOSE is set; it is kept here so extraction results stay unchanged.
#
# @param text [String, nil] text to scan
# @return [Array<String>] unique URLs
def extract_urls(text)
  URI.extract(text.to_s, %w[http https]).uniq
end
.max_depth_setting ⇒ Object
75 76 77 78 79 |
# File 'lib/legion/extensions/absorbers/dispatch.rb', line 75
# Reads the maximum dispatch depth from Legion::Settings[:absorbers][:max_depth].
#
# Falls back to 5 when Legion::Settings is not defined, when the :absorbers
# entry is nil, or when :max_depth is nil or false.
#
# @return [Object] the configured value, or 5
def max_depth_setting
  fallback = 5
  if defined?(Legion::Settings)
    Legion::Settings[:absorbers]&.dig(:max_depth) || fallback
  else
    fallback
  end
end
.normalize_source_key(input) ⇒ Object
81 82 83 |
# File 'lib/legion/extensions/absorbers/dispatch.rb', line 81
# Reduces +input+ to a comparison key: the string form with a leading
# http:// or https:// scheme removed and everything from the first "?" or
# "#" onward dropped.
#
# The scheme is anchored with \A (start of string) rather than ^ (start of
# any line), and the query/fragment strip uses /m so it runs to the end of
# the string. Multi-line input therefore no longer has schemes removed or
# queries trimmed line by line. Single-line input gives the same key as before.
#
# @param input [Object] source identifier; converted with to_s
# @return [String] normalized key ("" for nil)
def normalize_source_key(input)
  input.to_s.sub(%r{\Ahttps?://}, '').sub(/[?#].*/m, '')
end
.publish_to_transport(absorber_class, _input, record) ⇒ Object
91 92 93 94 |
# File 'lib/legion/extensions/absorbers/dispatch.rb', line 91
# Hands the dispatch record to Transport.publish_absorb_request.
#
# The sibling 'transport' file is required on each call, inside the method
# rather than at file load.
#
# @param absorber_class [Object] forwarded as the absorber_class: keyword
# @param _input [Object] accepted but not used here
# @param record [Hash] forwarded as the record: keyword
# @return [Object] whatever Transport.publish_absorb_request returns
def publish_to_transport(absorber_class, _input, record)
  require_relative 'transport'

  request = { absorber_class: absorber_class, record: record }
  Transport.publish_absorb_request(**request)
end
.reset_dispatched! ⇒ Object
60 61 62 |
# File 'lib/legion/extensions/absorbers/dispatch.rb', line 60
# Empties the recorded dispatch list in place while holding @mutex.
#
# @return [Array] the (now empty) @dispatched array
def reset_dispatched!
  @mutex.synchronize do
    @dispatched.clear
  end
end