Module: OMQ::LZ4::Codec
Defined in: lib/omq/lz4/codec.rb
Overview
Wire format for the lz4+tcp:// transport, encode/decode over String input/output. Pure functions — no I/O, no connection state. Transport (M2) owns the connection state and calls into these methods per ZMTP part.
Each wire part begins with a 4-byte sentinel:
00 00 00 00 uncompressed plaintext
4C 5A 34 42 LZ4-compressed block ("LZ4B" in ASCII)
4C 5A 34 44 dictionary shipment ("LZ4D" in ASCII)
`decode_part` handles UNCOMPRESSED and LZ4B only. Dictionary shipments are a transport-layer concern: the transport peeks at the first 4 bytes of each incoming wire part, routes LZ4D to `decode_dict_shipment`, and never hands a shipment to `decode_part`.
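The routing rule above can be sketched as a small classifier. This is only an illustration: `SentinelRoutingSketch` and `classify` are hypothetical names, not part of `OMQ::LZ4::Codec`, and the real transport may structure the dispatch differently. The sentinel values are the ones listed in this document.

```ruby
module SentinelRoutingSketch
  # Sentinel values as documented in the overview.
  UNCOMPRESSED_SENTINEL = "\x00\x00\x00\x00".b.freeze
  LZ4B_SENTINEL         = "LZ4B".b.freeze
  LZ4D_SENTINEL         = "LZ4D".b.freeze

  module_function

  # Hypothetical helper: peek at the first 4 bytes of a wire part and
  # report which decoder the transport would hand it to.
  def classify(wire_bytes)
    return :too_short if wire_bytes.bytesize < 4

    case wire_bytes.byteslice(0, 4)
    when UNCOMPRESSED_SENTINEL then :uncompressed  # goes to decode_part
    when LZ4B_SENTINEL         then :lz4_block     # goes to decode_part
    when LZ4D_SENTINEL         then :dict_shipment # goes to decode_dict_shipment
    else                            :unknown
    end
  end
end

SentinelRoutingSketch.classify("LZ4Dsome-dictionary".b) # => :dict_shipment
```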
Constant Summary
- UNCOMPRESSED_SENTINEL =
"\x00\x00\x00\x00".b.freeze
- LZ4B_SENTINEL =
"LZ4B".b.freeze
- LZ4D_SENTINEL =
"LZ4D".b.freeze
- MIN_COMPRESS_NO_DICT =
Size thresholds below which compression isn’t worth attempting. Empirically tuned on Lorem-ipsum-like input via bench/min_compress_size_sweep.rb: for block-format LZ4 the crossover where compressed + 12-byte envelope beats plaintext + 4-byte passthrough envelope sits at ~312 B without a dict and ~20 B with one. We round up to 512 / 32 so the machinery isn’t invoked for marginal wins where real-world (less repetitive) payloads would likely fall back to passthrough anyway. Below the threshold, `encode_part` emits UNCOMPRESSED directly without touching the compressor.
512
- MIN_COMPRESS_WITH_DICT =
32
- MAX_DICT_SIZE =
Maximum dictionary size on the wire. A policy choice, not a protocol limit; tight enough that constrained peers can accept dicts without allocating tens of KB of scratch.
8192
- COMPRESSED_ENVELOPE =
Envelope sizes:
UNCOMPRESSED = 4 (sentinel)
LZ4B = 4 (sentinel) + 8 (decompressed_size u64 LE)
=> switching from passthrough to compressed costs 8 bytes of envelope overhead. Compression must save more than that to win.
12
- PASSTHROUGH_ENVELOPE =
4
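The envelope arithmetic can be checked with a few numbers. The sketch below is illustrative only: `EnvelopeSketch` and `net_savings` are hypothetical names, while the two envelope sizes are the constants documented above.

```ruby
module EnvelopeSketch
  COMPRESSED_ENVELOPE  = 12 # 4-byte sentinel + 8-byte decompressed_size
  PASSTHROUGH_ENVELOPE = 4  # 4-byte sentinel only

  module_function

  # Hypothetical helper: wire bytes saved by sending the LZ4B form instead of
  # the UNCOMPRESSED form. Equivalent to plaintext_size - compressed_size - 8.
  def net_savings(plaintext_size, compressed_size)
    (plaintext_size + PASSTHROUGH_ENVELOPE) - (compressed_size + COMPRESSED_ENVELOPE)
  end
end

EnvelopeSketch.net_savings(600, 400) # => 192, compression wins
EnvelopeSketch.net_savings(600, 592) # => 0, a tie
EnvelopeSketch.net_savings(600, 598) # => -6, passthrough wins
```

A result of zero or less means the compressed form is no smaller on the wire than the passthrough form.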
Class Method Summary
- .decode_dict_shipment(wire_bytes) ⇒ Object
Decode a dictionary shipment.
- .decode_part(wire_bytes, block_codec:, max_size: nil) ⇒ Object
Decode one wire part.
- .encode_dict_shipment(dict_bytes) ⇒ Object
Encode a dictionary shipment.
- .encode_part(plaintext, block_codec:, min_size: nil) ⇒ Object
Encode one plaintext part to wire bytes.
Class Method Details
.decode_dict_shipment(wire_bytes) ⇒ Object
Decode a dictionary shipment. Returns the dict bytes (without sentinel). Raises ProtocolError if the sentinel is wrong or the size is out of the [1, 8192] range.
# File 'lib/omq/lz4/codec.rb', line 140

def decode_dict_shipment(wire_bytes)
  if wire_bytes.bytesize < 4
    raise ProtocolError, "dict shipment too short (< 4 bytes)"
  end

  sentinel = wire_bytes.byteslice(0, 4)
  unless sentinel == LZ4D_SENTINEL
    raise ProtocolError, "not a dict shipment (sentinel #{sentinel.unpack1("H*")}, expected 4C5A3444)"
  end

  dict = wire_bytes.byteslice(4, wire_bytes.bytesize - 4)
  validate_dict_size!(dict.bytesize)
  dict
end
.decode_part(wire_bytes, block_codec:, max_size: nil) ⇒ Object
Decode one wire part. Returns a plaintext binary String.
`max_size` is an optional cap on the decompressed size of this single part; if the declared (LZ4B) or wire (UNCOMPRESSED) plaintext size exceeds it, raises ProtocolError before any decoder invocation.
Does not handle LZ4D shipments; transport must route those to `decode_dict_shipment` before calling here.
# File 'lib/omq/lz4/codec.rb', line 94

def decode_part(wire_bytes, block_codec:, max_size: nil)
  if wire_bytes.bytesize < 4
    raise ProtocolError, "wire part too short (< 4 bytes)"
  end

  sentinel = wire_bytes.byteslice(0, 4)
  case sentinel
  when UNCOMPRESSED_SENTINEL
    payload = wire_bytes.byteslice(4, wire_bytes.bytesize - 4)
    check_size!(payload.bytesize, max_size)
    payload
  when LZ4B_SENTINEL
    if wire_bytes.bytesize < 12
      raise ProtocolError, "LZ4B part too short (< 12 bytes, no room for size field)"
    end
    decompressed_size = wire_bytes.byteslice(4, 8).unpack1("Q<")
    check_size!(decompressed_size, max_size)
    block = wire_bytes.byteslice(12, wire_bytes.bytesize - 12)
    begin
      block_codec.decompress(block, decompressed_size: decompressed_size)
    rescue RLZ4::DecompressError => e
      raise ProtocolError, "LZ4B decode failed: #{e.message}"
    end
  when LZ4D_SENTINEL
    # Should not reach decode_part; transport should have routed this.
    raise ProtocolError, "LZ4D dictionary shipment seen at decode_part (transport should route to decode_dict_shipment)"
  else
    raise ProtocolError, "unknown sentinel #{sentinel.unpack1("H*")}"
  end
end
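To make the LZ4B layout concrete, the following sketch builds a 12-byte envelope by hand and splits it again. `split_lz4b_part` is a hypothetical helper written for this example; it raises `ArgumentError` instead of the library's `ProtocolError`, and its size check is an assumption about what the private `check_size!` does, since that helper's body is not shown here.

```ruby
# Hypothetical helper mirroring the LZ4B branch of decode_part:
# returns [declared decompressed size, raw LZ4 block bytes].
def split_lz4b_part(wire_bytes, max_size: nil)
  raise ArgumentError, "LZ4B part too short" if wire_bytes.bytesize < 12
  raise ArgumentError, "not an LZ4B part" unless wire_bytes.byteslice(0, 4) == "LZ4B".b

  # Bytes 4..11 hold the decompressed size as an unsigned 64-bit little-endian integer.
  decompressed_size = wire_bytes.byteslice(4, 8).unpack1("Q<")

  # Assumed cap semantics: reject when the declared size exceeds max_size.
  if max_size && decompressed_size > max_size
    raise ArgumentError, "declared size #{decompressed_size} exceeds cap #{max_size}"
  end

  [decompressed_size, wire_bytes.byteslice(12, wire_bytes.bytesize - 12)]
end

# Sentinel + u64 LE size + a placeholder block (not real LZ4 data).
part = "LZ4B".b + [1024].pack("Q<") + "\x01\x02\x03".b
size, block = split_lz4b_part(part)
size           # => 1024
block.bytesize # => 3
```

Because the size is read from the envelope, the cap can be enforced before the block is handed to any decompressor.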
.encode_dict_shipment(dict_bytes) ⇒ Object
Encode a dictionary shipment. Returns wire bytes:
LZ4D sentinel (4 bytes) || dict bytes (1..8192)
The shipment is a single-part ZMTP message (MORE flag clear) from the transport’s perspective, but that framing is the transport’s responsibility.
# File 'lib/omq/lz4/codec.rb', line 132

def encode_dict_shipment(dict_bytes)
  validate_dict_size!(dict_bytes.bytesize)
  LZ4D_SENTINEL + dict_bytes
end
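A standalone round trip of the shipment format might look like the sketch below. `DictShipmentSketch` is a hypothetical re-implementation for illustration, not the library module: it raises `ArgumentError` rather than `ProtocolError`, and `valid_dict_size?` is an assumed stand-in for the private `validate_dict_size!`, based only on the documented 1..8192 range.

```ruby
module DictShipmentSketch
  LZ4D_SENTINEL = "LZ4D".b.freeze
  MAX_DICT_SIZE = 8192

  module_function

  # Assumed equivalent of validate_dict_size!, whose body is not shown.
  def valid_dict_size?(size)
    size.between?(1, MAX_DICT_SIZE)
  end

  # Wire bytes: LZ4D sentinel (4 bytes) followed by the dict bytes.
  def encode(dict_bytes)
    raise ArgumentError, "dict size out of range" unless valid_dict_size?(dict_bytes.bytesize)

    LZ4D_SENTINEL + dict_bytes.b
  end

  # Strips the sentinel and returns the dict bytes.
  def decode(wire_bytes)
    raise ArgumentError, "shipment too short" if wire_bytes.bytesize < 4
    raise ArgumentError, "wrong sentinel" unless wire_bytes.byteslice(0, 4) == LZ4D_SENTINEL

    dict = wire_bytes.byteslice(4, wire_bytes.bytesize - 4)
    raise ArgumentError, "dict size out of range" unless valid_dict_size?(dict.bytesize)

    dict
  end
end

wire = DictShipmentSketch.encode("common-prefix-")
wire.bytesize                   # => 18 (4-byte sentinel + 14 dict bytes)
DictShipmentSketch.decode(wire) # => "common-prefix-"
```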
.encode_part(plaintext, block_codec:, min_size: nil) ⇒ Object
Encode one plaintext part to wire bytes. Tries compression; falls back to passthrough when compression wouldn’t save at least the envelope overhead.
`block_codec` is an RLZ4::BlockCodec, optionally constructed with `dict: bytes`. The codec’s dict presence is detected via `#has_dict?` to pick the min-size threshold.
`min_size` overrides the default threshold. Nil (the default) picks `MIN_COMPRESS_NO_DICT` for a no-dict codec and `MIN_COMPRESS_WITH_DICT` for a dict codec.
# File 'lib/omq/lz4/codec.rb', line 68

def encode_part(plaintext, block_codec:, min_size: nil)
  min_size ||= block_codec.has_dict? ? MIN_COMPRESS_WITH_DICT : MIN_COMPRESS_NO_DICT
  return encode_passthrough(plaintext) if plaintext.bytesize < min_size

  compressed = block_codec.compress(plaintext)

  # Net savings = (plaintext + 4) − (compressed + 12) = plaintext − compressed − 8.
  # If ≤ 0, passthrough wins (or ties, in which case prefer passthrough: one
  # fewer u64 for the receiver to parse).
  if compressed.bytesize + COMPRESSED_ENVELOPE >= plaintext.bytesize + PASSTHROUGH_ENVELOPE
    encode_passthrough(plaintext)
  else
    encode_compressed(plaintext.bytesize, compressed)
  end
end
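The branch selection in `encode_part` can be exercised without a real compressor. In the sketch below, `EncodeDecisionSketch`, `StubCodec`, and `decision` are hypothetical names; `StubCodec` only imitates the two methods the listing calls (`has_dict?` and `compress`) and returns a placeholder of a chosen length instead of running LZ4. The thresholds and envelope sizes are the documented constants.

```ruby
module EncodeDecisionSketch
  MIN_COMPRESS_NO_DICT   = 512
  MIN_COMPRESS_WITH_DICT = 32
  COMPRESSED_ENVELOPE    = 12
  PASSTHROUGH_ENVELOPE   = 4

  # Hypothetical stand-in for RLZ4::BlockCodec. It performs no compression;
  # it returns output_size zero bytes so each branch is predictable.
  class StubCodec
    def initialize(output_size:, dict: nil)
      @output_size = output_size
      @dict = dict
    end

    def has_dict?
      !@dict.nil?
    end

    def compress(_plaintext)
      "\x00".b * @output_size
    end
  end

  module_function

  # Reports which branch the encode_part listing would take.
  def decision(plaintext, block_codec:, min_size: nil)
    min_size ||= block_codec.has_dict? ? MIN_COMPRESS_WITH_DICT : MIN_COMPRESS_NO_DICT
    return :passthrough_below_threshold if plaintext.bytesize < min_size

    compressed = block_codec.compress(plaintext)
    if compressed.bytesize + COMPRESSED_ENVELOPE >= plaintext.bytesize + PASSTHROUGH_ENVELOPE
      :passthrough_no_gain
    else
      :compressed
    end
  end
end

no_dict   = EncodeDecisionSketch::StubCodec.new(output_size: 10)
with_dict = EncodeDecisionSketch::StubCodec.new(output_size: 10, dict: "shared")

EncodeDecisionSketch.decision("a" * 100, block_codec: no_dict)   # => :passthrough_below_threshold
EncodeDecisionSketch.decision("a" * 100, block_codec: with_dict) # => :compressed
```

Passing `min_size: 0` forces a compression attempt for every part, because `0` is truthy in Ruby and so survives the `||=` default.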