Module: OMQ::Compression::Zstd::Codec

Defined in:
lib/omq/compression/zstd/codec.rb

Overview

Pure-function frame-body codec. Implements the sender/receiver rules from RFC sections 6.4 and 6.5. Stateless: all state (dictionary, profile, max_message_size) is passed in explicitly. Has no dependency on Protocol::ZMTP::Connection, so this module is unit-testable in isolation.

Class Method Details

.decode_part(body, compression, budget_remaining: nil) ⇒ String

Receiver rule (RFC 6.5). Returns the plaintext bytes for the user, or raises on protocol violation.

The budget_remaining argument, when non-nil, is the running remainder of max_message_size left for the current multipart message. The RFC-mandated header checks (Frame_Content_Size present; declared size ≤ budget) happen inside the Rust extension in a single call that either returns the plaintext or raises before any decoder allocation.
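
A caller-side sketch of how the running remainder might be threaded through one multipart message. The helper name decode_multipart and the PASSTHROUGH stand-in decoder are hypothetical and not part of this module; in real use the callable would presumably wrap decode_part(body, compression, budget_remaining: remaining).

```ruby
# Hypothetical helper (not part of OMQ): decodes every part of one multipart
# message while shrinking the budget by the plaintext size of each part.
# `decode` is any callable taking (body, remaining) and returning plaintext.
def decode_multipart(bodies, max_message_size, decode)
  remaining = max_message_size # nil means "no cap"
  bodies.map do |body|
    plaintext = decode.call(body, remaining)
    remaining -= plaintext.bytesize unless remaining.nil?
    plaintext
  end
end

# Stand-in decoder for illustration only: treats the body as plaintext and
# applies the same "size <= remaining" rule that enforce_budget! uses.
PASSTHROUGH = lambda do |body, remaining|
  if remaining && body.bytesize > remaining
    raise ArgumentError, "decompressed message size exceeds maximum"
  end
  body
end

decode_multipart(["abc", "de"], 5, PASSTHROUGH)   # fits: 3 + 2 bytes against a cap of 5
decode_multipart(["abc", "de"], nil, PASSTHROUGH) # nil disables the cap
```

Because the remainder is recomputed after each part, a later part can be rejected even when it would fit within max_message_size on its own.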

Parameters:

  • body (String)

    wire frame body bytes

  • compression (OMQ::Compression::Zstd::Compression, nil)

    the negotiated recv-direction compression object, or nil if no profile is active

  • budget_remaining (Integer, nil) (defaults to: nil)

    remaining decompressed-byte budget for this multipart message; nil disables the cap

Returns:

  • (String)

    plaintext bytes



# File 'lib/omq/compression/zstd/codec.rb', line 59

def decode_part(body, compression, budget_remaining: nil)
  return body if compression.nil?

  if body.bytesize < SENTINEL_SIZE
    raise ShortFrameError, "ZMTP-Zstd: short frame"
  end

  sentinel = body.byteslice(0, SENTINEL_SIZE)

  case sentinel
  when SENTINEL_UNCOMPRESSED
    plaintext = body.byteslice(SENTINEL_SIZE, body.bytesize - SENTINEL_SIZE)
    enforce_budget!(plaintext.bytesize, budget_remaining)
    plaintext
  when SENTINEL_ZSTD_FRAME
    begin
      compression.decompress(body, max_output_size: budget_remaining)
    rescue RZstd::MissingContentSizeError => e
      raise MissingContentSizeError, "ZMTP-Zstd: missing content size: #{e.message}"
    rescue RZstd::OutputSizeLimitError => e
      raise DecompressedSizeExceedsMaxError,
            "ZMTP-Zstd: decompressed message size exceeds maximum: #{e.message}"
    end
  else
    raise UnknownSentinelError,
          "ZMTP-Zstd: unknown sentinel #{sentinel.unpack1('H*')}"
  end
end
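
To make the dispatch above concrete, the following sketch splits a frame body into sentinel and payload and renders an unrecognized sentinel as hex, the way the UnknownSentinelError message does. ASSUMED_SENTINEL_SIZE = 4 and split_frame are assumptions for illustration only; the real SENTINEL_SIZE and sentinel values are defined in the codec and are not shown here.

```ruby
# Assumption for illustration: the real SENTINEL_SIZE is not shown in this page.
ASSUMED_SENTINEL_SIZE = 4

# Hypothetical helper: returns [sentinel, payload] for a wire frame body,
# using the same byteslice arithmetic as decode_part.
def split_frame(body)
  if body.bytesize < ASSUMED_SENTINEL_SIZE
    raise ArgumentError, "short frame"
  end

  sentinel = body.byteslice(0, ASSUMED_SENTINEL_SIZE)
  payload  = body.byteslice(ASSUMED_SENTINEL_SIZE, body.bytesize - ASSUMED_SENTINEL_SIZE)
  [sentinel, payload]
end

sentinel, payload = split_frame("\xDE\xAD\xBE\xEFhello".b)
puts sentinel.unpack1("H*") # prints "deadbeef"
puts payload                # prints "hello"
```

A body that consists of exactly the sentinel yields an empty payload rather than an error; only bodies shorter than the sentinel are rejected as short frames.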

.encode_part(plaintext, compression) ⇒ String

Sender rule (RFC 6.4). Returns the bytes to place in the ZMTP message-frame body.

Parameters:

  • plaintext (String)

    frame payload as supplied by the user

  • compression (OMQ::Compression::Zstd::Compression, nil)

    the negotiated send-direction compression object, or nil if no profile is active

Returns:

  • (String)

    frame body bytes (always binary)



# File 'lib/omq/compression/zstd/codec.rb', line 24

def encode_part(plaintext, compression)
  plaintext = plaintext.b unless plaintext.encoding == Encoding::BINARY

  return plaintext if compression.nil?

  size = plaintext.bytesize
  if size < compression.min_compress_bytes
    return SENTINEL_UNCOMPRESSED + plaintext
  end

  compressed = compression.compress(plaintext)
  if compressed.bytesize >= size - SENTINEL_SIZE
    SENTINEL_UNCOMPRESSED + plaintext
  else
    compressed
  end
end
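
The two thresholds in the method above can be checked with a little arithmetic. The sketch below mirrors only the size comparisons, not the compression itself. ASSUMED_SENTINEL_SIZE = 4, the wire_choice helper, and the min_compress_bytes value of 64 are assumptions chosen for illustration, not values taken from the codec.

```ruby
# Assumption for illustration: the real SENTINEL_SIZE is not shown in this page.
ASSUMED_SENTINEL_SIZE = 4

# Hypothetical helper: reports which form encode_part would emit, given the
# plaintext size, the size of the compressed candidate, and the threshold.
# (The real method skips compression entirely below min_compress_bytes.)
def wire_choice(plaintext_size, compressed_size, min_compress_bytes)
  return :uncompressed if plaintext_size < min_compress_bytes
  return :uncompressed if compressed_size >= plaintext_size - ASSUMED_SENTINEL_SIZE

  :compressed
end

p wire_choice(10, 5, 64)   # below the threshold      => :uncompressed
p wire_choice(100, 96, 64) # 96 >= 100 - 4            => :uncompressed
p wire_choice(100, 95, 64) # 95 <  100 - 4            => :compressed
```

Under these assumptions, a 100-byte payload is sent compressed only when the compressed frame is 95 bytes or smaller.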

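.enforce_budget!(size, budget_remaining) ⇒ Object

Raises DecompressedSizeExceedsMaxError when size exceeds budget_remaining. A nil budget_remaining disables the check.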



# File 'lib/omq/compression/zstd/codec.rb', line 89

def enforce_budget!(size, budget_remaining)
  return if budget_remaining.nil?
  return if size <= budget_remaining

  raise DecompressedSizeExceedsMaxError,
    "ZMTP-Zstd: decompressed message size exceeds maximum"
end