Class: Fluent::Plugin::NngInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_nng.rb

Instance Method Summary collapse

Constructor Details

#initialize ⇒ NngInput

Returns a new instance of NngInput.



46
47
48
49
50
# File 'lib/fluent/plugin/in_nng.rb', line 46

# Builds a new plugin instance in its idle state: the receive loop has not
# been asked to stop and no NNG socket has been opened yet.
def initialize
  super
  @stop = false
  @socket = nil
end

Instance Method Details

#configure(conf) ⇒ Object



31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/fluent/plugin/in_nng.rb', line 31

# Validates the plugin configuration and creates the message parser.
#
# Fluentd assigns declared config parameters (such as +@uri+) inside the
# parent +configure+, so the URI scheme check must run after +super+.
# Checking earlier would validate an unset or default value instead of the
# value the user actually configured.
#
# @param conf [Object] the plugin configuration element; must respond to
#   +elements('parse')+ (presumably a Fluent::Config::Element)
# @return [Object] the parser created from the <parse> section
# @raise [Fluent::ConfigError] if the <parse> section is missing or the
#   configured uri does not use a supported scheme
def configure(conf)
  # Legacy parser parameters have to be converted before +super+ reads conf.
  compat_parameters_convert(conf, :parser)
  parser_config = conf.elements('parse').first
  raise Fluent::ConfigError, "<parse> section is required." unless parser_config

  super

  # Anchor with \A / \z so the whole value has to be a URI of an allowed scheme.
  uri_pattern = /\A#{URI::RFC2396_PARSER.make_regexp(['tcp', 'ipc', 'inproc', 'ws', 'tls+tcp'])}\z/
  unless uri_pattern.match?(@uri)
    raise Fluent::ConfigError, 'uri must be one of: tcp:// ipc:// inproc:// ws:// or tls+tcp://'
  end

  log.info "parser: #{parser_config}"
  @parser = parser_create(conf: parser_config)
end

#listen ⇒ Object



52
53
54
55
56
57
58
59
60
61
# File 'lib/fluent/plugin/in_nng.rb', line 52

# Opens a Pair0 NNG socket and starts listening on the configured URI.
#
# For ipc:// endpoints, a file already present at the URI's path is deleted
# before listening. The receive timeout is applied once the socket is
# listening.
def listen
  log.info "Starting listener at: #{@uri}"
  endpoint = URI(@uri)
  File.delete(endpoint.path) if endpoint.scheme == 'ipc' && File.exist?(endpoint.path)

  @socket = NNG::Socket::Pair0.new
  @socket.listen(@uri)
  @socket.recv_timeout = @recv_timeout
end

#run ⇒ Object



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/fluent/plugin/in_nng.rb', line 69

# Receive loop: reads messages from the NNG socket, parses them and emits
# the resulting events until the stop flag is set.
#
# The stop flag is checked before every receive attempt, so the loop ends
# once a pending receive returns or times out after +stop+ was called.
#
# @return [nil]
def run
  log.info "start looping.."
  until @stop
    begin
      msg = @socket.receive
    rescue Timeout::Error
      # Nothing arrived within the receive timeout; back off briefly and
      # re-check the stop flag.
      sleep 0.01
      next
    end
    next unless msg

    @parser.parse(msg) do |time, record|
      # Fluentd parsers yield a nil record when the payload cannot be
      # parsed; emitting that would push an invalid event downstream.
      if record.nil?
        log.warn "failed to parse message", message: msg
        next
      end
      # A missing timestamp falls back to the current engine time.
      router.emit(@tag, time || Fluent::Engine.now, record)
    end
  end
end

#shutdown ⇒ Object



97
98
99
100
101
102
103
104
# File 'lib/fluent/plugin/in_nng.rb', line 97

# Closes the NNG socket, if one was opened, and then hands over to the
# parent class's shutdown.
def shutdown
  log.info "Initiate shutdown"
  nng_socket = @socket
  if nng_socket
    log.info "stopping nng-socket"
    nng_socket.close
  end
  super
end

#start ⇒ Object



63
64
65
66
67
# File 'lib/fluent/plugin/in_nng.rb', line 63

# Starts the plugin: runs the parent start, opens the listening socket and
# launches the receive loop in a thread named :nng_input_run.
def start
  super
  listen
  thread_create(:nng_input_run) { run }
end

#stop ⇒ Object



91
92
93
94
95
# File 'lib/fluent/plugin/in_nng.rb', line 91

# Sets the stop flag that the receive loop checks before each receive
# attempt, then continues with the parent class's stop.
def stop
  @stop = true
  log.info "Stopping.."
  super
end