Class: Protobuf::Nats::ResponseMuxer
- Inherits: Object (Object > Protobuf::Nats::ResponseMuxer)
- Defined in:
- lib/protobuf/nats/response_muxer.rb
Constant Summary
- LOCK =
::Mutex.new
- MAX_RESPONSES_PER_TOKEN =
10
- TOKEN_TTL_SECONDS =
10 minutes.
600
- DISABLED_PENDING_BYTES_LIMIT =
Slow-consumer protection on the response subscription is enforced by message count (the SizedQueue depth, which nats-pure tracks accurately for free). The byte-based limit is disabled (see #disable_byte_limit!) so we don't have to mirror nats-pure's pending_size accounting on the dispatch hot path. INFINITY makes nats-pure's `pending_size >= pending_bytes_limit` check always false.
::Float::INFINITY
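A small sketch of why an infinite byte limit switches off the byte check while the message-count limit stays effective. `PendingState` and `slow_consumer?` are hypothetical names for illustration, not nats-pure's internals; the only behavior taken from the text is the `pending_size >= pending_bytes_limit` comparison.

```ruby
# Hypothetical slow-consumer check; PendingState and slow_consumer? are
# illustrative names, not part of nats-pure's API.
PendingState = Struct.new(:queue, :pending_size, :pending_bytes_limit)

def slow_consumer?(state)
  # Count-based protection: a full SizedQueue means the consumer has
  # fallen behind by `max` messages.
  count_full = state.queue.size >= state.queue.max
  # Byte-based protection: with an INFINITY limit this comparison is
  # always false, so pending_size never needs to be kept accurate.
  bytes_full = state.pending_size >= state.pending_bytes_limit
  count_full || bytes_full
end

queue = SizedQueue.new(2)
state = PendingState.new(queue, 10**12, Float::INFINITY)
slow_consumer?(state) # => false (queue empty; byte check disabled)
queue << :a
queue << :b
slow_consumer?(state) # => true (message-count limit reached)
```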
Instance Method Summary
- #cleanup(token) ⇒ Object
- #cleanup_stale_tokens ⇒ Object
Periodic cleanup of stale tokens.
- #dispatcher_count ⇒ Object
Number of dispatcher threads draining the response subscription.
- #initialize ⇒ ResponseMuxer (constructor)
A new instance of ResponseMuxer.
- #logger ⇒ Object
- #monotonic_now ⇒ Object
Monotonic clock for token TTL accounting (single source of truth in Protobuf::Nats.monotonic_time).
- #new_request ⇒ Object
- #next_message(token, timeout) ⇒ Object
- #publish(subject, data, token) ⇒ Object
- #restart ⇒ Object
- #start ⇒ Object
- #started? ⇒ Boolean
- #stop ⇒ Object
Stop the cleanup thread and the dispatcher threads, and unsubscribe from the response inbox.
Constructor Details
#initialize ⇒ ResponseMuxer
Returns a new instance of ResponseMuxer.
# File 'lib/protobuf/nats/response_muxer.rb', line 17

def initialize
  # Per-token response queues for lock-free message delivery. @resp_map is a
  # Concurrent::Map so request threads and dispatcher threads can insert,
  # look up, and delete tokens without serializing on a single mutex (on
  # JRuby it is backed by java.util.concurrent.ConcurrentHashMap). Each
  # value is a Hash { queue:, created_at: }.
  @resp_map = ::Concurrent::Map.new
  @resp_handlers = []
  @cleanup_thread = nil
  @shutdown = false
  @cleanup_mutex = ::Mutex.new
  @cleanup_cv = ::ConditionVariable.new
  @restarting = false # Flag to prevent concurrent restarts
  # Shared self-healing backoff counter for the dispatcher pool. Atomic so
  # concurrent dispatchers don't lose updates when several crash at once,
  # and it decays back to zero once a dispatcher is healthy again (see
  # run_dispatch_loop), so a later transient crash restarts the backoff
  # from 1s instead of staying pinned at the cap.
  @crash_count = ::Concurrent::AtomicFixnum.new(0)
end
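The shared backoff counter can be sketched as below, assuming a doubling delay that starts at 1s. The cap (`CAP_SECONDS = 30`), the doubling schedule, and resetting straight to zero on recovery are all assumptions, since run_dispatch_loop is not documented here. A Mutex-guarded `CrashCounter` stands in for Concurrent::AtomicFixnum so the example needs only the standard library.

```ruby
# Assumed cap; the real value used by run_dispatch_loop is not documented here.
CAP_SECONDS = 30

# Mutex-guarded counter standing in for Concurrent::AtomicFixnum so that
# concurrent crashes don't lose increments. Hypothetical helper.
class CrashCounter
  def initialize
    @mutex = Mutex.new
    @value = 0
  end

  # Returns the new count.
  def increment
    @mutex.synchronize { @value += 1 }
  end

  # Called once a dispatcher is healthy again, so the next crash starts
  # the backoff over from 1s (this sketch resets rather than decrements).
  def reset
    @mutex.synchronize { @value = 0 }
  end

  def value
    @mutex.synchronize { @value }
  end
end

# Exponential backoff: 1s, 2s, 4s, ... capped at `cap`.
def backoff_seconds(crash_count, cap: CAP_SECONDS)
  return 0 if crash_count <= 0

  [2**(crash_count - 1), cap].min
end

counter = CrashCounter.new
3.times { counter.increment }
backoff_seconds(counter.value)     # => 4
counter.reset
backoff_seconds(counter.increment) # => 1
```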
Instance Method Details
#cleanup(token) ⇒ Object
# File 'lib/protobuf/nats/response_muxer.rb', line 73

def cleanup(token)
  # Atomic remove-and-return; close the queue to wake any waiting threads.
  entry = @resp_map.delete(token)
  entry[:queue]&.close if entry
end
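The remove-then-close pattern relies on queue close semantics. Ruby's standard Thread::Queue behaves this way and is used here in place of the muxer's TimeoutQueue: closing the queue wakes a thread blocked in `#pop`, which then returns nil. The token string is made up for the example.

```ruby
# Thread::Queue stands in for the muxer's TimeoutQueue in this sketch.
resp_map = { "token-1" => { queue: Thread::Queue.new } }
queue = resp_map["token-1"][:queue]

# A request thread blocked waiting for its reply.
waiter = Thread.new { queue.pop }

# Wait until the waiter is actually blocked in #pop (illustrative only;
# #pop on an already-closed empty queue would also return nil).
sleep 0.01 until waiter.status == "sleep"

# Remove-and-close, mirroring #cleanup.
entry = resp_map.delete("token-1")
entry[:queue]&.close if entry

waiter.value # => nil (woken by close)
```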
#cleanup_stale_tokens ⇒ Object
Periodic cleanup of stale tokens: removes entries older than TOKEN_TTL_SECONDS and closes their queues to wake any waiting threads.
# File 'lib/protobuf/nats/response_muxer.rb', line 241

def cleanup_stale_tokens
  cutoff = monotonic_now - TOKEN_TTL_SECONDS

  # Collect stale tokens first, then delete. Concurrent::Map iteration does
  # not hold a global lock, so request threads are never blocked across this
  # O(n) scan (unlike the previous single-mutex implementation).
  stale_tokens = []
  @resp_map.each_pair do |token, data|
    created_at = data[:created_at]
    stale_tokens << token if created_at && created_at < cutoff
  end

  stale_count = 0
  stale_tokens.each do |token|
    data = @resp_map.delete(token)
    next unless data

    stale_count += 1
    logger.warn "Cleaning up stale token #{token} created at #{data[:created_at]}"
    # Close the queue to wake any waiting threads
    data[:queue]&.close
  end

  if stale_count > 0
    ::Protobuf::Nats.instrument "response_muxer.stale_tokens_cleaned", stale_count
  end
end
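A standalone sketch of the same collect-then-delete sweep, using a plain Hash instead of Concurrent::Map and an injectable `now` so the cutoff arithmetic is easy to check. The default clock is Process::CLOCK_MONOTONIC, which is an assumption about what Protobuf::Nats.monotonic_time uses; `sweep_stale` is a hypothetical name.

```ruby
TOKEN_TTL_SECONDS = 600

# Hypothetical helper: returns the number of entries removed.
def sweep_stale(resp_map, now: Process.clock_gettime(Process::CLOCK_MONOTONIC))
  cutoff = now - TOKEN_TTL_SECONDS

  # Pass 1: collect keys only, so the scan itself does no mutation.
  stale = resp_map.select { |_token, data| data[:created_at] && data[:created_at] < cutoff }.keys

  # Pass 2: delete each stale entry and close its queue to wake waiters.
  stale.count do |token|
    data = resp_map.delete(token)
    next false unless data

    data[:queue]&.close
    true
  end
end

map = {
  "old" => { queue: Thread::Queue.new, created_at: 0.0 },
  "new" => { queue: Thread::Queue.new, created_at: 900.0 }
}
sweep_stale(map, now: 1000.0) # => 1 (cutoff is 1000 - 600 = 400)
map.keys                      # => ["new"]
```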
#dispatcher_count ⇒ Object
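Number of dispatcher threads draining the response subscription. On JRuby, which has true parallelism, a single dispatcher is a hard throughput ceiling, so the pool fans out to Concurrent.processor_count; on CRuby the GVL makes extra dispatchers pointless, so it stays at 1. The PB_NATS_RESPONSE_MUXER_DISPATCHERS environment variable overrides both (values below 1 are clamped to 1) for tuning and tests. The result is memoized on first call.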
# File 'lib/protobuf/nats/response_muxer.rb', line 61

def dispatcher_count
  @dispatcher_count ||= begin
    if ::ENV.key?("PB_NATS_RESPONSE_MUXER_DISPATCHERS")
      [::ENV["PB_NATS_RESPONSE_MUXER_DISPATCHERS"].to_i, 1].max
    elsif ::RUBY_ENGINE == "jruby"
      [::Concurrent.processor_count, 1].max
    else
      1
    end
  end
end
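The resolution order can be exercised in isolation by passing the environment, engine, and CPU count as arguments. `resolve_dispatcher_count` is a hypothetical helper, and Etc.nprocessors stands in for Concurrent.processor_count (the two may report different values on some systems).

```ruby
require "etc"

# Same branch order as #dispatcher_count, written as a pure function.
def resolve_dispatcher_count(env: ENV, engine: RUBY_ENGINE, cpus: Etc.nprocessors)
  if env.key?("PB_NATS_RESPONSE_MUXER_DISPATCHERS")
    # Non-numeric or non-positive values clamp to 1 ("abc".to_i == 0).
    [env["PB_NATS_RESPONSE_MUXER_DISPATCHERS"].to_i, 1].max
  elsif engine == "jruby"
    [cpus, 1].max
  else
    1
  end
end

resolve_dispatcher_count(env: {}, engine: "ruby", cpus: 8)  # => 1
resolve_dispatcher_count(env: {}, engine: "jruby", cpus: 8) # => 8
resolve_dispatcher_count(env: { "PB_NATS_RESPONSE_MUXER_DISPATCHERS" => "4" }, engine: "ruby", cpus: 8) # => 4
```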
#logger ⇒ Object
# File 'lib/protobuf/nats/response_muxer.rb', line 47

def logger
  ::Protobuf::Logging.logger
end
#monotonic_now ⇒ Object
Monotonic clock for token TTL accounting (single source of truth in Protobuf::Nats.monotonic_time). Immune to wall-clock jumps.
# File 'lib/protobuf/nats/response_muxer.rb', line 53

def monotonic_now
  ::Protobuf::Nats.monotonic_time
end
#new_request ⇒ Object
# File 'lib/protobuf/nats/response_muxer.rb', line 122

def new_request
  # Use UUIDv7 so we can figure out what time a message was originally created in-memory.
  token = UUIDv7Helper.generate # nats.new_inbox with nuid is not threadsafe.

  # Create a dedicated queue for this token. Concurrent::Map#[]= is atomic,
  # so no surrounding lock is required.
  @resp_map[token] = {
    queue: ::Concurrent::Collection::TimeoutQueue.new,
    created_at: monotonic_now
  }

  ResponseMuxerRequest.new(self, token)
end
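The comment says UUIDv7 tokens let you recover when a request was created. A minimal sketch of that idea following the UUIDv7 layout in RFC 9562 (48-bit Unix-epoch milliseconds, then version and variant bits, then random bits). `uuid_v7` and `uuid_v7_unix_ms` are hypothetical stand-ins, and UUIDv7Helper may differ in detail. The embedded time is wall-clock time, separate from the monotonic `created_at` used for TTL accounting.

```ruby
require "securerandom"

# Hypothetical stand-in for UUIDv7Helper.generate.
def uuid_v7(unix_ms = Process.clock_gettime(Process::CLOCK_REALTIME, :millisecond))
  # 6 bytes of big-endian milliseconds followed by 10 random bytes.
  bytes = [unix_ms >> 16, unix_ms & 0xFFFF].pack("Nn") + SecureRandom.random_bytes(10)
  bytes.setbyte(6, (bytes.getbyte(6) & 0x0F) | 0x70) # version 7
  bytes.setbyte(8, (bytes.getbyte(8) & 0x3F) | 0x80) # variant 10xx
  hex = bytes.unpack1("H*")
  [hex[0, 8], hex[8, 4], hex[12, 4], hex[16, 4], hex[20, 12]].join("-")
end

# Recover the creation time (ms since the Unix epoch) from a UUIDv7 string.
def uuid_v7_unix_ms(uuid)
  uuid.delete("-")[0, 12].to_i(16)
end

token = uuid_v7(1_700_000_000_000)
uuid_v7_unix_ms(token) # => 1700000000000
token[14]              # => "7" (version nibble)
```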
#next_message(token, timeout) ⇒ Object
# File 'lib/protobuf/nats/response_muxer.rb', line 79

def next_message(token, timeout)
  # Lock-free get of the per-token queue.
  entry = @resp_map[token]
  queue = entry && entry[:queue]

  unless queue
    logger.warn "Token #{token} not found or already cleaned up during next_message"
    raise ::NATS::Timeout
  end

  # Handle edge case: zero or negative timeout
  if timeout && timeout <= 0
    raise ::NATS::Timeout
  end

  # Use TimeoutQueue's native timeout support for efficient, lock-free waiting per token
  # Each token has its own queue, eliminating contention between different requests
  begin
    # TimeoutQueue.pop(non_block, timeout: seconds)
    # - With timeout: blocks until message arrives or timeout expires (returns nil on timeout)
    # - Without timeout (nil): blocks indefinitely until message arrives
    msg = if timeout
            queue.pop(false, timeout: timeout)
          else
            queue.pop(false)
          end

    # Queue.pop returns nil when:
    # 1. The queue is closed
    # 2. The timeout expires
    unless msg
      logger.warn "Queue closed or timeout for token #{token} during next_message"
      raise ::NATS::Timeout
    end

    msg
  rescue ThreadError
    # Queue was closed - treat as timeout
    logger.warn "Queue closed for token #{token} during next_message"
    raise ::NATS::Timeout
  end
end
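The waiting logic can be sketched with a small condition-variable queue. `TinyTimeoutQueue`, `wait_for_response`, and `ResponseTimeout` (a stand-in for ::NATS::Timeout) are hypothetical; the sketch only assumes the contract stated in the method's comments, where pop returns nil on timeout or when the queue is closed while empty.

```ruby
# Hypothetical stand-in for the muxer's TimeoutQueue, built on Mutex and
# ConditionVariable so it runs on any Ruby version.
class TinyTimeoutQueue
  def initialize
    @items = []
    @closed = false
    @mutex = Mutex.new
    @cond = ConditionVariable.new
  end

  def push(item)
    @mutex.synchronize do
      raise ClosedQueueError, "queue closed" if @closed

      @items << item
      @cond.signal
    end
  end

  # Wake every waiter; pops on an empty closed queue return nil.
  def close
    @mutex.synchronize do
      @closed = true
      @cond.broadcast
    end
  end

  # Returns the next item, or nil if the timeout expires or the queue is
  # closed while empty. A nil timeout waits indefinitely.
  def pop(timeout: nil)
    deadline = timeout && Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
    @mutex.synchronize do
      while @items.empty?
        return nil if @closed

        if deadline
          remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
          return nil if remaining <= 0

          @cond.wait(@mutex, remaining) # may wake early; the loop re-checks
        else
          @cond.wait(@mutex)
        end
      end
      @items.shift
    end
  end
end

# Hypothetical stand-in for ::NATS::Timeout.
class ResponseTimeout < StandardError; end

# Mirrors the flow of #next_message: reject non-positive timeouts, wait,
# and turn a nil result (timeout or close) into an error.
def wait_for_response(queue, timeout)
  raise ResponseTimeout, "non-positive timeout" if timeout && timeout <= 0

  msg = queue.pop(timeout: timeout)
  raise ResponseTimeout, "no response" if msg.nil?

  msg
end

queue = TinyTimeoutQueue.new
Thread.new { sleep 0.01; queue.push("reply") }
wait_for_response(queue, 1.0) # => "reply"
```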
#publish(subject, data, token) ⇒ Object
# File 'lib/protobuf/nats/response_muxer.rb', line 136

def publish(subject, data, token)
  # Validate muxer started before publish
  unless @resp_inbox_prefix
    raise ::Protobuf::Nats::Errors::ResponseMuxer, "ResponseMuxer not started - cannot publish"
  end

  nats = Protobuf::Nats.client_nats_connection
  reply_to = "#{@resp_inbox_prefix}.#{token}"
  nats.publish(subject, data, reply_to)
end
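The reply-to scheme pairs with the single `"#{@resp_inbox_prefix}.*"` subscription made in #start. A hypothetical sketch of how a dispatcher might route a reply back to the waiting request: the inbox prefix value and the rule that the last subject token is the request token are assumptions, since the dispatch loop itself is not shown here.

```ruby
# Assumed inbox prefix in the style of nats.new_inbox; illustrative only.
INBOX_PREFIX = "_INBOX.abc123"

def reply_subject(token)
  "#{INBOX_PREFIX}.#{token}"
end

# NATS "*" matches exactly one subject token, so the single subscription
# on "#{INBOX_PREFIX}.*" receives every per-request reply subject.
def token_from_subject(subject)
  prefix = "#{INBOX_PREFIX}."
  return nil unless subject.start_with?(prefix)

  token = subject.delete_prefix(prefix)
  (token.empty? || token.include?(".")) ? nil : token
end

# Hypothetical dispatcher step: push the payload onto the token's queue.
def route(resp_map, subject, payload)
  token = token_from_subject(subject)
  entry = token && resp_map[token]
  return false unless entry # late reply: token already cleaned up

  entry[:queue] << payload
  true
rescue ClosedQueueError
  false # entry was cleaned up between lookup and push
end

resp_map = { "tok-1" => { queue: Thread::Queue.new } }
route(resp_map, reply_subject("tok-1"), "pong") # => true
resp_map["tok-1"][:queue].pop                   # => "pong"
route(resp_map, reply_subject("gone"), "late")  # => false
```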
#restart ⇒ Object
# File 'lib/protobuf/nats/response_muxer.rb', line 147

def restart
  logger.debug "restarting response_muxer"

  # Prevent concurrent restarts - only one restart at a time
  LOCK.synchronize do
    if @restarting
      logger.warn "Restart already in progress, skipping concurrent restart request"
      return
    end
    @restarting = true
  end

  # Yield so other restart callers spawned around the same time get a
  # chance to reach the @restarting check above and skip. Without this,
  # CRuby's GVL can let the current thread run the entire restart to
  # completion (clearing @restarting) before sibling threads even enter
  # the method, defeating the concurrent-restart guard.
  Thread.pass

  begin
    # Stop the existing muxer first, if it's running
    LOCK.synchronize do
      @resp_handlers.each(&:kill)
      @resp_handlers.clear

      if @resp_sub
        begin
          @resp_sub.unsubscribe
        rescue => e
          logger.warn "Failed to unsubscribe old response muxer subscription: #{e.message}"
        ensure
          # Always set to nil, even if unsubscribe raises
          @resp_sub = nil
        end
      end

      # Stop the cleanup thread
      stop_cleanup_thread
      @started = false
    end

    # Then start it fresh.
    start
  ensure
    # Always clear the restarting flag
    LOCK.synchronize { @restarting = false }
  end
end
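The concurrent-restart guard reduces to a flag that is checked and set under a mutex and cleared in an `ensure`. `RestartGuard` is a hypothetical wrapper for illustration; #restart inlines the same logic around LOCK. The usage holds the first restart open with a second queue, so which callers skip is deterministic rather than dependent on Thread.pass timing.

```ruby
# Hypothetical wrapper around the "one restart at a time" flag.
class RestartGuard
  def initialize
    @lock = Mutex.new
    @restarting = false
  end

  # Runs the block unless another restart is in progress.
  # Returns true if this caller performed the restart, false if it skipped.
  def run
    @lock.synchronize do
      return false if @restarting

      @restarting = true
    end

    begin
      yield
      true
    ensure
      # Always clear the flag, even if the block raises.
      @lock.synchronize { @restarting = false }
    end
  end
end

guard = RestartGuard.new
entered = Thread::Queue.new
release = Thread::Queue.new

first = Thread.new do
  guard.run do
    entered << :in # signal that the restart is underway
    release.pop    # hold the restart open
  end
end

entered.pop # wait until the first restart is running
skipped = 3.times.map { Thread.new { guard.run { :unreachable } } }.map(&:value)
release << :go
first.value # => true
skipped     # => [false, false, false]
```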
#start ⇒ Object
# File 'lib/protobuf/nats/response_muxer.rb', line 196

def start
  return if started?

  LOCK.synchronize do
    # We check this twice in case another thread was waiting for the lock to
    # start this party. Use the unlocked check to prevent deadlocks.
    return if _started?

    nats = ::Protobuf::Nats.client_nats_connection
    return if nats.nil?

    # Clean up partial state on exception
    begin
      @resp_inbox_prefix = nats.new_inbox
      # Subscribe to our per-instance inbox
      @resp_sub = nats.subscribe("#{@resp_inbox_prefix}.*")
      disable_byte_limit!(@resp_sub)
      @started = true
    rescue => e
      # Clean up partial state
      @resp_inbox_prefix = nil
      @resp_sub = nil
      @started = false
      logger.error "Failed to start ResponseMuxer: #{e.message}"
      raise
    end
  end

  # Start the cleanup thread
  start_cleanup_thread

  # Top up the dispatcher pool to dispatcher_count. Prunes dead threads
  # first so self-healing restarts converge to the target count instead of
  # multiplying threads.
  LOCK.synchronize do
    @resp_handlers.select!(&:alive?)
    @resp_handlers << spawn_dispatcher while @resp_handlers.size < dispatcher_count
  end
end
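The final step, pruning dead dispatchers and then topping the pool up, can be sketched independently. `top_up` and the worker lambda are hypothetical; the workers simply block on a shared queue in place of the real spawn_dispatcher.

```ruby
# Hypothetical helper: prune dead threads, then refill to `target`.
def top_up(pool, target, lock, &spawn_worker)
  lock.synchronize do
    pool.select!(&:alive?)                              # drop crashed/killed threads
    pool << spawn_worker.call while pool.size < target  # refill to target
  end
  pool
end

lock = Mutex.new
work = Thread::Queue.new
make_worker = -> { Thread.new { work.pop } } # blocks until work is closed

pool = top_up([], 3, lock, &make_worker)
pool.first.kill.join # simulate a crashed dispatcher
top_up(pool, 3, lock, &make_worker)
pool.size            # => 3
pool.all?(&:alive?)  # => true
work.close           # wake all workers so they exit
```

Calling `top_up` on a pool that is already healthy spawns nothing, which is what keeps repeated self-healing restarts from multiplying threads.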
#started? ⇒ Boolean
# File 'lib/protobuf/nats/response_muxer.rb', line 236

def started?
  LOCK.synchronize { _started? }
end
#stop ⇒ Object
Stop the cleanup thread and the dispatcher threads, and unsubscribe from the response inbox.
# File 'lib/protobuf/nats/response_muxer.rb', line 269

def stop
  LOCK.synchronize do
    stop_cleanup_thread
    @resp_handlers.each(&:kill)
    @resp_handlers.clear

    if @resp_sub
      begin
        @resp_sub.unsubscribe
      rescue => e
        logger.warn "Failed to unsubscribe during stop: #{e.message}"
      ensure
        @resp_sub = nil
      end
    end

    @started = false
  end
end