Class: SemanticLogger::Appender::Kafka

Inherits:
Subscriber
  • Object
show all
Defined in:
lib/semantic_logger/appender/kafka.rb

Instance Attribute Summary collapse

Attributes inherited from Subscriber

#application, #environment, #formatter, #host, #logger, #metrics

Instance Method Summary collapse

Methods inherited from Subscriber

#batch_by_default?, #console_output?, #console_stream, #level, #should_log?

Constructor Details

#initialize(seed_brokers:, client_id: "semantic-logger", connect_timeout: nil, socket_timeout: nil, ssl_ca_cert: nil, ssl_client_cert: nil, ssl_client_cert_key: nil, ssl_ca_certs_from_system: false, topic: "log_messages", partition: nil, partition_key: nil, key: nil, delivery_threshold: 100, delivery_interval: 10, required_acks: 1, metrics: true, **args) ⇒ Kafka

Send log messages to Kafka in JSON format.

Kafka Parameters:

seed_brokers: [Array<String>, String]
The list of brokers used to initialize the client. Either an Array of connections,
or a comma separated string of connections.
Connections can either be a string of "port:protocol" or a full URI with a scheme.
If there's a scheme it's ignored and only host/port are used.

client_id: [String]
The identifier for this application.
Default: semantic-logger

topic: [String]
Topic to publish log messages to.
Default: 'log_messages'

partition: [Integer]
The partition that the message should be written to.
Default: nil

partition_key: [String]
The key that should be used to assign a partition.
Default: nil

key: [String]
The message key.
Default: nil

connect_timeout: [Integer]
The timeout setting for connecting to brokers.
Default: nil

socket_timeout: [Integer]
The timeout setting for socket connections.
Default: nil

ssl_ca_cert: [String, Array<String>]
A PEM encoded CA cert, or an Array of PEM encoded CA certs, to use with a SSL connection.
Default: nil

ssl_client_cert: [String]
A PEM encoded client cert to use with a SSL connection.
Must be used in combination with ssl_client_cert_key.
Default: nil

ssl_client_cert_key [String]
A PEM encoded client cert key to use with a SSL connection.
Must be used in combination with ssl_client_cert.
Default: nil

ssl_ca_certs_from_system: [boolean]
Delegate SSL CA cert to the system certs

delivery_threshold: [Integer]
 Number of messages between triggering a delivery of messages to Apache Kafka.
 Default: 100

delivery_interval: [Integer]
 Number of seconds between triggering a delivery of messages to Apache Kafka.
 Default: 5

required_acks: [Integer]
 Number of replicas that must acknowledge receipt of each log message to the topic
 Default: 1

Semantic Logger Parameters:

level: [:trace | :debug | :info | :warn | :error | :fatal]
Override the log level for this appender.
Default: SemanticLogger.default_level

formatter: [Object|Proc|Symbol|Hash]
An instance of a class that implements #call, or a Proc to be used to format
the output from this appender
Default: :raw_json (See: #call)

filter: [Regexp|Proc]
RegExp: Only include log messages where the class name matches the supplied.
regular expression. All other messages will be ignored.
Proc: Only include log messages where the supplied Proc returns true
      The Proc must return true or false.

host: [String]
Name of this host to appear in log messages.
Default: SemanticLogger.host

application: [String]
Name of this application to appear in log messages.
Default: SemanticLogger.application

metrics: [Boolean]
Send metrics only events to kafka.
Default: true


126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
# File 'lib/semantic_logger/appender/kafka.rb', line 126

def initialize(seed_brokers:, client_id: "semantic-logger", connect_timeout: nil, socket_timeout: nil,
               ssl_ca_cert: nil, ssl_client_cert: nil, ssl_client_cert_key: nil, ssl_ca_certs_from_system: false,
               topic: "log_messages", partition: nil, partition_key: nil, key: nil,
               delivery_threshold: 100, delivery_interval: 10, required_acks: 1,
               metrics: true, **args, &)
  @seed_brokers             = seed_brokers
  @client_id                = client_id
  @connect_timeout          = connect_timeout
  @socket_timeout           = socket_timeout
  @ssl_ca_cert              = ssl_ca_cert
  @ssl_client_cert          = ssl_client_cert
  @ssl_client_cert_key      = ssl_client_cert_key
  @ssl_ca_certs_from_system = ssl_ca_certs_from_system
  @topic                    = topic
  @partition                = partition
  @partition_key            = partition_key
  @key                      = key
  @delivery_threshold       = delivery_threshold
  @delivery_interval        = delivery_interval
  @required_acks            = required_acks

  super(metrics: metrics, **args, &)
  reopen
end

Instance Attribute Details

#client_idObject

Returns the value of attribute client_id.



26
27
28
# File 'lib/semantic_logger/appender/kafka.rb', line 26

def client_id
  @client_id
end

#connect_timeoutObject

Returns the value of attribute connect_timeout.



26
27
28
# File 'lib/semantic_logger/appender/kafka.rb', line 26

def connect_timeout
  @connect_timeout
end

#delivery_intervalObject

Returns the value of attribute delivery_interval.



26
27
28
# File 'lib/semantic_logger/appender/kafka.rb', line 26

def delivery_interval
  @delivery_interval
end

#delivery_thresholdObject

Returns the value of attribute delivery_threshold.



26
27
28
# File 'lib/semantic_logger/appender/kafka.rb', line 26

def delivery_threshold
  @delivery_threshold
end

#keyObject

Returns the value of attribute key.



26
27
28
# File 'lib/semantic_logger/appender/kafka.rb', line 26

def key
  @key
end

#partitionObject

Returns the value of attribute partition.



26
27
28
# File 'lib/semantic_logger/appender/kafka.rb', line 26

def partition
  @partition
end

#partition_keyObject

Returns the value of attribute partition_key.



26
27
28
# File 'lib/semantic_logger/appender/kafka.rb', line 26

def partition_key
  @partition_key
end

#required_acksObject

Returns the value of attribute required_acks.



26
27
28
# File 'lib/semantic_logger/appender/kafka.rb', line 26

def required_acks
  @required_acks
end

#seed_brokersObject

Returns the value of attribute seed_brokers.



26
27
28
# File 'lib/semantic_logger/appender/kafka.rb', line 26

def seed_brokers
  @seed_brokers
end

#socket_timeoutObject

Returns the value of attribute socket_timeout.



26
27
28
# File 'lib/semantic_logger/appender/kafka.rb', line 26

def socket_timeout
  @socket_timeout
end

#ssl_ca_certObject

Returns the value of attribute ssl_ca_cert.



26
27
28
# File 'lib/semantic_logger/appender/kafka.rb', line 26

def ssl_ca_cert
  @ssl_ca_cert
end

#ssl_ca_certs_from_systemObject

Returns the value of attribute ssl_ca_certs_from_system.



26
27
28
# File 'lib/semantic_logger/appender/kafka.rb', line 26

def ssl_ca_certs_from_system
  @ssl_ca_certs_from_system
end

#ssl_client_certObject

Returns the value of attribute ssl_client_cert.



26
27
28
# File 'lib/semantic_logger/appender/kafka.rb', line 26

def ssl_client_cert
  @ssl_client_cert
end

#ssl_client_cert_keyObject

Returns the value of attribute ssl_client_cert_key.



26
27
28
# File 'lib/semantic_logger/appender/kafka.rb', line 26

def ssl_client_cert_key
  @ssl_client_cert_key
end

#topicObject

Returns the value of attribute topic.



26
27
28
# File 'lib/semantic_logger/appender/kafka.rb', line 26

def topic
  @topic
end

Instance Method Details

#closeObject



171
172
173
174
175
176
# File 'lib/semantic_logger/appender/kafka.rb', line 171

def close
  @producer&.shutdown
  @producer = nil
  @kafka&.close
  @kafka = nil
end

#default_formatterObject

Use JSON Formatter by default.



185
186
187
# File 'lib/semantic_logger/appender/kafka.rb', line 185

def default_formatter
  SemanticLogger::Formatters::Json.new
end

#flushObject

Restart producer thread since there is no other way to flush.



190
191
192
193
194
195
196
# File 'lib/semantic_logger/appender/kafka.rb', line 190

def flush
  @producer.shutdown
  @producer = @kafka.async_producer(
    delivery_threshold: delivery_threshold,
    delivery_interval:  delivery_interval
  )
end

#log(log) ⇒ Object

Forward log messages to Kafka producer thread.



179
180
181
182
# File 'lib/semantic_logger/appender/kafka.rb', line 179

def log(log)
  json = formatter.call(log, self)
  @producer.produce(json, topic: topic, partition: partition, partition_key: partition_key, key: key)
end

#reopenObject



151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
# File 'lib/semantic_logger/appender/kafka.rb', line 151

def reopen
  @kafka = ::Kafka.new(
    seed_brokers:             seed_brokers,
    client_id:                client_id,
    connect_timeout:          connect_timeout,
    socket_timeout:           socket_timeout,
    ssl_ca_cert:              ssl_ca_cert,
    ssl_client_cert:          ssl_client_cert,
    ssl_client_cert_key:      ssl_client_cert_key,
    ssl_ca_certs_from_system: ssl_ca_certs_from_system,
    logger:                   logger
  )

  @producer = @kafka.async_producer(
    delivery_threshold: delivery_threshold,
    delivery_interval:  delivery_interval,
    required_acks:      required_acks
  )
end