Class: WaterDrop::Polling::State
- Inherits: Object
  - Object
  - WaterDrop::Polling::State
- Includes: Karafka::Core::Helpers::Time
- Defined in: lib/waterdrop/polling/state.rb
Overview
Holds the state for a registered producer in the poller. Each producer has its own State instance that tracks:
- The producer ID and client reference
- Queue pipe for IO.select monitoring (shared with librdkafka for efficiency)
- Configuration (max poll time)
- Last poll time for staleness detection
- Closing flag for shutdown signaling
Instance Attribute Summary
- #io ⇒ IO (readonly)
  The queue pipe reader for IO.select monitoring.
- #monitor ⇒ Object (readonly)
  The producer’s monitor for instrumentation.
- #producer_id ⇒ String (readonly)
  Producer ID.
Instance Method Summary
- #close ⇒ Object
  Closes all resources and signals any waiters.
- #closed? ⇒ Boolean
  Whether this state has been closed.
- #closing? ⇒ Boolean
  Whether this producer is being closed.
- #drain ⇒ Object
  Drains the queue pipe.
- #initialize(producer_id, client, monitor, max_poll_time, periodic_poll_interval) ⇒ State (constructor)
  Creates a new state for a producer.
- #mark_polled! ⇒ Object
  Marks this producer as having been polled.
- #needs_periodic_poll? ⇒ Boolean
  Checks if this producer needs a periodic poll. Used to ensure OAuth/stats callbacks fire even when another producer is busy. Includes internal throttling to avoid excessive checks.
- #poll ⇒ Boolean
  Polls the producer’s event queue.
- #queue_empty? ⇒ Boolean
  Checks if the producer’s event queue is empty.
- #signal_close ⇒ Object
  Signals the poller to remove this producer. Called from any thread when the producer is being closed. Sets the closing flag BEFORE signaling to ensure the poller sees it.
- #signal_continue ⇒ Object
  Signals that there’s more work to do (hit time limit but queue not empty). This wakes up IO.select immediately instead of waiting for the timeout.
- #wait_for_close ⇒ Object
  Waits for this state to be closed. Used by unregister to ensure synchronous cleanup before returning. This matches the threaded polling behavior, which drains without a timeout.
Constructor Details
#initialize(producer_id, client, monitor, max_poll_time, periodic_poll_interval) ⇒ State
Creates a new state for a producer.

    # File 'lib/waterdrop/polling/state.rb', line 31

    def initialize(producer_id, client, monitor, max_poll_time, periodic_poll_interval)
      @producer_id = producer_id
      @client = client
      @monitor = monitor
      @max_poll_time = max_poll_time
      @periodic_poll_interval = periodic_poll_interval

      # Initialize to 0 so first check always triggers (no nil handling needed)
      @last_poll_time = 0
      @last_stale_check = 0
      @last_stale_result = false

      # Closing flag - set by signal_close, checked by poller
      @closing = false

      # Latch for synchronizing close operations
      @close_latch = Latch.new

      # Queue pipe for all signaling (librdkafka events + continue + close)
      # Reusing one pipe reduces FDs and IO.select overhead
      @queue_pipe = QueuePipe.new(@client)

      # Cache reader reference for hot path performance
      @io = @queue_pipe.reader
    end
Instance Attribute Details
#io ⇒ IO (readonly)
Returns the queue pipe reader for IO.select monitoring.
    # File 'lib/waterdrop/polling/state.rb', line 19

    def io
      @io
    end
#monitor ⇒ Object (readonly)
Returns the producer’s monitor for instrumentation.
    # File 'lib/waterdrop/polling/state.rb', line 22

    def monitor
      @monitor
    end
#producer_id ⇒ String (readonly)
Returns producer ID.
    # File 'lib/waterdrop/polling/state.rb', line 16

    def producer_id
      @producer_id
    end
Instance Method Details
#close ⇒ Object
Closes all resources and signals any waiters.

    # File 'lib/waterdrop/polling/state.rb', line 129

    def close
      return if closed?

      @queue_pipe.close
      @close_latch.release!
    end
#closed? ⇒ Boolean
Returns whether this state has been closed.
    # File 'lib/waterdrop/polling/state.rb', line 144

    def closed?
      @close_latch.released?
    end
#closing? ⇒ Boolean
Returns whether this producer is being closed.
    # File 'lib/waterdrop/polling/state.rb', line 124

    def closing?
      @closing
    end
#drain ⇒ Object
Drains the queue pipe. Called before polling to clear any pending signals.

    # File 'lib/waterdrop/polling/state.rb', line 57

    def drain
      @queue_pipe.drain
    end
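To illustrate what draining a signaling pipe involves, here is a minimal sketch built on a plain `IO.pipe`. The `drain_pipe` helper is hypothetical; the actual QueuePipe implementation is not shown on this page and may work differently.

```ruby
# Hypothetical helper, not part of WaterDrop: reads and discards everything
# currently buffered in a pipe without blocking.
def drain_pipe(io)
  loop do
    chunk = io.read_nonblock(1024, exception: false)
    # :wait_readable means the pipe is empty; nil means the writer closed it
    break if chunk == :wait_readable || chunk.nil?
  end
end

reader, writer = IO.pipe

# Three pending wake-up signals, one byte each
3.times { writer.write_nonblock("x") }

drain_pipe(reader)

# With a zero timeout, IO.select returns nil when nothing is readable
pending = IO.select([reader], nil, nil, 0)
puts pending.nil?
```

Clearing stale bytes before polling means a later `IO.select` only wakes up for signals that arrived after the drain.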
#mark_polled! ⇒ Object
Marks this producer as having been polled. Called after polling to track staleness.

    # File 'lib/waterdrop/polling/state.rb', line 91

    def mark_polled!
      @last_poll_time = monotonic_now
    end
#needs_periodic_poll? ⇒ Boolean
Checks if this producer needs a periodic poll. Used to ensure OAuth/stats callbacks fire even when another producer is busy. Includes internal throttling to avoid excessive checks.

    # File 'lib/waterdrop/polling/state.rb', line 99

    def needs_periodic_poll?
      now = monotonic_now

      # Throttle: return cached result if checked recently
      return @last_stale_result if (now - @last_stale_check) < STALE_CHECK_THROTTLE_MS

      @last_stale_check = now
      @last_stale_result = (now - @last_poll_time) >= @periodic_poll_interval
    end
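The throttled check can be explored in isolation with a small stand-in. The sketch below is not WaterDrop code: `StalenessTracker`, its injectable clock, and the 100 ms throttle value are assumptions made for illustration (the real value of STALE_CHECK_THROTTLE_MS is not shown on this page).

```ruby
# Hypothetical stand-in mirroring the throttled staleness logic above.
class StalenessTracker
  THROTTLE_MS = 100 # assumed value, for illustration only

  # clock is any callable returning the current time in milliseconds
  def initialize(interval_ms, clock)
    @interval_ms = interval_ms
    @clock = clock
    # Zero-initialized so the very first check is never throttled
    @last_poll_time = 0
    @last_check = 0
    @last_result = false
  end

  def mark_polled!
    @last_poll_time = @clock.call
  end

  def needs_periodic_poll?
    now = @clock.call
    # Inside the throttle window: reuse the cached answer
    return @last_result if (now - @last_check) < THROTTLE_MS

    @last_check = now
    @last_result = (now - @last_poll_time) >= @interval_ms
  end
end

now = 10_000
tracker = StalenessTracker.new(1_000, -> { now })

puts tracker.needs_periodic_poll? # never polled, so it is stale
tracker.mark_polled!
now += 50
puts tracker.needs_periodic_poll? # cached answer, still inside the throttle window
now += 100
puts tracker.needs_periodic_poll? # recomputed: only 150 ms since the last poll
```

In this sketch the cached answer can lag behind `mark_polled!` by up to one throttle window, which is the cost of skipping the comparison on most calls.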
#poll ⇒ Boolean
Polls the producer’s event queue.

    # File 'lib/waterdrop/polling/state.rb', line 63

    def poll
      drained = true
      deadline = monotonic_now + @max_poll_time

      @client.events_poll_nb_each do |count|
        if count.zero?
          :stop
        elsif monotonic_now >= deadline
          drained = false
          :stop
        end
      end

      drained
    end
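The same deadline pattern, process until empty or until the time budget runs out, can be sketched without a Kafka client. Here a plain Array stands in for the event queue; `monotonic_ms` and `bounded_drain` are hypothetical names, and the millisecond unit is an assumption.

```ruby
# Monotonic clock in milliseconds (assumed to resemble monotonic_now)
def monotonic_ms
  Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_millisecond)
end

# Hypothetical helper: hands queued items to the block until the queue is
# empty or the time budget is spent.
# Returns true when fully drained, false when the deadline was hit first.
def bounded_drain(queue, max_poll_time_ms)
  deadline = monotonic_ms + max_poll_time_ms

  loop do
    return true if queue.empty?
    return false if monotonic_ms >= deadline

    yield queue.shift
  end
end

handled = []
drained = bounded_drain([1, 2, 3], 100) { |event| handled << event }
puts drained
puts handled.inspect
```

A `false` return is the case where a caller would want to come back to the same queue soon instead of waiting for a new wake-up.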
#queue_empty? ⇒ Boolean
Checks if the producer’s event queue is empty.

    # File 'lib/waterdrop/polling/state.rb', line 81

    def queue_empty?
      @client.queue_size.zero?
    end
#signal_close ⇒ Object
Signals the poller to remove this producer. Called from any thread when the producer is being closed. Sets the closing flag BEFORE signaling to ensure the poller sees it.

    # File 'lib/waterdrop/polling/state.rb', line 112

    def signal_close
      @closing = true
      @queue_pipe.signal
    end
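Why the flag is set before the signal can be seen in a small stand-in: a thread woken by the pipe always observes the flag as already set. `CloseSignal` below is a hypothetical class built on `IO.pipe`, not the QueuePipe used by State.

```ruby
# Hypothetical stand-in for the flag-then-signal ordering.
class CloseSignal
  attr_reader :reader

  def initialize
    @reader, @writer = IO.pipe
    @closing = false
  end

  def closing?
    @closing
  end

  def signal_close
    # Flag first, wake-up second: whoever is woken by the byte sees the flag
    @closing = true
    @writer.write_nonblock("x")
  end
end

signal = CloseSignal.new
observed = nil

# Plays the role of the poller: sleeps in IO.select until something is readable
poller = Thread.new do
  IO.select([signal.reader])
  observed = signal.closing?
end

signal.signal_close
poller.join
puts observed
```

Reversing the two statements would open a window in which the poller wakes up, finds the flag still unset, and goes back to waiting.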
#signal_continue ⇒ Object
Signals that there’s more work to do (hit time limit but queue not empty). This wakes up IO.select immediately instead of waiting for the timeout.

    # File 'lib/waterdrop/polling/state.rb', line 119

    def signal_continue
      @queue_pipe.signal
    end
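The wake-up effect can be observed with a bare pipe. This is only a sketch of the general mechanism: the `woken?` helper is hypothetical, and writing a single byte is an assumption about how a signal might be delivered.

```ruby
# Hypothetical helper: true when the reader became readable within
# the given timeout (in seconds)
def woken?(reader, timeout)
  !IO.select([reader], nil, nil, timeout).nil?
end

reader, writer = IO.pipe

# Nothing written yet, so select waits out the whole timeout and gives up
puts woken?(reader, 0.05)

# One byte on the pipe makes the reader readable, so select returns right away
writer.write_nonblock("x")
puts woken?(reader, 5)
```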
#wait_for_close ⇒ Object
Waits for this state to be closed. Used by unregister to ensure synchronous cleanup before returning. This matches the threaded polling behavior, which drains without a timeout.

    # File 'lib/waterdrop/polling/state.rb', line 139

    def wait_for_close
      @close_latch.wait
    end
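The Latch class itself is not shown on this page. As a rough idea of the contract that `release!`, `released?`, and `wait` imply, here is a one-shot latch sketched with a Mutex and a ConditionVariable. `SketchLatch` is a hypothetical name and the real implementation may differ.

```ruby
# Hypothetical one-shot latch: wait blocks until release! has been called.
class SketchLatch
  def initialize
    @mutex = Mutex.new
    @cond = ConditionVariable.new
    @released = false
  end

  # Opens the latch and wakes every waiting thread
  def release!
    @mutex.synchronize do
      @released = true
      @cond.broadcast
    end
  end

  def released?
    @mutex.synchronize { @released }
  end

  # Blocks without a timeout; returns immediately once released
  def wait
    @mutex.synchronize do
      @cond.wait(@mutex) until @released
    end
  end
end

latch = SketchLatch.new

# Plays the role of the poller thread finishing cleanup a moment later
closer = Thread.new do
  sleep(0.05)
  latch.release!
end

latch.wait # the caller blocks here until cleanup is done
closer.join
puts latch.released?
```

The `until` loop around `@cond.wait` guards against spurious wake-ups, and a latch that is already released lets later callers through without blocking.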