Class: ShopCircle::Orbit::Worker
- Inherits:
-
Object
- Object
- ShopCircle::Orbit::Worker
- Defined in:
- lib/shopcircle/orbit/worker.rb
Constant Summary collapse
- FLUSH_TIMEOUT =
5
- MAX_CONSECUTIVE_ERRORS =
5
Instance Attribute Summary collapse
-
#transport ⇒ Object
readonly
Returns the value of attribute transport.
Instance Method Summary collapse
- #flush ⇒ Object
-
#initialize(queue:, config:, transport:) ⇒ Worker
constructor
A new instance of Worker.
- #start ⇒ Object
- #stop ⇒ Object
Constructor Details
#initialize(queue:, config:, transport:) ⇒ Worker
Returns a new instance of Worker.
11 12 13 14 15 16 17 18 19 20 21 |
# File 'lib/shopcircle/orbit/worker.rb', line 11
# Builds a worker in the stopped state; no thread exists until #start runs.
#
# @param queue [Object] stored as @queue
# @param config [Object] stored as @config
# @param transport [Object] stored as @transport and exposed via #transport
def initialize(queue:, config:, transport:)
  # Injected collaborators.
  @queue = queue
  @config = config
  @transport = transport

  # Synchronisation primitives guarding the state below.
  @mutex = Mutex.new
  @condvar = ConditionVariable.new

  # Lifecycle and bookkeeping state.
  @running = false
  @thread = nil
  @flush_waiters = []
  @consecutive_errors = 0
end
Instance Attribute Details
#transport ⇒ Object (readonly)
Returns the value of attribute transport.
6 7 8 |
# File 'lib/shopcircle/orbit/worker.rb', line 6 def transport @transport end |
Instance Method Details
#flush ⇒ Object
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 |
# File 'lib/shopcircle/orbit/worker.rb', line 43
# Asks the worker to flush and waits up to FLUSH_TIMEOUT seconds for a reply.
#
# A private reply queue is registered in @flush_waiters and the condition
# variable is signalled; presumably the worker loop (not visible here) pushes
# a value onto that queue once it has flushed.
#
# @return [Object, nil] the value taken from the reply queue, or nil when the
#   worker is not running or the deadline passes first
def flush
  return unless running?

  reply = Queue.new
  @mutex.synchronize do
    @flush_waiters << reply
    @condvar.signal
  end

  # Poll with short sleeps instead of Timeout (which uses Thread.raise)
  give_up_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) + FLUSH_TIMEOUT
  while reply.empty?
    if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= give_up_at
      # Deregister so a late reply is not delivered to an abandoned queue.
      @mutex.synchronize { @flush_waiters.delete(reply) }
      return nil
    end
    sleep(0.01)
  end

  # Non-blocking pop: the queue was just observed to be non-empty.
  reply.pop(true)
end
#start ⇒ Object
23 24 25 26 27 28 29 30 31 |
# File 'lib/shopcircle/orbit/worker.rb', line 23
# Starts the background thread unless the worker is already marked running.
#
# The running flag is flipped under the mutex; the thread itself is created
# after the lock is released and executes run_loop.
#
# @return [false, nil] nil when already running, otherwise the assigned
#   abort_on_exception value
def start
  was_running = @mutex.synchronize do
    previous = @running
    @running = true
    previous
  end
  return if was_running

  @thread = Thread.new { run_loop }
  # An exception inside the worker thread must not abort the whole process.
  @thread.abort_on_exception = false
end
#stop ⇒ Object
33 34 35 36 37 38 39 40 41 |
# File 'lib/shopcircle/orbit/worker.rb', line 33
# Stops the worker: clears the running flag, wakes the thread, waits up to
# five seconds for it to finish, then calls flush_all and notify_all_waiters
# (both defined elsewhere in the class).
#
# @return [Object] whatever notify_all_waiters returns
def stop
  @mutex.synchronize do
    @running = false
    @condvar.signal
  end

  # The thread is nil when #start was never called.
  worker_thread = @thread
  worker_thread.join(5) unless worker_thread.nil?

  flush_all
  notify_all_waiters
end