Class: ZZQ::WebSocket::Stream

Inherits:
IO::Stream::Generic
  • Object
show all
Defined in:
lib/zzq/websocket/stream.rb

Overview

Byte-stream adapter over Protocol::WebSocket::Connection (or the ClientCloseDecorator returned by Async::WebSocket::Client.connect).

Per the MQTT-over-WebSocket OASIS spec, a single WS binary frame may contain multiple MQTT Control Packets and a single MQTT packet may span multiple WS frames. IO::Stream::Generic‘s read buffer already re-assembles byte streams across #sysread boundaries, so we just feed it whole WS messages and let Protocol::MQTT::Connection read framed packets out the far side as if it were talking to a TCP socket.

#wait_for_close + Async::Notification let the transport block the WS adapter fiber until either side tears down the connection.

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(ws) ⇒ Stream

Returns a new instance of Stream.



27
28
29
30
31
32
# File 'lib/zzq/websocket/stream.rb', line 27

def initialize(ws)
  super()
  @ws                  = ws
  @closed              = false
  @closed_notification = Async::Notification.new
end

Class Method Details

.wrap(ws) ⇒ Object



22
23
24
# File 'lib/zzq/websocket/stream.rb', line 22

def self.wrap(ws)
  new(ws)
end

Instance Method Details

#closed?Boolean

Returns:

  • (Boolean)


35
36
37
# File 'lib/zzq/websocket/stream.rb', line 35

def closed?
  @closed
end

#wait_for_closeObject

Block the caller until #close runs (either side).



41
42
43
# File 'lib/zzq/websocket/stream.rb', line 41

def wait_for_close
  @closed_notification.wait unless @closed
end