Class: Fusuma::Runner
- Inherits:
-
Object
- Object
- Fusuma::Runner
- Defined in:
- lib/fusuma.rb
Overview
main class
Class Method Summary collapse
Instance Method Summary collapse
- #buffer(event) ⇒ Array<Plugin::Buffers::Buffer>, NilClass
- #clear_expired_events ⇒ Object
- #detect(buffers) ⇒ Array<Event>, NilClass
-
#execute(context, event) ⇒ NilClass
When event is NOT given or executable context is NOT found.
- #filter(event) ⇒ Plugin::Events::Event, NilClass
-
#initialize ⇒ Runner
constructor
A new instance of Runner.
- #initialize_plugins ⇒ Object
- #input ⇒ Plugin::Events::Event
- #merge(events) ⇒ Array<Hash, Plugin::Events::Event>, NilClass
- #parse(event) ⇒ Plugin::Events::Event, NilClass
- #pipeline ⇒ Object
- #run ⇒ Object
-
#run_with_lineprof(count: 1000) ⇒ Object
For performance monitoring.
- #set_trap ⇒ Object
Constructor Details
#initialize ⇒ Runner
Returns a new instance of Runner.
61 |
# File 'lib/fusuma.rb', line 61 def initialize; end |
Class Method Details
.run(option = {}) ⇒ Object
15 16 17 18 19 20 21 22 23 24 25 |
# File 'lib/fusuma.rb', line 15 def run(option = {}) (option) instance = new instance.initialize_plugins instance.set_trap ## NOTE: Uncomment following line to measure performance # instance.run_with_lineprof instance.run ensure instance&.send(:shutdown) end |
Instance Method Details
#buffer(event) ⇒ Array<Plugin::Buffers::Buffer>, NilClass
123 124 125 |
# File 'lib/fusuma.rb', line 123 def buffer(event) @buffers.select { |b| b.buffer(event) } end |
#clear_expired_events ⇒ Object
192 193 194 |
# File 'lib/fusuma.rb', line 192 def clear_expired_events @buffers.each(&:clear_expired) end |
#detect(buffers) ⇒ Array<Event>, NilClass
130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 |
# File 'lib/fusuma.rb', line 130 def detect(buffers) matched_detectors = @detectors.select do |detector| detector.watch? || buffers.any? { |b| detector.sources.include?(b.type) } end events = matched_detectors.each_with_object([]) do |detector, detected| # Array(detector.detect(@buffers)).each { |e| detected << e } detected.concat(Array(detector.detect(@buffers))) end return if events.empty? events end |
#execute(context, event) ⇒ NilClass
Returns when event is NOT given or executable context is NOT found.
177 178 179 180 181 182 183 184 185 186 187 188 189 190 |
# File 'lib/fusuma.rb', line 177 def execute(context, event) return unless event # Find executable context Config::Searcher.with_context(context) do executor = @executors.find { |e| e.executable?(event) } if executor # Check interval and execute executor.enough_interval?(event) && executor.update_interval(event) && executor.execute(event) end end end |
#filter(event) ⇒ Plugin::Events::Event, NilClass
109 110 111 |
# File 'lib/fusuma.rb', line 109 def filter(event) event if @filters.any? { |f| f.filter(event) } end |
#initialize_plugins ⇒ Object
63 64 65 66 67 68 69 70 71 72 |
# File 'lib/fusuma.rb', line 63 def initialize_plugins @inputs = Plugin::Inputs::Input.plugins.map do |cls| cls.ancestors.include?(Singleton) ? cls.instance : cls.new end @filters = Plugin::Filters::Filter.plugins.map(&:new) @parsers = Plugin::Parsers::Parser.plugins.map(&:new) @buffers = Plugin::Buffers::Buffer.plugins.map(&:new) @detectors = Plugin::Detectors::Detector.plugins.map(&:new) @executors = Plugin::Executors::Executor.plugins.map(&:new) end |
#input ⇒ Plugin::Events::Event
102 103 104 |
# File 'lib/fusuma.rb', line 102 def input Plugin::Inputs::Input.select(@inputs) end |
#merge(events) ⇒ Array<Hash, Plugin::Events::Event>, NilClass
149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 |
# File 'lib/fusuma.rb', line 149 def merge(events) index_events, context_events = events.partition { |event| event.record.type == :index } main_events, modifiers = index_events.partition { |event| event.record.mergeable? } request_context = context_events.each_with_object({}) do |e, results| results[e.record.name] = e.record.value end main_events.sort_by! { |e| e.record.trigger_priority } matched_context = nil event = main_events.find do |main_event| matched_context = Config::Searcher.find_context(request_context) do if (modified_record = main_event.record.merge(records: modifiers.map(&:record))) main_event.record = modified_record elsif !modifiers.empty? # try basically the same, but without any modifiers # if modifiers is empty then we end up here only if there is no execute key for this Config.instance.search(main_event.record.index) && Config.instance.find_execute_key(main_event.record.index) end end end return if event.nil? [matched_context, event] end |
#parse(event) ⇒ Plugin::Events::Event, NilClass
116 117 118 |
# File 'lib/fusuma.rb', line 116 def parse(event) @parsers.reduce(event) { |e, p| p.parse(e) if e } end |
#pipeline ⇒ Object
78 79 80 81 82 83 84 85 86 87 |
# File 'lib/fusuma.rb', line 78 def pipeline event = input || return clear_expired_events filtered = filter(event) || return parsed = parse(filtered) || return buffered = buffer(parsed) || return detected = detect(buffered) || return context, event = merge(detected) || return execute(context, event) end |
#run ⇒ Object
74 75 76 |
# File 'lib/fusuma.rb', line 74 def run loop { pipeline } end |
#run_with_lineprof(count: 1000) ⇒ Object
For performance monitoring
90 91 92 93 94 95 96 97 98 99 |
# File 'lib/fusuma.rb', line 90 def run_with_lineprof(count: 1000) require "rblineprof" require "rblineprof-report" profile = lineprof(%r{#{Pathname.new(__FILE__).parent}/.}) do count.times { pipeline } end LineProf.report(profile) exit 0 end |
#set_trap ⇒ Object
196 197 198 199 200 201 202 203 204 205 |
# File 'lib/fusuma.rb', line 196 def set_trap Signal.trap("INT") { shutdown exit } # Trap ^C Signal.trap("TERM") { shutdown exit } # Trap `Kill ` end |