Module: Relay::Routes::Websocket::Connection

Included in:
Relay::Routes::Websocket
Defined in:
app/routes/websocket/connection.rb

Instance Method Summary collapse

Instance Method Details

#attachment_from_payload(payload) ⇒ Object



225
226
227
228
229
230
231
232
233
# File 'app/routes/websocket/connection.rb', line 225

# Builds an attachment from the "attachment_*" keys of a websocket payload.
#
# NOTE(review): "attachment_path" is taken from the payload as-is and is only
# checked with File.file? here — confirm that callers restrict it to paths the
# client is allowed to reference.
#
# @param payload [Hash] the parsed websocket payload
# @return [Relay::Attachment, nil] nil when "attachment_path" is blank or does
#   not name an existing regular file
def attachment_from_payload(payload)
  path = payload["attachment_path"].to_s
  return if path.empty?
  return unless File.file?(path)

  name = payload["attachment_name"]
  type = payload["attachment_type"]
  Relay::Attachment.new(name: name, path: path, type: type)
end

#build_prompt(ctx, message, file) ⇒ Object



216
217
218
219
220
221
222
223
# File 'app/routes/websocket/connection.rb', line 216

# Builds the prompt for one turn from the message text and an optional file.
#
# @param ctx [Relay::Models::Context] the current context; receives
#   local_file(file.path) when a file is given
# @param message [#to_s] the raw message text (nil is treated as "")
# @param file [#path, nil] the attachment for this turn, if any
# @return [String, Array] the stripped text when file is nil; otherwise an
#   Array holding the text (only if non-empty) followed by the local file part
def build_prompt(ctx, message, file)
  text = message.to_s.strip
  return text if file.nil?

  attached = ctx.local_file(file.path)
  text.empty? ? [attached] : [text, attached]
end

#dispatch(conn, ctx, payload, params) ⇒ void

This method returns an undefined value.

Dispatches an incoming websocket payload to the appropriate handler.

Parameters:

  • conn (Async::WebSocket::Adapters::Rack)

    The WebSocket connection object

  • ctx (Relay::Models::Context)

    The current context

  • payload (Hash)

    The parsed websocket payload

  • params (Hash)

    The mutable request params for the current turn



39
40
41
42
43
44
45
46
47
# File 'app/routes/websocket/connection.rb', line 39

# Routes one parsed websocket payload.
#
# Checked in order: an interrupt payload is handed to interrupt!; otherwise,
# if a previous turn's task is still alive, a "Busy" status fragment is written
# and the payload is dropped; otherwise a new Async task running on_message is
# started and remembered in @task.
#
# @param conn [Async::WebSocket::Adapters::Rack] the WebSocket connection object
# @param ctx [Relay::Models::Context] the current context
# @param payload [Hash] the parsed websocket payload
# @param params [Hash] the mutable request params for the current turn
# @return [void]
def dispatch(conn, ctx, payload, params)
  if interrupt?(payload)
    interrupt!(conn, ctx)
  elsif request_in_flight?
    write(conn, fragment(:status, status_bar(status: "Busy", ctx:)))
  else
    @task = Async { on_message(conn, ctx, payload, params) }
  end
end

#fragment(name, locals = nil, **kwargs) ⇒ String

Renders a websocket fragment using the retained fragment state

Parameters:

  • name (Symbol)

    The fragment name

  • locals (Hash) (defaults to: nil)

    The local values to merge into the retained fragment state

Returns:

  • (String)

    The rendered HTML fragment



153
154
155
156
157
158
159
160
161
162
163
164
# File 'app/routes/websocket/connection.rb', line 153

# Renders a websocket fragment using the retained fragment state.
#
# The given locals and keyword arguments are first merged into the retained
# state (vars), so they persist for later renders. :input and
# :remove_empty_state do not receive the retained state; :status receives a
# copy of it with swap_oob: true added.
#
# @param name [Symbol] the fragment name
# @param locals [Hash, nil] values to merge into the retained fragment state
# @param kwargs [Hash] further values to merge (these win over locals)
# @return [String, nil] the result of partial, or nil for an unknown name
def fragment(name, locals = nil, **kwargs)
  state = vars.merge!((locals || {}).merge(kwargs))
  case name
  when :input
    partial("fragments/input", locals: {swap_oob: true})
  when :remove_empty_state
    partial("fragments/remove_empty_state")
  when :status
    partial("fragments/status", locals: state.merge(swap_oob: true))
  else
    # Fragments that are rendered with the retained state unchanged.
    template = {
      append_message: "fragments/append_message",
      chat: "fragments/stream",
      contexts: "fragments/settings/replace_contexts",
      replace_last_message: "fragments/replace_last_message"
    }[name]
    partial(template, locals: state) if template
  end
end

#on_connect(conn, llm, ctx, params) ⇒ void

This method returns an undefined value.

Establishes the WebSocket connection and handles incoming messages

Parameters:

  • conn (Async::WebSocket::Adapters::Rack)

    The WebSocket connection object

  • llm (LLM::Provider)

    The selected LLM provider

  • ctx (Relay::Models::Context)

    The current context



16
17
18
19
20
21
22
23
24
25
26
# File 'app/routes/websocket/connection.rb', line 16

# Serves one websocket connection: seeds the fragment state with the context's
# messages, writes the initial status bar, then dispatches every frame read
# from the connection until read returns nil/false or the socket closes.
#
# @param conn [Async::WebSocket::Adapters::Rack] the WebSocket connection object
# @param llm [LLM::Provider] the selected LLM provider (not used in this body)
# @param ctx [Relay::Models::Context] the current context
# @param params [Hash] the mutable request params, passed through to dispatch
# @return [void]
def on_connect(conn, llm, ctx, params)
  vars[:messages] = ctx.messages
  write(conn, fragment(:status, status_bar(ctx:)))
  frame = conn.read
  while frame
    dispatch(conn, ctx, parse_message(frame), params)
    frame = conn.read
  end
rescue EOFError, Protocol::WebSocket::ClosedError
  # A closed connection is the normal way for this loop to end.
  nil
ensure
  # Only the reference is dropped; the task itself is not stopped here.
  @task = nil
end

#on_message(conn, ctx, payload, params) ⇒ void

This method returns an undefined value.

Reads an incoming message, sends it to the LLM session, and handles any function calls

Parameters:

  • conn (Async::WebSocket::Adapters::Rack)

    The WebSocket connection object

  • ctx (Relay::Models::Context)

    The current context

  • payload (Hash)

    The parsed websocket payload

  • params (Hash)

    The mutable request params for the current turn



72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'app/routes/websocket/connection.rb', line 72

# Handles one user turn: builds the prompt, echoes the user message and an
# empty assistant placeholder to the client, talks to the LLM (resolving any
# pending function calls), then refreshes the status bar and contexts list.
#
# @param conn [Async::WebSocket::Adapters::Rack] the WebSocket connection object
# @param ctx [Relay::Models::Context] the current context
# @param payload [Hash] the parsed websocket payload; "message" and the
#   "attachment_*" keys are read
# @param params [Hash] the mutable request params; :tools is assigned here
# @return [void]
def on_message(conn, ctx, payload, params)
  # Prefer an attachment described by the payload; otherwise fall back to
  # attachment.consume (defined elsewhere — presumably a pending upload; confirm).
  file = attachment_from_payload(payload) || attachment.consume
  prompt = build_prompt(ctx, payload["message"], file)
  # Empty only when the text is blank and there is no file: nothing to send.
  return if prompt.empty?
  # The assistant entry starts as an unfrozen empty string (+"") so streamed
  # chunks can be appended to it in place.
  vars[:messages].concat [{role: :user, content: prompt}, {role: :assistant, content: +""}]
  write(conn, fragment(:status, status_bar(status: "Thinking...", ctx:)))
  # Exactly two entries means these are the first messages of the conversation.
  write(conn, fragment(:remove_empty_state)) if vars[:messages].length == 2
  write(conn, fragment(:append_message, message: vars[:messages][-2]))
  write(conn, fragment(:append_message, message: vars[:messages][-1]))
  write(conn, fragment(:input))
  yield_tools(ctx) do |tools|
    params[:tools] = tools
    wait_with_heartbeat(conn, proc { talk(ctx, prompt, params) })
    resolve_functions(ctx, conn, params)
  end
  write(conn, fragment(:status, status_bar(ctx:)))
  # Presumably drops a memoized contexts list so `contexts` reloads it — confirm.
  @contexts = nil
  write(conn, fragment(:contexts, contexts: contexts))
rescue LLM::Interrupt
  on_interrupt(conn, ctx)
rescue LLM::NoSuchRegistryError, LLM::NoSuchModelError
  write(conn, fragment(:status, status_bar(cost: "unknown")))
rescue => e
  # NOTE(review): pp prints to stdout; consider the application's logger.
  pp e.class, e.message, e.backtrace
  # Surface the failure to the client in the status bar instead of re-raising.
  write(conn, fragment(:status, status_bar(status: "#{e.class}: #{e.message}")))
ensure
  # Clear the task reference on every exit path, including the early return.
  @task = nil
end

#parse_message(message) ⇒ Hash

Parses an incoming websocket frame from HTMX and returns the decoded JSON payload

Parameters:

  • message (Protocol::WebSocket::Message)

    The websocket message frame

Returns:

  • (Hash)

    The parsed payload, or an empty Hash if parsing fails



172
173
174
175
176
# File 'app/routes/websocket/connection.rb', line 172

# Parses an incoming websocket frame into its JSON payload.
#
# Valid JSON that is not an object (an array, string, number or null) is
# treated like a malformed frame, so callers can always index the result with
# string keys such as payload["message"].
#
# @param message [Protocol::WebSocket::Message] the websocket message frame;
#   its buffer is parsed as JSON
# @return [Hash] the decoded payload, or an empty Hash when the frame is not
#   a JSON object
def parse_message(message)
  payload = JSON.parse(message.buffer)
  payload.is_a?(Hash) ? payload : {}
rescue JSON::ParserError
  {}
end

#pause(seconds) ⇒ Object



212
213
214
# File 'app/routes/websocket/connection.rb', line 212

# Suspends the current Async task for the given duration.
#
# @param seconds [Numeric] how long to sleep
# @return [Object] whatever Async::Task#sleep returns
def pause(seconds)
  current_task = Async::Task.current
  current_task.sleep(seconds)
end

#request_in_flight? ⇒ Boolean

Returns:

  • (Boolean)


185
186
187
# File 'app/routes/websocket/connection.rb', line 185

# Reports whether a previously started turn is still running.
#
# Always answers with the task's alive? result or false — never nil — so the
# documented Boolean return holds even before any task has been started.
#
# @return [Boolean] true while @task is set and alive; false otherwise
def request_in_flight?
  !@task.nil? && @task.alive?
end

#resolve_functions(ctx, conn, params) ⇒ void

This method returns an undefined value.

Invokes any pending function calls in the LLM session

Parameters:

  • ctx (Relay::Models::Context)

    The current context

  • conn (Async::WebSocket::Adapters::Rack)

    The WebSocket connection object



123
124
125
126
127
128
129
130
# File 'app/routes/websocket/connection.rb', line 123

# Runs pending function calls in the LLM session until none remain.
#
# Each round waits for the pending functions (ctx.wait(:task)), sends their
# return values back with ctx.talk, and repeats while the reply requested
# further calls. Heartbeats are written to the websocket while waiting.
#
# @param ctx [Relay::Models::Context] the current context
# @param conn [Async::WebSocket::Adapters::Rack] the WebSocket connection object
# @param params [Hash] the request params passed to ctx.talk
# @return [void]
def resolve_functions(ctx, conn, params)
  pending = !ctx.functions.empty?
  while pending
    returns = wait_with_heartbeat(conn, ctx.wait(:task))
    wait_with_heartbeat(conn, proc { ctx.talk(returns, params) })
    pending = ctx.functions.any?
  end
end

#stream(conn, chunk) ⇒ void

This method returns an undefined value.

Appends a streamed assistant chunk to the last assistant message and re-renders chat

Parameters:

  • conn (Async::WebSocket::Adapters::Rack)

    The WebSocket connection object

  • chunk (String)

    The streamed assistant text chunk



139
140
141
142
143
# File 'app/routes/websocket/connection.rb', line 139

# Appends a streamed chunk to the most recent assistant message and sends the
# re-rendered message to the client.
#
# @param conn [Async::WebSocket::Adapters::Rack] the WebSocket connection object
# @param chunk [String] the streamed assistant text chunk
# @return [void]
def stream(conn, chunk)
  # Search from the end so the latest assistant entry is the one updated.
  reply = vars[:messages].reverse_each.find { |entry| entry[:role] == :assistant }
  reply[:content] << chunk
  write(conn, fragment(:replace_last_message, message: reply))
end

#talk(ctx, prompt, params) ⇒ void

This method returns an undefined value.

Sends a message to the LLM session

Parameters:



108
109
110
111
112
113
114
# File 'app/routes/websocket/connection.rb', line 108

# Sends a prompt to the LLM session.
#
# When the context has no messages yet, the prompt is first wrapped with
# initial_prompt; otherwise it is sent as given.
#
# @param ctx [Relay::Models::Context] the current context
# @param prompt [String, Array] the prompt built for this turn
# @param params [Hash] the request params passed to ctx.talk
# @return [Object] whatever ctx.talk returns
def talk(ctx, prompt, params)
  first_turn = ctx.messages.empty?
  outgoing = first_turn ? initial_prompt(prompt) : prompt
  ctx.talk(outgoing, params)
end

#vars ⇒ Hash

Returns the retained fragment variables

Returns:

  • (Hash)


181
182
183
# File 'app/routes/websocket/connection.rb', line 181

# Returns the retained fragment variables, creating them on first use.
#
# @return [Hash] the same Hash on every call; starts as {messages: []}
def vars
  @temp = {messages: []} unless @temp
  @temp
end

#wait_with_heartbeat(conn, runner) ⇒ Array<LLM::Function::Return>?

Waits for a runnable to finish while sending websocket heartbeats

Parameters:

  • runner (LLM::Function::ThreadGroup, Proc, Array)

    The runnable value to wait for

  • conn (Async::WebSocket::Adapters::Rack)

    The WebSocket connection object

Returns:

  • (Array<LLM::Function::Return>, nil)

    The result of waiting on the runner: thread-group return values, or the result of the Async task that wraps a Proc or Array runner



197
198
199
200
201
202
203
204
205
206
207
208
209
210
# File 'app/routes/websocket/connection.rb', line 197

# Waits for a runner to finish while sending websocket heartbeats.
#
# A Proc is called inside a new Async task and an Array is wrapped in one;
# anything else is used as-is and must respond to alive? and wait. While the
# work is alive, a heartbeat comment is written every 0.5 seconds.
#
# @param conn [Async::WebSocket::Adapters::Rack] the WebSocket connection object
# @param runner [Proc, Array, #alive?] the work to wait for
# @return [Object] the result of calling wait on the (possibly wrapped) runner
def wait_with_heartbeat(conn, runner)
  work =
    case runner
    when Proc then Async { runner.call }
    when Array then Async { runner }
    else runner
    end
  while work.alive?
    write(conn, "<!-- heartbeat -->")
    pause(0.5)
  end
  work.wait
end

#write(conn, message) ⇒ void

This method returns an undefined value.

Writes an HTML fragment to the websocket as a text frame

Parameters:

  • conn (Async::WebSocket::Adapters::Rack)

    The WebSocket connection object

  • message (String)

    The rendered HTML fragment



56
57
58
59
60
61
# File 'app/routes/websocket/connection.rb', line 56

# Writes an HTML fragment to the websocket as a text frame and flushes it.
#
# Broken-pipe, IO and closed-socket errors are swallowed, so writing to a
# client that has gone away is a no-op.
#
# @param conn [Async::WebSocket::Adapters::Rack] the WebSocket connection object
# @param message [#to_s] the rendered HTML fragment (coerced with String())
# @return [Object, nil] the result of conn.flush, or nil when the write failed
#   with one of the rescued errors
def write(conn, message)
  frame = Protocol::WebSocket::TextMessage.new(String(message))
  conn.write(frame)
  conn.flush
rescue Errno::EPIPE, IOError, Protocol::WebSocket::ClosedError
  nil
end

#yield_tools(ctx) {|tools| ... } ⇒ void

This method returns an undefined value.

Parameters:

Yield Parameters:

  • tools (Array<LLM::Tool>)


239
240
241
242
243
244
245
# File 'app/routes/websocket/connection.rb', line 239

# Starts the context's MCP servers, yields the tools available for this turn,
# and stops the servers again when the block finishes or raises.
#
# The yielded list is the registered tools that are not MCP tools, followed by
# the tools of every server in ctx.mcps.
#
# @param ctx [Relay::Models::Context] the current context
# @yieldparam tools [Array<LLM::Tool>] the tools for this turn
# @return [Object] the block's return value
def yield_tools(ctx)
  servers = ctx.mcps
  servers.each(&:start)
  local_tools = LLM::Tool.registry.reject(&:mcp?)
  server_tools = servers.flat_map(&:tools)
  yield [*local_tools, *server_tools]
ensure
  # Best-effort shutdown: a server that fails to stop must not keep the
  # others from being stopped. servers is nil if ctx.mcps itself raised.
  servers&.each do |server|
    server.stop
  rescue StandardError
    nil
  end
end