Class: Async::Background::Queue::Notifier

Inherits:
Object
  • Object
show all
Defined in:
lib/async/background/queue/notifier.rb

Constant Summary collapse

WRITE_DROPPED =

Error groups

[
  IO::WaitWritable,  # buffer full — consumer is behind, skip
  Errno::EAGAIN,     # same as above on some platforms
  IOError,           # our own writer end has been closed
  Errno::EPIPE       # the reader end is gone (consumer crashed) — skip
].freeze
READ_EXHAUSTED =
[
  IO::WaitReadable,  # nothing left in the buffer — normal exit
  EOFError,          # writer end closed — no more data ever
  IOError            # our own reader end has been closed
].freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeNotifier

Returns a new instance of Notifier.



23
24
25
26
27
# File 'lib/async/background/queue/notifier.rb', line 23

def initialize
  @reader, @writer = IO.pipe
  @reader.binmode
  @writer.binmode
end

Instance Attribute Details

#readerObject (readonly)

Returns the value of attribute reader.



21
22
23
# File 'lib/async/background/queue/notifier.rb', line 21

def reader
  @reader
end

#writerObject (readonly)

Returns the value of attribute writer.



21
22
23
# File 'lib/async/background/queue/notifier.rb', line 21

def writer
  @writer
end

Instance Method Details

#closeObject



52
53
54
55
# File 'lib/async/background/queue/notifier.rb', line 52

def close
  close_reader
  close_writer
end

#close_readerObject



48
49
50
# File 'lib/async/background/queue/notifier.rb', line 48

def close_reader
  @reader.close unless @reader.closed?
end

#close_writerObject



44
45
46
# File 'lib/async/background/queue/notifier.rb', line 44

def close_writer
  @writer.close unless @writer.closed?
end

#for_consumer!Object



61
62
63
# File 'lib/async/background/queue/notifier.rb', line 61

def for_consumer!
  close_writer
end

#for_producer!Object



57
58
59
# File 'lib/async/background/queue/notifier.rb', line 57

def for_producer!
  close_reader
end

#notifyObject Also known as: notify_all



29
30
31
32
33
34
35
# File 'lib/async/background/queue/notifier.rb', line 29

def notify
  @writer.write_nonblock("\x01")
rescue *WRITE_DROPPED
  # All write failures are non-fatal: the job is already in the
  # store, and missing one wake-up only delays pickup by at most
  # one poll interval.
end

#wait(timeout: nil) ⇒ Object



39
40
41
42
# File 'lib/async/background/queue/notifier.rb', line 39

def wait(timeout: nil)
  @reader.wait_readable(timeout)
  drain
end