Class: SemanticLogger::QueueProcessor
- Inherits:
-
Object
- Object
- SemanticLogger::QueueProcessor
- Defined in:
- lib/semantic_logger/queue_processor.rb
Overview
Internal class that processes log messages from a queue on a separate thread.
Internal use only: it owns the worker thread and the in-memory queue that back the asynchronous appender proxy (SemanticLogger::Appender::Async). It is never returned to application code.
Supports two processing modes, selected by the batch: option:
* Streaming (batch: false): each log message is written to the appender as it is dequeued.
* Batching (batch: true): log messages are grouped and written via the appender's #batch
method, either once batch_size messages have accumulated or once
batch_seconds have elapsed since the previous batch.
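The difference between the two modes can be illustrated with a small, self-contained sketch. It is not SemanticLogger's implementation: RecordingAppender and drain are hypothetical names introduced here, and the batch_seconds timer is left out so that only the size-based grouping is shown.

```ruby
# Hypothetical stand-in appender that records every call it receives.
class RecordingAppender
  attr_reader :calls

  def initialize
    @calls = []
  end

  # Streaming mode delivers one message per call.
  def log(message)
    @calls << [:log, message]
  end

  # Batching mode delivers an array of messages per call.
  def batch(messages)
    @calls << [:batch, messages]
  end
end

# Drain everything currently on the queue, either one message at a time
# (batch: false) or in groups of at most batch_size (batch: true).
# Assumption: a single consumer, so the queue cannot empty between the
# empty? check and the pop.
def drain(queue, appender, batch:, batch_size: 300)
  if batch
    until queue.empty?
      group = []
      group << queue.pop while group.size < batch_size && !queue.empty?
      appender.batch(group)
    end
  else
    appender.log(queue.pop) until queue.empty?
  end
end
```

With five queued messages and batch_size: 2, the batching branch makes three #batch calls (two full groups and one partial group), while the streaming branch makes one #log call per message.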
Instance Attribute Summary
-
#appender ⇒ Object
readonly
Returns the value of attribute appender.
-
#async_max_retries ⇒ Object
readonly
Returns the value of attribute async_max_retries.
-
#batch_seconds ⇒ Object
Returns the value of attribute batch_seconds.
-
#batch_size ⇒ Object
Returns the value of attribute batch_size.
-
#dropped_count ⇒ Object
readonly
Returns the value of attribute dropped_count.
-
#dropped_message_report_seconds ⇒ Object
Returns the value of attribute dropped_message_report_seconds.
-
#lag_check_interval ⇒ Object
Returns the value of attribute lag_check_interval.
-
#lag_threshold_s ⇒ Object
Returns the value of attribute lag_threshold_s.
-
#max_queue_size ⇒ Object
readonly
Returns the value of attribute max_queue_size.
-
#non_blocking ⇒ Object
readonly
Returns the value of attribute non_blocking.
-
#processed_count ⇒ Object
readonly
Returns the value of attribute processed_count.
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
-
#retry_count ⇒ Object
readonly
Returns the value of attribute retry_count.
-
#signal ⇒ Object
readonly
Returns the value of attribute signal.
Class Method Summary
-
.start(**args) ⇒ Object
Create a new processor and start its worker thread.
Instance Method Summary
-
#active? ⇒ Boolean
Returns [true|false] whether the worker thread is running.
-
#batch? ⇒ Boolean
Returns [true|false] whether messages are processed in batches.
-
#capped? ⇒ Boolean
Returns [true|false] whether the queue has a capped size.
-
#close ⇒ Object
Flush any outstanding messages and close the appender.
-
#flush ⇒ Object
Flush all queued log entries to the appender.
-
#initialize(appender:, max_queue_size: 10_000, lag_check_interval: 1_000, lag_threshold_s: 30, batch: false, batch_size: 300, batch_seconds: 5, non_blocking: false, dropped_message_report_seconds: 30, async_max_retries: 100) ⇒ QueueProcessor
constructor
Parameters: appender: [SemanticLogger::Subscriber] The appender to forward log messages to from the worker thread.
-
#log(log) ⇒ Object
Add a log message to the queue for processing.
-
#logger ⇒ Object
Internal logger used to report problems encountered while processing log messages.
-
#non_blocking? ⇒ Boolean
Returns [true|false] whether messages are dropped instead of blocking when the queue is full.
-
#reopen ⇒ Object
Re-open the queue and worker thread after a fork.
-
#thread ⇒ Object
Returns [Thread] the worker thread.
Constructor Details
#initialize(appender:, max_queue_size: 10_000, lag_check_interval: 1_000, lag_threshold_s: 30, batch: false, batch_size: 300, batch_seconds: 5, non_blocking: false, dropped_message_report_seconds: 30, async_max_retries: 100) ⇒ QueueProcessor
Parameters:
appender: [SemanticLogger::Subscriber]
The appender to forward log messages to from the worker thread.
max_queue_size: [Integer]
The maximum number of log messages to hold on the queue. Once the queue is full, further attempts to add to it block.
-1: The queue size is uncapped and will never block no matter how long the queue is.
Default: 10,000
lag_threshold_s: [Float]
Log a warning when a log message has been on the queue for longer than this period in seconds.
Default: 30
lag_check_interval: [Integer]
Number of messages to process before checking for slow logging.
Default: 1,000
Note: Not applicable when batch: true.
batch: [true|false]
Process log messages in batches via the appender's #batch method.
Default: false
Note: The appender must implement #batch.
batch_size: [Integer]
Maximum number of messages to batch up before sending.
Default: 300
Note: Only applicable when batch: true.
batch_seconds: [Integer]
Maximum number of seconds between sending batches.
Default: 5
Note: Only applicable when batch: true.
non_blocking: [true|false]
Whether to drop log messages instead of blocking the calling thread when the queue is full.
Only applies to a capped queue.
Default: false
dropped_message_report_seconds: [Integer]
When non_blocking is enabled, the count of dropped messages is logged to the internal logger
at most once per interval of this many seconds.
Default: 30
async_max_retries: [Integer]
Maximum number of consecutive times to restart the worker thread after it raises an
exception while processing messages. Each restart first sleeps for `retry_count` seconds
(1s, then 2s, ...) as a back-off. Once this many consecutive retries are exhausted the
thread stops instead of restarting. The counter resets to zero whenever a message is
processed successfully.
-1: Retry indefinitely and never stop the thread (the pre-v5 behaviour). The back-off
still applies, and still resets after any successful message.
Default: 100
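The retry behaviour described for async_max_retries can be sketched as follows. This is an illustration under stated assumptions, not the library's worker loop: RetryingWorker, its sleeper: argument and the :done / :stopped return values are hypothetical and exist only so the back-off can be observed without actually sleeping.

```ruby
# Hypothetical helper: processes items and retries after exceptions with a
# linear back-off (1s, then 2s, ...), giving up after max_retries
# consecutive failures. A max_retries of -1 retries indefinitely.
class RetryingWorker
  attr_reader :retry_count

  # sleeper is injectable so that tests can record delays instead of sleeping.
  def initialize(max_retries:, sleeper: ->(seconds) { sleep(seconds) })
    @max_retries = max_retries
    @sleeper     = sleeper
    @retry_count = 0
  end

  # Yields each item. Returns :done when every item was processed, or
  # :stopped once the consecutive retries are exhausted.
  def run(items)
    items.each do |item|
      begin
        yield item
        @retry_count = 0 # Any success resets the counter.
      rescue StandardError
        @retry_count += 1
        return :stopped if @max_retries >= 0 && @retry_count > @max_retries

        @sleeper.call(@retry_count) # Back off for retry_count seconds.
        retry
      end
    end
    :done
  end
end
```

With max_retries: 2, an item that always fails is attempted three times (the first attempt plus two retries), with recorded delays of 1 and 2 seconds, before the worker stops.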
# File 'lib/semantic_logger/queue_processor.rb', line 78

def initialize(appender:, max_queue_size: 10_000, lag_check_interval: 1_000, lag_threshold_s: 30,
               batch: false, batch_size: 300, batch_seconds: 5, non_blocking: false,
               dropped_message_report_seconds: 30, async_max_retries: 100)
  @appender = appender
  @max_queue_size = max_queue_size
  @lag_check_interval = lag_check_interval
  @lag_threshold_s = lag_threshold_s
  @batch = batch
  @batch_size = batch_size
  @batch_seconds = batch_seconds
  @non_blocking = non_blocking
  @dropped_message_report_seconds = dropped_message_report_seconds
  @async_max_retries = async_max_retries
  @retry_count = 0
  @thread = nil
  # Only batch mode parks the worker on the signal; streaming mode never touches it.
  @signal = Concurrent::Event.new if batch
  @dropped_message_count = 0
  @dropped_message_reported_at = Time.now
  @dropped_message_mutex = Mutex.new
  @processed_count = 0
  @dropped_count = 0
  create_queue

  return unless batch? && !appender.respond_to?(:batch)

  raise(ArgumentError, "#{appender.class.name} does not support batching. It must implement #batch")
end
Instance Attribute Details
#appender ⇒ Object (readonly)
Returns the value of attribute appender.
# File 'lib/semantic_logger/queue_processor.rb', line 16

def appender
  @appender
end
#async_max_retries ⇒ Object (readonly)
Returns the value of attribute async_max_retries.
# File 'lib/semantic_logger/queue_processor.rb', line 16

def async_max_retries
  @async_max_retries
end
#batch_seconds ⇒ Object
Returns the value of attribute batch_seconds.
# File 'lib/semantic_logger/queue_processor.rb', line 14

def batch_seconds
  @batch_seconds
end
#batch_size ⇒ Object
Returns the value of attribute batch_size.
# File 'lib/semantic_logger/queue_processor.rb', line 14

def batch_size
  @batch_size
end
#dropped_count ⇒ Object (readonly)
Returns the value of attribute dropped_count.
# File 'lib/semantic_logger/queue_processor.rb', line 16

def dropped_count
  @dropped_count
end
#dropped_message_report_seconds ⇒ Object
Returns the value of attribute dropped_message_report_seconds.
# File 'lib/semantic_logger/queue_processor.rb', line 14

def dropped_message_report_seconds
  @dropped_message_report_seconds
end
#lag_check_interval ⇒ Object
Returns the value of attribute lag_check_interval.
# File 'lib/semantic_logger/queue_processor.rb', line 14

def lag_check_interval
  @lag_check_interval
end
#lag_threshold_s ⇒ Object
Returns the value of attribute lag_threshold_s.
# File 'lib/semantic_logger/queue_processor.rb', line 14

def lag_threshold_s
  @lag_threshold_s
end
#max_queue_size ⇒ Object (readonly)
Returns the value of attribute max_queue_size.
# File 'lib/semantic_logger/queue_processor.rb', line 16

def max_queue_size
  @max_queue_size
end
#non_blocking ⇒ Object (readonly)
Returns the value of attribute non_blocking.
# File 'lib/semantic_logger/queue_processor.rb', line 16

def non_blocking
  @non_blocking
end
#processed_count ⇒ Object (readonly)
Returns the value of attribute processed_count.
# File 'lib/semantic_logger/queue_processor.rb', line 16

def processed_count
  @processed_count
end
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
# File 'lib/semantic_logger/queue_processor.rb', line 16

def queue
  @queue
end
#retry_count ⇒ Object (readonly)
Returns the value of attribute retry_count.
# File 'lib/semantic_logger/queue_processor.rb', line 16

def retry_count
  @retry_count
end
#signal ⇒ Object (readonly)
Returns the value of attribute signal.
# File 'lib/semantic_logger/queue_processor.rb', line 16

def signal
  @signal
end
Class Method Details
.start(**args) ⇒ Object
Create a new processor and start its worker thread.
# File 'lib/semantic_logger/queue_processor.rb', line 20

def self.start(**args)
  processor = new(**args)
  processor.thread
  processor
end
Instance Method Details
#active? ⇒ Boolean
Returns [true|false] whether the worker thread is running.
# File 'lib/semantic_logger/queue_processor.rb', line 129

def active?
  @thread&.alive?
end
#batch? ⇒ Boolean
Returns [true|false] whether messages are processed in batches.
# File 'lib/semantic_logger/queue_processor.rb', line 145

def batch?
  @batch
end
#capped? ⇒ Boolean
Returns [true|false] whether the queue has a capped size.
# File 'lib/semantic_logger/queue_processor.rb', line 134

def capped?
  @capped
end
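The source of create_queue, which sets @capped, is not shown on this page. A plausible sketch, assuming that a max_queue_size of -1 selects an unbounded queue and any other value a bounded one, might look like this. build_queue and capped_queue? are hypothetical names used only for illustration.

```ruby
# Assumption: -1 means "uncapped", so use Ruby's unbounded Queue;
# otherwise use a SizedQueue, whose push blocks once it is full.
def build_queue(max_queue_size)
  max_queue_size == -1 ? Queue.new : SizedQueue.new(max_queue_size)
end

# A queue is capped when it enforces a maximum size.
def capped_queue?(queue)
  queue.is_a?(SizedQueue)
end
```

Because SizedQueue is a subclass of Queue, code that only pushes and pops can treat both kinds of queue the same way.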
#close ⇒ Object
Flush any outstanding messages and close the appender.
# File 'lib/semantic_logger/queue_processor.rb', line 181

def close
  submit_request(:close)
end
#flush ⇒ Object
Flush all queued log entries to the appender. All queued log messages are written and then the appender is flushed.
# File 'lib/semantic_logger/queue_processor.rb', line 176

def flush
  submit_request(:flush)
end
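The submit_request helper is not shown on this page. One common way to get the documented guarantee (everything queued before the flush is written first) is to send the request through the same queue as the messages and wait for a reply. The sketch below assumes that approach; TinyProcessor and its request format are hypothetical and are not SemanticLogger's implementation.

```ruby
# Hypothetical sketch: a worker that takes messages and control requests
# from one queue, so a request is only handled after earlier messages.
class TinyProcessor
  attr_reader :written

  def initialize
    @queue   = Queue.new
    @written = []
    @thread  = Thread.new { process }
  end

  def log(message)
    @queue << message
  end

  # Blocks the caller until the worker reaches the flush request,
  # which means every earlier message has already been written.
  def flush
    reply = Queue.new
    @queue << {command: :flush, reply: reply}
    reply.pop
  end

  private

  def process
    loop do
      item = @queue.pop
      if item.is_a?(Hash) && item[:command]
        item[:reply] << true # Acknowledge the request.
      else
        @written << item
      end
    end
  end
end
```

Since the queue is first-in, first-out with a single consumer, the reply can only arrive after all messages that were queued ahead of the request have been handled.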
#log(log) ⇒ Object
Add a log message to the queue for processing.
When non-blocking and the queue is full, the message is dropped instead of blocking the calling thread, and the count of dropped messages is reported periodically.
# File 'lib/semantic_logger/queue_processor.rb', line 153

def log(log)
  enqueued = true
  if non_blocking?
    begin
      queue.push(log, true)
    rescue ThreadError
      enqueued = false
    end
  else
    queue << log
  end

  # For batches wake up the processing thread once the number of queued messages has
  # reached batch_size. Also wake it on a drop so it drains the full queue rather than
  # waiting out the batch interval.
  signal.set if batch? && (queue.size >= batch_size)

  enqueued
end
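The non-blocking branch relies on standard Ruby behaviour: SizedQueue#push with a true second argument raises ThreadError instead of blocking when the queue is full. A minimal stand-alone sketch, with try_push as a hypothetical helper name:

```ruby
# Returns true when the item was enqueued, false when the queue was full.
def try_push(queue, item)
  queue.push(item, true) # non_block = true: raise ThreadError when full.
  true
rescue ThreadError
  false
end
```

For a SizedQueue of size 2, the first two calls return true and a third returns false without blocking the caller; the queue still holds only the first two items.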
#logger ⇒ Object
Internal logger used to report problems encountered while processing log messages.
# File 'lib/semantic_logger/queue_processor.rb', line 115

def logger
  appender.logger
end
#non_blocking? ⇒ Boolean
Returns [true|false] whether messages are dropped instead of blocking when the queue is full. Only a capped queue can drop messages.
# File 'lib/semantic_logger/queue_processor.rb', line 140

def non_blocking?
  @non_blocking && capped?
end
#reopen ⇒ Object
Re-open the queue and worker thread after a fork.
# File 'lib/semantic_logger/queue_processor.rb', line 186

def reopen
  # Workaround CRuby crash on fork by recreating queue on reopen
  #   https://github.com/reidmorrison/semantic_logger/issues/103
  @queue&.close
  create_queue

  @thread&.kill if @thread&.alive?
  @thread = spawn_worker
end
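In a forked child process only the thread that called fork survives, so the worker thread must be recreated there. The same close, recreate and respawn sequence can be tried without forking. The sketch below is an illustration only: ReopenableWorker and its handled list are hypothetical, and the worker body stands in for spawn_worker, whose source is not shown here.

```ruby
# Hypothetical sketch of the reopen sequence: close the old queue, build a
# new one, stop any old worker, then start a fresh worker on the new queue.
class ReopenableWorker
  attr_reader :queue, :thread, :handled

  def initialize
    @handled = []
    reopen
  end

  def reopen
    @queue&.close
    @queue = Queue.new

    @thread&.kill if @thread&.alive?
    @thread = Thread.new(@queue) do |queue|
      # pop returns nil once the queue is closed and empty, ending the loop.
      while (item = queue.pop)
        @handled << item
      end
    end
  end
end
```

After calling reopen, the previous queue is closed, the previous thread is no longer running, and new messages go to a new queue served by a new thread.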
#thread ⇒ Object
Returns [Thread] the worker thread.
Starts the worker thread if it is not currently running.
# File 'lib/semantic_logger/queue_processor.rb', line 122

def thread
  return @thread if @thread&.alive?

  @thread = spawn_worker
end
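The lazy-start pattern used by #thread can be shown in isolation. In this sketch LazyWorker is a hypothetical class, and the block passed to it stands in for the real worker loop.

```ruby
# Hypothetical sketch: return the running thread, or start a new one
# if none exists or the previous one has died.
class LazyWorker
  def initialize(&work)
    @work   = work
    @thread = nil
  end

  def thread
    return @thread if @thread&.alive?

    @thread = Thread.new(&@work)
  end
end
```

Repeated calls return the same Thread while it is alive. Once that thread has exited or been killed, the next call starts and returns a replacement.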