Class: Legion::Cache::AsyncWriter
- Inherits: Object
  - Object
  - Legion::Cache::AsyncWriter
- Includes:
- Logging::Helper
- Defined in:
- lib/legion/cache/async_writer.rb
Constant Summary collapse
- DEFAULT_POOL_SIZE = 4
- DEFAULT_QUEUE_SIZE = 1000
- DEFAULT_SHUTDOWN_TIMEOUT = 5
Instance Method Summary collapse
- #enqueue(&block) ⇒ Object
- #failed_count ⇒ Object
- #initialize(pool_size: nil, queue_size: nil, shutdown_timeout: nil, settings_key: :cache) ⇒ AsyncWriter (constructor): A new instance of AsyncWriter.
- #pool_size ⇒ Object
- #processed_count ⇒ Object
- #queue_depth ⇒ Object
- #running? ⇒ Boolean
- #start(pool_size: nil, queue_size: nil) ⇒ Object
- #stop(timeout: nil) ⇒ Object
Constructor Details
#initialize(pool_size: nil, queue_size: nil, shutdown_timeout: nil, settings_key: :cache) ⇒ AsyncWriter
Returns a new instance of AsyncWriter.
15 16 17 18 19 20 21 22 23 24 |
# File 'lib/legion/cache/async_writer.rb', line 15
#
# Builds a writer with no executor attached. Construction only records the
# caller's overrides and creates the counters and the lifecycle lock.
#
# @param pool_size [Object, nil] override stored in @config_pool_size
#   (presumably an Integer thread count; confirm against callers)
# @param queue_size [Object, nil] override stored in @config_queue_size
# @param shutdown_timeout [Object, nil] override stored in @config_shutdown_timeout
# @param settings_key [Symbol] stored in @settings_key; defaults to :cache
def initialize(pool_size: nil, queue_size: nil, shutdown_timeout: nil, settings_key: :cache)
  # Lifecycle state: the lock exists up front, the executor does not.
  @mutex    = Mutex.new
  @executor = nil

  # Job counters, both starting from zero.
  @processed = Concurrent::AtomicFixnum.new(0)
  @failed    = Concurrent::AtomicFixnum.new(0)

  # Caller-supplied configuration, kept exactly as given (nil means "not set").
  @settings_key            = settings_key
  @config_pool_size        = pool_size
  @config_queue_size       = queue_size
  @config_shutdown_timeout = shutdown_timeout
end
Instance Method Details
#enqueue(&block) ⇒ Object
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/legion/cache/async_writer.rb', line 58
#
# Runs the given block as a counted job. When an executor is assigned and
# reports running?, the job is handed to it through #post; otherwise the job
# is executed immediately on the calling thread.
#
# A StandardError raised by the job is never propagated: it is passed to
# handle_exception and counted as a failure instead.
#
# @yield the work to perform
# @return [Object] the result of the executor's #post in the executor path,
#   otherwise whatever the counter's #increment returned
def enqueue(&block)
  # Read the ivar once so the check and the post use the same executor object.
  executor = @executor
  return run_and_count(block, :async_writer_sync_fallback) unless executor&.running?

  executor.post { run_and_count(block, :async_writer_job) }
end

# Calls +job+, incrementing @processed on success; on StandardError reports the
# error under the given +operation+ label and increments @failed.
private def run_and_count(job, operation)
  job.call
  @processed.increment
rescue StandardError => e
  handle_exception(e, level: :warn, handled: true, operation: operation)
  @failed.increment
end
#failed_count ⇒ Object
95 96 97 |
# File 'lib/legion/cache/async_writer.rb', line 95
#
# Current reading of the failure counter.
#
# @return [Object] the value reported by the counter held in @failed
def failed_count
  failures = @failed
  failures.value
end
#pool_size ⇒ Object
83 84 85 |
# File 'lib/legion/cache/async_writer.rb', line 83
#
# Reports the executor's max_length, or 0 when there is nothing to report.
#
# @return [Object] the executor's max_length; 0 when no executor is assigned
#   or when max_length is nil
def pool_size
  executor = @executor
  return 0 if executor.nil?

  executor.max_length || 0
end
#processed_count ⇒ Object
91 92 93 |
# File 'lib/legion/cache/async_writer.rb', line 91
#
# Current reading of the success counter.
#
# @return [Object] the value reported by the counter held in @processed
def processed_count
  successes = @processed
  successes.value
end
#queue_depth ⇒ Object
87 88 89 |
# File 'lib/legion/cache/async_writer.rb', line 87
#
# Reports the executor's queue_length, or 0 when there is nothing to report.
#
# @return [Object] the executor's queue_length; 0 when no executor is assigned
#   or when queue_length is nil
def queue_depth
  executor = @executor
  return 0 if executor.nil?

  executor.queue_length || 0
end
#running? ⇒ Boolean
79 80 81 |
# File 'lib/legion/cache/async_writer.rb', line 79
#
# Tells whether an executor is assigned and reports itself as running.
#
# @return [Boolean] false when no executor is assigned; otherwise the
#   executor's own running? answer
def running?
  # `|| false` turns the nil produced by safe navigation into a real false,
  # so the predicate never leaks nil to callers.
  @executor&.running? || false
end
#start(pool_size: nil, queue_size: nil) ⇒ Object
26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/legion/cache/async_writer.rb', line 26
#
# Creates and assigns a Concurrent::ThreadPoolExecutor unless running? is
# already true. The whole operation happens while holding @mutex.
#
# Each size is resolved in order: the argument given here, then the value
# stored at construction, then the matching configured_* helper.
#
# Any additional keyword arguments are accepted and ignored.
#
# @param pool_size [Object, nil] passed to the executor as max_threads
# @param queue_size [Object, nil] passed to the executor as max_queue
# @return [Object, nil] nil when already running, otherwise the result of
#   the log.info call
def start(pool_size: nil, queue_size: nil, **)
  @mutex.synchronize do
    unless running?
      threads  = pool_size || @config_pool_size || configured_pool_size
      capacity = queue_size || @config_queue_size || configured_queue_size

      pool_options = {
        min_threads: 1,
        max_threads: threads,
        max_queue: capacity,
        fallback_policy: :caller_runs
      }
      @executor = Concurrent::ThreadPoolExecutor.new(**pool_options)

      log.info "Legion::Cache::AsyncWriter started pool_size=#{threads} queue_size=#{capacity}"
    end
  end
end
#stop(timeout: nil) ⇒ Object
43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/legion/cache/async_writer.rb', line 43
#
# Shuts the assigned executor down and clears it. Does nothing when no
# executor is assigned. The whole operation happens while holding @mutex.
#
# The wait is resolved in order: the argument given here, then the value
# stored at construction, then configured_shutdown_timeout. If the executor
# does not report termination within that wait, it is killed and a warning
# is logged.
#
# @param timeout [Object, nil] value handed to the executor's wait_for_termination
# @return [nil]
def stop(timeout: nil)
  @mutex.synchronize do
    executor = @executor
    next unless executor

    grace = timeout || @config_shutdown_timeout || configured_shutdown_timeout

    executor.shutdown
    terminated = executor.wait_for_termination(grace)
    unless terminated
      executor.kill
      log.warn "Legion::Cache::AsyncWriter force-killed after #{grace}s timeout"
    end

    log.info "Legion::Cache::AsyncWriter stopped processed=#{@processed.value}"
    @executor = nil
  end
end