Class: Syntropy::Connection

Inherits:
Object
  • Object
show all
Defined in:
lib/syntropy/connection.rb

Overview

Implements an HTTP/1.1 connection received by the Syntropy server. This implementation rejects incoming HTTP/0.9 or HTTP/1.0 requests. The response body is sent exclusively using chunked transfer encoding. Request bodies are accepted using either fixed length (Content-Length header) or chunked transfer encoding.

Constant Summary collapse

# Flags passed to @machine.send when writing to the connection socket.
# NOTE(review): presumably these mirror the POSIX MSG_NOSIGNAL / MSG_WAITALL
# socket flags - confirm against the UM constants.
SEND_FLAGS =
UM::MSG_NOSIGNAL | UM::MSG_WAITALL
# Zero-length chunk written to terminate a chunked response body.
EMPTY_CHUNK =
"0\r\n\r\n"
# Byte size of EMPTY_CHUNK, used as the length argument when sending it.
EMPTY_CHUNK_LEN =
EMPTY_CHUNK.bytesize
# Written after a single-chunk body: the CRLF closing that chunk, followed by
# the terminating empty chunk.
CHUNKED_ENCODING_POSTLUDE =
"\r\n#{EMPTY_CHUNK}"

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(server, machine, fd, env, &app) ⇒ Connection

Returns a new instance of Connection.



17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/syntropy/connection.rb', line 17

# Sets up a connection around an accepted socket.
#
# @param server owning server instance
# @param machine machine object used for all I/O on this connection
# @param fd file descriptor of the connection socket
# @param env [Hash] environment; its :logger entry becomes the connection logger
# @param app block stored as the request handler
def initialize(server, machine, fd, env, &app)
  @server, @machine, @fd, @env = server, machine, fd, env
  @logger = env[:logger]
  @io = machine.io(fd, :socket)
  @app = app

  # Per-request response state, starts out unset.
  @done = @response_headers = nil
end

Instance Attribute Details

#fdObject (readonly)

Returns the value of attribute fd.



15
16
17
# File 'lib/syntropy/connection.rb', line 15

# Returns the socket file descriptor this connection was created with.
def fd
  @fd
end

#loggerObject (readonly)

Returns the value of attribute logger.



15
16
17
# File 'lib/syntropy/connection.rb', line 15

# Returns the logger taken from env[:logger] at construction time (may be nil).
def logger
  @logger
end

#response_headersObject (readonly)

Returns the value of attribute response_headers.



15
16
17
# File 'lib/syntropy/connection.rb', line 15

# Returns the response headers recorded for the current request, or nil if
# none have been set yet.
def response_headers
  @response_headers
end

Instance Method Details

#closeObject



274
275
276
277
278
279
280
# File 'lib/syntropy/connection.rb', line 274

# Closes the connection once: shuts down the write side of the socket and
# then submits an asynchronous close. Repeated calls are no-ops until
# @closed is reset.
def close
  unless @closed
    @closed = true
    @machine.shutdown(@fd, UM::SHUT_WR)
    @machine.close_async(@fd)
  end
end

#complete?(req) ⇒ Boolean

Returns:

  • (Boolean)


145
146
147
# File 'lib/syntropy/connection.rb', line 145

# Tells whether the request body has been fully read, based on the
# ':body-done-reading' marker stored in the request headers.
#
# @param req request object exposing #headers
# @return the marker value (truthy once the body was consumed)
def complete?(req)
  headers = req.headers
  headers[':body-done-reading']
end

#finish(request) ⇒ void

This method returns an undefined value.

Finishes the response to the current request. If no headers were sent, default headers are sent using #send_headers.



235
236
237
238
239
240
241
242
# File 'lib/syntropy/connection.rb', line 235

# Finishes a chunked response by writing the terminating empty chunk, then
# logs the request and marks the response as done (unless it was already
# marked done).
#
# @param request request object; its tx counter is incremented by the
#   number of bytes written
# @return [void]
def finish(request)
  request.tx_incr(EMPTY_CHUNK_LEN)
  @machine.send(@fd, EMPTY_CHUNK, EMPTY_CHUNK_LEN, SEND_FLAGS)
  return if @done

  # Log with the same keyword payload used by #respond and #send_chunk.
  @logger&.info(request: request, response_headers: @response_headers)
  @done = true
end

#get_body(req) ⇒ Object



98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# File 'lib/syntropy/connection.rb', line 98

# Reads the entire request body.
#
# With a content-length header, that many bytes are read from the
# connection IO. With chunked transfer encoding, chunks are read via
# #read_chunk until it returns a falsy value and are concatenated. In both
# cases the ':body-done-reading' marker is set on the headers afterwards.
#
# @param req request object exposing #headers
# @return [String, nil] the body, or nil if the body was already read or
#   the request declares neither a length nor chunked encoding
def get_body(req)
  hdrs = req.headers
  return nil if hdrs[':body-done-reading']

  if (declared_length = hdrs['content-length'])
    body = @io.read(declared_length.to_i)
    hdrs[':body-done-reading'] = true
    return body
  end

  return nil unless hdrs['transfer-encoding']&.downcase == 'chunked'

  body = +''
  loop do
    piece = read_chunk(hdrs, nil)
    break unless piece

    body << piece
  end
  hdrs[':body-done-reading'] = true
  body
end

#get_body_chunk(req, _buffered_only = false) ⇒ Object



123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# File 'lib/syntropy/connection.rb', line 123

# Reads the next piece of the request body.
#
# - With a content-length header, the whole body is read in one go and the
#   ':body-done-reading' marker is set; later calls return nil.
# - With chunked transfer encoding, the call is delegated to #read_chunk.
# - Otherwise the connection is read to EOF, up to 1MB, once.
#
# @param req request object exposing #headers
# @param _buffered_only unused
# @return [String, nil] body data, or nil when nothing is left to read
def get_body_chunk(req, _buffered_only = false)
  hdrs = req.headers
  declared_length = hdrs['content-length']

  if declared_length
    return nil if hdrs[':body-done-reading']

    data = @io.read(declared_length.to_i)
    hdrs[':body-done-reading'] = true
    data
  elsif hdrs['transfer-encoding']&.downcase == 'chunked'
    read_chunk(hdrs, nil)
  elsif hdrs[':body-done-reading']
    nil
  else
    # if content-length is not specified, we read to EOF, up to max 1MB size
    data = read(1 << 20, nil, false)
    hdrs[':body-done-reading'] = true
    data
  end
end

#handle_error(request, err) ⇒ void

This method returns an undefined value.

Handles an error encountered while serving a request by logging the error and optionally sending an error response with the relevant HTTP status code. For I/O errors, no response is sent.

Parameters:



73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/syntropy/connection.rb', line 73

# Logs an error raised while serving a request and, where applicable,
# answers the client with an error response.
#
# - SystemCallError: logged as an I/O error, no response is sent.
# - ProtocolError: logged, then answered with the error's message and its
#   #http_status.
# - anything else: logged as an internal error; a 500 response is sent only
#   when there is a request and no response was completed yet.
#
# @param request request object, may be nil
# @param err [Exception] the error that was raised
# @return false for I/O errors, nil when no response is sent, otherwise the
#   result of #respond
def handle_error(request, err)
  case err
  when SystemCallError
    log_error(err, 'I/O error')
    false
  when ProtocolError
    reason = err.message
    log_error(err, reason)
    respond(request, reason, ':status' => err.http_status)
  else
    log_error(err, 'Internal error')
    if request && !@done
      respond(request, 'Internal server error', ':status' => Qeweney::Status::INTERNAL_SERVER_ERROR)
    end
  end
end

#log_error(err, message) ⇒ void

This method returns an undefined value.

Logs the given err and given message.

Parameters:

  • err (Exception)

    error

  • message (String)

    error message



94
95
96
# File 'lib/syntropy/connection.rb', line 94

# Reports an error to the connection logger, if one is configured. The
# logged message is the given message followed by ", closing connection".
#
# @param err [Exception] error
# @param message [String] error message
# @return [void]
def log_error(err, message)
  return if @logger.nil?

  @logger.error(message: "#{message}, closing connection", error: err)
end

#monotonic_clockObject



282
283
284
# File 'lib/syntropy/connection.rb', line 282

# Returns the current reading of the process monotonic clock, suitable for
# measuring elapsed time.
def monotonic_clock
  clock_id = ::Process::CLOCK_MONOTONIC
  ::Process.clock_gettime(clock_id)
end

#respond(request, body, headers) ⇒ Object

Sends response including headers and body. Waits for the request to complete if not yet completed. The body is sent using chunked transfer encoding.

Parameters:

  • request (Qeweney::Request)

    HTTP request

  • body (String)

    response body

  • headers


182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
# File 'lib/syntropy/connection.rb', line 182

# Sends a complete response: headers plus, when a body is given, the body
# as a single chunk followed by the terminating empty chunk. Headers set
# earlier on the connection are merged with (and overridden by) the given
# headers. The request is logged when present and the response is marked
# as done.
#
# NOTE(review): for an empty-string body the chunk size line is "0\r\n",
# so the postlude's terminating chunk follows an already-empty chunk -
# confirm how format_headers and callers treat empty bodies.
#
# @param request HTTP request, may be nil
# @param body [String, nil] response body
# @param headers [Hash] response headers
# @return [true]
def respond(request, body, headers)
  headers = @response_headers.merge(headers) if @response_headers

  header_block = format_headers(headers, body)
  @response_headers = headers

  body_size = body ? body.bytesize : 0
  request&.tx_incr(header_block.bytesize + body_size)

  if body
    size_line = "#{body.bytesize.to_s(16)}\r\n"
    @machine.sendv(@fd, header_block, size_line, body, CHUNKED_ENCODING_POSTLUDE)
  else
    @machine.send(@fd, header_block, header_block.bytesize, SEND_FLAGS)
  end

  @logger&.info(request: request, response_headers: headers) if request
  @done = true
end

#respond_with_static_file(req, path, env, cache_headers) ⇒ Object



244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
# File 'lib/syntropy/connection.rb', line 244

# Responds with the contents of the file at the given path.
#
# The file is read in blocks of env[:max_len] bytes (default 65_536). A
# file that fits in a single block is sent with #respond; larger files are
# streamed with #send_headers followed by one #send_chunk per block. The
# cache headers are merged into env[:headers] and sent with the response.
#
# The file descriptor opened here is always released, including when
# reading or sending raises.
#
# @param req HTTP request
# @param path [String] path of the file to serve
# @param env [Hash, nil] options (:headers, :max_len); note that
#   env[:headers] is updated in place
# @param cache_headers [Hash] headers merged into the response headers
# @return result of #respond or #finish, or nil once the last chunk is sent
def respond_with_static_file(req, path, env, cache_headers)
  fd = @machine.open(path, UM::O_RDONLY)
  env ||= {}
  if env[:headers]
    env[:headers].merge!(cache_headers)
  else
    env[:headers] = cache_headers
  end

  maxlen = env[:max_len] || 65_536
  buf = String.new(capacity: maxlen)
  headers_sent = nil
  loop do
    res = @machine.read(fd, buf, maxlen, 0)
    if res < maxlen && !headers_sent
      # whole file fits in one block: send headers and body together
      return respond(req, buf, env[:headers])
    elsif res == 0
      # file size was an exact multiple of maxlen: only the terminator is left
      return finish(req)
    end

    unless headers_sent
      send_headers(req, env[:headers])
      headers_sent = true
    end
    done = res < maxlen
    send_chunk(req, buf, done: done)
    return if done
  end
ensure
  # fd is nil when open itself raised; otherwise never leak the descriptor
  @machine.close_async(fd) if fd
end

#runObject



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/syntropy/connection.rb', line 30

# Serves requests on this connection until #serve_request reports that the
# connection should not be persisted. Per-request response state is reset
# before each request. Server termination is ignored silently, any other
# StandardError is logged, and the socket is always closed at the end.
def run
  loop do
    @done = @response_headers = nil
    break unless serve_request
  end
rescue UM::Terminate
  # server is terminated, do nothing
rescue StandardError => err
  @logger&.error(
    message:  'Uncaught error while running connection',
    error:    err
  )
ensure
  @machine.close_async(@fd)
end

#send_chunk(request, chunk, done: false) ⇒ void

This method returns an undefined value.

Sends a response body chunk. If no headers were sent, default headers are sent using #send_headers. If the done option is truthy, an empty chunk is sent to signal response completion to the client.

Parameters:

  • request (Qeweney::Request)

    HTTP request

  • chunk (String)

    response body chunk

  • done (boolean) (defaults to: false)

    whether the response is completed



218
219
220
221
222
223
224
225
226
227
228
229
230
# File 'lib/syntropy/connection.rb', line 218

# Writes a response body chunk using chunked transfer encoding. When done
# is truthy the terminating empty chunk is appended, the request is logged
# and the response is marked as done (unless it already was). Nothing is
# written when there is neither a chunk nor a terminator to send.
#
# @param request HTTP request; its tx counter is incremented by the bytes written
# @param chunk [String, nil] response body chunk
# @param done [boolean] whether the response is completed
# @return [void]
def send_chunk(request, chunk, done: false)
  wire = +''
  wire << "#{chunk.bytesize.to_s(16)}\r\n#{chunk}\r\n" if chunk
  wire << EMPTY_CHUNK if done
  return if wire.empty?

  wire_size = wire.bytesize
  request.tx_incr(wire_size)
  @machine.send(@fd, wire, wire_size, SEND_FLAGS)
  return unless done && !@done

  @logger&.info(request: request, response_headers: @response_headers)
  @done = true
end

#send_headers(request, headers, empty_response: false) ⇒ void

This method returns an undefined value.

Sends response headers. If empty_response is truthy, the response status code will default to 204, otherwise to 200.

Parameters:

  • request (Qeweney::Request)

    HTTP request

  • headers (Hash)

    response headers

  • empty_response (boolean) (defaults to: false)

    whether the response is empty, i.e. no response body will be sent



204
205
206
207
208
209
# File 'lib/syntropy/connection.rb', line 204

# Formats and writes the response headers, then records them as the
# connection's current response headers.
#
# @param request HTTP request; its tx counter is incremented by the bytes written
# @param headers [Hash] response headers
# @param empty_response [boolean] truthy when no response body will follow
# @return [void]
def send_headers(request, headers, empty_response: false)
  has_body = !empty_response
  header_block = format_headers(headers, has_body)
  request.tx_incr(header_block.bytesize)
  @machine.send(@fd, header_block, header_block.bytesize, SEND_FLAGS)
  @response_headers = headers
end

#serve_requestObject

Processes an incoming request by parsing the headers, creating a request object and handing it off to the app handler. Returns true if the connection should be persisted.



51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/syntropy/connection.rb', line 51

# Serves a single request: parses the headers, wraps them in a request
# object stamped with the current monotonic clock and hands it to the app.
# Any StandardError raised along the way is passed to #handle_error.
#
# @return false when no headers could be parsed or an error occurred,
#   otherwise the result of persist_connection?
def serve_request
  @closed = nil
  headers = parse_headers
  return false unless headers

  request = Qeweney::Request.new(headers, self)
  request.start_stamp = monotonic_clock
  @app.call(request)
  persist_connection?(headers)
rescue StandardError => err
  # request is nil here when the failure happened before it was created
  handle_error(request, err)
  false
end


161
162
163
164
165
166
167
168
# File 'lib/syntropy/connection.rb', line 161

# Adds one or more Set-Cookie values to the pending response headers.
#
# Cookies already present are kept and the new ones are appended. An
# existing Set-Cookie value that was stored as a single String (rather
# than an Array) is wrapped into an Array first, so it is preserved
# instead of raising a TypeError.
#
# @param cookies [Array<String>] cookie header values
# @return [void]
def set_cookie(*cookies)
  existing_cookies = @response_headers && @response_headers['Set-Cookie']
  if existing_cookies
    @response_headers['Set-Cookie'] = Array(existing_cookies) + cookies
  else
    set_response_headers('Set-Cookie' => cookies)
  end
end

#set_response_headers(headers) ⇒ void

This method returns an undefined value.

Sets response headers before sending any response. This method is used to add headers such as Set-Cookie or cache control headers to a response before actually responding, specifically in middleware hooks.

Parameters:

  • headers (Hash)

    response headers



157
158
159
# File 'lib/syntropy/connection.rb', line 157

# Records response headers to be sent with the upcoming response. Headers
# set earlier are updated in place with the given ones; otherwise the given
# hash itself becomes the pending response headers.
#
# @param headers [Hash] response headers
# @return [void]
def set_response_headers(headers)
  if @response_headers
    @response_headers.merge!(headers)
  else
    @response_headers = headers
  end
end

#with_stream {|@io, @fd| ... } ⇒ Object

Yields:

  • (@io, @fd)


286
287
288
# File 'lib/syntropy/connection.rb', line 286

# Yields the connection's IO object (created via machine.io at construction)
# and its socket file descriptor to the given block, and returns the block's
# result.
def with_stream
  yield @io, @fd
end