Class: Fluent::EventRouter
- Inherits: Object
- Inheritance chain: Object, Fluent::EventRouter
- Defined in: lib/fluent/event_router.rb
Overview
EventRouter is responsible for routing events to a collector.
It has a list of MatchPattern and Collector pairs:
+----------------+     +-----------------+
|  MatchPattern  |     | Collector       |
+----------------+     +-----------------+
|   access.**  ---------> type forward   |
|     logs.**  ---------> type copy      |
|  archive.**  ---------> type s3        |
+----------------+     +-----------------+
EventRouter does the following:
1) receives an event through one of the `#emit` methods
2) matches the event's tag against the MatchPatterns
3) forwards the event to the corresponding Collector
A Collector is an Output, a Filter, or another EventRouter.
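The receive, match, and forward steps can be sketched in plain Ruby without loading Fluentd. Everything in this sketch is a hypothetical stand-in: `SketchRouter`, `ListCollector`, and the prefix-based `matches?` helper are illustrative names, and the prefix check only approximates a `prefix.**` pattern rather than reproducing the real MatchPattern classes.

```ruby
# Hypothetical collector that only remembers what it receives.
class ListCollector
  attr_reader :events

  def initialize
    @events = []
  end

  def emit_events(tag, es)
    es.each { |time, record| @events << [tag, time, record] }
  end
end

# Simplified stand-in for EventRouter: the first matching rule wins,
# otherwise the default collector receives the event.
class SketchRouter
  Rule = Struct.new(:prefix, :collector)

  def initialize(default_collector)
    @rules = []
    @default_collector = default_collector
  end

  def add_rule(prefix, collector)
    @rules << Rule.new(prefix, collector)
  end

  # Assumption: the prefix "access" stands in for the pattern "access.**".
  def matches?(prefix, tag)
    tag == prefix || tag.start_with?("#{prefix}.")
  end

  def match(tag)
    rule = @rules.find { |r| matches?(r.prefix, tag) }
    rule ? rule.collector : @default_collector
  end

  def emit(tag, time, record)
    # A plain array of [time, record] pairs stands in for an event stream.
    match(tag).emit_events(tag, [[time, record]])
  end
end

forward = ListCollector.new
fallback = ListCollector.new
router = SketchRouter.new(fallback)
router.add_rule("access", forward)

router.emit("access.web", 0, { "path" => "/" })
router.emit("other.tag", 1, { "msg" => "unmatched" })
```

In this sketch the `access.web` event lands in `forward`, while `other.tag` matches no rule and falls through to `fallback`, which plays the role of the default collector.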
Defined Under Namespace
Classes: MatchCache, Pipeline, Rule
Instance Attribute Summary
- #default_collector ⇒ Object
  Returns the value of attribute default_collector.
- #emit_error_handler ⇒ Object
  Returns the value of attribute emit_error_handler.
Instance Method Summary
- #add_metric_callbacks(caller_plugin_id, callback) ⇒ Object
- #add_rule(pattern, collector) ⇒ Object
  Called by Agent to add a new match pattern and collector.
- #caller_plugin_id=(caller_plugin_id) ⇒ Object
- #emit(tag, time, record) ⇒ Object
- #emit_array(tag, array) ⇒ Object
- #emit_error_event(tag, time, record, error) ⇒ Object
- #emit_stream(tag, es) ⇒ Object
- #find_callback ⇒ Object
- #initialize(default_collector, emit_error_handler) ⇒ EventRouter
  Constructor. Returns a new instance of EventRouter.
- #match(tag) ⇒ Object
- #match?(tag) ⇒ Boolean
- #suppress_missing_match! ⇒ Object
Constructor Details
#initialize(default_collector, emit_error_handler) ⇒ EventRouter
Returns a new instance of EventRouter.
    # File 'lib/fluent/event_router.rb', line 45
    def initialize(default_collector, emit_error_handler)
      @match_rules = []
      @match_cache = MatchCache.new
      @default_collector = default_collector
      @emit_error_handler = emit_error_handler
      @metric_callbacks = {}
      @caller_plugin_id = nil
    end
Instance Attribute Details
#default_collector ⇒ Object
Returns the value of attribute default_collector.
    # File 'lib/fluent/event_router.rb', line 54
    def default_collector
      @default_collector
    end
#emit_error_handler ⇒ Object
Returns the value of attribute emit_error_handler.
    # File 'lib/fluent/event_router.rb', line 55
    def emit_error_handler
      @emit_error_handler
    end
Instance Method Details
#add_metric_callbacks(caller_plugin_id, callback) ⇒ Object
    # File 'lib/fluent/event_router.rb', line 88
    def add_metric_callbacks(caller_plugin_id, callback)
      @metric_callbacks[caller_plugin_id] = callback
    end
#add_rule(pattern, collector) ⇒ Object
called by Agent to add new match pattern and collector
    # File 'lib/fluent/event_router.rb', line 84
    def add_rule(pattern, collector)
      @match_rules << Rule.new(pattern, collector)
    end
#caller_plugin_id=(caller_plugin_id) ⇒ Object
    # File 'lib/fluent/event_router.rb', line 92
    def caller_plugin_id=(caller_plugin_id)
      @caller_plugin_id = caller_plugin_id
    end
#emit(tag, time, record) ⇒ Object
    # File 'lib/fluent/event_router.rb', line 104
    def emit(tag, time, record)
      unless record.nil?
        emit_stream(tag, OneEventStream.new(time, record))
      end
    end
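The nil guard in `#emit` means an event whose record is `nil` is dropped silently before any stream is built. A minimal sketch of that behavior follows, using a hypothetical `NilGuardRouter` class and a plain array in place of `OneEventStream`; neither is part of Fluentd.

```ruby
# Hypothetical stand-in that records every stream reaching emit_stream.
class NilGuardRouter
  attr_reader :streams

  def initialize
    @streams = []
  end

  def emit(tag, time, record)
    # Mirrors the guard in #emit: nil records never reach emit_stream.
    emit_stream(tag, [[time, record]]) unless record.nil?
  end

  def emit_stream(tag, es)
    @streams << [tag, es]
  end
end

router = NilGuardRouter.new
router.emit("app.log", 100, nil)                   # skipped
router.emit("app.log", 101, { "level" => "info" }) # forwarded
```

Only the second call produces a stream here. An empty Hash is not `nil`, so a record of `{}` would still pass the guard.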
#emit_array(tag, array) ⇒ Object
    # File 'lib/fluent/event_router.rb', line 110
    def emit_array(tag, array)
      emit_stream(tag, ArrayEventStream.new(array))
    end
#emit_error_event(tag, time, record, error) ⇒ Object
    # File 'lib/fluent/event_router.rb', line 125
    def emit_error_event(tag, time, record, error)
      @emit_error_handler.emit_error_event(tag, time, record, error)
    end
#emit_stream(tag, es) ⇒ Object
    # File 'lib/fluent/event_router.rb', line 114
    def emit_stream(tag, es)
      match(tag).emit_events(tag, es)
      if callback = find_callback
        callback.call(es)
      end
    rescue Pipeline::OutputError => e
      @emit_error_handler.handle_emits_error(tag, e.processed_es, e.internal_error)
    rescue => e
      @emit_error_handler.handle_emits_error(tag, es, e)
    end
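`#emit_stream` has two rescue paths: a `Pipeline::OutputError` hands the error handler the stream and the wrapped error carried by the exception, while any other error hands over the original stream. The sketch below imitates that shape with hypothetical classes (`SketchOutputError`, `RecordingErrorHandler`, `SketchEmitter`). The idea that `processed_es` holds a stream already modified upstream is an assumption drawn from the attribute name, not something this page confirms.

```ruby
# Hypothetical error type modeled on how #emit_stream uses Pipeline::OutputError.
class SketchOutputError < StandardError
  attr_reader :internal_error, :processed_es

  def initialize(internal_error, processed_es)
    super(internal_error.message)
    @internal_error = internal_error
    @processed_es = processed_es
  end
end

# Hypothetical handler that records what it was asked to handle.
class RecordingErrorHandler
  attr_reader :calls

  def initialize
    @calls = []
  end

  def handle_emits_error(tag, es, error)
    @calls << [tag, es, error.message]
  end
end

class SketchEmitter
  def initialize(collector, handler)
    @collector = collector
    @handler = handler
  end

  def emit_stream(tag, es)
    @collector.call(tag, es)
  rescue SketchOutputError => e
    # Specific path: report the stream and error carried by the exception.
    @handler.handle_emits_error(tag, e.processed_es, e.internal_error)
  rescue => e
    # Generic path: report the original stream and the error itself.
    @handler.handle_emits_error(tag, es, e)
  end
end

handler = RecordingErrorHandler.new
raw = [[0, { "n" => 1 }]]
processed = [[0, { "n" => 1, "filtered" => true }]]

failing_output = ->(_tag, _es) { raise SketchOutputError.new(RuntimeError.new("output down"), processed) }
failing_other = ->(_tag, _es) { raise ArgumentError, "bad event" }

SketchEmitter.new(failing_output, handler).emit_stream("a.b", raw)
SketchEmitter.new(failing_other, handler).emit_stream("a.b", raw)
```

In both cases the exception is absorbed and passed to the handler rather than propagating to the caller, which is the same contract the method-level `rescue` clauses give `#emit_stream`.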
#find_callback ⇒ Object
    # File 'lib/fluent/event_router.rb', line 96
    def find_callback
      if @caller_plugin_id
        @metric_callbacks[@caller_plugin_id]
      else
        nil
      end
    end
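Taken together, `#add_metric_callbacks`, `#caller_plugin_id=`, and `#find_callback` act as a small registry: a callback runs only once a caller plugin id is set and a callback is registered under that id. The following sketch reproduces that lookup with a hypothetical `SketchMetrics` class; the plugin id `"in_tail_1"` and the counting callback are made up for illustration.

```ruby
# Hypothetical class isolating the metric-callback lookup.
class SketchMetrics
  attr_writer :caller_plugin_id

  def initialize
    @metric_callbacks = {}
    @caller_plugin_id = nil
  end

  def add_metric_callbacks(caller_plugin_id, callback)
    @metric_callbacks[caller_plugin_id] = callback
  end

  def find_callback
    # Returns nil when no caller id is set or no callback is registered for it.
    @metric_callbacks[@caller_plugin_id] if @caller_plugin_id
  end

  def emit_stream(es)
    callback = find_callback
    callback.call(es) if callback
  end
end

counted = 0
router = SketchMetrics.new
router.add_metric_callbacks("in_tail_1", ->(es) { counted += es.size })

router.emit_stream([[0, {}], [1, {}]]) # no caller id yet, so nothing is counted
before = counted

router.caller_plugin_id = "in_tail_1"
router.emit_stream([[0, {}], [1, {}], [2, {}]])
```

After the second call `counted` reflects only the three events emitted once the caller id was assigned.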
#match(tag) ⇒ Object
    # File 'lib/fluent/event_router.rb', line 133
    def match(tag)
      collector = @match_cache.get(tag) {
        find(tag) || @default_collector
      }
      collector
    end
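`#match` uses a get-or-compute pattern: the block runs only when the tag is not cached, and its result, including the default collector fallback, is what gets stored. A minimal sketch of that pattern follows, with a hypothetical Hash-backed `SketchCache`. The real `MatchCache` may bound its size or evict entries; this sketch does neither.

```ruby
# Hypothetical stand-in for MatchCache: get-or-compute backed by a Hash.
class SketchCache
  def initialize
    @map = {}
  end

  def get(key)
    return @map[key] if @map.key?(key)

    @map[key] = yield
  end
end

lookups = 0
# Hypothetical rule lookup that counts how often it is invoked.
find = lambda do |tag|
  lookups += 1
  tag.start_with?("access.") ? :forward_output : nil
end

cache = SketchCache.new
default_collector = :default

first  = cache.get("access.web") { find.call("access.web") || default_collector }
second = cache.get("access.web") { find.call("access.web") || default_collector }
missed = cache.get("other") { find.call("other") || default_collector }
```

The second lookup for `access.web` is served from the cache, so `find` runs twice in total rather than three times. Because the fallback is cached too, a rule added later would not affect a tag that has already been resolved in this sketch.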
#match?(tag) ⇒ Boolean
    # File 'lib/fluent/event_router.rb', line 129
    def match?(tag)
      !!find(tag)
    end
#suppress_missing_match! ⇒ Object
    # File 'lib/fluent/event_router.rb', line 77
    def suppress_missing_match!
      if @default_collector.respond_to?(:suppress_missing_match!)
        @default_collector.suppress_missing_match!
      end
    end
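The `respond_to?` check makes the call a no-op for default collectors that do not implement `suppress_missing_match!`. The sketch below shows the same duck-typed delegation with two hypothetical collectors and a hypothetical helper, `suppress_if_supported`.

```ruby
# Hypothetical collector that supports suppression.
class QuietableCollector
  attr_reader :suppressed

  def initialize
    @suppressed = false
  end

  def suppress_missing_match!
    @suppressed = true
  end
end

# Hypothetical collector without the method.
class PlainCollector; end

def suppress_if_supported(collector)
  # Delegate only when the collector actually implements the method.
  collector.suppress_missing_match! if collector.respond_to?(:suppress_missing_match!)
end

quietable = QuietableCollector.new
suppress_if_supported(quietable)
result = suppress_if_supported(PlainCollector.new)
```

The call on `PlainCollector` raises no `NoMethodError` and returns `nil`, while the quietable collector has its flag set.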