Class: Rockbox::WsTransport

Inherits:
Object
  • Object
show all
Defined in:
lib/rockbox/transport.rb

Overview


WebSocket transport — speaks the ‘graphql-transport-ws` protocol.

Each call to #subscribe returns a “stop” lambda that cancels the subscription. Reconnection is intentionally simple: the transport is disposable, so callers should rebuild the client on terminal errors.


Constant Summary collapse

GRAPHQL_TRANSPORT_WS =
"graphql-transport-ws"

Instance Method Summary collapse

Constructor Details

#initialize(url) ⇒ WsTransport

Returns a new instance of WsTransport.



81
82
83
84
85
86
87
88
# File 'lib/rockbox/transport.rb', line 81

def initialize(url)
  @url = url
  @client = nil
  @lock = Mutex.new
  @sinks = {}        # subscription id => sink hash
  @ack = false
  @ack_signal = ConditionVariable.new
end

Instance Method Details

#disposeObject



115
116
117
118
119
120
121
122
123
124
125
126
127
128
# File 'lib/rockbox/transport.rb', line 115

def dispose
  @lock.synchronize do
    @sinks.clear
    if @client
      begin
        @client.close
      rescue StandardError
        # ignored
      end
      @client = nil
      @ack = false
    end
  end
end

#subscribe(query, variables, sink) ⇒ Proc

Returns call to unsubscribe.

Parameters:

  • query (String)
  • variables (Hash, nil)
  • sink (Hash{Symbol => Proc})

    keys: :next, :error, :complete

Returns:

  • (Proc)

    call to unsubscribe



94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/rockbox/transport.rb', line 94

def subscribe(query, variables, sink)
  ensure_connected

  sub_id = SecureRandom.uuid
  @lock.synchronize { @sinks[sub_id] = sink }

  send_message(
    id: sub_id,
    type: "subscribe",
    payload: {
      query: query,
      variables: variables ? CaseConversion.deep_camelize(variables) : {}
    }
  )

  lambda do
    send_message(id: sub_id, type: "complete") rescue nil
    @lock.synchronize { @sinks.delete(sub_id) }
  end
end