Class: Upkeep::Delivery::AsyncDispatcher
- Inherits:
-
Object
- Object
- Upkeep::Delivery::AsyncDispatcher
- Defined in:
- lib/upkeep/delivery/async_dispatcher.rb
Instance Method Summary collapse
- #drain ⇒ Object
- #enqueue(changes) ⇒ Object
-
#initialize(batch_window: 0.01, &deliver) ⇒ AsyncDispatcher
constructor
A new instance of AsyncDispatcher.
- #shutdown ⇒ Object
Constructor Details
#initialize(batch_window: 0.01, &deliver) ⇒ AsyncDispatcher
Returns a new instance of AsyncDispatcher.
6 7 8 9 10 11 12 13 14 15 16 17 |
# File 'lib/upkeep/delivery/async_dispatcher.rb', line 6 def initialize(batch_window: 0.01, &deliver) @deliver = deliver @batch_window = batch_window @jobs = [] @mutex = Mutex.new @available = ConditionVariable.new @idle = ConditionVariable.new @pending_jobs = 0 @last_error = nil @stopping = false @worker = Thread.new { work_loop } end |
Instance Method Details
#drain ⇒ Object
34 35 36 37 38 39 |
# File 'lib/upkeep/delivery/async_dispatcher.rb', line 34 def drain @mutex.synchronize do @idle.wait(@mutex) until @pending_jobs.zero? raise @last_error if @last_error end end |
#enqueue(changes) ⇒ Object
19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
# File 'lib/upkeep/delivery/async_dispatcher.rb', line 19 def enqueue(changes) changes = Array(changes) return Transport::DispatchReport.new([]) if changes.empty? @mutex.synchronize do raise @last_error if @last_error @pending_jobs += 1 @jobs << changes @available.signal end Transport::DispatchReport.new([]) end |
#shutdown ⇒ Object
41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/upkeep/delivery/async_dispatcher.rb', line 41 def shutdown error = nil begin drain rescue StandardError => shutdown_error error = shutdown_error ensure @mutex.synchronize do @stopping = true @available.signal end @worker.join end raise error if error end |