Class: Fluent::Plugin::NngOutput
- Inherits: Output
  - Object
    - Output
      - Fluent::Plugin::NngOutput
- Defined in:
- lib/fluent/plugin/out_nng.rb
Instance Method Summary
- #configure(conf) ⇒ Object
- #connect ⇒ Object
- #format(tag, time, record) ⇒ Object
- #initialize ⇒ NngOutput (constructor): A new instance of NngOutput.
- #shutdown ⇒ Object
- #start ⇒ Object
- #write(chunk) ⇒ Object
Constructor Details
#initialize ⇒ NngOutput
Returns a new instance of NngOutput.
19 20 21 22 23 |
# File 'lib/fluent/plugin/out_nng.rb', line 19
# Sets up a new NngOutput: runs the parent initializer, clears the
# formatter slot (it is filled in by #configure) and logs that
# initialization has happened.
#
# @return [NngOutput] a new instance of NngOutput
def initialize
  super
  @formatter = nil
  log.info('Initializing')
end
Instance Method Details
#configure(conf) ⇒ Object
25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/fluent/plugin/out_nng.rb', line 25
# Configures the plugin: converts compat parameters for the formatter and
# inject sections, lets the parent class process the configuration,
# validates the target URI scheme and creates the formatter.
#
# @param conf the configuration object handed in by Fluentd
# @raise [Fluent::ConfigError] if @uri is not a tcp, ipc, inproc, ws or
#   tls+tcp URI
def configure(conf)
  log.info "configuring.."
  compat_parameters_convert(conf, :formatter, :inject)

  # super must run before the validation below: per Fluentd's plugin API
  # it is the parent #configure that loads configured parameter values
  # into instance variables, so checking @uri earlier would not see the
  # value the user actually configured.
  super

  allowed_schemes = %w[tcp ipc inproc ws tls+tcp]
  if @uri !~ /\A#{URI::RFC2396_PARSER.make_regexp(allowed_schemes)}\z/
    raise Fluent::ConfigError, 'uri must be one of: tcp:// ipc:// inproc:// ws:// or tls+tcp://'
  end

  log.info "Creating formatter"
  @formatter = formatter_create
  log.info "Formatter loaded"
end
#connect ⇒ Object
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/fluent/plugin/out_nng.rb', line 48
# Creates a Pair0 socket and dials @uri, retrying on failure.
#
# After each failed dial the error is logged. If @max_retry is positive
# and that many attempts have failed, the method gives up; otherwise it
# waits @retry_time seconds and dials again. A @max_retry of zero or less
# means retry indefinitely.
#
# @raise [Fluent::UnrecoverableError] carrying the last dial error's
#   message once @max_retry attempts have failed
def connect
  @socket = NNG::Socket::Pair0.new
  attempts = 0
  begin
    @socket.dial(@uri)
  rescue => e
    log.error(e)
    attempts += 1
    # Check the limit before sleeping so the final failure is reported
    # immediately instead of after a pointless wait.
    if @max_retry > 0 && attempts >= @max_retry
      raise Fluent::UnrecoverableError, e.message
    end
    # Report the delay that is actually used rather than a fixed number.
    log.info("Retry in #{@retry_time}sec")
    sleep(@retry_time)
    retry
  end
end
#format(tag, time, record) ⇒ Object
66 67 68 69 |
# File 'lib/fluent/plugin/out_nng.rb', line 66
# Formats one event: passes the record through inject_values_to_record
# and hands the result to the configured formatter.
#
# @param tag the event tag
# @param time the event time
# @param record the event record
# @return whatever @formatter.format returns for the injected record
def format(tag, time, record)
  @formatter.format(tag, time, inject_values_to_record(tag, time, record))
end
#shutdown ⇒ Object
71 72 73 74 75 |
# File 'lib/fluent/plugin/out_nng.rb', line 71
# Closes the socket, if one exists, and then runs the parent shutdown.
def shutdown
  log.info("Shutdown socket")
  # @socket is only assigned in #connect; the safe-navigation guard keeps
  # shutdown from raising NoMethodError when no socket was ever created.
  @socket&.close
  super()
end
#start ⇒ Object
38 39 40 41 42 |
# File 'lib/fluent/plugin/out_nng.rb', line 38
# Starts the plugin: runs the parent start, logs the target URI and
# opens the connection through #connect.
def start
  super
  log.info("Initiating connection to: #{@uri}")
  connect
end
#write(chunk) ⇒ Object
44 45 46 |
# File 'lib/fluent/plugin/out_nng.rb', line 44
# Reads the whole chunk and sends its content over the socket.
#
# @param chunk an object responding to #read (presumably a Fluentd
#   buffer chunk; confirm against the caller)
# @return whatever @socket.send returns
def write(chunk)
  payload = chunk.read
  @socket.send(payload)
end