Module: JobTick::Dispatcher
- Defined in:
- lib/jobtick/dispatcher.rb
Overview
Asynchronous, single-threaded HTTP dispatcher with a persistent keep-alive connection. Job threads call .enqueue and return immediately; the dispatcher daemon thread drains the queue and posts to the JobTick API.
All HTTP work (sync register + async pings) shares one Net::HTTP instance serialized by @http_mutex. The connection is reopened lazily after errors.
Constant Summary collapse
- SHUTDOWN_SIGNAL =
:__shutdown__- HEADER_CONTENT_TYPE =
"application/json"- USER_AGENT =
"jobtick-ruby/#{JobTick::VERSION}".freeze
- OPEN_TIMEOUT =
5- READ_TIMEOUT =
5- KEEP_ALIVE_TIMEOUT =
30- NETWORK_ERRORS =
[ IOError, EOFError, Errno::ECONNRESET, Errno::ECONNREFUSED, Errno::ECONNABORTED, Errno::EPIPE, Errno::ETIMEDOUT, Errno::EHOSTUNREACH, Net::OpenTimeout, Net::ReadTimeout, OpenSSL::SSL::SSLError, SocketError ].freeze
Class Attribute Summary collapse
-
.synchronous ⇒ Object
Returns the value of attribute synchronous.
Class Method Summary collapse
- .dropped ⇒ Object
- .enqueue(path, payload) ⇒ Object
- .flush(timeout: 5) ⇒ Object
- .reset! ⇒ Object
- .send_sync(path, payload) ⇒ Object
- .shutdown(timeout: 2) ⇒ Object
Class Attribute Details
.synchronous ⇒ Object
Returns the value of attribute synchronous.
32 33 34 |
# File 'lib/jobtick/dispatcher.rb', line 32 def synchronous @synchronous end |
Class Method Details
.dropped ⇒ Object
69 70 71 |
# File 'lib/jobtick/dispatcher.rb', line 69 def dropped @dropped || 0 end |
.enqueue(path, payload) ⇒ Object
34 35 36 37 38 39 40 41 42 43 |
# File 'lib/jobtick/dispatcher.rb', line 34 def enqueue(path, payload) return send_request(path, payload) if @synchronous ensure_started @queue.push([path, payload], true) nil rescue ThreadError @dropped += 1 nil end |
.flush(timeout: 5) ⇒ Object
49 50 51 52 53 54 55 56 57 |
# File 'lib/jobtick/dispatcher.rb', line 49 def flush(timeout: 5) return unless @running && @queue deadline = monotonic + timeout until @queue.empty? && @inflight.zero? sleep 0.001 break if monotonic > deadline end end |
.reset! ⇒ Object
73 74 75 76 77 78 79 80 81 82 |
# File 'lib/jobtick/dispatcher.rb', line 73 def reset! shutdown(timeout: 1) if @running @queue = nil @thread = nil @dropped = 0 @inflight = 0 @endpoint_uri = nil @at_exit_registered = false @synchronous = false end |
.send_sync(path, payload) ⇒ Object
45 46 47 |
# File 'lib/jobtick/dispatcher.rb', line 45 def send_sync(path, payload) send_request(path, payload) end |
.shutdown(timeout: 2) ⇒ Object
59 60 61 62 63 64 65 66 67 |
# File 'lib/jobtick/dispatcher.rb', line 59 def shutdown(timeout: 2) return unless @running @running = false @queue&.push(SHUTDOWN_SIGNAL) @thread&.join(timeout) close_http nil end |