Class: Fluent::Plugin::NngInput
- Inherits:
-
Input
- Object
- Input
- Fluent::Plugin::NngInput
- Defined in:
- lib/fluent/plugin/in_nng.rb
Instance Method Summary
- #configure(conf) ⇒ Object
- #initialize ⇒ NngInput (constructor): A new instance of NngInput.
- #listen ⇒ Object
- #run ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
- #stop ⇒ Object
Constructor Details
#initialize ⇒ NngInput
Returns a new instance of NngInput.
46 47 48 49 50 |
# File 'lib/fluent/plugin/in_nng.rb', line 46

# Sets up a fresh plugin instance: no socket yet, and the stop flag cleared.
#
# @return [NngInput] a new instance of NngInput
def initialize
  super

  # Flag read by #run to decide when to leave its receive loop.
  @stop = false
  # Assigned by #listen; stays nil until then.
  @socket = nil
end
Instance Method Details
#configure(conf) ⇒ Object
31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/fluent/plugin/in_nng.rb', line 31

# Validates the plugin configuration and creates the message parser.
#
# Requires a <parse> section and a uri using one of the supported
# transports (tcp, ipc, inproc, ws, tls+tcp).
#
# @param conf [Object] the plugin configuration element (must respond to
#   #elements)
# @raise [Fluent::ConfigError] if the <parse> section is missing or the
#   uri does not use a supported scheme
# @return [Object] the parser stored in @parser
def configure(conf)
  # Must run before super so old-style parser parameters are converted
  # into a <parse> section first.
  compat_parameters_convert(conf, :parser)

  parser_config = conf.elements('parse').first
  raise Fluent::ConfigError, "<parse> section is required." unless parser_config

  # User-supplied config_param values are assigned by super, so @uri can
  # only be validated after this call. Checking it earlier would test the
  # declared default (or nil) instead of the configured value.
  super

  uri_pattern = /\A#{URI::RFC2396_PARSER.make_regexp(['tcp', 'ipc', 'inproc', 'ws', 'tls+tcp'])}\z/
  unless uri_pattern.match?(@uri.to_s)
    raise Fluent::ConfigError, 'uri must be one of: tcp:// ipc:// inproc:// ws:// or tls+tcp://'
  end

  log.info "parser: #{parser_config}"
  @parser = parser_create(conf: parser_config)
end
#listen ⇒ Object
52 53 54 55 56 57 58 59 60 61 |
# File 'lib/fluent/plugin/in_nng.rb', line 52

# Opens a Pair0 NNG socket listening on @uri and applies @recv_timeout.
#
# For ipc:// endpoints, any file already present at the endpoint's path is
# deleted before listening.
#
# @return [Object] the value assigned to the socket's recv_timeout
def listen
  log.info "Starting listener at: #{@uri}"

  endpoint = URI(@uri)
  File.delete(endpoint.path) if endpoint.scheme == 'ipc' && File.exist?(endpoint.path)

  @socket = NNG::Socket::Pair0.new
  @socket.listen(@uri)
  @socket.recv_timeout = @recv_timeout
end
#run ⇒ Object
69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 |
# File 'lib/fluent/plugin/in_nng.rb', line 69

# Receive loop: reads messages from @socket, parses them with @parser and
# emits every parsed record to the router under @tag.
#
# A receive timeout is not treated as an error; it only gives the loop a
# chance to re-check @stop. The method returns once @stop is set.
#
# @return [nil]
def run
  log.info "start looping.."

  loop do
    msg = nil
    until msg
      return if @stop

      begin
        msg = @socket.receive
      rescue Timeout::Error
        # Nothing arrived in time; pause briefly, then re-check @stop.
        sleep 0.01
      end
    end

    @parser.parse(msg) do |time, record|
      # A nil record means the parser could not handle the payload.
      # Emitting it would pass a nil event to downstream plugins.
      unless record
        log.warn "failed to parse message, skipping", message: msg
        next
      end

      # Fall back to the current engine time when the parser yields none.
      router.emit(@tag, time || Fluent::Engine.now, record)
    end
  end
end
#shutdown ⇒ Object
97 98 99 100 101 102 103 104 |
# File 'lib/fluent/plugin/in_nng.rb', line 97

# Closes the NNG socket, if one was opened, and then continues the normal
# shutdown via super.
def shutdown
  log.info "Initiate shutdown"

  socket = @socket
  if socket
    log.info "stopping nng-socket"
    socket.close
  end

  super
end
#start ⇒ Object
63 64 65 66 67 |
# File 'lib/fluent/plugin/in_nng.rb', line 63

# Starts the plugin: opens the listening socket and hands #run to
# thread_create under the name :nng_input_run.
def start
  super
  listen

  receive_loop = method(:run)
  thread_create(:nng_input_run, &receive_loop)
end
#stop ⇒ Object
91 92 93 94 95 |
# File 'lib/fluent/plugin/in_nng.rb', line 91

# Requests termination of the receive loop by raising the @stop flag,
# then delegates to super.
def stop
  log.info "Stopping.."

  # #run checks this flag before each receive attempt.
  @stop = true

  super
end