Class: Protobuf::Nats::SuperSubscriptionManager
- Inherits:
-
Object
- Object
- Protobuf::Nats::SuperSubscriptionManager
- Defined in:
- lib/protobuf/nats/super_subscription_manager.rb
Instance Method Summary collapse
-
#handler_count ⇒ Object
Number of intake handler threads.
-
#initialize(nats, &cb) ⇒ SuperSubscriptionManager
constructor
A new instance of SuperSubscriptionManager.
- #logger ⇒ Object
-
#pending_queue_size ⇒ Object
Depth of the shared intake queue = intake backpressure (for observability).
- #queue_subscribe(name) ⇒ Object
- #shutdown(timeout = 5) ⇒ Object
- #unsubscribe_all ⇒ Object
Constructor Details
#initialize(nats, &cb) ⇒ SuperSubscriptionManager
Returns a new instance of SuperSubscriptionManager.
11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
# File 'lib/protobuf/nats/super_subscription_manager.rb', line 11 def initialize(nats, &cb) # Central queue used by all subscriptions @pending_queue = ::SizedQueue.new(::NATS::IO::DEFAULT_SUB_PENDING_MSGS_LIMIT) @subscriptions = [] @subscriptions_mutex = ::Mutex.new @nats = nats @callback = cb # Fan out the intake across several handler threads. A single thread is a # throughput ceiling on JRuby and lets one slow publish (ACK) inside the # callback head-of-line block every other subject. Each handler pops the # shared SizedQueue (thread-safe) independently. @pending_queue_handlers = handler_count.times.map { |i| spawn_handler(i) } ::Protobuf::Nats.instrument("server.subscription_handler_count", @pending_queue_handlers.size) end |
Instance Method Details
#handler_count ⇒ Object
Number of intake handler threads. On JRuby (true parallelism) fan out to processor_count; on CRuby the GVL makes extra handlers pointless, so 1. Overridable via env for tuning/tests. Mirrors ResponseMuxer#dispatcher_count.
35 36 37 38 39 40 41 42 43 44 45 |
# File 'lib/protobuf/nats/super_subscription_manager.rb', line 35 def handler_count @handler_count ||= begin if ::ENV.key?("PB_NATS_SERVER_SUBSCRIPTION_HANDLERS") [::ENV["PB_NATS_SERVER_SUBSCRIPTION_HANDLERS"].to_i, 1].max elsif ::RUBY_ENGINE == "jruby" [::Concurrent.processor_count, 1].max else 1 end end end |
#logger ⇒ Object
28 29 30 |
# File 'lib/protobuf/nats/super_subscription_manager.rb', line 28 def logger ::Protobuf::Logging.logger end |
#pending_queue_size ⇒ Object
Depth of the shared intake queue = intake backpressure (for observability).
130 131 132 |
# File 'lib/protobuf/nats/super_subscription_manager.rb', line 130 def pending_queue_size @pending_queue.size end |
#queue_subscribe(name) ⇒ Object
47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 |
# File 'lib/protobuf/nats/super_subscription_manager.rb', line 47 def queue_subscribe(name) logger.debug { "queue_subscribe(#{name})" } sub = @nats.subscribe(name, :queue => name) # Create a subscription but reset the pending queue to use a central pending queue. existing_pending_queue = sub.pending_queue sub.pending_queue = @pending_queue # Push all race-conditioned messages onto the pending queue. # Should address a potential race condition. Chances of the round-trip message to an # existing queue before this queue swap happens seems extremely low, but possible. migrated_count = 0 max_migrations = 10000 # Safety limit while !existing_pending_queue.empty? && migrated_count < max_migrations # Non-blocking pop: another consumer could in theory drain it, so don't block. begin msg = existing_pending_queue.pop(true) rescue ThreadError break end # Non-blocking push with timeout begin Timeout.timeout(1) do @pending_queue << msg end migrated_count += 1 logger.warn "Migrated message #{migrated_count} from old queue to central queue" rescue Timeout::Error logger.error "Failed to migrate message to central queue (queue full), dropping message" break end end if migrated_count >= max_migrations logger.error "Hit migration limit! Old queue still has #{existing_pending_queue.size} messages" end @subscriptions_mutex.synchronize { @subscriptions << sub } sub end |
#shutdown(timeout = 5) ⇒ Object
91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 |
# File 'lib/protobuf/nats/super_subscription_manager.rb', line 91 def shutdown(timeout = 5) handlers = @pending_queue_handlers.select(&:alive?) return if handlers.empty? # Wake every handler with its own poison pill. handlers.size.times do begin # Clear some space if the queue is full so the shutdown signal fits. if @pending_queue.num_waiting.zero? && @pending_queue.size >= @pending_queue.max logger.warn "Queue full during shutdown, clearing to make room for shutdown signal" @pending_queue.clear rescue nil end Timeout.timeout(1) { @pending_queue << :shutdown } rescue Timeout::Error logger.error "Failed to send shutdown signal (queue blocked); will force-kill remaining handlers" break end end # Join all handlers within a single shared deadline, then force-kill stragglers. deadline = monotonic + timeout handlers.each do |handler| remaining = deadline - monotonic handler.join(remaining.positive? ? remaining : 0) end handlers.each do |handler| next unless handler.alive? logger.warn "Handler thread did not shut down in time, forcefully killing..." handler.kill handler.join(1) rescue nil end # Clean up queue @pending_queue.clear rescue nil end |
#unsubscribe_all ⇒ Object
134 135 136 137 138 139 140 141 142 143 |
# File 'lib/protobuf/nats/super_subscription_manager.rb', line 134 def unsubscribe_all subscriptions = @subscriptions_mutex.synchronize { @subscriptions.dup } subscriptions.each do |sub| begin sub.unsubscribe rescue => e logger.warn "Failed to unsubscribe #{sub.subject rescue 'unknown'}: #{e.}" end end end |