Class: Cougar::RactorWorker

Inherits:
Object
  • Object
show all
Defined in:
lib/cougar/ractor_worker.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(worker_id, server, app) ⇒ RactorWorker

Returns a new instance of RactorWorker.



7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/cougar/ractor_worker.rb', line 7

# Spawns the worker Ractor and obtains the port used to control it.
#
# Inside the Ractor two threads run: a control thread that services messages
# arriving on a Ractor::Port created within the Ractor, and the Ractor's main
# thread, which accepts connections from +server+ and answers requests by
# calling +app+. The Ractor sends its control port back over a temporary port
# created here, and this constructor blocks until that port has been received.
#
# Control messages understood by the worker:
# * +:stop+ - closes +server+ and ends the control thread
# * +[:status, response_port]+ - sends a status hash to +response_port+
# Any other message is logged to $stderr and ignored.
#
# @param worker_id [Object] identifier echoed in status replies and error logs
# @param server [#accept, #close] listening socket the worker accepts from
# @param app [#call] application called with +request.env+; expected to
#   return a +[status, headers, body]+ triple
def initialize(worker_id, server, app)
  @worker_id = worker_id
  # Temporary port on which the new Ractor hands back its control port.
  control_port_port = Ractor::Port.new

  @ractor = Ractor.new(server, app, worker_id, control_port_port) do |server, app, worker_id, control_port_port|
    control_port = Ractor::Port.new
    worker_thread = Thread.current
    control_thread = Thread.new do
      loop do
        message = control_port.receive
        case message
        in :stop
          # The accept loop below treats the resulting IOError/EBADF as its
          # signal to exit.
          server.close
          break
        in [:status, response_port]
          response_port.send({
            worker_id: worker_id,
            thread_status: worker_thread.status,
            alive: worker_thread.alive?
          })
        else
          # Without this branch an unexpected message would raise
          # NoMatchingPatternError and kill the control thread, leaving the
          # worker impossible to stop.
          $stderr.puts "[Worker #{worker_id}] ignoring unknown control message: #{message.inspect}"
        end
      end
    end
    control_port_port.send(control_port)

    loop do
      begin
        client = server.accept
        client.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
      rescue IOError, Errno::EBADF
        # Server socket was closed, exit gracefully
        break
      end

      keepalive = true
      request = Cougar::Request.new(client)
      first_request = true
      while keepalive
        begin
          # For follow-up requests on a kept-alive connection with nothing
          # buffered, wait briefly for more data and drop the connection if
          # none arrives.
          unless first_request || request.has_buffered_data?
            break unless client.wait_readable(0.1)
          end
          first_request = false

          # Parse the HTTP request; a falsy result ends this connection.
          break unless request.parse

          # Call the Rack app
          status, headers, body = app.call(request.env)

          # Send the response; the return value decides whether the
          # connection is kept open for another request.
          keepalive = request.respond(status, headers, body)

          client.flush

          break unless keepalive

          request.reset_for_next_request
        rescue => e
          $stderr.puts "[Worker #{worker_id}] #{e.class}: #{e.message}"
          $stderr.puts e.backtrace
          keepalive = false
        end
      end
    ensure
      # Runs on every exit from the block body, including the break above,
      # where client may still be nil.
      client&.close
    end

    control_thread.join
  end

  @control_port = control_port_port.receive
  control_port_port.close
end

Instance Attribute Details

#ractor ⇒ Object (readonly)

Returns the value of attribute ractor.



5
6
7
# File 'lib/cougar/ractor_worker.rb', line 5

# Reader for the +@ractor+ instance variable.
#
# @return [Object] whatever is currently stored in +@ractor+
def ractor = @ractor

#worker_id ⇒ Object (readonly)

Returns the value of attribute worker_id.



5
6
7
# File 'lib/cougar/ractor_worker.rb', line 5

# Reader for the +@worker_id+ instance variable.
#
# @return [Object] whatever is currently stored in +@worker_id+
def worker_id = @worker_id

Instance Method Details

#join ⇒ Object



93
94
95
# File 'lib/cougar/ractor_worker.rb', line 93

# Delegates to +join+ on the object held in +@ractor+.
# NOTE(review): presumably waits for the worker Ractor to finish - confirm
# against the Ractor#join documentation for the targeted Ruby version.
#
# @return [Object] whatever +@ractor.join+ returns
def join = @ractor.join

#send_status_request(response_port) ⇒ Object



89
90
91
# File 'lib/cougar/ractor_worker.rb', line 89

# Sends a +[:status, response_port]+ message to the object held in
# +@control_port+.
#
# @param response_port [Object] second element of the message that is sent
# @return [Object] whatever +@control_port.send+ returns
def send_status_request(response_port)
  status_request = [:status, response_port]
  @control_port.send(status_request)
end

#stop ⇒ Object



85
86
87
# File 'lib/cougar/ractor_worker.rb', line 85

# Sends the +:stop+ message to the object held in +@control_port+.
#
# @return [Object] whatever +@control_port.send+ returns
def stop = @control_port.send(:stop)