Class: OMQ::CLI::Formatter

Inherits:
Object
Defined in:
lib/omq/cli/formatter.rb

Overview

Handles encoding/decoding messages in the configured format. Compression is handled below the application API by ZMTP-Zstd (omq-rfc-zstd) once enabled via socket.compression=; the formatter sees plaintext frames in both directions.

Constant Summary

LINE_ESCAPES =

Maps tab, newline, carriage return, and backslash to the visible escape sequences used by sanitize. Every other byte outside printable ASCII collapses to ‘.’ via a single String#tr call.

{
  "\t" => '\\t',
  "\n" => '\\n',
  "\r" => '\\r',
  "\\" => '\\\\',
}.freeze

Constructor Details

#initialize(format) ⇒ Formatter

Returns a new instance of Formatter.

Parameters:

  • format (Symbol)

    wire format (:ascii, :quoted, :raw, :jsonl, :msgpack, :marshal)



# File 'lib/omq/cli/formatter.rb', line 11

def initialize(format)
  @format = format
end

Class Method Details

.preview(parts, format: nil, wire_size: nil, uncompressed_size: nil) ⇒ String

Formats message parts for human-readable preview (logging). When wire_size is given (ZMTP-Zstd negotiated), the header also shows the compressed on-the-wire size: “(29B wire=12B)”. Accepts either wire-side Array<String> (monitor events) or post-decode app parts that may contain non-String objects (e.g. -M Marshal.load output).

When format is :marshal, parts is the raw Ruby object itself (not an Array of frames); the preview inspects it so the reader sees the actual payload structure (e.g. '[nil, :foo, "bar"]') instead of a meaningless “1obj” header. For marshal, uncompressed_size is the Marshal.dump bytesize. The caller already knows it, having either serialized the object for send or received the wire frame for recv, so it is passed through rather than recomputed here.

Parameters:

  • parts (Array<String, Object>, Object)

    message frames, or raw object when format is :marshal

  • format (Symbol, nil) (defaults to: nil)

    active CLI format (:marshal enables object-inspect mode)

  • wire_size (Integer, nil) (defaults to: nil)

    compressed bytes on the wire

  • uncompressed_size (Integer, nil) (defaults to: nil)

    plaintext bytes (marshal only)

Returns:

  • (String)

    truncated preview of each frame joined by |



# File 'lib/omq/cli/formatter.rb', line 116

def self.preview(parts, format: nil, wire_size: nil, uncompressed_size: nil)
  case format
  when :marshal
    marshal_preview(parts, uncompressed_size: uncompressed_size, wire_size: wire_size)
  else
    frames_preview(parts, format: format, wire_size: wire_size)
  end
end
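As a rough illustration of the marshal path described above, a caller can derive uncompressed_size from the same Marshal.dump output it is about to send, so the preview never has to serialize a second time. The snippet uses only Ruby's built-in Marshal; the variable names are illustrative, and the commented-out call merely restates the signature documented here.

```ruby
# Sketch: computing the value a caller would pass as uncompressed_size.
payload = [nil, :foo, "bar"]              # raw object sent under :marshal

wire_bytes        = Marshal.dump(payload) # serialized once, for sending
uncompressed_size = wire_bytes.bytesize   # plaintext byte count

# What the preview inspects in place of a frame list:
inspected = payload.inspect
puts inspected

# With the gem loaded, the documented call would then look like:
#   OMQ::CLI::Formatter.preview(payload, format: :marshal,
#                               uncompressed_size: uncompressed_size)
```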

.preview_frame(part) ⇒ String

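Renders one frame or decoded object for preview. Strings are sanitized byte-wise and cut to their first 12 bytes; an empty frame renders as '' and a frame whose 12-byte sample is less than half printable ASCII renders as a size placeholder such as [64B]. Non-String objects fall back to #inspect (always single-line), truncated at 24 bytes.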

Parameters:

  • part (String, Object)

Returns:

  • (String)


# File 'lib/omq/cli/formatter.rb', line 174

def self.preview_frame(part)
  unless part.is_a?(String)
    s = part.inspect
    return s.bytesize > 24 ? "#{s.byteslice(0, 24)}" : s
  end

  bytes = part.b
  # Empty frames must render as a visible marker, not as the empty
  # string — otherwise joining with "|" would produce misleading
  # output like "|body" for REP/REQ-style envelopes where the first
  # wire frame is an empty delimiter.
  return "''" if bytes.empty?

  sample    = bytes[0, 12]
  printable = sample.count("\x20-\x7e")

  if printable < sample.bytesize / 2
    "[#{bytes.bytesize}B]"
  elsif bytes.bytesize > 12
    "#{sanitize(sample)}"
  else
    sanitize(sample)
  end
end
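The branches above can be exercised without the gem using a standalone sketch. It mirrors the listing as printed here; the method names sketch_sanitize and sketch_preview_frame are hypothetical stand-ins, and the two final branches are merged because they return the same value in the listing.

```ruby
# Standalone sketch of the preview_frame logic shown above.
# All names here are illustrative, not part of the library.
def sketch_sanitize(bytes)
  escapes = { "\t" => '\\t', "\n" => '\\n', "\r" => '\\r', "\\" => '\\\\' }
  bytes.b.gsub(/[\t\n\r\\]/, escapes).tr("^ -~", ".")
end

def sketch_preview_frame(part)
  # Non-String objects: single-line inspect, cut at 24 bytes.
  unless part.is_a?(String)
    s = part.inspect
    return s.bytesize > 24 ? s.byteslice(0, 24) : s
  end

  bytes = part.b
  return "''" if bytes.empty?             # visible marker for empty frames

  sample    = bytes[0, 12]                # only the first 12 bytes are examined
  printable = sample.count("\x20-\x7e")   # printable ASCII bytes in the sample

  if printable < sample.bytesize / 2
    "[#{bytes.bytesize}B]"                # mostly binary: show the size only
  else
    sketch_sanitize(sample)
  end
end

examples = {
  empty:  sketch_preview_frame(""),
  text:   sketch_preview_frame("hello"),
  long:   sketch_preview_frame("hello world, and then some"),
  binary: sketch_preview_frame("\x00\x01\x02\x03"),
  object: sketch_preview_frame(:foo),
}
examples.each { |name, out| puts "#{name}: #{out}" }
```

Note that the printable-ratio test uses integer division, so a 3-byte sample needs only one printable byte to be treated as text.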

.sanitize(bytes) ⇒ String

Escapes bytes so a preview/body line is guaranteed single-line on a shared tty. Tab/newline/CR/backslash render as the literal two-character sequences \t, \n, \r, and \\; other non-printables collapse to ‘.’. The input is forced to binary encoding first to prevent UTF-8 quirks from rendering raw LF bytes.

Parameters:

  • bytes (String)

Returns:

  • (String)


# File 'lib/omq/cli/formatter.rb', line 208

def self.sanitize(bytes)
  bytes.b.gsub(/[\t\n\r\\]/, LINE_ESCAPES).tr("^ -~", ".")
end
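A small standalone sketch of the same two-step pipeline may help show why the order matters: gsub must run before tr, otherwise tabs and newlines would already have been flattened to dots. The method name sketch_sanitize is a hypothetical stand-in, and the escape table is inlined so the snippet runs without the gem.

```ruby
# Sketch of the sanitize pipeline: escape first, then collapse.
def sketch_sanitize(bytes)
  escapes = { "\t" => '\\t', "\n" => '\\n', "\r" => '\\r', "\\" => '\\\\' }
  bytes.b                                # work on raw bytes, not UTF-8 characters
       .gsub(/[\t\n\r\\]/, escapes)      # visible two-character escapes
       .tr("^ -~", ".")                  # anything outside 0x20..0x7e becomes "."
end

raw  = "id\t42\nC:\\tmp\x00\xFF"         # tab, newline, backslash, NUL, 0xFF
safe = sketch_sanitize(raw)
puts safe
```

The backslashes introduced by gsub are themselves printable ASCII, so the subsequent tr pass leaves them untouched.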

Instance Method Details

#decode(line) ⇒ Array<String>

Decodes a formatted input line into message parts.

Parameters:

  • line (String)

    input line (newline-terminated)

Returns:

  • (Array<String>)

    message frames



# File 'lib/omq/cli/formatter.rb', line 45

def decode(line)
  case @format
  when :ascii, :marshal
    line.chomp.split("\t", -1)
  when :quoted
    line.chomp.split("\t", -1).map { |p| "\"#{p}\"".undump }
  when :raw
    [line]
  when :jsonl
    arr = JSON.parse(line.chomp)
    abort "JSON Lines input must be an array of strings" unless arr.is_a?(Array) && arr.all? { |e| e.is_a?(String) }
    arr
  end
end
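The line-oriented branches above rely on three standard-library behaviours that can be checked in isolation: String#split with a limit of -1, String#undump, and JSON.parse. The following sketch assumes nothing from the gem; valid_jsonl_frames? is a hypothetical helper that returns a boolean where the real method aborts.

```ruby
require "json"

# :ascii-style splitting. A limit of -1 keeps trailing empty frames,
# which a plain split("\t") would silently drop.
ascii_frames = "topic\t\t\n".chomp.split("\t", -1)

# :quoted-style decoding. Each field is wrapped in double quotes and handed
# to String#undump, which turns the two characters \n back into a newline.
quoted_frames = "hello\\nworld\tsecond\n".chomp.split("\t", -1).map { |p| "\"#{p}\"".undump }

# :jsonl validation, expressed as a predicate for illustration.
def valid_jsonl_frames?(line)
  arr = JSON.parse(line.chomp)
  arr.is_a?(Array) && arr.all? { |e| e.is_a?(String) }
end

p ascii_frames
p quoted_frames
p valid_jsonl_frames?(%(["a","b"]\n))
p valid_jsonl_frames?(%(["a",1]\n))
```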

#decode_marshal(io) ⇒ Object?

Decodes one Marshal object from the given IO stream.

Parameters:

  • io (IO)

    input stream

Returns:

  • (Object, nil)

    deserialized object, or nil on EOF (a TypeError raised by Marshal.load is rescued and also yields nil)



# File 'lib/omq/cli/formatter.rb', line 65

def decode_marshal(io)
  Marshal.load(io)
rescue EOFError, TypeError
  nil
end
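One way a caller might drain a stream with this kind of method is to loop until nil comes back. The sketch below reproduces the rescue pattern against a StringIO; read_marshal_stream is a hypothetical helper, not part of the documented API.

```ruby
require "stringio"

# Sketch: read consecutive Marshal objects from an IO until EOF.
def read_marshal_stream(io)
  objects = []
  loop do
    obj = begin
      Marshal.load(io)
    rescue EOFError, TypeError
      nil                      # same fallback as the method above
    end
    break if obj.nil?
    objects << obj
  end
  objects
end

stream  = StringIO.new(Marshal.dump({ id: 1 }) + Marshal.dump([nil, :foo, "bar"]))
objects = read_marshal_stream(stream)
p objects
```

A consequence of using nil as the EOF signal is that a marshaled nil in the stream is indistinguishable from end of input and would stop such a loop early.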

#decode_msgpack(io) ⇒ Object?

Decodes one MessagePack object from the given IO stream.

Parameters:

  • io (IO)

    input stream

Returns:

  • (Object, nil)

    deserialized object, or nil on EOF



# File 'lib/omq/cli/formatter.rb', line 76

def decode_msgpack(io)
  @msgpack_unpacker ||= MessagePack::Unpacker.new(io)
  @msgpack_unpacker.read
rescue EOFError
  nil
end

#encode(parts) ⇒ String

Encodes message parts into a printable string for output.

Parameters:

  • parts (Array<String>)

    message frames

Returns:

  • (String)

    formatted output line



# File 'lib/omq/cli/formatter.rb', line 20

def encode(parts)
  case @format
  when :ascii
    parts.map { |p| p.b.gsub(/[^[:print:]\t]/, ".") }.join("\t") << "\n"
  when :quoted
    parts.map { |p| p.b.dump[1..-2] }.join("\t") << "\n"
  when :raw
    parts.each_with_index.map do |p, i|
      Protocol::ZMTP::Codec::Frame.new(p.to_s, more: i < parts.size - 1).to_wire
    end.join
  when :jsonl
    JSON.generate(parts) << "\n"
  when :msgpack
    MessagePack.pack(parts)
  when :marshal
    # Under -M, `parts` is a single Ruby object (not a frame array).
    parts.inspect << "\n"
  end
end
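To make the difference between the text formats concrete, the sketch below applies the :ascii, :quoted, and :jsonl expressions from the listing to the same two frames, using only the standard library. The variable names are illustrative; the :raw and :msgpack branches are left out because they depend on external classes.

```ruby
require "json"

parts = ["line one\nline two", "tab\there"]

# :ascii is lossy: the newline becomes ".", while a tab inside a frame is
# kept and becomes indistinguishable from the frame separator.
ascii_line = parts.map { |p| p.b.gsub(/[^[:print:]\t]/, ".") }.join("\t") << "\n"

# :quoted is reversible: String#dump escapes tabs and newlines, and the
# surrounding quotes are trimmed with [1..-2].
quoted_line = parts.map { |p| p.b.dump[1..-2] }.join("\t") << "\n"
round_trip  = quoted_line.chomp.split("\t", -1).map { |p| "\"#{p}\"".undump }

# :jsonl emits one JSON array per line.
jsonl_line = JSON.generate(parts) << "\n"

puts ascii_line, quoted_line, jsonl_line
```

Decoding the :ascii line yields three frames rather than two, which is why :quoted or :jsonl is the safer choice when frames may contain tabs.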