Class: Protobuf::Nats::SuperSubscriptionManager
- Inherits:
-
Object
- Object
- Protobuf::Nats::SuperSubscriptionManager
- Defined in:
- lib/protobuf/nats/super_subscription_manager.rb
Instance Method Summary collapse
-
#initialize(nats, &cb) ⇒ SuperSubscriptionManager
constructor
A new instance of SuperSubscriptionManager.
- #logger ⇒ Object
- #queue_subscribe(name) ⇒ Object
- #shutdown(timeout = 5) ⇒ Object
- #unsubscribe_all ⇒ Object
Constructor Details
#initialize(nats, &cb) ⇒ SuperSubscriptionManager
Returns a new instance of SuperSubscriptionManager.
10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/protobuf/nats/super_subscription_manager.rb', line 10

# Sets up the shared pending queue and starts the background thread that
# drains it, handing every message to the supplied callback.
#
# @param nats [Object] client stored in @nats; #queue_subscribe calls #subscribe on it
# @yield [data, reply, subject] called on the handler thread for each queued message
def initialize(nats, &cb)
  # Central queue used by all subscriptions
  @pending_queue = ::SizedQueue.new(::NATS::IO::DEFAULT_SUB_PENDING_MSGS_LIMIT)
  @subscriptions = []
  @nats = nats
  @callback = cb
  @crash_count = 0

  # Error callbacks are best effort: a failing callback must never take the
  # handler thread down with it.
  notify_quietly = lambda do |error|
    begin
      ::Protobuf::Nats.notify_error_callbacks(error)
    rescue StandardError
      nil
    end
  end

  @pending_queue_handler = Thread.new do
    Thread.current.name = "subscription-manager-#{object_id}"

    begin
      loop do
        msg = nil

        begin
          msg = @pending_queue.pop

          # Shutdown poison pill pushed by #shutdown.
          break if msg == :shutdown

          @callback.call(msg.data, msg.reply, msg.subject)

          # Only a successfully processed message proves the handler is healthy.
          # Resetting at the top of the outer `begin` would run again on every
          # `retry` and pin the backoff below at one second.
          @crash_count = 0
        rescue => error
          # Log the error for this specific message, but keep the thread alive.
          described = begin
            msg.inspect
          rescue StandardError
            'unknown'
          end
          logger.error("SubscriptionManager failed to process message: #{described}. Error: #{error.message}")
          notify_quietly.call(error)
        end
      end
    rescue => fatal_error
      # Reached when the per-message rescue above itself raises. `rescue =>`
      # only catches StandardError, so SystemExit, Interrupt and other signal
      # exceptions are never swallowed here.
      logger.error("SubscriptionManager handler crashed fatally! Error: #{fatal_error.message}")
      notify_quietly.call(fatal_error)

      # Self-healing with quadratic backoff, capped at 60 seconds.
      @crash_count += 1
      sleep_duration = [@crash_count**2, 60].min
      logger.warn("Waiting #{sleep_duration}s before restarting SubscriptionManager handler...")
      sleep sleep_duration
      retry # Restart the loop
    end
  end
end
Instance Method Details
#logger ⇒ Object
57 58 59 |
# File 'lib/protobuf/nats/super_subscription_manager.rb', line 57

# Logger used by this manager; delegates to ::Protobuf::Logging.
#
# @return [Object] whatever ::Protobuf::Logging.logger returns
def logger
  ::Protobuf::Logging.logger
end
#queue_subscribe(name) ⇒ Object
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 |
# File 'lib/protobuf/nats/super_subscription_manager.rb', line 61

# Subscribes to +name+ using a queue group of the same name and re-points the
# subscription at the central pending queue.
#
# Messages that reached the subscription's own queue before the swap are
# moved over to the central queue (at most 10,000 of them).
#
# @param name [String] subject to subscribe to; also used as the queue group
# @return [Object] the subscription returned by @nats.subscribe
def queue_subscribe(name)
  logger.debug { "queue_subscribe(#{name})" }

  sub = @nats.subscribe(name, :queue => name)

  # Create a subscription but reset the pending queue to use a central pending queue.
  existing_pending_queue = sub.pending_queue
  sub.pending_queue = @pending_queue

  # Push all race-conditioned messages onto the pending queue.
  # Chances of a round-trip message landing in the old queue before the swap
  # seem extremely low, but it is possible.
  migrated_count = 0
  max_migrations = 10_000 # Safety limit

  while migrated_count < max_migrations
    begin
      # Non-blocking pop: a blocking pop after an `empty?` check could hang
      # forever if the queue is drained in between.
      msg = existing_pending_queue.pop(true)
    rescue ThreadError
      break # Old queue is empty
    end

    begin
      # Wait at most one second for room in the central queue.
      Timeout.timeout(1) { @pending_queue << msg }
    rescue Timeout::Error
      logger.error "Failed to migrate message to central queue (queue full), dropping message"
      break
    end

    migrated_count += 1
    logger.warn "Migrated message #{migrated_count} from old queue to central queue"
  end

  # Only report the limit when messages were actually left behind.
  leftover = existing_pending_queue.size
  if migrated_count >= max_migrations && leftover > 0
    logger.error "Hit migration limit! Old queue still has #{leftover} messages"
  end

  @subscriptions << sub
  sub
end
#shutdown(timeout = 5) ⇒ Object
100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 |
# File 'lib/protobuf/nats/super_subscription_manager.rb', line 100

# Stops the handler thread: pushes the :shutdown poison pill, waits for the
# thread to finish and kills it if it does not stop in time.
#
# @param timeout [Numeric] seconds to wait for the handler thread to exit
# @return [Object, nil] nil when the handler is missing or already dead,
#   otherwise the result of clearing the pending queue
def shutdown(timeout = 5)
  handler = @pending_queue_handler
  return unless handler&.alive?

  signalled =
    begin
      # Nobody is waiting on a full queue, so the signal would never fit:
      # drop the backlog to make room.
      if @pending_queue.num_waiting.zero? && @pending_queue.size >= @pending_queue.max
        logger.warn "Queue full during shutdown, clearing to make room for shutdown signal"
        @pending_queue.clear
      end

      # Wait at most one second for the signal to be accepted.
      Timeout.timeout(1) { @pending_queue << :shutdown }
      true
    rescue Timeout::Error
      logger.error "Failed to send shutdown signal (queue blocked), force killing thread"
      false
    end

  unless signalled && handler.join(timeout)
    logger.warn "Handler thread did not shutdown within #{timeout}s, forcefully killing..." if signalled
    handler.kill
    begin
      handler.join(1)
    rescue StandardError
      nil # Best effort: the thread is being discarded anyway
    end
  end

  # Clean up the queue on every path, including the force-kill ones.
  @pending_queue.clear
end
#unsubscribe_all ⇒ Object
132 133 134 135 136 137 138 139 140 |
# File 'lib/protobuf/nats/super_subscription_manager.rb', line 132

# Unsubscribes every subscription recorded by #queue_subscribe. A failure on
# one subscription is logged as a warning and does not stop the others.
#
# @return [Array] the tracked subscriptions
def unsubscribe_all
  @subscriptions.each do |sub|
    begin
      sub.unsubscribe
    rescue => e
      # Reading the subject may fail too; fall back to a placeholder.
      subject = begin
        sub.subject
      rescue StandardError
        'unknown'
      end
      logger.warn "Failed to unsubscribe #{subject}: #{e.message}"
    end
  end
end