Class: Fluent::Plugin::NngInput
- Inherits: Input
- Inheritance hierarchy: Object < Input < Fluent::Plugin::NngInput
- Defined in:
- lib/fluent/plugin/in_nng_in.rb
Instance Method Summary
- #configure(conf) ⇒ Object
- #initialize ⇒ NngInput (constructor)
A new instance of NngInput.
- #listen ⇒ Object
- #run ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
- #stop ⇒ Object
Constructor Details
#initialize ⇒ NngInput
Returns a new instance of NngInput.
54 55 56 57 58 |
# File 'lib/fluent/plugin/in_nng_in.rb', line 54
# Creates the plugin instance with no socket yet and the stop flag cleared.
# `super` is invoked first so the parent initializer runs before the
# plugin-specific fields are set.
def initialize
  super
  @stop = false  # flipped to true by #stop and polled by the loop in #run
  @socket = nil  # assigned by #listen
end
Instance Method Details
#configure(conf) ⇒ Object
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/fluent/plugin/in_nng_in.rb', line 37
# Validates the plugin configuration and builds the message parser.
#
# @param conf the plugin configuration; it is passed to
#   compat_parameters_convert and queried for its <parse> child element
# @raise [Fluent::ConfigError] if @uri does not use one of the accepted
#   schemes, or if there is no <parse> section
def configure(conf)
  compat_parameters_convert(conf, :parser)
  super

  # NOTE(review): @uri is presumably populated by `super` from a config
  # parameter declared outside this view - confirm.
  # \A / \z anchor the match so the whole value must be an accepted URI.
  accepted_uri = /\A#{URI::RFC2396_PARSER.make_regexp(['tcp', 'ipc', 'inproc', 'ws', 'tls+tcp'])}\z/
  raise Fluent::ConfigError, 'uri must be one of: tcp:// ipc:// inproc:// ws:// or tls+tcp://' if @uri !~ accepted_uri

  parse_section = conf.elements('parse').first
  raise Fluent::ConfigError, '<parse> section is required.' unless parse_section

  log.info "parser: #{parse_section}"
  @parser = parser_create(conf: parse_section)
end
#listen ⇒ Object
60 61 62 63 64 65 66 67 68 69 |
# File 'lib/fluent/plugin/in_nng_in.rb', line 60
# Opens a Pair0 socket, starts listening on @uri, and applies the receive
# timeout.
#
# For ipc:// addresses any file left at the socket path is removed first.
# NNG treats everything after "ipc://" as the filesystem path, relative or
# absolute, so the path is taken from the raw address. Generic URI parsing
# would split a relative address such as "ipc://run/sock" into host "run"
# and path "/sock", which points at the wrong file.
def listen
  log.info "Starting listener at: #{@uri}"

  ipc_prefix = 'ipc://'
  if @uri.start_with?(ipc_prefix)
    socket_path = @uri.delete_prefix(ipc_prefix)
    begin
      File.delete(socket_path) unless socket_path.empty?
    rescue Errno::ENOENT
      # No leftover file (or it vanished in the meantime): nothing to clean up.
    end
  end

  @socket = NNG::Socket::Pair0.new
  @socket.listen(@uri, cert: @cert, key: @key, ca: @ca, verify: @verify, server_name: @server_name)
  @socket.recv_timeout = @recv_timeout
end
#run ⇒ Object
77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/fluent/plugin/in_nng_in.rb', line 77
# Receive loop: pulls messages from @socket, parses them with @parser, and
# emits each parsed record to the router under @tag. Returns once @stop is
# set.
#
# A record the parser could not produce (nil) is skipped with a warning
# instead of being emitted. When the parser yields no time, the current
# engine time is used.
def run
  log.info 'Start listening..'

  until @stop
    begin
      msg = @socket.receive
    rescue Timeout::Error
      # Receive timed out: pause briefly, then re-check @stop before retrying.
      sleep 0.01
      next
    end
    next unless msg

    @parser.parse(msg) do |time, record|
      unless record
        log.warn 'received message could not be parsed; skipping'
        next
      end
      router.emit(@tag, time || Fluent::Engine.now, record)
    end
  end
end
#shutdown ⇒ Object
105 106 107 108 109 110 111 112 |
# File 'lib/fluent/plugin/in_nng_in.rb', line 105
# Closes the NNG socket, if one was opened, and then hands over to the
# parent class's shutdown.
def shutdown
  log.info 'Initiate shutdown'

  listener = @socket
  if listener
    log.info 'stopping nng-socket'
    listener.close
  end

  super
end
#start ⇒ Object
71 72 73 74 75 |
# File 'lib/fluent/plugin/in_nng_in.rb', line 71
# Runs the parent start logic, opens the listening socket, and then passes
# #run to thread_create under the name :nng_input_run.
def start
  super
  listen
  thread_create(:nng_input_run) { run }
end
#stop ⇒ Object
99 100 101 102 103 |
# File 'lib/fluent/plugin/in_nng_in.rb', line 99
# Sets the stop flag, which the loop in #run checks before each receive
# attempt, then continues with the parent class's stop.
def stop
  log.info('Stopping..')
  @stop = true
  super
end