Class: ArrowFormat::MessagePullReader

Inherits:
Object
  • Object
show all
Defined in:
lib/arrow-format/streaming-pull-reader.rb

Constant Summary collapse

CONTINUATION_TYPE =
:s32
CONTINUATION_SIZE =
IO::Buffer.size_of(CONTINUATION_TYPE)
CONTINUATION_STRING =
"\xFF\xFF\xFF\xFF".b.freeze
CONTINUATION_INT32 =
-1
METADATA_LENGTH_TYPE =
:s32
METADATA_LENGTH_SIZE =
IO::Buffer.size_of(METADATA_LENGTH_TYPE)

Instance Method Summary collapse

Constructor Details

#initialize(&on_read) ⇒ MessagePullReader

Returns a new instance of MessagePullReader.



36
37
38
39
40
41
42
# File 'lib/arrow-format/streaming-pull-reader.rb', line 36

# Sets up a reader in its starting state: an empty pending-bytes buffer, no
# known metadata or body length, and the state machine at :initial.
#
# The given block is only stored (in @on_read) here; nothing in this method
# calls it.
def initialize(&on_read)
  @on_read = on_read
  @buffer = IO::Buffer.new(0)
  @metadata_length = @body_length = nil
  @state = :initial
end

Instance Method Details

#consume(chunk) ⇒ Object



63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/arrow-format/streaming-pull-reader.rb', line 63

# Feeds one chunk of stream bytes into the reader.
#
# Repeatedly asks #next_required_size how many bytes the current state needs
# and, once that many are available, hands the data to the handler for that
# state. Bytes that do not yet form a complete piece are kept in @buffer and
# joined with the next chunk. Does nothing once the reader is at end of
# stream.
#
# @param chunk [IO::Buffer] next bytes of the stream (presumably an
#   IO::Buffer, since it is copied into and sliced like one -- confirm with
#   callers)
# @return [nil]
def consume(chunk)
  return if eos?

  if @buffer.size.zero?
    # Nothing pending: parse directly from the caller's chunk without copying.
    target = chunk
  else
    # Append the chunk *after* the pending bytes. IO::Buffer#copy writes at
    # offset 0 unless an offset is given, which would clobber the pending
    # bytes instead of extending them.
    pending_size = @buffer.size
    @buffer.resize(pending_size + chunk.size)
    @buffer.copy(chunk, pending_size)
    target = @buffer
  end

  loop do
    next_size = next_required_size
    break if next_size.zero?

    if target.size < next_size
      # Incomplete piece: keep exactly the unparsed remainder in @buffer and
      # wait for more data.
      @buffer.resize(target.size) if @buffer.size < target.size
      @buffer.copy(target)
      @buffer.resize(target.size)
      return
    end

    case @state
    when :initial
      consume_initial(target)
    when :metadata_length
      # NOTE(review): handler name restored by analogy with consume_initial /
      # consume_body; the listing had a bare `(target)` here -- confirm.
      consume_metadata_length(target)
    when :metadata
      # NOTE(review): same restoration as above -- confirm.
      consume_metadata(target)
    when :body
      consume_body(target)
    end
    break if target.size == next_size

    # Step past the piece just handled (sized by the state it was read in).
    target = target.slice(next_size, target.size - next_size)
  end

  # Nothing is left pending. Swap in a fresh empty buffer so the next call
  # does not append to bytes that were already parsed. A new object is used
  # rather than shrinking the old one so any slices of it stay readable.
  @buffer = IO::Buffer.new(0) unless @buffer.size.zero?
  nil
end

#eos? ⇒ Boolean

Returns:

  • (Boolean)


59
60
61
# File 'lib/arrow-format/streaming-pull-reader.rb', line 59

# Whether the reader's state machine is in the :eos state.
#
# @return [Boolean]
def eos? = @state == :eos

#next_required_size ⇒ Object



44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/arrow-format/streaming-pull-reader.rb', line 44

# Number of bytes the current state needs before it can make progress.
#
# :initial and :metadata_length need the fixed sizes CONTINUATION_SIZE and
# METADATA_LENGTH_SIZE; :metadata and :body need the lengths stored in
# @metadata_length and @body_length; :eos needs 0.
#
# @return [Integer, nil] nil when @state is none of the states above
def next_required_size
  state = @state
  return CONTINUATION_SIZE if state == :initial
  return METADATA_LENGTH_SIZE if state == :metadata_length
  return @metadata_length if state == :metadata
  return @body_length if state == :body

  0 if state == :eos
end