Class: Protocol::ZMTP::Codec::Frame

Inherits:
Object
  • Object
show all
Defined in:
lib/protocol/zmtp/codec/frame.rb

Overview

ZMTP frame encode/decode.

Wire format:

Byte 0:   flags (bit 0=MORE, bit 1=LONG, bit 2=COMMAND)
Next 1-8: size (1-byte if short, 8-byte big-endian if LONG)
Next N:   body

Constant Summary collapse

FLAGS_MORE =
0x01
FLAGS_LONG =
0x02
FLAGS_COMMAND =
0x04
SHORT_MAX =

Short frame: 1-byte size, max body 255 bytes.

255
FLAG_BYTES =

Pre-computed single-byte flag strings (avoids Integer#chr + String#b per frame).

Array.new(256) { |i| i.chr.b.freeze }.freeze

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(body, more: false, command: false) ⇒ Frame

Returns a new instance of Frame.

Parameters:

  • body (String)

    frame body

  • more (Boolean) (defaults to: false)

    more frames follow

  • command (Boolean) (defaults to: false)

    this is a command frame



38
39
40
41
42
# File 'lib/protocol/zmtp/codec/frame.rb', line 38

def initialize(body, more: false, command: false)
  @body    = body.encoding == Encoding::BINARY ? body : body.b
  @more    = more
  @command = command
end

Instance Attribute Details

#bodyString (readonly)

Returns frame body (binary).

Returns:

  • (String)

    frame body (binary)



32
33
34
# File 'lib/protocol/zmtp/codec/frame.rb', line 32

def body
  @body
end

Class Method Details

.encode_message(parts) ⇒ String

Encodes a multi-part message into a single wire-format string. The result can be written to multiple connections without re-encoding each time (useful for fan-out patterns like PUB).

Parameters:

  • parts (Array<String>)

    message frames

Returns:

  • (String)

    frozen binary wire representation



83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# File 'lib/protocol/zmtp/codec/frame.rb', line 83

def self.encode_message(parts)
  if parts.size == 1
    s = parts[0].bytesize
    wire_size = s > SHORT_MAX ? 9 + s : 2 + s
  else
    wire_size = 0
    j = 0
    while j < parts.size
      s = parts[j].bytesize
      wire_size += s > SHORT_MAX ? 9 + s : 2 + s
      j += 1
    end
  end

  buf  = String.new(capacity: wire_size, encoding: Encoding::BINARY)
  last = parts.size - 1
  i    = 0

  while i < parts.size
    body  = parts[i]
    body  = body.b unless body.encoding == Encoding::BINARY
    size  = body.bytesize
    flags = i < last ? FLAGS_MORE : 0

    if size > SHORT_MAX
      buf << FLAG_BYTES[flags | FLAGS_LONG] << [size].pack("Q>") << body
    else
      buf << FLAG_BYTES[flags] << FLAG_BYTES[size] << body
    end
    i += 1
  end

  buf.freeze
end

.read_from(io, max_message_size: nil) ⇒ Frame

Reads one frame from an IO-like object.

Parameters:

  • io (#read_exactly)

    must support read_exactly(n)

Returns:

Raises:

  • (Error)

    on invalid frame

  • (EOFError)

    if the connection is closed



125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
# File 'lib/protocol/zmtp/codec/frame.rb', line 125

def self.read_from(io, max_message_size: nil)
  # Every valid frame has at least 2 header bytes (flags + 1 size
  # byte for short frames, or flags + first size byte for long).
  # Fetching both up-front gives short frames a 2-call read path
  # (header + body) instead of 3.
  head  = io.read_exactly(2)
  flags = head.getbyte(0)

  more    = (flags & FLAGS_MORE) != 0
  long    = (flags & FLAGS_LONG) != 0
  command = (flags & FLAGS_COMMAND) != 0
  size    = long ? read_long_size(io, head.getbyte(1)) : head.getbyte(1)

  if max_message_size && size > max_message_size
    raise Error, "frame size #{size} exceeds max_message_size #{max_message_size}"
  end

  body = size > 0 ? io.read_exactly(size) : EMPTY_BINARY

  new(body, more: more, command: command)
end

.read_long_size(io, msb) ⇒ Integer

Reads the remaining 7 bytes of a long frame’s 8-byte big-endian size field and combines them with msb (already consumed as the second byte of the 2-byte speculative header read).

Parameters:

  • io (#read_exactly)
  • msb (Integer)

    first (most-significant) byte of the size

Returns:

  • (Integer)

    full 64-bit frame size



156
157
158
159
160
161
162
163
164
# File 'lib/protocol/zmtp/codec/frame.rb', line 156

def self.read_long_size(io, msb)
  rest = io.read_exactly(7)

  (msb << 56) |
    (rest.getbyte(0) << 48) | (rest.getbyte(1) << 40) |
    (rest.getbyte(2) << 32) | (rest.getbyte(3) << 24) |
    (rest.getbyte(4) << 16) | (rest.getbyte(5) << 8)  |
     rest.getbyte(6)
end

Instance Method Details

#command?Boolean

Returns true if this is a command frame.

Returns:

  • (Boolean)

    true if this is a command frame



52
53
54
# File 'lib/protocol/zmtp/codec/frame.rb', line 52

def command?
  @command
end

#more?Boolean

Returns true if more frames follow in this message.

Returns:

  • (Boolean)

    true if more frames follow in this message



46
47
48
# File 'lib/protocol/zmtp/codec/frame.rb', line 46

def more?
  @more
end

#to_wireString

Encodes to wire bytes.

Returns:

  • (String)

    binary wire representation (flags + size + body)



60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/protocol/zmtp/codec/frame.rb', line 60

def to_wire
  size  = @body.bytesize
  flags = 0
  flags |= FLAGS_MORE if @more
  flags |= FLAGS_COMMAND if @command

  if size > SHORT_MAX
    buf = String.new(capacity: 9 + size, encoding: Encoding::BINARY)
    buf << FLAG_BYTES[flags | FLAGS_LONG] << [size].pack("Q>") << @body
  else
    buf = String.new(capacity: 2 + size, encoding: Encoding::BINARY)
    buf << FLAG_BYTES[flags] << FLAG_BYTES[size] << @body
  end
end