Class: WaterDrop::Polling::Poller
- Inherits:
- Object
- Hierarchy: Object > WaterDrop::Polling::Poller
- Includes:
- Karafka::Core::Helpers::Time, Singleton
- Defined in:
- lib/waterdrop/polling/poller.rb
Overview
Note: Newly registered producers may experience a delay of up to 1 second before their first poll cycle, because the poller thread rebuilds its IO list only when IO.select times out. This is acceptable because producers are expected to be long-lived, and the initial connection overhead to Kafka typically exceeds this delay anyway.
Global poller singleton that manages a single polling thread for all FD-mode producers. This replaces librdkafka’s native background polling threads with a single Ruby thread that uses IO.select for efficient multiplexing.
Spawning one thread per producer is acceptable for one or two producers, but in a system with many producers (several transactional ones, for example) the cost keeps growing.
This implementation avoids that cost by being event-driven rather than relying on blocking calls that release the GVL.
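The multiplexing idea described above can be illustrated with a minimal sketch. It is not WaterDrop's implementation: `multiplex_demo` is a hypothetical helper, and plain pipes stand in for the producers' file descriptors. One thread waits on every reader with IO.select and a 1-second timeout.

```ruby
# Waits on several IO objects from a single thread, in the spirit of the
# poller described above. Pipes are hypothetical stand-ins for producer FDs.
def multiplex_demo(count = 3)
  pipes = Array.new(count) { IO.pipe }
  readers = pipes.map(&:first)
  writers = pipes.map(&:last)
  received = []

  poller = Thread.new do
    while received.size < count
      # Block until at least one reader is readable, or 1 second passes.
      ready, = IO.select(readers, nil, nil, 1.0)
      next unless ready # timed out, nothing readable yet

      ready.each { |io| received << io.read_nonblock(64) }
    end
  end

  # Each write makes exactly one reader readable and wakes the poller.
  writers.each_with_index { |w, i| w.syswrite("event-#{i}") }
  poller.join(5)
  received.sort
ensure
  pipes&.flatten&.each(&:close)
end
```

Calling `multiplex_demo` returns the payloads collected by the single polling thread, regardless of the order in which the pipes became readable.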
Constant Summary
- ID_MUTEX = Mutex.new
Mutex for thread-safe ID generation. Initialized at class load time to avoid race conditions with lazy initialization.
Instance Attribute Summary
-
#id ⇒ Integer
readonly
Unique identifier for this poller instance.
Class Method Summary
-
.next_id ⇒ Integer
Generates incremental IDs for poller instances (starting from 0).
Instance Method Summary
-
#alive? ⇒ Boolean
Checks if the poller thread is alive.
-
#count ⇒ Integer
Returns the number of registered producers.
-
#in_poller_thread? ⇒ Boolean
Checks if the current thread is the poller thread. Used to detect when close is called from within a callback, to avoid deadlock.
-
#initialize ⇒ Poller
constructor
Initializes an empty poller with no registered producers.
-
#register(producer, client) ⇒ Object
Registers a producer with the poller.
-
#shutdown! ⇒ Object
Shuts down the poller and resets state.
-
#unregister(producer) ⇒ Object
Unregisters a producer from the poller. This method blocks until the producer is fully removed from the poller, to prevent race conditions when disconnect/reconnect happens in quick succession. This matches the threaded polling behavior, which drains without a timeout.
Constructor Details
#initialize ⇒ Poller
Initializes an empty poller with no registered producers. The background thread is not started until the first producer is registered.
    # File 'lib/waterdrop/polling/poller.rb', line 52

    def initialize
      @id = self.class.next_id
      @mutex = Mutex.new
      @producers = {}
      @thread = nil
      @shutdown = false
      @pid = Process.pid

      # Cached collections - rebuilt only when producers change
      @cached_ios = []
      @cached_io_to_state = {}
      @cached_states = []
      @cached_result = nil
      @ios_dirty = true
    end
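The constructor sets up cached collections guarded by a dirty flag. The pattern can be sketched in isolation as follows. `CachedRegistry` and its methods are hypothetical names for illustration only, and the rebuild logic is an assumption about how such a flag is typically used, not the poller's actual code.

```ruby
# Hypothetical registry, not WaterDrop's Poller: the derived collection is
# rebuilt only when the dirty flag says the source hash has changed.
class CachedRegistry
  attr_reader :rebuilds

  def initialize
    @items = {}
    @cached_values = []
    @dirty = true
    @rebuilds = 0
  end

  # Any mutation marks the cache as stale instead of rebuilding eagerly.
  def add(key, value)
    @items[key] = value
    @dirty = true
  end

  # Readers pay the rebuild cost at most once per change.
  def values
    rebuild if @dirty
    @cached_values
  end

  private

  def rebuild
    @cached_values = @items.values.freeze
    @dirty = false
    @rebuilds += 1
  end
end
```

Repeated calls to `values` between mutations reuse the same array, which is the property that matters for a hot polling loop.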
Instance Attribute Details
#id ⇒ Integer (readonly)
Returns the unique identifier for this poller instance.
    # File 'lib/waterdrop/polling/poller.rb', line 48

    def id
      @id
    end
Class Method Details
.next_id ⇒ Integer
Generates incremental IDs for poller instances (starting from 0).
    # File 'lib/waterdrop/polling/poller.rb', line 38

    def next_id
      ID_MUTEX.synchronize do
        id = @id_counter
        @id_counter += 1
        id
      end
    end
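A self-contained sketch of the same ID generation pattern is shown below. `IdSource` and `concurrent_ids` are hypothetical names, and initializing `@id_counter` to 0 in the class body is an assumption, since the listing above does not show where the counter is set up.

```ruby
# Hypothetical class, not WaterDrop's: the mutex and the counter are both
# created at class load time, so no thread can observe them uninitialized.
class IdSource
  ID_MUTEX = Mutex.new
  @id_counter = 0 # assumed initial value, matching "starting from 0"

  class << self
    def next_id
      ID_MUTEX.synchronize do
        id = @id_counter
        @id_counter += 1
        id
      end
    end
  end
end

# Requests IDs from several threads at once and returns all of them.
def concurrent_ids(threads: 8, per_thread: 100)
  Array.new(threads) do
    Thread.new { Array.new(per_thread) { IdSource.next_id } }
  end.flat_map(&:value)
end
```

Because the read and the increment happen inside one `synchronize` block, every ID returned by `concurrent_ids` should be distinct.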
Instance Method Details
#alive? ⇒ Boolean
Checks if the poller thread is alive.
    # File 'lib/waterdrop/polling/poller.rb', line 77

    def alive?
      @thread&.alive? || false
    end
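The `|| false` in the method above matters because safe navigation returns nil when there is no thread yet. A small sketch, using a hypothetical `thread_alive?` helper that takes the thread as an argument:

```ruby
# Mirrors the expression above: nil (no thread) and a dead thread both
# yield false, so callers always receive a strict Boolean.
def thread_alive?(thread)
  thread&.alive? || false
end
```

Without the fallback, the "no thread started" case would return nil rather than false, which contradicts the documented Boolean return type.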
#count ⇒ Integer
Returns the number of registered producers.
    # File 'lib/waterdrop/polling/poller.rb', line 83

    def count
      @mutex.synchronize { @producers.size }
    end
#in_poller_thread? ⇒ Boolean
Checks if the current thread is the poller thread. Used to detect when close is called from within a callback, to avoid deadlock.
    # File 'lib/waterdrop/polling/poller.rb', line 71

    def in_poller_thread?
      Thread.current == @thread
    end
#register(producer, client) ⇒ Object
Registers a producer with the poller.
    # File 'lib/waterdrop/polling/poller.rb', line 115

    def register(producer, client)
      ensure_same_process!

      state = State.new(
        producer.id,
        client,
        producer.monitor,
        producer.config.polling.fd.max_time,
        producer.config.polling.fd.periodic_poll_interval
      )

      @mutex.synchronize do
        @producers[producer.id] = state
        @ios_dirty = true
        # Reset shutdown flag in case thread is exiting but hasn't yet
        # This prevents race where new producer is closed by exiting thread
        @shutdown = false
        ensure_thread_running!
      end

      producer.monitor.instrument(
        "poller.producer_registered",
        producer_id: producer.id
      )
    end
#shutdown! ⇒ Object
Shuts down the poller and resets state.
Note: This is primarily for testing, to reset singleton state between tests.
    # File 'lib/waterdrop/polling/poller.rb', line 89

    def shutdown!
      @mutex.synchronize { @shutdown = true }

      thread = @thread
      if thread&.alive?
        thread.join(5)
        thread.kill if thread.alive?
      end

      @mutex.synchronize do
        @producers.each_value { |state| state.close unless state.closed? }
        @producers.clear
        @thread = nil
        @shutdown = false
        @ios_dirty = true
        @cached_ios = []
        @cached_io_to_state = {}
        @cached_states = []
        @cached_result = nil
        @poll_timeout_s = nil
      end
    end
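The "join with a grace period, then kill" step in the method above can be sketched on its own. `stop_thread` is a hypothetical helper, and the return symbols are added purely so the two paths can be told apart. It relies on `Thread#join(limit)` returning nil when the limit expires.

```ruby
# Gives a thread a grace period to finish, then kills it if it is still
# running. Returns :finished or :killed to show which path was taken.
def stop_thread(thread, grace_seconds)
  # Thread#join returns the thread on completion and nil on timeout.
  return :finished if thread.join(grace_seconds)

  thread.kill
  thread.join # wait for the kill to take effect
  :killed
end
```

A thread that exits on its own yields `:finished`, while one that sleeps forever is killed once the grace period ends.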
#unregister(producer) ⇒ Object
Unregisters a producer from the poller. This method blocks until the producer is fully removed from the poller, to prevent race conditions when disconnect/reconnect happens in quick succession. This matches the threaded polling behavior, which drains without a timeout.
    # File 'lib/waterdrop/polling/poller.rb', line 146

    def unregister(producer)
      ensure_same_process!

      state, thread = @mutex.synchronize { [@producers[producer.id], @thread] }

      return unless state

      # Signal the poller thread to handle removal
      state.signal_close

      # Wait for the state to be fully closed by the poller thread
      # This prevents race conditions where a new registration with the same
      # producer_id could be deleted by a pending close signal
      # Skip waiting if called from within the poller thread itself (e.g., from a callback)
      # to avoid deadlock - the poller thread can't wait for itself
      # The cleanup will happen after the callback returns
      state.wait_for_close unless Thread.current == thread

      producer.monitor.instrument(
        "poller.producer_unregistered",
        producer_id: producer.id
      )
    end
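The self-wait guard in the method above follows a general pattern that can be sketched without WaterDrop. `Worker` below is a hypothetical class: its queue and `:stop` sentinel are assumptions chosen for illustration and do not reflect how `State#signal_close` or `State#wait_for_close` are implemented.

```ruby
# Hypothetical worker, not WaterDrop's State: a caller waits for the worker
# thread to finish, unless the caller *is* the worker thread.
class Worker
  def initialize
    @queue = Queue.new
    @thread = Thread.new { run }
  end

  def submit(&job)
    @queue << job
  end

  # Returns :waited when the caller blocked until the worker exited, and
  # :skipped when called from inside the worker (waiting would deadlock).
  def stop
    @queue << :stop
    return :skipped if Thread.current == @thread

    @thread.join
    :waited
  end

  private

  # Runs jobs until the :stop sentinel is popped from the queue.
  def run
    while (job = @queue.pop) != :stop
      job.call
    end
  end
end
```

When `stop` is invoked from a job running on the worker thread, the sentinel is processed only after that job returns, which parallels the comment that cleanup happens after the callback returns.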