Class: CI::Queue::Redis::Base::HeartbeatProcess

Inherits:
Object
  • Object
show all
Defined in:
lib/ci/queue/redis/base.rb

Constant Summary collapse

MAX_RESTART_ATTEMPTS =
3

Instance Method Summary collapse

Constructor Details

#initialize(redis_url, zset_key, owners_key, leases_key) ⇒ HeartbeatProcess

Returns a new instance of HeartbeatProcess.



269
270
271
272
273
274
# File 'lib/ci/queue/redis/base.rb', line 269

# Records the connection details that are later handed to the monitor
# child process as command-line arguments (see #boot!).
#
# @param redis_url [String] URL of the Redis server
# @param zset_key [String] name of the zset key
# @param owners_key [String] name of the owners key
# @param leases_key [String] name of the leases key
def initialize(redis_url, zset_key, owners_key, leases_key)
  @redis_url, @zset_key, @owners_key, @leases_key =
    redis_url, zset_key, owners_key, leases_key
end

Instance Method Details

#boot! ⇒ Object



276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
# File 'lib/ci/queue/redis/base.rb', line 276

# Spawns the monitor script (monitor.rb, next to this file) in a child Ruby
# process and waits up to 10 seconds for it to write to its stdout.
#
# Two pipes are created: the child's stdin is the read end of the control
# pipe (whose write end is kept in @pipe), and the child's stdout is the
# write end of a short-lived readiness pipe.
#
# @return [IO] the binary-mode write end of the control pipe
# @raise [RuntimeError] when the child wrote nothing within 10 seconds; the
#   child is killed and reaped, and @pipe is closed, before raising
# @raise [Errno::ESRCH] when the child process no longer exists
def boot!
  child_read, @pipe = IO.pipe
  ready_pipe, child_write = IO.pipe
  @pipe.binmode
  @pid = Process.spawn(
    RbConfig.ruby,
    ::File.join(__dir__, "monitor.rb"),
    @redis_url,
    @zset_key,
    @owners_key,
    @leases_key,
    in: child_read,
    out: child_write,
  )
  # The child owns its own copies of these descriptors now.
  child_read.close
  child_write.close

  unless ready_pipe.wait_readable(10)
    # The child never reported in. Tear it down instead of blocking in
    # Process.wait on a process that may never exit by itself.
    @pipe.close
    Process.kill(:KILL, @pid) # raises Errno::ESRCH when the child is already gone
    Process.wait(@pid)
    raise "Monitor child wasn't ready after 10 seconds"
  end

  ready_pipe.gets
  # Check the process is alive: signal 0 delivers nothing but raises
  # Errno::ESRCH when no such process exists.
  Process.kill(0, @pid)
  @pipe
ensure
  # Release the parent-side temporary descriptors on every exit path,
  # including a failed spawn and the timeout above.
  [child_read, child_write, ready_pipe].each do |io|
    io.close if io && !io.closed?
  end
end

#shutdown! ⇒ Object



306
307
308
309
310
311
312
313
314
# File 'lib/ci/queue/redis/base.rb', line 306

# Closes the control pipe to the monitor child and waits for the child
# to exit.
#
# @return [Process::Status, nil] the child's exit status, or nil when
#   there is no such child left to wait for (Errno::ECHILD)
def shutdown!
  @pipe.close
  begin
    # waitpid2 yields [pid, status]; only the status is of interest.
    Process.waitpid2(@pid).last
  rescue Errno::ECHILD
    nil
  end
end

#tick!(id, lease) ⇒ Object



316
317
318
319
320
321
322
323
324
325
# File 'lib/ci/queue/redis/base.rb', line 316

# Sends a :tick! message carrying +id+ and the stringified +lease+.
#
# When sending fails with IOError or Errno::EPIPE, restart! is called and
# the send is retried. The count of consecutive failures is kept in
# @restart_attempts and reset to 0 after a successful send; once it
# exceeds MAX_RESTART_ATTEMPTS the error is re-raised.
#
# @param id [Object] identifier forwarded as the message's :id
# @param lease [#to_s] lease forwarded (as a String) as the message's :lease
# @return [Integer] 0, the reset failure counter
# @raise [IOError, Errno::EPIPE] after MAX_RESTART_ATTEMPTS failed restarts
def tick!(id, lease)
  send_message(:tick!, id: id, lease: lease.to_s)
  @restart_attempts = 0
rescue IOError, Errno::EPIPE
  attempts = @restart_attempts.to_i + 1
  @restart_attempts = attempts
  raise if attempts > MAX_RESTART_ATTEMPTS

  restart!
  retry
end