nnq — nanomsg SP CLI
Command-line tool for sending and receiving nanomsg SP protocol messages on
any nnq socket type. Like nngcat from libnng, but with Ruby eval, Ractor
parallelism, and message handlers.
Built on nnq — pure Ruby SP wire protocol, no C dependencies. Wire-compatible with libnng peers.
Install
gem install nnq-cli
Quick Start
# Echo server
nnq rep -b tcp://:5555 --echo
# Client
echo "hello" | nnq req -c tcp://localhost:5555
# Upcase server — -e evals Ruby on each incoming message
nnq rep -b tcp://:5555 -e '$_.upcase'
Usage: nnq TYPE [options]
Types: req, rep, pub, sub, push, pull, pair
Virtual: pipe (PULL → eval → PUSH)
SP messages are single-frame
Unlike ZeroMQ, nanomsg SP messages have one frame, not many. The CLI
exposes $_ (the message body, a String) and $F (a 1-element array
[$_]) for compatibility with omq-cli expressions, but multipart shenanigans
don't apply.
Connection
Every socket needs at least one --bind or --connect:
nnq pull --bind tcp://:5557 # listen on port 5557
nnq push --connect tcp://host:5557 # connect to host
nnq pull -b ipc:///tmp/feed.sock # IPC (unix socket)
nnq push -c ipc://@abstract # IPC (abstract namespace, Linux)
nnq push -c inproc://name # in-process queue
Bind/connect order doesn't matter — connect is non-blocking and the engine
retries with exponential back-off until the peer is reachable. Multiple
endpoints are allowed: nnq pull -b tcp://:5557 -b tcp://:5558 binds both.
Pipe takes two positional endpoints (input, output) or uses --in/--out for
multiple per side.
Socket types
Unidirectional (send-only / recv-only)
| Send | Recv | Pattern |
|---|---|---|
push |
pull |
Pipeline — round-robin to workers |
pub |
sub |
Publish/subscribe — fan-out with topic prefix filtering |
Send-only sockets read from stdin (or --data/--file) and send. Recv-only
sockets receive and write to stdout.
echo "task" | nnq push -c tcp://worker:5557
nnq pull -b tcp://:5557
Bidirectional (request-reply)
| Type | Behavior |
|---|---|
req |
Sends a request, waits for reply, prints reply |
rep |
Receives request, sends reply (from --echo, -e, --data, --file, or stdin) |
# echo server
nnq rep -b tcp://:5555 --echo
# upcase server
nnq rep -b tcp://:5555 -e '$_.upcase'
# client
echo "hello" | nnq req -c tcp://localhost:5555
Bidirectional (concurrent send + recv)
| Type | Behavior |
|---|---|
pair |
Exclusive 1-to-1 — concurrent send and recv tasks |
These spawn two concurrent tasks: a receiver (prints incoming) and a sender
(reads stdin). -e transforms incoming, -E transforms outgoing.
Pipe (virtual)
Pipe creates an internal PULL → eval → PUSH pipeline:
nnq pipe -c ipc://@work -c ipc://@sink -e '$_.upcase'
# with Ractor workers for CPU parallelism
nnq pipe -c ipc://@work -c ipc://@sink -P 4 -r./fib.rb -e 'fib(Integer($_)).to_s'
The first endpoint is the pull-side (input), the second is the push-side
(output). For parallel mode (-P) all endpoints must be --connect.
Eval: -e and -E
-e (alias --recv-eval) runs a Ruby expression for each incoming message.
-E (alias --send-eval) runs a Ruby expression for each outgoing message.
Globals
| Variable | Value |
|---|---|
$_ |
Message body (String) |
$F |
[$_] — 1-element array, kept for omq-cli compatibility |
Return value
| Return | Effect |
|---|---|
String |
Used as the message body |
Array |
First element used as the body |
nil |
Message is skipped (filtered) |
self (the socket) |
Signals "I already sent" (REP only) |
Control flow
# skip messages matching a pattern
nnq pull -b tcp://:5557 -e 'next if /^#/.match?($_); $_'
# stop on "quit"
nnq pull -b tcp://:5557 -e 'break if /quit/.match?($_); $_'
BEGIN/END blocks
Like awk — BEGIN{} runs once before the message loop, END{} runs after:
nnq pull -b tcp://:5557 -e 'BEGIN{ @sum = 0 } @sum += Integer($_); next END{ puts @sum }'
Local variables won't share state between blocks. Use @ivars instead.
Which sockets accept which flag
| Socket | -E (send) |
-e (recv) |
|---|---|---|
| push, pub | transforms outgoing | error |
| pull, sub | error | transforms incoming |
| req | transforms request | transforms reply |
| rep | error | transforms request → return = reply |
| pair | transforms outgoing | transforms incoming |
| pipe | error | transforms in pipeline |
Examples
# upcase echo server
nnq rep -b tcp://:5555 -e '$_.upcase'
# transform before sending
echo hello | nnq push -c tcp://localhost:5557 -E '$_.upcase'
# filter incoming
nnq pull -b tcp://:5557 -e '$_.include?("error") ? $_ : nil'
# REQ: different transforms per direction
echo hello | nnq req -c tcp://localhost:5555 \
-E '$_.upcase' -e '$_.reverse'
# generate messages without stdin
nnq pub -c tcp://localhost:5556 -E 'Time.now.to_s' -i 1
# use gems
nnq sub -c tcp://localhost:5556 -s "" -rjson -e 'JSON.parse($_)["temperature"]'
Script handlers (-r)
For non-trivial transforms, put the logic in a Ruby file and load it with -r:
# handler.rb
db = PG.connect("dbname=app")
NNQ.outgoing { |msg| msg.upcase }
NNQ.incoming { |msg| db.exec(msg).values.flatten.first }
at_exit { db.close }
nnq req -c tcp://localhost:5555 -r./handler.rb
Registration API
| Method | Effect |
|---|---|
| `NNQ.outgoing { | msg |
| `NNQ.incoming { | msg |
msgis aString(the message body)- Setup: use local variables and closures at the top of the script
- Teardown: use Ruby's
at_exit { ... } - CLI flags (
-e/-E) override script-registered handlers for the same direction - A script can register one direction while the CLI handles the other
Data sources
| Flag | Behavior |
|---|---|
| (stdin) | Read lines from stdin, one message per line |
-D "text" |
Send literal string (one-shot or repeated with -i) |
-F file |
Read message from file (-F - reads stdin as blob) |
--echo |
Echo received messages back (REP only) |
-D and -F are mutually exclusive.
Formats
| Flag | Format |
|---|---|
-A / --ascii |
Tab-separated frames, non-printable → dots (default) |
-Q / --quoted |
C-style escapes, lossless round-trip |
--raw |
Raw body, newline-delimited |
-J / --jsonl |
JSON Lines — ["body"] per line |
--msgpack |
MessagePack arrays (binary stream) |
-M / --marshal |
Ruby Marshal (binary stream of Array<String> objects) |
Since SP messages are single-frame, ASCII/quoted modes don't insert tabs.
nnq push -c tcp://localhost:5557 < data.txt
nnq pull -b tcp://:5557 -J
Timing
| Flag | Effect |
|---|---|
-i SECS |
Repeat send every N seconds (wall-clock aligned) |
-n COUNT |
Max messages to send/receive (0 = unlimited) |
-d SECS |
Delay before first send |
-t SECS |
Send/receive timeout |
-l SECS |
Linger time on close (default 5s) |
--reconnect-ivl |
Reconnect interval: SECS or MIN..MAX (default 0.1) |
# publish a tick every second, 10 times
nnq pub -c tcp://localhost:5556 -D "tick" -i 1 -n 10 -d 1
# receive with 5s timeout
nnq pull -b tcp://:5557 -t 5
Compression
Both sides must use --compress (-z). Uses LZ4 frame format, provided by
the rlz4 gem (Ractor-safe, Rust extension via lz4_flex).
nnq push -c tcp://remote:5557 -z < data.txt
nnq pull -b tcp://:5557 -z
Subscriptions
# subscribe to topic prefix
nnq sub -b tcp://:5556 -s "weather."
# subscribe to all (default)
nnq sub -b tcp://:5556
# multiple subscriptions
nnq sub -b tcp://:5556 -s "weather." -s "sports."
Pipe
Pipe creates an in-process PULL → eval → PUSH pipeline:
# basic pipe (positional: first = input, second = output)
nnq pipe -c ipc://@work -c ipc://@sink -e '$_.upcase'
# parallel Ractor workers (default: all CPUs)
nnq pipe -c ipc://@work -c ipc://@sink -P -r./fib.rb -e 'fib(Integer($_)).to_s'
# fixed number of workers
nnq pipe -c ipc://@work -c ipc://@sink -P 4 -e '$_.upcase'
# exit when producer disconnects
nnq pipe -c ipc://@work -c ipc://@sink --transient -e '$_.upcase'
Multi-peer pipe with --in/--out
Use --in and --out to attach multiple endpoints per side. These are modal
switches — subsequent -b/-c flags attach to the current side:
# fan-in: 2 producers → 1 consumer
nnq pipe --in -c ipc://@work1 -c ipc://@work2 --out -c ipc://@sink -e '$_'
# fan-out: 1 producer → 2 consumers (round-robin)
nnq pipe --in -b tcp://:5555 --out -c ipc://@sink1 -c ipc://@sink2 -e '$_'
# parallel workers with fan-in (all must be -c)
nnq pipe --in -c ipc://@a -c ipc://@b --out -c ipc://@sink -P 4 -e '$_'
-P/--parallel requires all endpoints to be --connect. In parallel mode,
each Ractor worker gets its own PULL/PUSH pair connecting to all endpoints.
Transient mode
--transient makes the socket exit when all peers disconnect. Useful for
pipeline workers and sinks:
# worker exits when producer is done
nnq pipe -c ipc://@work -c ipc://@sink --transient -e '$_.upcase'
# sink exits when all workers disconnect
nnq pull -b tcp://:5557 --transient
Verbose / monitor mode
Pass -v (repeatable) for increasingly chatty output:
| Level | Output |
|---|---|
-v |
Bind/connect endpoints |
-vv |
Lifecycle events (:listening, :connected, :disconnected, ...) |
-vvv |
Per-message trace (:message_sent, :message_received) |
nnq pub -b tcp://:5556 -vv
Exit codes
| Code | Meaning |
|---|---|
| 0 | Success |
| 1 | Error (connection, argument, runtime) |
| 2 | Timeout |
| 3 | Eval error (-e/-E expression raised) |
Interop with libnng / nngcat
nnq-cli speaks the SP wire protocol, so it interoperates with nngcat and any
libnng-based peer over tcp:// and ipc://:
# nnq → nngcat
nngcat --pull0 --listen tcp://127.0.0.1:5555 --quoted &
echo hello | nnq push -c tcp://127.0.0.1:5555
# nngcat → nnq
nnq pull -b tcp://127.0.0.1:5555 &
nngcat --push0 --dial tcp://127.0.0.1:5555 --data "hello-from-nngcat"