Class: Protobuf::Nats::SuperSubscriptionManager

Inherits:
Object
  • Object
show all
Defined in:
lib/protobuf/nats/super_subscription_manager.rb

Instance Method Summary collapse

Constructor Details

#initialize(nats, &cb) ⇒ SuperSubscriptionManager

Returns a new instance of SuperSubscriptionManager.



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/protobuf/nats/super_subscription_manager.rb', line 11

# Stores the NATS client and callback, creates the shared intake queue and
# starts the intake handlers.
#
# @param nats [Object] NATS client; kept in @nats and used by #queue_subscribe
# @param cb [Proc] block kept in @callback (captured here, not invoked here)
def initialize(nats, &cb)
  @nats = nats
  @callback = cb

  # Bookkeeping for every subscription created through this manager.
  @subscriptions_mutex = ::Mutex.new
  @subscriptions = []

  # One bounded queue shared by all subscriptions.
  @pending_queue = ::SizedQueue.new(::NATS::IO::DEFAULT_SUB_PENDING_MSGS_LIMIT)

  # Intake is spread over handler_count handlers rather than a single one, so
  # that one slow callback (e.g. a slow ACK publish) does not stall every other
  # subject. Handlers are created last, after all state above is assigned.
  # NOTE(review): spawn_handler is defined outside this view; it is expected to
  # start a thread that pops @pending_queue.
  @pending_queue_handlers = Array.new(handler_count) { |index| spawn_handler(index) }

  ::Protobuf::Nats.instrument("server.subscription_handler_count", @pending_queue_handlers.size)
end

Instance Method Details

#handler_count ⇒ Object

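Number of intake handler threads. On JRuby (true parallelism) this fans out to Concurrent.processor_count; on CRuby the GVL makes extra handlers pointless, so it uses 1. The value can be overridden with the PB_NATS_SERVER_SUBSCRIPTION_HANDLERS environment variable for tuning and tests, and is never less than 1. Mirrors ResponseMuxer#dispatcher_count.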



35
36
37
38
39
40
41
42
43
44
45
# File 'lib/protobuf/nats/super_subscription_manager.rb', line 35

# Number of intake handler threads; computed on first call and memoized.
#
# Resolution order:
#   1. PB_NATS_SERVER_SUBSCRIPTION_HANDLERS when the variable is set
#      (parsed with to_i, raised to 1 if lower)
#   2. ::Concurrent.processor_count on JRuby (raised to 1 if lower)
#   3. 1 on every other engine
#
# @return [Integer] handler count, never less than 1
def handler_count
  return @handler_count if @handler_count

  env_key = "PB_NATS_SERVER_SUBSCRIPTION_HANDLERS"

  @handler_count =
    if ::ENV.key?(env_key)
      requested = ::ENV[env_key].to_i
      requested < 1 ? 1 : requested
    elsif ::RUBY_ENGINE == "jruby"
      [1, ::Concurrent.processor_count].max
    else
      1
    end
end

#logger ⇒ Object



28
29
30
# File 'lib/protobuf/nats/super_subscription_manager.rb', line 28

# Logger used by this manager; delegates to ::Protobuf::Logging.logger on
# every call (the result is not cached here).
def logger
  ::Protobuf::Logging.logger
end

#pending_queue_size ⇒ Object

Current depth of the shared intake queue. A growing value indicates intake backpressure; exposed for observability.



130
131
132
# File 'lib/protobuf/nats/super_subscription_manager.rb', line 130

# Number of messages currently sitting in the shared intake queue.
#
# @return [Integer] current queue depth
def pending_queue_size
  @pending_queue.length
end

#queue_subscribe(name) ⇒ Object



47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/protobuf/nats/super_subscription_manager.rb', line 47

# Subscribes to +name+ as a queue group (the group is also named +name+) and
# points the subscription at the manager's shared pending queue.
#
# Messages that landed on the subscription's own queue before the swap are
# moved over to the shared queue, up to a fixed cap. If the shared queue stays
# full for one second the message in hand is dropped and the move stops.
#
# @param name [String] subject to subscribe to; also passed as the :queue option
# @return [Object] the subscription returned by @nats.subscribe
def queue_subscribe(name)
  logger.debug { "queue_subscribe(#{name})" }
  subscription = @nats.subscribe(name, :queue => name)

  # Keep a handle on the subscription's own queue, then swap in the shared one.
  original_queue = subscription.pending_queue
  subscription.pending_queue = @pending_queue

  # A message may have arrived on the original queue before the swap above.
  # That window is tiny, but drain it anyway so nothing is stranded.
  migration_cap = 10000 # safety limit on how many messages are moved
  moved = 0

  until original_queue.empty? || moved >= migration_cap
    # Non-blocking pop: if something else emptied the queue first, stop.
    begin
      stranded = original_queue.pop(true)
    rescue ThreadError
      break
    end

    # Wait at most one second for room in the shared queue.
    begin
      Timeout.timeout(1) { @pending_queue << stranded }
      moved += 1
      logger.warn "Migrated message #{moved} from old queue to central queue"
    rescue Timeout::Error
      logger.error "Failed to migrate message to central queue (queue full), dropping message"
      break
    end
  end

  unless moved < migration_cap
    logger.error "Hit migration limit! Old queue still has #{original_queue.size} messages"
  end

  # Track the subscription so #unsubscribe_all can find it later.
  @subscriptions_mutex.synchronize { @subscriptions.push(subscription) }

  subscription
end

#shutdown(timeout = 5) ⇒ Object



91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/protobuf/nats/super_subscription_manager.rb', line 91

# Stops the intake handler threads.
#
# Pushes one :shutdown marker per live handler onto the shared queue, waits for
# the handlers to exit within a single overall deadline, kills any that are
# still alive afterwards, and finally empties the queue. Returns immediately
# (nil) when no handler is alive.
#
# @param timeout [Numeric] total seconds to wait for all handlers to exit
# @return [Object, nil] result of the final queue clear, or nil
def shutdown(timeout = 5)
  live_handlers = @pending_queue_handlers.select { |thread| thread.alive? }
  return if live_handlers.empty?

  # One marker per live handler, so each of them gets woken up.
  live_handlers.each do |_handler|
    begin
      # With a full queue and no handler waiting on it, the marker would not
      # fit; drop the backlog to make room.
      if @pending_queue.num_waiting.zero? && @pending_queue.size >= @pending_queue.max
        logger.warn "Queue full during shutdown, clearing to make room for shutdown signal"
        begin
          @pending_queue.clear
        rescue StandardError
          nil
        end
      end

      Timeout.timeout(1) { @pending_queue.push(:shutdown) }
    rescue Timeout::Error
      logger.error "Failed to send shutdown signal (queue blocked); will force-kill remaining handlers"
      break
    end
  end

  # All joins share one deadline, so the total wait is bounded by +timeout+.
  cutoff = monotonic + timeout
  live_handlers.each do |thread|
    time_left = cutoff - monotonic
    thread.join(time_left > 0 ? time_left : 0)
  end

  # Anything still running past the deadline is killed.
  live_handlers.each do |thread|
    if thread.alive?
      logger.warn "Handler thread did not shut down in time, forcefully killing..."
      thread.kill
      begin
        thread.join(1)
      rescue StandardError
        nil
      end
    end
  end

  # Drop whatever is left on the queue (best effort).
  begin
    @pending_queue.clear
  rescue StandardError
    nil
  end
end

#unsubscribe_all ⇒ Object



134
135
136
137
138
139
140
141
142
143
# File 'lib/protobuf/nats/super_subscription_manager.rb', line 134

# Unsubscribes every subscription tracked by this manager.
#
# Works on a copy of the list taken under the mutex, so the lock is not held
# while unsubscribing. A failure on one subscription is logged as a warning
# and does not stop the others. The tracked list itself is left unchanged.
#
# @return [Array] the copy of the subscription list that was iterated
def unsubscribe_all
  snapshot = @subscriptions_mutex.synchronize { @subscriptions.dup }

  snapshot.each do |subscription|
    begin
      subscription.unsubscribe
    rescue => error
      # Reading the subject can itself fail; fall back to a placeholder.
      subject_label =
        begin
          subscription.subject
        rescue StandardError
          'unknown'
        end
      logger.warn "Failed to unsubscribe #{subject_label}: #{error.message}"
    end
  end
end