Class: WaterDrop::Polling::Poller

Inherits:
Object
Includes:
Karafka::Core::Helpers::Time, Singleton
Defined in:
lib/waterdrop/polling/poller.rb

Overview

Note:

Newly registered producers may experience a delay of up to 1 second before their first poll cycle, because the poller thread only rebuilds its IO list when IO.select times out. This is acceptable because producers are expected to be long-lived, and the initial connection overhead to Kafka typically exceeds this delay anyway.

Global poller singleton that manages a single polling thread for all FD-mode producers. It replaces librdkafka’s native background polling threads with a single Ruby thread that uses IO.select for efficient multiplexing.

Spawning one thread per producer is acceptable for 1-2 producers, but in a system with several producers (transactional ones, for example) the cost grows with every additional thread.

This implementation avoids that cost by being event-driven instead of relying on blocking calls that release the GVL.
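
The multiplexing idea can be illustrated with a minimal sketch. It is not WaterDrop's implementation: plain pipes stand in for the producers' event file descriptors, and all names (`pipes`, `handled`, `poller`) are hypothetical. It only shows how one thread can wait on several IOs at once with IO.select.

```ruby
# Hypothetical sketch: one thread multiplexing several pipes with IO.select.
# The pipes are stand-ins for the event file descriptors of FD-mode producers.
pipes = Array.new(3) { IO.pipe } # [[reader, writer], ...]
readers = pipes.map(&:first)
handled = Queue.new

poller = Thread.new do
  pending = readers.size

  while pending.positive?
    # Blocks until at least one reader is readable or 1 second passes
    ready, = IO.select(readers, nil, nil, 1.0)
    next unless ready # timeout: nothing to handle in this sketch

    ready.each do |io|
      io.read_nonblock(64) # drain the wake-up byte
      handled << readers.index(io)
      pending -= 1
    end
  end
end

# Each simulated producer signals an event by writing a single byte
pipes.each { |_reader, writer| writer.write("x") }
poller.join

events = Array.new(3) { handled.pop }.sort
pipes.flatten.each(&:close)
```

A single blocked `IO.select` call serves all three readers here, so adding more IOs does not add more threads.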

Constant Summary

ID_MUTEX =

Mutex for thread-safe ID generation. It is initialized at class load time to avoid race conditions with lazy initialization.

Mutex.new


Constructor Details

#initialize ⇒ Poller

Initializes an empty poller with no registered producers. The background thread is not started until the first producer is registered.



# File 'lib/waterdrop/polling/poller.rb', line 52

def initialize
  @id = self.class.next_id
  @mutex = Mutex.new
  @producers = {}
  @thread = nil
  @shutdown = false
  @pid = Process.pid

  # Cached collections - rebuilt only when producers change
  @cached_ios = []
  @cached_io_to_state = {}
  @cached_states = []
  @cached_result = nil
  @ios_dirty = true
end
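
The "rebuilt only when producers change" comment describes a dirty-flag cache. The following is a minimal sketch of that pattern under assumed names (`SketchRegistry`, `add`, `ios`, `rebuilds` are all hypothetical); the actual rebuild logic of the poller is not shown on this page.

```ruby
# Hypothetical registry illustrating a dirty-flag cache.
class SketchRegistry
  attr_reader :rebuilds

  def initialize
    @mutex = Mutex.new
    @entries = {}
    @cached_ios = []
    @ios_dirty = true
    @rebuilds = 0 # only here so the sketch can show how often a rebuild happens
  end

  # Any change to the registered set marks the cache as stale
  def add(id, io)
    @mutex.synchronize do
      @entries[id] = io
      @ios_dirty = true
    end
  end

  # Rebuilds the cached array only when something changed since the last call
  def ios
    @mutex.synchronize do
      if @ios_dirty
        @cached_ios = @entries.values
        @ios_dirty = false
        @rebuilds += 1
      end

      @cached_ios
    end
  end
end

registry = SketchRegistry.new
registry.add(:a, $stdin)
3.times { registry.ios } # one rebuild, then two cache hits
registry.add(:b, $stdout)
registry.ios # second rebuild
```

The hot path (repeated `ios` calls) allocates nothing while the registered set is unchanged.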

Instance Attribute Details

#id ⇒ Integer (readonly)

Returns unique identifier for this poller instance.

Returns:

  • (Integer)

    unique identifier for this poller instance



# File 'lib/waterdrop/polling/poller.rb', line 48

def id
  @id
end

Class Method Details

.next_id ⇒ Integer

Generates incremental IDs for poller instances (starting from 0).

Returns:

  • (Integer)

    next poller ID



# File 'lib/waterdrop/polling/poller.rb', line 38

def next_id
  ID_MUTEX.synchronize do
    id = @id_counter
    @id_counter += 1
    id
  end
end
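
A self-contained sketch of this ID generation pattern follows. `SketchPoller` is a hypothetical stand-in class, and the place where the counter is initialized (a class-level instance variable set to 0) is an assumption, since that line is not shown on this page.

```ruby
# Hypothetical stand-in class; only the ID generation pattern is shown.
class SketchPoller
  # Created at class load time, so threads never race to create the mutex
  ID_MUTEX = Mutex.new

  # Assumption: the counter is a class-level instance variable starting at 0
  @id_counter = 0

  class << self
    def next_id
      ID_MUTEX.synchronize do
        id = @id_counter
        @id_counter += 1
        id
      end
    end
  end
end

# Eight threads request 100 IDs each; the mutex keeps every ID unique
ids = Array.new(8) { Thread.new { Array.new(100) { SketchPoller.next_id } } }
           .flat_map(&:value)
```

Without the mutex, the read and the increment could interleave across threads and hand out the same ID twice.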

Instance Method Details

#alive? ⇒ Boolean

Checks if the poller thread is alive

Returns:

  • (Boolean)

    true if the poller thread is running



# File 'lib/waterdrop/polling/poller.rb', line 77

def alive?
  @thread&.alive? || false
end

#count ⇒ Integer

Returns the number of registered producers

Returns:

  • (Integer)

    number of producers



# File 'lib/waterdrop/polling/poller.rb', line 83

def count
  @mutex.synchronize { @producers.size }
end

#in_poller_thread? ⇒ Boolean

Checks if the current thread is the poller thread. Used to detect when close is called from within a callback, so that a deadlock can be avoided.

Returns:

  • (Boolean)

    true if current thread is the poller thread



# File 'lib/waterdrop/polling/poller.rb', line 71

def in_poller_thread?
  Thread.current == @thread
end

#register(producer, client) ⇒ Object

Registers a producer with the poller

Parameters:

  • producer (WaterDrop::Producer)

    the producer instance

  • client (Rdkafka::Producer)

    the rdkafka client



# File 'lib/waterdrop/polling/poller.rb', line 115

def register(producer, client)
  ensure_same_process!

  state = State.new(
    producer.id,
    client,
    producer.monitor,
    producer.config.polling.fd.max_time,
    producer.config.polling.fd.periodic_poll_interval
  )

  @mutex.synchronize do
    @producers[producer.id] = state
    @ios_dirty = true
    # Reset shutdown flag in case thread is exiting but hasn't yet
    # This prevents race where new producer is closed by exiting thread
    @shutdown = false
    ensure_thread_running!
  end

  producer.monitor.instrument(
    "poller.producer_registered",
    producer_id: producer.id
  )
end

#shutdown! ⇒ Object

Note:

This is primarily for testing to reset singleton state between tests

Shuts down the poller and resets state



# File 'lib/waterdrop/polling/poller.rb', line 89

def shutdown!
  @mutex.synchronize { @shutdown = true }

  thread = @thread
  if thread&.alive?
    thread.join(5)
    thread.kill if thread.alive?
  end

  @mutex.synchronize do
    @producers.each_value { |state| state.close unless state.closed? }
    @producers.clear
    @thread = nil
    @shutdown = false
    @ios_dirty = true
    @cached_ios = []
    @cached_io_to_state = {}
    @cached_states = []
    @cached_result = nil
    @poll_timeout_s = nil
  end
end
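
The thread-stopping part of this method (a bounded join followed by a kill) can be sketched in isolation. `stop_thread` and its return symbols are hypothetical and exist only for this illustration.

```ruby
# Hypothetical helper: wait for a thread for a bounded time, then force it to stop.
def stop_thread(thread, timeout)
  return :not_running unless thread&.alive?

  # Thread#join(limit) returns nil if the thread is still running after the limit
  return :joined if thread.join(timeout)

  thread.kill
  thread.join # wait until the kill has taken effect
  :killed
end

finishing = Thread.new { sleep 0.2 } # ends on its own within the timeout
first = stop_thread(finishing, 2)

stuck = Thread.new { sleep } # never finishes on its own
second = stop_thread(stuck, 0.05)

third = stop_thread(nil, 1) # no thread was ever started
```

The timeout keeps a misbehaving thread from blocking the caller indefinitely, at the price of a hard kill as the last resort.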

#unregister(producer) ⇒ Object

Unregisters a producer from the poller. This method blocks until the producer is fully removed from the poller, which prevents race conditions when disconnect and reconnect happen in quick succession. This matches the threaded polling behavior, which drains without a timeout.

Parameters:

  • producer (WaterDrop::Producer)

    the producer instance



# File 'lib/waterdrop/polling/poller.rb', line 146

def unregister(producer)
  ensure_same_process!

  state, thread = @mutex.synchronize { [@producers[producer.id], @thread] }

  return unless state

  # Signal the poller thread to handle removal
  state.signal_close

  # Wait for the state to be fully closed by the poller thread
  # This prevents race conditions where a new registration with the same
  # producer_id could be deleted by a pending close signal
  # Skip waiting if called from within the poller thread itself (e.g., from a callback)
  # to avoid deadlock - the poller thread can't wait for itself
  # The cleanup will happen after the callback returns
  state.wait_for_close unless Thread.current == thread

  producer.monitor.instrument(
    "poller.producer_unregistered",
    producer_id: producer.id
  )
end