Class: Async::Background::Queue::SocketWaker

Inherits:
Object
  • Object
show all
Defined in:
lib/async/background/queue/socket_waker.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(path) ⇒ SocketWaker

Returns a new instance of SocketWaker.



13
14
15
16
17
18
19
# File 'lib/async/background/queue/socket_waker.rb', line 13

# Builds a waker for the UNIX socket at +path+ without touching the
# filesystem; nothing is bound until #open! is called.
#
# @param path [String] filesystem location of the wake-up socket
def initialize(path)
  @path = path
  @notification = ::Async::Notification.new

  # Not listening yet: no server socket and no accept task exist.
  @running = false
  @server = @accept_task = nil
end

Instance Attribute Details

#path ⇒ Object (readonly)

Returns the value of attribute path.



11
12
13
# File 'lib/async/background/queue/socket_waker.rb', line 11

# Returns the socket path this waker was constructed with.
#
# @return [Object] the value passed to #initialize as +path+
def path
  @path
end

Instance Method Details

#close ⇒ Object



66
67
68
69
70
71
72
73
74
75
# File 'lib/async/background/queue/socket_waker.rb', line 66

# Shuts the waker down: marks it as not running, stops the accept task if
# one is still active, closes the listening socket and removes the socket
# file. Every teardown step is best-effort, so errors raised while stopping,
# closing or unlinking are deliberately swallowed.
#
# The socket file is removed only when this instance actually holds a
# server. If #open! failed because the path is already in use by another
# process, @server is still nil, and unlinking here would delete that other
# process's live socket.
#
# Safe to call more than once and safe to call without a prior #open!.
#
# @return [Integer, nil] result of File.unlink when a file was removed,
#   otherwise nil
def close
  @running = false

  task = @accept_task
  if task && !task.finished?
    begin
      task.stop
    rescue StandardError
      nil # best-effort: the task may already be finishing
    end
  end

  server = @server
  return if server.nil? # nothing was bound by us, so nothing to clean up

  begin
    server.close
  rescue StandardError
    nil # best-effort: the socket may already be closed
  end
  @server = nil

  begin
    File.unlink(@path)
  rescue StandardError
    nil # best-effort: the file may already be gone
  end
end

#open! ⇒ Object



21
22
23
24
25
26
27
28
29
# File 'lib/async/background/queue/socket_waker.rb', line 21

# Binds the listening UNIX socket at @path and marks the waker as running.
#
# Runs the stale-socket cleanup and directory preparation helpers first,
# then creates the server and restricts the socket file to its owner.
#
# @return [true] once the socket is bound
# @raise [RuntimeError] when the address is already in use
def open!
  cleanup_stale_socket
  ensure_directory

  @server = UNIXServer.new(@path)
  File.chmod(0o600, @path) # owner read/write only
  @running = true
rescue Errno::EADDRINUSE
  raise RuntimeError, "Socket #{@path} is already in use by another process"
end

#signal ⇒ Object



62
63
64
# File 'lib/async/background/queue/socket_waker.rb', line 62

# Signals the internal notification object, the same one that #wait blocks on.
#
# @return [Object] whatever the notification's #signal returns
def signal
  @notification.signal
end

#start_accept_loop(parent_task) ⇒ Object



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/async/background/queue/socket_waker.rb', line 31

# Spawns a child task under +parent_task+ that accepts connections on
# @server for as long as @running is true, passing each accepted client to
# handle_client together with the child task.
#
# @param parent_task [#async] task used to spawn the accept loop
# @return [Object] the value returned by +parent_task.async+, also stored
#   in @accept_task
def start_accept_loop(parent_task)
  @accept_task = parent_task.async do |loop_task|
    while @running
      begin
        handle_client(loop_task, @server.accept_nonblock)
      rescue IO::WaitReadable
        # No pending connection: wait until the server socket is readable.
        @server.wait_readable
      rescue Errno::EBADF, IOError
        # The server socket is no longer usable, so leave the loop.
        break
      rescue => accept_error
        # Any other failure is logged and the loop keeps accepting.
        Console.logger.error(self) { "SocketWaker accept error: #{accept_error.class} #{accept_error.message}" }
      end
    end
  rescue => fatal
    Console.logger.error(self) { "SocketWaker loop crashed: #{fatal.class} #{fatal.message}\n#{fatal.backtrace.join("\n")}" }
  ensure
    # The loop is over, so drop the handle to it.
    @accept_task = nil
  end
end

#wait(timeout: nil) ⇒ Object



52
53
54
55
56
57
58
59
60
# File 'lib/async/background/queue/socket_waker.rb', line 52

# Blocks the current task until the internal notification is signalled.
#
# @param timeout [Numeric, nil] when given, the wait runs inside the
#   current task's with_timeout; nil or false waits without a time limit
# @return [Object, nil] the result of the notification's #wait, or nil
#   when an Async::TimeoutError was raised
def wait(timeout: nil)
  return @notification.wait unless timeout

  ::Async::Task.current.with_timeout(timeout) { @notification.wait }
rescue ::Async::TimeoutError
  # Timing out is expected - the listener will fall back to polling.
  nil
end