Module: Ruflet::ConnectionProtocol
Included in: Server
Defined in: lib/ruflet/server/connection_protocol.rb
Overview
Transport-agnostic implementation of the Ruflet wire protocol: one connection loop shared by every server that speaks to Flutter clients.
The standalone TCP server (Ruflet::Server) and host-server adapters such as ruflet_rails’ Rack-hijack endpoint include this module and provide only their transport plus the integration hooks below — the protocol itself is never reimplemented.
Includers must initialize:
@app_block — proc invoked with the Page on first registration
@sessions — Hash mapping connection key => Page
@sessions_mutex — Mutex guarding @sessions
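As a rough sketch of what an includer has to set up, the class below initializes the three instance variables listed above. `DemoHost` and `session_count` are hypothetical names, and the `include` line is left commented out so the snippet runs without the ruflet gem; only the variable names come from this page.

```ruby
# Hypothetical host class; only the three instance variable names are taken
# from the documentation above.
class DemoHost
  # include Ruflet::ConnectionProtocol  # enable once ruflet is loaded

  def initialize(&app_block)
    @app_block = app_block        # invoked with the Page on first registration
    @sessions = {}                # connection key => Page
    @sessions_mutex = Mutex.new   # guards every access to @sessions
  end

  # Illustrative helper (not part of the module): read @sessions under the lock.
  def session_count
    @sessions_mutex.synchronize { @sessions.size }
  end
end

host = DemoHost.new { |page| page.title = "Hello" }
puts host.session_count # prints 0, since no client has registered yet
```

The transport (accepting sockets, performing or receiving the HTTP upgrade) is left out on purpose, since that is the part each host supplies itself.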
Instance Method Summary
- #attach_sender(page, ws) ⇒ Object
- #before_dispatch_event(ws, event) ⇒ Object
  Called before a control event is dispatched to the Page.
- #close_connection(ws) ⇒ Object
- #connection_closed(ws) ⇒ Object
  Called when a connection leaves the protocol loop.
- #connection_opened(ws) ⇒ Object
  Called when a connection enters the protocol loop.
- #decode_incoming(raw) ⇒ Object
- #disconnect_error?(error) ⇒ Boolean
- #fetch_page(ws) ⇒ Object
- #handle_message(ws, raw) ⇒ Object
  Protocol core.
- #handle_upgraded_socket(io) ⇒ Object
  For hosts that already performed the HTTP upgrade (Rack hijack, the embedded runtime, tests with socket pairs).
- #log_connection_error(error) ⇒ Object
- #normalize_event_data(value) ⇒ Object
- #normalize_incoming(value) ⇒ Object
- #on_control_event(ws, payload) ⇒ Object
- #on_invoke_control_method(ws, payload) ⇒ Object
- #on_register_client(ws, payload) ⇒ Object
- #on_update_control(ws, payload) ⇒ Object
- #pseudo_uuid ⇒ Object
- #remove_session(ws) ⇒ Object
- #reset_mount_state(page) ⇒ Object
- #resume_session(_session_id) ⇒ Object
  Returns an existing Page to resume for this session id, or nil to create a fresh one (hosts with a session registry override this).
- #run_connection(ws) ⇒ Object
- #send_message(ws, action, payload) ⇒ Object
- #sender_for(ws) ⇒ Object
- #session_removed(page, ws) ⇒ Object
  Called after a Page is removed for a connection.
- #session_stored(page, ws) ⇒ Object
  Called after a Page is stored for a connection.
Instance Method Details
#attach_sender(page, ws) ⇒ Object
    # File 'lib/ruflet/server/connection_protocol.rb', line 250
    def attach_sender(page, ws)
      page.instance_variable_set(:@sender, sender_for(ws))
    end
#before_dispatch_event(ws, event) ⇒ Object
Called before a control event is dispatched to the Page.
    # File 'lib/ruflet/server/connection_protocol.rb', line 40
    def before_dispatch_event(ws, event); end
#close_connection(ws) ⇒ Object
    # File 'lib/ruflet/server/connection_protocol.rb', line 72
    def close_connection(ws)
      return unless ws

      remove_session(ws)
      connection_closed(ws)
      ws.close
    end
#connection_closed(ws) ⇒ Object
Called when a connection leaves the protocol loop.
    # File 'lib/ruflet/server/connection_protocol.rb', line 25
    def connection_closed(ws); end
#connection_opened(ws) ⇒ Object
Called when a connection enters the protocol loop.
    # File 'lib/ruflet/server/connection_protocol.rb', line 22
    def connection_opened(ws); end
#decode_incoming(raw) ⇒ Object
    # File 'lib/ruflet/server/connection_protocol.rb', line 104
    def decode_incoming(raw)
      parsed = normalize_incoming(Ruflet::WireCodec.unpack(raw.to_s.b))

      if parsed.is_a?(Array) && parsed.length >= 2
        return [parsed[0], parsed[1]]
      end

      if parsed.is_a?(Hash)
        action = parsed["action"] || parsed[:action]
        payload = parsed["payload"] || parsed[:payload]
        return [action, payload] unless action.nil?

        if (parsed.key?("target") || parsed.key?(:target)) && (parsed.key?("name") || parsed.key?(:name))
          return [Protocol::ACTIONS[:control_event], parsed]
        end
      end

      raise "Unsupported payload format"
    end
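The three message shapes this method accepts can be tried out in isolation. The sketch below mirrors the branching on data that has already been unpacked; `decode_shape` is a hypothetical name, `CONTROL_EVENT` is a placeholder because the real value of `Protocol::ACTIONS[:control_event]` is not shown on this page, and only string keys are checked since `normalize_incoming` has already stringified them.

```ruby
# Placeholder for Protocol::ACTIONS[:control_event]; the real value is an
# assumption and does not appear in this document.
CONTROL_EVENT = "control_event_placeholder"

# Mirrors the branching of decode_incoming on already-unpacked data.
def decode_shape(parsed)
  # Shape 1: [action, payload] pair.
  return [parsed[0], parsed[1]] if parsed.is_a?(Array) && parsed.length >= 2

  if parsed.is_a?(Hash)
    # Shape 2: { "action" => ..., "payload" => ... } envelope.
    action = parsed["action"]
    return [action, parsed["payload"]] unless action.nil?

    # Shape 3: a bare control event, recognised by its "target" and "name" keys.
    return [CONTROL_EVENT, parsed] if parsed.key?("target") && parsed.key?("name")
  end

  raise "Unsupported payload format"
end

p decode_shape(["register", { "session_id" => "abc" }])
p decode_shape({ "action" => "update", "payload" => { "id" => 7 } })
p decode_shape({ "target" => 3, "name" => "click" })
```

Anything else, such as a bare string or a one-element array, falls through to the `raise`.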
#disconnect_error?(error) ⇒ Boolean
    # File 'lib/ruflet/server/connection_protocol.rb', line 260
    def disconnect_error?(error)
      return true if error.is_a?(IOError)
      return true if error.is_a?(Errno::EPIPE)
      return true if error.is_a?(Errno::ECONNRESET)
      return true if error.is_a?(Errno::ECONNABORTED)
      return true if error.is_a?(Errno::ENOTCONN)
      return true if error.is_a?(Errno::ESHUTDOWN)
      return true if error.is_a?(Errno::EBADF)
      return true if error.is_a?(Errno::EINVAL)

      false
    end
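To see which errors count as an ordinary disconnect, the same check can be written as a lookup over a list of classes. This is an equivalent restatement for experimentation, not the module's own code.

```ruby
# Same classification as above, expressed as a list lookup.
def disconnect_error?(error)
  [
    IOError, Errno::EPIPE, Errno::ECONNRESET, Errno::ECONNABORTED,
    Errno::ENOTCONN, Errno::ESHUTDOWN, Errno::EBADF, Errno::EINVAL
  ].any? { |klass| error.is_a?(klass) }
end

p disconnect_error?(EOFError.new)            # true: EOFError is a subclass of IOError
p disconnect_error?(Errno::ECONNRESET.new)   # true
p disconnect_error?(ArgumentError.new("x"))  # false: treated as a real failure
```

Because the check uses `is_a?`, subclasses of the listed errors are treated as disconnects as well.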
#fetch_page(ws) ⇒ Object
    # File 'lib/ruflet/server/connection_protocol.rb', line 207
    def fetch_page(ws)
      page = @sessions_mutex.synchronize { @sessions[ws.session_key] }
      raise "Session not found" unless page

      page
    end
#handle_message(ws, raw) ⇒ Object
Protocol core.
    # File 'lib/ruflet/server/connection_protocol.rb', line 84
    def handle_message(ws, raw)
      action, payload = decode_incoming(raw)
      payload ||= {}
      warn "incoming action=#{action.inspect}" if ENV["RUFLET_DEBUG"] == "1"

      case action
      when Protocol::ACTIONS[:register_client], Protocol::ACTIONS[:register_web_client]
        on_register_client(ws, payload)
      when Protocol::ACTIONS[:control_event], Protocol::ACTIONS[:page_event_from_web]
        on_control_event(ws, payload)
      when Protocol::ACTIONS[:update_control], Protocol::ACTIONS[:update_control_props]
        on_update_control(ws, payload)
      when Protocol::ACTIONS[:invoke_control_method]
        on_invoke_control_method(ws, payload)
      else
        raise "Unknown action: #{action.inspect}"
      end
    end
#handle_upgraded_socket(io) ⇒ Object
For hosts that already performed the HTTP upgrade (Rack hijack, the embedded runtime, tests with socket pairs).
    # File 'lib/ruflet/server/connection_protocol.rb', line 53
    def handle_upgraded_socket(io)
      run_connection(Ruflet::WebSocketConnection.new(io))
    end
#log_connection_error(error) ⇒ Object
    # File 'lib/ruflet/server/connection_protocol.rb', line 42
    def log_connection_error(error)
      warn "server error: #{error.class}: #{error.message}"
      warn error.backtrace.join("\n") if error.backtrace
    end
#normalize_event_data(value) ⇒ Object
    # File 'lib/ruflet/server/connection_protocol.rb', line 222
    def normalize_event_data(value)
      case value
      when Hash
        value.each_with_object({}) { |(k, v), out| out[k.to_sym] = normalize_event_data(v) }
      when Array
        value.map { |entry| normalize_event_data(entry) }
      else
        value
      end
    end
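A short usage sketch shows the effect: hash keys are turned into symbols at every nesting level, while leaf values pass through unchanged. The method body is copied from the listing above so the snippet runs on its own; the sample event data is made up.

```ruby
# Copied from the listing above so the example is self-contained.
def normalize_event_data(value)
  case value
  when Hash
    value.each_with_object({}) { |(k, v), out| out[k.to_sym] = normalize_event_data(v) }
  when Array
    value.map { |entry| normalize_event_data(entry) }
  else
    value
  end
end

# Made-up event data with string keys, as it would arrive off the wire.
data = { "x" => 1, "points" => [{ "dx" => 0.5 }, { "dx" => 1.5 }] }
result = normalize_event_data(data)

p result.keys                 # top-level keys are now symbols
p result[:points].first[:dx]  # nested hashes inside arrays are converted too
```

Event handlers can therefore read `data[:x]` rather than `data["x"]`.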
#normalize_incoming(value) ⇒ Object
    # File 'lib/ruflet/server/connection_protocol.rb', line 124
    def normalize_incoming(value)
      case value
      when String
        value.dup.force_encoding("UTF-8")
      when Integer, Float, TrueClass, FalseClass, NilClass
        value
      when Symbol
        value.to_s
      when Array
        value.map { |v| normalize_incoming(v) }
      when Hash
        value.each_with_object({}) do |(k, v), out|
          out[k.to_s] = normalize_incoming(v)
        end
      else
        value.to_s
      end
    end
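The following sketch illustrates what this normalization does to decoded wire data: keys become strings, symbols become strings, and binary strings are relabelled as UTF-8. The method is restated compactly (the `Symbol` branch is folded into `else`, which behaves the same because both call `to_s`), and the sample input is invented for illustration.

```ruby
# Compact restatement of normalize_incoming; behaviour matches the listing.
def normalize_incoming(value)
  case value
  when String then value.dup.force_encoding("UTF-8")
  when Integer, Float, TrueClass, FalseClass, NilClass then value
  when Array then value.map { |v| normalize_incoming(v) }
  when Hash
    value.each_with_object({}) { |(k, v), out| out[k.to_s] = normalize_incoming(v) }
  else value.to_s # Symbol and any other object
  end
end

# Invented input: symbol and integer keys, a symbol value, a binary string.
raw = { action: :click, "label" => "caf\xC3\xA9".b, 1 => [nil, 2.5] }
out = normalize_incoming(raw)

p out.keys                      # every key is a String
p out["label"].encoding         # UTF-8 instead of ASCII-8BIT
p out["label"].valid_encoding?  # the bytes were valid UTF-8 all along
```

Note that `force_encoding` only changes the label on the bytes; it does not transcode or validate them, so invalid byte sequences stay invalid.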
#on_control_event(ws, payload) ⇒ Object
    # File 'lib/ruflet/server/connection_protocol.rb', line 178
    def on_control_event(ws, payload)
      event = Protocol.normalize_control_event_payload(payload)
      page = fetch_page(ws)
      return if event["target"].nil? || event["name"].to_s.empty?

      attach_sender(page, ws)
      before_dispatch_event(ws, event)
      page.dispatch_event(
        target: event["target"],
        name: event["name"],
        data: normalize_event_data(event["data"])
      )
    end
#on_invoke_control_method(ws, payload) ⇒ Object
    # File 'lib/ruflet/server/connection_protocol.rb', line 201
    def on_invoke_control_method(ws, payload)
      page = fetch_page(ws)
      attach_sender(page, ws)
      page.handle_invoke_method_result(Protocol.normalize_invoke_method_result_payload(payload))
    end
#on_register_client(ws, payload) ⇒ Object
    # File 'lib/ruflet/server/connection_protocol.rb', line 143
    def on_register_client(ws, payload)
      normalized = Protocol.normalize_register_payload(payload)
      session_id = normalized["session_id"].to_s.empty? ? pseudo_uuid : normalized["session_id"]

      page = resume_session(session_id)
      first_registration = page.nil?

      if page
        attach_sender(page, ws)
        reset_mount_state(page)
      else
        page = Page.new(
          session_id: session_id,
          client_details: normalized,
          sender: sender_for(ws)
        )
        page.title = "Ruflet App"
      end

      @sessions_mutex.synchronize { @sessions[ws.session_key] = page }
      session_stored(page, ws)

      initial_response = [
        Protocol::ACTIONS[:register_client],
        Protocol.register_response(session_id: session_id)
      ]
      ws.send_binary(Ruflet::WireCodec.pack(initial_response))

      @app_block.call(page) if first_registration
      page.update
    rescue StandardError => e
      send_message(ws, Protocol::ACTIONS[:session_crashed], { "message" => e.message.to_s })
      raise
    end
#on_update_control(ws, payload) ⇒ Object
    # File 'lib/ruflet/server/connection_protocol.rb', line 192
    def on_update_control(ws, payload)
      update = Protocol.normalize_update_control_payload(payload)
      page = fetch_page(ws)
      return if update["id"].nil?

      attach_sender(page, ws)
      page.apply_client_update(update["id"], update["props"] || {})
    end
#pseudo_uuid ⇒ Object
    # File 'lib/ruflet/server/connection_protocol.rb', line 273
    def pseudo_uuid
      now = Process.clock_gettime(Process::CLOCK_REALTIME, :nanosecond)
      rnd = rand(0..0xffff_ffff)
      "%08x-%04x-%04x-%04x-%012x" % [
        rnd,
        now & 0xffff,
        (now >> 16) & 0xffff,
        (now >> 32) & 0xffff,
        (now >> 48) & 0xffff_ffff_ffff
      ]
    end
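The generated id follows the familiar 8-4-4-4-12 hexadecimal layout but is assembled from the wall clock and `rand`, without setting UUID version or variant bits. The sketch below copies the method, checks the layout, and prints a standard-library `SecureRandom.uuid` for comparison; whether a host should prefer the latter for its own session ids is a design decision this page does not address.

```ruby
require "securerandom"

# Copied from the listing above so the example is self-contained.
def pseudo_uuid
  now = Process.clock_gettime(Process::CLOCK_REALTIME, :nanosecond)
  rnd = rand(0..0xffff_ffff)
  "%08x-%04x-%04x-%04x-%012x" % [
    rnd,
    now & 0xffff,
    (now >> 16) & 0xffff,
    (now >> 32) & 0xffff,
    (now >> 48) & 0xffff_ffff_ffff
  ]
end

UUID_LAYOUT = /\A\h{8}-\h{4}-\h{4}-\h{4}-\h{12}\z/

id = pseudo_uuid
puts id
puts id.match?(UUID_LAYOUT)   # true: same grouping as a UUID string
puts SecureRandom.uuid        # random (version 4) UUID from the standard library
```

Only the first group is random; the remaining groups are slices of the nanosecond timestamp, so ids created close together share most of their trailing digits.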
#remove_session(ws) ⇒ Object
    # File 'lib/ruflet/server/connection_protocol.rb', line 214
    def remove_session(ws)
      return unless ws

      page = @sessions_mutex.synchronize { @sessions.delete(ws.session_key) }
      session_removed(page, ws) if page
      page
    end
#reset_mount_state(page) ⇒ Object
    # File 'lib/ruflet/server/connection_protocol.rb', line 254
    def reset_mount_state(page)
      page.instance_variable_set(:@overlay_container_mounted, false)
      page.instance_variable_set(:@dialogs_container_mounted, false)
      page.instance_variable_set(:@services_container_mounted, false)
    end
#resume_session(_session_id) ⇒ Object
Returns an existing Page to resume for this session id, or nil to create a fresh one (hosts with a session registry override this).
    # File 'lib/ruflet/server/connection_protocol.rb', line 29
    def resume_session(_session_id)
      nil
    end
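One way a host might override this hook is to keep its own registry keyed by session id and fill it from `session_stored`. The sketch below is a minimal illustration under assumptions: `RegistryHost` and `FakePage` are hypothetical stand-ins, the real Page is assumed to expose a `session_id` reader, and the protocol module is not included so the snippet runs without the ruflet gem.

```ruby
# Stand-in for the real Page; assumes Page exposes a session_id reader.
FakePage = Struct.new(:session_id)

# Hypothetical host that remembers pages across reconnects.
class RegistryHost
  def initialize
    @registry = {}               # session id => page
    @registry_mutex = Mutex.new  # connections may run on separate threads
  end

  # Hook: remember the page so a reconnecting client can pick it up again.
  def session_stored(page, _ws)
    @registry_mutex.synchronize { @registry[page.session_id] = page }
  end

  # Hook: return the remembered page, or nil to have a fresh one created.
  def resume_session(session_id)
    @registry_mutex.synchronize { @registry[session_id] }
  end
end

host = RegistryHost.new
page = FakePage.new("abc")
host.session_stored(page, nil)

p host.resume_session("abc").equal?(page)  # true: the same object is returned
p host.resume_session("missing")           # nil: a fresh Page would be created
```

A real registry would also need an eviction policy, because `remove_session` only clears the per-connection table and this sketch never forgets a page.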
#run_connection(ws) ⇒ Object
    # File 'lib/ruflet/server/connection_protocol.rb', line 57
    def run_connection(ws)
      connection_opened(ws)

      while (raw = ws.)
        handle_message(ws, raw)
      end
    rescue StandardError => e
      return if disconnect_error?(e)

      log_connection_error(e)
      send_message(ws, Protocol::ACTIONS[:session_crashed], { "message" => e.message.to_s.dup.force_encoding("UTF-8") })
    ensure
      close_connection(ws)
    end
#send_message(ws, action, payload) ⇒ Object
    # File 'lib/ruflet/server/connection_protocol.rb', line 233
    def send_message(ws, action, payload)
      return if ws.nil? || ws.closed?

      ws.send_binary(Ruflet::WireCodec.pack([action, payload]))
    rescue StandardError => e
      log_connection_error(e) unless disconnect_error?(e)
      remove_session(ws)
      connection_closed(ws)
      nil
    end
#sender_for(ws) ⇒ Object
    # File 'lib/ruflet/server/connection_protocol.rb', line 244
    def sender_for(ws)
      lambda do |action, msg_payload|
        send_message(ws, action, msg_payload)
      end
    end
#session_removed(page, ws) ⇒ Object
Called after a Page is removed for a connection.
    # File 'lib/ruflet/server/connection_protocol.rb', line 37
    def session_removed(page, ws); end
#session_stored(page, ws) ⇒ Object
Called after a Page is stored for a connection.
    # File 'lib/ruflet/server/connection_protocol.rb', line 34
    def session_stored(page, ws); end