Class: SemanticLogger::QueueProcessor

Inherits:
Object
  • Object
show all
Defined in:
lib/semantic_logger/queue_processor.rb

Overview

Internal class that processes log messages from a queue on a separate thread.

Internal use only: it owns the worker thread and the in-memory queue that back the asynchronous appender proxy (SemanticLogger::Appender::Async). It is never returned to application code.

Supports two processing modes, selected by the batch: option:

* Streaming (batch: false): each log message is written to the appender as it is dequeued.
* Batching  (batch: true):  log messages are grouped and written via the appender's #batch
                          method, either once batch_size messages have accumulated, or
                          batch_seconds have elapsed since the previous batch.

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(appender:, max_queue_size: 10_000, lag_check_interval: 1_000, lag_threshold_s: 30, batch: false, batch_size: 300, batch_seconds: 5, non_blocking: false, dropped_message_report_seconds: 30, async_max_retries: 100) ⇒ QueueProcessor

Parameters:

appender: [SemanticLogger::Subscriber]
The appender to forward log messages to from the worker thread.

max_queue_size: [Integer]
The maximum number of log messages to hold on the queue before blocking attempts to add to the queue.
-1: The queue size is uncapped and will never block no matter how long the queue is.
Default: 10,000

lag_threshold_s: [Float]
Log a warning when a log message has been on the queue for longer than this period in seconds.
Default: 30

lag_check_interval: [Integer]
Number of messages to process before checking for slow logging.
Default: 1,000
Note: Not applicable when batch: true.

batch: [true|false]
Process log messages in batches via the appender's #batch method.
Default: false
Note: The appender must implement #batch.

batch_size: [Integer]
Maximum number of messages to batch up before sending.
Default: 300
Note: Only applicable when batch: true.

batch_seconds: [Integer]
Maximum number of seconds between sending batches.
Default: 5
Note: Only applicable when batch: true.

non_blocking: [true|false]
Whether to drop log messages instead of blocking the calling thread when the queue is full.
Only applies to a capped queue.
Default: false

dropped_message_report_seconds: [Integer]
When non_blocking is enabled, log the count of dropped messages to the internal logger
at most once every this number of seconds.
Default: 30

async_max_retries: [Integer]
Maximum number of consecutive times to restart the worker thread after it raises an
exception while processing messages. Each restart first sleeps for `retry_count` seconds
(1s, then 2s, ...) as a back-off. Once this many consecutive retries are exhausted the
thread stops instead of restarting. The counter resets to zero whenever a message is
processed successfully.
-1: Retry indefinitely and never stop the thread (the pre-v5 behaviour). The back-off
    still applies, and still resets after any successful message.
Default: 100

Raises:

  • (ArgumentError)


78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/semantic_logger/queue_processor.rb', line 78

def initialize(appender:,
               max_queue_size: 10_000,
               lag_check_interval: 1_000,
               lag_threshold_s: 30,
               batch: false,
               batch_size: 300,
               batch_seconds: 5,
               non_blocking: false,
               dropped_message_report_seconds: 30,
               async_max_retries: 100)
  @appender                       = appender
  @max_queue_size                 = max_queue_size
  @lag_check_interval             = lag_check_interval
  @lag_threshold_s                = lag_threshold_s
  @batch                          = batch
  @batch_size                     = batch_size
  @batch_seconds                  = batch_seconds
  @non_blocking                   = non_blocking
  @dropped_message_report_seconds = dropped_message_report_seconds
  @async_max_retries              = async_max_retries
  @retry_count                    = 0
  @thread                         = nil
  # Only batch mode parks the worker on the signal; streaming mode never touches it.
  @signal                         = Concurrent::Event.new if batch
  @dropped_message_count          = 0
  @dropped_message_reported_at    = Time.now
  @dropped_message_mutex          = Mutex.new
  @processed_count                = 0
  @dropped_count                  = 0
  create_queue

  return unless batch? && !appender.respond_to?(:batch)

  raise(ArgumentError, "#{appender.class.name} does not support batching. It must implement #batch")
end

Instance Attribute Details

#appenderObject (readonly)

Returns the value of attribute appender.



16
17
18
# File 'lib/semantic_logger/queue_processor.rb', line 16

def appender
  @appender
end

#async_max_retriesObject (readonly)

Returns the value of attribute async_max_retries.



16
17
18
# File 'lib/semantic_logger/queue_processor.rb', line 16

def async_max_retries
  @async_max_retries
end

#batch_secondsObject

Returns the value of attribute batch_seconds.



14
15
16
# File 'lib/semantic_logger/queue_processor.rb', line 14

def batch_seconds
  @batch_seconds
end

#batch_sizeObject

Returns the value of attribute batch_size.



14
15
16
# File 'lib/semantic_logger/queue_processor.rb', line 14

def batch_size
  @batch_size
end

#dropped_countObject (readonly)

Returns the value of attribute dropped_count.



16
17
18
# File 'lib/semantic_logger/queue_processor.rb', line 16

def dropped_count
  @dropped_count
end

#dropped_message_report_secondsObject

Returns the value of attribute dropped_message_report_seconds.



14
15
16
# File 'lib/semantic_logger/queue_processor.rb', line 14

def dropped_message_report_seconds
  @dropped_message_report_seconds
end

#lag_check_intervalObject

Returns the value of attribute lag_check_interval.



14
15
16
# File 'lib/semantic_logger/queue_processor.rb', line 14

def lag_check_interval
  @lag_check_interval
end

#lag_threshold_sObject

Returns the value of attribute lag_threshold_s.



14
15
16
# File 'lib/semantic_logger/queue_processor.rb', line 14

def lag_threshold_s
  @lag_threshold_s
end

#max_queue_sizeObject (readonly)

Returns the value of attribute max_queue_size.



16
17
18
# File 'lib/semantic_logger/queue_processor.rb', line 16

def max_queue_size
  @max_queue_size
end

#non_blockingObject (readonly)

Returns the value of attribute non_blocking.



16
17
18
# File 'lib/semantic_logger/queue_processor.rb', line 16

def non_blocking
  @non_blocking
end

#processed_countObject (readonly)

Returns the value of attribute processed_count.



16
17
18
# File 'lib/semantic_logger/queue_processor.rb', line 16

def processed_count
  @processed_count
end

#queueObject (readonly)

Returns the value of attribute queue.



16
17
18
# File 'lib/semantic_logger/queue_processor.rb', line 16

def queue
  @queue
end

#retry_countObject (readonly)

Returns the value of attribute retry_count.



16
17
18
# File 'lib/semantic_logger/queue_processor.rb', line 16

def retry_count
  @retry_count
end

#signalObject (readonly)

Returns the value of attribute signal.



16
17
18
# File 'lib/semantic_logger/queue_processor.rb', line 16

def signal
  @signal
end

Class Method Details

.start(**args) ⇒ Object

Create a new processor and start its worker thread.



20
21
22
23
24
# File 'lib/semantic_logger/queue_processor.rb', line 20

def self.start(**args)
  processor = new(**args)
  processor.thread
  processor
end

Instance Method Details

#active?Boolean

Returns [true|false] whether the worker thread is running.

Returns:

  • (Boolean)


129
130
131
# File 'lib/semantic_logger/queue_processor.rb', line 129

def active?
  @thread&.alive?
end

#batch?Boolean

Returns [true|false] whether messages are processed in batches.

Returns:

  • (Boolean)


145
146
147
# File 'lib/semantic_logger/queue_processor.rb', line 145

def batch?
  @batch
end

#capped?Boolean

Returns [true|false] whether the queue has a capped size.

Returns:

  • (Boolean)


134
135
136
# File 'lib/semantic_logger/queue_processor.rb', line 134

def capped?
  @capped
end

#closeObject

Flush any outstanding messages and close the appender.



181
182
183
# File 'lib/semantic_logger/queue_processor.rb', line 181

def close
  submit_request(:close)
end

#flushObject

Flush all queued log entries to the appender. All queued log messages are written and then the appender is flushed.



176
177
178
# File 'lib/semantic_logger/queue_processor.rb', line 176

def flush
  submit_request(:flush)
end

#log(log) ⇒ Object

Add a log message to the queue for processing.

When non-blocking and the queue is full, the message is dropped instead of blocking the calling thread, and the count of dropped messages is reported periodically.



153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
# File 'lib/semantic_logger/queue_processor.rb', line 153

def log(log)
  enqueued = true
  if non_blocking?
    begin
      queue.push(log, true)
    rescue ThreadError
      message_dropped
      enqueued = false
    end
  else
    queue << log
  end

  # For batches wake up the processing thread once the number of queued messages has been
  # exceeded. Also wake it on a drop so it drains the full queue rather than waiting out the
  # batch interval.
  signal.set if batch? && (queue.size >= batch_size)

  enqueued
end

#loggerObject

Internal logger used to report problems encountered while processing log messages.



115
116
117
# File 'lib/semantic_logger/queue_processor.rb', line 115

def logger
  appender.logger
end

#non_blocking?Boolean

Returns [true|false] whether messages are dropped instead of blocking when the queue is full. Only a capped queue can drop messages.

Returns:

  • (Boolean)


140
141
142
# File 'lib/semantic_logger/queue_processor.rb', line 140

def non_blocking?
  @non_blocking && capped?
end

#reopenObject

Re-open the queue and worker thread after a fork.



186
187
188
189
190
191
192
193
194
# File 'lib/semantic_logger/queue_processor.rb', line 186

def reopen
  # Workaround CRuby crash on fork by recreating queue on reopen
  #   https://github.com/reidmorrison/semantic_logger/issues/103
  @queue&.close
  create_queue

  @thread&.kill if @thread&.alive?
  @thread = spawn_worker
end

#threadObject

Returns [Thread] the worker thread.

Starts the worker thread if it is not currently running.



122
123
124
125
126
# File 'lib/semantic_logger/queue_processor.rb', line 122

def thread
  return @thread if @thread&.alive?

  @thread = spawn_worker
end