Class: Legion::Logging::AsyncWriter

Inherits:
Object
  • Object
show all
Defined in:
lib/legion/logging/async_writer.rb

Defined Under Namespace

Classes: LogEntry

Constant Summary collapse

# Symbol constant. No method shown on this page references it; presumably a
# sentinel that tells the writer thread to exit — TODO confirm against #consume.
SHUTDOWN =
:shutdown
# Frozen list of symbol keys. The names suggest per-thread logging context
# (segments, method, caller and correlation ids); their use is not visible
# here — NOTE(review): confirm against the code that reads them.
THREAD_KEYS =
%i[
  legion_log_segments legion_log_method legion_log_caller
  legion_log_conv_id legion_log_request_id legion_log_exchange_id legion_log_chain_id
].freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(logger, buffer_size: 10_000) ⇒ AsyncWriter

Returns a new instance of AsyncWriter.



18
19
20
21
22
23
24
25
# File 'lib/legion/logging/async_writer.rb', line 18

# Builds a writer around +logger+ with a bounded entry queue.
# No thread is created here; the writer starts out accepting entries.
#
# @param logger [Object] stored as-is and exposed through #logger
# @param buffer_size [Integer] capacity handed to SizedQueue.new
def initialize(logger, buffer_size: 10_000)
  @logger      = logger
  @buffer_size = buffer_size
  @queue       = SizedQueue.new(@buffer_size)
  @state_mutex = Mutex.new
  @thread      = nil
  @accepting   = true
end

Instance Attribute Details

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



16
17
18
# File 'lib/legion/logging/async_writer.rb', line 16

# Reader for the logger object that #initialize stored in @logger.
#
# @return [Object] the current value of @logger
def logger
  @logger
end

Instance Method Details

#alive? ⇒ Boolean

Reports whether a writer thread exists and is currently alive.

Returns:

  • (Boolean)


66
67
68
# File 'lib/legion/logging/async_writer.rb', line 66

# Tells whether a writer thread exists and is still running.
#
# @return [Boolean] false when no thread is set, otherwise the thread's
#   own liveness coerced to a strict boolean
def alive?
  worker = @thread
  return false if worker.nil?

  worker.alive? || false
end

#push(entry) ⇒ Object



56
57
58
59
60
61
62
63
# File 'lib/legion/logging/async_writer.rb', line 56

# Enqueues +entry+ for the writer.
#
# @queue is created as a SizedQueue in #initialize, so the enqueue call may
# block while the queue is full.
#
# @param entry [Object] item appended to @queue
# @return [Boolean] true once the entry is queued; false when +accepting?+
#   is falsy or the queue has been closed
def push(entry)
  if accepting?
    @queue << entry
    true
  else
    false
  end
rescue ClosedQueueError
  # The queue was closed between the acceptance check and the enqueue.
  false
end

#start ⇒ Object



27
28
29
30
31
32
33
34
35
36
# File 'lib/legion/logging/async_writer.rb', line 27

# Starts the background writer thread unless one is already running.
#
# Re-enables acceptance of entries, calls +drain+, makes sure @queue is
# usable, then spawns a thread named 'legion-log-writer' that runs +consume+.
#
# @return [nil, false] nil when a live writer thread already exists;
#   false (the value of the final assignment) after a thread was spawned
def start
  return if @thread&.alive?

  @state_mutex.synchronize { @accepting = true }
  drain
  # Replace the queue only when it cannot be reused (missing or closed).
  # Keeping an open queue means an entry pushed by a producer after +drain+
  # returns stays in @queue instead of being stranded in a discarded object
  # after #push already reported success.
  @queue = SizedQueue.new(@buffer_size) if @queue.nil? || @queue.closed?
  @thread = Thread.new { consume }
  @thread.name = 'legion-log-writer'
  # Keep a failing writer thread from aborting the whole process.
  @thread.abort_on_exception = false
end

#stop(timeout: 2) ⇒ Object

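Stops accepting entries, closes the queue and waits (up to timeout seconds, or indefinitely when timeout is nil or false) for the writer thread to finish. Returns true once the writer is stopped, false if the thread is still alive after the wait.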



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/legion/logging/async_writer.rb', line 39

# Shuts the writer down.
#
# Acceptance of new entries is switched off first. With a live writer thread
# the queue is closed and the thread is joined; with no live thread +drain+
# is called instead.
#
# @param timeout [Numeric, nil, false] seconds to wait for the thread;
#   a falsy value waits without a limit
# @return [Boolean] true when no writer thread remains, false when the
#   thread is still alive after the wait
def stop(timeout: 2)
  @state_mutex.synchronize { @accepting = false }

  if @thread&.alive?
    @queue.close
    if timeout
      @thread.join(timeout)
    else
      @thread.join
    end
    # Keep the thread reference so a later call can observe it again.
    return false if @thread&.alive?
  else
    drain
  end

  @thread = nil
  true
end