Class: Turbocable::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/turbocable/client.rb

Overview

Entry point for publishing messages to the TurboCable fan-out pipeline.

Prefer the top-level convenience method:

Turbocable.broadcast("chat_room_42", text: "hello")

Use the client directly when you need per-call codec overrides or access to the returned JetStream ack:

client = Turbocable::Client.new(Turbocable.config)
ack = client.broadcast("chat_room_42", {text: "hi"}, codec: :json)
puts ack.stream   # => "TURBOCABLE"
puts ack.seq      # => 1

Data flow

  1. Validate the stream name against the server’s subject-token charset.

  2. Look up the codec (default from config.default_codec, or per-call).

  3. Serialize the payload to bytes.

  4. Enforce config.max_payload_bytes — fail fast before touching NATS.

  5. Publish to NATS JetStream via NatsConnection#publish.

  6. On transient failures (Timeout, JetStream::Error), retry with exponential backoff up to config.max_retries times.

  7. Return the JetStream ack on success or raise PublishError on final failure.

On delivery guarantees

A successful ack means NATS JetStream has persisted the message. If the server operator has set TURBOCABLE_STREAM_RATE_LIMIT_RPS, messages that exceed the stream’s rate limit may be dropped by turbocable-server after a successful NATS ack. A green broadcast is therefore not an end-to-end delivery guarantee — it is a persistence guarantee.

Constant Summary collapse

STREAM_NAME_PATTERN =

Conservative charset matching what the server’s NATS glob authorizer accepts as a subject token. Characters that break NATS subject parsing (., *, >, whitespace) are excluded.

/\A[A-Za-z0-9_:-]+\z/
BASE_DELAY =

Exponential backoff parameters.

0.05
BACKOFF_FACTOR =

50 ms

2
JITTER_FACTOR =

±20%

0.20

Instance Method Summary collapse

Constructor Details

#initialize(config, connection: nil, clock: nil) ⇒ Client

Returns a new instance of Client.

Parameters:



52
53
54
55
56
# File 'lib/turbocable/client.rb', line 52

def initialize(config, connection: nil, clock: nil)
  @config = config
  @connection = connection
  @clock = clock
end

Instance Method Details

#broadcast(stream_name, payload, codec: nil) ⇒ NATS::JetStream::PubAck

Publishes payload to the stream_name subject.

Parameters:

  • stream_name (String)

    logical stream name (e.g. “chat_room_42”). Must match +/A\z/.

  • payload (Object)

    value serializable by the codec (typically a Hash).

  • codec (Symbol, nil) (defaults to: nil)

    codec override; falls back to config.default_codec when nil.

Returns:

  • (NATS::JetStream::PubAck)

Raises:



93
94
95
96
97
98
99
100
101
102
103
# File 'lib/turbocable/client.rb', line 93

def broadcast(stream_name, payload, codec: nil)
  validate_stream_name!(stream_name)

  codec_module = Codecs.fetch(codec || @config.default_codec)
  bytes = codec_module.encode(payload)

  enforce_payload_size!(bytes)

  subject = "#{@config.subject_prefix}.#{stream_name}"
  publish_with_retries(subject, bytes)
end

#healthy?Boolean

Returns true if the underlying adapter is reachable, false otherwise.

For the :nats adapter this issues a NATS flush (PING/PONG round-trip) and, if the stream name is known, fetches JetStream stream info to confirm JetStream connectivity. For the :null adapter this always returns true.

The method *never raises* on network errors — callers that need strict semantics should use Turbocable.healthcheck! instead.

Returns:

  • (Boolean)

Raises:



69
70
71
72
73
74
75
76
77
78
# File 'lib/turbocable/client.rb', line 69

def healthy?
  connection.ping(timeout: @config.publish_timeout)
rescue ConfigurationError
  raise
rescue => e
  @config.logger.warn do
    "[Turbocable] Health check failed: #{e.class}#{e.message}"
  end
  false
end