Class: Tina4::NATSBackplane

Inherits:
WebSocketBackplane show all
Defined in:
lib/tina4/websocket_backplane.rb

Overview

NATS pub/sub backplane.

Requires the nats-pure gem (+gem install nats-pure+). The require is deferred so the rest of Tina4 works fine without it installed — an error is raised only when this class is actually instantiated.

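NATS is async-native, so each subscription is served by a callback registered with the client, plus one background thread per channel that lives until the channel is unsubscribed or the backplane is closed.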

Instance Method Summary collapse

Methods inherited from WebSocketBackplane

create_backplane

Constructor Details

#initialize(url: nil) ⇒ NATSBackplane

Returns a new instance of NATSBackplane.



128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
# File 'lib/tina4/websocket_backplane.rb', line 128

# Builds the backplane and connects to the NATS server.
#
# The nats-pure client is required lazily here, so a missing gem only
# surfaces when a NATSBackplane is actually constructed.
#
# @param url [String, nil] server URL; when nil, the value of the
#   TINA4_WS_BACKPLANE_URL environment variable is used, and failing that
#   "nats://localhost:4222"
# @raise [LoadError] if "nats/client" cannot be required
def initialize(url: nil)
  begin
    require "nats/client"
  rescue LoadError
    hint = "The 'nats-pure' gem is required for NATSBackplane. " \
           "Install it with: gem install nats-pure"
    raise LoadError, hint
  end

  # Bookkeeping shared by subscribe / unsubscribe / close.
  @mutex = Mutex.new
  @running = true
  @subs = {}
  @threads = {}
  @url = url || ENV.fetch("TINA4_WS_BACKPLANE_URL", "nats://localhost:4222")

  # The connection is opened right here, on the calling thread.
  @nats = NATS::IO::Client.new
  @nats.connect(@url)
end

Instance Method Details

#close ⇒ Object



179
180
181
182
183
184
185
186
187
188
# File 'lib/tina4/websocket_backplane.rb', line 179

# Shuts the backplane down.
#
# Clears the running flag, then — while holding the mutex — unsubscribes
# every recorded subscription (errors from individual unsubscribe calls are
# ignored), kills every recorded thread and empties both registries. The
# NATS client itself is closed after the mutex has been released.
#
# @return [Object] whatever the client's +close+ returns
def close
  @running = false

  @mutex.synchronize do
    @subs.each_value do |sid|
      begin
        @nats.unsubscribe(sid)
      rescue StandardError
        # Best effort: one failing unsubscribe must not stop the teardown.
        nil
      end
    end
    @subs.clear

    @threads.each_value(&:kill)
    @threads.clear
  end

  @nats.close
end

#publish(channel, message) ⇒ Object



148
149
150
151
# File 'lib/tina4/websocket_backplane.rb', line 148

# Publishes +message+ on +channel+ and then flushes the client.
#
# @param channel [String] subject to publish on
# @param message [String] payload handed to the client as-is
# @return [Object] whatever the client's +flush+ returns
def publish(channel, message)
  client = @nats
  client.publish(channel, message)
  # Flushed after every publish; presumably so the message has reached the
  # server before this method returns.
  client.flush
end

#subscribe(channel, &block) ⇒ Object



153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
# File 'lib/tina4/websocket_backplane.rb', line 153

# Subscribes to +channel+ and forwards each message's +data+ to the block
# for as long as the backplane is running.
#
# At most one subscription is tracked per channel. Subscribing again to a
# channel that is already subscribed replaces the earlier subscription: the
# old one is unsubscribed first, because otherwise its id would be
# overwritten in @subs, could no longer be unsubscribed by #unsubscribe or
# #close, and its callback would keep firing.
#
# @param channel [String] subject to listen on
# @yieldparam data [Object] the +data+ of each received message
# @return [Thread] the background thread registered for +channel+
def subscribe(channel, &block)
  @mutex.synchronize do
    previous = @subs[channel]
    if previous
      @nats.unsubscribe(previous)
      @subs.delete(channel)
    end

    @subs[channel] = @nats.subscribe(channel) do |msg|
      block.call(msg.data) if @running
    end

    # One thread per channel, reused on re-subscribe. It only polls @running
    # and exits once the flag is cleared; messages reach the block through
    # the subscription callback above.
    # NOTE(review): the thread does not appear to drive any NATS I/O itself —
    # confirm whether it is still needed.
    @threads[channel] ||= Thread.new do
      sleep 0.01 while @running
    end
  end
end

#unsubscribe(channel) ⇒ Object



170
171
172
173
174
175
176
177
# File 'lib/tina4/websocket_backplane.rb', line 170

# Stops listening on +channel+.
#
# Under the mutex, removes the channel's subscription id (unsubscribing it
# from the client when one was recorded) and removes and kills the
# channel's background thread when one exists. Unknown channels are a no-op.
#
# @param channel [String] subject previously passed to #subscribe
# @return [Thread, nil] the killed thread, or nil when there was none
def unsubscribe(channel)
  @mutex.synchronize do
    if (sid = @subs.delete(channel))
      @nats.unsubscribe(sid)
    end

    worker = @threads.delete(channel)
    worker.kill unless worker.nil?
  end
end