Class: Protobuf::Nats::ResponseMuxer

Inherits:
Object
  • Object
show all
Defined in:
lib/protobuf/nats/response_muxer.rb

Constant Summary collapse

LOCK =
::Mutex.new
MAX_RESPONSES_PER_TOKEN =
10
TOKEN_TTL_SECONDS =

10 minutes

600
DISABLED_PENDING_BYTES_LIMIT =

Slow-consumer protection on the response subscription is by message count (the SizedQueue depth, tracked accurately for free by nats-pure); the byte-based limit is disabled (see #disable_byte_limit!) so we don’t have to mirror nats-pure’s pending_size accounting on the dispatch hot path. INFINITY makes nats-pure’s `pending_size >= pending_bytes_limit` check always false.

::Float::INFINITY

Instance Method Summary collapse

Constructor Details

#initialize ⇒ ResponseMuxer

Returns a new instance of ResponseMuxer.



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/protobuf/nats/response_muxer.rb', line 17

# Builds an idle muxer: nothing is subscribed and no threads are running
# until the muxer is started.
def initialize
  # Token registry. Keys are request tokens, values are Hashes of the form
  # { queue:, created_at: }. A Concurrent::Map lets request threads and
  # dispatcher threads insert, look up, and delete tokens without
  # serializing on a single mutex (on JRuby it is backed by
  # java.util.concurrent.ConcurrentHashMap).
  @resp_map = ::Concurrent::Map.new

  # Dispatcher threads currently owned by this muxer.
  @resp_handlers = []

  # Cleanup-thread state: the thread itself, its shutdown flag, and the
  # mutex/condition variable pair kept alongside it.
  @cleanup_thread = nil
  @cleanup_mutex = ::Mutex.new
  @cleanup_cv = ::ConditionVariable.new
  @shutdown = false

  # Set while a restart is in flight so that concurrent restarts are skipped.
  @restarting = false

  # Self-healing backoff counter shared by the whole dispatcher pool. It is
  # atomic so that several dispatchers crashing at once do not lose updates,
  # and it decays back to zero once a dispatcher is healthy again (see
  # run_dispatch_loop), so a later transient crash restarts the backoff from
  # 1s instead of staying pinned at the cap.
  @crash_count = ::Concurrent::AtomicFixnum.new(0)
end

Instance Method Details

#cleanup(token) ⇒ Object



73
74
75
76
77
# File 'lib/protobuf/nats/response_muxer.rb', line 73

# Forgets +token+ and releases anything blocked on its response queue.
#
# The registry delete is a single remove-and-return step, so only one caller
# ever gets the entry back. Does nothing when the token is not registered.
def cleanup(token)
  removed = @resp_map.delete(token)
  return unless removed

  # Closing the queue wakes any thread still waiting on it.
  removed[:queue]&.close
end

#cleanup_stale_tokens ⇒ Object

Periodic cleanup of stale tokens



241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
# File 'lib/protobuf/nats/response_muxer.rb', line 241

# Evicts every token whose created_at is older than TOKEN_TTL_SECONDS,
# closing its queue so that any waiter is woken up, and reports how many
# tokens were evicted through the "response_muxer.stale_tokens_cleaned"
# instrumentation event (only when at least one was evicted).
def cleanup_stale_tokens
  threshold = monotonic_now - TOKEN_TTL_SECONDS

  # Pass 1: gather the expired tokens. Concurrent::Map iteration does not
  # hold a global lock, so request threads are never blocked across this
  # O(n) scan (unlike the previous single-mutex implementation).
  expired = []
  @resp_map.each_pair do |token, entry|
    born_at = entry[:created_at]
    next unless born_at && born_at < threshold

    expired << token
  end

  # Pass 2: evict. A token that disappeared between the two passes is
  # skipped and not counted.
  evicted = expired.count do |token|
    entry = @resp_map.delete(token)
    next false unless entry

    logger.warn "Cleaning up stale token #{token} created at #{entry[:created_at]}"
    # Closing the queue wakes any thread still waiting on it.
    entry[:queue]&.close
    true
  end

  return if evicted.zero?

  ::Protobuf::Nats.instrument "response_muxer.stale_tokens_cleaned", evicted
end

#dispatcher_count ⇒ Object

Number of dispatcher threads draining the response subscription. On JRuby (true parallelism) a single dispatcher is a hard throughput ceiling, so we fan out to processor_count; on CRuby the GVL makes extra dispatchers pointless, so we stay at 1. Overridable via env for tuning/tests.



61
62
63
64
65
66
67
68
69
70
71
# File 'lib/protobuf/nats/response_muxer.rb', line 61

# Target size of the dispatcher pool, computed once and then memoized.
#
# Resolution order:
# 1. PB_NATS_RESPONSE_MUXER_DISPATCHERS, when set (parsed with to_i).
# 2. On JRuby, ::Concurrent.processor_count.
# 3. Otherwise 1.
#
# Whatever the source, the result is clamped to a minimum of 1.
def dispatcher_count
  return @dispatcher_count if @dispatcher_count

  requested =
    if ::ENV.key?("PB_NATS_RESPONSE_MUXER_DISPATCHERS")
      ::ENV["PB_NATS_RESPONSE_MUXER_DISPATCHERS"].to_i
    elsif ::RUBY_ENGINE == "jruby"
      ::Concurrent.processor_count
    else
      1
    end

  @dispatcher_count = [requested, 1].max
end

#logger ⇒ Object



47
48
49
# File 'lib/protobuf/nats/response_muxer.rb', line 47

# Logger used for this muxer's log output. Delegates to
# ::Protobuf::Logging.logger on every call; nothing is cached here.
def logger
  ::Protobuf::Logging.logger
end

#monotonic_now ⇒ Object

Monotonic clock for token TTL accounting (single source of truth in Protobuf::Nats.monotonic_time). Immune to wall-clock jumps.



53
54
55
# File 'lib/protobuf/nats/response_muxer.rb', line 53

# Current reading of ::Protobuf::Nats.monotonic_time. This is the value
# stored as created_at for new tokens and the reference point from which the
# stale-token cutoff is computed.
# NOTE(review): presumably expressed in seconds, since TOKEN_TTL_SECONDS is
# subtracted from it directly — confirm against Protobuf::Nats.monotonic_time.
def monotonic_now
  ::Protobuf::Nats.monotonic_time
end

#new_request ⇒ Object



122
123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/protobuf/nats/response_muxer.rb', line 122

# Registers a fresh token with its own response queue and returns a
# ResponseMuxerRequest bound to this muxer and that token.
def new_request
  # UUIDv7 tokens let us work out, in-memory, when a message was originally
  # created. (nats.new_inbox with nuid is not threadsafe, so it is not used.)
  token = UUIDv7Helper.generate

  # One dedicated queue per token, stamped with its creation time.
  entry = {
    queue: ::Concurrent::Collection::TimeoutQueue.new,
    created_at: monotonic_now
  }

  # Concurrent::Map#[]= is atomic, so no surrounding lock is required.
  @resp_map[token] = entry

  ResponseMuxerRequest.new(self, token)
end

#next_message(token, timeout) ⇒ Object



79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/protobuf/nats/response_muxer.rb', line 79

# Waits for the next response delivered for +token+.
#
# token   - token previously registered for a request.
# timeout - seconds to wait; nil waits until a message arrives or the queue
#           is closed.
#
# Returns the message popped from the token's queue.
# Raises ::NATS::Timeout when the token is unknown or already cleaned up,
# when +timeout+ is zero or negative, when the wait times out, or when the
# queue is closed.
def next_message(token, timeout)
  # Lock-free lookup of the per-token queue.
  record = @resp_map[token]
  inbox = record ? record[:queue] : nil

  unless inbox
    logger.warn "Token #{token} not found or already cleaned up during next_message"
    raise ::NATS::Timeout
  end

  # A zero or negative timeout can never be satisfied.
  raise ::NATS::Timeout if timeout && timeout <= 0

  begin
    # Each token has its own queue, so waiting here never contends with
    # other requests. pop(non_block, timeout: seconds) blocks until a
    # message arrives or the timeout expires; without a timeout it blocks
    # until a message arrives.
    reply = timeout ? inbox.pop(false, timeout: timeout) : inbox.pop(false)
    return reply if reply

    # A nil pop means the queue was closed or the timeout expired.
    logger.warn "Queue closed or timeout for token #{token} during next_message"
    raise ::NATS::Timeout
  rescue ThreadError
    # Queue was closed - treat as timeout.
    logger.warn "Queue closed for token #{token} during next_message"
    raise ::NATS::Timeout
  end
end

#publish(subject, data, token) ⇒ Object



136
137
138
139
140
141
142
143
144
145
# File 'lib/protobuf/nats/response_muxer.rb', line 136

# Publishes +data+ on +subject+ with a reply-to address that routes the
# response back to this muxer under +token+.
#
# subject - NATS subject to publish to.
# data    - payload handed to the connection's publish unchanged.
# token   - request token; the reply subject is "<inbox prefix>.<token>".
#
# Returns whatever the connection's publish returns.
# Raises ::Protobuf::Nats::Errors::ResponseMuxer when the muxer has no inbox
# prefix yet (i.e. it was never started).
def publish(subject, data, token)
  # Read the prefix exactly once. A failed start/restart on another thread
  # resets @resp_inbox_prefix to nil; validating one read and interpolating a
  # second one could produce the reply subject ".<token>" and lose the reply.
  inbox_prefix = @resp_inbox_prefix

  # Validate muxer started before publish
  unless inbox_prefix
    raise ::Protobuf::Nats::Errors::ResponseMuxer, "ResponseMuxer not started - cannot publish"
  end

  nats = ::Protobuf::Nats.client_nats_connection
  nats.publish(subject, data, "#{inbox_prefix}.#{token}")
end

#restart ⇒ Object



147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
# File 'lib/protobuf/nats/response_muxer.rb', line 147

# Tears down the running muxer and starts it again.
#
# Only one restart runs at a time: if @restarting is already set, a warning
# is logged and the method returns without touching any other state.
# Otherwise the dispatcher threads in @resp_handlers are killed, the response
# subscription is unsubscribed (a failure there is logged, not raised), the
# cleanup thread is stopped, @started is cleared, and #start is called.
# @restarting is cleared in an ensure, so it is reset even when #start raises.
#
# NOTE(review): this method does not touch @resp_map directly, and #start
# assigns a new inbox prefix, so tokens registered before the restart look
# like they can only end via timeout or stale-token cleanup — confirm this is
# the intended behavior.
def restart
  logger.debug "restarting response_muxer"

  # Prevent concurrent restarts - only one restart at a time
  LOCK.synchronize do
    if @restarting
      logger.warn "Restart already in progress, skipping concurrent restart request"
      # `return` leaves the whole method; synchronize releases LOCK on the way out.
      return
    end
    @restarting = true
  end

  # Yield so other restart callers spawned around the same time get a
  # chance to reach the @restarting check above and skip. Without this,
  # CRuby's GVL can let the current thread run the entire restart to
  # completion (clearing @restarting) before sibling threads even enter
  # the method, defeating the concurrent-restart guard.
  Thread.pass

  begin
    # Stop the existing muxer first, if it's running
    LOCK.synchronize do
      @resp_handlers.each(&:kill)
      @resp_handlers.clear
      if @resp_sub
        begin
          @resp_sub.unsubscribe
        rescue => e
          # Best effort: a failed unsubscribe must not abort the restart.
          logger.warn "Failed to unsubscribe old response muxer subscription: #{e.message}"
        ensure
          # Always set to nil, even if unsubscribe raises
          @resp_sub = nil
        end
      end

      # Stop the cleanup thread
      stop_cleanup_thread

      # Cleared so that the #start call below does not treat the muxer as
      # already running.
      @started = false
    end

    # Then start it fresh.
    # Called outside the LOCK block above: #start takes LOCK itself, and
    # ::Mutex is not reentrant.
    start
  ensure
    # Always clear the restarting flag
    LOCK.synchronize { @restarting = false }
  end
end

#start ⇒ Object



196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
# File 'lib/protobuf/nats/response_muxer.rb', line 196

# Starts the muxer if it is not already running.
#
# Under LOCK it asks the client connection for a new inbox prefix, subscribes
# to "<prefix>.*", disables the byte limit on that subscription and marks the
# muxer as started. It then starts the cleanup thread and tops the dispatcher
# pool up to dispatcher_count.
#
# Returns early, doing nothing, when the muxer is already started or when
# ::Protobuf::Nats.client_nats_connection is nil.
#
# Raises whatever StandardError the inbox/subscribe/byte-limit step raised,
# after resetting @resp_inbox_prefix, @resp_sub and @started and logging the
# failure.
#
# NOTE(review): the cleanup thread and dispatcher top-up run after the first
# LOCK section has been released, so a concurrent #stop could interleave
# between the two — confirm whether that window matters.
def start
  # Fast path; started? takes and releases LOCK for the check.
  return if started?
  LOCK.synchronize do
    # We check this twice in case another thread was waiting for the lock to
    # start this party. Use the unlocked check to prevent deadlocks.
    return if _started?

    nats = ::Protobuf::Nats.client_nats_connection
    # No connection: leave the method entirely. The cleanup thread and the
    # dispatchers below are not started in this case.
    return if nats.nil?

    # Clean up partial state on exception
    begin
      @resp_inbox_prefix = nats.new_inbox

      # Subscribe to our per-instance inbox
      @resp_sub = nats.subscribe("#{@resp_inbox_prefix}.*")
      disable_byte_limit!(@resp_sub)
      @started = true
    rescue => e
      # Clean up partial state
      @resp_inbox_prefix = nil
      @resp_sub = nil
      @started = false
      logger.error "Failed to start ResponseMuxer: #{e.message}"
      raise
    end
  end

  # Start the cleanup thread
  start_cleanup_thread

  # Top up the dispatcher pool to dispatcher_count. Prunes dead threads
  # first so self-healing restarts converge to the target count instead of
  # multiplying threads.
  LOCK.synchronize do
    @resp_handlers.select!(&:alive?)
    @resp_handlers << spawn_dispatcher while @resp_handlers.size < dispatcher_count
  end
end

#started? ⇒ Boolean

Returns:

  • (Boolean)


236
237
238
# File 'lib/protobuf/nats/response_muxer.rb', line 236

# Whether the muxer is currently started, as reported by _started? while
# holding LOCK.
#
# Must not be called by a thread that already holds LOCK (::Mutex is not
# reentrant); code running inside a LOCK.synchronize block calls _started?
# directly instead.
def started?
  LOCK.synchronize { _started? }
end

#stop ⇒ Object

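Stops the muxer: stops the cleanup thread, kills the dispatcher threads, unsubscribes from the response subscription, and marks the muxer as not started.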



269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
# File 'lib/protobuf/nats/response_muxer.rb', line 269

# Shuts the muxer down while holding LOCK: stops the cleanup thread, kills
# and forgets every dispatcher thread, drops the response subscription, and
# clears the started flag.
#
# A failure while unsubscribing is logged as a warning and otherwise ignored,
# so the rest of the shutdown still completes.
def stop
  LOCK.synchronize do
    stop_cleanup_thread

    # Array#each returns the array itself, so the pool is emptied right
    # after every handler has been killed.
    @resp_handlers.each(&:kill).clear

    begin
      @resp_sub&.unsubscribe
    rescue => e
      logger.warn "Failed to unsubscribe during stop: #{e.message}"
    ensure
      # Drop the reference whether or not unsubscribe succeeded.
      @resp_sub = nil
    end

    @started = false
  end
end