Class: Chan::UNIXSocket
- Inherits: Object
  - Object
  - Chan::UNIXSocket
- Defined in: lib/xchan/unix_socket.rb
Overview
An easy-to-use InterProcess Communication (IPC) library
Instance Attribute Summary
- #r ⇒ UNIXSocket (readonly)
  Returns a socket used for read operations.
- #s ⇒ <#dump, #load> (readonly, also: #serializer)
  Returns the serializer used by the channel.
- #w ⇒ UNIXSocket (readonly)
  Returns a socket used for write operations.
Write methods
- #send(object) ⇒ Object (also: #write)
  Performs a blocking write.
- #send_nonblock(object) ⇒ Integer? (also: #write_nonblock)
  Performs a non-blocking write.
Read methods
- #recv ⇒ Object (also: #read)
  Performs a blocking read.
- #recv_nonblock ⇒ Object (also: #read_nonblock)
  Performs a non-blocking read.
Stat methods
- #bytes_received ⇒ Integer (also: #bytes_read)
  Returns the total number of bytes read from the channel.
- #bytes_sent ⇒ Integer (also: #bytes_written)
  Returns the total number of bytes written to the channel.
- #size ⇒ Integer
  Returns the number of objects waiting to be read.
Wait methods
- #wait_lockable(timeout = nil) ⇒ Chan::UNIXSocket?
  Waits for the channel to become lockable.
- #wait_readable(timeout = nil) ⇒ Chan::UNIXSocket?
  Waits for the channel to become readable.
- #wait_writable(timeout = nil) ⇒ Chan::UNIXSocket?
  Waits for the channel to become writable.
Instance Method Summary
- #close ⇒ void
  Closes the channel.
- #closed? ⇒ Boolean
  Returns true when the channel is closed.
- #empty? ⇒ Boolean
  Returns true when the channel is empty.
- #initialize(serializer, sock: Socket::SOCK_DGRAM, tmpdir: Dir.tmpdir, lock: :file) ⇒ Chan::UNIXSocket (constructor)
  Returns an instance of Chan::UNIXSocket.
- #to_a ⇒ Array<Object>
  Returns the contents of the channel.
Constructor Details
#initialize(serializer, sock: Socket::SOCK_DGRAM, tmpdir: Dir.tmpdir, lock: :file) ⇒ Chan::UNIXSocket
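Returns an instance of Chan::UNIXSocket. The serializer is either a key registered in Chan.serializers or an object that responds to #dump and #load; lock is resolved the same way through Chan.locks. sock selects the socket type passed to UNIXSocket.pair, and tmpdir is handed to Chan::Bytes, Chan::Counter, and the lock.

    # File 'lib/xchan/unix_socket.rb', line 42

    def initialize(serializer, sock: Socket::SOCK_DGRAM, tmpdir: Dir.tmpdir, lock: :file)
      @s = Chan.serializers[serializer]&.call || serializer
      @r, @w = ::UNIXSocket.pair(sock)
      @bytes = Chan::Bytes.new(tmpdir)
      @counter = Chan::Counter.new(tmpdir)
      @lock = Chan.locks[lock]&.call(tmpdir) || lock
      @mutex = Mutex.new
    end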
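The serializer lookup and socket pairing performed by the constructor can be sketched with the standard library alone. In this sketch `serializer_registry` and `resolve_serializer` are hypothetical stand-ins for `Chan.serializers`, whose contents are not shown on this page; only `UNIXSocket.pair`, `Marshal`, and `JSON` are real standard-library APIs.

```ruby
require "socket"
require "json"

# Hypothetical registry standing in for Chan.serializers: each entry is a
# callable that returns an object responding to #dump and #load.
def serializer_registry
  { marshal: -> { Marshal }, json: -> { JSON } }
end

# Mirrors `Chan.serializers[serializer]&.call || serializer`: a registered
# name is looked up, anything else is assumed to be a serializer already.
def resolve_serializer(serializer)
  serializer_registry[serializer]&.call || serializer
end

# A connected pair of datagram sockets, as created by the constructor.
r, w = UNIXSocket.pair(Socket::SOCK_DGRAM)

s = resolve_serializer(:marshal)
w.write(s.dump([1, 2, 3]))
roundtrip = s.load(r.recv(1024))

# JSON is not a registered key, so it is returned unchanged.
custom = resolve_serializer(JSON)
```

The `&.call || serializer` fallback is what lets a caller pass either a short name or any object with a `dump`/`load` pair.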
Instance Attribute Details
#r ⇒ UNIXSocket (readonly)
Returns a socket used for read operations
    # File 'lib/xchan/unix_socket.rb', line 13

    def r
      @r
    end
#s ⇒ <#dump, #load> (readonly) Also known as: serializer
Returns the serializer used by the channel
    # File 'lib/xchan/unix_socket.rb', line 23

    def s
      @s
    end
#w ⇒ UNIXSocket (readonly)
Returns a socket used for write operations
    # File 'lib/xchan/unix_socket.rb', line 18

    def w
      @w
    end
Instance Method Details
#bytes_received ⇒ Integer Also known as: bytes_read
Returns the total number of bytes read from the channel
    # File 'lib/xchan/unix_socket.rb', line 215

    def bytes_received
      lock { @counter.bytes_read }
    end
#bytes_sent ⇒ Integer Also known as: bytes_written
Returns the total number of bytes written to the channel
    # File 'lib/xchan/unix_socket.rb', line 207

    def bytes_sent
      lock { @counter.bytes_written }
    end
#close ⇒ void
This method returns an undefined value.
Closes the channel: the read and write sockets, the byte and counter stores, and the lock. Raises IOError if the channel is already closed.

    # File 'lib/xchan/unix_socket.rb', line 63

    def close
      @lock.lock
      raise IOError, "closed channel" if closed?
      [@r, @w, @bytes, @counter, @lock].each(&:close)
    rescue IOError => ex
      @lock.release
      raise(ex)
    end
#closed? ⇒ Boolean
Returns true when the channel is closed, that is, when both the read socket and the write socket are closed.

    # File 'lib/xchan/unix_socket.rb', line 54

    def closed?
      @r.closed? and @w.closed?
    end
#empty? ⇒ Boolean
Returns true when the channel is closed or has no objects waiting to be read.

    # File 'lib/xchan/unix_socket.rb', line 196

    def empty?
      return true if closed?
      lock { size.zero? }
    end
#recv ⇒ Object Also known as: read
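Performs a blocking read. It calls #recv_nonblock and, whenever that raises Chan::WaitReadable or Chan::WaitLockable, waits for the channel to become readable or lockable and retries.

    # File 'lib/xchan/unix_socket.rb', line 135

    def recv
      recv_nonblock
    rescue Chan::WaitReadable
      wait_readable
      retry
    rescue Chan::WaitLockable
      wait_lockable
      retry
    end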
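The try, wait, retry shape of #recv can be reproduced on a bare socket. The sketch below is illustrative only: `blocking_read` is a hypothetical helper, and it handles just the "not readable yet" case, since the channel's lock is not modelled here.

```ruby
require "socket"

# Blocking read built from a non-blocking one: try, wait on failure, retry.
def blocking_read(sock, maxlen)
  sock.read_nonblock(maxlen)
rescue IO::WaitReadable
  sock.wait_readable # sleep until the socket has data
  retry
end

r, w = UNIXSocket.pair(Socket::SOCK_DGRAM)

writer = Thread.new do
  sleep 0.05 # give the reader a chance to start waiting first
  w.write("hello")
end

message = blocking_read(r, 1024)
writer.join
```

Keeping the blocking variant as a thin wrapper means the locking and bookkeeping logic lives in one place, the non-blocking method.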
#recv_nonblock ⇒ Object Also known as: read_nonblock
Performs a non-blocking read. Raises Chan::WaitReadable when the read would block, Chan::WaitLockable when the lock cannot be acquired immediately, and IOError when the channel is closed.

    # File 'lib/xchan/unix_socket.rb', line 156

    def recv_nonblock
      @mutex.synchronize do
        @lock.lock_nonblock
        raise IOError, "closed channel" if closed?
        len = @bytes.shift
        obj = deserialize(@r.read_nonblock(len.zero? ? 1 : len))
        @counter.increment!(bytes_read: len)
        obj.tap { @lock.release }
      rescue IOError => ex
        @lock.release
        raise(ex)
      rescue IO::WaitReadable => ex
        @bytes.unshift(len)
        @lock.release
        raise Chan::WaitReadable, ex.message
      rescue Errno::EAGAIN => ex
        raise Chan::WaitLockable, ex.message
      end
    end
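The length bookkeeping in #recv_nonblock (shift a length, read exactly that many bytes, deserialize) can be illustrated without the library. This is a sketch under stated assumptions: the `lengths` array is an in-memory stand-in for `@bytes`, `Marshal` stands in for the channel's serializer, and a stream socket is used so that the need for explicit lengths is visible.

```ruby
require "socket"

r, w = UNIXSocket.pair(Socket::SOCK_STREAM)
lengths = [] # in-memory stand-in for the channel's queue of byte lengths

# Write side: record how many bytes each serialized object occupies.
[{ id: 1 }, "two", [3, 3, 3]].each do |object|
  lengths.push(w.write_nonblock(Marshal.dump(object)))
end

# Read side: consume exactly one object's worth of bytes per read.
objects = []
objects << Marshal.load(r.read_nonblock(lengths.shift)) until lengths.empty?

# With nothing left to read, a non-blocking read raises IO::WaitReadable,
# the condition that recv_nonblock converts into Chan::WaitReadable.
would_block = begin
  r.read_nonblock(1)
  false
rescue IO::WaitReadable
  true
end
```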
#send(object) ⇒ Object Also known as: write
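Performs a blocking write. It calls #send_nonblock and, whenever that raises Chan::WaitWritable or Chan::WaitLockable, waits for the channel to become writable or lockable and retries.

    # File 'lib/xchan/unix_socket.rb', line 83

    def send(object)
      send_nonblock(object)
    rescue Chan::WaitWritable
      wait_writable
      retry
    rescue Chan::WaitLockable
      wait_lockable
      retry
    end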
#send_nonblock(object) ⇒ Integer? Also known as: write_nonblock
Performs a non-blocking write and returns the number of bytes written. Raises Chan::WaitWritable when the write would block and Chan::WaitLockable when the lock cannot be acquired immediately. As written, an IOError (including the "channel closed" error) is also re-raised as Chan::WaitWritable.

    # File 'lib/xchan/unix_socket.rb', line 106

    def send_nonblock(object)
      @mutex.synchronize do
        @lock.lock_nonblock
        raise IOError, "channel closed" if closed?
        len = @w.write_nonblock(serialize(object))
        @bytes.push(len)
        @counter.increment!(bytes_written: len)
        len.tap { @lock.release }
      rescue IOError, IO::WaitWritable, Errno::ENOBUFS => ex
        @lock.release
        raise Chan::WaitWritable, ex.message
      rescue Errno::EWOULDBLOCK => ex
        raise Chan::WaitLockable, ex.message
      end
    end
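The condition that #send_nonblock reports as Chan::WaitWritable can be provoked on a plain socket pair by writing until the kernel buffer fills. The sketch below is a minimal illustration, not the library's code: it uses a stream socket and 4096-byte chunks (both arbitrary choices), and does not cover Errno::ENOBUFS, which the method also rescues.

```ruby
require "socket"

r, w = UNIXSocket.pair(Socket::SOCK_STREAM)
chunk = "x" * 4096
written = 0
buffer_full = false

# Nobody is reading, so the socket buffer eventually fills and
# write_nonblock raises instead of blocking.
begin
  loop { written += w.write_nonblock(chunk) }
rescue IO::WaitWritable
  buffer_full = true
end

# Drain the reader side until it, too, would block.
drained = 0
begin
  loop { drained += r.read_nonblock(65_536).bytesize }
rescue IO::WaitReadable
  # nothing left to read
end

# With the buffer emptied, the writer becomes writable again.
writable_again = w.wait_writable(1) ? true : false
```

This is the situation in which the blocking #send would call #wait_writable and retry.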
#size ⇒ Integer
Returns the number of objects waiting to be read
    # File 'lib/xchan/unix_socket.rb', line 223

    def size
      lock { @bytes.size }
    end
#to_a ⇒ Array<Object>
Returns the contents of the channel as an array. The objects are read with #recv until the channel is empty, so calling this method consumes them.

    # File 'lib/xchan/unix_socket.rb', line 187

    def to_a
      lock do
        [].tap { _1.push(recv) until empty? }
      end
    end
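The `[].tap { ... until empty? }` idiom drains its source while collecting the results. A small sketch with Ruby's core `Queue` as a stand-in for the channel (an assumption made purely for illustration) shows the effect:

```ruby
# Queue stands in for a channel with three pending objects.
channel = Queue.new
%w[a b c].each { |object| channel.push(object) }

# Same shape as #to_a: pop into a fresh array until nothing is left.
contents = [].tap { |items| items.push(channel.pop) until channel.empty? }

remaining = channel.size
```

After the call the source is empty, which is why #to_a is better thought of as "take everything" than as a snapshot.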
#wait_lockable(timeout = nil) ⇒ Chan::UNIXSocket?
Waits for the channel to become lockable, polling every 0.01 seconds. Returns the channel, or nil if the timeout elapses first. Without a timeout it waits indefinitely.

    # File 'lib/xchan/unix_socket.rb', line 262

    def wait_lockable(timeout = nil)
      start = (timeout ? gettime : nil)
      loop do
        break(nil) if start && (gettime - start) >= timeout
        break(self) if @lock.lockable?
        sleep 0.01
      end
    end
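The polling loop generalizes to any condition. In the sketch below `wait_until` is a hypothetical helper, and the use of `Process::CLOCK_MONOTONIC` is an assumption: the `gettime` helper called by #wait_lockable is not shown on this page.

```ruby
# Hypothetical helper: polls the block until it returns a truthy value.
# Returns true on success, or nil when the timeout (in seconds) elapses.
def wait_until(timeout = nil, interval: 0.01)
  now = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }
  start = timeout ? now.call : nil
  loop do
    break(nil) if start && (now.call - start) >= timeout
    break(true) if yield
    sleep interval
  end
end

# The condition never holds, so the call gives up after the timeout.
gave_up = wait_until(0.05) { false }

# The condition becomes true while polling is in progress.
flag = false
setter = Thread.new do
  sleep 0.02
  flag = true
end
succeeded = wait_until(1) { flag }
setter.join
```

A monotonic clock is a common choice for timeouts because it is unaffected by wall-clock adjustments.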
#wait_readable(timeout = nil) ⇒ Chan::UNIXSocket?
Waits for the channel to become readable. Returns the channel, or nil if the timeout elapses first.

    # File 'lib/xchan/unix_socket.rb', line 240

    def wait_readable(timeout = nil)
      @r.wait_readable(timeout) and self
    end
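The `... and self` pattern turns the socket's readiness result into "the channel, or a falsy value on timeout". A minimal sketch on a bare socket pair, where the symbol `:channel` is a placeholder for `self`:

```ruby
require "socket"

r, w = UNIXSocket.pair(Socket::SOCK_DGRAM)

# Mirrors `@r.wait_readable(timeout) and self`; :channel stands in for self.
wait = ->(timeout) { r.wait_readable(timeout) and :channel }

timed_out = wait.call(0.05) # nothing has been written yet
w.write("ping")
ready = wait.call(0.05)     # a datagram is now pending
```

Returning the receiver on success lets callers write `ch.wait_readable(1)&.recv_nonblock` style chains, assuming they handle the nil case.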
#wait_writable(timeout = nil) ⇒ Chan::UNIXSocket?
Waits for the channel to become writable. Returns the channel, or nil if the timeout elapses first.

    # File 'lib/xchan/unix_socket.rb', line 251

    def wait_writable(timeout = nil)
      @w.wait_writable(timeout) and self
    end