Module: DTAS::Buffer::ReadWrite
- Defined in:
- lib/dtas/buffer/read_write.rb
Overview
compatibility code for non-Linux systems lacking “splice” support. Used only by -player
Constant Summary collapse
- MAX_AT_ONCE =
:nodoc:
512
Instance Attribute Summary collapse
-
#buffer_size ⇒ Object
min PIPE_BUF value in POSIX.
Instance Method Summary collapse
- #_rbuf ⇒ Object
- #broadcast_inf(targets, limit = nil) ⇒ Object
-
#broadcast_one(targets, limit = nil) ⇒ Object
always block when we have a single target.
-
#discard(bytes) ⇒ Object
be sure to only call this with nil when all writers to @wr are done.
Instance Attribute Details
#buffer_size ⇒ Object
min PIPE_BUF value in POSIX
12 13 14 |
# File 'lib/dtas/buffer/read_write.rb', line 12 def buffer_size @buffer_size end |
Instance Method Details
#_rbuf ⇒ Object
14 15 16 |
# File 'lib/dtas/buffer/read_write.rb', line 14 def _rbuf Thread.current[:dtas_pbuf] ||= ''.b end |
#broadcast_inf(targets, limit = nil) ⇒ Object
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 |
# File 'lib/dtas/buffer/read_write.rb', line 44 def broadcast_inf(targets, limit = nil) nr_nb = targets.count(&:nonblock?) if nr_nb == 0 || nr_nb == targets.size # if all targets are full, don't start until they're all writable r = IO.select(nil, targets, nil, 0) or return targets blocked = targets - r[1] # tell DTAS::UNIXServer#run_once to wait on the blocked targets return blocked if blocked[0] # all writable, yay! else blocked = [] end again = {} # don't pin too much on one target bytes = inflight limit ||= MAX_AT_ONCE bytes = bytes > limit ? limit : bytes buf = _rbuf @to_io.read(bytes, buf) n = buf.bytesize @bytes_xfer += n targets.delete_if do |dst| begin if dst.nonblock? case w = dst.write_nonblock(buf, exception: false) when :wait_writable blocked << dst else again[dst] = buf.byteslice(w, n) if w < n end else dst.write(buf) end false rescue IOError, Errno::EPIPE => e again.delete(dst) __dst_error(dst, e) true end end # try to write as much as possible again.delete_if do |dst, sbuf| begin case w = dst.write_nonblock(sbuf, exception: false) when :wait_writable blocked << dst true else n = sbuf.bytesize if w < n again[dst] = sbuf.byteslice(w, n) false else true end end rescue IOError, Errno::EPIPE => e __dst_error(dst, e) true end end until again.empty? targets[0] ? :wait_readable : nil end |
#broadcast_one(targets, limit = nil) ⇒ Object
always block when we have a single target
30 31 32 33 34 35 36 37 38 39 40 41 42 |
# File 'lib/dtas/buffer/read_write.rb', line 30 def broadcast_one(targets, limit = nil) buf = _rbuf case rv = @to_io.read_nonblock(limit || MAX_AT_ONCE, buf, exception: false) when nil, :wait_readable then return rv end n = targets[0].write(buf) # IO#write has write-in-full behavior @bytes_xfer += n :wait_readable rescue Errno::EPIPE, IOError => e __dst_error(targets[0], e) targets.clear nil # do not return error here, we already spewed an error message end |
#discard(bytes) ⇒ Object
be sure to only call this with nil when all writers to @wr are done
19 20 21 22 23 24 25 26 27 |
# File 'lib/dtas/buffer/read_write.rb', line 19 def discard(bytes) buf = _rbuf begin @to_io.readpartial(bytes, buf) bytes -= buf.bytesize rescue EOFError return end until bytes == 0 end |