Class: NNQ::CLI::Formatter

Inherits:
Object
  • Object
show all
Defined in:
lib/nnq/cli/formatter.rb

Overview

Handles encoding/decoding a single-body message in the configured format, plus optional LZ4 compression.

Unlike omq-cli’s Formatter, nnq messages are not multipart — one ‘String` body per message. The API still accepts/returns a 1-element array so that `$F`-based eval expressions work the same way.

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(format, compress: false) ⇒ Formatter

Returns a new instance of Formatter.

Parameters:

  • format (Symbol)

    wire format (:ascii, :quoted, :raw, :jsonl, :msgpack, :marshal)

  • compress (Boolean) (defaults to: false)

    whether to apply LZ4 compression



18
19
20
21
# File 'lib/nnq/cli/formatter.rb', line 18

def initialize(format, compress: false)
  @format   = format
  @compress = compress
end

Class Method Details

.preview(msg) ⇒ String

Formats a message body for human-readable preview (logging).

Parameters:

  • msg (Array<String>)

    single-element array

Returns:

  • (String)

    truncated preview



117
118
119
120
# File 'lib/nnq/cli/formatter.rb', line 117

def self.preview(msg)
  body = msg.first.to_s
  "(#{body.bytesize}B) #{preview_body(body)}"
end

.preview_body(body) ⇒ Object



123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/nnq/cli/formatter.rb', line 123

def self.preview_body(body)
  bytes = body.b
  return "''" if bytes.empty?

  sample    = bytes[0, 12]
  printable = sample.count("\x20-\x7e")

  if printable < sample.bytesize / 2
    "[#{bytes.bytesize}B]"
  elsif bytes.bytesize > 12
    "#{sample.gsub(/[^[:print:]]/, ".")}..."
  else
    sample.gsub(/[^[:print:]]/, ".")
  end
end

Instance Method Details

#compress(msg) ⇒ Array<String>

Compresses the body with LZ4 if compression is enabled.

Parameters:

  • msg (Array<String>)

    single-element array

Returns:

  • (Array<String>)

    optionally compressed



96
97
98
# File 'lib/nnq/cli/formatter.rb', line 96

def compress(msg)
  @compress ? msg.map { |p| RLZ4.compress(p) if p } : msg
end

#decode(line) ⇒ Array<String>

Decodes a formatted input line into a 1-element message array.

Parameters:

  • line (String)

    input line (newline-terminated)

Returns:

  • (Array<String>)

    1-element array



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/nnq/cli/formatter.rb', line 51

def decode(line)
  case @format
  when :ascii, :marshal
    [line.chomp]
  when :quoted
    ["\"#{line.chomp}\"".undump]
  when :raw
    [line]
  when :jsonl
    arr = JSON.parse(line.chomp)
    unless arr.is_a?(Array) && arr.all? { |e| e.is_a?(String) }
      abort "JSON Lines input must be an array of strings"
    end
    arr.first(1)
  end
end

#decode_marshal(io) ⇒ Object?

Decodes one Marshal object from the given IO stream.

Parameters:

  • io (IO)

    input stream

Returns:

  • (Object, nil)

    deserialized object, or nil on EOF



73
74
75
76
77
# File 'lib/nnq/cli/formatter.rb', line 73

def decode_marshal(io)
  Marshal.load(io)
rescue EOFError, TypeError
  nil
end

#decode_msgpack(io) ⇒ Object?

Decodes one MessagePack object from the given IO stream.

Parameters:

  • io (IO)

    input stream

Returns:

  • (Object, nil)

    deserialized object, or nil on EOF



84
85
86
87
88
89
# File 'lib/nnq/cli/formatter.rb', line 84

def decode_msgpack(io)
  @msgpack_unpacker ||= MessagePack::Unpacker.new(io)
  @msgpack_unpacker.read
rescue EOFError
  nil
end

#decompress(msg) ⇒ Array<String>

Decompresses the body with LZ4 if compression is enabled. nil/empty bodies pass through.

Parameters:

  • msg (Array<String>)

    possibly compressed single-element array

Returns:

  • (Array<String>)

    decompressed



106
107
108
109
110
# File 'lib/nnq/cli/formatter.rb', line 106

def decompress(msg)
  @compress ? msg.map { |p| p && !p.empty? ? RLZ4.decompress(p) : p } : msg
rescue RLZ4::DecompressError
  raise DecompressError, "decompression failed (did the sender use --compress?)"
end

#encode(msg) ⇒ String

Encodes a message body into a printable string for output.

Parameters:

  • msg (Array<String>)

    single-element array (the body)

Returns:

  • (String)

    formatted output line



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/nnq/cli/formatter.rb', line 28

def encode(msg)
  body = msg.first.to_s
  case @format
  when :ascii
    body.b.gsub(/[^[:print:]\t]/, ".") + "\n"
  when :quoted
    body.b.dump[1..-2] + "\n"
  when :raw
    body
  when :jsonl
    JSON.generate([body]) + "\n"
  when :msgpack
    MessagePack.pack([body])
  when :marshal
    body.inspect + "\n"
  end
end