Class: Fluent::EventRouter

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/event_router.rb

Overview

EventRouter is responsible to route events to a collector.

It has a list of MatchPattern and Collector pairs:

+----------------+     +-----------------+
|  MatchPattern  |     |    Collector    |
+----------------+     +-----------------+
|   access.**  ---------> type forward   |
|     logs.**  ---------> type copy      |
|  archive.**  ---------> type s3        |
+----------------+     +-----------------+

EventRouter does:

1) receive an event at ‘#emit` methods 2) match the event’s tag with the MatchPatterns 3) forward the event to the corresponding Collector

Collector is either of Output, Filter or other EventRouter.

Defined Under Namespace

Classes: MatchCache, Pipeline, Rule

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(default_collector, emit_error_handler) ⇒ EventRouter

Returns a new instance of EventRouter.

[View source]

45
46
47
48
49
50
51
52
# File 'lib/fluent/event_router.rb', line 45

def initialize(default_collector, emit_error_handler)
  @match_rules = []
  @match_cache = MatchCache.new
  @default_collector = default_collector
  @emit_error_handler = emit_error_handler
  @metric_callbacks = {}
  @caller_plugin_id = nil
end

Instance Attribute Details

#default_collectorObject

Returns the value of attribute default_collector.


54
55
56
# File 'lib/fluent/event_router.rb', line 54

def default_collector
  @default_collector
end

#emit_error_handlerObject

Returns the value of attribute emit_error_handler.


55
56
57
# File 'lib/fluent/event_router.rb', line 55

def emit_error_handler
  @emit_error_handler
end

Instance Method Details

#add_metric_callbacks(caller_plugin_id, callback) ⇒ Object

[View source]

88
89
90
# File 'lib/fluent/event_router.rb', line 88

def add_metric_callbacks(caller_plugin_id, callback)
  @metric_callbacks[caller_plugin_id] = callback
end

#add_rule(pattern, collector) ⇒ Object

called by Agent to add new match pattern and collector

[View source]

84
85
86
# File 'lib/fluent/event_router.rb', line 84

def add_rule(pattern, collector)
  @match_rules << Rule.new(pattern, collector)
end

#caller_plugin_id=(caller_plugin_id) ⇒ Object

[View source]

92
93
94
# File 'lib/fluent/event_router.rb', line 92

def caller_plugin_id=(caller_plugin_id)
  @caller_plugin_id = caller_plugin_id
end

#emit(tag, time, record) ⇒ Object

[View source]

104
105
106
107
108
# File 'lib/fluent/event_router.rb', line 104

def emit(tag, time, record)
  unless record.nil?
    emit_stream(tag, OneEventStream.new(time, record))
  end
end

#emit_array(tag, array) ⇒ Object

[View source]

110
111
112
# File 'lib/fluent/event_router.rb', line 110

def emit_array(tag, array)
  emit_stream(tag, ArrayEventStream.new(array))
end

#emit_error_event(tag, time, record, error) ⇒ Object

[View source]

125
126
127
# File 'lib/fluent/event_router.rb', line 125

def emit_error_event(tag, time, record, error)
  @emit_error_handler.emit_error_event(tag, time, record, error)
end

#emit_stream(tag, es) ⇒ Object

[View source]

114
115
116
117
118
119
120
121
122
123
# File 'lib/fluent/event_router.rb', line 114

def emit_stream(tag, es)
  match(tag).emit_events(tag, es)
  if callback = find_callback
    callback.call(es)
  end
rescue Pipeline::OutputError => e
  @emit_error_handler.handle_emits_error(tag, e.processed_es, e.internal_error)
rescue => e
  @emit_error_handler.handle_emits_error(tag, es, e)
end

#find_callbackObject

[View source]

96
97
98
99
100
101
102
# File 'lib/fluent/event_router.rb', line 96

def find_callback
  if @caller_plugin_id
    @metric_callbacks[@caller_plugin_id]
  else
    nil
  end
end

#match(tag) ⇒ Object

[View source]

133
134
135
136
137
138
# File 'lib/fluent/event_router.rb', line 133

def match(tag)
  collector = @match_cache.get(tag) {
    find(tag) || @default_collector
  }
  collector
end

#match?(tag) ⇒ Boolean

Returns:

  • (Boolean)
[View source]

129
130
131
# File 'lib/fluent/event_router.rb', line 129

def match?(tag)
  !!find(tag)
end

#suppress_missing_match!Object

[View source]

77
78
79
80
81
# File 'lib/fluent/event_router.rb', line 77

def suppress_missing_match!
  if @default_collector.respond_to?(:suppress_missing_match!)
    @default_collector.suppress_missing_match!
  end
end