Module: Relay::Routes::Websocket::Connection
- Included in:
- Relay::Routes::Websocket
- Defined in:
- app/routes/websocket/connection.rb
Instance Method Summary collapse
- #attachment_from_payload(payload) ⇒ Object
- #build_prompt(ctx, message, file) ⇒ Object
-
#dispatch(conn, ctx, payload, params) ⇒ void
Dispatches an incoming websocket payload to the appropriate handler.
-
#fragment(name, locals = nil, **kwargs) ⇒ String
Renders a websocket fragment using the retained fragment state.
-
#on_connect(conn, llm, ctx, params) ⇒ void
Establishes the WebSocket connection and handles incoming messages.
-
#on_message(conn, ctx, payload, params) ⇒ void
Reads an incoming message, sends it to the LLM session, and handles any function calls.
-
#parse_message(message) ⇒ String
Parses an incoming websocket frame from HTMX and extracts the message text.
- #pause(seconds) ⇒ Object
- #request_in_flight? ⇒ Boolean
-
#resolve_functions(ctx, conn, params) ⇒ void
Invokes any pending function calls in the LLM session.
-
#stream(conn, chunk) ⇒ void
Appends a streamed assistant chunk to the last assistant message and re-renders chat.
-
#talk(ctx, prompt, params) ⇒ void
Sends a message to the LLM session.
-
#vars ⇒ Hash
Returns the fragments variables.
-
#wait_with_heartbeat(conn, runner) ⇒ Array<LLM::Function::Return>?
Waits for a runnable to finish while sending websocket heartbeats.
-
#write(conn, message) ⇒ void
Writes an HTML fragment to the websocket as a text frame.
- #yield_tools(ctx) {|tools| ... } ⇒ void
Instance Method Details
#attachment_from_payload(payload) ⇒ Object
225 226 227 228 229 230 231 232 233 |
# File 'app/routes/websocket/connection.rb', line 225 def (payload) path = payload["attachment_path"].to_s return if path.empty? || !File.file?(path) Relay::Attachment.new( name: payload["attachment_name"], path:, type: payload["attachment_type"] ) end |
#build_prompt(ctx, message, file) ⇒ Object
216 217 218 219 220 221 222 223 |
# File 'app/routes/websocket/connection.rb', line 216 def build_prompt(ctx, , file) text = .to_s.strip return text if file.nil? parts = [] parts << text unless text.empty? parts << ctx.local_file(file.path) parts end |
#dispatch(conn, ctx, payload, params) ⇒ void
This method returns an undefined value.
Dispatches an incoming websocket payload to the appropriate handler.
39 40 41 42 43 44 45 46 47 |
# File 'app/routes/websocket/connection.rb', line 39 def dispatch(conn, ctx, payload, params) case when interrupt?(payload) then interrupt!(conn, ctx) when request_in_flight? write(conn, fragment(:status, (status: "Busy", ctx:))) else @task = Async { (conn, ctx, payload, params) } end end |
#fragment(name, locals = nil, **kwargs) ⇒ String
Renders a websocket fragment using the retained fragment state
153 154 155 156 157 158 159 160 161 162 163 164 |
# File 'app/routes/websocket/connection.rb', line 153 def fragment(name, locals = nil, **kwargs) vars.merge!((locals || {}).merge(kwargs)) case name when :append_message then partial("fragments/append_message", locals: vars) when :chat then partial("fragments/stream", locals: vars) when :contexts then partial("fragments/settings/replace_contexts", locals: vars) when :input then partial("fragments/input", locals: {swap_oob: true}) when :remove_empty_state then partial("fragments/remove_empty_state") when :replace_last_message then partial("fragments/replace_last_message", locals: vars) when :status then partial("fragments/status", locals: vars.merge(swap_oob: true)) end end |
#on_connect(conn, llm, ctx, params) ⇒ void
This method returns an undefined value.
Establishes the WebSocket connection and handles incoming messages
16 17 18 19 20 21 22 23 24 25 26 |
# File 'app/routes/websocket/connection.rb', line 16 def on_connect(conn, llm, ctx, params) vars[:messages] = ctx. write(conn, fragment(:status, (ctx:))) while ( = conn.read) dispatch(conn, ctx, (), params) end rescue EOFError, Protocol::WebSocket::ClosedError nil ensure @task = nil end |
#on_message(conn, ctx, payload, params) ⇒ void
This method returns an undefined value.
Reads an incoming message, sends it to the LLM session, and handles any function calls
72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 |
# File 'app/routes/websocket/connection.rb', line 72 def (conn, ctx, payload, params) file = (payload) || .consume prompt = build_prompt(ctx, payload["message"], file) return if prompt.empty? vars[:messages].concat [{role: :user, content: prompt}, {role: :assistant, content: +""}] write(conn, fragment(:status, (status: "Thinking...", ctx:))) write(conn, fragment(:remove_empty_state)) if vars[:messages].length == 2 write(conn, fragment(:append_message, message: vars[:messages][-2])) write(conn, fragment(:append_message, message: vars[:messages][-1])) write(conn, fragment(:input)) yield_tools(ctx) do |tools| params[:tools] = tools wait_with_heartbeat(conn, proc { talk(ctx, prompt, params) }) resolve_functions(ctx, conn, params) end write(conn, fragment(:status, (ctx:))) @contexts = nil write(conn, fragment(:contexts, contexts: contexts)) rescue LLM::Interrupt on_interrupt(conn, ctx) rescue LLM::NoSuchRegistryError, LLM::NoSuchModelError write(conn, fragment(:status, (cost: "unknown"))) rescue => e pp e.class, e., e.backtrace write(conn, fragment(:status, (status: "#{e.class}: #{e.}"))) ensure @task = nil end |
#parse_message(message) ⇒ String
Parses an incoming websocket frame from HTMX and extracts the message text
172 173 174 175 176 |
# File 'app/routes/websocket/connection.rb', line 172 def () JSON.parse(.buffer) rescue JSON::ParserError {} end |
#pause(seconds) ⇒ Object
212 213 214 |
# File 'app/routes/websocket/connection.rb', line 212 def pause(seconds) Async::Task.current.sleep(seconds) end |
#request_in_flight? ⇒ Boolean
185 186 187 |
# File 'app/routes/websocket/connection.rb', line 185 def request_in_flight? @task&.alive? end |
#resolve_functions(ctx, conn, params) ⇒ void
This method returns an undefined value.
Invokes any pending function calls in the LLM session
123 124 125 126 127 128 129 130 |
# File 'app/routes/websocket/connection.rb', line 123 def resolve_functions(ctx, conn, params) return if ctx.functions.empty? returns = wait_with_heartbeat(conn, ctx.wait(:task)) wait_with_heartbeat(conn, proc { ctx.talk(returns, params) }) if ctx.functions.any? resolve_functions(ctx, conn, params) end end |
#stream(conn, chunk) ⇒ void
This method returns an undefined value.
Appends a streamed assistant chunk to the last assistant message and re-renders chat
139 140 141 142 143 |
# File 'app/routes/websocket/connection.rb', line 139 def stream(conn, chunk) = vars[:messages].reverse_each.find { _1[:role] == :assistant } [:content] << chunk write conn, fragment(:replace_last_message, message:) end |
#talk(ctx, prompt, params) ⇒ void
This method returns an undefined value.
Sends a message to the LLM session
108 109 110 111 112 113 114 |
# File 'app/routes/websocket/connection.rb', line 108 def talk(ctx, prompt, params) if ctx..empty? ctx.talk initial_prompt(prompt), params else ctx.talk(prompt, params) end end |
#vars ⇒ Hash
Returns the fragments variables
181 182 183 |
# File 'app/routes/websocket/connection.rb', line 181 def vars @temp ||= {messages: []} end |
#wait_with_heartbeat(conn, runner) ⇒ Array<LLM::Function::Return>?
Waits for a runnable to finish while sending websocket heartbeats
197 198 199 200 201 202 203 204 205 206 207 208 209 210 |
# File 'app/routes/websocket/connection.rb', line 197 def wait_with_heartbeat(conn, runner) runnable = if Proc === runner Async { runner.call } elsif Array === runner Async { runner } else runner end while runnable.alive? write conn, "<!-- heartbeat -->" pause(0.5) end runnable.wait end |
#write(conn, message) ⇒ void
This method returns an undefined value.
Writes an HTML fragment to the websocket as a text frame
56 57 58 59 60 61 |
# File 'app/routes/websocket/connection.rb', line 56 def write(conn, ) conn.write(Protocol::WebSocket::TextMessage.new(String())) conn.flush rescue Errno::EPIPE, IOError, Protocol::WebSocket::ClosedError nil end |
#yield_tools(ctx) {|tools| ... } ⇒ void
This method returns an undefined value.
239 240 241 242 243 244 245 |
# File 'app/routes/websocket/connection.rb', line 239 def yield_tools(ctx) servers = ctx.mcps servers.each(&:start) yield [*LLM::Tool.registry.reject(&:mcp?), *servers.flat_map(&:tools)] ensure servers&.each { _1.stop rescue nil } end |