Class: WaterDrop::Polling::State

Inherits:
Object
  • Object
show all
Includes:
Karafka::Core::Helpers::Time
Defined in:
lib/waterdrop/polling/state.rb

Overview

Holds the state for a registered producer in the poller Each producer has its own State instance that tracks:

  • The producer ID and client reference

  • Queue pipe for IO.select monitoring (shared with librdkafka for efficiency)

  • Configuration (max poll time)

  • Last poll time for staleness detection

  • Closing flag for shutdown signaling

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(producer_id, client, monitor, max_poll_time, periodic_poll_interval) ⇒ State

Creates a new state for a producer

Parameters:

  • producer_id (String)

    unique producer ID

  • client (Rdkafka::Producer)

    the rdkafka producer client

  • monitor (Object)

    the producer’s monitor for error reporting

  • max_poll_time (Integer)

    max time in ms to poll per cycle

  • periodic_poll_interval (Integer)

    max time in ms before this producer needs periodic poll

Raises:

  • (StandardError)

    if queue pipe setup fails (FD mode requires this to work)



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/waterdrop/polling/state.rb', line 31

def initialize(producer_id, client, monitor, max_poll_time, periodic_poll_interval)
  @producer_id = producer_id
  @client = client
  @monitor = monitor
  @max_poll_time = max_poll_time
  @periodic_poll_interval = periodic_poll_interval
  # Initialize to 0 so first check always triggers (no nil handling needed)
  @last_poll_time = 0
  @last_stale_check = 0
  @last_stale_result = false

  # Closing flag - set by signal_close, checked by poller
  @closing = false

  # Latch for synchronizing close operations
  @close_latch = Latch.new

  # Queue pipe for all signaling (librdkafka events + continue + close)
  # Reusing one pipe reduces FDs and IO.select overhead
  @queue_pipe = QueuePipe.new(@client)

  # Cache reader reference for hot path performance
  @io = @queue_pipe.reader
end

Instance Attribute Details

#ioIO (readonly)

Returns the queue pipe reader for IO.select monitoring.

Returns:

  • (IO)

    the queue pipe reader for IO.select monitoring



19
20
21
# File 'lib/waterdrop/polling/state.rb', line 19

def io
  @io
end

#monitorObject (readonly)

Returns the producer’s monitor for instrumentation.

Returns:

  • (Object)

    the producer’s monitor for instrumentation



22
23
24
# File 'lib/waterdrop/polling/state.rb', line 22

def monitor
  @monitor
end

#producer_idString (readonly)

Returns producer ID.

Returns:

  • (String)

    producer ID



16
17
18
# File 'lib/waterdrop/polling/state.rb', line 16

def producer_id
  @producer_id
end

Instance Method Details

#closeObject

Closes all resources and signals any waiters



129
130
131
132
133
134
# File 'lib/waterdrop/polling/state.rb', line 129

def close
  return if closed?

  @queue_pipe.close
  @close_latch.release!
end

#closed?Boolean

Returns whether this state has been closed.

Returns:

  • (Boolean)

    whether this state has been closed



144
145
146
# File 'lib/waterdrop/polling/state.rb', line 144

def closed?
  @close_latch.released?
end

#closing?Boolean

Returns whether this producer is being closed.

Returns:

  • (Boolean)

    whether this producer is being closed



124
125
126
# File 'lib/waterdrop/polling/state.rb', line 124

def closing?
  @closing
end

#drainObject

Drains the queue pipe. Called before polling to clear any pending signals



57
58
59
# File 'lib/waterdrop/polling/state.rb', line 57

def drain
  @queue_pipe.drain
end

#mark_polled!Object

Marks this producer as having been polled. Called after polling to track staleness



91
92
93
# File 'lib/waterdrop/polling/state.rb', line 91

def mark_polled!
  @last_poll_time = monotonic_now
end

#needs_periodic_poll?Boolean

Checks if this producer needs a periodic poll Used to ensure OAuth/stats callbacks fire even when another producer is busy Includes internal throttling to avoid excessive checks

Returns:

  • (Boolean)

    true if the producer needs a periodic poll



99
100
101
102
103
104
105
106
107
# File 'lib/waterdrop/polling/state.rb', line 99

def needs_periodic_poll?
  now = monotonic_now

  # Throttle: return cached result if checked recently
  return @last_stale_result if (now - @last_stale_check) < STALE_CHECK_THROTTLE_MS

  @last_stale_check = now
  @last_stale_result = (now - @last_poll_time) >= @periodic_poll_interval
end

#pollBoolean

Polls the producer’s event queue

Returns:

  • (Boolean)

    true if no more events to process, false if stopped due to time limit



63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/waterdrop/polling/state.rb', line 63

def poll
  drained = true
  deadline = monotonic_now + @max_poll_time

  @client.events_poll_nb_each do |count|
    if count.zero?
      :stop
    elsif monotonic_now >= deadline
      drained = false
      :stop
    end
  end

  drained
end

#queue_empty?Boolean

Checks if the producer’s event queue is empty

Returns:

  • (Boolean)

    true if queue is empty



81
82
83
# File 'lib/waterdrop/polling/state.rb', line 81

def queue_empty?
  @client.queue_size.zero?
end

#signal_closeObject

Signals the poller to remove this producer Called from any thread when the producer is being closed Sets closing flag BEFORE signaling to ensure poller sees it



112
113
114
115
# File 'lib/waterdrop/polling/state.rb', line 112

def signal_close
  @closing = true
  @queue_pipe.signal
end

#signal_continueObject

Signals that there’s more work to do (hit time limit but queue not empty) This wakes up IO.select immediately instead of waiting for timeout



119
120
121
# File 'lib/waterdrop/polling/state.rb', line 119

def signal_continue
  @queue_pipe.signal
end

#wait_for_closeObject

Waits for this state to be closed Used by unregister to ensure synchronous cleanup before returning This matches the threaded polling behavior which drains without timeout



139
140
141
# File 'lib/waterdrop/polling/state.rb', line 139

def wait_for_close
  @close_latch.wait
end