Class: ActionCable::Connection::Base

Inherits:
Object
Includes:
Authorization, Callbacks, Identification, InternalChannel, ActiveSupport::Rescuable
Defined in:
lib/action_cable/connection/base.rb

Overview

Action Cable Connection Base

For every WebSocket connection the Action Cable server accepts, a Connection object will be instantiated. This instance becomes the parent of all of the channel subscriptions that are created from there on. Incoming messages are then routed to these channel subscriptions based on an identifier sent by the Action Cable consumer. The Connection itself does not deal with any specific application logic beyond authentication and authorization.

Here’s a basic example:

module ApplicationCable
  class Connection < ActionCable::Connection::Base
    identified_by :current_user

    def connect
      self.current_user = find_verified_user
      logger.add_tags current_user.name
    end

    def disconnect
      # Any cleanup work needed when the cable connection is cut.
    end

    private
      def find_verified_user
        User.find_by_identity(cookies.encrypted[:identity_id]) ||
          reject_unauthorized_connection
      end
  end
end

First, we declare that this connection can be identified by its current_user. This lets us later find all connections established for that current_user (and potentially disconnect them). You can declare as many identification indexes as you like. Declaring an identification automatically defines an attr_accessor for that key.
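
The idea can be sketched in plain Ruby. This is a minimal illustration, not Action Cable's actual implementation: `ToyConnection` and its `identified_by` macro are hypothetical stand-ins that show how declaring an identifier can define an accessor and make connections searchable by that value.

```ruby
# Hypothetical stand-in for a connection class; not Action Cable's real code.
class ToyConnection
  class << self
    # Identification keys declared so far via identified_by.
    def identifiers
      @identifiers ||= []
    end

    # Declare identification keys and define an attr_accessor for each one.
    def identified_by(*keys)
      keys.each { |key| attr_accessor key }
      identifiers.concat(keys)
    end
  end

  identified_by :current_user
end

alice_tab   = ToyConnection.new
alice_phone = ToyConnection.new
bob_tab     = ToyConnection.new

alice_tab.current_user   = "alice"
alice_phone.current_user = "alice"
bob_tab.current_user     = "bob"

connections = [alice_tab, alice_phone, bob_tab]

# Find every connection established for a given user.
alices = connections.select { |c| c.current_user == "alice" }
puts alices.size
```

In a real application the lookup would normally go through the server rather than a local array, for example `ActionCable.server.remote_connections.where(current_user: user).disconnect`.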

Second, we rely on the fact that the WebSocket connection is established with the cookies from the domain being sent along. This makes it easy to use signed or encrypted cookies (the example reads an encrypted one) that were set when logging in via a web interface to authorize the WebSocket connection.

Finally, we add a tag to the connection-specific logger with the name of the current user to easily distinguish their messages in the log.
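
As a rough sketch of what tagging buys you, the following wraps Ruby's standard Logger so that every message carries the collected tags. `ToyTaggedLogger` is a hypothetical class written for this illustration; only the `add_tags` call mirrors the example above, and the connection's real logger may behave differently in detail.

```ruby
require "logger"
require "stringio"

# Hypothetical wrapper that prefixes every message with the collected tags.
class ToyTaggedLogger
  def initialize(logger, tags: [])
    @logger = logger
    @tags = tags.dup
  end

  # Append tags, ignoring nils and duplicates.
  def add_tags(*tags)
    @tags = (@tags + tags.flatten).compact.uniq
  end

  def info(message)
    prefix = @tags.map { |tag| "[#{tag}]" }.join(" ")
    @logger.info([prefix, message].reject(&:empty?).join(" "))
  end
end

output = StringIO.new
base = Logger.new(output)
# Print only the message so the tags are easy to see.
base.formatter = ->(_severity, _time, _progname, msg) { "#{msg}\n" }

logger = ToyTaggedLogger.new(base, tags: ["ActionCable"])
logger.add_tags "alice"
logger.info "Subscribed to ChatChannel"

puts output.string
```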

Pretty simple, eh?

Direct Known Subclasses

ApplicationCable::Connection

Methods included from Authorization

#reject_unauthorized_connection

Methods included from Identification

#connection_identifier

Constructor Details

#initialize(server, env, coder: ActiveSupport::JSON) ⇒ Base

Returns a new instance of Base.



# File 'lib/action_cable/connection/base.rb', line 58

def initialize(server, env, coder: ActiveSupport::JSON)
  @server, @env, @coder = server, env, coder

  @worker_pool = server.worker_pool
  @logger = new_tagged_logger

  @websocket      = ActionCable::Connection::WebSocket.new(env, self, event_loop)
  @subscriptions  = ActionCable::Connection::Subscriptions.new(self)
  @message_buffer = ActionCable::Connection::MessageBuffer.new(self)

  @_internal_subscriptions = nil
  @started_at = Time.now
end

Instance Attribute Details

#env ⇒ Object (readonly)

Returns the value of attribute env.



# File 'lib/action_cable/connection/base.rb', line 55

def env
  @env
end

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



# File 'lib/action_cable/connection/base.rb', line 55

def logger
  @logger
end

#protocol ⇒ Object (readonly)

Returns the value of attribute protocol.



# File 'lib/action_cable/connection/base.rb', line 55

def protocol
  @protocol
end

#server ⇒ Object (readonly)

Returns the value of attribute server.



# File 'lib/action_cable/connection/base.rb', line 55

def server
  @server
end

#subscriptions ⇒ Object (readonly)

Returns the value of attribute subscriptions.



# File 'lib/action_cable/connection/base.rb', line 55

def subscriptions
  @subscriptions
end

#worker_pool ⇒ Object (readonly)

Returns the value of attribute worker_pool.



# File 'lib/action_cable/connection/base.rb', line 55

def worker_pool
  @worker_pool
end

Instance Method Details

#beat ⇒ Object



# File 'lib/action_cable/connection/base.rb', line 134

def beat
  transmit type: ActionCable::INTERNAL[:message_types][:ping], message: Time.now.to_i
end
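
To make the heartbeat concrete, here is a small sketch of the frame that such a call would put on the wire when the coder is JSON. It assumes that `ActionCable::INTERNAL[:message_types][:ping]` is the string "ping"; `PING_TYPE` and `ping_frame` are names invented for this illustration.

```ruby
require "json"

# Assumption: ActionCable::INTERNAL[:message_types][:ping] is the string "ping".
PING_TYPE = "ping"

# Build the hash that #beat hands to #transmit, then encode it as JSON
# (the default coder in the constructor shown earlier).
def ping_frame(now = Time.now)
  JSON.generate({ type: PING_TYPE, message: now.to_i })
end

frame = ping_frame(Time.at(1_700_000_000))
puts frame
```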

#close(reason: nil, reconnect: true) ⇒ Object

Close the WebSocket connection.



# File 'lib/action_cable/connection/base.rb', line 109

def close(reason: nil, reconnect: true)
  transmit(
    type: ActionCable::INTERNAL[:message_types][:disconnect],
    reason: reason,
    reconnect: reconnect
  )
  websocket.close
end
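
The message sent just before the socket closes can be sketched the same way. The example assumes that `ActionCable::INTERNAL[:message_types][:disconnect]` is the string "disconnect"; `disconnect_frame` is a hypothetical helper, and the reason strings are illustrative values rather than a definitive list.

```ruby
require "json"

# Assumption: ActionCable::INTERNAL[:message_types][:disconnect] is "disconnect".
# Mirrors the keyword defaults of #close: no reason, reconnect allowed.
def disconnect_frame(reason: nil, reconnect: true)
  JSON.generate({ type: "disconnect", reason: reason, reconnect: reconnect })
end

# Illustrative reasons only; the client may retry after the first,
# but is told not to after the second.
restart  = disconnect_frame(reason: "server_restart")
rejected = disconnect_frame(reason: "unauthorized", reconnect: false)

puts restart
puts rejected
```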

#dispatch_websocket_message(websocket_message) ⇒ Object

:nodoc:



# File 'lib/action_cable/connection/base.rb', line 90

def dispatch_websocket_message(websocket_message) # :nodoc:
  if websocket.alive?
    handle_channel_command decode(websocket_message)
  else
    logger.error "Ignoring message processed after the WebSocket was closed: #{websocket_message.inspect})"
  end
end

#handle_channel_command(payload) ⇒ Object



# File 'lib/action_cable/connection/base.rb', line 98

def handle_channel_command(payload)
  run_callbacks :command do
    subscriptions.execute_command payload
  end
end
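
The routing step can be illustrated with a toy registry. This is a sketch under assumptions, not the real Subscriptions class: `ToySubscriptions` is hypothetical, and it assumes decoded payloads carry a "command" of "subscribe", "unsubscribe", or "message" together with an "identifier" string, as Action Cable consumers send them.

```ruby
require "json"

# Hypothetical registry of subscriptions keyed by identifier.
class ToySubscriptions
  attr_reader :identifiers, :received

  def initialize
    @identifiers = []
    @received = []
  end

  # Route a decoded payload by its "command" key.
  def execute_command(payload)
    case payload["command"]
    when "subscribe"   then @identifiers << payload["identifier"]
    when "unsubscribe" then @identifiers.delete(payload["identifier"])
    when "message"     then @received << [payload["identifier"], JSON.parse(payload["data"])]
    else raise ArgumentError, "Unrecognized command: #{payload['command'].inspect}"
    end
  end
end

# The identifier is itself a JSON string naming the channel.
identifier = JSON.generate({ channel: "ChatChannel" })
subscriptions = ToySubscriptions.new

subscribe = JSON.generate({ command: "subscribe", identifier: identifier })
message   = JSON.generate({ command: "message", identifier: identifier,
                            data: JSON.generate({ body: "hi" }) })

# Decode each incoming frame, then hand the payload to the registry.
[subscribe, message].each { |raw| subscriptions.execute_command(JSON.parse(raw)) }

p subscriptions.identifiers
p subscriptions.received
```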

#inspect ⇒ Object

:nodoc:



# File 'lib/action_cable/connection/base.rb', line 155

def inspect # :nodoc:
  "#<#{self.class.name}:#{'%#016x' % (object_id << 1)}>"
end

#on_close(reason, code) ⇒ Object

:nodoc:



# File 'lib/action_cable/connection/base.rb', line 151

def on_close(reason, code) # :nodoc:
  send_async :handle_close
end

#on_error(message) ⇒ Object

:nodoc:



# File 'lib/action_cable/connection/base.rb', line 146

def on_error(message) # :nodoc:
  # log errors to make diagnosing socket errors easier
  logger.error "WebSocket error occurred: #{message}"
end

#on_message(message) ⇒ Object

:nodoc:



# File 'lib/action_cable/connection/base.rb', line 142

def on_message(message) # :nodoc:
  message_buffer.append message
end

#on_open ⇒ Object

:nodoc:



# File 'lib/action_cable/connection/base.rb', line 138

def on_open # :nodoc:
  send_async :handle_open
end

#process ⇒ Object

Called by the server when a new WebSocket connection is established. This configures the callbacks intended for overriding by the user. This method should not be called directly; instead, rely on the #connect (and #disconnect) callbacks.



# File 'lib/action_cable/connection/base.rb', line 74

def process # :nodoc:
  logger.info started_request_message

  if websocket.possible? && allow_request_origin?
    respond_to_successful_request
  else
    respond_to_invalid_request
  end
end

#receive(websocket_message) ⇒ Object

Queues an incoming WebSocket message so that it is decoded and dispatched to subscribed channels asynchronously. Messages are decoded with the connection's coder, which defaults to ActiveSupport::JSON.



# File 'lib/action_cable/connection/base.rb', line 86

def receive(websocket_message) # :nodoc:
  send_async :dispatch_websocket_message, websocket_message
end

#send_async(method, *arguments) ⇒ Object

Invoke a method on the connection asynchronously through the pool of thread workers.



# File 'lib/action_cable/connection/base.rb', line 119

def send_async(method, *arguments)
  worker_pool.async_invoke(self, method, *arguments)
end
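
The pattern can be sketched with a single worker thread draining a queue. `ToyWorkerPool` and `ToyAsyncConnection` are hypothetical classes written for this illustration; the real worker pool is more elaborate, and only the `async_invoke(receiver, method, *arguments)` call shape is taken from the listing above.

```ruby
# Hypothetical stand-in for the server's worker pool: one thread draining a queue.
class ToyWorkerPool
  def initialize
    @queue = Queue.new
    @thread = Thread.new do
      # Queue#pop returns nil once the queue is closed and empty, ending the loop.
      while (job = @queue.pop)
        receiver, method, arguments = job
        receiver.public_send(method, *arguments)
      end
    end
  end

  # Enqueue the call and return immediately.
  def async_invoke(receiver, method, *arguments)
    @queue << [receiver, method, arguments]
  end

  # Stop accepting work and wait for queued jobs to finish.
  def shutdown
    @queue.close
    @thread.join
  end
end

# Hypothetical connection that delegates work to the pool.
class ToyAsyncConnection
  attr_reader :log

  def initialize(worker_pool)
    @worker_pool = worker_pool
    @log = []
  end

  def send_async(method, *arguments)
    @worker_pool.async_invoke(self, method, *arguments)
  end

  def handle_message(text)
    @log << text
  end
end

pool = ToyWorkerPool.new
connection = ToyAsyncConnection.new(pool)
connection.send_async(:handle_message, "hello")
connection.send_async(:handle_message, "world")
pool.shutdown

p connection.log
```

With a single worker the calls run in the order they were queued; a pool with several threads would not give that guarantee.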

#statistics ⇒ Object

Return a basic hash of statistics for the connection keyed with identifier, started_at, subscriptions, and request_id. This can be returned by a health check against the connection.



# File 'lib/action_cable/connection/base.rb', line 125

def statistics
  {
    identifier: connection_identifier,
    started_at: @started_at,
    subscriptions: subscriptions.identifiers,
    request_id: @env["action_dispatch.request_id"]
  }
end

#transmit(cable_message) ⇒ Object

:nodoc:



# File 'lib/action_cable/connection/base.rb', line 104

def transmit(cable_message) # :nodoc:
  websocket.transmit encode(cable_message)
end