Class: Protocol::ZMTP::Codec::Frame

Inherits:
Object
  • Object
show all
Defined in:
lib/protocol/zmtp/codec/frame.rb

Overview

ZMTP frame encode/decode.

Wire format:

Byte 0:   flags (bit 0=MORE, bit 1=LONG, bit 2=COMMAND)
Next 1-8: size (1-byte if short, 8-byte big-endian if LONG)
Next N:   body

Constant Summary collapse

FLAGS_MORE =
0x01
FLAGS_LONG =
0x02
FLAGS_COMMAND =
0x04
SHORT_MAX =

Short frame: 1-byte size, max body 255 bytes.

255
FLAG_BYTES =

Pre-computed single-byte flag strings (avoids Integer#chr + String#b per frame).

Array.new(256) { |i| i.chr.b.freeze }.freeze

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(body, more: false, command: false) ⇒ Frame

Returns a new instance of Frame.

Parameters:

  • body (String)

    frame body

  • more (Boolean) (defaults to: false)

    more frames follow

  • command (Boolean) (defaults to: false)

    this is a command frame



37
38
39
40
41
# File 'lib/protocol/zmtp/codec/frame.rb', line 37

def initialize(body, more: false, command: false)
  @body    = body.b
  @more    = more
  @command = command
end

Instance Attribute Details

#bodyString (readonly)

Returns frame body (binary).

Returns:

  • (String)

    frame body (binary)



32
33
34
# File 'lib/protocol/zmtp/codec/frame.rb', line 32

def body
  @body
end

Class Method Details

.encode_message(parts) ⇒ String

Encodes a multi-part message into a single wire-format string. The result can be written to multiple connections without re-encoding each time (useful for fan-out patterns like PUB).

Parameters:

  • parts (Array<String>)

    message frames

Returns:

  • (String)

    frozen binary wire representation



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/protocol/zmtp/codec/frame.rb', line 74

def self.encode_message(parts)
  buf  = String.new(encoding: Encoding::BINARY)
  last = parts.size - 1
  i    = 0
  while i < parts.size
    body  = parts[i]
    body  = body.b unless body.encoding == Encoding::BINARY
    size  = body.bytesize
    flags = i < last ? FLAGS_MORE : 0
    if size > SHORT_MAX
      buf << FLAG_BYTES[flags | FLAGS_LONG] << [size].pack("Q>") << body
    else
      buf << FLAG_BYTES[flags] << FLAG_BYTES[size] << body
    end
    i += 1
  end
  buf.freeze
end

.read_from(io, max_message_size: nil) ⇒ Frame

Reads one frame from an IO-like object.

Parameters:

  • io (#read_exactly)

    must support read_exactly(n)

Returns:

Raises:

  • (Error)

    on invalid frame

  • (EOFError)

    if the connection is closed



100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/protocol/zmtp/codec/frame.rb', line 100

def self.read_from(io, max_message_size: nil)
  # Every valid frame has at least 2 header bytes (flags + 1 size
  # byte for short frames, or flags + first size byte for long).
  # Fetching both up-front gives short frames a 2-call read path
  # (header + body) instead of 3.
  head  = io.read_exactly(2)
  flags = head.getbyte(0)

  more    = (flags & FLAGS_MORE) != 0
  long    = (flags & FLAGS_LONG) != 0
  command = (flags & FLAGS_COMMAND) != 0
  size    = long ? read_long_size(io, head.getbyte(1)) : head.getbyte(1)

  if max_message_size && size > max_message_size
    raise Error, "frame size #{size} exceeds max_message_size #{max_message_size}"
  end

  body = size > 0 ? io.read_exactly(size) : EMPTY_BINARY

  new(body, more: more, command: command)
end

.read_long_size(io, msb) ⇒ Integer

Reads the remaining 7 bytes of a long frame’s 8-byte big-endian size field and combines them with msb (already consumed as the second byte of the 2-byte speculative header read).

Parameters:

  • io (#read_exactly)
  • msb (Integer)

    first (most-significant) byte of the size

Returns:

  • (Integer)

    full 64-bit frame size



131
132
133
134
135
136
137
138
139
# File 'lib/protocol/zmtp/codec/frame.rb', line 131

def self.read_long_size(io, msb)
  rest = io.read_exactly(7)

  (msb << 56) |
    (rest.getbyte(0) << 48) | (rest.getbyte(1) << 40) |
    (rest.getbyte(2) << 32) | (rest.getbyte(3) << 24) |
    (rest.getbyte(4) << 16) | (rest.getbyte(5) << 8)  |
     rest.getbyte(6)
end

Instance Method Details

#command?Boolean

Returns true if this is a command frame.

Returns:

  • (Boolean)

    true if this is a command frame



48
# File 'lib/protocol/zmtp/codec/frame.rb', line 48

def command? = @command

#more?Boolean

Returns true if more frames follow in this message.

Returns:

  • (Boolean)

    true if more frames follow in this message



45
# File 'lib/protocol/zmtp/codec/frame.rb', line 45

def more?    = @more

#to_wireString

Encodes to wire bytes.

Returns:

  • (String)

    binary wire representation (flags + size + body)



53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/protocol/zmtp/codec/frame.rb', line 53

def to_wire
  size  = @body.bytesize
  flags = 0
  flags |= FLAGS_MORE if @more
  flags |= FLAGS_COMMAND if @command

  if size > SHORT_MAX
    FLAG_BYTES[flags | FLAGS_LONG] + [size].pack("Q>") + @body
  else
    FLAG_BYTES[flags] + FLAG_BYTES[size] + @body
  end
end