Class: Protocol::ZMTP::Codec::Frame
- Inherits: Object
- Defined in: lib/protocol/zmtp/codec/frame.rb
Overview
ZMTP frame encode/decode.
Wire format:
Byte 0: flags (bit 0=MORE, bit 1=LONG, bit 2=COMMAND)
Next 1-8: size (1-byte if short, 8-byte big-endian if LONG)
Next N: body
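The layout above can be checked by hand. The following is a minimal sketch in plain Ruby that does not load the library; `encode_frame` is a hypothetical helper written only to illustrate the byte layout, with the flag bits written as literals.

```ruby
# Hypothetical helper, not part of Protocol::ZMTP: builds one frame by hand
# following the wire format described above.
def encode_frame(body, more: false, command: false)
  body = body.b
  flags = 0
  flags |= 0x01 if more     # bit 0: MORE
  flags |= 0x04 if command  # bit 2: COMMAND

  if body.bytesize > 255
    # LONG frame (bit 1 set): flags byte, then 8-byte big-endian size
    [flags | 0x02].pack("C") + [body.bytesize].pack("Q>") + body
  else
    # Short frame: flags byte, then 1-byte size
    [flags, body.bytesize].pack("CC") + body
  end
end

short = encode_frame("hi", more: true)
long = encode_frame("x" * 300)

puts short.unpack1("H*")                 # 01026869
puts long.byteslice(0, 9).unpack1("H*")  # 02000000000000012c
```

The short frame costs 2 header bytes and the long frame 9, which is where the `2 + s` and `9 + s` size calculations in the method bodies on this page come from.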
Constant Summary
- FLAGS_MORE = 0x01
- FLAGS_LONG = 0x02
- FLAGS_COMMAND = 0x04
- SHORT_MAX = 255
  Short frame: 1-byte size, max body 255 bytes.
- FLAG_BYTES = Array.new(256) { |i| i.chr.b.freeze }.freeze
  Pre-computed single-byte flag strings (avoids Integer#chr + String#b per frame).
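As a rough illustration of what the pre-computed table saves, the sketch below rebuilds an equivalent table under a local name (`flag_bytes`, a stand-in for FLAG_BYTES so the snippet runs without the library) and compares a lookup with the per-call conversion it replaces.

```ruby
# Stand-in for FLAG_BYTES, rebuilt locally so the snippet runs on its own.
flag_bytes = Array.new(256) { |i| i.chr.b.freeze }.freeze

flags = 0x01 | 0x04            # MORE | COMMAND
looked_up = flag_bytes[flags]  # returns the existing frozen string
computed = flags.chr.b         # allocates fresh strings on every call

puts looked_up == computed                # same byte content
puts looked_up.encoding == Encoding::BINARY
puts looked_up.equal?(flag_bytes[flags])  # same object on every lookup
```

Because the table covers all 256 byte values, the same lookup also serves for the 1-byte size field of short frames, as the method bodies on this page do with `FLAG_BYTES[size]`.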
Instance Attribute Summary
- #body ⇒ String (readonly)
  Frame body (binary).
Class Method Summary
- .encode_message(parts) ⇒ String
  Encodes a multi-part message into a single wire-format string.
- .read_from(io, max_message_size: nil) ⇒ Frame
  Reads one frame from an IO-like object.
Instance Method Summary
- #command? ⇒ Boolean
  True if this is a command frame.
- #initialize(body, more: false, command: false) ⇒ Frame (constructor)
  A new instance of Frame.
- #more? ⇒ Boolean
  True if more frames follow in this message.
- #to_wire ⇒ String
  Encodes to wire bytes.
Constructor Details
#initialize(body, more: false, command: false) ⇒ Frame
Returns a new instance of Frame.
    # File 'lib/protocol/zmtp/codec/frame.rb', line 144
    def initialize(body, more: false, command: false)
      @body = body.encoding == Encoding::BINARY ? body : body.b
      @more = more
      @command = command
    end
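The encoding check in the constructor matters because frame sizes are byte counts, not character counts. A small sketch using only core String methods (no library code) shows the difference:

```ruby
text = "h\u00e9llo"   # UTF-8 string: 5 characters, 6 bytes
binary = text.b       # binary (ASCII-8BIT) copy with the same bytes

puts text.length      # 5
puts binary.length    # 6
puts binary.bytesize  # 6

# Input that is already binary satisfies the same comparison the
# constructor uses, so it is stored without making a copy.
already = "raw".b
puts already.encoding == Encoding::BINARY  # true
```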
Instance Attribute Details
#body ⇒ String (readonly)
Returns frame body (binary).
    # File 'lib/protocol/zmtp/codec/frame.rb', line 138
    def body
      @body
    end
Class Method Details
.encode_message(parts) ⇒ String
Encodes a multi-part message into a single wire-format string. The result can be written to multiple connections without re-encoding each time (useful for fan-out patterns like PUB).
    # File 'lib/protocol/zmtp/codec/frame.rb', line 38
    def self.encode_message(parts)
      # First pass: compute the exact wire size so the buffer is allocated once.
      if parts.size == 1
        s = parts.first.bytesize
        wire = s > SHORT_MAX ? 9 + s : 2 + s
      else
        wire = 0
        j = 0
        while j < parts.size
          s = parts[j].bytesize
          wire += s > SHORT_MAX ? 9 + s : 2 + s
          j += 1
        end
      end

      buf = String.new(capacity: wire, encoding: Encoding::BINARY)
      last = parts.size - 1
      i = 0
      # Second pass: append each part; every part except the last carries MORE.
      while i < parts.size
        body = parts[i]
        body = body.b unless body.encoding == Encoding::BINARY
        size = body.bytesize
        flags = i < last ? FLAGS_MORE : 0
        if size > SHORT_MAX
          buf << FLAG_BYTES[flags | FLAGS_LONG]
          buf << [size].pack("Q>")
          buf << body
        else
          buf << FLAG_BYTES[flags]
          buf << FLAG_BYTES[size]
          buf << body
        end
        i += 1
      end

      buf.freeze
    end
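The fan-out use case can be sketched as follows. This is an illustration under stated assumptions, not the library's own code: `encode_parts` is a hypothetical, short-frames-only stand-in for `encode_message` so the snippet runs without the library, and the `StringIO` objects stand in for subscriber connections.

```ruby
require "stringio"

# Hypothetical stand-in for Frame.encode_message, short frames only.
def encode_parts(parts)
  buf = String.new(encoding: Encoding::BINARY)
  parts.each_with_index do |part, i|
    body = part.b
    raise ArgumentError, "sketch handles short frames only" if body.bytesize > 255
    flags = i < parts.size - 1 ? 0x01 : 0x00  # MORE on all but the last part
    buf << flags.chr.b << body.bytesize.chr.b << body
  end
  buf.freeze
end

# Stand-ins for three subscriber connections.
subscribers = Array.new(3) { StringIO.new(String.new(encoding: Encoding::BINARY)) }

wire = encode_parts(["topic", "payload"])     # encoded once
subscribers.each { |conn| conn.write(wire) }  # written many times

puts wire.unpack1("H*")
```

Since the result is frozen, the same string can be handed to every writer without any of them being able to mutate it. With the library loaded, the corresponding call would be `Protocol::ZMTP::Codec::Frame.encode_message(parts)`.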
.read_from(io, max_message_size: nil) ⇒ Frame
Reads one frame from an IO-like object.
Uses #peek to buffer just enough header bytes (2 for short frames, 9 for long), then drains header + body in a single #read_exactly. This takes 2 calls for both short and long frames, versus the naive 3 for long frames.
A speculative read_exactly(9) would be unsafe: a short frame with a body under 7 bytes occupies fewer than 9 bytes on the wire, so on an idle connection the read would hang waiting for bytes that never arrive, and on a mixed stream it would consume bytes from the next frame.
    # File 'lib/protocol/zmtp/codec/frame.rb', line 93
    def self.read_from(io, max_message_size: nil)
      # Peek until the header is complete: 2 bytes for short frames, 9 for LONG.
      buf = io.peek do |b|
        next false if b.bytesize < 2

        (b.getbyte(0) & FLAGS_LONG) == 0 || b.bytesize >= 9
      end

      raise EOFError, "Stream finished before reading frame header" if buf.bytesize < 2

      flags = buf.getbyte(0)
      more = (flags & FLAGS_MORE) != 0
      long = (flags & FLAGS_LONG) != 0
      command = (flags & FLAGS_COMMAND) != 0

      if long
        raise EOFError, "Stream finished before reading long frame size" if buf.bytesize < 9

        # 8-byte big-endian size, assembled byte by byte.
        size = (buf.getbyte(1) << 56) | (buf.getbyte(2) << 48) |
               (buf.getbyte(3) << 40) | (buf.getbyte(4) << 32) |
               (buf.getbyte(5) << 24) | (buf.getbyte(6) << 16) |
               (buf.getbyte(7) << 8) | buf.getbyte(8)
        header_size = 9
      else
        size = buf.getbyte(1)
        header_size = 2
      end

      if max_message_size && size > max_message_size
        raise Error, "frame size #{size} exceeds max_message_size #{max_message_size}"
      end

      if size.zero?
        io.read_exactly(header_size)
        return new(EMPTY_BINARY, more: more, command: command)
      end

      # Drain header and body together, then slice the body out.
      wire = io.read_exactly(header_size + size)
      new(wire.byteslice(header_size, size), more: more, command: command)
    end
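The header-sizing decision can be illustrated without a real stream. The sketch below works on an in-memory byte string rather than an IO object; `header_size_for` and `decode_frame` are hypothetical helpers, not the library's API, and they return nil where the real method would keep waiting for more bytes.

```ruby
# Hypothetical helper: how many header bytes this frame needs,
# or nil if the buffer is too short to tell yet.
def header_size_for(buf)
  return nil if buf.bytesize < 2

  long = (buf.getbyte(0) & 0x02) != 0  # bit 1: LONG
  return 2 unless long

  buf.bytesize >= 9 ? 9 : nil
end

# Hypothetical helper: decodes one frame from the front of buf.
# Returns [body, more, rest], or nil if the frame is incomplete.
def decode_frame(buf)
  header = header_size_for(buf)
  return nil unless header

  size = header == 9 ? buf.byteslice(1, 8).unpack1("Q>") : buf.getbyte(1)
  return nil if buf.bytesize < header + size

  more = (buf.getbyte(0) & 0x01) != 0  # bit 0: MORE
  [buf.byteslice(header, size), more, buf.byteslice(header + size..)]
end

stream = "\x01\x02hi\x00\x03bye".b  # two short frames back to back
body1, more1, rest = decode_frame(stream)
body2, more2, rest = decode_frame(rest)

p [body1, more1]
p [body2, more2]
p decode_frame("\x02\x00\x00".b)    # LONG header still incomplete
```

The first frame here is 4 bytes on the wire, so a fixed 9-byte header read would have swallowed the whole second frame as well; sizing the header from the flags byte first avoids that.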
Instance Method Details
#command? ⇒ Boolean
Returns true if this is a command frame.
    # File 'lib/protocol/zmtp/codec/frame.rb', line 158
    def command?
      @command
    end
#more? ⇒ Boolean
Returns true if more frames follow in this message.
    # File 'lib/protocol/zmtp/codec/frame.rb', line 152
    def more?
      @more
    end
#to_wire ⇒ String
Encodes to wire bytes.
    # File 'lib/protocol/zmtp/codec/frame.rb', line 166
    def to_wire
      size = @body.bytesize
      flags = 0
      flags |= FLAGS_MORE if @more
      flags |= FLAGS_COMMAND if @command

      if size > SHORT_MAX
        buf = String.new(capacity: 9 + size, encoding: Encoding::BINARY)
        buf << FLAG_BYTES[flags | FLAGS_LONG]
        buf << [size].pack("Q>")
        buf << @body
      else
        buf = String.new(capacity: 2 + size, encoding: Encoding::BINARY)
        buf << FLAG_BYTES[flags]
        buf << FLAG_BYTES[size]
        buf << @body
      end
    end