Module: DTAS::Buffer::FiddleSplice

Defined in:
lib/dtas/buffer/fiddle_splice.rb

Overview

Used by -player on Linux systems with the “splice” syscall

Constant Summary collapse

MAX_AT_ONCE =

:nodoc:

4096
MAX_AT_ONCE_1 =

page size in Linux

65536
F_MOVE =
1
F_NONBLOCK =
2
Splice =
Fiddle::Function.new(DTAS.libc['splice'], [
  Fiddle::TYPE_INT, # int fd_in,
  Fiddle::TYPE_VOIDP, # loff_t *off_in
  Fiddle::TYPE_INT, # int fd_out
  Fiddle::TYPE_VOIDP, # loff_t *off_out
  Fiddle::TYPE_SIZE_T, # size_t len
  Fiddle::TYPE_INT, # unsigned int flags
],
Fiddle::TYPE_SSIZE_T)
Tee =
Fiddle::Function.new(DTAS.libc['tee'], [
  Fiddle::TYPE_INT, # int fd_in,
  Fiddle::TYPE_INT, # int fd_out
  Fiddle::TYPE_SIZE_T, # size_t len
  Fiddle::TYPE_INT, # unsigned int flags
],
Fiddle::TYPE_SSIZE_T)

Instance Method Summary collapse

Instance Method Details

#__broadcast_tee(blocked, targets, chunk_size) ⇒ Object

returns the largest value we teed



117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# File 'lib/dtas/buffer/fiddle_splice.rb', line 117

def __broadcast_tee(blocked, targets, chunk_size)
  most_teed = 0
  targets.delete_if do |dst|
    begin
      t = (dst.nonblock? || most_teed == 0) ?
            tee(@to_io, dst, chunk_size, F_NONBLOCK) :
            __tee_in_full(@to_io, dst, chunk_size)
      if Integer === t
        if t > most_teed
          chunk_size = t if most_teed == 0
          most_teed = t
        end
      else
        blocked << dst
      end
      false
    rescue IOError, Errno::EPIPE => e
      __dst_error(dst, e)
      true
    end
  end
  most_teed
end

#__splice_in_full(src, dst, bytes, flags) ⇒ Object



106
107
108
109
110
111
112
113
114
# File 'lib/dtas/buffer/fiddle_splice.rb', line 106

def __splice_in_full(src, dst, bytes, flags)
  rv = 0
  while bytes > 0
    s = splice(src, dst, bytes, flags)
    rv += s
    bytes -= s
  end
  rv
end

#__tee_in_full(src, dst, bytes) ⇒ Object



96
97
98
99
100
101
102
103
104
# File 'lib/dtas/buffer/fiddle_splice.rb', line 96

def __tee_in_full(src, dst, bytes)
  rv = 0
  while bytes > 0
    s = tee(src, dst, bytes)
    bytes -= s
    rv += s
  end
  rv
end

#_syserr(s, func) ⇒ Object



34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/dtas/buffer/fiddle_splice.rb', line 34

def _syserr(s, func)
  raise "BUG: we should not encounter EOF on #{func}" if s == 0
  case errno = Fiddle.last_error
  when Errno::EAGAIN::Errno
    return :EAGAIN
  when Errno::EPIPE::Errno
    raise Errno::EPIPE.exception
  when Errno::EINTR::Errno
    return nil
  else
    raise SystemCallError, "#{func} error: #{errno}"
  end
end

#broadcast_inf(targets, limit = nil) ⇒ Object



141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
# File 'lib/dtas/buffer/fiddle_splice.rb', line 141

def broadcast_inf(targets, limit = nil)
  if targets.all?(&:ready_write_optimized?)
    blocked = []
  elsif targets.none?(&:nonblock?)
    # if all targets are blocking, don't start until they're all writable
    r = IO.select(nil, targets, nil, 0) or return targets
    blocked = targets - r[1]

    # tell DTAS::UNIXServer#run_once to wait on the blocked targets
    return blocked if blocked[0]

    # all writable, yay!
  else
    blocked = []
  end

  # don't pin too much on one target
  bytes = limit || MAX_AT_ONCE
  last = targets.pop # we splice to the last one, tee to the rest

  # this may return zero if all targets were non-blocking
  most_teed = __broadcast_tee(blocked, targets, bytes)

  # don't splice more than the largest amount we successfully teed
  bytes = most_teed if most_teed > 0

  begin
    targets << last
    if last.nonblock? || most_teed == 0
      s = splice(@to_io, last, bytes, F_MOVE|F_NONBLOCK)
      if Symbol === s
        blocked << last

        # we accomplished nothing!
        # If _all_ writers are blocked, do not discard data,
        # stay blocked on :wait_writable
        return blocked if most_teed == 0

        # the tees targets win, drop data intended for last
        if most_teed > 0
          discard(most_teed)
          @bytes_xfer += most_teed
          # do not watch for writability of last, last is non-blocking
          return :wait_readable
        end
      end
    else
      # the blocking case is simple
      s = __splice_in_full(@to_io, last, bytes, F_MOVE)
    end
    @bytes_xfer += s

    # if we can't splice everything
    # discard it so the early targets do not get repeated data
    if s < bytes && most_teed > 0
      discard(bytes - s)
    end
    :wait_readable
  rescue IOError, Errno::EPIPE => e # last failed, drop it
    __dst_error(last, e)
    targets.pop # we're no longer a valid target

    if most_teed == 0
      # nothing accomplished, watch any targets
      return blocked if blocked[0]
    else
      # some progress, discard the data we could not splice
      @bytes_xfer += most_teed
      discard(most_teed)
    end

    # stop decoding if we're completely errored out
    # returning nil will trigger close
    return targets[0] ? :wait_readable : nil
  end
end

#broadcast_one(targets, limit = nil) ⇒ Object



79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/dtas/buffer/fiddle_splice.rb', line 79

def broadcast_one(targets, limit = nil)
  # single output is always non-blocking
  limit ||= MAX_AT_ONCE_1
  s = splice(@to_io, targets[0], limit, F_MOVE|F_NONBLOCK)
  if Symbol === s
    targets # our one and only target blocked on write
  else
    @bytes_xfer += s
    # s < limit means targets[0] is full
    s < limit ? targets : :wait_readable
  end
rescue Errno::EPIPE, IOError => e
  __dst_error(targets[0], e)
  targets.clear
  nil # do not return error here, we already spewed an error message
end

#buffer_sizeObject



64
65
66
# File 'lib/dtas/buffer/fiddle_splice.rb', line 64

def buffer_size
  @to_io.pipe_size
end

#buffer_size=(bytes) ⇒ Object

nil is OK, won't reset existing pipe, either…



69
70
71
72
# File 'lib/dtas/buffer/fiddle_splice.rb', line 69

def buffer_size=(bytes)
  @to_io.pipe_size = bytes if bytes
  @buffer_size = bytes
end

#discard(bytes) ⇒ Object

be sure to only call this with nil when all writers to @wr are done



75
76
77
# File 'lib/dtas/buffer/fiddle_splice.rb', line 75

def discard(bytes)
  splice(@to_io, DTAS.null, bytes, 0)
end

#splice(src, dst, len, flags) ⇒ Object



48
49
50
51
52
53
54
# File 'lib/dtas/buffer/fiddle_splice.rb', line 48

def splice(src, dst, len, flags)
  begin
    s = Splice.call(src.fileno, nil, dst.fileno, nil, len, flags)
    return s if s > 0
    sym = _syserr(s, 'splice') and return sym
  end while true
end

#tee(src, dst, len, flags = 0) ⇒ Object



56
57
58
59
60
61
62
# File 'lib/dtas/buffer/fiddle_splice.rb', line 56

def tee(src, dst, len, flags = 0)
  begin
    s = Tee.call(src.fileno, dst.fileno, len, flags)
    return s if s > 0
    sym = _syserr(s, 'tee') and return sym
  end while true
end