Class: ShopCircle::Orbit::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/shopcircle/orbit/worker.rb

Constant Summary collapse

FLUSH_TIMEOUT =
5
MAX_CONSECUTIVE_ERRORS =
5

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(queue:, config:, transport:) ⇒ Worker

Returns a new instance of Worker.



11
12
13
14
15
16
17
18
19
20
21
# File 'lib/shopcircle/orbit/worker.rb', line 11

def initialize(queue:, config:, transport:)
  @queue     = queue
  @config    = config
  @transport = transport
  @thread    = nil
  @mutex     = Mutex.new
  @condvar   = ConditionVariable.new
  @running   = false
  @flush_waiters = []
  @consecutive_errors = 0
end

Instance Attribute Details

#transportObject (readonly)

Returns the value of attribute transport.



6
7
8
# File 'lib/shopcircle/orbit/worker.rb', line 6

def transport
  @transport
end

Instance Method Details

#flushObject



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/shopcircle/orbit/worker.rb', line 43

def flush
  return unless running?

  done = Queue.new
  @mutex.synchronize do
    @flush_waiters << done
    @condvar.signal
  end

  # Poll with short sleeps instead of Timeout (which uses Thread.raise)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + FLUSH_TIMEOUT
  loop do
    return done.pop(true) if !done.empty?
    if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline
      @mutex.synchronize { @flush_waiters.delete(done) }
      return nil
    end
    sleep(0.01)
  end
end

#startObject



23
24
25
26
27
28
29
30
31
# File 'lib/shopcircle/orbit/worker.rb', line 23

def start
  @mutex.synchronize do
    return if @running
    @running = true
  end

  @thread = Thread.new { run_loop }
  @thread.abort_on_exception = false
end

#stopObject



33
34
35
36
37
38
39
40
41
# File 'lib/shopcircle/orbit/worker.rb', line 33

def stop
  @mutex.synchronize do
    @running = false
    @condvar.signal
  end
  @thread&.join(5)
  flush_all
  notify_all_waiters
end