Class: Protobuf::Nats::SuperSubscriptionManager

Inherits:
Object
Defined in:
lib/protobuf/nats/super_subscription_manager.rb

Instance Method Summary

Constructor Details

#initialize(nats, &cb) ⇒ SuperSubscriptionManager

Returns a new instance of SuperSubscriptionManager.



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/protobuf/nats/super_subscription_manager.rb', line 10

# Creates the manager and starts the single handler thread that consumes the central queue.
#
# Subscriptions made through #queue_subscribe have their pending queue replaced with the
# central SizedQueue created here, so messages from every subscription are delivered to
# +cb+ on this one thread.
#
# @param nats [Object] NATS client used by #queue_subscribe (must respond to #subscribe)
# @yieldparam data [Object] message payload (+msg.data+)
# @yieldparam reply [Object] reply subject (+msg.reply+)
# @yieldparam subject [Object] subject the message arrived on (+msg.subject+)
def initialize(nats, &cb)
  # Central queue used by all subscriptions
  @pending_queue = ::SizedQueue.new(::NATS::IO::DEFAULT_SUB_PENDING_MSGS_LIMIT)
  @subscriptions = []
  @nats = nats
  @callback = cb
  # Consecutive fatal crashes of the handler thread; drives the restart backoff below.
  @crash_count = 0

  @pending_queue_handler = Thread.new do
    Thread.current.name = "subscription-manager-#{object_id}"
    begin
      loop do
        msg = nil
        begin
          msg = @pending_queue.pop
          # Poison pill queued by #shutdown.
          break if msg == :shutdown

          @callback.call(msg.data, msg.reply, msg.subject)
          # Reset the backoff only after real progress. Resetting at the top of the
          # outer begin block would re-run on every `retry` and pin the backoff at 1s.
          @crash_count = 0
        rescue => per_message_error
          # Log the error for the specific message, but DON'T kill the thread.
          logger.error("SubscriptionManager failed to process message: #{msg.inspect rescue 'unknown'}. Error: #{per_message_error.message}")
          ::Protobuf::Nats.notify_error_callbacks(per_message_error) rescue nil
        end
      end
    rescue => fatal_error
      # Reached only if the per-message rescue clause itself raises (e.g. logger.error fails).
      # `rescue =>` catches StandardError only, so SystemExit, Interrupt and other
      # SignalExceptions already propagate without an explicit re-raise.
      logger.error("SubscriptionManager handler crashed fatally! Error: #{fatal_error.message}")
      ::Protobuf::Nats.notify_error_callbacks(fatal_error) rescue nil

      # Self-healing with quadratic backoff: 1s, 4s, 9s, ... capped at 60s.
      @crash_count += 1
      sleep_duration = [@crash_count**2, 60].min
      logger.warn("Waiting #{sleep_duration}s before restarting SubscriptionManager handler...")
      sleep sleep_duration

      retry # Restart the loop
    end
  end
end

Instance Method Details

#logger ⇒ Object



57
58
59
# File 'lib/protobuf/nats/super_subscription_manager.rb', line 57

# Logger used for all diagnostics emitted by this manager.
# Looked up from ::Protobuf::Logging on every call rather than cached.
def logger; ::Protobuf::Logging.logger; end

#queue_subscribe(name) ⇒ Object



61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/protobuf/nats/super_subscription_manager.rb', line 61

# Subscribes to +name+ as a queue group (the group name equals the subject) and reroutes
# the new subscription's deliveries into the manager's central pending queue.
#
# Messages that landed in the subscription's own queue before the swap are moved to the
# central queue so the handler thread still sees them.
#
# @param name [String] subject to subscribe to; also used as the queue group name
# @return [Object] the subscription object returned by the NATS client
def queue_subscribe(name)
  logger.debug { "queue_subscribe(#{name})" }
  sub = @nats.subscribe(name, :queue => name)

  # Create a subscription but reset the pending queue to use a central pending queue.
  existing_pending_queue = sub.pending_queue
  sub.pending_queue = @pending_queue

  # Messages may have reached the old queue between subscribe and the swap; move them.
  migrated_count = 0
  max_migrations = 10_000 # Safety limit

  while migrated_count < max_migrations
    begin
      # Non-blocking pop: if anything else drains the old queue we stop instead of hanging.
      msg = existing_pending_queue.pop(true)
    rescue ThreadError
      break # old queue is empty
    end

    begin
      Timeout.timeout(1) do
        @pending_queue << msg
      end
    rescue Timeout::Error
      # The popped message is lost, and everything still in the old queue is stranded
      # because nothing reads that queue any more; report the real count.
      stranded = existing_pending_queue.size + 1
      logger.error "Failed to migrate message to central queue (queue full), dropping #{stranded} message(s)"
      break
    end
    migrated_count += 1
  end

  logger.warn "Migrated #{migrated_count} message(s) from old queue to central queue" if migrated_count > 0

  if migrated_count >= max_migrations
    logger.error "Hit migration limit! Old queue still has #{existing_pending_queue.size} messages"
  end

  @subscriptions << sub
  sub
end

#shutdown(timeout = 5) ⇒ Object



100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'lib/protobuf/nats/super_subscription_manager.rb', line 100

# Stops the handler thread started by the constructor.
#
# A +:shutdown+ poison pill is queued behind any pending messages so they are processed
# first. If the pill cannot be queued within 1s, or the thread does not exit within
# +timeout+ seconds, the thread is killed. In every case where a live thread was stopped,
# the central queue is emptied afterwards.
#
# @param timeout [Numeric] seconds to wait for the handler thread to exit on its own
# @return [void]
def shutdown(timeout = 5)
  # Nothing to stop if the handler never started or has already exited.
  return unless @pending_queue_handler&.alive?

  graceful =
    begin
      # Make room for the poison pill when the queue is full and no thread is blocked
      # on it; the queued messages are discarded.
      if @pending_queue.num_waiting.zero? && @pending_queue.size >= @pending_queue.max
        logger.warn "Queue full during shutdown, clearing to make room for shutdown signal"
        @pending_queue.clear
      end

      # Bounded wait so a blocked queue cannot hang shutdown.
      Timeout.timeout(1) do
        @pending_queue << :shutdown
      end
      true
    rescue Timeout::Error
      logger.error "Failed to send shutdown signal (queue blocked), force killing thread"
      false
    end

  if graceful && !@pending_queue_handler.join(timeout)
    logger.warn "Handler thread did not shutdown within #{timeout}s, forcefully killing..."
    graceful = false
  end

  # Both force paths kill, wait briefly for termination, and then fall through to cleanup.
  unless graceful
    @pending_queue_handler.kill
    @pending_queue_handler.join(1) rescue nil
  end

  # Clean up queue
  @pending_queue.clear
end

#unsubscribe_all ⇒ Object



132
133
134
135
136
137
138
139
140
# File 'lib/protobuf/nats/super_subscription_manager.rb', line 132

# Unsubscribes every subscription created through #queue_subscribe.
#
# Subscriptions that unsubscribe cleanly are removed from the tracked list, so repeated
# calls do not unsubscribe them again and the list does not grow without bound. Ones
# whose #unsubscribe raised are logged and kept so a later call can retry them.
#
# @return [Array] the subscriptions that could not be unsubscribed
def unsubscribe_all
  @subscriptions.reject! do |sub|
    begin
      sub.unsubscribe
      true
    rescue => e
      logger.warn "Failed to unsubscribe #{sub.subject rescue 'unknown'}: #{e.message}"
      false
    end
  end
  @subscriptions
end