Class: SignalWire::Relay::Client

Inherits:
Object
Defined in:
lib/signalwire/relay/client.rb

Overview

RelayClient – WebSocket + JSON-RPC 2.0 protocol + event dispatch.

One instance = one persistent WebSocket connection to SignalWire RELAY.

Implements the 4 correlation mechanisms:

  1. JSON-RPC id -> pending hash with ConditionVariable

  2. call_id -> Call routing

  3. control_id -> Action tracking per Call

  4. tag -> dial correlation
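The first mechanism in the list above (request id to a waiting entry with a ConditionVariable) can be sketched in isolation. This is a minimal illustration, not the client's actual code: `PendingRegistry` and its method names are hypothetical, and the reply hash is made-up data. The caller registers an entry before sending, a reader thread resolves it when the matching response arrives, and the caller wakes up.

```ruby
require 'securerandom'

# Hypothetical stand-in for the client's pending-request table.
class PendingRegistry
  def initialize
    @pending = {} # id -> { mutex:, cv:, result: }
    @pending_mutex = Mutex.new
  end

  # Register a waiter and return its id. Do this BEFORE sending the request.
  def register
    id = SecureRandom.uuid
    entry = { mutex: Mutex.new, cv: ConditionVariable.new, result: nil }
    @pending_mutex.synchronize { @pending[id] = entry }
    id
  end

  # Called by the reader thread when a response carrying this id arrives.
  def resolve(id, result)
    entry = @pending_mutex.synchronize { @pending[id] }
    return false unless entry

    entry[:mutex].synchronize do
      entry[:result] = result
      entry[:cv].signal
    end
    true
  end

  # Block until the response arrives or the timeout elapses (nil on timeout).
  def wait(id, timeout)
    entry = @pending_mutex.synchronize { @pending[id] }
    return nil unless entry

    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
    entry[:mutex].synchronize do
      # Loop guards against spurious wakeups.
      while entry[:result].nil?
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        break if remaining <= 0

        entry[:cv].wait(entry[:mutex], remaining)
      end
    end
    entry[:result]
  ensure
    @pending_mutex.synchronize { @pending.delete(id) }
  end

  def size
    @pending_mutex.synchronize { @pending.size }
  end
end

registry = PendingRegistry.new
id = registry.register
# Simulated reader thread delivering the response a little later.
responder = Thread.new do
  sleep 0.05
  registry.resolve(id, { 'code' => '200' })
end
reply = registry.wait(id, 2)
responder.join
```

The tag -> dial mechanism follows the same shape, keyed by the dial tag instead of the JSON-RPC id.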


Constructor Details

#initialize(project: nil, token: nil, jwt_token: nil, host: nil, contexts: ['default'], max_active_calls: nil, space: nil) ⇒ Client

Python parity: `RelayClient(project=None, token=None, jwt_token=None, host=None, contexts=None, max_active_calls=None)`. Ruby v1 accepted `space:` for the same purpose as `host:`; both keyword names are honoured for backwards compatibility, and `host:` takes precedence when both are given. `host` is the canonical Python name and now drives the WebSocket endpoint.

Parameters:

  • project (String, nil) (defaults to: nil)

    project ID (env: SIGNALWIRE_PROJECT_ID)

  • token (String, nil) (defaults to: nil)

    API token (env: SIGNALWIRE_API_TOKEN)

  • jwt_token (String, nil) (defaults to: nil)

    JWT token alternative

  • host (String, nil) (defaults to: nil)

    RELAY host (env: SIGNALWIRE_SPACE). Either a bare space subdomain (`myspace`) or a full hostname (`myspace.signalwire.com`).

  • contexts (Array<String>) (defaults to: ['default'])

    context names to subscribe to

  • max_active_calls (Integer, nil) (defaults to: nil)

    cap on simultaneous active inbound calls. When `nil`, the `RELAY_MAX_ACTIVE_CALLS` environment variable is used if set (Python parity); otherwise there is no limit. An explicit value is clamped to a minimum of 1.

  • space (String, nil) (defaults to: nil)

    backwards-compatible alias for `host`; ignored when `host` is given.

Raises:

  • (ArgumentError)

    if project is missing, if neither token nor jwt_token is given, or if no host can be determined


# File 'lib/signalwire/relay/client.rb', line 56

def initialize(project: nil, token: nil, jwt_token: nil, host: nil,
               contexts: ['default'], max_active_calls: nil,
               space: nil)
  @project_id = project || ENV['SIGNALWIRE_PROJECT_ID'] || ''
  @token      = token || ENV['SIGNALWIRE_API_TOKEN'] || ''
  @jwt_token  = jwt_token
  # Accept either `host:` (Python parity) or legacy `space:`.
  host_arg    = host || space
  @space      = host_arg || ENV['SIGNALWIRE_SPACE'] || ''
  @contexts   = contexts

  # Python parity: max_active_calls override + RELAY_MAX_ACTIVE_CALLS env.
  if max_active_calls.nil?
    env_val = ENV['RELAY_MAX_ACTIVE_CALLS']
    @max_active_calls = env_val && !env_val.empty? ? Integer(env_val) : nil
  else
    @max_active_calls = [1, Integer(max_active_calls)].max
  end

  raise ArgumentError, 'project is required' if @project_id.empty?
  if @token.empty? && @jwt_token.nil?
    raise ArgumentError, 'token or jwt_token is required'
  end
  raise ArgumentError, 'host is required' if @space.empty?

  @host = @space.include?('.') ? @space : "#{@space}.signalwire.com"

  # Correlation mechanisms
  @pending       = {} # id -> { mutex:, cv:, result:, error: }
  @pending_mutex = Mutex.new
  @calls         = {} # call_id -> Call
  @calls_mutex   = Mutex.new
  @pending_dials = {} # tag -> { mutex:, cv:, call:, error: }
  @dials_mutex   = Mutex.new
  @messages      = {} # message_id -> Message
  @messages_mutex = Mutex.new

  # Session state
  @protocol            = nil
  @authorization_state = nil
  @ws                  = nil
  @running             = false
  @connected           = false
  @ws_mutex            = Mutex.new

  # Handlers
  @on_call_handler    = nil
  @on_message_handler = nil
  @on_event_handler   = nil

  # Reconnection
  @reconnect_delay = RECONNECT_MIN_DELAY
  @should_restart  = false
end
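The host and max_active_calls handling in the constructor above can be isolated as two small functions. This is a sketch for illustration only: `normalize_host` and `resolve_max_active_calls` are hypothetical names that do not exist in the SDK, and the environment is passed in as a plain hash so the behaviour is easy to try out.

```ruby
# Hypothetical helpers mirroring the constructor logic; not part of the SDK.

# A bare space name gets the signalwire.com suffix; a full hostname is kept.
def normalize_host(space)
  space.include?('.') ? space : "#{space}.signalwire.com"
end

# An explicit value wins and is clamped to at least 1; otherwise the
# RELAY_MAX_ACTIVE_CALLS entry is parsed, and nil means "no limit".
def resolve_max_active_calls(explicit, env = ENV)
  if explicit.nil?
    env_val = env['RELAY_MAX_ACTIVE_CALLS']
    env_val && !env_val.empty? ? Integer(env_val) : nil
  else
    [1, Integer(explicit)].max
  end
end

normalize_host('myspace')                # => "myspace.signalwire.com"
normalize_host('myspace.signalwire.com') # => "myspace.signalwire.com"
resolve_max_active_calls(0, {})          # => 1
resolve_max_active_calls(nil, {})        # => nil
```

Note that only the explicit argument is clamped; a value read from the environment is used as parsed, and a non-numeric value makes `Integer` raise ArgumentError.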

Instance Attribute Details

#host ⇒ Object (readonly)

Returns the value of attribute host.



# File 'lib/signalwire/relay/client.rb', line 36

def host
  @host
end

#max_active_calls ⇒ Object (readonly)

Returns the value of attribute max_active_calls.



# File 'lib/signalwire/relay/client.rb', line 36

def max_active_calls
  @max_active_calls
end

#project_id ⇒ Object (readonly)

Returns the value of attribute project_id.



# File 'lib/signalwire/relay/client.rb', line 36

def project_id
  @project_id
end

#protocol ⇒ Object (readonly)

Returns the value of attribute protocol.



# File 'lib/signalwire/relay/client.rb', line 36

def protocol
  @protocol
end

Instance Method Details

#_authorization_state ⇒ Object

Return the SDK’s tracked authorization-state blob (Python parity: RelayClient._authorization_state). Captured from signalwire.authorization.state events for use on reconnect.



# File 'lib/signalwire/relay/client.rb', line 155

def _authorization_state
  @authorization_state
end

#_calls_snapshot ⇒ Object

Return the current call_id -> Call registry (a snapshot copy). Test/audit-only surface for asserting on internal routing state; the Python reference exposes the same via RelayClient._calls.



# File 'lib/signalwire/relay/client.rb', line 140

def _calls_snapshot
  @calls_mutex.synchronize { @calls.dup }
end

#_connected? ⇒ Boolean

True when the client believes the WebSocket is open. Exposed for tests that need to assert the recv loop is still alive after an injected error / handler exception.

Returns:

  • (Boolean)


# File 'lib/signalwire/relay/client.rb', line 162

def _connected?
  @ws_mutex.synchronize { @connected }
end

#_set_protocol(value) ⇒ Object

Test/reconnect surface: stamp a previously issued protocol string before calling run so the next signalwire.connect frame carries it (the production server replies with session_restored: true). Mirrors Python’s RelayClient._relay_protocol = ….



# File 'lib/signalwire/relay/client.rb', line 148

def _set_protocol(value)
  @protocol = value
end

#dial(devices, timeout: 120, tag: nil, **kwargs) ⇒ Object

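Dial outbound call(s). Blocks until the matching calling.call.dial event arrives or timeout seconds elapse, then returns a Call object.

Raises:

  • (ActionTimeoutError)

    if no dial result arrives within timeout seconds

  • (RelayError)

    if the calling.dial request fails or the dial itself reports an error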



# File 'lib/signalwire/relay/client.rb', line 211

def dial(devices, timeout: 120, tag: nil, **kwargs)
  dial_tag = tag || SecureRandom.uuid

  # Register pending dial BEFORE sending RPC
  entry = { mutex: Mutex.new, cv: ConditionVariable.new, call: nil, error: nil }
  @dials_mutex.synchronize { @pending_dials[dial_tag] = entry }

  begin
    params = { 'tag' => dial_tag, 'devices' => devices }
    kwargs.each { |k, v| params[k.to_s] = v }
    execute('calling.dial', params)
  rescue StandardError
    @dials_mutex.synchronize { @pending_dials.delete(dial_tag) }
    raise
  end

  # Wait for calling.call.dial event
  entry[:mutex].synchronize do
    deadline = Time.now + timeout
    while entry[:call].nil? && entry[:error].nil?
      remaining = deadline - Time.now
      if remaining <= 0
        @dials_mutex.synchronize { @pending_dials.delete(dial_tag) }
        raise ActionTimeoutError, "Dial timed out after #{timeout}s"
      end
      entry[:cv].wait(entry[:mutex], remaining)
    end
  end

  @dials_mutex.synchronize { @pending_dials.delete(dial_tag) }
  raise RelayError.new(-1, entry[:error]) if entry[:error]
  entry[:call]
end

#execute(method, params = {}) ⇒ Object

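Send a JSON-RPC request and wait up to 10 seconds for the response. Returns the result hash. Raises RelayError if the request times out, if the server reports an error, or (for every method except METHOD_SIGNALWIRE_CONNECT) if the result code is not 2xx.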



# File 'lib/signalwire/relay/client.rb', line 307

def execute(method, params = {})
  id = SecureRandom.uuid

  # Add protocol to params if we have one (except for signalwire.connect)
  if @protocol && method != METHOD_SIGNALWIRE_CONNECT
    params = params.dup
    params['protocol'] = @protocol
  end

  msg = {
    'jsonrpc' => '2.0',
    'id'      => id,
    'method'  => method,
    'params'  => params
  }

  entry = { mutex: Mutex.new, cv: ConditionVariable.new, result: nil, error: nil }
  @pending_mutex.synchronize { @pending[id] = entry }

  _send_json(msg)

  # Wait for response (10s timeout to detect half-open connections)
  entry[:mutex].synchronize do
    deadline = Time.now + 10
    while entry[:result].nil? && entry[:error].nil?
      remaining = deadline - Time.now
      if remaining <= 0
        @pending_mutex.synchronize { @pending.delete(id) }
        raise RelayError.new(-1, "Request #{method} timed out")
      end
      entry[:cv].wait(entry[:mutex], remaining)
    end
  end

  @pending_mutex.synchronize { @pending.delete(id) }
  raise entry[:error] if entry[:error]

  result = entry[:result]

  # Check result code for non-connect methods
  if method != METHOD_SIGNALWIRE_CONNECT
    code = result['code']
    if code && !code.to_s.match?(/\A2\d\d\z/)
      raise RelayError.new(code, result['message'] || 'Unknown error')
    end
  end

  result
end
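Two details of execute are easy to miss when reading the listing above: the protocol string is added to a copy of the params (never for the connect request), and any result code that is not a three-digit 2xx value is treated as a failure. The sketch below isolates both. `build_frame`, `success_code?` and `CONNECT_METHOD` are hypothetical names, and the value `'signalwire.connect'` is an assumption about what METHOD_SIGNALWIRE_CONNECT holds.

```ruby
require 'securerandom'

# Assumed value of METHOD_SIGNALWIRE_CONNECT; not confirmed by this page.
CONNECT_METHOD = 'signalwire.connect'

# Build a JSON-RPC 2.0 request frame. The protocol string is merged into a
# copy of params, so the caller's hash is left untouched.
def build_frame(method, params, protocol: nil, id: SecureRandom.uuid)
  params = params.merge('protocol' => protocol) if protocol && method != CONNECT_METHOD
  { 'jsonrpc' => '2.0', 'id' => id, 'method' => method, 'params' => params }
end

# A missing code counts as success; otherwise it must be exactly 2xx.
def success_code?(result)
  code = result['code']
  code.nil? || code.to_s.match?(/\A2\d\d\z/)
end

caller_params = { 'tag' => 'abc' }
frame = build_frame('calling.dial', caller_params, protocol: 'proto-1')
frame['params']                  # => {"tag"=>"abc", "protocol"=>"proto-1"}
success_code?('code' => '200')   # => true
success_code?('code' => '404')   # => false
```

Anchoring the pattern with `\A` and `\z` matters: a value such as `2000` or `x200` does not pass as a success code.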

#on_call(&block) ⇒ Object

Register inbound call handler.



# File 'lib/signalwire/relay/client.rb', line 112

def on_call(&block)
  @on_call_handler = block
end

#on_event(&block) ⇒ Object

Register a generic inbound-event handler. Called for every signalwire.event frame BEFORE the type-specific handlers (call/message/dial) run. Used by integration probes (e.g. the audit harness) that need to react to raw events.



# File 'lib/signalwire/relay/client.rb', line 125

def on_event(&block)
  @on_event_handler = block
end
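The ordering described for on_event (generic handler first, type-specific handler second) can be pictured with a small stand-alone dispatcher. This is a hedged sketch: `MiniDispatcher`, its `dispatch` method, and the `'event_type'` key are assumptions made for illustration, since the real dispatch code is not shown on this page. The registration methods do mirror the listings here: each stores a single block, so a later registration replaces an earlier one.

```ruby
# Hypothetical dispatcher; only the handler registration mirrors the client.
class MiniDispatcher
  def initialize
    @on_event_handler = nil
    @on_call_handler = nil
  end

  def on_event(&block)
    @on_event_handler = block
  end

  def on_call(&block)
    @on_call_handler = block
  end

  # Assumed dispatch rule: the generic handler sees every event first, then
  # call events (assumed to have a "calling." type prefix) reach on_call.
  def dispatch(event)
    @on_event_handler&.call(event)
    @on_call_handler&.call(event) if event['event_type'].to_s.start_with?('calling.')
  end
end

log = []
dispatcher = MiniDispatcher.new
dispatcher.on_call { |_event| log << :replaced_handler }
dispatcher.on_call { |_event| log << :call }   # replaces the block above
dispatcher.on_event { |_event| log << :event }

dispatcher.dispatch('event_type' => 'calling.call.receive')
dispatcher.dispatch('event_type' => 'messaging.receive')
log # => [:event, :call, :event]
```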

#on_message(&block) ⇒ Object

Register inbound message handler.



# File 'lib/signalwire/relay/client.rb', line 117

def on_message(&block)
  @on_message_handler = block
end

#receive(contexts) ⇒ Object


Dynamic context subscription: subscribe this connection to additional contexts at runtime.

# File 'lib/signalwire/relay/client.rb', line 293

def receive(contexts)
  execute('signalwire.receive', { 'contexts' => contexts })
end

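#run ⇒ Object

Connect, authenticate, subscribe, and enter the read loop. Blocks until stop is called. If the connection drops while the client is still running, all pending requests are rejected and the client reconnects with exponential backoff.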



# File 'lib/signalwire/relay/client.rb', line 168

def run
  @running = true
  while @running
    begin
      _connect_and_run
    rescue => e
      $stderr.puts "[RELAY] Connection error: #{e.message}"
    end
    break unless @running

    # Reject all pending requests
    _reject_all_pending('Disconnected')

    # Exponential backoff reconnect
    $stderr.puts "[RELAY] Reconnecting in #{@reconnect_delay}s..."
    sleep(@reconnect_delay)
    @reconnect_delay = [
      @reconnect_delay * RECONNECT_BACKOFF_FACTOR,
      RECONNECT_MAX_DELAY
    ].min
  end
end
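The delay schedule produced by the reconnect loop above can be computed ahead of time. The sketch below does that with a hypothetical helper, `backoff_delays`. The three constants are assumed values chosen for illustration; the real RECONNECT_MIN_DELAY, RECONNECT_BACKOFF_FACTOR and RECONNECT_MAX_DELAY are not shown on this page.

```ruby
# Assumed values for illustration only.
MIN_DELAY = 1        # seconds before the first reconnect attempt
MAX_DELAY = 30       # upper bound on any single delay
BACKOFF_FACTOR = 2   # multiplier applied after each attempt

# Return the sleep durations for the first `attempts` reconnects, using the
# same update rule as the loop: delay = min(delay * factor, max).
def backoff_delays(attempts, min: MIN_DELAY, max: MAX_DELAY, factor: BACKOFF_FACTOR)
  delay = min
  Array.new(attempts) do
    current = delay
    delay = [delay * factor, max].min
    current
  end
end

delays = backoff_delays(7) # => [1, 2, 4, 8, 16, 30, 30]
```

With these assumed values the delay doubles until it would exceed the cap and then stays there, so a long outage settles into one attempt every 30 seconds.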

#send_json(msg) ⇒ Object

Send an arbitrary JSON-RPC frame to the server. Public surface for tests, the audit harness, and one-off RELAY methods that don’t have a high-level wrapper. Returns nothing; outbound failures are silently ignored (matching _send_json semantics).



# File 'lib/signalwire/relay/client.rb', line 133

def send_json(msg)
  _send_json(msg)
end

#send_message(to_number:, from_number:, context: nil, body: nil, media: nil, tags: nil, region: nil, on_completed: nil) ⇒ Object

Send an SMS/MMS message. Returns a Message object.

Mirrors Python’s RelayClient.send_message keyword-only signature exactly. At least one of body: or media: is required.



# File 'lib/signalwire/relay/client.rb', line 253

def send_message(to_number:, from_number:, context: nil, body: nil,
                 media: nil, tags: nil, region: nil, on_completed: nil)
  if (body.nil? || body.empty?) && (media.nil? || media.empty?)
    raise ArgumentError, 'body or media is required'
  end

  msg_context = context || @contexts.first || 'default'
  params = {
    'context'     => msg_context,
    'to_number'   => to_number,
    'from_number' => from_number
  }
  params['body']   = body   if body
  params['media']  = media  if media
  params['tags']   = tags   if tags
  params['region'] = region if region

  result = execute('messaging.send', params)
  message_id = result['message_id'] || ''

  msg = Message.new(
    message_id:  message_id,
    context:     msg_context,
    direction:   'outbound',
    from_number: from_number,
    to_number:   to_number,
    body:        body || '',
    media:       media || [],
    state:       'queued',
    tags:        tags || []
  )
  msg._set_on_completed(on_completed) if on_completed
  @messages_mutex.synchronize { @messages[message_id] = msg } unless message_id.empty?
  msg
end

#stop ⇒ Object

Graceful shutdown: stops the run loop, closes the WebSocket if it is connected, and rejects all pending requests.



# File 'lib/signalwire/relay/client.rb', line 192

def stop
  @running = false
  # Snapshot under the mutex, close outside it. The websocket-client
  # gem fires the `:close` callback synchronously inside `close`,
  # which re-enters _on_ws_close → tries to take @ws_mutex and
  # deadlocks if we're still holding it.
  ws_to_close = nil
  @ws_mutex.synchronize do
    ws_to_close = @ws if @connected
  end
  ws_to_close&.close
  _reject_all_pending('Client stopped')
end
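The comment in stop describes a re-entrancy hazard: the close callback fires synchronously and wants the same mutex. The stand-alone sketch below reproduces that situation with hypothetical classes (`FakeSocket`, `MiniClient`) so both the safe and the unsafe ordering can be tried. One difference from the real scenario is worth stating: here everything runs on one thread, so Ruby detects the recursive lock and raises ThreadError instead of hanging.

```ruby
# Fake socket whose #close invokes its callback synchronously, matching the
# behaviour the source comment attributes to the websocket client.
class FakeSocket
  def initialize(&on_close)
    @on_close = on_close
  end

  def close
    @on_close.call
  end
end

# Hypothetical miniature of the client's connection state.
class MiniClient
  def initialize
    @mutex = Mutex.new
    @connected = true
    @ws = FakeSocket.new { on_ws_close }
  end

  def connected?
    @mutex.synchronize { @connected }
  end

  # Safe: grab a reference under the lock, close after releasing it.
  def stop
    ws_to_close = nil
    @mutex.synchronize { ws_to_close = @ws if @connected }
    ws_to_close&.close
  end

  # Unsafe, for contrast: the close callback re-enters the held mutex.
  def stop_while_locked
    @mutex.synchronize { @ws.close if @connected }
  end

  private

  def on_ws_close
    @mutex.synchronize { @connected = false }
  end
end

client = MiniClient.new
client.stop
client.connected? # => false
```

Calling `stop_while_locked` on a fresh `MiniClient` raises ThreadError and leaves the connected flag unchanged, which is the single-threaded symptom of the deadlock the original comment warns about.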

#unreceive(contexts) ⇒ Object

Unsubscribe this connection from the given contexts at runtime (counterpart to #receive).



# File 'lib/signalwire/relay/client.rb', line 297

def unreceive(contexts)
  execute('signalwire.unreceive', { 'contexts' => contexts })
end