Class: Protobuf::Nats::ResponseMuxer

Inherits:
Object
  • Object
show all
Defined in:
lib/protobuf/nats/response_muxer.rb

Constant Summary collapse

LOCK =
::Mutex.new
MAX_RESPONSES_PER_TOKEN =
10
TOKEN_TTL_SECONDS =

How long a token may stay registered before it is considered stale: 10 minutes, expressed in seconds.

600

Instance Method Summary collapse

Constructor Details

#initialize ⇒ ResponseMuxer

Returns a new instance of ResponseMuxer.



17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/protobuf/nats/response_muxer.rb', line 17

# Sets up the muxer's in-memory state only; nothing is subscribed and no
# thread is spawned here.
def initialize
  # token => { queue:, created_at: }, one entry per in-flight request.
  #
  # This is deliberately a plain Hash with no default block. An auto-vivifying
  # default (`Hash.new { |h, k| h[k] = {} }`) makes read-only lookups such as
  # `@resp_map.dig(token, :queue)` insert an empty entry for every unknown
  # token. Those entries have no :created_at, so cleanup_stale_tokens never
  # removes them and the map grows without bound.
  @resp_map = {}
  @resp_handlers = []          # Threads draining the response subscription
  @map_lock = ::Mutex.new      # Lightweight lock used around @resp_map access
  @cleanup_thread = nil
  @shutdown = false
  @cleanup_mutex = ::Mutex.new
  @cleanup_cv = ::ConditionVariable.new
  @restarting = false          # Flag to prevent concurrent restarts
end

Instance Method Details

#cleanup(token) ⇒ Object



34
35
36
37
38
39
40
41
# File 'lib/protobuf/nats/response_muxer.rb', line 34

# Forgets +token+: closes its response queue (if one is registered) and
# removes the token's entry from the response map, all under the map lock.
#
# @param token [Object] key of the entry to remove
# @return [Hash, nil] whatever Hash#delete returns for the token
def cleanup(token)
  @map_lock.synchronize do
    # Closing first wakes any thread currently blocked on this queue.
    @resp_map.dig(token, :queue)&.close
    @resp_map.delete(token)
  end
end

#cleanup_stale_tokens ⇒ Object

Removes tokens older than TOKEN_TTL_SECONDS and closes their queues. Intended to be run periodically.



314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
# File 'lib/protobuf/nats/response_muxer.rb', line 314

# Drops every token whose :created_at is older than TOKEN_TTL_SECONDS.
#
# Each expired entry is logged, its queue is closed so that blocked readers
# wake up, and the entry is removed from the map. Entries without a
# :created_at are left alone. When at least one entry was removed, the count
# is reported through ActiveSupport::Notifications.
def cleanup_stale_tokens
  expiry = Time.now - TOKEN_TTL_SECONDS

  @map_lock.synchronize do
    removed = 0

    @resp_map.delete_if do |token, entry|
      born_at = entry[:created_at]
      next false unless born_at && born_at < expiry

      removed += 1
      logger.warn "Cleaning up stale token #{token} created at #{born_at}"
      # Wake any thread still waiting on this token's queue.
      entry[:queue]&.close
      true
    end

    if removed > 0
      ::ActiveSupport::Notifications.instrument "response_muxer.stale_tokens_cleaned.protobuf-nats", removed
    end
  end
end

#logger ⇒ Object



30
31
32
# File 'lib/protobuf/nats/response_muxer.rb', line 30

# Returns the logger exposed by ::Protobuf::Logging; every log call in this
# class goes through this accessor.
def logger
  ::Protobuf::Logging.logger
end

#new_request ⇒ Object



89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/protobuf/nats/response_muxer.rb', line 89

# Registers a new in-flight request and returns a handle for it.
#
# A fresh token is generated, a dedicated queue plus creation timestamp are
# stored for it in the response map, and the token is wrapped in a
# ResponseMuxerRequest bound to this muxer.
#
# @return [ResponseMuxerRequest]
def new_request
  # A UUIDv7 token lets us recover when the request was created from the
  # token alone. (Original author's note: nats.new_inbox with nuid is not
  # threadsafe, hence a locally generated token.)
  request_token = new_uuidv7

  @map_lock.synchronize do
    # One queue per token; TimeoutQueue supports popping with a timeout.
    entry = {
      queue: Concurrent::Collection::TimeoutQueue.new,
      created_at: Time.now
    }
    @resp_map.store(request_token, entry)
  end

  ResponseMuxerRequest.new(self, request_token)
end

#new_uuidv7 ⇒ Object



85
86
87
# File 'lib/protobuf/nats/response_muxer.rb', line 85

# Produces a new request token by delegating to UUID7.generate.
# NOTE(review): presumably returns a UUIDv7 string — confirm against the
# UUID7 library in use.
def new_uuidv7
  UUID7.generate
end

#next_message(token, timeout) ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/protobuf/nats/response_muxer.rb', line 43

# Waits for the next response addressed to +token+.
#
# @param token [Object] key of an entry registered in the response map
# @param timeout [Numeric, nil] seconds to wait; nil waits without a timeout
# @return [Object] the message popped from the token's queue
# @raise [::NATS::Timeout] when the token has no queue, the timeout is zero
#   or negative, the wait expires, or the queue has been closed
def next_message(token, timeout)
  # Look the queue up with minimal locking. Use #fetch rather than #dig/#[]:
  # it never invokes a Hash default block, so asking about an unknown (or
  # already cleaned up) token cannot insert a phantom entry into the map.
  queue = @map_lock.synchronize do
    entry = @resp_map.fetch(token, nil)
    entry && entry[:queue]
  end

  unless queue
    logger.warn "Token #{token} not found or already cleaned up during next_message"
    raise ::NATS::Timeout
  end

  # Edge case: a zero or negative timeout can never be satisfied.
  raise ::NATS::Timeout if timeout && timeout <= 0

  begin
    # TimeoutQueue#pop(non_block, timeout: seconds)
    # - with a timeout: blocks until a message arrives or the timeout expires
    # - without one (nil): blocks until a message arrives or the queue closes
    msg = if timeout
            queue.pop(false, timeout: timeout)
          else
            queue.pop(false)
          end

    # pop returns nil when the queue was closed or the timeout expired.
    unless msg
      logger.warn "Queue closed or timeout for token #{token} during next_message"
      raise ::NATS::Timeout
    end

    msg
  rescue ThreadError
    # Any ThreadError raised while popping is reported to callers as a timeout.
    logger.warn "Queue closed for token #{token} during next_message"
    raise ::NATS::Timeout
  end
end

#publish(subject, data, token) ⇒ Object



105
106
107
108
109
110
111
112
113
114
# File 'lib/protobuf/nats/response_muxer.rb', line 105

# Publishes +data+ on +subject+, asking for the reply to be sent to this
# muxer's inbox for +token+ ("<inbox prefix>.<token>").
#
# @param subject [String] subject to publish to
# @param data [Object] payload handed to the NATS connection unchanged
# @param token [Object] request token appended to the inbox prefix
# @raise [::Protobuf::Nats::Errors::ResponseMuxer] when no inbox prefix has
#   been set yet, i.e. the muxer was never started
def publish(subject, data, token)
  # Without an inbox prefix there is nowhere for the reply to go.
  raise ::Protobuf::Nats::Errors::ResponseMuxer, "ResponseMuxer not started - cannot publish" unless @resp_inbox_prefix

  connection = Protobuf::Nats.client_nats_connection
  connection.publish(subject, data, "#{@resp_inbox_prefix}.#{token}")
end

#restart ⇒ Object



116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
# File 'lib/protobuf/nats/response_muxer.rb', line 116

# Tears the muxer down and starts it again.
#
# Only one restart runs at a time: a caller that finds a restart already in
# progress logs a warning and returns nil without doing anything. Otherwise
# the handler threads are killed, the subscription is dropped, the cleanup
# thread is stopped, and #start is called. The in-progress flag is always
# cleared afterwards, even when the teardown or #start raises.
#
# @return [Object, nil] the result of #start, or nil when the call was skipped
def restart
  logger.debug "restarting response_muxer"

  # Claim the restart under LOCK so that concurrent callers back off.
  claimed = LOCK.synchronize do
    if @restarting
      logger.warn "Restart already in progress, skipping concurrent restart request"
      false
    else
      @restarting = true
    end
  end
  return unless claimed

  # Give sibling threads that called restart at about the same time a chance
  # to reach the guard above and skip. Otherwise CRuby's GVL may let this
  # thread finish the whole restart (and clear @restarting) before they even
  # enter the method, which would defeat the guard.
  Thread.pass

  begin
    # Tear down whatever is currently running.
    LOCK.synchronize do
      @resp_handlers.each { |handler| handler.kill }
      @resp_handlers.clear

      old_subscription = @resp_sub
      if old_subscription
        begin
          old_subscription.unsubscribe
        rescue => e
          logger.warn "Failed to unsubscribe old response muxer subscription: #{e.message}"
        ensure
          # Drop the reference even when unsubscribe raised.
          @resp_sub = nil
        end
      end

      stop_cleanup_thread

      @started = false
    end

    # Bring it back up from scratch.
    start
  ensure
    LOCK.synchronize { @restarting = false }
  end
end

#start ⇒ Object



165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
# File 'lib/protobuf/nats/response_muxer.rb', line 165

# Starts the muxer unless it is already running.
#
# Under LOCK it creates a fresh inbox prefix and subscribes to
# "<prefix>.*". It then starts the cleanup thread and spawns a handler thread
# that pops messages from the subscription's pending queue and pushes each
# one onto the queue registered for the token found in the last segment of
# the message subject.
#
# Returns early without doing anything when the muxer is already started or
# when no client NATS connection is available.
#
# @raise [StandardError] re-raises whatever creating the inbox or subscribing
#   raised, after resetting the partially initialised state
def start
  return if started?
  LOCK.synchronize do
    # Check again now that we hold the lock: another thread may have started
    # the muxer while we were waiting. started? takes LOCK itself, so the
    # unlocked _started? must be used here to avoid deadlocking.
    return if _started?

    nats = ::Protobuf::Nats.client_nats_connection
    return if nats.nil?

    begin
      @resp_inbox_prefix = nats.new_inbox

      # Subscribe to our per-instance inbox
      @resp_sub = nats.subscribe("#{@resp_inbox_prefix}.*")
      @started = true
    rescue => e
      # Leave no partial state behind before propagating the failure.
      @resp_inbox_prefix = nil
      @resp_sub = nil
      @started = false
      logger.error "Failed to start ResponseMuxer: #{e.message}"
      raise
    end
  end

  # Start the cleanup thread
  start_cleanup_thread

  LOCK.synchronize do
    @resp_handlers.select!(&:alive?)
    @resp_handlers << Thread.new do
      # Unique thread name for debugging
      Thread.current.name = "response-muxer-#{Thread.current.object_id}"
      begin
        loop do
          begin
            # --- Start of per-message block ---
            msg = @resp_sub.pending_queue.pop

            # Nothing to route.
            next if msg.nil?

            # A message was received, so the muxer is working again: reset the
            # crash counter here. Resetting it when the thread starts would
            # zero it on every self-healing restart, so the backoff below
            # could never grow past its first step.
            @crash_count = 0

            # Decrease pending size since consumed already
            # NOTE: This is outside the lock since it's just updating metrics
            @resp_sub.pending_size -= msg.data.size if @resp_sub

            # Validate message subject before processing
            unless msg.subject.is_a?(String) && msg.subject.include?('.')
              ::ActiveSupport::Notifications.instrument "client.invalid_message.protobuf-nats", 1

              logger.warn "Received message with invalid subject: #{msg.subject}. Dropping."
              next
            end

            # The token is the last dot-separated segment of the subject, e.g.
            # _INBOX.{random_data}.{random_data_msg_id}
            token = msg.subject.split('.').last

            logger.debug { "token: #{token}, resp_map.keys:#{@resp_map.keys}" } if logger.debug?

            # Get the queue for this token with minimal locking
            queue = @map_lock.synchronize do
              unless @resp_map.key?(token)
                # Try to decode the UUIDv7 timestamp to calculate message age
                delay_seconds = UUIDv7Helper.age_in_seconds(token)

                ::ActiveSupport::Notifications.instrument "client.unexpected_message.protobuf-nats", delay_seconds || 1

                if delay_seconds
                  logger.warn "Received unexpected message (#{delay_seconds.round(3)}s old). MSG.subject=#{msg.subject}. RESP_SUBJ.subject=#{@resp_sub.subject rescue 'unknown'}. Dropping unexpected message."
                else
                  logger.warn "Received unexpected message. MSG.subject=#{msg.subject}. RESP_SUBJ.subject=#{@resp_sub.subject rescue 'unknown'}. Dropping unexpected message."
                end
                nil
              else
                @resp_map[token][:queue]
              end
            end

            # Skip if token wasn't found
            next unless queue

            # Hand the message to the waiting requester. The queue does its
            # own synchronization, so the map lock is not needed here.
            begin
              # Check queue size to prevent memory bloat
              if queue.size >= MAX_RESPONSES_PER_TOKEN
                logger.warn "Token #{token} has #{queue.size} queued responses. Possible duplicate messages or slow consumer. Dropping message."
                next
              end

              queue.push(msg)
            rescue ThreadError, ClosedQueueError
              # The token was cleaned up between the lookup and the push.
              # Pushing to a closed Queue raises ClosedQueueError, which is
              # not a ThreadError, so it must be named explicitly; otherwise
              # this harmless race is reported below as a processing error.
              logger.debug "Queue closed for token #{token}, dropping message"
            end

            # --- End of per-message block ---
          rescue => per_message_error
            # A ThreadError reaching this point is treated as fatal for the
            # loop and handed to the self-healing handler below.
            raise if per_message_error.is_a?(::ThreadError)

            # Log the error for the specific message, but DON'T kill the thread.
            logger.error("ResponseMuxer failed to process a message. Error: #{per_message_error.message}")
            ::Protobuf::Nats.notify_error_callbacks(per_message_error)
          end
        end
      rescue => fatal_error
        # Only errors that escaped the per-message handler end up here.
        logger.error("ResponseMuxer thread crashed fatally. Error: #{fatal_error.message}")
        ::Protobuf::Nats.notify_error_callbacks(fatal_error)

        # --- Self-healing logic ---
        @crash_count = (@crash_count || 0) + 1
        # Quadratic backoff: 1, 4, 9, 16s... capped at 60s.
        sleep_duration = [(@crash_count**2), 60].min
        logger.warn("Waiting #{sleep_duration}s before attempting to restart ResponseMuxer.")
        sleep sleep_duration
        # --- End of self-healing logic ---

        # After sleeping, reset the state and try to start again.
        LOCK.synchronize do
          if @resp_sub
            begin
              @resp_sub.unsubscribe
            rescue => e
              logger.warn "Failed to unsubscribe old response muxer subscription during self-healing: #{e.message}"
            ensure
              @resp_sub = nil
            end
          end
          @started = false
        end
        start
      end
    end
  end
end

#started? ⇒ Boolean

Returns:

  • (Boolean)


309
310
311
# File 'lib/protobuf/nats/response_muxer.rb', line 309

# Reports whether the muxer is started, reading the state through _started?
# while holding LOCK. Code that already holds LOCK calls _started? directly
# instead (see #start) to avoid deadlocking.
def started?
  LOCK.synchronize { _started? }
end

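#stop ⇒ Object

Stops the muxer: stops the cleanup thread, kills the response handler threads, unsubscribes from the response subscription, and marks the muxer as not started.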



338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
# File 'lib/protobuf/nats/response_muxer.rb', line 338

# Shuts the muxer down while holding LOCK: stops the cleanup thread, kills
# and forgets the handler threads, drops the subscription, and marks the
# muxer as not started. A failing unsubscribe is logged, not raised.
#
# @return [false] the value assigned to @started
def stop
  LOCK.synchronize do
    stop_cleanup_thread

    @resp_handlers.each { |handler| handler.kill }
    @resp_handlers.clear

    subscription = @resp_sub
    if subscription
      begin
        subscription.unsubscribe
      rescue => e
        logger.warn "Failed to unsubscribe during stop: #{e.message}"
      ensure
        # Drop the reference whether or not unsubscribe succeeded.
        @resp_sub = nil
      end
    end

    @started = false
  end
end