Class: DeadBro::Dispatcher

Inherits:
Object
Defined in:
lib/dead_bro/dispatcher.rb

Overview

Background worker pool that runs HTTP posts for Client off the request thread. Replaces the previous approach of spawning one `Thread.new` per metric. There is one shared pool per process, and it re-initializes after a fork (Puma, Unicorn).
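The re-initialization after fork can be pictured with a small stand-alone sketch. It is not DeadBro's implementation: `ForkAwarePool`, `ensure_alive!`, `generation`, and the worker loop are hypothetical names and details chosen for illustration. The only assumption carried over from this page is that the pool remembers the PID it was booted in (the constructor calls `boot_workers(Process.pid)`). Threads do not survive `fork`, so a child process that sees a different PID has to build a fresh queue and fresh workers.

```ruby
# Hypothetical illustration; ForkAwarePool is not part of DeadBro.
class ForkAwarePool
  attr_reader :generation # counts how many times the pool was (re)booted

  def initialize(workers: 2, queue_size: 500)
    @workers_wanted = workers
    @queue_size = queue_size
    @generation = 0
    boot(Process.pid)
  end

  # Rebuild the queue and threads when the PID differs from the boot PID.
  # The parameter exists only so the check can be exercised without forking.
  def ensure_alive!(current_pid = Process.pid)
    boot(current_pid) if current_pid != @owner_pid
  end

  def dispatch(&job)
    ensure_alive!
    @queue.push(job, true) # non-blocking push; raises ThreadError when full
    true
  rescue ThreadError
    false
  end

  private

  def boot(pid)
    @queue&.close # lets workers of a previous generation leave their loop
    @owner_pid = pid
    @generation += 1
    queue = @queue = SizedQueue.new(@queue_size)
    @threads = Array.new(@workers_wanted) do
      Thread.new do
        # pop returns nil once the queue is closed and empty, ending the loop
        while (job = queue.pop)
          begin
            job.call
          rescue StandardError
            # Swallow job errors so one failing job does not kill the worker.
          end
        end
      end
    end
  end
end

pool = ForkAwarePool.new
before = pool.generation
pool.ensure_alive!(Process.pid)     # same process: nothing happens
same = (pool.generation == before)
pool.ensure_alive!(Process.pid + 1) # pretend we are a forked child
rebooted = (pool.generation == before + 1)

results = Queue.new
accepted = pool.dispatch { results << :ok }
value = results.pop                 # blocks until a worker has run the job
```

Checking the PID on every dispatch costs one integer comparison, which is cheap compared with registering fork hooks for each server.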

Constant Summary

DEFAULT_QUEUE_SIZE = 500
DEFAULT_WORKERS = 2
SHUTDOWN = Object.new

Constructor Details

#initialize(queue_size: DEFAULT_QUEUE_SIZE, workers: DEFAULT_WORKERS) ⇒ Dispatcher

Returns a new instance of Dispatcher.



# File 'lib/dead_bro/dispatcher.rb', line 31

def initialize(queue_size: DEFAULT_QUEUE_SIZE, workers: DEFAULT_WORKERS)
  @queue_size = queue_size
  @worker_count = workers
  @mutex = Mutex.new
  @dropped = 0
  @shutting_down = false
  boot_workers(Process.pid)
  install_at_exit_hook
end

Class Attribute Details

.inline ⇒ Object

Test hook. When true, `dispatch` runs the block inline on the caller thread instead of handing it to a worker. This keeps specs deterministic without stubbing `Thread.new` or polling for queue drain.



# File 'lib/dead_bro/dispatcher.rb', line 28

def inline
  @inline
end
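A spec might use the hook roughly as sketched below. The example uses a stand-in class so it runs on its own. `FakeDispatcher` is hypothetical, and the `inline=` writer is an assumption, since only the reader is listed on this page.

```ruby
# Stand-in for illustration only; FakeDispatcher is not part of DeadBro.
class FakeDispatcher
  class << self
    attr_accessor :inline # assumed: the real class exposes a writer too
  end

  def dispatch(&block)
    return false unless block

    if self.class.inline
      begin
        block.call
      rescue StandardError
        # Swallow job errors, matching the documented worker semantics.
      end
      return true
    end

    Thread.new(&block) # placeholder for "hand the job to a worker"
    true
  end
end

FakeDispatcher.inline = true
begin
  seen = []
  FakeDispatcher.new.dispatch { seen << Thread.current }
  ran_on_caller = (seen == [Thread.current])

  # A raising job is swallowed and dispatch still reports success.
  survived_error = FakeDispatcher.new.dispatch { raise "boom" }
ensure
  FakeDispatcher.inline = false # always restore the flag for other specs
end
```

Restoring the flag in an `ensure` block keeps one spec's setting from leaking into the next.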

Class Method Details

.instance ⇒ Object

Returns the shared Dispatcher for this process, creating it on first call.



# File 'lib/dead_bro/dispatcher.rb', line 15

def instance
  @instance ||= new
end

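.reset! ⇒ Object

Exposed for tests. Shuts down the current instance, if any, and clears it so that the next call to `instance` builds a fresh pool.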



# File 'lib/dead_bro/dispatcher.rb', line 20

def reset!
  @instance&.shutdown
  @instance = nil
end

Instance Method Details

#dispatch(&block) ⇒ Object

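Schedule a block for background execution. Never blocks the caller: if the queue is full, the job is dropped, `dropped_count` is incremented, and `false` is returned. Also returns `false` when no block is given or the dispatcher is shutting down, and `true` when the job was accepted.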



# File 'lib/dead_bro/dispatcher.rb', line 43

def dispatch(&block)
  return false unless block_given?
  return false if @shutting_down

  if self.class.inline
    begin
      block.call
    rescue
      # Match worker semantics — swallow job errors.
    end
    return true
  end

  ensure_workers_alive!
  @queue.push(block, true) # non-blocking
  true
rescue ThreadError
  @mutex.synchronize { @dropped += 1 }
  false
end
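The drop-on-full behavior relies on the non-blocking form of `SizedQueue#push`, which raises `ThreadError` instead of waiting when the queue is at capacity. A minimal stand-alone sketch, with a deliberately tiny queue and no consumers so the outcome is deterministic (the variable names are illustrative only):

```ruby
queue = SizedQueue.new(2) # tiny capacity so it fills immediately
dropped = 0
mutex = Mutex.new

accepted = 5.times.map do |i|
  begin
    queue.push(i, true) # non_block = true: raise instead of waiting
    true
  rescue ThreadError
    mutex.synchronize { dropped += 1 } # count the dropped job
    false
  end
end

# accepted   # => [true, true, false, false, false]
# dropped    # => 3
# queue.size # => 2
```

Dropping under pressure trades completeness of metrics for a bounded cost on the request thread. Monitoring `dropped_count` shows when the queue size or worker count is too small.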

#dropped_count ⇒ Object

Returns the number of jobs dropped because the queue was full.



# File 'lib/dead_bro/dispatcher.rb', line 64

def dropped_count
  @mutex.synchronize { @dropped }
end

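#shutdown ⇒ Object

Stops the pool: pushes one SHUTDOWN sentinel per worker, then waits up to 2 seconds for each worker thread to finish. Calling it again is a no-op.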



# File 'lib/dead_bro/dispatcher.rb', line 68

def shutdown
  return if @shutting_down
  @shutting_down = true
  workers = @workers || []
  workers.length.times do
    begin
      @queue.push(SHUTDOWN)
    rescue
    end
  end
  workers.each do |t|
    begin
      t.join(2)
    rescue
    end
  end
end
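The sentinel pattern used by `shutdown` can be tried in isolation. The worker loop is not shown on this page, so the loop below is an assumption about how a worker might react to the sentinel. It compares by identity, so no real job can be mistaken for the stop signal.

```ruby
shutdown = Object.new # unique sentinel, playing the role of SHUTDOWN
queue = SizedQueue.new(10)
processed = Queue.new

workers = Array.new(2) do
  Thread.new do
    loop do
      job = queue.pop
      break if job.equal?(shutdown) # identity check: only the sentinel stops the loop

      begin
        job.call
      rescue StandardError
        # Assumed worker semantics: swallow job errors and keep going.
      end
    end
  end
end

4.times { |i| queue.push(-> { processed << i }) }
queue.push(-> { raise "boom" }) # a failing job must not kill its worker

# One sentinel per worker; the queue is FIFO, so earlier jobs are picked up first.
workers.length.times { queue.push(shutdown) }

# Thread#join(limit) returns the thread, or nil if it did not finish in time.
joined = workers.map { |t| t.join(2) }
```

Because each worker consumes exactly one sentinel and then exits, pushing as many sentinels as there are workers stops the whole pool without killing threads mid-job.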