Class: OMQ::Compression::Zstd::Compressor

Inherits:
Object
Defined in:
lib/omq/compression/zstd/compressor.rb

Overview

User-facing configuration object. Assigned to an OMQ socket via `socket.compression = OMQ::Compression::Zstd.none/auto/with_dictionary(…)`. Encapsulates the negotiated profile, the dictionaries (per direction), the compression level, and the cached SHA-1 hash used in the READY property.

Per RFC §7.3, dictionaries are per-direction: each direction uses its sender’s dictionary. This object holds two independent dictionary slots — one for outgoing compression and one for incoming decompression — that are populated independently depending on the negotiated profile.

Profiles:

:none         no dictionary; per-frame opportunistic compression
              above MIN_COMPRESS_BYTES_NO_DICT.
:dict_static  caller-supplied dictionary, agreed out of band.
              Loaded into both send and recv slots (symmetric).
              Profile string `zstd:dict:sha1:<hex>`.
:dict_inline  caller-supplied dictionary loaded into the send
              slot only and shipped to the peer via ZDICT once;
              the recv slot is populated when the peer's ZDICT
              arrives.
:dict_auto    no dictionary at connect time; the sender trains
              one socket-wide from the first AUTO_DICT_SAMPLE_COUNT
              messages OR AUTO_DICT_SAMPLE_BYTES of plaintext
              (whichever comes first), installs it into the send
              slot, and ships it via ZDICT. The recv slot is
              populated when the peer's ZDICT arrives.
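
As a rough illustration of the profile list above, the sketch below records which dictionary slots each mode fills at construction time and derives a static-dictionary profile string. The helpers `slots_at_connect` and `static_profile` are hypothetical names introduced for this example; only the `zstd:dict:sha1:<hex>` format and the slot behavior come from the descriptions above.

```ruby
require "digest"

# Hypothetical helper: which slots hold a dictionary right after
# construction, following the profile descriptions above.
def slots_at_connect(mode)
  case mode
  when :none        then { send: false, recv: false }
  when :dict_static then { send: true,  recv: true  } # symmetric, agreed out of band
  when :dict_inline then { send: true,  recv: false } # recv filled by the peer's ZDICT
  when :dict_auto   then { send: false, recv: false } # send filled after training
  else raise ArgumentError, "unknown mode: #{mode.inspect}"
  end
end

# Hypothetical helper: builds a profile string in the documented
# `zstd:dict:sha1:<hex>` form from raw dictionary bytes.
def static_profile(dictionary_bytes)
  "zstd:dict:sha1:#{Digest::SHA1.hexdigest(dictionary_bytes.b)}"
end

p slots_at_connect(:dict_inline)
puts static_profile("example dictionary bytes")
```

Because the profile string embeds the digest of the dictionary bytes, two peers advertise the same static profile only when they hold byte-identical dictionaries.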


Constructor Details

#initialize(mode:, dictionary:, level: DEFAULT_LEVEL, passive: false) ⇒ Compressor

When passive: true, the socket advertises the profile and decodes incoming compressed frames, but never compresses outgoing messages – #min_compress_bytes reports infinity, so every outgoing part falls through to the SENTINEL_UNCOMPRESSED path. Used by omq-cli to decompress-by-default without forcing compression on senders that didn’t opt in.



# File 'lib/omq/compression/zstd/compressor.rb', line 70

def initialize(mode:, dictionary:, level: DEFAULT_LEVEL, passive: false)
  @mode             = mode
  @passive          = passive
  @level            = Integer(level)
  @sentinel         = SENTINEL_ZSTD_FRAME
  @send_dictionary  = nil
  @recv_dictionary  = nil
  @send_dict_bytes  = nil

  case mode
  when :none
    @profile = PROFILE_NONE
  when :dict_static
    bytes            = dictionary.b
    dict             = RZstd::Dictionary.new(bytes, level: @level)
    @send_dictionary = dict
    @recv_dictionary = dict
    @profile         = "#{PROFILE_DICT_PREFIX}#{Digest::SHA1.hexdigest(bytes)}"
  when :dict_inline
    bytes            = dictionary.b
    @send_dictionary = RZstd::Dictionary.new(bytes, level: @level)
    @send_dict_bytes = bytes
    @profile         = PROFILE_DICT_INLINE
  when :dict_auto
    @profile        = PROFILE_DICT_AUTO
    @samples        = []
    @samples_bytes  = 0
    @samples_count  = 0
    @training_done  = false
    @training_mutex = Mutex.new
  else
    raise ArgumentError, "unknown mode: #{mode.inspect}"
  end
end

Instance Attribute Details

#level ⇒ Object (readonly)

Returns the value of attribute level.



# File 'lib/omq/compression/zstd/compressor.rb', line 61

def level
  @level
end

#mode ⇒ Object (readonly)

Returns the value of attribute mode.



# File 'lib/omq/compression/zstd/compressor.rb', line 61

def mode
  @mode
end

#profile ⇒ Object (readonly)

Returns the value of attribute profile.



# File 'lib/omq/compression/zstd/compressor.rb', line 61

def profile
  @profile
end

#send_dict_bytes ⇒ Object (readonly)

Returns the value of attribute send_dict_bytes.



# File 'lib/omq/compression/zstd/compressor.rb', line 61

def send_dict_bytes
  @send_dict_bytes
end

#sentinel ⇒ Object (readonly)

Returns the value of attribute sentinel.



# File 'lib/omq/compression/zstd/compressor.rb', line 61

def sentinel
  @sentinel
end

Instance Method Details

#add_sample(plaintext) ⇒ void

This method returns an undefined value.

Feeds a plaintext sample into the auto-training buffer. No-op for non-auto modes, for passive compressors, after training has finished, or for parts of AUTO_DICT_MAX_SAMPLE_LEN bytes or more (large frames dilute the dictionary and exhaust the sample budget on a handful of messages). Triggers training synchronously when the sample-count or sample-bytes threshold is reached.

Thread-safe: multiple connections sharing this socket-wide Compressor may call this concurrently.

Parameters:

  • plaintext (String)


# File 'lib/omq/compression/zstd/compressor.rb', line 208

def add_sample(plaintext)
  return unless @mode == :dict_auto
  return if @passive
  return if @training_done
  return if plaintext.bytesize >= AUTO_DICT_MAX_SAMPLE_LEN

  # OMQ's Writable mixin already hands us frozen binary Strings
  # (frozen_binary + parts.freeze), so in the common case we
  # can stash the caller's reference without a `.b` copy. Only
  # coerce when the encoding/frozen invariants don't hold.
  sample = plaintext.frozen? && plaintext.encoding == Encoding::BINARY ? plaintext : plaintext.b

  @training_mutex.synchronize do
    return if @training_done
    @samples << sample
    @samples_bytes += plaintext.bytesize
    @samples_count += 1
    maybe_train!
  end
end
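
The sampling rules above (skip oversized parts, re-check the done flag under the lock, stop at whichever threshold is hit first) can be sketched without the Zstd extension. `SampleBuffer` and its threshold values are stand-ins made up for this example; they are not the library's AUTO_DICT_* constants, and the real method goes on to train a dictionary where this sketch only sets a flag.

```ruby
# Stand-in for the auto-training buffer. Thresholds are illustrative.
class SampleBuffer
  def initialize(max_count: 3, max_bytes: 64, max_sample_len: 32)
    @max_count      = max_count
    @max_bytes      = max_bytes
    @max_sample_len = max_sample_len
    @samples        = []
    @bytes          = 0
    @done           = false
    @mutex          = Mutex.new
  end

  def done?
    @done
  end

  # Returns true if the sample was accepted, false if it was skipped.
  def add(plaintext)
    return false if @done
    return false if plaintext.bytesize >= @max_sample_len

    @mutex.synchronize do
      return false if @done # re-check under the lock
      @samples << plaintext
      @bytes += plaintext.bytesize
      # Whichever threshold is reached first ends sampling.
      @done = true if @samples.size >= @max_count || @bytes >= @max_bytes
      true
    end
  end
end

buf = SampleBuffer.new
buf.add("a" * 40)            # skipped: at or above max_sample_len
3.times { buf.add("hello") } # third accepted sample reaches max_count
puts buf.done?
```

The unlocked check before `synchronize` is only a fast path; the check inside the lock is the one that keeps two threads from both adding a sample after the threshold has been reached.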

#auto? ⇒ Boolean

Returns true for :dict_auto mode.

Returns:

  • (Boolean)

    true for :dict_auto mode



# File 'lib/omq/compression/zstd/compressor.rb', line 184

def auto?
  @mode == :dict_auto
end

#compress(plaintext) ⇒ Object

Compresses plaintext with the send-slot dictionary when one is installed; otherwise falls back to dictionary-less compression at the configured level.



# File 'lib/omq/compression/zstd/compressor.rb', line 133

def compress(plaintext)
  if @send_dictionary
    @send_dictionary.compress(plaintext)
  else
    RZstd.compress(plaintext, level: @level)
  end
end

#decompress(compressed, max_output_size: nil) ⇒ Object

Bounded single-shot decompression. The `max_output_size:` cap is enforced inside the Rust extension: the frame’s Frame_Content_Size header is read first, and MissingContentSizeError / OutputSizeLimitError are raised before allocating the output buffer or invoking the decoder.



# File 'lib/omq/compression/zstd/compressor.rb', line 147

def decompress(compressed, max_output_size: nil)
  if @recv_dictionary
    @recv_dictionary.decompress(compressed, max_output_size: max_output_size)
  else
    RZstd.decompress(compressed, max_output_size: max_output_size)
  end
end
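
The order of checks described above (read the declared size, reject, and only then allocate) can be sketched in plain Ruby. This is not the Rust extension's code: the error classes below are local stand-ins that borrow the names mentioned above, `check_output_size!` is a hypothetical helper, and treating a `nil` cap as "no limit" and using a strict `>` comparison are both assumptions.

```ruby
# Local stand-ins for the error classes named in the description.
class MissingContentSizeError < StandardError; end
class OutputSizeLimitError    < StandardError; end

# declared_size: content size read from the frame header, or nil
# when the frame does not carry one.
# Returns the declared size if decoding may proceed.
def check_output_size!(declared_size, max_output_size)
  return declared_size if max_output_size.nil? # assumption: nil means no cap
  raise MissingContentSizeError, "frame has no content size" if declared_size.nil?
  if declared_size > max_output_size
    raise OutputSizeLimitError, "#{declared_size} exceeds cap #{max_output_size}"
  end
  declared_size
end

puts check_output_size!(1_024, 4_096)
begin
  check_output_size!(1 << 30, 4_096)
rescue OutputSizeLimitError => e
  puts "rejected: #{e.message}"
end
```

Rejecting on the header alone means a small hostile frame that claims a huge output is refused before any output buffer exists.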

#has_recv_dictionary? ⇒ Boolean

Returns true if a dictionary is installed in the recv slot.

Returns:

  • (Boolean)


# File 'lib/omq/compression/zstd/compressor.rb', line 111

def has_recv_dictionary?
  !@recv_dictionary.nil?
end

#has_send_dictionary? ⇒ Boolean

Returns true if a dictionary is installed in the send slot.

Returns:

  • (Boolean)


# File 'lib/omq/compression/zstd/compressor.rb', line 106

def has_send_dictionary?
  !@send_dictionary.nil?
end

#install_recv_dictionary(bytes) ⇒ Object

Install a dictionary into the recv slot. Called by the Connection wrapper when a ZDICT command frame arrives from the peer.



# File 'lib/omq/compression/zstd/compressor.rb', line 178

def install_recv_dictionary(bytes)
  @recv_dictionary = RZstd::Dictionary.new(bytes.b, level: @level)
end

#install_send_dictionary(bytes) ⇒ Object

Install a dictionary into the send slot. Used internally by auto-mode after training: the trained dict is installed here and the bytes stashed for shipping via ZDICT.



# File 'lib/omq/compression/zstd/compressor.rb', line 169

def install_send_dictionary(bytes)
  @send_dict_bytes = bytes.b
  @send_dictionary = RZstd::Dictionary.new(@send_dict_bytes, level: @level)
end

#match(peer_property_value) ⇒ Object

Match this compression’s advertised profile against a peer’s X-Compression property value (comma-separated profile list). Returns the matched profile string, or nil for no match.



# File 'lib/omq/compression/zstd/compressor.rb', line 159

def match(peer_property_value)
  return nil if peer_property_value.nil? || peer_property_value.empty?
  peer_profiles = peer_property_value.split(",").map(&:strip)
  peer_profiles.include?(@profile) ? @profile : nil
end
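
A standalone version of the same matching logic shows how the comma-separated property value is handled. `match_profile` is a free function written for this example, and `zstd:example` is a made-up profile name used only to pad the peer's list.

```ruby
# Same logic as #match, taking the local profile as an argument.
def match_profile(own_profile, peer_property_value)
  return nil if peer_property_value.nil? || peer_property_value.empty?
  peer_profiles = peer_property_value.split(",").map(&:strip)
  peer_profiles.include?(own_profile) ? own_profile : nil
end

own = "zstd:dict:sha1:#{'ab' * 20}"

p match_profile(own, "zstd:example, #{own}") # whitespace around entries is ignored
p match_profile(own, "zstd:example")         # no shared profile
p match_profile(own, nil)                    # peer sent no property
```

Matching is exact string equality per entry, so a static-dictionary profile matches only when the peer advertises the same digest.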

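#min_compress_bytes ⇒ Object

Returns the minimum part size for outgoing compression: infinity for a passive compressor, MIN_COMPRESS_BYTES_DICT when a send dictionary is installed, and MIN_COMPRESS_BYTES_NO_DICT otherwise.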



# File 'lib/omq/compression/zstd/compressor.rb', line 127

def min_compress_bytes
  return Float::INFINITY if passive?
  has_send_dictionary? ? MIN_COMPRESS_BYTES_DICT : MIN_COMPRESS_BYTES_NO_DICT
end

#passive? ⇒ Boolean

True if this side was configured as a passive sender (RFC Sec. 6.4 “Passive senders”): advertise the profile and decompress incoming frames, but never compress outgoing frames. Implemented by making #min_compress_bytes return infinity so every outgoing part falls through to the SENTINEL_UNCOMPRESSED path in Codec.encode_part.

Returns:

  • (Boolean)


# File 'lib/omq/compression/zstd/compressor.rb', line 122

def passive?
  @passive == true
end
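
The infinite-threshold trick described above can be seen in a small sketch. Both helpers are hypothetical: `compress_part?` assumes the encoder compares part size against the threshold with `>=` (the actual check in Codec.encode_part may differ), and the 64 and 512 byte thresholds are made-up values, not the library's MIN_COMPRESS_BYTES_* constants.

```ruby
# Hypothetical threshold lookup mirroring #min_compress_bytes.
def threshold(passive:, has_dictionary:, with_dict: 64, without_dict: 512)
  return Float::INFINITY if passive
  has_dictionary ? with_dict : without_dict
end

# Hypothetical encoder-side check: compress only parts at or above
# the threshold.
def compress_part?(part, min_compress_bytes)
  part.bytesize >= min_compress_bytes
end

part = "x" * 10_000

p compress_part?(part, threshold(passive: false, has_dictionary: true))
p compress_part?(part, threshold(passive: true,  has_dictionary: true))
```

No finite part size compares greater than or equal to `Float::INFINITY`, so a passive sender needs no special case in the encoding path.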

#trained? ⇒ Boolean

Returns true once auto-training has completed (success or give-up). After this point #add_sample is a no-op.

Returns:

  • (Boolean)

    true once auto-training has completed (success or give-up). After this point #add_sample is a no-op.



# File 'lib/omq/compression/zstd/compressor.rb', line 191

def trained?
  @training_done == true
end