Class: Turbocable::NatsConnection Private

Inherits:
Object
  • Object
show all
Defined in:
lib/turbocable/nats_connection.rb

Overview

This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.

Manages the process-wide NATS connection and JetStream context.

Design invariants

  • *One connection per process.* nats-pure is thread-safe; multiple Ruby threads can publish concurrently without additional serialization.

  • *Lazy open.* The connection is not established until the first #publish call. This means apps that configure Turbocable but never broadcast (e.g. consumer-only processes) don’t open a NATS socket.

  • Fork-safe. On Process.fork (Puma / Unicorn worker boot), child processes detect the PID change and reopen their own connection instead of sharing the parent’s file descriptors.

  • *Clean shutdown.* An at_exit hook flushes pending acks and closes the connection gracefully when the process exits.

  • *No stream management.* The gem never creates or alters the TURBOCABLE JetStream stream — that’s turbocable-server‘s responsibility. If the stream is absent, NATS returns a “no stream matches subject” error that is surfaced as PublishError with an actionable message.

Constant Summary collapse

KV_BUCKET_MISSING_ON_LOOKUP =

This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.

Exception types raised by nats-pure when a JetStream KV bucket does not exist yet (#key_value must then call create_key_value).

[
  NATS::JetStream::Error::NotFound,
  NATS::JetStream::Error::StreamNotFound,
  *(defined?(NATS::KeyValue::BucketNotFoundError) ? [NATS::KeyValue::BucketNotFoundError] : [])
].freeze

Instance Method Summary collapse

Constructor Details

#initialize(config) ⇒ NatsConnection

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns a new instance of NatsConnection.

Parameters:



36
37
38
39
40
41
42
43
44
# File 'lib/turbocable/nats_connection.rb', line 36

def initialize(config)
  @config = config
  @mutex = Mutex.new
  @nc = nil # NATS::IO::Client
  @js = nil # JetStream context
  @pid = nil # PID at connection open time

  at_exit { close_quietly }
end

Instance Method Details

#closevoid

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Closes the NATS connection and resets internal state. Safe to call multiple times — subsequent calls are no-ops.



99
100
101
102
103
104
105
106
# File 'lib/turbocable/nats_connection.rb', line 99

def close
  @mutex.synchronize do
    @nc&.close
    @nc = nil
    @js = nil
    @pid = nil
  end
end

#key_value(bucket, history: 1) ⇒ NATS::JetStream::KeyValue

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns a NATS KV store handle for bucket, creating the bucket with sensible defaults if it does not yet exist.

Used by Turbocable::Auth.publish_public_key!. The server watches the bucket but does not create it; the gem is the source of truth for the bucket’s lifecycle.

Parameters:

  • bucket (String)

    KV bucket name (e.g. “TC_PUBKEYS”)

  • history (Integer) (defaults to: 1)

    revision history depth (default: 1)

Returns:

  • (NATS::JetStream::KeyValue)

Raises:



86
87
88
89
90
91
92
93
# File 'lib/turbocable/nats_connection.rb', line 86

def key_value(bucket, history: 1)
  ensure_connected!
  begin
    @js.key_value(bucket)
  rescue *KV_BUCKET_MISSING_ON_LOOKUP
    @js.create_key_value(bucket: bucket, history: history)
  end
end

#ping(timeout: 2.0) ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns true if NATS is reachable via a flush round-trip.

Parameters:

  • timeout (Float) (defaults to: 2.0)

Returns:

  • (Boolean)


67
68
69
70
71
72
73
# File 'lib/turbocable/nats_connection.rb', line 67

def ping(timeout: 2.0)
  ensure_connected!
  @nc.flush(timeout)
  true
rescue
  false
end

#publish(subject, bytes, timeout:) ⇒ NATS::JetStream::PubAck

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Publishes bytes to subject via JetStream and returns the ack.

Opens the connection on first call (lazy) and reopens it in a forked child process (fork-safe).

Parameters:

  • subject (String)

    full NATS subject (e.g. “TURBOCABLE.chat_room_42”)

  • bytes (String)

    encoded payload bytes

  • timeout (Float)

    per-publish ack wait in seconds

Returns:

  • (NATS::JetStream::PubAck)

    the JetStream publish acknowledgement

Raises:



56
57
58
59
60
61
# File 'lib/turbocable/nats_connection.rb', line 56

def publish(subject, bytes, timeout:)
  ensure_connected!
  @js.publish(subject, bytes, timeout: timeout)
rescue NATS::IO::NoRespondersError, NATS::JetStream::Error => e
  handle_nats_error(e, subject)
end