Class: CI::Queue::Redis::Base::HeartbeatProcess

Inherits:
Object
  • Object
show all
Defined in:
lib/ci/queue/redis/base.rb

Constant Summary collapse

MAX_RESTART_ATTEMPTS =
3
TICK_COMMAND =

Cached command marker. Passing this frozen String instead of the :tick! Symbol avoids allocating a ‘tick!’ String (from Symbol#to_s) on every heartbeat, which fires once per running test per worker.

'tick!'.freeze

Instance Method Summary collapse

Constructor Details

#initialize(redis_url, zset_key, owners_key, leases_key) ⇒ HeartbeatProcess

Returns a new instance of HeartbeatProcess.



274
275
276
277
278
279
# File 'lib/ci/queue/redis/base.rb', line 274

# Records the connection settings for later use; nothing is spawned here.
#
# All four values are stored unchanged in instance variables of the same
# name.
#
# @param redis_url [Object] presumably the Redis URL string -- confirm with callers
# @param zset_key [Object] presumably a Redis key name -- confirm with callers
# @param owners_key [Object] presumably a Redis key name -- confirm with callers
# @param leases_key [Object] presumably a Redis key name -- confirm with callers
def initialize(redis_url, zset_key, owners_key, leases_key)
  @redis_url, @zset_key, @owners_key, @leases_key =
    redis_url, zset_key, owners_key, leases_key
end

Instance Method Details

#boot! ⇒ Object



281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
# File 'lib/ci/queue/redis/base.rb', line 281

# Spawns "monitor.rb" (located next to this file) as a child Ruby process
# and waits for it to report readiness.
#
# Two pipes are created. The write end of the first is kept in +@pipe+ and
# its read end becomes the child's stdin. The write end of the second
# becomes the child's stdout and its read end is polled here for up to
# 10 seconds.
#
# Every pipe end the parent does not keep is closed on all exit paths,
# including when spawning fails or the readiness wait times out.
#
# @return [IO] the write end of the pipe connected to the child's stdin
# @raise [RuntimeError] if the child's stdout is not readable within 10 seconds
# @raise [Errno::ESRCH] if the liveness probe finds no process with +@pid+
def boot!
  child_read, @pipe = IO.pipe
  ready_pipe, child_write = IO.pipe
  @pipe.binmode
  begin
    @pid = Process.spawn(
      RbConfig.ruby,
      ::File.join(__dir__, "monitor.rb"),
      @redis_url,
      @zset_key,
      @owners_key,
      @leases_key,
      in: child_read,
      out: child_write,
    )
  ensure
    # The parent never uses the child's ends of the pipes; release them even
    # when spawn raises so the descriptors do not leak.
    child_read.close
    child_write.close
  end

  # Check the process is alive.
  if ready_pipe.wait_readable(10)
    # NOTE(review): a nil from gets (EOF) would mean the child closed its
    # stdout without writing anything; that case is not treated as a failure
    # here -- confirm this is intended.
    ready_pipe.gets
    # Signal 0 delivers nothing; it only verifies that the pid exists.
    Process.kill(0, @pid)
  else
    Process.kill(0, @pid)
    # NOTE(review): this wait blocks until the child exits on its own; no
    # terminating signal is sent first -- confirm a hung child cannot stall
    # the caller here.
    Process.wait(@pid)
    raise "Monitor child wasn't ready after 10 seconds"
  end
  @pipe
ensure
  # Previously only closed on the success path; close it on timeout and on
  # any exception as well. ready_pipe is nil if IO.pipe itself failed.
  ready_pipe&.close
end

#shutdown! ⇒ Object



311
312
313
314
315
316
317
318
319
# File 'lib/ci/queue/redis/base.rb', line 311

# Closes the pipe held in +@pipe+ and then waits for the process +@pid+ to
# exit.
#
# @return [Process::Status, nil] the exit status of +@pid+, or nil when
#   waiting raises Errno::ECHILD (no such child to wait for)
def shutdown!
  @pipe.close

  exit_status =
    begin
      # waitpid2 yields [pid, status]; only the status is of interest.
      Process.waitpid2(@pid).last
    rescue Errno::ECHILD
      nil
    end
  exit_status
end

#tick!(id, lease) ⇒ Object



321
322
323
324
325
326
327
328
329
330
# File 'lib/ci/queue/redis/base.rb', line 321

# Sends one tick message carrying +id+ and the stringified +lease+.
#
# When sending fails with IOError or Errno::EPIPE, +restart!+ is invoked and
# the send is attempted again. Failures are counted in +@restart_attempts+,
# which is reset to 0 after a successful send. Once the count exceeds
# MAX_RESTART_ATTEMPTS the last error is re-raised instead of restarting.
#
# @param id [Object] forwarded unchanged as the +id:+ field of the message
# @param lease [#to_s] converted with +to_s+ and sent as the +lease:+ field
# @return [Integer] 0, the freshly reset restart counter
# @raise [IOError, Errno::EPIPE] when sending keeps failing past the limit
def tick!(id, lease)
  lease_text = lease.to_s
  send_message(TICK_COMMAND, id: id, lease: lease_text)
  @restart_attempts = 0
rescue IOError, Errno::EPIPE
  # nil.to_i is 0, so the very first failure counts as attempt 1.
  @restart_attempts = @restart_attempts.to_i + 1
  raise unless @restart_attempts <= MAX_RESTART_ATTEMPTS

  restart!
  retry
end