Class: Fluent::Plugin::NngInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_nng_in.rb

Instance Method Summary collapse

Constructor Details

#initialize ⇒ NngInput

Returns a new instance of NngInput.



54
55
56
57
58
# File 'lib/fluent/plugin/in_nng_in.rb', line 54

# Builds the plugin instance: runs the superclass initializer, then starts with
# the stop flag cleared and no socket assigned.
def initialize
  super
  @stop = false
  @socket = nil
end

Instance Method Details

#configure(conf) ⇒ Object



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/fluent/plugin/in_nng_in.rb', line 37

# Validates the plugin configuration and builds the message parser.
#
# @param conf the plugin configuration; must respond to +elements('parse')+
# @raise [Fluent::ConfigError] when @uri is not an absolute URI using one of the
#   supported schemes, or when no <parse> section is present
def configure(conf)
  compat_parameters_convert(conf, :parser)
  super

  allowed_schemes = ['tcp', 'ipc', 'inproc', 'ws', 'tls+tcp']
  # Anchor the generated pattern so the whole value has to be a matching URI.
  uri_pattern = /\A#{URI::RFC2396_PARSER.make_regexp(allowed_schemes)}\z/
  raise Fluent::ConfigError, 'uri must be one of: tcp:// ipc:// inproc:// ws:// or tls+tcp://' if @uri !~ uri_pattern

  parse_section = conf.elements('parse').first
  raise Fluent::ConfigError, '<parse> section is required.' unless parse_section

  log.info "parser: #{parse_section}"
  @parser = parser_create(conf: parse_section)
end

#listen ⇒ Object



60
61
62
63
64
65
66
67
68
69
# File 'lib/fluent/plugin/in_nng_in.rb', line 60

# Opens a Pair0 socket and starts listening on @uri.
#
# For ipc:// endpoints, any file already present at the URI's path is deleted
# first. The TLS-related settings are passed through to the socket's listen
# call, and the receive timeout is applied afterwards.
def listen
  log.info "Starting listener at: #{@uri}"
  endpoint = URI(@uri)
  if endpoint.scheme == 'ipc' && File.exist?(endpoint.path)
    File.delete(endpoint.path)
  end

  tls_options = { cert: @cert, key: @key, ca: @ca, verify: @verify, server_name: @server_name }
  @socket = NNG::Socket::Pair0.new
  @socket.listen(@uri, **tls_options)
  @socket.recv_timeout = @recv_timeout
end

#run ⇒ Object



77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/fluent/plugin/in_nng_in.rb', line 77

# Receive loop: reads messages from the socket, parses them and emits the
# resulting events under @tag.
#
# The inner loop keeps polling until a message arrives; a receive timeout is
# treated as "nothing yet" so the stop flag can be re-checked. The method
# returns as soon as @stop is set.
#
# Records the parser yields as nil are skipped with a warning instead of being
# emitted (presumably the parser signals unparseable input this way — confirm
# against the configured parser plugin).
def run
  log.info 'Start listening..'
  loop do
    msg = nil
    until msg
      return if @stop

      begin
        msg = @socket.receive
      rescue Timeout::Error
        # No message within the receive timeout; pause briefly, then poll again.
        sleep 0.01
      end
    end

    @parser.parse(msg) do |time, record|
      if record.nil?
        # Emitting a nil record would push an invalid event into the router.
        log.warn "pattern not matched: #{msg.inspect}"
        next
      end

      router.emit(@tag, time || Fluent::Engine.now, record)
    end
  end
end

#shutdown ⇒ Object



105
106
107
108
109
110
111
112
# File 'lib/fluent/plugin/in_nng_in.rb', line 105

# Closes the socket, if one has been assigned, and then hands over to the
# superclass shutdown.
def shutdown
  log.info('Initiate shutdown')
  open_socket = @socket
  if open_socket
    log.info('stopping nng-socket')
    open_socket.close
  end
  super
end

#start ⇒ Object



71
72
73
74
75
# File 'lib/fluent/plugin/in_nng_in.rb', line 71

# Starts the plugin: runs the superclass start, opens the listening socket and
# hands #run to thread_create under the :nng_input_run title.
def start
  super
  listen
  receive_loop = method(:run)
  thread_create(:nng_input_run, &receive_loop)
end

#stop ⇒ Object



99
100
101
102
103
# File 'lib/fluent/plugin/in_nng_in.rb', line 99

# Raises the stop flag that the receive loop in #run checks before each poll,
# then continues with the superclass stop.
def stop
  log.info('Stopping..')
  @stop = true
  super
end