Class: Cent::Client

Inherits:
Object
  • Object
Defined in:
lib/cent/client.rb

Overview

Cent::Client

Ruby client for Centrifugo server HTTP API (Centrifugo v4+).

Every API method returns the raw parsed response body from Centrifugo, typically `{ "result" => { … } }` on success. If Centrifugo rejects the request with a top-level `error`, ResponseError is raised with Centrifugo’s numeric `code` and `message`. Transport-level problems (network failure, timeout, non-2xx HTTP status, unparseable body) raise other Error subclasses.

#batch and #broadcast are special: their responses contain an array of independent sub-replies, each of which may carry its own `error` field. Those sub-reply errors are NOT raised; callers inspect them manually. See #batch for details.

Examples:

Basic usage

client = Cent::Client.new(api_key: 'secret')
client.publish(channel: 'chat', data: { text: 'hi' })
# => {"result" => {}}

Custom Faraday configuration

Cent::Client.new(api_key: 'k', endpoint: 'https://c.example.com/api') do |conn|
  conn.options.open_timeout = 3
  conn.options.timeout      = 7
  conn.adapter :typhoeus
end
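
Error contract sketch

The error contract described in the overview can be sketched without the gem. The `Sketch` module, its stand-in classes, and the `check!` helper below are hypothetical names used only for illustration; the real Error and ResponseError classes live in the library, and the code and message values are sample data.

```ruby
# Stand-in classes for illustration only; the real ones are defined by the gem.
module Sketch
  class Error < StandardError; end

  # Carries the numeric code next to the message, as the overview describes.
  class ResponseError < Error
    attr_reader :code

    def initialize(code, message)
      @code = code
      super(message)
    end
  end

  # Returns the body unchanged, or raises when a top-level "error" is present.
  def self.check!(body)
    err = body['error']
    return body unless err

    raise ResponseError.new(err['code'], err['message'])
  end
end

begin
  Sketch.check!({ 'error' => { 'code' => 102, 'message' => 'unknown channel' } })
rescue Sketch::ResponseError => e
  warn "rejected: #{e.code} #{e.message}"
end
```

Rescuing the base Error class in application code would also catch the transport-level failures mentioned above, assuming they share that ancestor as the overview states.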

Constant Summary

DEFAULT_ENDPOINT =
'http://localhost:8000/api'
DEFAULT_TIMEOUT =
10

Constructor Details

#initialize(api_key:, endpoint: DEFAULT_ENDPOINT, timeout: DEFAULT_TIMEOUT) {|Faraday::Connection| ... } ⇒ Client

Returns a new instance of Client.

Parameters:

  • api_key (String)

    Centrifugo HTTP API key (sent as `X-API-Key`).

  • endpoint (String) (defaults to: DEFAULT_ENDPOINT)

    Centrifugo HTTP API base URL.

  • timeout (Numeric) (defaults to: DEFAULT_TIMEOUT)

    Request timeout in seconds.

Yields:

  • (Faraday::Connection)

    optional block to further configure the connection.



# File 'lib/cent/client.rb', line 44

def initialize(api_key:, endpoint: DEFAULT_ENDPOINT, timeout: DEFAULT_TIMEOUT, &block)
  headers = {
    'Content-Type' => 'application/json',
    'X-API-Key' => api_key
  }

  base = endpoint.end_with?('/') ? endpoint : "#{endpoint}/"

  @connection = Faraday.new(base, headers: headers) do |conn|
    conn.options.timeout      = timeout
    conn.options.open_timeout = timeout
    conn.request :json
    conn.response :json
    conn.response :raise_error
    block&.call(conn)
  end
end
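
The constructor appends a trailing slash to the endpoint before handing it to Faraday. The source does not say why, and how `send_command` builds request paths is not shown here. One plausible reason is generic relative-URL resolution, illustrated below with the standard library's `URI.join` rather than with Faraday itself. `normalize_base` is a hypothetical helper that mirrors the constructor's expression.

```ruby
require 'uri'

# Mirrors the constructor's normalization: append "/" only when it is missing.
def normalize_base(endpoint)
  endpoint.end_with?('/') ? endpoint : "#{endpoint}/"
end

# Without the trailing slash, the last path segment is replaced.
puts URI.join('http://localhost:8000/api', 'publish')
# prints http://localhost:8000/publish

# With the trailing slash, the relative path is appended under /api/.
puts URI.join(normalize_base('http://localhost:8000/api'), 'publish')
# prints http://localhost:8000/api/publish
```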

Instance Attribute Details

#connection ⇒ Object (readonly)

Returns the Faraday::Connection built by the constructor.



# File 'lib/cent/client.rb', line 38

def connection
  @connection
end

Instance Method Details

#batch(commands:, parallel: nil) ⇒ Object

Send many commands in a single request.

The response is shaped `{ "replies" => [<reply>, …] }`; note there is no top-level `result` wrapper, unlike every other method. Each reply in the array corresponds to one command (in the order they were sent when `parallel` is not set) and has the shape `{ "<method>" => <result> }` on success or `{ "error" => { "code" => …, "message" => … } }` on a per-command failure.

These per-command errors are not raised as ResponseError, since that would make partial-failure responses impossible to inspect. The caller is expected to walk `response['replies']` and check each entry. If Centrifugo rejects the batch request as a whole (e.g. malformed top-level body), the top-level `error` field is present and ResponseError is raised normally.

Examples:

response = client.batch(commands: [
  { 'publish' => { 'channel' => 'a', 'data' => {} } },
  { 'publish' => { 'channel' => 'unknown:b', 'data' => {} } }
])
response['replies'].each do |reply|
  if reply['error']
    warn "command failed: #{reply['error']['code']} #{reply['error']['message']}"
  end
end

Parameters:

  • commands (Array<Hash>)

    Each element is a command object of the form `{ "publish" => { … } }`, `{ "broadcast" => { … } }`, etc.

  • parallel (Boolean, nil) (defaults to: nil)

    When true, Centrifugo processes commands in parallel (lower latency, order not guaranteed).
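
Because replies line up with commands only when `parallel` is not set, a caller that wants to know which command failed can pair the two arrays by position. The helper below is a minimal sketch; `failed_commands` is a hypothetical name, and the sample response is made-up data in the shape described above.

```ruby
# Pairs each command with its reply by position and keeps only the failures.
# Positional pairing is only valid when `parallel` was not set on the request.
def failed_commands(commands, response)
  replies = response.fetch('replies', [])
  commands.zip(replies).select { |_command, reply| reply && reply['error'] }
end

commands = [
  { 'publish' => { 'channel' => 'a', 'data' => {} } },
  { 'publish' => { 'channel' => 'unknown:b', 'data' => {} } }
]

# Sample data only, shaped like the batch response documented above.
response = {
  'replies' => [
    { 'publish' => {} },
    { 'error' => { 'code' => 102, 'message' => 'unknown channel' } }
  ]
}

failed_commands(commands, response).each do |command, reply|
  warn "#{command.keys.first} failed: #{reply['error']['message']}"
end
```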




# File 'lib/cent/client.rb', line 222

def batch(commands:, parallel: nil)
  send_command('batch', {
                 'commands' => commands,
                 'parallel' => parallel
               })
end

#broadcast(channels:, data:, skip_history: nil, tags: nil, b64data: nil, idempotency_key: nil, delta: nil, version: nil, version_epoch: nil) ⇒ Object

Publish the same data into many channels.



# File 'lib/cent/client.rb', line 81

def broadcast(channels:, data:, skip_history: nil, tags: nil, b64data: nil,
              idempotency_key: nil, delta: nil, version: nil, version_epoch: nil)
  send_command('broadcast', {
                 'channels' => channels,
                 'data' => data,
                 'skip_history' => skip_history,
                 'tags' => tags,
                 'b64data' => b64data,
                 'idempotency_key' => idempotency_key,
                 'delta' => delta,
                 'version' => version,
                 'version_epoch' => version_epoch
               })
end
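
As the overview notes, #broadcast sub-reply errors are not raised. The sketch below shows one way to collect them. It assumes the per-channel results sit under `response['result']['responses']` in the same order as the `channels` argument; that shape is an assumption not stated on this page, so verify it against the Centrifugo version in use. `failed_channels` is a hypothetical helper name.

```ruby
# Returns [channel, error] pairs for every channel whose publish failed.
# Assumes (not confirmed by this page) that sub-replies live under
# result.responses and follow the order of the channels argument.
def failed_channels(channels, response)
  responses = response.dig('result', 'responses') || []
  channels.zip(responses).filter_map do |channel, item|
    [channel, item['error']] if item && item['error']
  end
end

# Sample data only.
response = {
  'result' => {
    'responses' => [
      { 'result' => {} },
      { 'error' => { 'code' => 102, 'message' => 'unknown channel' } }
    ]
  }
}

failed_channels(%w[news unknown:b], response).each do |channel, error|
  warn "#{channel}: #{error['code']} #{error['message']}"
end
```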

#channels(pattern: nil) ⇒ Object

List active channels (channels with at least one subscriber).



# File 'lib/cent/client.rb', line 180

def channels(pattern: nil)
  send_command('channels', { 'pattern' => pattern })
end

#disconnect(user:, client: nil, session: nil, whitelist: nil, disconnect: nil) ⇒ Object

Disconnect a user by ID.



# File 'lib/cent/client.rb', line 127

def disconnect(user:, client: nil, session: nil, whitelist: nil, disconnect: nil)
  send_command('disconnect', {
                 'user' => user,
                 'client' => client,
                 'session' => session,
                 'whitelist' => whitelist,
                 'disconnect' => disconnect
               })
end

#history(channel:, limit: nil, since: nil, reverse: nil) ⇒ Object

Get channel history (recent publications).



# File 'lib/cent/client.rb', line 163

def history(channel:, limit: nil, since: nil, reverse: nil)
  send_command('history', {
                 'channel' => channel,
                 'limit' => limit,
                 'since' => since,
                 'reverse' => reverse
               })
end
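
The `since` parameter is not described on this page. In Centrifugo's HTTP API it is a stream position made of an `offset` and an `epoch`, and the history result reports the current values of both. Assuming that shape, a follow-up call can reuse the position from a previous response. `next_since` is a hypothetical helper, and the response below is sample data.

```ruby
# Builds a `since` value from a previous #history response.
# Assumes the result carries "offset" and "epoch" keys; returns nil otherwise.
def next_since(response)
  result = response.fetch('result', {})
  return nil unless result.key?('offset') && result.key?('epoch')

  { 'offset' => result['offset'], 'epoch' => result['epoch'] }
end

# Sample data only.
previous = { 'result' => { 'publications' => [], 'offset' => 12, 'epoch' => 'xyz' } }
since = next_since(previous)
# client.history(channel: 'chat', limit: 100, since: since)
```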

#history_remove(channel:) ⇒ Object

Remove all publications from a channel’s history.



# File 'lib/cent/client.rb', line 174

def history_remove(channel:)
  send_command('history_remove', { 'channel' => channel })
end

#info ⇒ Object

Get information about running Centrifugo nodes.



# File 'lib/cent/client.rb', line 186

def info
  send_command('info', {})
end

#presence(channel:) ⇒ Object

Get channel presence (all currently subscribed clients).



# File 'lib/cent/client.rb', line 151

def presence(channel:)
  send_command('presence', { 'channel' => channel })
end

#presence_stats(channel:) ⇒ Object

Get short presence stats for a channel.



# File 'lib/cent/client.rb', line 157

def presence_stats(channel:)
  send_command('presence_stats', { 'channel' => channel })
end

#publish(channel:, data:, skip_history: nil, tags: nil, b64data: nil, idempotency_key: nil, delta: nil, version: nil, version_epoch: nil) ⇒ Object

Publish data into a channel.



# File 'lib/cent/client.rb', line 64

def publish(channel:, data:, skip_history: nil, tags: nil, b64data: nil,
            idempotency_key: nil, delta: nil, version: nil, version_epoch: nil)
  send_command('publish', {
                 'channel' => channel,
                 'data' => data,
                 'skip_history' => skip_history,
                 'tags' => tags,
                 'b64data' => b64data,
                 'idempotency_key' => idempotency_key,
                 'delta' => delta,
                 'version' => version,
                 'version_epoch' => version_epoch
               })
end
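
Every optional argument defaults to `nil` and is passed to `send_command` as-is. The body of `send_command` is not shown on this page, so how those `nil` entries are treated is unknown. The sketch below shows one plausible treatment, dropping them with `Hash#compact` before serialization. `compact_payload` is a hypothetical helper, not part of the gem.

```ruby
require 'json'
require 'securerandom'

# Drops nil-valued keys so unset options are omitted from the JSON body.
# Hash#compact keeps false, so an explicit `false` option is still sent.
def compact_payload(params)
  params.compact
end

payload = compact_payload({
  'channel' => 'chat',
  'data' => { 'text' => 'hi' },
  'skip_history' => nil,
  'idempotency_key' => SecureRandom.uuid # a fresh key per logical publish
})

puts JSON.generate(payload)
```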

#refresh(user:, client: nil, session: nil, expired: nil, expire_at: nil) ⇒ Object

Refresh a user connection (for unidirectional transports).



# File 'lib/cent/client.rb', line 139

def refresh(user:, client: nil, session: nil, expired: nil, expire_at: nil)
  send_command('refresh', {
                 'user' => user,
                 'client' => client,
                 'session' => session,
                 'expired' => expired,
                 'expire_at' => expire_at
               })
end
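
The unit of `expire_at` is not stated on this page. Assuming it is a Unix timestamp in seconds, as in Centrifugo's HTTP API, the arguments for extending a connection can be built as follows. `refresh_args` and its `ttl_seconds` and `now` parameters are hypothetical names used for illustration.

```ruby
# Builds keyword arguments for #refresh that extend a connection by a TTL.
# Assumes expire_at is a Unix timestamp in seconds.
def refresh_args(user:, ttl_seconds:, now: Time.now)
  { user: user, expire_at: now.to_i + ttl_seconds }
end

args = refresh_args(user: '42', ttl_seconds: 600)
# client.refresh(**args)
```

Injecting `now` keeps the helper deterministic in tests.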

#subscribe(user:, channel:, info: nil, b64info: nil, client: nil, session: nil, data: nil, b64data: nil, recover_since: nil, override: nil) ⇒ Object

Subscribe a user’s active session to a channel (server-side subscription).



# File 'lib/cent/client.rb', line 98

def subscribe(user:, channel:, info: nil, b64info: nil, client: nil, session: nil,
              data: nil, b64data: nil, recover_since: nil, override: nil)
  send_command('subscribe', {
                 'user' => user,
                 'channel' => channel,
                 'info' => info,
                 'b64info' => b64info,
                 'client' => client,
                 'session' => session,
                 'data' => data,
                 'b64data' => b64data,
                 'recover_since' => recover_since,
                 'override' => override
               })
end

#unsubscribe(user:, channel:, client: nil, session: nil) ⇒ Object

Unsubscribe a user from a channel.



# File 'lib/cent/client.rb', line 116

def unsubscribe(user:, channel:, client: nil, session: nil)
  send_command('unsubscribe', {
                 'user' => user,
                 'channel' => channel,
                 'client' => client,
                 'session' => session
               })
end