Class: OMQ::Compression::Zstd::Compressor
- Inherits:
-
Object
- Object
- OMQ::Compression::Zstd::Compressor
- Defined in:
- lib/omq/compression/zstd/compressor.rb
Overview
User-facing configuration object. Assigned to an OMQ socket via ‘socket.compression = OMQ::Compression::Zstd.none/auto/with_dictionary(…)`. Encapsulates the negotiated profile, the dictionaries (per direction), the compression level, and the cached SHA-1 hash used in the READY property.
Per RFC §7.3, dictionaries are per-direction: each direction uses its sender’s dictionary. This object holds two independent dictionary slots — one for outgoing compression and one for incoming decompression — that are populated independently depending on the negotiated profile.
Profiles:
:none no dictionary; per-frame opportunistic compression
above MIN_COMPRESS_BYTES_NO_DICT.
:dict_static caller-supplied dictionary, agreed out of band.
Loaded into both send and recv slots (symmetric).
Profile string `zstd:dict:sha1:<hex>`.
:dict_inline caller-supplied dictionary loaded into the send
slot only and shipped to the peer via ZDICT once;
the recv slot is populated when the peer's ZDICT
arrives.
:dict_auto no dictionary at connect time; the sender trains
one socket-wide from the first AUTO_DICT_SAMPLE_COUNT
messages OR AUTO_DICT_SAMPLE_BYTES of plaintext
(whichever comes first), installs it into the send
slot, and ships it via ZDICT. The recv slot is
populated when the peer's ZDICT arrives.
Instance Attribute Summary collapse
-
#level ⇒ Object
readonly
Returns the value of attribute level.
-
#mode ⇒ Object
readonly
Returns the value of attribute mode.
-
#profile ⇒ Object
readonly
Returns the value of attribute profile.
-
#send_dict_bytes ⇒ Object
readonly
Returns the value of attribute send_dict_bytes.
-
#sentinel ⇒ Object
readonly
Returns the value of attribute sentinel.
Instance Method Summary collapse
-
#add_sample(plaintext) ⇒ void
Feeds a plaintext sample into the auto-training buffer.
-
#auto? ⇒ Boolean
True for :dict_auto mode.
- #compress(plaintext) ⇒ Object
-
#decompress(compressed, max_output_size: nil) ⇒ Object
Bounded single-shot decompression.
- #has_recv_dictionary? ⇒ Boolean
- #has_send_dictionary? ⇒ Boolean
-
#initialize(mode:, dictionary:, level: DEFAULT_LEVEL, passive: false) ⇒ Compressor
constructor
When passive: true, the socket advertises the profile and decodes incoming compressed frames, but never compresses outgoing messages – #min_compress_bytes reports infinity, so every outgoing part falls through to the SENTINEL_UNCOMPRESSED path.
-
#install_recv_dictionary(bytes) ⇒ Object
Install a dictionary into the recv slot.
-
#install_send_dictionary(bytes) ⇒ Object
Install a dictionary into the send slot.
-
#match(peer_property_value) ⇒ Object
Match this compression’s advertised profile against a peer’s X-Compression property value (comma-separated profile list).
- #min_compress_bytes ⇒ Object
-
#passive? ⇒ Boolean
True if this side was configured as a passive sender (RFC Sec. 6.4 “Passive senders”): advertise the profile and decompress incoming frames, but never compress outgoing frames.
-
#trained? ⇒ Boolean
True once auto-training has completed (success or give-up).
Constructor Details
#initialize(mode:, dictionary:, level: DEFAULT_LEVEL, passive: false) ⇒ Compressor
When passive: true, the socket advertises the profile and decodes incoming compressed frames, but never compresses outgoing messages – #min_compress_bytes reports infinity, so every outgoing part falls through to the SENTINEL_UNCOMPRESSED path. Used by omq-cli to decompress-by-default without forcing compression on senders that didn’t opt in.
70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 |
# File 'lib/omq/compression/zstd/compressor.rb', line 70 def initialize(mode:, dictionary:, level: DEFAULT_LEVEL, passive: false) @mode = mode @passive = passive @level = Integer(level) @sentinel = SENTINEL_ZSTD_FRAME @send_dictionary = nil @recv_dictionary = nil @send_dict_bytes = nil case mode when :none @profile = PROFILE_NONE when :dict_static bytes = dictionary.b dict = RZstd::Dictionary.new(bytes, level: @level) @send_dictionary = dict @recv_dictionary = dict @profile = "#{PROFILE_DICT_PREFIX}#{Digest::SHA1.hexdigest(bytes)}" when :dict_inline bytes = dictionary.b @send_dictionary = RZstd::Dictionary.new(bytes, level: @level) @send_dict_bytes = bytes @profile = PROFILE_DICT_INLINE when :dict_auto @profile = PROFILE_DICT_AUTO @samples = [] @samples_bytes = 0 @samples_count = 0 @training_done = false @training_mutex = Mutex.new else raise ArgumentError, "unknown mode: #{mode.inspect}" end end |
Instance Attribute Details
#level ⇒ Object (readonly)
Returns the value of attribute level.
61 62 63 |
# File 'lib/omq/compression/zstd/compressor.rb', line 61 def level @level end |
#mode ⇒ Object (readonly)
Returns the value of attribute mode.
61 62 63 |
# File 'lib/omq/compression/zstd/compressor.rb', line 61 def mode @mode end |
#profile ⇒ Object (readonly)
Returns the value of attribute profile.
61 62 63 |
# File 'lib/omq/compression/zstd/compressor.rb', line 61 def profile @profile end |
#send_dict_bytes ⇒ Object (readonly)
Returns the value of attribute send_dict_bytes.
61 62 63 |
# File 'lib/omq/compression/zstd/compressor.rb', line 61 def send_dict_bytes @send_dict_bytes end |
#sentinel ⇒ Object (readonly)
Returns the value of attribute sentinel.
61 62 63 |
# File 'lib/omq/compression/zstd/compressor.rb', line 61 def sentinel @sentinel end |
Instance Method Details
#add_sample(plaintext) ⇒ void
This method returns an undefined value.
Feeds a plaintext sample into the auto-training buffer. No-op for non-auto modes, after training has finished, or for parts >= AUTO_DICT_MAX_SAMPLE_LEN (large frames dilute the dict and blow the sample budget on a handful of messages). Triggers training synchronously when the sample-count or sample-bytes threshold is reached.
Thread-safe: multiple connections sharing this socket-wide Compression may call this concurrently.
208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 |
# File 'lib/omq/compression/zstd/compressor.rb', line 208 def add_sample(plaintext) return unless @mode == :dict_auto return if @passive return if @training_done return if plaintext.bytesize >= AUTO_DICT_MAX_SAMPLE_LEN # OMQ's Writable mixin already hands us frozen binary Strings # (frozen_binary + parts.freeze), so in the common case we # can stash the caller's reference without a `.b` copy. Only # coerce when the encoding/frozen invariants don't hold. sample = plaintext.frozen? && plaintext.encoding == Encoding::BINARY ? plaintext : plaintext.b @training_mutex.synchronize do return if @training_done @samples << sample @samples_bytes += plaintext.bytesize @samples_count += 1 maybe_train! end end |
#auto? ⇒ Boolean
Returns true for :dict_auto mode.
184 185 186 |
# File 'lib/omq/compression/zstd/compressor.rb', line 184 def auto? @mode == :dict_auto end |
#compress(plaintext) ⇒ Object
133 134 135 136 137 138 139 |
# File 'lib/omq/compression/zstd/compressor.rb', line 133 def compress(plaintext) if @send_dictionary @send_dictionary.compress(plaintext) else RZstd.compress(plaintext, level: @level) end end |
#decompress(compressed, max_output_size: nil) ⇒ Object
Bounded single-shot decompression. The ‘max_output_size:` cap is enforced inside the Rust extension: the frame’s Frame_Content_Size header is read first, and MissingContentSizeError / OutputSizeLimitError are raised before allocating the output buffer or invoking the decoder.
147 148 149 150 151 152 153 |
# File 'lib/omq/compression/zstd/compressor.rb', line 147 def decompress(compressed, max_output_size: nil) if @recv_dictionary @recv_dictionary.decompress(compressed, max_output_size: max_output_size) else RZstd.decompress(compressed, max_output_size: max_output_size) end end |
#has_recv_dictionary? ⇒ Boolean
111 112 113 |
# File 'lib/omq/compression/zstd/compressor.rb', line 111 def has_recv_dictionary? !@recv_dictionary.nil? end |
#has_send_dictionary? ⇒ Boolean
106 107 108 |
# File 'lib/omq/compression/zstd/compressor.rb', line 106 def has_send_dictionary? !@send_dictionary.nil? end |
#install_recv_dictionary(bytes) ⇒ Object
Install a dictionary into the recv slot. Called by the Connection wrapper when a ZDICT command frame arrives from the peer.
178 179 180 |
# File 'lib/omq/compression/zstd/compressor.rb', line 178 def install_recv_dictionary(bytes) @recv_dictionary = RZstd::Dictionary.new(bytes.b, level: @level) end |
#install_send_dictionary(bytes) ⇒ Object
Install a dictionary into the send slot. Used internally by auto-mode after training: the trained dict is installed here and the bytes stashed for shipping via ZDICT.
169 170 171 172 |
# File 'lib/omq/compression/zstd/compressor.rb', line 169 def install_send_dictionary(bytes) @send_dict_bytes = bytes.b @send_dictionary = RZstd::Dictionary.new(@send_dict_bytes, level: @level) end |
#match(peer_property_value) ⇒ Object
Match this compression’s advertised profile against a peer’s X-Compression property value (comma-separated profile list). Returns the matched profile string, or nil for no match.
159 160 161 162 163 |
# File 'lib/omq/compression/zstd/compressor.rb', line 159 def match(peer_property_value) return nil if peer_property_value.nil? || peer_property_value.empty? peer_profiles = peer_property_value.split(",").map(&:strip) peer_profiles.include?(@profile) ? @profile : nil end |
#min_compress_bytes ⇒ Object
127 128 129 130 |
# File 'lib/omq/compression/zstd/compressor.rb', line 127 def min_compress_bytes return Float::INFINITY if passive? has_send_dictionary? ? MIN_COMPRESS_BYTES_DICT : MIN_COMPRESS_BYTES_NO_DICT end |
#passive? ⇒ Boolean
True if this side was configured as a passive sender (RFC Sec. 6.4 “Passive senders”): advertise the profile and decompress incoming frames, but never compress outgoing frames. Implemented by making #min_compress_bytes return infinity so every outgoing part falls through to the SENTINEL_UNCOMPRESSED path in Codec.encode_part.
122 123 124 |
# File 'lib/omq/compression/zstd/compressor.rb', line 122 def passive? @passive == true end |
#trained? ⇒ Boolean
Returns true once auto-training has completed (success or give-up). After this point #add_sample is a no-op.
191 192 193 |
# File 'lib/omq/compression/zstd/compressor.rb', line 191 def trained? @training_done == true end |