Class: Protobuf::Nats::ResponseMuxer
- Inherits:
-
Object
- Object
- Protobuf::Nats::ResponseMuxer
- Defined in:
- lib/protobuf/nats/response_muxer.rb
Constant Summary collapse
- LOCK =
::Mutex.new
- MAX_RESPONSES_PER_TOKEN =
10
- TOKEN_TTL_SECONDS =
10 minutes
600
Instance Method Summary collapse
- #cleanup(token) ⇒ Object
-
#cleanup_stale_tokens ⇒ Object
Periodic cleanup of stale tokens.
-
#dispatcher_count ⇒ Object
Number of dispatcher threads draining the response subscription.
-
#initialize ⇒ ResponseMuxer
constructor
A new instance of ResponseMuxer.
- #logger ⇒ Object
-
#monotonic_now ⇒ Object
Monotonic clock for token TTL accounting.
- #new_request ⇒ Object
- #next_message(token, timeout) ⇒ Object
- #publish(subject, data, token) ⇒ Object
- #restart ⇒ Object
- #start ⇒ Object
- #started? ⇒ Boolean
-
#stop ⇒ Object
Stop the muxer: stops the cleanup thread, kills the dispatcher threads, and unsubscribes from the response subscription.
Constructor Details
#initialize ⇒ ResponseMuxer
Returns a new instance of ResponseMuxer.
17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
# File 'lib/protobuf/nats/response_muxer.rb', line 17

# Initializes empty bookkeeping state only; nothing is subscribed and no
# threads are spawned here.
def initialize
  # Per-token response queues for lock-free message delivery. @resp_map is a
  # Concurrent::Map so request threads and dispatcher threads can insert,
  # look up, and delete tokens without serializing on a single mutex (on
  # JRuby it is backed by java.util.concurrent.ConcurrentHashMap). Each
  # value is a Hash { queue:, created_at: }.
  @resp_map = ::Concurrent::Map.new
  @resp_handlers = []
  @cleanup_thread = nil
  @shutdown = false
  @cleanup_mutex = ::Mutex.new
  @cleanup_cv = ::ConditionVariable.new
  @restarting = false # Flag to prevent concurrent restarts
end
Instance Method Details
#cleanup(token) ⇒ Object
58 59 60 61 62 |
# File 'lib/protobuf/nats/response_muxer.rb', line 58

# Forgets +token+ and closes its response queue, if the token is still
# registered. Calling it for an unknown token is a no-op.
#
# @param token [Object] key under which the request was stored in @resp_map
# @return [Object, nil] nil when the token is unknown
def cleanup(token)
  # Atomic remove-and-return; close the queue to wake any waiting threads.
  entry = @resp_map.delete(token)
  entry[:queue]&.close if entry
end
#cleanup_stale_tokens ⇒ Object
Periodic cleanup of stale tokens
225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 |
# File 'lib/protobuf/nats/response_muxer.rb', line 225

# Removes every token whose :created_at is older than TOKEN_TTL_SECONDS,
# closes its queue, logs a warning per token, and emits a single
# "response_muxer.stale_tokens_cleaned.protobuf-nats" notification carrying
# the number of tokens actually removed (only when that number is non-zero).
def cleanup_stale_tokens
  cutoff = monotonic_now - TOKEN_TTL_SECONDS

  # Collect stale tokens first, then delete. Concurrent::Map iteration does
  # not hold a global lock, so request threads are never blocked across this
  # O(n) scan (unlike the previous single-mutex implementation).
  stale_tokens = []
  @resp_map.each_pair do |token, data|
    created_at = data[:created_at]
    stale_tokens << token if created_at && created_at < cutoff
  end

  stale_count = 0
  stale_tokens.each do |token|
    data = @resp_map.delete(token)
    # Another thread may have removed the token since the scan; only count
    # the entries this call really deleted.
    next unless data

    stale_count += 1
    logger.warn "Cleaning up stale token #{token} created at #{data[:created_at]}"
    # Close the queue to wake any waiting threads
    data[:queue]&.close
  end

  if stale_count > 0
    ::ActiveSupport::Notifications.instrument "response_muxer.stale_tokens_cleaned.protobuf-nats", stale_count
  end
end
#dispatcher_count ⇒ Object
Number of dispatcher threads draining the response subscription. On JRuby (true parallelism) a single dispatcher is a hard throughput ceiling, so we fan out to processor_count; on CRuby the GVL makes extra dispatchers pointless, so we stay at 1. Overridable via env for tuning/tests.
46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/protobuf/nats/response_muxer.rb', line 46

# Number of dispatcher threads to run, memoized after the first call.
#
# Resolution order:
# 1. PB_NATS_RESPONSE_MUXER_DISPATCHERS, when the variable is set;
# 2. Concurrent.processor_count on JRuby;
# 3. 1 otherwise.
#
# @return [Integer] always at least 1
def dispatcher_count
  @dispatcher_count ||=
    if ::ENV.key?("PB_NATS_RESPONSE_MUXER_DISPATCHERS")
      # String#to_i yields 0 for non-numeric input, so the floor of 1 also
      # covers garbage values.
      [::ENV["PB_NATS_RESPONSE_MUXER_DISPATCHERS"].to_i, 1].max
    elsif ::RUBY_ENGINE == "jruby"
      [::Concurrent.processor_count, 1].max
    else
      1
    end
end
#logger ⇒ Object
32 33 34 |
# File 'lib/protobuf/nats/response_muxer.rb', line 32

# Logger used for all muxer diagnostics; delegates to Protobuf::Logging.
#
# @return [Object] whatever ::Protobuf::Logging.logger returns
def logger
  ::Protobuf::Logging.logger
end
#monotonic_now ⇒ Object
Monotonic clock for token TTL accounting. Cheaper than Time.now (no Time object / timezone work per request) and immune to wall-clock jumps.
38 39 40 |
# File 'lib/protobuf/nats/response_muxer.rb', line 38

# Current reading of the monotonic clock, used for token TTL accounting.
#
# @return [Float] seconds from Process::CLOCK_MONOTONIC; only differences
#   between two readings are meaningful
def monotonic_now
  ::Process.clock_gettime(::Process::CLOCK_MONOTONIC)
end
#new_request ⇒ Object
107 108 109 110 111 112 113 114 115 116 117 118 119 |
# File 'lib/protobuf/nats/response_muxer.rb', line 107

# Registers a fresh token with its own response queue and wraps it in a
# ResponseMuxerRequest bound to this muxer.
#
# @return [ResponseMuxerRequest]
def new_request
  # Use UUIDv7 so we can figure out what time a message was originally created in-memory.
  # nats.new_inbox with nuid is not threadsafe.
  token = UUIDv7Helper.generate

  # Create a dedicated queue for this token. Concurrent::Map#[]= is atomic,
  # so no surrounding lock is required.
  @resp_map[token] = {
    queue: ::Concurrent::Collection::TimeoutQueue.new,
    created_at: monotonic_now
  }

  ResponseMuxerRequest.new(self, token)
end
#next_message(token, timeout) ⇒ Object
64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 |
# File 'lib/protobuf/nats/response_muxer.rb', line 64

# Waits for the next response delivered to +token+.
#
# @param token [Object] token previously registered in @resp_map
# @param timeout [Numeric, nil] seconds to wait; nil waits without a limit
# @return [Object] the message popped from the token's queue
# @raise [::NATS::Timeout] when the token is unknown, the timeout is zero or
#   negative, the wait times out, or the queue is closed
def next_message(token, timeout)
  # Lock-free get of the per-token queue.
  entry = @resp_map[token]
  queue = entry && entry[:queue]

  unless queue
    logger.warn "Token #{token} not found or already cleaned up during next_message"
    raise ::NATS::Timeout
  end

  # Handle edge case: zero or negative timeout
  if timeout && timeout <= 0
    raise ::NATS::Timeout
  end

  # Use TimeoutQueue's native timeout support for efficient, lock-free waiting per token
  # Each token has its own queue, eliminating contention between different requests
  begin
    # TimeoutQueue.pop(non_block, timeout: seconds)
    # - With timeout: blocks until message arrives or timeout expires (returns nil on timeout)
    # - Without timeout (nil): blocks indefinitely until message arrives
    msg = if timeout
      queue.pop(false, timeout: timeout)
    else
      queue.pop(false)
    end

    # Queue.pop returns nil when:
    # 1. The queue is closed
    # 2. The timeout expires
    unless msg
      logger.warn "Queue closed or timeout for token #{token} during next_message"
      raise ::NATS::Timeout
    end

    msg
  rescue ThreadError
    # Queue was closed - treat as timeout
    logger.warn "Queue closed for token #{token} during next_message"
    raise ::NATS::Timeout
  end
end
#publish(subject, data, token) ⇒ Object
121 122 123 124 125 126 127 128 129 130 |
# File 'lib/protobuf/nats/response_muxer.rb', line 121

# Publishes +data+ on +subject+ with a reply-to address of
# "<inbox prefix>.<token>" so the response is routed back to this muxer.
#
# @param subject [String] NATS subject to publish on
# @param data [Object] payload handed to the NATS client unchanged
# @param token [Object] request token; interpolated into the reply subject
# @raise [::Protobuf::Nats::Errors::ResponseMuxer] when no inbox prefix has
#   been assigned yet (the muxer was never started)
def publish(subject, data, token)
  # Validate muxer started before publish
  unless @resp_inbox_prefix
    raise ::Protobuf::Nats::Errors::ResponseMuxer, "ResponseMuxer not started - cannot publish"
  end

  nats = ::Protobuf::Nats.client_nats_connection
  reply_to = "#{@resp_inbox_prefix}.#{token}"
  nats.publish(subject, data, reply_to)
end
#restart ⇒ Object
132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 |
# File 'lib/protobuf/nats/response_muxer.rb', line 132

# Tears the muxer down (dispatcher threads, subscription, cleanup thread)
# and starts it again. Only one restart runs at a time: a call that finds
# @restarting already set logs a warning and returns without doing anything.
def restart
  logger.debug "restarting response_muxer"

  # Prevent concurrent restarts - only one restart at a time
  LOCK.synchronize do
    if @restarting
      logger.warn "Restart already in progress, skipping concurrent restart request"
      return
    end
    @restarting = true
  end

  # Yield so other restart callers spawned around the same time get a
  # chance to reach the @restarting check above and skip. Without this,
  # CRuby's GVL can let the current thread run the entire restart to
  # completion (clearing @restarting) before sibling threads even enter
  # the method, defeating the concurrent-restart guard.
  Thread.pass

  begin
    # Stop the existing muxer first, if it's running
    LOCK.synchronize do
      @resp_handlers.each(&:kill)
      @resp_handlers.clear

      if @resp_sub
        begin
          @resp_sub.unsubscribe
        rescue => e
          logger.warn "Failed to unsubscribe old response muxer subscription: #{e.message}"
        ensure
          # Always set to nil, even if unsubscribe raises
          @resp_sub = nil
        end
      end

      # Stop the cleanup thread
      stop_cleanup_thread

      @started = false
    end

    # Then start it fresh.
    start
  ensure
    # Always clear the restarting flag
    LOCK.synchronize { @restarting = false }
  end
end
#start ⇒ Object
181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 |
# File 'lib/protobuf/nats/response_muxer.rb', line 181

# Subscribes to a fresh per-instance inbox, starts the cleanup thread, and
# tops the dispatcher pool up to dispatcher_count. Does nothing when the
# muxer is already started or when no NATS client connection is available.
#
# @raise [StandardError] re-raises whatever new_inbox/subscribe raised, after
#   resetting the partially assigned state and logging the failure
def start
  return if started?

  LOCK.synchronize do
    # We check this twice in case another thread was waiting for the lock to
    # start this party. Use the unlocked check to prevent deadlocks.
    return if _started?

    nats = ::Protobuf::Nats.client_nats_connection
    return if nats.nil?

    # Clean up partial state on exception
    begin
      @resp_inbox_prefix = nats.new_inbox
      # Subscribe to our per-instance inbox
      @resp_sub = nats.subscribe("#{@resp_inbox_prefix}.*")
      @started = true
    rescue => e
      # Clean up partial state
      @resp_inbox_prefix = nil
      @resp_sub = nil
      @started = false
      logger.error "Failed to start ResponseMuxer: #{e.message}"
      raise
    end
  end

  # Start the cleanup thread
  start_cleanup_thread

  # Top up the dispatcher pool to dispatcher_count. Prunes dead threads
  # first so self-healing restarts converge to the target count instead of
  # multiplying threads.
  LOCK.synchronize do
    @resp_handlers.select!(&:alive?)
    @resp_handlers << spawn_dispatcher while @resp_handlers.size < dispatcher_count
  end
end
#started? ⇒ Boolean
220 221 222 |
# File 'lib/protobuf/nats/response_muxer.rb', line 220

# Whether the muxer is started, read while holding LOCK. Code that already
# holds LOCK must call _started? instead (Mutex is not reentrant).
#
# @return [Boolean] whatever _started? reports
def started?
  LOCK.synchronize { _started? }
end
#stop ⇒ Object
Stop the muxer: stops the cleanup thread, kills the dispatcher threads, and unsubscribes from the response subscription
253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 |
# File 'lib/protobuf/nats/response_muxer.rb', line 253

# Shuts the muxer down under LOCK: stops the cleanup thread, kills and
# forgets every dispatcher thread, drops the response subscription, and
# marks the muxer as not started. A failing unsubscribe is logged and
# otherwise ignored so shutdown always completes.
def stop
  LOCK.synchronize do
    stop_cleanup_thread
    @resp_handlers.each(&:kill)
    @resp_handlers.clear

    if @resp_sub
      begin
        @resp_sub.unsubscribe
      rescue => e
        logger.warn "Failed to unsubscribe during stop: #{e.message}"
      ensure
        # Drop the reference even when unsubscribe raised.
        @resp_sub = nil
      end
    end

    @started = false
  end
end