cent
Ruby client for the Centrifugo server HTTP API.
- Cent::Client: call server API methods (publish, broadcast, presence, history, …).
- Cent::Notary: issue connection and subscription JWTs.
Works with Centrifugo v4 and newer (tested against v6.7.1). Ruby 3.0+.
Installation
gem 'cent', '~> 4.0'
$ bundle install
API client
client = Cent::Client.new(api_key: 'your-api-key')
# Or pointing at a remote Centrifugo:
client = Cent::Client.new(
api_key: 'your-api-key',
endpoint: 'https://centrifugo.example.com/api',
timeout: 5
)
Every method returns the parsed response body from Centrifugo:
- On success the body has a "result" key: { "result" => { ... } }.
- On an API-level failure (e.g. unknown channel, namespace not found), Cent::ResponseError is raised with Centrifugo's numeric code and message.
- On a transport problem (network failure, timeout, non-2xx HTTP, malformed JSON), a Cent::Error subclass is raised.
batch and broadcast are different — see their sections below.
Customizing the connection
The initializer yields the underlying Faraday::Connection so you can adjust headers, timeouts, adapter, etc.
Cent::Client.new(api_key: 'k') do |conn|
conn.headers['User-Agent'] = 'my-app/1.0'
conn.options.open_timeout = 3
conn.options.timeout = 7
conn.adapter :typhoeus
end
Publishing
client.publish(channel: 'chat', data: { text: 'hello' })
# => {"result" => {"offset" => 1, "epoch" => "xyz"}}
client.publish(
channel: 'chat',
data: { text: 'hello' },
skip_history: false,
tags: { 'author' => '42' },
idempotency_key: 'my-idempotency-key',
delta: true
)
See publish.
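When a publish may be retried (for example after a timeout), reusing the same idempotency_key gives Centrifugo a way to recognize the retry as the same publication. A minimal sketch of deriving a stable key from the channel and payload follows. The helper name idempotency_key_for and the choice of SHA-256 over the JSON encoding are assumptions for illustration, not part of this gem.

```ruby
require 'digest'
require 'json'

# Hypothetical helper (not part of the cent gem): derives a deterministic
# key from the channel and payload, so a retried publish reuses the same key.
def idempotency_key_for(channel, data)
  Digest::SHA256.hexdigest(JSON.generate([channel, data]))
end

key = idempotency_key_for('chat', { text: 'hello' })
# client.publish(channel: 'chat', data: { text: 'hello' }, idempotency_key: key)
```

A content-derived key also treats two intentionally identical messages as one, so include a message ID in the hashed input if that matters for your application.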
Broadcast
response = client.broadcast(channels: %w[chat:1 chat:2], data: { text: 'hi' })
# response => { "result" => { "responses" => [ {"result" => {...}}, {"result" => {...}} ] } }
The outer call only raises Cent::ResponseError if the whole broadcast is rejected (e.g. malformed request). Per-channel failures are delivered as individual entries in response["result"]["responses"], each of which may contain an "error" key — these are not raised. Walk the array to check them:
response['result']['responses'].each_with_index do |r, i|
warn "channel #{i} failed: #{r['error']['message']}" if r['error']
end
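If you want channel names rather than array indexes in the report, the loop above can be wrapped in a small helper. This is a sketch only: broadcast_failures is a hypothetical name, the sample response is hand-written in the shape described above, and it assumes the entries come back in the same order as the requested channels.

```ruby
# Hypothetical helper (not part of the cent gem): pairs each requested channel
# with its entry in response["result"]["responses"] and keeps only failures.
# Assumes entries are returned in the same order as the channels were sent.
def broadcast_failures(channels, response)
  entries = response.dig('result', 'responses') || []
  channels.zip(entries).each_with_object({}) do |(channel, entry), failures|
    failures[channel] = entry['error'] if entry && entry['error']
  end
end

# Hand-written sample response, not captured from a real server.
sample = {
  'result' => {
    'responses' => [
      { 'result' => { 'offset' => 1, 'epoch' => 'xyz' } },
      { 'error' => { 'code' => 102, 'message' => 'unknown channel' } }
    ]
  }
}

failures = broadcast_failures(%w[chat:1 chat:2], sample)
failures.each { |channel, error| warn "#{channel} failed: #{error['message']}" }
```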
Subscribe / Unsubscribe
client.subscribe(user: '42', channel: 'chat')
client.unsubscribe(user: '42', channel: 'chat')
Disconnect / Refresh
client.disconnect(user: '42')
client.disconnect(user: '42', whitelist: %w[keep-this-client-id])
client.refresh(user: '42', expired: true)
Presence / Presence stats
client.presence(channel: 'chat')
client.presence_stats(channel: 'chat')
History
client.history(channel: 'chat', limit: 10)
client.history(channel: 'chat', limit: 10, reverse: true)
client.history(channel: 'chat', limit: 10, since: { 'offset' => 5, 'epoch' => 'xyz' })
client.history_remove(channel: 'chat')
Channels
client.channels
client.channels(pattern: 'chat:*')
Info
client.info
Batch
Send many commands in one HTTP request — Centrifugo processes them sequentially (or in parallel with parallel: true) and returns one reply per command in the same order.
response = client.batch(commands: [
{ 'publish' => { 'channel' => 'a', 'data' => { 'x' => 1 } } },
{ 'publish' => { 'channel' => 'b', 'data' => { 'x' => 2 } } },
{ 'presence_stats' => { 'channel' => 'a' } }
])
# => { "replies" => [ {"publish" => {...}}, {"publish" => {...}}, {"presence_stats" => {...}} ] }
Two things about batch are different from every other method:
- No result wrapper. The response is { "replies" => [...] } at the top level. This matches Centrifugo's wire format.
- Per-command errors are not raised. Each entry in replies may instead be { "error" => { "code" => ..., "message" => ... } }. Raising on the first error would make partial-success responses impossible to inspect, so the caller is expected to walk the array:
response['replies'].each_with_index do |reply, i|
if reply['error']
warn "command #{i} failed: #{reply['error']['code']} #{reply['error']['message']}"
end
end
Cent::ResponseError is still raised if Centrifugo rejects the batch request as a whole (e.g. malformed top-level body).
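For callers that want to handle successes and failures separately, one possible approach is to partition the replies while keeping each reply's position in the command list. The helper name partition_batch_replies and the sample response are illustrative assumptions, not part of this gem.

```ruby
# Hypothetical helper (not part of the cent gem): splits batch replies into
# successes and failures, keeping each reply's index in the command list.
def partition_batch_replies(response)
  indexed = (response['replies'] || []).each_with_index.to_a
  failed, succeeded = indexed.partition { |reply, _index| reply.key?('error') }
  { succeeded: succeeded, failed: failed }
end

# Hand-written sample response, not captured from a real server.
sample = {
  'replies' => [
    { 'publish' => { 'offset' => 1, 'epoch' => 'xyz' } },
    { 'error' => { 'code' => 102, 'message' => 'unknown channel' } },
    { 'presence_stats' => { 'num_clients' => 0, 'num_users' => 0 } }
  ]
}

outcome = partition_batch_replies(sample)
outcome[:failed].each do |reply, index|
  warn "command #{index} failed: #{reply['error']['code']} #{reply['error']['message']}"
end
```

Because the index is kept alongside each reply, a failed entry can be matched back to the command that produced it.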
Error handling
begin
response = client.publish(channel: 'chat', data: 'hi')
rescue Cent::ResponseError => e
# Centrifugo rejected the request (e.g. unknown channel, namespace not found).
puts "Centrifugo error #{e.code}: #{e.message}"
rescue Cent::TimeoutError
# request timed out
rescue Cent::NetworkError
# connection refused / DNS failure / etc.
rescue Cent::UnauthorizedError => e
# HTTP 401 — API key is wrong
rescue Cent::TransportError => e
# other 4xx/5xx — e.status has the HTTP code
rescue Cent::DecodeError
# response body wasn't valid JSON
end
All of the above inherit from Cent::Error, so you can rescue that single class if you don't need to discriminate.
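Timeouts and network failures are often transient, so some callers may want to retry them while letting API-level errors propagate immediately. The following is a minimal sketch of such a wrapper. The name with_retries and its parameters are hypothetical, and the error classes are passed in so the snippet runs without the gem loaded.

```ruby
# Hypothetical helper (not part of the cent gem): retries the block when it
# raises one of the given error classes, sleeping with exponential backoff.
# After the final attempt the last error is re-raised.
def with_retries(retry_on:, attempts: 3, base_delay: 0.1)
  tries = 0
  begin
    tries += 1
    yield
  rescue *retry_on
    raise if tries >= attempts

    sleep(base_delay * (2**(tries - 1)))
    retry
  end
end

# Usage with the client (commented out so the sketch runs without a server):
# with_retries(retry_on: [Cent::TimeoutError, Cent::NetworkError]) do
#   client.publish(channel: 'chat', data: 'hi')
# end
```

A publish that timed out may still have been processed by the server, so when retrying publishes consider passing an idempotency_key as well.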
Token generation
notary = Cent::Notary.new(secret: 'hmac-secret') # HS256
notary = Cent::Notary.new(secret: rsa_private_key, algorithm: 'RS256') # RSA
notary = Cent::Notary.new(secret: ec_private_key, algorithm: 'ES256') # ECDSA
Connection token
Used by clients to establish a real-time connection. See authentication.
notary.issue_connection_token(sub: '42')
notary.issue_connection_token(sub: '42', exp: Time.now.to_i + 600, info: { name: 'Alex' })
# With any of the standard/Centrifugo claims:
notary.issue_connection_token(
sub: '42', exp: 1735689600, iat: 1735686000, jti: SecureRandom.uuid,
aud: 'centrifugo', iss: 'my-app',
info: { role: 'admin' }, meta: { tenant: 'acme' },
channels: %w[user:42 news],
subs: { 'room:1' => { 'data' => { 'welcome' => true } } }
)
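Since the issued tokens are JWTs, their claims can be inspected during debugging by decoding the payload segment. The sketch below uses only the standard library. peek_claims and sample_hs256_token are hypothetical helpers, and the sample token is assembled by hand as a stand-in for the output of notary.issue_connection_token so the snippet runs without the gem installed.

```ruby
require 'json'
require 'openssl'

# Base64url helpers built on pack/unpack, so only the standard library is used.
def b64url_encode(bytes)
  [bytes].pack('m0').tr('+/', '-_').delete('=')
end

def b64url_decode(text)
  padded = text + ('=' * ((4 - text.length % 4) % 4))
  padded.tr('-_', '+/').unpack1('m0')
end

# Hypothetical helper (not part of the cent gem): returns the claims of a JWT
# WITHOUT verifying its signature. Suitable for debugging only.
def peek_claims(token)
  _header, payload, _signature = token.split('.')
  JSON.parse(b64url_decode(payload))
end

# Stand-in for a token from notary.issue_connection_token: an HS256 JWT
# assembled by hand for illustration.
def sample_hs256_token(claims, secret)
  header = b64url_encode(JSON.generate({ 'alg' => 'HS256', 'typ' => 'JWT' }))
  body = b64url_encode(JSON.generate(claims))
  signature = OpenSSL::HMAC.digest('SHA256', secret, "#{header}.#{body}")
  "#{header}.#{body}.#{b64url_encode(signature)}"
end

token = sample_hs256_token({ 'sub' => '42', 'exp' => 1_735_689_600 }, 'hmac-secret')
claims = peek_claims(token)
puts claims['sub']
```

This only reads the claims. It does not check the signature or expiry, so it should not be used to make authorization decisions.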
Subscription token
Used by clients to subscribe to a channel that requires token authorization. See channel token auth.
notary.issue_channel_token(sub: '42', channel: 'private-chat', exp: 1735689600)
notary.issue_channel_token(
sub: '42', channel: 'private-chat',
info: { role: 'writer' },
override: { 'presence' => { 'value' => true } }
)
Migrating from v3
v4 changes several aspects of the library, but migration should be smooth for the common cases.
- Centrifugo v5+ is required. v3 of this gem spoke the legacy POST /api JSON-RPC-style protocol; v4 uses the current per-method endpoints (POST /api/publish, POST /api/broadcast, …) and sends the API key as X-API-Key instead of Authorization: apikey <key>.
- Error handling is unchanged for the common case. Cent::ResponseError still exists and is still raised when Centrifugo returns a top-level API error. Existing rescue Cent::ResponseError => e blocks using e.code / e.message keep working. The new additions are typed transport errors (Cent::TimeoutError, Cent::NetworkError, Cent::TransportError, Cent::UnauthorizedError, Cent::DecodeError), all subclassed under Cent::Error.
- Keyword arg rename: Cent::Notary#issue_channel_token(client:) → issue_channel_token(sub:) to match Centrifugo's standard sub JWT claim.
- Ruby 3.0+ is required (was 2.5+).
- New methods added for common Centrifugo operations: subscribe, refresh, history_remove, batch.
- Richer kwargs on existing methods (e.g. publish now accepts tags, skip_history, idempotency_key, delta, version, version_epoch, b64data; history accepts limit, since, reverse; channels accepts pattern).
Development
$ bin/setup # install dependencies
$ bundle exec rspec # run unit tests
$ bundle exec rubocop # lint
Running integration tests
Integration tests under spec/integration/ exercise a real Centrifugo server. They're skipped unless CENTRIFUGO_API_URL is set.
$ docker compose up -d
$ CENTRIFUGO_API_URL=http://localhost:8000/api CENTRIFUGO_API_KEY=api_key bundle exec rspec spec/integration
Testing across Faraday / JWT versions
$ bundle exec appraisal install # generate gemfiles/*.gemfile.lock
$ bundle exec appraisal rspec # run the full matrix locally
License
MIT — see LICENSE.txt.