Module: JobTick::Dispatcher

Defined in:
lib/jobtick/dispatcher.rb

Overview

Asynchronous, single-threaded HTTP dispatcher with a persistent keep-alive connection. Job threads call .enqueue and return immediately; the dispatcher daemon thread drains the queue and posts to the JobTick API.

All HTTP work (sync register + async pings) shares one Net::HTTP instance serialized by @http_mutex. The connection is reopened lazily after errors.

Constant Summary collapse

SHUTDOWN_SIGNAL =
:__shutdown__
HEADER_CONTENT_TYPE =
"application/json"
USER_AGENT =
"jobtick-ruby/#{JobTick::VERSION}".freeze
OPEN_TIMEOUT =
5
READ_TIMEOUT =
5
KEEP_ALIVE_TIMEOUT =
30
NETWORK_ERRORS =
[
  IOError, EOFError,
  Errno::ECONNRESET, Errno::ECONNREFUSED, Errno::ECONNABORTED,
  Errno::EPIPE, Errno::ETIMEDOUT, Errno::EHOSTUNREACH,
  Net::OpenTimeout, Net::ReadTimeout,
  OpenSSL::SSL::SSLError, SocketError
].freeze

Class Attribute Summary collapse

Class Method Summary collapse

Class Attribute Details

.synchronousObject

Returns the value of attribute synchronous.



32
33
34
# File 'lib/jobtick/dispatcher.rb', line 32

def synchronous
  @synchronous
end

Class Method Details

.droppedObject



69
70
71
# File 'lib/jobtick/dispatcher.rb', line 69

def dropped
  @dropped || 0
end

.enqueue(path, payload) ⇒ Object



34
35
36
37
38
39
40
41
42
43
# File 'lib/jobtick/dispatcher.rb', line 34

def enqueue(path, payload)
  return send_request(path, payload) if @synchronous

  ensure_started
  @queue.push([path, payload], true)
  nil
rescue ThreadError
  @dropped += 1
  nil
end

.flush(timeout: 5) ⇒ Object



49
50
51
52
53
54
55
56
57
# File 'lib/jobtick/dispatcher.rb', line 49

def flush(timeout: 5)
  return unless @running && @queue

  deadline = monotonic + timeout
  until @queue.empty? && @inflight.zero?
    sleep 0.001
    break if monotonic > deadline
  end
end

.reset!Object



73
74
75
76
77
78
79
80
81
82
# File 'lib/jobtick/dispatcher.rb', line 73

def reset!
  shutdown(timeout: 1) if @running
  @queue = nil
  @thread = nil
  @dropped = 0
  @inflight = 0
  @endpoint_uri = nil
  @at_exit_registered = false
  @synchronous = false
end

.send_sync(path, payload) ⇒ Object



45
46
47
# File 'lib/jobtick/dispatcher.rb', line 45

def send_sync(path, payload)
  send_request(path, payload)
end

.shutdown(timeout: 2) ⇒ Object



59
60
61
62
63
64
65
66
67
# File 'lib/jobtick/dispatcher.rb', line 59

def shutdown(timeout: 2)
  return unless @running

  @running = false
  @queue&.push(SHUTDOWN_SIGNAL)
  @thread&.join(timeout)
  close_http
  nil
end