Class: Tina4::NATSBackplane
- Inherits: WebSocketBackplane
  - Object
  - WebSocketBackplane
  - Tina4::NATSBackplane
- Defined in: lib/tina4/websocket_backplane.rb
Overview
NATS pub/sub backplane.
Requires the nats-pure gem (gem install nats-pure). The require is deferred until the constructor runs, so the rest of Tina4 works without the gem installed; a LoadError is raised only when this class is actually instantiated.
NATS is async-native, so the subscription listener runs off the calling thread: the client invokes the subscription callback asynchronously, and a background thread is kept per subscribed channel.
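The deferred-require behaviour described above can be sketched in isolation. This is a minimal illustration, not the Tina4 source: the class name OptionalBackplane, the helper try_instantiate, and the library name tina4_example_missing_lib are all hypothetical, and the library is chosen precisely because it is not installed.

```ruby
# Sketch of the deferred-require pattern. OptionalBackplane and the
# library name "tina4_example_missing_lib" are hypothetical.
class OptionalBackplane
  def initialize
    begin
      # The require only runs when an instance is created.
      require "tina4_example_missing_lib"
    rescue LoadError
      raise LoadError, "The 'tina4_example_missing_lib' gem is required for OptionalBackplane. " \
                       "Install it with: gem install tina4_example_missing_lib"
    end
  end
end

# Defining the class above did not touch the missing library;
# only instantiation triggers the require and the error.
def try_instantiate
  OptionalBackplane.new
  :ok
rescue LoadError => e
  e.message
end
```

Because LoadError is not a StandardError, a bare rescue would not catch it; callers that want to fall back to another backplane need to rescue LoadError explicitly, as try_instantiate does.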
Instance Method Summary
- #close ⇒ Object
- #initialize(url: nil) ⇒ NATSBackplane (constructor): A new instance of NATSBackplane.
- #publish(channel, message) ⇒ Object
- #subscribe(channel, &block) ⇒ Object
- #unsubscribe(channel) ⇒ Object
Methods inherited from WebSocketBackplane
Constructor Details
#initialize(url: nil) ⇒ NATSBackplane
Returns a new instance of NATSBackplane.
# File 'lib/tina4/websocket_backplane.rb', line 128

def initialize(url: nil)
  begin
    require "nats/client"
  rescue LoadError
    raise LoadError, "The 'nats-pure' gem is required for NATSBackplane. " \
                     "Install it with: gem install nats-pure"
  end

  @url = url || ENV.fetch("TINA4_WS_BACKPLANE_URL", "nats://localhost:4222")
  @subs = {}
  @threads = {}
  @running = true
  @mutex = Mutex.new

  # Connect to NATS in a background thread with its own event loop
  @nats = NATS::IO::Client.new
  @nats.connect(@url)
end
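The constructor resolves the server URL in a fixed order: the url: argument, then the TINA4_WS_BACKPLANE_URL environment variable, then nats://localhost:4222. The sketch below isolates that expression; the helper name resolve_backplane_url and its env parameter are hypothetical and exist only so the precedence can be exercised without a NATS server.

```ruby
# Hypothetical helper mirroring the URL expression used in #initialize.
# `env` defaults to ENV; a plain Hash can be passed in for illustration.
def resolve_backplane_url(url = nil, env = ENV)
  url || env.fetch("TINA4_WS_BACKPLANE_URL", "nats://localhost:4222")
end

# An explicit argument wins over the environment.
explicit = resolve_backplane_url("nats://broker:4222",
                                 { "TINA4_WS_BACKPLANE_URL" => "nats://env:4222" })

# Without an argument, the environment variable is used.
from_env = resolve_backplane_url(nil, { "TINA4_WS_BACKPLANE_URL" => "nats://env:4222" })

# With neither, the localhost default applies.
fallback = resolve_backplane_url(nil, {})
```

Note that fetch only falls back when the key is absent, so a variable that is set but empty would be passed through as an empty string.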
Instance Method Details
#close ⇒ Object
# File 'lib/tina4/websocket_backplane.rb', line 179

def close
  @running = false
  @mutex.synchronize do
    @subs.each_value { |sid| @nats.unsubscribe(sid) rescue nil }
    @subs.clear
    @threads.each_value { |t| t.kill }
    @threads.clear
  end
  @nats.close
end
#publish(channel, message) ⇒ Object
# File 'lib/tina4/websocket_backplane.rb', line 148

def publish(channel, message)
  @nats.publish(channel, message)
  @nats.flush
end
#subscribe(channel, &block) ⇒ Object
# File 'lib/tina4/websocket_backplane.rb', line 153

def subscribe(channel, &block)
  @mutex.synchronize do
    sid = @nats.subscribe(channel) do |msg|
      block.call(msg.data) if @running
    end
    @subs[channel] = sid

    # Run NATS event processing in a background thread
    @threads[channel] ||= Thread.new do
      loop do
        break unless @running
        sleep 0.01
      end
    end
  end
end
#unsubscribe(channel) ⇒ Object
# File 'lib/tina4/websocket_backplane.rb', line 170

def unsubscribe(channel)
  @mutex.synchronize do
    sid = @subs.delete(channel)
    @nats.unsubscribe(sid) if sid
    thread = @threads.delete(channel)
    thread&.kill
  end
end
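The four instance methods form a small publish/subscribe surface: subscribe registers a block per channel, publish delivers a message, unsubscribe removes one channel, and close tears everything down. The sketch below shows that call sequence against a hypothetical in-memory stand-in (InMemoryBackplane is not part of Tina4 and delivers synchronously, unlike NATS), which may be handy for exercising calling code without a NATS server.

```ruby
# Hypothetical in-memory stand-in with the same four-method surface.
# Not part of Tina4; delivery is synchronous and in-process only.
class InMemoryBackplane
  def initialize
    @subs = {}
    @mutex = Mutex.new
    @running = true
  end

  # Deliver the message to the channel's handler, if any.
  def publish(channel, message)
    handler = @mutex.synchronize { @subs[channel] }
    handler.call(message) if handler && @running
  end

  # Register one handler per channel; a later call replaces the earlier one.
  def subscribe(channel, &block)
    @mutex.synchronize { @subs[channel] = block }
  end

  def unsubscribe(channel)
    @mutex.synchronize { @subs.delete(channel) }
  end

  def close
    @running = false
    @mutex.synchronize { @subs.clear }
  end
end

received = []
backplane = InMemoryBackplane.new
backplane.subscribe("chat") { |data| received << data }
backplane.publish("chat", "hello")    # delivered to the block
backplane.unsubscribe("chat")
backplane.publish("chat", "dropped")  # no subscriber, so nothing happens
backplane.close
```

With the real class the same sequence applies, but the block receives msg.data from the NATS client on another thread, so handlers should be thread-safe.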