omq — ZeroMQ CLI
Command-line tool for sending and receiving ZeroMQ messages on any socket type.
Like nngcat from libnng, but with Ruby eval, Ractor parallelism, and message handlers.
Built on omq — pure Ruby ZeroMQ, no C dependencies.
Install
gem install omq-cli
Quick Start
# Echo server
omq rep -b tcp://:5555 --echo
# Client
echo "hello" | omq req -c tcp://localhost:5555
# Upcase server — -e evals Ruby on each incoming message
omq rep -b tcp://:5555 -e 'it.map(&:upcase)'
Usage: omq TYPE [options]
Types: req, rep, pub, sub, push, pull, pair, dealer, router
Draft: client, server, radio, dish, scatter, gather, channel, peer
Virtual: pipe (PULL → eval → PUSH)
Connection
Every socket needs at least one --bind or --connect:
omq pull --bind tcp://:5557 # listen on port 5557
omq push --connect tcp://host:5557 # connect to host
omq pull -b ipc:///tmp/feed.sock # IPC (unix socket)
omq push -c@work # IPC abstract namespace (@name → ipc://@name)
Multiple endpoints are allowed — omq pull -b tcp://:5557 -b tcp://:5558 binds both.
Pipe takes two positional endpoints (input, output) or uses --in/--out for multiple per side.
Socket types
Unidirectional (send-only / recv-only)
| Send | Recv | Pattern |
|---|---|---|
push |
pull |
Pipeline — round-robin to workers |
pub |
sub |
Publish/subscribe — fan-out with topic filtering |
scatter |
gather |
Pipeline (draft, single-frame only) |
radio |
dish |
Group messaging (draft, single-frame only) |
Send-only sockets read from stdin (or --data/--file) and send. Recv-only sockets receive and write to stdout.
echo "task" | omq push -c tcp://worker:5557
omq pull -b tcp://:5557
Bidirectional (request-reply)
| Type | Behavior |
|---|---|
req |
Sends a request, waits for reply, prints reply |
rep |
Receives request, sends reply (from --echo, -e, --data, --file, or stdin) |
client |
Like req (draft, single-frame) |
server |
Like rep (draft, single-frame, routing-ID aware) |
# echo server
omq rep -b tcp://:5555 --echo
# upcase server
omq rep -b tcp://:5555 -e 'it.map(&:upcase)'
# client
echo "hello" | omq req -c tcp://localhost:5555
Bidirectional (concurrent send + recv)
| Type | Behavior |
|---|---|
pair |
Exclusive 1-to-1 — concurrent send and recv tasks |
dealer |
Like pair but round-robin send to multiple peers |
channel |
Like pair (draft, single-frame) |
These spawn two concurrent tasks: a receiver (prints incoming) and a sender (reads stdin).
-e transforms incoming, -E transforms outgoing.
Routing sockets
| Type | Behavior |
|---|---|
router |
Receives with peer identity prepended; sends to peer by identity |
server |
Like router but draft, single-frame, uses routing IDs |
peer |
Like server (draft, single-frame) |
# monitor mode — just print what arrives
omq router -b tcp://:5555
# reply to specific peer
omq router -b tcp://:5555 --target worker-1 -D "reply"
# dynamic routing via send-eval (first element = identity)
omq router -b tcp://:5555 -E '["worker-1", it.first.upcase]'
--target and --send-eval are mutually exclusive on routing sockets.
Pipe (virtual)
Pipe creates an internal PULL → eval → PUSH pipeline:
omq pipe -c@work -c@sink -e 'it.map(&:upcase)'
# with Ractor workers for CPU parallelism (-P0 = nproc)
omq pipe -c@work -c@sink -P0 -r./fib.rb -e 'fib(Integer(it.first)).to_s'
The first endpoint is the pull-side (input), the second is the push-side (output).
Both must use -c.
Eval: -e and -E
-e (alias --recv-eval) runs a Ruby expression for each incoming message.
-E (alias --send-eval) runs a Ruby expression for each outgoing message.
Variables
| Variable | Value |
|---|---|
it |
Message parts (Array<String>) — Ruby's default block variable |
Block parameters
Expressions support Ruby block parameter syntax. A single parameter receives
the whole parts array; use |(a, b)| to destructure:
# single param = parts array
omq pull -b tcp://:5557 -e '|msg| msg.map(&:upcase)'
# destructure multipart messages
omq pull -b tcp://:5557 -e '|(key, value)| "#{key}=#{value}"'
Return value
| Return | Effect |
|---|---|
Array |
Used as the message parts |
String |
Wrapped in [result] |
nil |
Message is skipped (filtered) |
self (the socket) |
Signals "I already sent" (REP only) |
Control flow
# skip messages matching a pattern
omq pull -b tcp://:5557 -e 'next if it.first.start_with?("#"); it'
# stop on "quit"
omq pull -b tcp://:5557 -e 'break if it.first == "quit"; it'
BEGIN/END blocks
Like awk — BEGIN{} runs once before the message loop, END{} runs after:
omq pull -b tcp://:5557 -e 'BEGIN{ @sum = 0 } @sum += Integer(it.first); next END{ puts @sum }'
Local variables won't work to share state between the blocks. Use @ivars instead.
Which sockets accept which flag
| Socket | -E (send) |
-e (recv) |
|---|---|---|
| push, pub, scatter, radio | transforms outgoing | error |
| pull, sub, gather, dish | error | transforms incoming |
| req, client | transforms request | transforms reply |
| rep, server (reply mode) | error | transforms request → return = reply |
| pair, dealer, channel | transforms outgoing | transforms incoming |
| router, server, peer (monitor) | routes outgoing (first element = identity) | transforms incoming |
| pipe | error | transforms in pipeline |
Examples
# upcase echo server
omq rep -b tcp://:5555 -e 'it.map(&:upcase)'
# transform before sending
echo hello | omq push -c tcp://localhost:5557 -E 'it.map(&:upcase)'
# filter incoming
omq pull -b tcp://:5557 -e 'it.first.include?("error") ? it : nil'
# REQ: different transforms per direction
echo hello | omq req -c tcp://localhost:5555 \
-E 'it.map(&:upcase)' -e 'it.map(&:reverse)'
# generate messages without stdin
omq pub -c tcp://localhost:5556 -E 'Time.now.to_s' -i 1
# use gems
omq sub -c tcp://localhost:5556 -s "" -rjson -e 'JSON.parse(it.first)["temperature"]'
Script handlers (-r)
For non-trivial transforms, put the logic in a Ruby file and load it with -r:
# handler.rb
db = PG.connect("dbname=app")
OMQ.outgoing { |msg| msg.map(&:upcase) }
OMQ.incoming { |msg| db.exec(msg.first).values.flatten }
at_exit { db.close }
omq req -c tcp://localhost:5555 -r./handler.rb
Registration API
| Method | Effect |
|---|---|
| `OMQ.outgoing { \ | msg\ |
| `OMQ.incoming { \ | msg\ |
- use explicit block variable (like
msg) orit - Setup: use local variables and closures at the top of the script
- Teardown: use Ruby's
at_exit { ... } - CLI flags (
-e/-E) override script-registered handlers for the same direction - A script can register one direction while the CLI handles the other:
# handler.rb registers recv_eval, CLI adds send_eval
omq req -c tcp://localhost:5555 -r./handler.rb -E 'it.map(&:upcase)'
Script handler examples
# count.rb — count messages, print total on exit
count = 0
OMQ.incoming { |msg| count += 1; msg }
at_exit { $stderr.puts "processed #{count} messages" }
# json_transform.rb — parse JSON, extract field
require "json"
OMQ.incoming { |first_part, _| [JSON.parse(first_part)["value"]] }
# rate_limit.rb — skip messages arriving too fast
last = 0
OMQ.incoming do |msg|
now = Async::Clock.now # monotonic clock
if now - last >= 0.1
last = now
msg
end
end
# enrich.rb — add timestamp to outgoing messages
OMQ.outgoing { |msg| [*msg, Time.now.iso8601] }
Data sources
| Flag | Behavior |
|---|---|
| (stdin) | Read lines from stdin, one message per line |
-D "text" |
Send literal string (one-shot or repeated with -i) |
-F file |
Read message from file (-F - reads stdin as blob) |
--echo |
Echo received messages back (REP only) |
-D and -F are mutually exclusive.
Formats
| Flag | Format |
|---|---|
-A / --ascii |
Tab-separated frames, non-printable → dots (default) |
-Q / --quoted |
C-style escapes, lossless round-trip |
--raw |
Raw ZMTP binary (pipe to hexdump -C for debugging) |
-J / --jsonl |
JSON Lines — ["frame1","frame2"] per line |
--msgpack |
MessagePack arrays (binary stream) |
-M / --marshal |
Ruby Marshal — one arbitrary Ruby object per message |
Multipart messages: in ASCII/quoted mode, frames are tab-separated. In JSONL mode, each message is a JSON array.
# send multipart via tabs
printf "key\tvalue" | omq push -c tcp://localhost:5557
# JSONL
echo '["key","value"]' | omq push -c tcp://localhost:5557 -J
omq pull -b tcp://:5557 -J
Under -M, each wire frame is one Marshal-dumped Ruby object. Inside -e / -E,
it is that raw object — not an Array of frames — so scalars, hashes, custom
classes, or any Marshal-safe value flow through transparently:
# send a bare String, receive a { string => encoding } Hash
omq push -b tcp://:5557 -ME '"foo"'
omq pull -c tcp://:5557 -M -e '{it => it.encoding}'
# => {"foo" => #<Encoding:UTF-8>}
At -vvv, trace lines for -M render the app-level object instead of wire
bytes: omq: >> (marshal) [nil, :foo, "bar"].
Timing
| Flag | Effect |
|---|---|
-i SECS |
Repeat send every N seconds (wall-clock aligned) |
-n COUNT |
Max messages to send/receive (0 = unlimited) |
-d SECS |
Delay before first send |
-t SECS |
Send/receive timeout |
-l SECS |
Linger time on close (default 5s) |
--reconnect-ivl |
Reconnect interval: SECS or MIN..MAX (default 0.1) |
--heartbeat-ivl SECS |
ZMTP heartbeat interval (detects dead peers) |
# publish a tick every second, 10 times
omq pub -c tcp://localhost:5556 -D "tick" -i 1 -n 10 -d 1
# receive with 5s timeout
omq pull -b tcp://:5557 -t 5
Compression
Set --compress (-z) on either or both sides. The flag enables
ZMTP-Zstd (provided by omq-rfc-zstd), a wire-protocol extension
that negotiates Zstandard compression during the ZMTP handshake via
an X-Compression READY metadata field. If both peers advertise it,
each side compresses its outgoing frames; if only one side does, the
connection stays plaintext (no error). The extension uses the
auto-trained dictionary mode: the sender feeds the first messages
into a dictionary trainer, ships the trained dictionary over a
ZDICT command frame, then switches to dict-bound compression for
the rest of the connection.
omq push -c tcp://remote:5557 -z < data.txt
omq pull -b tcp://:5557 -z
Key generation
Generate a persistent CURVE keypair:
omq keygen
# OMQ_SERVER_PUBLIC='...'
# OMQ_SERVER_SECRET='...'
omq keygen --crypto nuckle # pure Ruby backend (DANGEROUS — not audited)
Export the vars, then use --curve-server (server) or --curve-server-key (client).
CURVE encryption
End-to-end encryption using CurveZMQ. Requires system libsodium:
apt install libsodium-dev # Debian/Ubuntu
brew install libsodium # macOS
To use nuckle (pure Ruby, DANGEROUS — not audited) instead:
omq rep -b tcp://:5555 --echo --curve-server --crypto nuckle
# or: OMQ_CRYPTO=nuckle omq rep -b tcp://:5555 --echo --curve-server
# server (prints OMQ_SERVER_KEY=...)
omq rep -b tcp://:5555 --echo --curve-server
# client (paste the key)
echo "secret" | omq req -c tcp://localhost:5555 \
--curve-server-key '<key from server>'
Persistent keys via env vars: OMQ_SERVER_PUBLIC + OMQ_SERVER_SECRET (server), OMQ_SERVER_KEY (client).
Subscription and groups
# subscribe to topic prefix
omq sub -b tcp://:5556 -s "weather."
# subscribe to all (default)
omq sub -b tcp://:5556
# multiple subscriptions
omq sub -b tcp://:5556 -s "weather." -s "sports."
# RADIO/DISH groups
omq dish -b tcp://:5557 -j "weather" -j "sports"
omq radio -c tcp://localhost:5557 -g "weather" -D "72F"
Identity and routing
# DEALER with identity
echo "hello" | omq dealer -c tcp://localhost:5555 --identity worker-1
# ROUTER receives identity + message as tab-separated
omq router -b tcp://:5555
# ROUTER sends to specific peer
omq router -b tcp://:5555 --target worker-1 -D "reply"
# ROUTER dynamic routing via -E (first element = routing identity)
omq router -b tcp://:5555 -E '["worker-1", it.first.upcase]'
# binary routing IDs (0x prefix)
omq router -b tcp://:5555 --target 0xdeadbeef -D "reply"
Pipe
Pipe creates an in-process PULL → eval → PUSH pipeline:
# basic pipe (positional: first = input, second = output)
omq pipe -c@work -c@sink -e 'it.map(&:upcase)'
# parallel Ractor workers (-P0 = nproc, also combinable: -P0zvv)
omq pipe -c@work -c@sink -P0 -r./fib.rb -e 'fib(Integer(it.first)).to_s'
# fixed number of workers
omq pipe -c@work -c@sink -P4 -e 'it.map(&:upcase)'
# exit when producer disconnects
omq pipe -c@work -c@sink --transient -e 'it.map(&:upcase)'
Multi-peer pipe with --in/--out
Use --in and --out to attach multiple endpoints per side. These are modal switches — subsequent
-b/-c flags attach to the current side:
# fan-in: 2 producers → 1 consumer
omq pipe --in -c@work1 -c@work2 --out -c@sink -e 'it'
# fan-out: 1 producer → 2 consumers (round-robin)
omq pipe --in -b tcp://:5555 --out -c@sink1 -c@sink2 -e 'it'
# bind on input, connect on output
omq pipe --in -b tcp://:5555 -b tcp://:5556 --out -c tcp://sink:5557 -e 'it'
# parallel workers with fan-in (all must be -c)
omq pipe --in -c@a -c@b --out -c@sink -P4 -e 'it'
-P/--parallel requires all endpoints to be --connect. In parallel mode, each Ractor worker
gets its own PULL/PUSH pair connecting to all endpoints.
Transient mode
--transient makes the socket exit when all peers disconnect. Useful for pipeline workers and sinks:
# worker exits when producer is done
omq pipe -c@work -c@sink --transient -e 'it.map(&:upcase)'
# sink exits when all workers disconnect
omq pull -b tcp://:5557 --transient
Exit codes
| Code | Meaning |
|---|---|
| 0 | Success |
| 1 | Error (connection, argument, runtime) |
| 2 | Timeout |
| 3 | Eval error (-e/-E expression raised) |