Class: DeadBro::Dispatcher
- Inherits:
-
Object
- Object
- DeadBro::Dispatcher
- Defined in:
- lib/dead_bro/dispatcher.rb
Overview
Background worker pool that runs HTTP posts for Client off the request thread. Replaces the previous ‘Thread.new` per metric. One shared pool per process; re-initializes after fork (Puma, Unicorn).
Constant Summary collapse
- DEFAULT_QUEUE_SIZE =
500- DEFAULT_WORKERS =
2- SHUTDOWN =
Object.new
Class Attribute Summary collapse
-
.inline ⇒ Object
Test hook — when true, ‘dispatch` runs the block inline on the caller thread instead of handing it to a worker.
Class Method Summary collapse
- .instance ⇒ Object
-
.reset! ⇒ Object
Exposed for tests.
Instance Method Summary collapse
-
#dispatch(&block) ⇒ Object
Schedule a block for background execution.
- #dropped_count ⇒ Object
-
#initialize(queue_size: DEFAULT_QUEUE_SIZE, workers: DEFAULT_WORKERS) ⇒ Dispatcher
constructor
A new instance of Dispatcher.
- #shutdown ⇒ Object
Constructor Details
#initialize(queue_size: DEFAULT_QUEUE_SIZE, workers: DEFAULT_WORKERS) ⇒ Dispatcher
Returns a new instance of Dispatcher.
31 32 33 34 35 36 37 38 39 |
# File 'lib/dead_bro/dispatcher.rb', line 31 def initialize(queue_size: DEFAULT_QUEUE_SIZE, workers: DEFAULT_WORKERS) @queue_size = queue_size @worker_count = workers @mutex = Mutex.new @dropped = 0 @shutting_down = false boot_workers(Process.pid) install_at_exit_hook end |
Class Attribute Details
.inline ⇒ Object
Test hook — when true, ‘dispatch` runs the block inline on the caller thread instead of handing it to a worker. Keeps specs deterministic without having to stub `Thread.new` or poll for queue drain.
28 29 30 |
# File 'lib/dead_bro/dispatcher.rb', line 28 def inline @inline end |
Class Method Details
.instance ⇒ Object
15 16 17 |
# File 'lib/dead_bro/dispatcher.rb', line 15 def instance @instance ||= new end |
.reset! ⇒ Object
Exposed for tests.
20 21 22 23 |
# File 'lib/dead_bro/dispatcher.rb', line 20 def reset! @instance&.shutdown @instance = nil end |
Instance Method Details
#dispatch(&block) ⇒ Object
Schedule a block for background execution. Never blocks the caller: if the queue is full the job is dropped and ‘dropped_count` is incremented.
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 |
# File 'lib/dead_bro/dispatcher.rb', line 43 def dispatch(&block) return false unless block_given? return false if @shutting_down if self.class.inline begin block.call rescue # Match worker semantics — swallow job errors. end return true end ensure_workers_alive! @queue.push(block, true) # non-blocking true rescue ThreadError @mutex.synchronize { @dropped += 1 } false end |
#dropped_count ⇒ Object
64 65 66 |
# File 'lib/dead_bro/dispatcher.rb', line 64 def dropped_count @mutex.synchronize { @dropped } end |
#shutdown ⇒ Object
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/dead_bro/dispatcher.rb', line 68 def shutdown return if @shutting_down @shutting_down = true workers = @workers || [] workers.length.times do begin @queue.push(SHUTDOWN) rescue end end workers.each do |t| begin t.join(2) rescue end end end |