Module: Ruflet::ConnectionProtocol

Included in:
Server
Defined in:
lib/ruflet/server/connection_protocol.rb

Overview

Transport-agnostic implementation of the Ruflet wire protocol: one connection loop shared by every server that speaks to Flutter clients.

The standalone TCP server (Ruflet::Server) and host-server adapters such as ruflet_rails’ Rack-hijack endpoint include this module and provide only their transport plus the integration hooks below — the protocol itself is never reimplemented.

Includers must initialize:

@app_block       — proc invoked with the Page on first registration
@sessions        — Hash mapping connection key => Page
@sessions_mutex  — Mutex guarding @sessions

Instance Method Summary

Instance Method Details

#attach_sender(page, ws) ⇒ Object



250
251
252
# File 'lib/ruflet/server/connection_protocol.rb', line 250

# Replaces the page's @sender instance variable with the callable built by
# #sender_for for the given connection.
#
# @param page [Object] page whose @sender is overwritten
# @param ws [Object] connection handed to #sender_for
# @return [Object] the sender that was installed
def attach_sender(page, ws)
  sender = sender_for(ws)
  page.instance_variable_set(:@sender, sender)
end

#before_dispatch_event(ws, event) ⇒ Object

Called before a control event is dispatched to the Page.



40
# File 'lib/ruflet/server/connection_protocol.rb', line 40

# Integration hook invoked by #on_control_event after the sender has been
# attached and before the event is dispatched to the Page.
#
# @param ws [Object] connection the event arrived on
# @param event [Object] normalized control event
# @return [nil]
def before_dispatch_event(ws, event)
  # Default implementation intentionally does nothing.
end

#close_connection(ws) ⇒ Object



72
73
74
75
76
77
78
# File 'lib/ruflet/server/connection_protocol.rb', line 72

# Tears down a connection: drops its session, fires the #connection_closed
# hook, and closes the underlying socket.
#
# The socket is closed in an +ensure+ so that an exception raised by
# #remove_session or by the #connection_closed hook cannot leave it open;
# such an exception still propagates to the caller after the close.
#
# @param ws [Object, nil] connection to close; a falsy value is a no-op
# @return [Object, nil] whatever +ws.close+ returns, or nil when +ws+ is falsy
def close_connection(ws)
  return unless ws

  begin
    remove_session(ws)
    connection_closed(ws)
  ensure
    # Runs on both the normal and the exceptional path.
    close_result = ws.close
  end

  close_result
end

#connection_closed(ws) ⇒ Object

Called when a connection leaves the protocol loop.



25
# File 'lib/ruflet/server/connection_protocol.rb', line 25

# Integration hook invoked when a connection leaves the protocol loop
# (called from #close_connection and from #send_message's error path).
#
# @param ws [Object] connection that is going away
# @return [nil]
def connection_closed(ws)
  # Default implementation intentionally does nothing.
end

#connection_opened(ws) ⇒ Object

Called when a connection enters the protocol loop.



22
# File 'lib/ruflet/server/connection_protocol.rb', line 22

# Integration hook invoked at the start of #run_connection, before the first
# message is read.
#
# @param ws [Object] connection entering the protocol loop
# @return [nil]
def connection_opened(ws)
  # Default implementation intentionally does nothing.
end

#decode_incoming(raw) ⇒ Object



104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/ruflet/server/connection_protocol.rb', line 104

# Decodes one raw frame into an [action, payload] pair.
#
# The frame is unpacked with Ruflet::WireCodec, passed through
# #normalize_incoming, and then interpreted by shape:
#
# * Array with at least two elements: the first two elements.
# * Hash with a non-nil "action"/:action entry: that action plus the
#   "payload"/:payload entry (which may be nil).
# * Hash carrying both a target and a name key: treated as a bare control
#   event, with the whole hash as the payload.
#
# @param raw [#to_s] frame bytes as received from the connection
# @return [Array] two-element [action, payload] pair
# @raise [RuntimeError] "Unsupported payload format" for any other shape
def decode_incoming(raw)
  message = normalize_incoming(Ruflet::WireCodec.unpack(raw.to_s.b))

  case message
  when Array
    return message.first(2) if message.length >= 2
  when Hash
    action = message["action"] || message[:action]
    payload = message["payload"] || message[:payload]
    return [action, payload] unless action.nil?

    has_target = message.key?("target") || message.key?(:target)
    has_name = message.key?("name") || message.key?(:name)
    return [Protocol::ACTIONS[:control_event], message] if has_target && has_name
  end

  raise "Unsupported payload format"
end

#disconnect_error?(error) ⇒ Boolean

Returns:

  • (Boolean)


260
261
262
263
264
265
266
267
268
269
270
271
# File 'lib/ruflet/server/connection_protocol.rb', line 260

# Tells whether an exception belongs to the set of error classes this module
# treats as "the peer went away" rather than as a failure worth reporting.
#
# @param error [Object] exception (or any object) to classify
# @return [Boolean] true for IOError and the listed Errno classes (including
#   their subclasses), false for everything else
def disconnect_error?(error)
  case error
  when IOError,
       Errno::EPIPE, Errno::ECONNRESET, Errno::ECONNABORTED,
       Errno::ENOTCONN, Errno::ESHUTDOWN, Errno::EBADF, Errno::EINVAL
    true
  else
    false
  end
end

#fetch_page(ws) ⇒ Object



207
208
209
210
211
212
# File 'lib/ruflet/server/connection_protocol.rb', line 207

# Looks up the Page registered for a connection.
#
# The lookup reads @sessions under @sessions_mutex, keyed by +ws.session_key+.
#
# @param ws [#session_key] connection whose page is wanted
# @return [Object] the stored page
# @raise [RuntimeError] "Session not found" when nothing is stored for the key
def fetch_page(ws)
  session = @sessions_mutex.synchronize { @sessions[ws.session_key] }
  session || raise("Session not found")
end

#handle_message(ws, raw) ⇒ Object


Protocol core.




84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/ruflet/server/connection_protocol.rb', line 84

# Decodes one raw frame and routes it to the handler for its action.
#
# A nil/false payload is replaced by an empty Hash before dispatch. When the
# RUFLET_DEBUG environment variable is "1" the decoded action is written
# with +warn+.
#
# @param ws [Object] connection the frame arrived on
# @param raw [Object] undecoded frame, passed to #decode_incoming
# @return [Object] whatever the selected handler returns
# @raise [RuntimeError] "Unknown action: ..." when no handler matches
def handle_message(ws, raw)
  action, payload = decode_incoming(raw)
  payload = {} unless payload

  warn "incoming action=#{action.inspect}" if ENV["RUFLET_DEBUG"] == "1"

  actions = Protocol::ACTIONS
  case action
  when actions[:register_client], actions[:register_web_client] then on_register_client(ws, payload)
  when actions[:control_event], actions[:page_event_from_web] then on_control_event(ws, payload)
  when actions[:update_control], actions[:update_control_props] then on_update_control(ws, payload)
  when actions[:invoke_control_method] then on_invoke_control_method(ws, payload)
  else raise "Unknown action: #{action.inspect}"
  end
end

#handle_upgraded_socket(io) ⇒ Object

For hosts that already performed the HTTP upgrade (Rack hijack, the embedded runtime, tests with socket pairs).



53
54
55
# File 'lib/ruflet/server/connection_protocol.rb', line 53

# Entry point for hosts that have already completed the HTTP upgrade: wraps
# the IO in a Ruflet::WebSocketConnection and runs the protocol loop on it.
#
# @param io [Object] upgraded socket
# @return [Object] whatever #run_connection returns
def handle_upgraded_socket(io)
  connection = Ruflet::WebSocketConnection.new(io)
  run_connection(connection)
end

#log_connection_error(error) ⇒ Object



42
43
44
45
# File 'lib/ruflet/server/connection_protocol.rb', line 42

# Reports an exception with +warn+: first a "server error: Class: message"
# line, then the backtrace (one frame per line) when the exception has one.
#
# @param error [Exception] exception to report
# @return [nil]
def log_connection_error(error)
  warn "server error: #{error.class}: #{error.message}"

  frames = error.backtrace
  return unless frames

  warn frames.join("\n")
end

#normalize_event_data(value) ⇒ Object



222
223
224
225
226
227
228
229
230
231
# File 'lib/ruflet/server/connection_protocol.rb', line 222

# Recursively converts Hash keys to Symbols throughout an event payload.
#
# Hashes are rebuilt with +key.to_sym+ keys, Arrays are mapped element by
# element, and every other value is returned unchanged.
#
# @param value [Object] event data of any shape
# @return [Object] a structurally equal value with symbolized Hash keys
def normalize_event_data(value)
  if value.is_a?(Hash)
    symbolized = {}
    value.each { |key, nested| symbolized[key.to_sym] = normalize_event_data(nested) }
    symbolized
  elsif value.is_a?(Array)
    value.map { |item| normalize_event_data(item) }
  else
    value
  end
end

#normalize_incoming(value) ⇒ Object



124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# File 'lib/ruflet/server/connection_protocol.rb', line 124

# Recursively coerces a decoded wire value into plain strings, numbers,
# booleans, nil, Arrays and string-keyed Hashes.
#
# * Strings are duplicated and re-tagged as UTF-8 (bytes are not changed).
# * Integers, Floats, true, false and nil pass through untouched.
# * Arrays are mapped element by element.
# * Hashes are rebuilt with +key.to_s+ keys and normalized values.
# * Symbols and every other object are replaced by their +to_s+.
#
# @param value [Object] value produced by the wire decoder
# @return [Object] normalized copy
def normalize_incoming(value)
  case value
  when Integer, Float, TrueClass, FalseClass, NilClass
    value
  when String
    retagged = value.dup
    retagged.force_encoding("UTF-8")
  when Array
    value.map { |element| normalize_incoming(element) }
  when Hash
    stringified = {}
    value.each_pair { |key, element| stringified[key.to_s] = normalize_incoming(element) }
    stringified
  else
    value.to_s
  end
end

#on_control_event(ws, payload) ⇒ Object



178
179
180
181
182
183
184
185
186
187
188
189
190
# File 'lib/ruflet/server/connection_protocol.rb', line 178

# Handles a control event: resolves the connection's page and dispatches the
# event to it.
#
# The page lookup happens before the payload is validated, so a missing
# session raises even for an event that would otherwise be ignored. Events
# with a nil target or a blank name are dropped silently.
#
# @param ws [Object] connection the event arrived on
# @param payload [Object] raw event payload
# @return [Object, nil] result of +page.dispatch_event+, or nil when dropped
def on_control_event(ws, payload)
  event = Protocol.normalize_control_event_payload(payload)
  page = fetch_page(ws)

  missing_target = event["target"].nil?
  missing_name = event["name"].to_s.empty?
  return if missing_target || missing_name

  attach_sender(page, ws)
  before_dispatch_event(ws, event)

  # Fields are read only after the hook has run, so any change the hook makes
  # to +event+ is what gets dispatched.
  data = normalize_event_data(event["data"])
  page.dispatch_event(target: event["target"], name: event["name"], data: data)
end

#on_invoke_control_method(ws, payload) ⇒ Object



201
202
203
204
205
# File 'lib/ruflet/server/connection_protocol.rb', line 201

# Handles the client's reply to a control-method invocation: resolves the
# page, re-attaches the sender, and hands the normalized result to the page.
#
# @param ws [Object] connection the reply arrived on
# @param payload [Object] raw reply payload
# @return [Object] result of +page.handle_invoke_method_result+
def on_invoke_control_method(ws, payload)
  page = fetch_page(ws)
  attach_sender(page, ws)

  result = Protocol.normalize_invoke_method_result_payload(payload)
  page.handle_invoke_method_result(result)
end

#on_register_client(ws, payload) ⇒ Object



143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/ruflet/server/connection_protocol.rb', line 143

# Handles a client registration: binds a Page to the connection, answers the
# client, and renders the app.
#
# Steps, in order:
# 1. Normalize the payload and take its "session_id", falling back to
#    #pseudo_uuid when that is blank.
# 2. Ask #resume_session for an existing Page. A resumed page gets a fresh
#    sender and its mount flags reset; otherwise a new Page titled
#    "Ruflet App" is created.
# 3. Store the page in @sessions under +ws.session_key+ (guarded by
#    @sessions_mutex) and fire the #session_stored hook.
# 4. Send the register response straight through +ws.send_binary+.
# 5. Call @app_block with the page only on first registration, then
#    +page.update+.
#
# On any StandardError the client is sent a session_crashed message and the
# error is re-raised.
#
# @param ws [Object] connection being registered
# @param payload [Object] raw registration payload
# @return [Object] result of +page.update+
# @raise [StandardError] whatever a step raised, after the client was notified
def on_register_client(ws, payload)
  normalized = Protocol.normalize_register_payload(payload)
  session_id = normalized["session_id"].to_s.empty? ? pseudo_uuid : normalized["session_id"]

  page = resume_session(session_id)
  first_registration = page.nil?

  if page
    attach_sender(page, ws)
    reset_mount_state(page)
  else
    page = Page.new(
      session_id: session_id,
      client_details: normalized,
      sender: sender_for(ws)
    )
    page.title = "Ruflet App"
  end

  @sessions_mutex.synchronize { @sessions[ws.session_key] = page }
  session_stored(page, ws)

  initial_response = [
    Protocol::ACTIONS[:register_client],
    Protocol.register_response(session_id: session_id)
  ]
  ws.send_binary(Ruflet::WireCodec.pack(initial_response))

  @app_block.call(page) if first_registration
  page.update
rescue StandardError => e
  # Re-tag a copy of the message as UTF-8, matching what #run_connection does
  # for the same session_crashed payload; the exception's own string is left
  # untouched.
  crash_message = e.message.to_s.dup.force_encoding("UTF-8")
  send_message(ws, Protocol::ACTIONS[:session_crashed], { "message" => crash_message })
  raise
end

#on_update_control(ws, payload) ⇒ Object



192
193
194
195
196
197
198
199
# File 'lib/ruflet/server/connection_protocol.rb', line 192

# Handles a client-side property update for a control.
#
# The page lookup happens before validation, so a missing session raises
# even when the update would be ignored. Updates without an "id" are dropped.
#
# @param ws [Object] connection the update arrived on
# @param payload [Object] raw update payload
# @return [Object, nil] result of +page.apply_client_update+, or nil when
#   the update has no id
def on_update_control(ws, payload)
  update = Protocol.normalize_update_control_payload(payload)
  page = fetch_page(ws)

  control_id = update["id"]
  return if control_id.nil?

  attach_sender(page, ws)

  props = update["props"] || {}
  page.apply_client_update(control_id, props)
end

#pseudo_uuid ⇒ Object



273
274
275
276
277
278
279
280
281
282
283
# File 'lib/ruflet/server/connection_protocol.rb', line 273

# Generates a fresh session identifier in 8-4-4-4-12 lowercase hex form.
#
# The value is a random (version 4) UUID from SecureRandom. The id is later
# passed to #resume_session to look up an existing Page, so it is drawn from
# a cryptographically secure source rather than from +rand+ and clock bits.
#
# @return [String] UUID-formatted identifier
def pseudo_uuid
  # Required here because the file's top-level require list is not part of
  # this block; repeated calls are a no-op once the library is loaded.
  require "securerandom"

  SecureRandom.uuid
end

#remove_session(ws) ⇒ Object



214
215
216
217
218
219
220
# File 'lib/ruflet/server/connection_protocol.rb', line 214

# Forgets the page registered for a connection.
#
# Deletes the entry for +ws.session_key+ from @sessions under
# @sessions_mutex and, when something was actually removed, fires the
# #session_removed hook outside the lock.
#
# @param ws [#session_key, nil] connection to forget; a falsy value is a no-op
# @return [Object, nil] the removed page, or nil when there was none
def remove_session(ws)
  return unless ws

  removed_page = @sessions_mutex.synchronize { @sessions.delete(ws.session_key) }
  return removed_page unless removed_page

  session_removed(removed_page, ws)
  removed_page
end

#reset_mount_state(page) ⇒ Object



254
255
256
257
258
# File 'lib/ruflet/server/connection_protocol.rb', line 254

# Sets the page's three "container mounted" instance variables (overlay,
# dialogs, services) to false. Called when an existing page is resumed on a
# new connection.
#
# @param page [Object] page whose flags are cleared
# @return [false]
def reset_mount_state(page)
  %i[
    @overlay_container_mounted
    @dialogs_container_mounted
    @services_container_mounted
  ].each { |flag| page.instance_variable_set(flag, false) }

  false
end

#resume_session(_session_id) ⇒ Object

Return an existing Page to resume for this session id, or nil to create a fresh one (hosts with a session registry override this).



29
30
31
# File 'lib/ruflet/server/connection_protocol.rb', line 29

# Integration hook: returns an existing Page to resume for the given session
# id, or nil to have a fresh one created.
#
# @param _session_id [Object] session id supplied by, or generated for, the client
# @return [nil] the default implementation never resumes
def resume_session(_session_id)
  # No session registry by default; an empty body evaluates to nil.
end

#run_connection(ws) ⇒ Object



57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/ruflet/server/connection_protocol.rb', line 57

# Runs the protocol loop for one connection until +ws.read_message+ returns
# a falsy value or an error occurs.
#
# Fires #connection_opened, then feeds every frame to #handle_message.
# Errors classified by #disconnect_error? end the loop quietly; any other
# StandardError is logged and reported to the client as session_crashed.
# #close_connection always runs on the way out.
#
# @param ws [Object] connection to serve
# @return [Object, nil] nil after a normal end or a disconnect; otherwise
#   the result of the session_crashed #send_message call
def run_connection(ws)
  connection_opened(ws)

  raw = ws.read_message
  while raw
    handle_message(ws, raw)
    raw = ws.read_message
  end
rescue StandardError => error
  unless disconnect_error?(error)
    log_connection_error(error)
    # Copy before re-tagging so the exception's own message is not mutated.
    crash_text = error.message.to_s.dup.force_encoding("UTF-8")
    send_message(ws, Protocol::ACTIONS[:session_crashed], { "message" => crash_text })
  end
ensure
  close_connection(ws)
end

#send_message(ws, action, payload) ⇒ Object



233
234
235
236
237
238
239
240
241
242
# File 'lib/ruflet/server/connection_protocol.rb', line 233

# Packs an [action, payload] pair with Ruflet::WireCodec and writes it to the
# connection as a binary frame. Never raises StandardError to the caller.
#
# Nothing is sent when +ws+ is nil or reports itself closed. If packing or
# sending fails, the error is logged unless #disconnect_error? classifies it
# as a disconnect, the session is removed, the #connection_closed hook is
# fired, and nil is returned.
#
# @param ws [Object, nil] target connection
# @param action [Object] protocol action identifier
# @param payload [Object] message body
# @return [Object, nil] result of +ws.send_binary+, or nil when skipped or failed
def send_message(ws, action, payload)
  return if ws.nil?
  return if ws.closed?

  frame = Ruflet::WireCodec.pack([action, payload])
  ws.send_binary(frame)
rescue StandardError => error
  log_connection_error(error) unless disconnect_error?(error)
  remove_session(ws)
  connection_closed(ws)
  nil
end

#sender_for(ws) ⇒ Object



244
245
246
247
248
# File 'lib/ruflet/server/connection_protocol.rb', line 244

# Builds the two-argument lambda a Page uses as its sender: calling it
# forwards (action, payload) to #send_message on the captured connection.
#
# @param ws [Object] connection captured by the returned lambda
# @return [Proc] lambda taking (action, msg_payload)
def sender_for(ws)
  ->(action, msg_payload) { send_message(ws, action, msg_payload) }
end

#session_removed(page, ws) ⇒ Object

Called after a Page is removed for a connection.



37
# File 'lib/ruflet/server/connection_protocol.rb', line 37

# Integration hook invoked by #remove_session after a Page has been removed
# for a connection.
#
# @param page [Object] page that was removed
# @param ws [Object] connection it was registered under
# @return [nil]
def session_removed(page, ws)
  # Default implementation intentionally does nothing.
end

#session_stored(page, ws) ⇒ Object

Called after a Page is stored for a connection.



34
# File 'lib/ruflet/server/connection_protocol.rb', line 34

# Integration hook invoked by #on_register_client after a Page has been
# stored for a connection.
#
# @param page [Object] page that was stored
# @param ws [Object] connection it is registered under
# @return [nil]
def session_stored(page, ws)
  # Default implementation intentionally does nothing.
end