Class: Async::Cable::Socket
- Inherits:
-
Object
- Object
- Async::Cable::Socket
- Defined in:
- lib/async/cable/socket.rb
Overview
Wraps a WebSocket connection to provide the interface expected by ActionCable connections. Buffers outbound messages in a queue and drains them asynchronously so that transmission never blocks the event loop.
Instance Attribute Summary collapse
-
#env ⇒ Object
readonly
Returns the value of attribute env.
Instance Method Summary collapse
-
#close ⇒ Object
Close the outbound queue, causing the drain task to terminate once all pending messages have been sent.
-
#initialize(env, websocket, server, coder: ActiveSupport::JSON) ⇒ Socket
constructor
Create a new socket wrapper.
-
#logger ⇒ Object
The ActionCable server logger.
-
#perform_work(receiver) ⇒ Object
This can be called from the work pool, off the event loop.
-
#request ⇒ Object
Build an ‘ActionDispatch::Request` from the Rack environment, merging Rails application config when available.
-
#run(parent: Async::Task.current) ⇒ Object
Start an async task that drains the outbound message queue and writes each message to the WebSocket.
-
#transmit(data) ⇒ Object
Encode and enqueue a message for asynchronous delivery to the client.
Constructor Details
#initialize(env, websocket, server, coder: ActiveSupport::JSON) ⇒ Socket
Create a new socket wrapper.
15 16 17 18 19 20 21 22 |
# File 'lib/async/cable/socket.rb', line 15 def initialize(env, websocket, server, coder: ActiveSupport::JSON) @env = env @websocket = websocket @server = server @coder = coder @output = ::Thread::Queue.new end |
Instance Attribute Details
#env ⇒ Object (readonly)
Returns the value of attribute env.
24 25 26 |
# File 'lib/async/cable/socket.rb', line 24 def env @env end |
Instance Method Details
#close ⇒ Object
Close the outbound queue, causing the drain task to terminate once all pending messages have been sent.
72 73 74 75 |
# File 'lib/async/cable/socket.rb', line 72 def close # Console.info(self, "Closing socket.", task: Async::Task.current?) @output.close end |
#logger ⇒ Object
The ActionCable server logger.
28 29 30 |
# File 'lib/async/cable/socket.rb', line 28 def logger @server.logger end |
#perform_work(receiver) ⇒ Object
This can be called from the work pool, off the event loop.
78 79 80 81 |
# File 'lib/async/cable/socket.rb', line 78 def perform_work(receiver, ...) # Console.info(self, "Performing work:", receiver) receiver.send(...) end |
#request ⇒ Object
Build an ‘ActionDispatch::Request` from the Rack environment, merging Rails application config when available.
34 35 36 37 38 39 40 41 42 43 |
# File 'lib/async/cable/socket.rb', line 34 def request # Copied from `ActionCable::Server::Socket#request`: @request ||= begin if defined?(Rails.application) && Rails.application environment = Rails.application.env_config.merge(@env) end ActionDispatch::Request.new(environment || @env) end end |
#run(parent: Async::Task.current) ⇒ Object
Start an async task that drains the outbound message queue and writes each message to the WebSocket. The task stops when the queue is closed.
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 |
# File 'lib/async/cable/socket.rb', line 48 def run(parent: Async::Task.current) parent.async do while buffer = @output.pop # Console.debug(self, "Sending cable data:", buffer, flush: @output.empty?) @websocket.send_text(buffer) @websocket.flush if @output.empty? end rescue => error Console.error(self, "Error while sending cable data:", error) ensure unless @websocket.closed? @websocket.close_write(error) end end end |
#transmit(data) ⇒ Object
Encode and enqueue a message for asynchronous delivery to the client.
66 67 68 69 |
# File 'lib/async/cable/socket.rb', line 66 def transmit(data) # Console.info(self, "Transmitting data:", data, task: Async::Task.current?) @output.push(@coder.encode(data)) end |