Class: Protobuf::Nats::ResponseMuxer

Inherits:
Object
  • Object
show all
Defined in:
lib/protobuf/nats/response_muxer.rb

Constant Summary collapse

LOCK =
::Mutex.new
MAX_RESPONSES_PER_TOKEN =
10
TOKEN_TTL_SECONDS =

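Maximum age of a request token, in seconds, before cleanup_stale_tokens treats it as stale (600 seconds = 10 minutes).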

600

Instance Method Summary collapse

Constructor Details

#initialize ⇒ ResponseMuxer

Returns a new instance of ResponseMuxer.



17
18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/protobuf/nats/response_muxer.rb', line 17

def initialize
  # token => { queue:, created_at: }. A Concurrent::Map (backed by
  # java.util.concurrent.ConcurrentHashMap on JRuby) lets request threads and
  # dispatcher threads insert, look up and delete tokens concurrently without
  # funnelling every operation through a single mutex.
  @resp_map = ::Concurrent::Map.new

  # Dispatcher threads; start tops this list up to dispatcher_count.
  @resp_handlers = []

  # Cleanup-thread bookkeeping. NOTE(review): the mutex, condition variable and
  # @shutdown flag are presumably consumed by start_cleanup_thread /
  # stop_cleanup_thread, which are not shown here.
  @cleanup_thread = nil
  @cleanup_mutex = ::Mutex.new
  @cleanup_cv = ::ConditionVariable.new
  @shutdown = false

  # Set while a restart is in progress so overlapping restart calls bail out.
  @restarting = false
end

Instance Method Details

#cleanup(token) ⇒ Object



58
59
60
61
62
# File 'lib/protobuf/nats/response_muxer.rb', line 58

def cleanup(token)
  # Remove the entry atomically (delete hands it back, or nil if it was
  # already gone) and close its queue so any thread blocked in next_message
  # wakes up.
  removed = @resp_map.delete(token)
  return unless removed

  removed[:queue]&.close
end

#cleanup_stale_tokens ⇒ Object

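Periodic cleanup of stale tokens: removes every token older than TOKEN_TTL_SECONDS from the response map, closes its queue to wake any waiting threads, and emits a notification with the number of tokens removed.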



225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
# File 'lib/protobuf/nats/response_muxer.rb', line 225

# Remove every token whose entry is older than TOKEN_TTL_SECONDS.
#
# For each removed entry, its queue is closed so any thread still waiting in
# next_message wakes up. When at least one token was removed, a
# "response_muxer.stale_tokens_cleaned.protobuf-nats" notification is sent
# with the count as its payload. Entries without a created_at are never
# considered stale.
def cleanup_stale_tokens
  # Read the clock once so the cutoff and the logged ages agree.
  now = monotonic_now
  cutoff = now - TOKEN_TTL_SECONDS

  # Collect stale tokens first, then delete. Concurrent::Map iteration does
  # not hold a global lock, so request threads are never blocked across this
  # O(n) scan.
  stale_tokens = []
  @resp_map.each_pair do |token, data|
    created_at = data[:created_at]
    stale_tokens << token if created_at && created_at < cutoff
  end

  stale_count = 0
  stale_tokens.each do |token|
    # The entry may already have been removed (e.g. by cleanup) between the
    # scan and now; delete returns nil in that case and we skip it.
    data = @resp_map.delete(token)
    next unless data

    stale_count += 1
    # created_at is a monotonic-clock reading, which is meaningless as an
    # absolute time, so log how old the entry is instead.
    created_at = data[:created_at]
    age = created_at ? (now - created_at).round(1) : "unknown"
    logger.warn "Cleaning up stale token #{token} (age #{age}s, TTL #{TOKEN_TTL_SECONDS}s)"
    # Close the queue to wake any waiting threads
    data[:queue]&.close
  end

  if stale_count > 0
    ::ActiveSupport::Notifications.instrument "response_muxer.stale_tokens_cleaned.protobuf-nats", stale_count
  end
end

#dispatcher_count ⇒ Object

Number of dispatcher threads draining the response subscription. On JRuby (true parallelism) a single dispatcher is a hard throughput ceiling, so we fan out to processor_count; on CRuby the GVL makes extra dispatchers pointless, so we stay at 1. Overridable via env for tuning/tests.



46
47
48
49
50
51
52
53
54
55
56
# File 'lib/protobuf/nats/response_muxer.rb', line 46

def dispatcher_count
  return @dispatcher_count if @dispatcher_count

  override = ::ENV["PB_NATS_RESPONSE_MUXER_DISPATCHERS"]
  @dispatcher_count =
    if override
      # Explicit override; non-numeric or non-positive values fall back to 1.
      requested = override.to_i
      requested < 1 ? 1 : requested
    elsif ::RUBY_ENGINE == "jruby"
      # Real parallelism: one dispatcher per available processor.
      cores = ::Concurrent.processor_count
      cores < 1 ? 1 : cores
    else
      # GVL-bound runtimes gain nothing from extra dispatchers.
      1
    end
end

#logger ⇒ Object



32
33
34
# File 'lib/protobuf/nats/response_muxer.rb', line 32

def logger
  # Delegate to protobuf's shared logger instead of holding one per muxer.
  logging = ::Protobuf::Logging
  logging.logger
end

#monotonic_now ⇒ Object

Monotonic clock for token TTL accounting. Cheaper than Time.now (no Time object / timezone work per request) and immune to wall-clock jumps.



38
39
40
# File 'lib/protobuf/nats/response_muxer.rb', line 38

def monotonic_now
  # Float seconds from the monotonic clock (:float_second is clock_gettime's
  # default unit, spelled out here for clarity).
  ::Process.clock_gettime(::Process::CLOCK_MONOTONIC, :float_second)
end

#new_request ⇒ Object



107
108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/protobuf/nats/response_muxer.rb', line 107

def new_request
  # UUIDv7 tokens embed their creation time, so a token seen in memory can be
  # dated. nats.new_inbox (nuid) is not threadsafe, hence this generator.
  request_token = UUIDv7Helper.generate

  # Each request gets its own queue so replies never contend with other
  # requests. Concurrent::Map#[]= is atomic; no surrounding lock is needed.
  pending = {
    queue: ::Concurrent::Collection::TimeoutQueue.new,
    created_at: monotonic_now
  }
  @resp_map[request_token] = pending

  ResponseMuxerRequest.new(self, request_token)
end

#next_message(token, timeout) ⇒ Object



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/protobuf/nats/response_muxer.rb', line 64

def next_message(token, timeout)
  # Look up this request's private queue; Concurrent::Map reads need no lock.
  entry = @resp_map[token]
  queue = entry[:queue] if entry

  unless queue
    logger.warn "Token #{token} not found or already cleaned up during next_message"
    raise ::NATS::Timeout
  end

  # A zero or negative budget can never be met, so fail fast without popping.
  raise ::NATS::Timeout if timeout && timeout <= 0

  begin
    # Waits on this token's queue only. With a timeout, pop returns nil once it
    # expires; with no timeout it waits until a message arrives. A closed
    # queue also yields nil.
    msg = timeout ? queue.pop(false, timeout: timeout) : queue.pop(false)
    return msg if msg

    logger.warn "Queue closed or timeout for token #{token} during next_message"
    raise ::NATS::Timeout
  rescue ThreadError
    # Defensive: a ThreadError from a closed queue is reported as a timeout too.
    logger.warn "Queue closed for token #{token} during next_message"
    raise ::NATS::Timeout
  end
end

#publish(subject, data, token) ⇒ Object



121
122
123
124
125
126
127
128
129
130
# File 'lib/protobuf/nats/response_muxer.rb', line 121

def publish(subject, data, token)
  # Only start assigns the inbox prefix; without it no subscription exists to
  # receive the reply, so refuse to publish.
  raise ::Protobuf::Nats::Errors::ResponseMuxer, "ResponseMuxer not started - cannot publish" unless @resp_inbox_prefix

  # The reply subject "<prefix>.<token>" matches the "<prefix>.*" subscription
  # created in start.
  Protobuf::Nats.client_nats_connection.publish(subject, data, "#{@resp_inbox_prefix}.#{token}")
end

#restart ⇒ Object



132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
# File 'lib/protobuf/nats/response_muxer.rb', line 132

# Tear down and re-create the muxer. In order, it kills the dispatcher
# threads, unsubscribes from the current response inbox, stops the cleanup
# thread, marks the muxer as not started, and then calls #start.
#
# Only one restart runs at a time. A caller that finds @restarting already set
# logs a warning and returns immediately; it does NOT wait for the in-flight
# restart to finish. @restarting is cleared in the ensure below even when
# start raises (the exception still propagates to the caller).
#
# #start returns without subscribing when no client NATS connection is
# available, so a restart can leave the muxer stopped.
#
# NOTE(review): spawn_dispatcher is not visible here. If restart can be called
# from one of the threads in @resp_handlers, the `kill` below terminates the
# calling thread before `start` runs, which leaves the muxer stopped. Confirm
# how dispatchers trigger restarts.
def restart
  logger.debug "restarting response_muxer"

  # Prevent concurrent restarts - only one restart at a time
  LOCK.synchronize do
    if @restarting
      logger.warn "Restart already in progress, skipping concurrent restart request"
      # Returning from inside synchronize releases LOCK and exits restart.
      # This happens before the begin/ensure below, so @restarting stays set
      # for the restart that owns it.
      return
    end
    @restarting = true
  end

  # Yield so other restart callers spawned around the same time get a
  # chance to reach the @restarting check above and skip. Without this,
  # CRuby's GVL can let the current thread run the entire restart to
  # completion (clearing @restarting) before sibling threads even enter
  # the method, defeating the concurrent-restart guard.
  Thread.pass

  begin
    # Stop the existing muxer first, if it's running
    LOCK.synchronize do
      @resp_handlers.each(&:kill)
      @resp_handlers.clear
      if @resp_sub
        begin
          @resp_sub.unsubscribe
        rescue => e
          logger.warn "Failed to unsubscribe old response muxer subscription: #{e.message}"
        ensure
          # Always set to nil, even if unsubscribe raises
          @resp_sub = nil
        end
      end

      # Stop the cleanup thread.
      # NOTE(review): this runs while LOCK is held and Ruby's Mutex is not
      # reentrant, so stop_cleanup_thread must not take LOCK itself — confirm.
      stop_cleanup_thread

      @started = false
    end

    # Then start it fresh. start takes LOCK itself, so it must be called
    # outside the synchronize block above.
    start
  ensure
    # Always clear the restarting flag
    LOCK.synchronize { @restarting = false }
  end
end

#start ⇒ Object



181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
# File 'lib/protobuf/nats/response_muxer.rb', line 181

# Bring the muxer up. This subscribes to a fresh per-instance inbox
# ("<prefix>.*"), starts the cleanup thread, and tops the dispatcher pool up
# to dispatcher_count.
#
# Calling it again once started is a no-op. If no client NATS connection is
# available yet, it returns nil without changing any state, so a later call
# can retry.
#
# If nats.new_inbox or nats.subscribe raises, the partially-initialized state
# is reset, the error is logged, and the exception is re-raised.
def start
  return if started?
  LOCK.synchronize do
    # We check this twice in case another thread was waiting for the lock to
    # start this party. Use the unlocked _started? here: started? takes LOCK
    # itself, and Ruby's Mutex is not reentrant, so calling it while holding
    # LOCK would raise ThreadError (recursive locking).
    return if _started?

    nats = ::Protobuf::Nats.client_nats_connection
    return if nats.nil?

    # Clean up partial state on exception
    begin
      @resp_inbox_prefix = nats.new_inbox

      # Subscribe to our per-instance inbox
      @resp_sub = nats.subscribe("#{@resp_inbox_prefix}.*")
      @started = true
    rescue => e
      # Clean up partial state
      @resp_inbox_prefix = nil
      @resp_sub = nil
      @started = false
      logger.error "Failed to start ResponseMuxer: #{e.message}"
      raise
    end
  end

  # Start the cleanup thread (called outside LOCK).
  # NOTE(review): start_cleanup_thread is not shown; confirm it is safe to
  # call after a restart has already stopped the previous cleanup thread.
  start_cleanup_thread

  # Top up the dispatcher pool to dispatcher_count. Prunes dead threads
  # first so self-healing restarts converge to the target count instead of
  # multiplying threads.
  LOCK.synchronize do
    @resp_handlers.select!(&:alive?)
    @resp_handlers << spawn_dispatcher while @resp_handlers.size < dispatcher_count
  end
end

#started? ⇒ Boolean

Returns:

  • (Boolean)


220
221
222
# File 'lib/protobuf/nats/response_muxer.rb', line 220

def started?
  # Read the flag under LOCK. _started? is the unlocked variant, meant for
  # code that already holds the lock.
  running = nil
  LOCK.synchronize { running = _started? }
  running
end

#stop ⇒ Object

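Stop the muxer. It stops the cleanup thread, kills the dispatcher threads, unsubscribes from the response inbox, and marks the muxer as not started. Pending per-token queues are left in place.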



253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
# File 'lib/protobuf/nats/response_muxer.rb', line 253

def stop
  LOCK.synchronize do
    # Halt background work first: the stale-token cleanup thread, then every
    # dispatcher thread.
    stop_cleanup_thread
    @resp_handlers.each { |dispatcher| dispatcher.kill }
    @resp_handlers.clear

    # Drop the inbox subscription. A failed unsubscribe is logged rather than
    # raised, and @resp_sub is cleared either way.
    if (subscription = @resp_sub)
      begin
        subscription.unsubscribe
      rescue => error
        logger.warn "Failed to unsubscribe during stop: #{error.message}"
      ensure
        @resp_sub = nil
      end
    end

    # Per-token queues in @resp_map are not touched here.
    @started = false
  end
end