Class: Legion::Cache::AsyncWriter

Inherits:
Object
  • Object
show all
Includes:
Logging::Helper
Defined in:
lib/legion/cache/async_writer.rb

Constant Summary collapse

DEFAULT_POOL_SIZE =
4
DEFAULT_QUEUE_SIZE =
1000
DEFAULT_SHUTDOWN_TIMEOUT =
5

Instance Method Summary collapse

Constructor Details

#initialize(pool_size: nil, queue_size: nil, shutdown_timeout: nil, settings_key: :cache) ⇒ AsyncWriter

Returns a new instance of AsyncWriter.



15
16
17
18
19
20
21
22
23
24
# File 'lib/legion/cache/async_writer.rb', line 15

# Builds a stopped writer. No thread pool exists until #start is called.
#
# The sizing arguments are only remembered here; #start and #stop consult
# them and fall back to the configured_* lookups when they are nil.
#
# @param pool_size [Integer, nil] remembered maximum worker-thread count
# @param queue_size [Integer, nil] remembered maximum queue length
# @param shutdown_timeout [Numeric, nil] remembered wait (seconds) used by #stop
# @param settings_key [Symbol] stored as-is; presumably selects the settings
#   section read by the configured_* helpers (not visible here, confirm)
def initialize(pool_size: nil, queue_size: nil, shutdown_timeout: nil, settings_key: :cache)
  # Lifecycle state: the executor is created by #start, guarded by the mutex.
  @mutex = Mutex.new
  @executor = nil

  # Job counters, bumped by #enqueue.
  @processed = Concurrent::AtomicFixnum.new(0)
  @failed = Concurrent::AtomicFixnum.new(0)

  # Caller-supplied overrides, kept verbatim (nil means "not overridden").
  @settings_key = settings_key
  @config_pool_size = pool_size
  @config_queue_size = queue_size
  @config_shutdown_timeout = shutdown_timeout
end

Instance Method Details

#enqueue(&block) ⇒ Object



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/legion/cache/async_writer.rb', line 58

# Runs the given block as a write job.
#
# When an executor exists and reports running, the job is posted to it;
# otherwise the job is executed immediately on the calling thread. In both
# cases a successful job bumps the processed counter, while a StandardError
# is passed to handle_exception (level :warn, marked handled) and bumps the
# failed counter instead of propagating.
#
# @yield the work to perform
# @return [Object] the executor's #post result when posted, otherwise the
#   result of the counter increment performed inline
def enqueue(&block)
  # One job body for both paths; only the operation tag reported on failure differs.
  run_job = lambda do |operation|
    block.call
    @processed.increment
  rescue StandardError => e
    handle_exception(e, level: :warn, handled: true, operation: operation)
    @failed.increment
  end

  # Read the ivar once so a concurrent #stop cannot swap it mid-call.
  pool = @executor
  return run_job.call(:async_writer_sync_fallback) unless pool&.running?

  pool.post { run_job.call(:async_writer_job) }
end

#failed_count ⇒ Object



95
96
97
# File 'lib/legion/cache/async_writer.rb', line 95

# Current reading of the failure counter, which #enqueue bumps whenever a
# job raises a StandardError.
#
# @return [Integer] number of failed jobs so far
def failed_count
  @failed.value
end

#pool_size ⇒ Object



83
84
85
# File 'lib/legion/cache/async_writer.rb', line 83

# Maximum thread count reported by the current executor.
#
# @return [Integer] the executor's max_length, or 0 when there is no
#   executor or it reports no value
def pool_size
  executor = @executor
  return 0 if executor.nil?

  executor.max_length || 0
end

#processed_count ⇒ Object



91
92
93
# File 'lib/legion/cache/async_writer.rb', line 91

# Current reading of the success counter, which #enqueue bumps after each
# job that completes without raising.
#
# @return [Integer] number of successfully processed jobs so far
def processed_count
  @processed.value
end

#queue_depth ⇒ Object



87
88
89
# File 'lib/legion/cache/async_writer.rb', line 87

# Number of jobs the current executor reports as waiting in its queue.
#
# @return [Integer] the executor's queue_length, or 0 when there is no
#   executor or it reports no value
def queue_depth
  executor = @executor
  return 0 if executor.nil?

  executor.queue_length || 0
end

#running? ⇒ Boolean

Returns:

  • (Boolean)


79
80
81
# File 'lib/legion/cache/async_writer.rb', line 79

# Whether an executor exists and reports itself as running.
#
# Always answers with a strict boolean, never nil.
#
# @return [Boolean] true only when the executor's running? is exactly true
def running?
  executor = @executor
  return false if executor.nil?

  executor.running? == true
end

#start(pool_size: nil, queue_size: nil, **) ⇒ Object



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/legion/cache/async_writer.rb', line 26

# Creates the thread pool unless one is already running.
#
# Each size is resolved in order: the argument given here, then the value
# passed to the constructor, then the configured_* lookup. Any other keyword
# arguments are accepted and ignored. The whole operation runs under the
# instance mutex.
#
# @param pool_size [Integer, nil] maximum worker threads for this start
# @param queue_size [Integer, nil] maximum queued jobs for this start
# @return [Object, nil] nil when already running, otherwise whatever the
#   logger's info call returns
def start(pool_size: nil, queue_size: nil, **)
  @mutex.synchronize do
    next if running?

    threads = pool_size || @config_pool_size || configured_pool_size
    backlog = queue_size || @config_queue_size || configured_queue_size

    # :caller_runs makes the posting thread execute a job the pool cannot accept.
    pool_options = {
      min_threads:     1,
      max_threads:     threads,
      max_queue:       backlog,
      fallback_policy: :caller_runs
    }

    @executor = Concurrent::ThreadPoolExecutor.new(**pool_options)
    log.info "Legion::Cache::AsyncWriter started pool_size=#{threads} queue_size=#{backlog}"
  end
end

#stop(timeout: nil) ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/legion/cache/async_writer.rb', line 43

# Shuts the thread pool down and discards it; does nothing when no executor
# is present.
#
# The wait limit is resolved in order: the argument given here, then the
# constructor's shutdown_timeout, then configured_shutdown_timeout. If the
# executor has not terminated within that limit it is killed and a warning
# is logged. The whole operation runs under the instance mutex.
#
# @param timeout [Numeric, nil] seconds to wait for termination
# @return [nil]
def stop(timeout: nil)
  @mutex.synchronize do
    pool = @executor
    next unless pool

    grace = timeout || @config_shutdown_timeout || configured_shutdown_timeout
    pool.shutdown

    finished = pool.wait_for_termination(grace)
    unless finished
      pool.kill
      log.warn "Legion::Cache::AsyncWriter force-killed after #{grace}s timeout"
    end

    log.info "Legion::Cache::AsyncWriter stopped processed=#{@processed.value}"
    @executor = nil
  end
end