Class: Protocol::ZMTP::Codec::Frame

Inherits:
Object
Defined in:
lib/protocol/zmtp/codec/frame.rb

Overview

ZMTP frame encode/decode.

Wire format:

Byte 0:       flags (bit 0=MORE, bit 1=LONG, bit 2=COMMAND)
Next 1 or 8:  size (1 byte if short, 8 bytes big-endian if LONG)
Next N:       body
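
As an illustration of the layout above, the following sketch builds one short and one long frame by hand with Array#pack. It does not use this class; the constant names MORE, LONG and COMMAND are local to the example and simply mirror the flag bits listed above.

```ruby
# Flag bits as laid out in the wire format (names local to this sketch).
MORE    = 0x01
LONG    = 0x02
COMMAND = 0x04

# Short frame: flags byte, 1-byte size, body.
short_body = "hello".b
short = [MORE, short_body.bytesize].pack("CC") + short_body

# Long frame: flags byte with LONG set, 8-byte big-endian size, body.
long_body = ("x" * 300).b
long = [LONG, long_body.bytesize].pack("CQ>") + long_body

p short.bytes.first(2)  # => [1, 5]
p long.bytesize         # => 309 (1 flags + 8 size + 300 body)
```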

Constant Summary

FLAGS_MORE = 0x01

FLAGS_LONG = 0x02

FLAGS_COMMAND = 0x04

SHORT_MAX = 255

    Short frame: 1-byte size, max body 255 bytes.

FLAG_BYTES = Array.new(256) { |i| i.chr.b.freeze }.freeze

    Pre-computed single-byte flag strings (avoids Integer#chr + String#b per frame).

Constructor Details

#initialize(body, more: false, command: false) ⇒ Frame

Returns a new instance of Frame.

Parameters:

  • body (String)

    frame body

  • more (Boolean) (defaults to: false)

    more frames follow

  • command (Boolean) (defaults to: false)

    this is a command frame



# File 'lib/protocol/zmtp/codec/frame.rb', line 144

def initialize(body, more: false, command: false)
  @body    = body.encoding == Encoding::BINARY ? body : body.b
  @more    = more
  @command = command
end
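
The constructor's encoding check can be tried in isolation. This is a small sketch of the same conditional applied to plain strings, with no Frame involved; the variable names are illustrative. It shows that a non-binary body is copied by String#b, while an already-binary body is kept as the same object.

```ruby
utf8   = "h\u00E9llo"  # 5 characters, 6 bytes in UTF-8
binary = utf8.encoding == Encoding::BINARY ? utf8 : utf8.b

p utf8.length                          # => 5
p binary.bytesize                      # => 6
p binary.encoding == Encoding::BINARY  # => true
p utf8.encoding == Encoding::UTF_8     # => true (String#b returned a copy)

# A string that is already binary passes through without copying.
raw  = "abc".b
same = raw.encoding == Encoding::BINARY ? raw : raw.b
p same.equal?(raw)                     # => true
```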

Instance Attribute Details

#body ⇒ String (readonly)

Returns frame body (binary).

Returns:

  • (String)

    frame body (binary)



# File 'lib/protocol/zmtp/codec/frame.rb', line 138

def body
  @body
end

Class Method Details

.encode_message(parts) ⇒ String

Encodes a multi-part message into a single wire-format string. The result can be written to multiple connections without re-encoding each time (useful for fan-out patterns like PUB).

Parameters:

  • parts (Array<String>)

    message frames

Returns:

  • (String)

    frozen binary wire representation



# File 'lib/protocol/zmtp/codec/frame.rb', line 38

def self.encode_message(parts)
  if parts.size == 1
    s    = parts.first.bytesize
    wire = s > SHORT_MAX ? 9 + s : 2 + s
  else
    wire = 0
    j    = 0

    while j < parts.size
      s     = parts[j].bytesize
      wire += s > SHORT_MAX ? 9 + s : 2 + s
      j    += 1
    end
  end

  buf  = String.new(capacity: wire, encoding: Encoding::BINARY)
  last = parts.size - 1
  i    = 0

  while i < parts.size
    body  = parts[i]
    body  = body.b unless body.encoding == Encoding::BINARY
    size  = body.bytesize
    flags = i < last ? FLAGS_MORE : 0

    if size > SHORT_MAX
      buf << FLAG_BYTES[flags | FLAGS_LONG]
      buf << [size].pack("Q>")
      buf << body
    else
      buf << FLAG_BYTES[flags]
      buf << FLAG_BYTES[size]
      buf << body
    end

    i += 1
  end

  buf.freeze
end
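
To make the output format concrete, here is a rough sketch that walks such a wire string and splits it back into frames. split_frames is a hypothetical helper written for this example, not part of the library, and it assumes complete, well-formed input (no truncation handling). The sample input is built by hand to match what the listing above would emit for the parts "a" and "bc".

```ruby
# Hypothetical helper: returns [body, more] pairs for each frame in +wire+.
# Assumes +wire+ holds only complete, well-formed frames.
def split_frames(wire)
  frames = []
  pos    = 0

  while pos < wire.bytesize
    flags = wire.getbyte(pos)

    if (flags & 0x02) != 0 # LONG: 8-byte big-endian size
      size   = wire.byteslice(pos + 1, 8).unpack1("Q>")
      header = 9
    else                   # short: 1-byte size
      size   = wire.getbyte(pos + 1)
      header = 2
    end

    frames << [wire.byteslice(pos + header, size), (flags & 0x01) != 0]
    pos += header + size
  end

  frames
end

# Two short frames: "a" with MORE set, then "bc" as the final frame.
wire = [0x01, 1].pack("CC") + "a" + [0x00, 2].pack("CC") + "bc"
p split_frames(wire)  # => [["a", true], ["bc", false]]
```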

.read_from(io, max_message_size: nil) ⇒ Frame

Reads one frame from an IO-like object.

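Uses #peek to buffer just enough header bytes (2 for short frames, 9 for long), then drains header and body in a single #read_exactly. That is 2 calls for both short and long frames, versus the 3 a naive long-frame read needs. A speculative read_exactly(9) would be unsafe: a short frame with a body under 7 bytes occupies fewer than 9 bytes on the wire, so on an idle connection the read would hang waiting for bytes that never arrive, and on a mixed stream it would consume bytes belonging to the next frame. When max_message_size is given, the size check runs before the body is read.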

Parameters:

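  • io (#peek, #read_exactly)

    IO-like object to read from

  • max_message_size (Integer, nil) (defaults to: nil)

    largest frame body size to accept, in bytes; nil disables the check

Returns:

  • (Frame)

    the decoded frame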

Raises:

  • (Error)

    on invalid frame (for example, a size exceeding max_message_size)

  • (EOFError)

    if the connection is closed



# File 'lib/protocol/zmtp/codec/frame.rb', line 93

def self.read_from(io, max_message_size: nil)
  buf = io.peek do |b|
    next false if b.bytesize < 2
    (b.getbyte(0) & FLAGS_LONG) == 0 || b.bytesize >= 9
  end

  raise EOFError, "Stream finished before reading frame header" if buf.bytesize < 2

  flags   = buf.getbyte(0)
  more    = (flags & FLAGS_MORE) != 0
  long    = (flags & FLAGS_LONG) != 0
  command = (flags & FLAGS_COMMAND) != 0

  if long
    raise EOFError, "Stream finished before reading long frame size" if buf.bytesize < 9

    size = (buf.getbyte(1) << 56) |
           (buf.getbyte(2) << 48) |
           (buf.getbyte(3) << 40) |
           (buf.getbyte(4) << 32) |
           (buf.getbyte(5) << 24) |
           (buf.getbyte(6) << 16) |
           (buf.getbyte(7) << 8)  |
            buf.getbyte(8)
    header_size = 9
  else
    size        = buf.getbyte(1)
    header_size = 2
  end

  if max_message_size && size > max_message_size
    raise Error, "frame size #{size} exceeds max_message_size #{max_message_size}"
  end

  if size.zero?
    io.read_exactly(header_size)
    return new(EMPTY_BINARY, more: more, command: command)
  end

  wire = io.read_exactly(header_size + size)
  new(wire.byteslice(header_size, size), more: more, command: command)
end
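
The predicate passed to io.peek decides when enough of the header has been buffered. The sketch below lifts that predicate into a standalone method so its behaviour can be checked on plain strings; header_complete? is a name invented for this example, and no IO object is involved.

```ruby
FLAGS_LONG = 0x02

# Mirrors the block given to io.peek in the listing above: true once the
# buffered bytes cover the whole header (2 bytes short, 9 bytes long).
def header_complete?(buf)
  return false if buf.bytesize < 2

  (buf.getbyte(0) & FLAGS_LONG) == 0 || buf.bytesize >= 9
end

short_header = [0x00, 3].pack("CC")       # flags + 1-byte size
long_partial = [0x02, 0, 0].pack("CCC")   # LONG set, size still incomplete
long_header  = [0x02, 300].pack("CQ>")    # LONG set, full 8-byte size

p header_complete?("\x00".b)      # => false
p header_complete?(short_header)  # => true
p header_complete?(long_partial)  # => false
p header_complete?(long_header)   # => true
```

Because the LONG bit lives in the first byte, the reader never has to request more than the header actually contains, which is what makes the two-call pattern safe.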

Instance Method Details

#command? ⇒ Boolean

Returns true if this is a command frame.

Returns:

  • (Boolean)

    true if this is a command frame



# File 'lib/protocol/zmtp/codec/frame.rb', line 158

def command?
  @command
end

#more? ⇒ Boolean

Returns true if more frames follow in this message.

Returns:

  • (Boolean)

    true if more frames follow in this message



# File 'lib/protocol/zmtp/codec/frame.rb', line 152

def more?
  @more
end

#to_wire ⇒ String

Encodes to wire bytes.

Returns:

  • (String)

    binary wire representation (flags + size + body)



# File 'lib/protocol/zmtp/codec/frame.rb', line 166

def to_wire
  size   = @body.bytesize
  flags  = 0
  flags |= FLAGS_MORE if @more
  flags |= FLAGS_COMMAND if @command

  if size > SHORT_MAX
    buf = String.new(capacity: 9 + size, encoding: Encoding::BINARY)
    buf << FLAG_BYTES[flags | FLAGS_LONG]
    buf << [size].pack("Q>")
    buf << @body
  else
    buf = String.new(capacity: 2 + size, encoding: Encoding::BINARY)
    buf << FLAG_BYTES[flags]
    buf << FLAG_BYTES[size]
    buf << @body
  end
end
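
The switch between the short and long header happens at SHORT_MAX. As a quick check of the boundary, this sketch computes the total wire size for a given body size using the same comparison as the listing above; wire_size is a hypothetical helper for illustration only.

```ruby
SHORT_MAX = 255

# Hypothetical helper: bytes a single frame occupies on the wire.
def wire_size(body_size)
  body_size > SHORT_MAX ? 9 + body_size : 2 + body_size
end

p wire_size(0)    # => 2   (empty short frame: flags + size)
p wire_size(255)  # => 257 (largest short frame)
p wire_size(256)  # => 265 (smallest long frame)
```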