Class: Async::Cable::Socket

Inherits:
Object
  • Object
show all
Defined in:
lib/async/cable/socket.rb

Overview

Wraps a WebSocket connection to provide the interface expected by ActionCable connections. Buffers outbound messages in a queue and drains them asynchronously so that transmission never blocks the event loop.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(env, websocket, server, coder: ActiveSupport::JSON) ⇒ Socket

Create a new socket wrapper.



15
16
17
18
19
20
21
22
# File 'lib/async/cable/socket.rb', line 15

def initialize(env, websocket, server, coder: ActiveSupport::JSON)
	@env = env
	@websocket = websocket
	@server = server
	@coder = coder
	
	@output = ::Thread::Queue.new
end

Instance Attribute Details

#envObject (readonly)

Returns the value of attribute env.



24
25
26
# File 'lib/async/cable/socket.rb', line 24

def env
  @env
end

Instance Method Details

#closeObject

Close the outbound queue, causing the drain task to terminate once all pending messages have been sent.



72
73
74
75
# File 'lib/async/cable/socket.rb', line 72

def close
	# Console.info(self, "Closing socket.", task: Async::Task.current?)
	@output.close
end

#loggerObject

The ActionCable server logger.



28
29
30
# File 'lib/async/cable/socket.rb', line 28

def logger
	@server.logger
end

#perform_work(receiver) ⇒ Object

This can be called from the work pool, off the event loop.



78
79
80
81
# File 'lib/async/cable/socket.rb', line 78

def perform_work(receiver, ...)
	# Console.info(self, "Performing work:", receiver)
	receiver.send(...)
end

#requestObject

Build an ‘ActionDispatch::Request` from the Rack environment, merging Rails application config when available.



34
35
36
37
38
39
40
41
42
43
# File 'lib/async/cable/socket.rb', line 34

def request
	# Copied from `ActionCable::Server::Socket#request`:
	@request ||= begin
		if defined?(Rails.application) && Rails.application
			environment = Rails.application.env_config.merge(@env)
		end
		
		ActionDispatch::Request.new(environment || @env)
	end
end

#run(parent: Async::Task.current) ⇒ Object

Start an async task that drains the outbound message queue and writes each message to the WebSocket. The task stops when the queue is closed.



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/async/cable/socket.rb', line 48

def run(parent: Async::Task.current)
	parent.async do
		while buffer = @output.pop
			# Console.debug(self, "Sending cable data:", buffer, flush: @output.empty?)
			@websocket.send_text(buffer)
			@websocket.flush if @output.empty?
		end
	rescue => error
		Console.error(self, "Error while sending cable data:", error)
	ensure
		unless @websocket.closed?
			@websocket.close_write(error)
		end
	end
end

#transmit(data) ⇒ Object

Encode and enqueue a message for asynchronous delivery to the client.



66
67
68
69
# File 'lib/async/cable/socket.rb', line 66

def transmit(data)
	# Console.info(self, "Transmitting data:", data, task: Async::Task.current?)
	@output.push(@coder.encode(data))
end