omq-zstd


Status: Draft. Wire format may change before the first tagged release.

Zstandard-compressed TCP transport for OMQ. Use zstd+tcp:// instead of tcp:// and each message part is compressed individually with Zstandard before it goes on the wire (parts below a size threshold are sent as plaintext; see Compression thresholds). Compression is intrinsic to the transport: there is no negotiation, no socket option, and no change to your payloads. The ZMTP handshake itself runs over plain TCP; only message parts sent after the handshake are compressed.

See RFC.md for the wire-format specification and DESIGN.md for the implementation rationale.

Install

# Gemfile
gem "omq-zstd"

or install it directly:

gem install omq-zstd

Usage

require "omq"
require "omq/zstd"

pull = OMQ::PULL.new
push = OMQ::PUSH.new

uri = pull.bind("zstd+tcp://127.0.0.1:0")
push.connect(uri.to_s)

push << ["hello, compressed world"]
pull.receive  # => ["hello, compressed world"]

Both peers must use the zstd+tcp:// scheme. A tcp:// peer cannot talk to a zstd+tcp:// peer — they speak different transports.

Compression level

The default send level is -3 (negative levels select Zstd's fast mode, trading compression ratio for speed). Override it per endpoint at bind/connect:

pull.bind("zstd+tcp://127.0.0.1:0", level: 3)
push.connect("zstd+tcp://127.0.0.1:5555", level: 9)

Levels are per direction and per side: each peer picks the level for what it sends. The receiver decompresses parts at whatever level the peer chose, because the level is not needed for decompression.

Dictionaries

Small messages don't compress well on their own. A shared Zstd dictionary trained on representative payloads can give 2–10× ratios on payloads of a few dozen to a few hundred bytes.

User-supplied dictionary (trained ahead of time, outside OMQ):

dict = File.binread("schema.dict")  # produced by `zstd --train`; keep it within the 64 KiB cap (e.g. --maxdict=65536)
push.connect("zstd+tcp://127.0.0.1:5555", dict: dict)

The sender ships the dictionary to the receiver in-band as a one-shot single-part message prefixed with the dictionary sentinel (37 A4 30 EC), so the receiver does not need a copy on disk.
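The one-shot shipment can be pictured with a small sender-side sketch. `DictShipper` and the `write_part` callable are hypothetical names, not the gem's API. The sketch assumes the dictionary bytes already begin with the 37 A4 30 EC magic (as `zstd --train` output does), so the sentinel is simply the first four bytes of the part, and it applies the 64 KiB cap from Security limits on the sending side too.

```ruby
# Hypothetical sketch: ship a dictionary in-band, once per connection.
# Assumption: the dictionary bytes already start with the Zstd
# dictionary magic 37 A4 30 EC, which doubles as the sentinel.
class DictShipper
  DICT_MAGIC    = "\x37\xA4\x30\xEC".b
  MAX_DICT_SIZE = 64 * 1024 # mirrors the receiver's 64 KiB cap

  def initialize(dict)
    dict = dict.b
    raise ArgumentError, "not a Zstd dictionary" unless dict.start_with?(DICT_MAGIC)
    raise ArgumentError, "dictionary exceeds 64 KiB" if dict.bytesize > MAX_DICT_SIZE

    @dict = dict
    @shipped = false
  end

  # The first call hands a single-part message ([dict]) to write_part
  # and returns true; later calls do nothing and return false.
  def ship_once(write_part)
    return false if @shipped

    write_part.call([@dict])
    @shipped = true
  end
end
```

Keeping one shipper per connection in this sketch means a peer that connects later still receives the dictionary before any dictionary-compressed part.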

Auto-trained dictionary (zero config, the default when no dict: is passed): the sender collects samples until it has 1000 of them or 100 KiB of data, whichever comes first, then trains a dictionary, ships it in-band, and switches to dictionary mode. Until then, parts are compressed without a dictionary, or sent as plaintext when below the threshold.
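The sample budget can be expressed as a small collector. This is a hypothetical sketch (`SampleCollector` is not part of omq-zstd) that only decides when to stop collecting; the training step itself, for example via libzstd's `ZDICT_trainFromBuffer`, is left out.

```ruby
# Hypothetical sketch of the auto-training sample budget:
# stop at 1000 samples or 100 KiB of sample bytes, whichever comes first.
class SampleCollector
  MAX_SAMPLES = 1000
  MAX_BYTES   = 100 * 1024

  attr_reader :samples, :bytes

  def initialize
    @samples = []
    @bytes = 0
  end

  # Records one payload. Returns true once the collector is full,
  # i.e. when it is time to train and ship the dictionary.
  def add(payload)
    return true if full?

    @samples << payload.b
    @bytes += payload.bytesize
    full?
  end

  def full?
    @samples.size >= MAX_SAMPLES || @bytes >= MAX_BYTES
  end
end
```

Here the sample that crosses 100 KiB is still kept, so the total can overshoot slightly; a stricter variant would check the size before appending.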

Compression thresholds

Compressing tiny parts costs CPU and can even make them larger, so the sender skips compression for parts smaller than:

Mode               Threshold
No dictionary      512 B
With dictionary    64 B

Below the threshold the part is sent uncompressed (4-byte zero sentinel + plaintext bytes).
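A minimal sketch of the sender's per-part choice, assuming a `compress` callable that wraps some Zstd binding and returns a complete Zstandard frame (which begins with the 28 B5 2F FD magic). `encode_part` and its keywords are hypothetical names, not the gem's API.

```ruby
# Hypothetical sketch: choose plaintext or Zstd for one message part.
PLAIN_SENTINEL    = "\x00\x00\x00\x00".b
THRESHOLD_NO_DICT = 512 # bytes, while no dictionary is in use
THRESHOLD_DICT    = 64  # bytes, once a dictionary has been shipped

# part:        the raw message part
# dict_active: whether this connection is in dictionary mode
# compress:    callable returning a Zstandard frame for the bytes (assumed)
def encode_part(part, dict_active:, compress:)
  bytes = part.b
  threshold = dict_active ? THRESHOLD_DICT : THRESHOLD_NO_DICT
  # Parts below the threshold go out as 4 zero bytes + plaintext.
  return PLAIN_SENTINEL + bytes if bytes.bytesize < threshold

  compress.call(bytes)
end
```

A stub such as `->(b) { "\x28\xB5\x2F\xFD".b + b }` is enough to exercise the branching in a test without a real Zstd binding.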

Security limits

The receiver bounds decompression by the socket's own max_message_size, the same knob you'd use on a plain tcp:// socket. It caps the total decompressed size of all parts in a single message, not each part individually: the budget starts at max_message_size and shrinks as each part is decoded, so a message whose parts sum to more than the cap is rejected at the first part that pushes the total over it.

pull.max_message_size = 1_048_576  # 1 MiB cap on the total message

If max_message_size is nil (OMQ's default, unlimited), there is no ceiling on decompressed message size. Set a value that matches what your application would tolerate over plain tcp://.

Independently of max_message_size, the dictionary itself is capped at 64 KiB. If a peer tries to ship a larger dictionary, or sends a message whose decompressed parts exceed max_message_size, the receiver drops the connection and OMQ::SocketDeadError surfaces on the next receive.
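The shrinking per-message budget can be sketched as a tiny accounting object. `DecompressionBudget` and `MessageTooLarge` are hypothetical names; in omq-zstd the failure surfaces as a dropped connection and OMQ::SocketDeadError rather than this exception. The sketch assumes the decompressor accepts an output-size limit, so it can stop early instead of inflating a hostile frame in full.

```ruby
# Hypothetical sketch of the per-message decompression budget.
# Starts at max_message_size and shrinks part by part; nil = unlimited.
class MessageTooLarge < StandardError; end

class DecompressionBudget
  def initialize(max_message_size)
    @remaining = max_message_size
  end

  # Output limit to hand to the decompressor for the next part
  # (nil means no limit, matching OMQ's default).
  def limit
    @remaining
  end

  # Charges one decoded part against the budget; raises on the part
  # that pushes the message total over the cap.
  def charge!(decoded_size)
    return if @remaining.nil?
    raise MessageTooLarge, "message exceeds max_message_size" if decoded_size > @remaining

    @remaining -= decoded_size
  end
end
```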

When to use it

zstd+tcp:// is worth picking when:

  • You're network-bound (cross-region, IoT links, congested LAN).
  • Your payloads have repetitive structure (JSON, log lines, protobuf with string fields, similar binary records).
  • You want compression without touching the message format on either side.

It is not worth it for:

  • inproc:// or ipc://: these never cross a network, so there is nothing worth shrinking. A socket can mix transports, so use zstd+tcp:// only on the endpoints that need it; other transports on the same socket are unaffected.
  • Already-compressed payloads (gzip, video, encrypted blobs) — the Zstd pass adds CPU for no gain.
  • Latency-critical sub-microsecond paths: compression adds single-digit microseconds per kilobyte even at low levels, which dwarfs a sub-microsecond budget.

How it works (in one paragraph)

require "omq/zstd" registers the zstd+tcp scheme on OMQ::Engine.transports. A zstd+tcp socket builds a per-engine Codec (one Zstd dictionary instance shared across all the socket's connections — fan-out compresses each part exactly once). Each accepted or dialed TCP connection is wrapped in ZstdConnection, a SimpleDelegator over the underlying ZMTP connection that intercepts #send_message / #write_message / #receive_message. Message parts go out as a 4-byte sentinel + payload: 00 00 00 00 for plaintext, 28 B5 2F FD (Zstandard frame magic) for a compressed part, or 37 A4 30 EC for a one-shot single-part dictionary shipment. The receiver dispatches on the sentinel, decompresses with bounded buffers, and hands plaintext parts up to ZMTP unchanged.
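The receive-side dispatch can be sketched as a function that classifies one part by its first four bytes. `classify_part` and `ProtocolError` are hypothetical names, and the sketch assumes compressed parts and dictionary shipments carry the Zstandard frame or dictionary bytes whole (their own magic numbers doubling as the sentinel); RFC.md is the authority on the exact layout.

```ruby
# Hypothetical sketch of the receiver's dispatch on the 4-byte sentinel.
PLAIN_MAGIC   = "\x00\x00\x00\x00".b
ZSTD_MAGIC    = "\x28\xB5\x2F\xFD".b # Zstandard frame magic
DICT_MAGIC    = "\x37\xA4\x30\xEC".b # Zstandard dictionary magic
MAX_DICT_SIZE = 64 * 1024

class ProtocolError < StandardError; end

# Returns [kind, bytes]:
#   :plain -> payload with the zero sentinel stripped
#   :zstd  -> whole frame, to be decompressed under the message budget
#   :dict  -> dictionary bytes (rejected above 64 KiB)
def classify_part(part)
  part = part.b
  raise ProtocolError, "part shorter than sentinel" if part.bytesize < 4

  case part.byteslice(0, 4)
  when PLAIN_MAGIC then [:plain, part.byteslice(4..)]
  when ZSTD_MAGIC  then [:zstd, part]
  when DICT_MAGIC
    raise ProtocolError, "dictionary exceeds 64 KiB" if part.bytesize > MAX_DICT_SIZE
    [:dict, part]
  else
    raise ProtocolError, "unknown sentinel"
  end
end
```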

Development

OMQ_DEV=1 bundle install
OMQ_DEV=1 bundle exec rake test
OMQ_DEV=1 bundle exec ruby --yjit bench/level_sweep.rb

License

ISC