Module: OMQ::LZ4::Codec

Defined in:
lib/omq/lz4/codec.rb

Overview

Wire format for the lz4+tcp:// transport: encode and decode functions that take and return binary Strings. They are pure functions with no I/O and no connection state. The transport (M2) owns all connection state and calls into these methods once per ZMTP message part.

Each wire part begins with a 4-byte sentinel:

00 00 00 00   uncompressed plaintext
4C 5A 34 42   LZ4-compressed block ("LZ4B" in ASCII)
4C 5A 34 44   dictionary shipment ("LZ4D" in ASCII)

`decode_part` handles UNCOMPRESSED and LZ4B parts only. Dictionary shipments are a transport-layer concern: the transport peeks at the first 4 bytes of each incoming wire part, routes LZ4D to `decode_dict_shipment`, and never hands a shipment to `decode_part`.
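The routing rule above can be sketched as a dispatch on the first four bytes. This is a minimal illustration, not the transport's actual code. `route_part` is a hypothetical name. It returns a symbol naming the codec method that should receive the part instead of calling that method.

```ruby
LZ4D_SENTINEL = "LZ4D".b.freeze

# Hypothetical transport-side router (not part of OMQ::LZ4::Codec).
# Returns the name of the codec method that should see this wire part.
def route_part(wire_bytes)
  # Too short to hold a sentinel: let decode_part raise its ProtocolError.
  return :decode_part if wire_bytes.bytesize < 4

  if wire_bytes.byteslice(0, 4) == LZ4D_SENTINEL
    :decode_dict_shipment
  else
    # UNCOMPRESSED and LZ4B are decoded there; unknown sentinels are rejected there.
    :decode_part
  end
end
```

Keeping the LZ4D check in the transport means `decode_part` never needs dictionary state. Its LZ4D branch exists only as a guard against routing bugs.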

Constant Summary

UNCOMPRESSED_SENTINEL = "\x00\x00\x00\x00".b.freeze
LZ4B_SENTINEL = "LZ4B".b.freeze
LZ4D_SENTINEL = "LZ4D".b.freeze
MIN_COMPRESS_NO_DICT = 512
MIN_COMPRESS_WITH_DICT = 32

Size thresholds below which compression isn’t worth attempting. Empirically tuned on Lorem-ipsum-like input via bench/min_compress_size_sweep.rb: for block-format LZ4, the crossover at which compressed size + 12-byte envelope beats plaintext + 4-byte passthrough envelope sits at ~312 B without a dict and ~20 B with one. We round up to 512 / 32 so the machinery isn’t invoked for marginal wins, where real-world (less repetitive) payloads would likely fall back to passthrough anyway. Below the threshold, `encode_part` emits UNCOMPRESSED directly without touching the compressor.
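The threshold selection can be sketched as two small predicates. The names `compression_threshold` and `attempt_compression?` are hypothetical. The logic restates the `min_size ||= ...` line and the `< min_size` early return in `encode_part`, but takes a plain `has_dict:` flag instead of a codec object.

```ruby
MIN_COMPRESS_NO_DICT = 512
MIN_COMPRESS_WITH_DICT = 32

# Hypothetical helper: an explicit min_size wins; otherwise the default
# depends on whether the codec carries a dictionary.
def compression_threshold(has_dict:, min_size: nil)
  min_size || (has_dict ? MIN_COMPRESS_WITH_DICT : MIN_COMPRESS_NO_DICT)
end

# Parts strictly smaller than the threshold skip the compressor entirely.
def attempt_compression?(bytesize, has_dict:, min_size: nil)
  bytesize >= compression_threshold(has_dict: has_dict, min_size: min_size)
end
```

For example, a 100-byte part skips the compressor when no dict is loaded (100 < 512). It is tried when one is loaded (100 ≥ 32).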
MAX_DICT_SIZE = 8192

Maximum dictionary size on the wire. This is a policy choice, not a protocol limit. It is tight enough that constrained peers can accept dicts without allocating tens of KB of scratch space.
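`decode_dict_shipment` and `encode_dict_shipment` both delegate the range check to `validate_dict_size!`, whose body this page does not show. Below is a plausible sketch. It assumes the helper simply enforces the documented [1, MAX_DICT_SIZE] range. The local `ProtocolError` class is a stand-in for the codec's own error type.

```ruby
MAX_DICT_SIZE = 8192

# Stand-in for the ProtocolError used by OMQ::LZ4::Codec.
class ProtocolError < StandardError; end

# Hypothetical reconstruction of validate_dict_size!: accept only
# dictionary lengths in 1..MAX_DICT_SIZE (empty dicts are rejected too).
def validate_dict_size!(size)
  return if size.between?(1, MAX_DICT_SIZE)

  raise ProtocolError, "dict size #{size} out of range [1, #{MAX_DICT_SIZE}]"
end
```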
COMPRESSED_ENVELOPE = 12
PASSTHROUGH_ENVELOPE = 4

Envelope sizes:

UNCOMPRESSED = 4 (sentinel)
LZ4B         = 4 (sentinel) + 8 (decompressed_size u64 LE)

So switching from passthrough to compressed costs 8 extra bytes of envelope overhead. Compression must save more than that to win.
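The break-even rule reduces to a one-line inequality. The helpers below use hypothetical names and restate the comparison in `encode_part`. Compression is used only when the net saving, (plaintext + 4) − (compressed + 12) = plaintext − compressed − 8, is strictly positive.

```ruby
COMPRESSED_ENVELOPE = 12
PASSTHROUGH_ENVELOPE = 4

# Bytes saved on the wire by sending LZ4B instead of UNCOMPRESSED.
# Equals plain_size - compressed_size - 8.
def net_savings(plain_size, compressed_size)
  (plain_size + PASSTHROUGH_ENVELOPE) - (compressed_size + COMPRESSED_ENVELOPE)
end

# Same rule as encode_part: compress only on a strictly positive saving;
# a tie goes to passthrough.
def compressed_wins?(plain_size, compressed_size)
  net_savings(plain_size, compressed_size).positive?
end
```

A 600-byte part that compresses to 592 bytes ties at 604 bytes either way, so it is sent as passthrough. At 591 bytes compressed, LZ4B wins by one byte.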

Class Method Summary

Class Method Details

.decode_dict_shipment(wire_bytes) ⇒ Object

Decode a dictionary shipment. Returns the dict bytes (without the sentinel). Raises ProtocolError if the part is shorter than 4 bytes, the sentinel is not LZ4D, or the dict size is outside the [1, 8192] (MAX_DICT_SIZE) range.



# File 'lib/omq/lz4/codec.rb', line 140

def decode_dict_shipment(wire_bytes)
  if wire_bytes.bytesize < 4
    raise ProtocolError, "dict shipment too short (< 4 bytes)"
  end
  sentinel = wire_bytes.byteslice(0, 4)
  unless sentinel == LZ4D_SENTINEL
    raise ProtocolError,
      "not a dict shipment (sentinel #{sentinel.unpack1("H*")}, expected 4C5A3444)"
  end
  dict = wire_bytes.byteslice(4, wire_bytes.bytesize - 4)
  validate_dict_size!(dict.bytesize)
  dict
end

.decode_part(wire_bytes, block_codec:, max_size: nil) ⇒ Object

Decode one wire part. Returns a plaintext binary String.

`max_size` is an optional cap on the decompressed size of this single part. If the declared (LZ4B) or wire (UNCOMPRESSED) plaintext size exceeds it, ProtocolError is raised before the decoder is invoked.

Does not handle LZ4D shipments; the transport must route those to `decode_dict_shipment` before calling here.



# File 'lib/omq/lz4/codec.rb', line 94

def decode_part(wire_bytes, block_codec:, max_size: nil)
  if wire_bytes.bytesize < 4
    raise ProtocolError, "wire part too short (< 4 bytes)"
  end

  sentinel = wire_bytes.byteslice(0, 4)
  case sentinel
  when UNCOMPRESSED_SENTINEL
    payload = wire_bytes.byteslice(4, wire_bytes.bytesize - 4)
    check_size!(payload.bytesize, max_size)
    payload
  when LZ4B_SENTINEL
    if wire_bytes.bytesize < 12
      raise ProtocolError, "LZ4B part too short (< 12 bytes, no room for size field)"
    end
    decompressed_size = wire_bytes.byteslice(4, 8).unpack1("Q<")
    check_size!(decompressed_size, max_size)
    block = wire_bytes.byteslice(12, wire_bytes.bytesize - 12)
    begin
      block_codec.decompress(block, decompressed_size: decompressed_size)
    rescue RLZ4::DecompressError => e
      raise ProtocolError, "LZ4B decode failed: #{e.message}"
    end
  when LZ4D_SENTINEL
    # Should not reach decode_part; transport should have routed this.
    raise ProtocolError,
      "LZ4D dictionary shipment seen at decode_part (transport should route to decode_dict_shipment)"
  else
    raise ProtocolError, "unknown sentinel #{sentinel.unpack1("H*")}"
  end
end
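The LZ4B envelope parsed above has three fields: the sentinel, a little-endian u64 plaintext size, and the raw block. It can be built and inspected with `Array#pack` and `String#unpack1`. This sketch assumes the `check_size!` helper (not shown on this page) raises when the declared size exceeds `max_size`, as the method documentation describes. `lz4b_envelope` and `declared_lz4b_size` are hypothetical names, and no decompression happens here.

```ruby
# Stand-in for the codec's ProtocolError.
class ProtocolError < StandardError; end

LZ4B_SENTINEL = "LZ4B".b.freeze

# Hypothetical: wrap an already-compressed block in an LZ4B envelope.
# Layout: "LZ4B" || decompressed_size (u64 little-endian) || block
def lz4b_envelope(decompressed_size, block)
  LZ4B_SENTINEL + [decompressed_size].pack("Q<") + block.b
end

# Hypothetical: read the declared plaintext size from a part whose sentinel
# has already been matched as LZ4B, enforcing the optional cap before any
# decompressor would run.
def declared_lz4b_size(wire_bytes, max_size: nil)
  raise ProtocolError, "LZ4B part too short" if wire_bytes.bytesize < 12

  size = wire_bytes.byteslice(4, 8).unpack1("Q<")
  if max_size && size > max_size
    raise ProtocolError, "declared size #{size} exceeds max_size #{max_size}"
  end
  size
end
```

Checking the declared size before decompressing means a peer cannot force a large allocation simply by writing a large number into the size field.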

.encode_dict_shipment(dict_bytes) ⇒ Object

Encode a dictionary shipment. Returns wire bytes:

LZ4D sentinel (4 bytes) || dict bytes (1..8192)

The shipment is a single-part ZMTP message (MORE flag clear) from the transport’s perspective, but that framing is the transport’s responsibility.



# File 'lib/omq/lz4/codec.rb', line 132

def encode_dict_shipment(dict_bytes)
  validate_dict_size!(dict_bytes.bytesize)
  LZ4D_SENTINEL + dict_bytes
end
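Below is a standalone round trip through the two dict-shipment methods, useful for checking the 4-byte framing. The methods are restated from the source so the sketch runs without the gem. The inlined range check stands in for `validate_dict_size!` and is an assumption, since that helper is not shown here. `.b` is added so the result is always a binary String; the source does not do this.

```ruby
# Stand-in for the codec's ProtocolError.
class ProtocolError < StandardError; end

LZ4D_SENTINEL = "LZ4D".b.freeze
DICT_SIZE_RANGE = (1..8192) # assumed to mirror validate_dict_size!

def encode_dict_shipment(dict_bytes)
  raise ProtocolError, "bad dict size" unless DICT_SIZE_RANGE.cover?(dict_bytes.bytesize)

  LZ4D_SENTINEL + dict_bytes.b
end

def decode_dict_shipment(wire_bytes)
  raise ProtocolError, "dict shipment too short" if wire_bytes.bytesize < 4
  raise ProtocolError, "not a dict shipment" unless wire_bytes.byteslice(0, 4) == LZ4D_SENTINEL

  dict = wire_bytes.byteslice(4, wire_bytes.bytesize - 4)
  raise ProtocolError, "bad dict size" unless DICT_SIZE_RANGE.cover?(dict.bytesize)

  dict
end
```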

.encode_part(plaintext, block_codec:, min_size: nil) ⇒ Object

Encode one plaintext part to wire bytes. Parts smaller than the min-size threshold go straight to passthrough. Larger parts are compressed, and the result falls back to passthrough unless compression saves more than the 8 bytes of extra envelope overhead.

`block_codec` is an RLZ4::BlockCodec, optionally constructed with `dict: bytes`. Whether the codec has a dict is detected via `#has_dict?` and used to pick the min-size threshold.

`min_size` overrides the default threshold. nil (the default) picks `MIN_COMPRESS_NO_DICT` for a no-dict codec and `MIN_COMPRESS_WITH_DICT` for a dict codec.



# File 'lib/omq/lz4/codec.rb', line 68

def encode_part(plaintext, block_codec:, min_size: nil)
  min_size ||= block_codec.has_dict? ? MIN_COMPRESS_WITH_DICT : MIN_COMPRESS_NO_DICT

  return encode_passthrough(plaintext) if plaintext.bytesize < min_size

  compressed = block_codec.compress(plaintext)

  # Net savings = (plaintext + 4) − (compressed + 12) = plaintext − compressed − 8.
  # If ≤ 0, passthrough wins (or ties — prefer passthrough: one
  # fewer u64 for the receiver to parse).
  if compressed.bytesize + COMPRESSED_ENVELOPE >= plaintext.bytesize + PASSTHROUGH_ENVELOPE
    encode_passthrough(plaintext)
  else
    encode_compressed(plaintext.bytesize, compressed)
  end
end
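The sketch below shows the passthrough/compress decision end to end without the `rlz4` gem by running `encode_part` (reproduced from above) against a stand-in codec. Several pieces are assumptions. `FakeBlockCodec` is hypothetical and uses Zlib deflate, not LZ4, so compressed sizes differ from the real codec. `encode_passthrough` and `encode_compressed` are not shown on this page and are reconstructed from the documented envelope layouts.

```ruby
require "zlib"

UNCOMPRESSED_SENTINEL = "\x00\x00\x00\x00".b.freeze
LZ4B_SENTINEL = "LZ4B".b.freeze
MIN_COMPRESS_NO_DICT = 512
MIN_COMPRESS_WITH_DICT = 32
COMPRESSED_ENVELOPE = 12
PASSTHROUGH_ENVELOPE = 4

# Hypothetical stand-in for RLZ4::BlockCodec: same method shapes,
# but Zlib deflate instead of LZ4, and no dictionary support.
class FakeBlockCodec
  def has_dict?
    false
  end

  def compress(plaintext)
    Zlib::Deflate.deflate(plaintext)
  end

  def decompress(block, decompressed_size:)
    out = Zlib::Inflate.inflate(block)
    raise "size mismatch" unless out.bytesize == decompressed_size
    out
  end
end

# Reconstructed from the envelope docs: sentinel || plaintext.
def encode_passthrough(plaintext)
  UNCOMPRESSED_SENTINEL + plaintext.b
end

# Reconstructed from the envelope docs: sentinel || u64 LE size || block.
def encode_compressed(plain_size, compressed)
  LZ4B_SENTINEL + [plain_size].pack("Q<") + compressed.b
end

# encode_part as shown above.
def encode_part(plaintext, block_codec:, min_size: nil)
  min_size ||= block_codec.has_dict? ? MIN_COMPRESS_WITH_DICT : MIN_COMPRESS_NO_DICT
  return encode_passthrough(plaintext) if plaintext.bytesize < min_size

  compressed = block_codec.compress(plaintext)
  if compressed.bytesize + COMPRESSED_ENVELOPE >= plaintext.bytesize + PASSTHROUGH_ENVELOPE
    encode_passthrough(plaintext)
  else
    encode_compressed(plaintext.bytesize, compressed)
  end
end
```

The test cases exercise each path:

- A short string stays below the 512-byte threshold and is framed as passthrough.
- 1,200 bytes of repeated text compress well enough to be sent as LZ4B.
- 600 random bytes grow under compression, so the tie-or-worse rule sends them as passthrough.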