Class: ZZQ::WebSocket::Stream
- Inherits: IO::Stream::Generic
  - Object
  - IO::Stream::Generic
  - ZZQ::WebSocket::Stream
- Defined in: lib/zzq/websocket/stream.rb
Overview
Byte-stream adapter over Protocol::WebSocket::Connection (or the ClientCloseDecorator returned by Async::WebSocket::Client.connect).
Per the MQTT-over-WebSocket section of the OASIS MQTT spec, a single WS binary frame may contain multiple MQTT Control Packets, and a single MQTT packet may span multiple WS frames. IO::Stream::Generic's read buffer already reassembles byte streams across #sysread boundaries, so this class just feeds it whole WS messages and lets Protocol::MQTT::Connection read framed packets out the far side as if it were talking to a TCP socket.
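The reassembly described above can be sketched without any WebSocket or Async dependency. This is a minimal illustration, not the code in lib/zzq/websocket/stream.rb: FakeWebSocket, BufferedReader, and read_mqtt_packet are hypothetical names, and the buffer is a simplified stand-in for the one IO::Stream::Generic provides. It shows one WS message carrying a whole packet plus the start of the next, and the second packet being completed from a later message.

```ruby
# Hypothetical stand-in for a WebSocket connection: #read returns the next
# whole binary message as a String, or nil once the peer has closed.
class FakeWebSocket
  def initialize(messages)
    @messages = messages.dup
  end

  def read
    @messages.shift
  end
end

# Simplified read buffer (illustrative only, not IO::Stream::Generic).
class BufferedReader
  def initialize(ws)
    @ws = ws
    @buffer = String.new(encoding: Encoding::BINARY)
  end

  # Return exactly `size` bytes, pulling further WS messages as needed.
  # Returns nil if the stream ends before enough bytes arrive.
  def read_exactly(size)
    while @buffer.bytesize < size
      message = @ws.read
      return nil if message.nil?
      @buffer << message.b
    end
    @buffer.slice!(0, size)
  end
end

# Read one MQTT Control Packet: a one-byte fixed header, the variable-length
# "remaining length" field (at most four bytes), then that many body bytes.
def read_mqtt_packet(reader)
  first = reader.read_exactly(1)
  return nil if first.nil?

  header = first.dup
  remaining = 0
  multiplier = 1
  loop do
    byte = reader.read_exactly(1)
    return nil if byte.nil?
    header << byte
    value = byte.getbyte(0)
    remaining += (value & 0x7F) * multiplier
    break if (value & 0x80).zero?
    raise ArgumentError, "malformed remaining length" if multiplier >= 128**3
    multiplier *= 128
  end

  body = remaining.zero? ? "".b : reader.read_exactly(remaining)
  return nil if body.nil?
  header + body
end

# PINGREQ (2 bytes) and a QoS 0 PUBLISH to topic "a/b" with payload "hi".
pingreq = [0xC0, 0x00].pack("C*")
publish = [0x30, 0x07, 0x00, 0x03].pack("C*") + "a/bhi".b

# Message 1 holds PINGREQ plus the first 4 bytes of PUBLISH;
# message 2 holds the rest of PUBLISH.
ws = FakeWebSocket.new([pingreq + publish.byteslice(0, 4), publish.byteslice(4..-1)])
reader = BufferedReader.new(ws)

first_packet  = read_mqtt_packet(reader) # equals pingreq
second_packet = read_mqtt_packet(reader) # equals publish
end_of_stream = read_mqtt_packet(reader) # nil, no more messages
```

Because the packet parser only ever asks the buffer for a byte count, it never needs to know where WS message boundaries fall, which is the property the adapter relies on.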
#wait_for_close, backed by an Async::Notification, lets the transport block the WS adapter fiber until either side tears down the connection.
Class Method Summary
- .wrap(ws) ⇒ Object
Instance Method Summary
- #closed? ⇒ Boolean
- #initialize(ws) ⇒ Stream (constructor)
  A new instance of Stream.
- #wait_for_close ⇒ Object
  Block the caller until #close runs (either side).
Constructor Details
#initialize(ws) ⇒ Stream
Returns a new instance of Stream.
# File 'lib/zzq/websocket/stream.rb', line 27
def initialize(ws)
  super()
  @ws = ws
  @closed = false
  @closed_notification = Async::Notification.new
end
Class Method Details
.wrap(ws) ⇒ Object
# File 'lib/zzq/websocket/stream.rb', line 22
def self.wrap(ws)
  new(ws)
end
Instance Method Details
#closed? ⇒ Boolean
# File 'lib/zzq/websocket/stream.rb', line 35
def closed?
  @closed
end
#wait_for_close ⇒ Object
Block the caller until #close runs (either side).
# File 'lib/zzq/websocket/stream.rb', line 41
def wait_for_close
  @closed_notification.wait unless @closed
end
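The pattern behind #wait_for_close, a closed flag plus a wake-up signal, can be tried in isolation. The sketch below is an assumption-laden stand-in: CloseLatch is a hypothetical class, it uses the standard library's Mutex and ConditionVariable with threads rather than Async::Notification with fibers, and its #close is a guess at the shape of a method this page does not show.

```ruby
# Hypothetical illustration of "block until closed". Swaps Async::Notification
# for Mutex + ConditionVariable so it runs with the standard library only.
class CloseLatch
  def initialize
    @closed = false
    @mutex = Mutex.new
    @signal = ConditionVariable.new
  end

  def closed?
    @mutex.synchronize { @closed }
  end

  # Idempotent: only the first call flips the flag and wakes the waiters.
  def close
    @mutex.synchronize do
      return if @closed
      @closed = true
      @signal.broadcast
    end
  end

  # Returns immediately if already closed, mirroring the `unless @closed`
  # guard in the method above; otherwise blocks until #close runs.
  def wait_for_close
    @mutex.synchronize do
      @signal.wait(@mutex) until @closed
    end
  end
end

latch = CloseLatch.new
waiter = Thread.new do
  latch.wait_for_close
  :released
end
latch.close
result = waiter.value # :released once #close has run
```

Checking the flag before waiting matters in both versions: a caller that arrives after the connection is already closed would otherwise wait for a signal that has already been sent.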