Class: Fluent::Plugin::NngOutput
- Inherits: Output
- Inheritance chain: Object → Output → Fluent::Plugin::NngOutput
- Defined in:
- lib/fluent/plugin/out_nng_out.rb
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #connect ⇒ Object
- #format(tag, time, record) ⇒ Object
- #initialize ⇒ NngOutput (constructor): A new instance of NngOutput.
- #shutdown ⇒ Object
- #start ⇒ Object
- #write(chunk) ⇒ Object
Constructor Details
#initialize ⇒ NngOutput
Returns a new instance of NngOutput.
25 26 27 28 29 |
# File 'lib/fluent/plugin/out_nng_out.rb', line 25
#
# Builds the plugin instance. The formatter starts out unset and is
# assigned later by #configure.
def initialize
  super
  @formatter = nil
  log.debug('Initializing')
end
Instance Method Details
#configure(conf) ⇒ Object
31 32 33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/fluent/plugin/out_nng_out.rb', line 31
#
# Applies the plugin configuration, checks that @uri uses one of the
# supported transport schemes, and creates the formatter stored in
# @formatter.
#
# @param conf the configuration object handed to the plugin
# @raise [Fluent::ConfigError] when @uri is not a complete tcp, ipc,
#   inproc, ws or tls+tcp URI
def configure(conf)
  log.debug('configuring..')
  compat_parameters_convert(conf, :formatter, :inject)
  super

  schemes = %w[tcp ipc inproc ws tls+tcp]
  # Anchored with \A / \z so the whole value must be a URI of an allowed scheme.
  uri_pattern = /\A#{URI::RFC2396_PARSER.make_regexp(schemes)}\z/
  unless @uri =~ uri_pattern
    raise Fluent::ConfigError, 'uri must be one of: tcp:// ipc:// inproc:// ws:// or tls+tcp://'
  end

  log.info('Creating formatter')
  @formatter = formatter_create
  log.info('Formatter loaded')
end
#connect ⇒ Object
55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/fluent/plugin/out_nng_out.rb', line 55
#
# Creates a Pair0 socket, stores it in @socket and dials @uri, retrying
# after failures.
#
# Every failed dial is logged. When @max_retry is greater than zero, the
# failure that brings the number of attempts up to @max_retry is turned into
# a Fluent::UnrecoverableError; otherwise the method waits @retry_time
# seconds and dials again. A @max_retry of zero or less retries without
# limit.
#
# @raise [Fluent::UnrecoverableError] once @max_retry attempts have failed;
#   the message is the message of the last dial error
def connect
  @socket = NNG::Socket::Pair0.new
  attempts = 0
  begin
    @socket.dial(@uri, cert: @cert, key: @key, ca: @ca, verify: @verify, server_name: @server_name)
  rescue => e
    log.error(e)
    attempts += 1
    # Give up before announcing/sleeping for a retry that would never happen.
    raise Fluent::UnrecoverableError, e.message if @max_retry > 0 && attempts >= @max_retry

    log.info("Retry in #{@retry_time} sec")
    sleep(@retry_time)
    retry
  end
end
#format(tag, time, record) ⇒ Object
74 75 76 77 |
# File 'lib/fluent/plugin/out_nng_out.rb', line 74
#
# Formats one event: the record is first passed through
# inject_values_to_record, and the result is handed to @formatter.
#
# @param tag the event tag
# @param time the event time
# @param record the event record
# @return whatever @formatter.format returns for the injected record
def format(tag, time, record)
  @formatter.format(tag, time, inject_values_to_record(tag, time, record))
end
#shutdown ⇒ Object
79 80 81 82 83 |
# File 'lib/fluent/plugin/out_nng_out.rb', line 79
#
# Closes the socket, if one was created, and then runs the parent shutdown.
#
# @socket is only assigned in #connect, so it can still be nil here; the
# safe-navigation call keeps a missing socket from raising and preventing
# the call to super.
def shutdown
  log.info('Shutdown socket')
  @socket&.close
  super()
end
#start ⇒ Object
45 46 47 48 49 |
# File 'lib/fluent/plugin/out_nng_out.rb', line 45
#
# Runs the parent start, logs the target URI, then opens the connection
# via #connect.
def start
  super
  log.info("Initiating connection to: #{@uri}")
  connect
end
#write(chunk) ⇒ Object
51 52 53 |
# File 'lib/fluent/plugin/out_nng_out.rb', line 51
#
# Reads the whole chunk and sends its content over @socket.
#
# @param chunk an object responding to #read
# @return whatever @socket.send returns
def write(chunk)
  payload = chunk.read
  @socket.send(payload)
end