Class: Upkeep::Delivery::AsyncDispatcher

Inherits:
Object
  • Object
show all
Defined in:
lib/upkeep/delivery/async_dispatcher.rb

Instance Method Summary collapse

Constructor Details

#initialize(batch_window: 0.01, &deliver) ⇒ AsyncDispatcher

Returns a new instance of AsyncDispatcher.



6
7
8
9
10
11
12
13
14
15
16
17
# File 'lib/upkeep/delivery/async_dispatcher.rb', line 6

# Sets up the dispatcher's state and starts its background worker thread.
#
# @param batch_window [Numeric] stored as @batch_window (presumably a
#   duration in seconds read by #work_loop — TODO confirm against that method)
# @param deliver [Proc] block stored as @deliver
def initialize(batch_window: 0.01, &deliver)
  # Caller-supplied configuration.
  @batch_window = batch_window
  @deliver = deliver

  # Queue bookkeeping read by #enqueue, #drain and #shutdown.
  @jobs = []
  @pending_jobs = 0
  @last_error = nil
  @stopping = false

  # Lock and condition variables guarding the bookkeeping above.
  @mutex = Mutex.new
  @available = ConditionVariable.new
  @idle = ConditionVariable.new

  # Started last so every instance variable exists before #work_loop runs.
  @worker = Thread.new do
    work_loop
  end
end

Instance Method Details

#drain ⇒ Object



34
35
36
37
38
39
# File 'lib/upkeep/delivery/async_dispatcher.rb', line 34

# Blocks the caller until @pending_jobs reaches zero, then re-raises the
# stored @last_error if one is set.
#
# @return [nil] when no error is stored
# @raise [Exception] whatever object is held in @last_error
def drain
  @mutex.synchronize do
    # ConditionVariable#wait may wake spuriously, so the counter is
    # re-checked after every wake-up.
    loop do
      break if @pending_jobs.zero?

      @idle.wait(@mutex)
    end

    stored_failure = @last_error
    raise stored_failure if stored_failure
  end
end

#enqueue(changes) ⇒ Object



19
20
21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/upkeep/delivery/async_dispatcher.rb', line 19

# Queues a batch of changes and wakes whoever waits on @available.
#
# +changes+ is coerced with Kernel#Array, so nil, a single value or an array
# are all accepted. An empty batch is a no-op that touches no state.
#
# @param changes [Object, Array, nil] change(s) to queue
# @return [Transport::DispatchReport] always built from an empty list; the
#   batch is only queued here, not delivered
# @raise [Exception] the stored @last_error, if one is set
# @raise [RuntimeError] if #shutdown has already flagged the dispatcher as
#   stopping
def enqueue(changes)
  changes = Array(changes)
  return Transport::DispatchReport.new([]) if changes.empty?

  @mutex.synchronize do
    raise @last_error if @last_error

    # #shutdown sets @stopping and then joins the worker. Accepting a batch
    # after that would bump @pending_jobs with (presumably) no worker left to
    # bring it back down, so the batch would be lost and a later #drain would
    # wait on @idle forever. Fail loudly instead.
    raise "dispatcher has been shut down" if @stopping

    @pending_jobs += 1
    @jobs << changes
    @available.signal
  end

  Transport::DispatchReport.new([])
end

#shutdown ⇒ Object



41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/upkeep/delivery/async_dispatcher.rb', line 41

# Drains outstanding work, flags the worker to stop, and joins its thread.
#
# A StandardError raised by #drain is held back until the worker has been
# stopped and joined, then re-raised. Any other exception from #drain
# propagates immediately, but the worker is still stopped and joined first.
#
# @return [nil] when #drain raised nothing
# @raise [StandardError] whatever #drain raised
def shutdown
  drain_failure = nil

  begin
    begin
      drain
    rescue StandardError => e
      drain_failure = e
    end
  ensure
    # Set the flag and signal @available under the lock so a thread waiting
    # on it observes @stopping when it wakes.
    @mutex.synchronize do
      @stopping = true
      @available.signal
    end
    @worker.join
  end

  raise drain_failure if drain_failure
end