Class: OMQ::RFC::Zstd::Compression
- Inherits: Object
  - Object
    - OMQ::RFC::Zstd::Compression
- Defined in: lib/omq/rfc/zstd/compression.rb
Overview
User-facing configuration object. Assigned to an OMQ socket via `socket.compression = OMQ::RFC::Zstd::Compression.new(…)`. Encapsulates the negotiated profile, the dictionaries (per direction), the compression level, and the cached SHA-1 hash used in the READY property.
Per RFC §7.3, dictionaries are per-direction: each direction uses its sender’s dictionary. This object holds two independent dictionary slots — one for outgoing compression and one for incoming decompression — that are populated independently depending on the negotiated profile.
Profiles:
  :none         no dictionary; per-frame opportunistic compression
                above MIN_COMPRESS_BYTES_NO_DICT.
  :dict_static  caller-supplied dictionary, agreed out of band.
                Loaded into both send and recv slots (symmetric).
                Profile string `zstd:dict:sha1:<hex>`.
  :dict_inline  caller-supplied dictionary loaded into the send
                slot only and shipped to the peer via ZDICT once;
                the recv slot is populated when the peer's ZDICT
                arrives.
  :dict_auto    no dictionary at connect time; the sender trains
                one socket-wide from the first AUTO_DICT_SAMPLE_COUNT
                messages OR AUTO_DICT_SAMPLE_BYTES of plaintext
                (whichever comes first), installs it into the send
                slot, and ships it via ZDICT. The recv slot is
                populated when the peer's ZDICT arrives.
Instance Attribute Summary
- #level ⇒ Object (readonly)
  Returns the value of attribute level.
- #mode ⇒ Object (readonly)
  Returns the value of attribute mode.
- #profile ⇒ Object (readonly)
  Returns the value of attribute profile.
- #send_dict_bytes ⇒ Object (readonly)
  Returns the value of attribute send_dict_bytes.
- #sentinel ⇒ Object (readonly)
  Returns the value of attribute sentinel.
Class Method Summary
- .auto(level: DEFAULT_LEVEL, passive: false) ⇒ Object
- .none(level: DEFAULT_LEVEL, passive: false) ⇒ Object
- .with_dictionary(bytes, inline: false, level: DEFAULT_LEVEL, passive: false) ⇒ Object
Instance Method Summary
- #add_sample(plaintext) ⇒ void
  Feeds a plaintext sample into the auto-training buffer.
- #auto? ⇒ Boolean
  True for :dict_auto mode.
- #compress(plaintext) ⇒ Object
- #decompress(compressed, max_output_size: nil) ⇒ Object
  Bounded single-shot decompression.
- #has_recv_dictionary? ⇒ Boolean
- #has_send_dictionary? ⇒ Boolean
- #initialize(mode:, dictionary:, level: DEFAULT_LEVEL, passive: false) ⇒ Compression (constructor)
  When passive: true, the socket advertises the profile and decodes incoming compressed frames, but never compresses outgoing messages; #min_compress_bytes reports infinity, so every outgoing part falls through to the SENTINEL_UNCOMPRESSED path.
- #install_recv_dictionary(bytes) ⇒ Object
  Install a dictionary into the recv slot.
- #install_send_dictionary(bytes) ⇒ Object
  Install a dictionary into the send slot.
- #match(peer_property_value) ⇒ Object
  Match this compression’s advertised profile against a peer’s X-Compression property value (comma-separated profile list).
- #min_compress_bytes ⇒ Object
- #passive? ⇒ Boolean
  True if this side was configured as a passive sender (RFC Sec. 6.4 “Passive senders”): advertise the profile and decompress incoming frames, but never compress outgoing frames.
- #trained? ⇒ Boolean
  True once auto-training has completed (success or give-up).
Constructor Details
#initialize(mode:, dictionary:, level: DEFAULT_LEVEL, passive: false) ⇒ Compression
When passive: true, the socket advertises the profile and decodes incoming compressed frames, but never compresses outgoing messages – #min_compress_bytes reports infinity, so every outgoing part falls through to the SENTINEL_UNCOMPRESSED path. Used by omq-cli to decompress-by-default without forcing compression on senders that didn’t opt in.
    # File 'lib/omq/rfc/zstd/compression.rb', line 67

    def initialize(mode:, dictionary:, level: DEFAULT_LEVEL, passive: false)
      @mode = mode
      @passive = passive
      @level = Integer(level)
      @sentinel = SENTINEL_ZSTD_FRAME
      @send_dictionary = nil
      @recv_dictionary = nil
      @send_dict_bytes = nil

      case mode
      when :none
        @profile = PROFILE_NONE
      when :dict_static
        bytes = dictionary.b
        dict = RZstd::Dictionary.new(bytes, level: @level)
        @send_dictionary = dict
        @recv_dictionary = dict
        @profile = "#{PROFILE_DICT_PREFIX}#{Digest::SHA1.hexdigest(bytes)}"
      when :dict_inline
        bytes = dictionary.b
        @send_dictionary = RZstd::Dictionary.new(bytes, level: @level)
        @send_dict_bytes = bytes
        @profile = PROFILE_DICT_INLINE
      when :dict_auto
        @profile = PROFILE_DICT_AUTO
        @samples = []
        @samples_bytes = 0
        @samples_count = 0
        @training_done = false
        @training_mutex = Mutex.new
      else
        raise ArgumentError, "unknown mode: #{mode.inspect}"
      end
    end
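To make the constructor's per-mode behaviour easier to scan, the following sketch tabulates which dictionary slots are filled at construction time, as read from the listing above. The constant and the `slots_for` helper are hypothetical illustrations and not part of the library.

```ruby
# Which dictionary slots are populated right after construction, per mode.
# In the inline and auto modes the recv slot is filled later, when the
# peer's ZDICT arrives; in auto mode the send slot is filled after training.
SLOTS_AT_CONSTRUCTION = {
  none:        { send: false, recv: false },
  dict_static: { send: true,  recv: true  },
  dict_inline: { send: true,  recv: false },
  dict_auto:   { send: false, recv: false },
}.freeze

# Hypothetical lookup that mirrors the constructor's handling of unknown modes.
def slots_for(mode)
  SLOTS_AT_CONSTRUCTION.fetch(mode) do
    raise ArgumentError, "unknown mode: #{mode.inspect}"
  end
end

p slots_for(:dict_inline)
```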
Instance Attribute Details
#level ⇒ Object (readonly)
Returns the value of attribute level.
    # File 'lib/omq/rfc/zstd/compression.rb', line 42

    def level
      @level
    end
#mode ⇒ Object (readonly)
Returns the value of attribute mode.
    # File 'lib/omq/rfc/zstd/compression.rb', line 42

    def mode
      @mode
    end
#profile ⇒ Object (readonly)
Returns the value of attribute profile.
    # File 'lib/omq/rfc/zstd/compression.rb', line 42

    def profile
      @profile
    end
#send_dict_bytes ⇒ Object (readonly)
Returns the value of attribute send_dict_bytes.
    # File 'lib/omq/rfc/zstd/compression.rb', line 42

    def send_dict_bytes
      @send_dict_bytes
    end
#sentinel ⇒ Object (readonly)
Returns the value of attribute sentinel.
    # File 'lib/omq/rfc/zstd/compression.rb', line 42

    def sentinel
      @sentinel
    end
Class Method Details
.auto(level: DEFAULT_LEVEL, passive: false) ⇒ Object
    # File 'lib/omq/rfc/zstd/compression.rb', line 57

    def self.auto(level: DEFAULT_LEVEL, passive: false)
      new(mode: :dict_auto, dictionary: nil, level: level, passive: passive)
    end
.none(level: DEFAULT_LEVEL, passive: false) ⇒ Object
    # File 'lib/omq/rfc/zstd/compression.rb', line 44

    def self.none(level: DEFAULT_LEVEL, passive: false)
      new(mode: :none, dictionary: nil, level: level, passive: passive)
    end
.with_dictionary(bytes, inline: false, level: DEFAULT_LEVEL, passive: false) ⇒ Object
    # File 'lib/omq/rfc/zstd/compression.rb', line 48

    def self.with_dictionary(bytes, inline: false, level: DEFAULT_LEVEL, passive: false)
      new(
        mode: inline ? :dict_inline : :dict_static,
        dictionary: bytes,
        level: level,
        passive: passive,
      )
    end
Instance Method Details
#add_sample(plaintext) ⇒ void
This method returns an undefined value.
Feeds a plaintext sample into the auto-training buffer. No-op for non-auto modes, for passive senders, after training has finished, or for parts of AUTO_DICT_MAX_SAMPLE_LEN bytes or more (large frames dilute the dictionary and exhaust the sample budget on a handful of messages). Triggers training synchronously when the sample-count or sample-bytes threshold is reached.
Thread-safe: multiple connections sharing this socket-wide Compression may call this concurrently.
    # File 'lib/omq/rfc/zstd/compression.rb', line 196

    def add_sample(plaintext)
      return unless @mode == :dict_auto
      return if @passive
      return if @training_done
      return if plaintext.bytesize >= AUTO_DICT_MAX_SAMPLE_LEN

      # OMQ's Writable mixin already hands us frozen binary Strings
      # (frozen_binary + parts.freeze), so in the common case we
      # can stash the caller's reference without a `.b` copy. Only
      # coerce when the encoding/frozen invariants don't hold.
      sample = plaintext.frozen? && plaintext.encoding == Encoding::BINARY ? plaintext : plaintext.b

      @training_mutex.synchronize do
        return if @training_done
        @samples << sample
        @samples_bytes += plaintext.bytesize
        @samples_count += 1
        maybe_train!
      end
    end
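The sampling pattern above (a cheap unlocked check, then a re-check under the mutex before mutating shared state) can be sketched in isolation. This is a minimal stand-in, not the library's implementation: `SampleBuffer` and the three threshold values are hypothetical, and the training step, which this page does not show, is replaced by simply marking the buffer as finished.

```ruby
# Hypothetical thresholds; the real constants' values are not shown on this page.
SAMPLE_COUNT_LIMIT = 3
SAMPLE_BYTES_LIMIT = 1024
MAX_SAMPLE_LEN     = 256

class SampleBuffer
  attr_reader :samples

  def initialize
    @samples = []
    @bytes   = 0
    @done    = false
    @mutex   = Mutex.new
  end

  def done?
    @done
  end

  # Returns true if the sample was stored, false if it was skipped.
  def add(plaintext)
    return false if @done                                  # cheap check without the lock
    return false if plaintext.bytesize >= MAX_SAMPLE_LEN   # oversized parts are ignored

    @mutex.synchronize do
      return false if @done                                # re-check: another thread may have finished
      @samples << plaintext.b.freeze
      @bytes += plaintext.bytesize
      # Stand-in for training: mark the buffer finished once either limit is hit.
      @done = true if @samples.size >= SAMPLE_COUNT_LIMIT || @bytes >= SAMPLE_BYTES_LIMIT
      true
    end
  end
end

buffer = SampleBuffer.new
%w[alpha beta gamma delta].each { |message| buffer.add(message) }
puts buffer.samples.size # 3: "delta" arrives after the count limit was reached
```

The second `@done` check inside the lock is what keeps concurrent callers from appending samples after the threshold has been crossed.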
#auto? ⇒ Boolean
Returns true for :dict_auto mode.
    # File 'lib/omq/rfc/zstd/compression.rb', line 172

    def auto?
      @mode == :dict_auto
    end
#compress(plaintext) ⇒ Object
    # File 'lib/omq/rfc/zstd/compression.rb', line 125

    def compress(plaintext)
      if @send_dictionary
        @send_dictionary.compress(plaintext)
      else
        RZstd.compress(plaintext, level: @level)
      end
    end
#decompress(compressed, max_output_size: nil) ⇒ Object
Bounded single-shot decompression. The `max_output_size:` cap is enforced inside the Rust extension: the frame’s Frame_Content_Size header is read first, and MissingContentSizeError / OutputSizeLimitError are raised before allocating the output buffer or invoking the decoder.
    # File 'lib/omq/rfc/zstd/compression.rb', line 138

    def decompress(compressed, max_output_size: nil)
      if @recv_dictionary
        @recv_dictionary.decompress(compressed, max_output_size: max_output_size)
      else
        RZstd.decompress(compressed, max_output_size: max_output_size)
      end
    end
#has_recv_dictionary? ⇒ Boolean
    # File 'lib/omq/rfc/zstd/compression.rb', line 106

    def has_recv_dictionary?
      !@recv_dictionary.nil?
    end
#has_send_dictionary? ⇒ Boolean
    # File 'lib/omq/rfc/zstd/compression.rb', line 102

    def has_send_dictionary?
      !@send_dictionary.nil?
    end
#install_recv_dictionary(bytes) ⇒ Object
Install a dictionary into the recv slot. Called by the CompressionConnection wrapper when a ZDICT command frame arrives from the peer.
    # File 'lib/omq/rfc/zstd/compression.rb', line 166

    def install_recv_dictionary(bytes)
      @recv_dictionary = RZstd::Dictionary.new(bytes.b, level: @level)
    end
#install_send_dictionary(bytes) ⇒ Object
Install a dictionary into the send slot. Used internally by auto-mode after training: the trained dict is installed here and the bytes stashed for shipping via ZDICT.
    # File 'lib/omq/rfc/zstd/compression.rb', line 158

    def install_send_dictionary(bytes)
      @send_dict_bytes = bytes.b
      @send_dictionary = RZstd::Dictionary.new(@send_dict_bytes, level: @level)
    end
#match(peer_property_value) ⇒ Object
Match this compression’s advertised profile against a peer’s X-Compression property value (comma-separated profile list). Returns the matched profile string, or nil for no match.
    # File 'lib/omq/rfc/zstd/compression.rb', line 149

    def match(peer_property_value)
      return nil if peer_property_value.nil? || peer_property_value.empty?
      peer_profiles = peer_property_value.split(",").map(&:strip)
      peer_profiles.include?(@profile) ? @profile : nil
    end
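The matching rule can be tried without a socket. The sketch below restates the same logic as a free function; `match_profile` is a hypothetical name, and the profile strings are placeholders rather than the library's real profile values.

```ruby
# Returns own_profile if the peer's comma-separated list contains it exactly
# (after trimming surrounding whitespace), otherwise nil.
def match_profile(own_profile, peer_property_value)
  return nil if peer_property_value.nil? || peer_property_value.empty?

  peer_profiles = peer_property_value.split(",").map(&:strip)
  peer_profiles.include?(own_profile) ? own_profile : nil
end

p match_profile("profile-a", "profile-b, profile-a") # "profile-a"
p match_profile("profile-a", "profile-ab")           # nil: no prefix matching
p match_profile("profile-a", nil)                    # nil
```

Matching is by exact string equality, which is why a :dict_static profile that embeds the dictionary's SHA-1 only matches a peer advertising the same dictionary.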
#min_compress_bytes ⇒ Object
    # File 'lib/omq/rfc/zstd/compression.rb', line 120

    def min_compress_bytes
      return Float::INFINITY if passive?
      has_send_dictionary? ? MIN_COMPRESS_BYTES_DICT : MIN_COMPRESS_BYTES_NO_DICT
    end
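A small sketch of why an infinite threshold disables outgoing compression: any finite part size compares as smaller than Float::INFINITY. The threshold values and the `compress_part?` comparison are assumptions made for illustration; the encoder's actual check is not shown on this page.

```ruby
# Hypothetical threshold values; the real constants are defined elsewhere.
EXAMPLE_MIN_BYTES_DICT    = 64
EXAMPLE_MIN_BYTES_NO_DICT = 512

# Same decision as the method above, with its inputs passed explicitly.
def example_min_compress_bytes(passive:, has_send_dictionary:)
  return Float::INFINITY if passive
  has_send_dictionary ? EXAMPLE_MIN_BYTES_DICT : EXAMPLE_MIN_BYTES_NO_DICT
end

# Assumed form of the size check: compress only at or above the threshold.
def compress_part?(part, threshold)
  part.bytesize >= threshold
end

part = "x" * 1_000
p compress_part?(part, example_min_compress_bytes(passive: false, has_send_dictionary: false)) # true
p compress_part?(part, example_min_compress_bytes(passive: true,  has_send_dictionary: true))  # false
```

Using an infinite threshold keeps the passive case on the same code path as the ordinary size check, with no separate branch needed in the encoder.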
#passive? ⇒ Boolean
True if this side was configured as a passive sender (RFC Sec. 6.4 “Passive senders”): advertise the profile and decompress incoming frames, but never compress outgoing frames. Implemented by making #min_compress_bytes return infinity so every outgoing part falls through to the SENTINEL_UNCOMPRESSED path in Codec.encode_part.
    # File 'lib/omq/rfc/zstd/compression.rb', line 116

    def passive?
      @passive == true
    end
#trained? ⇒ Boolean
Returns true once auto-training has completed (success or give-up). After this point #add_sample is a no-op.
    # File 'lib/omq/rfc/zstd/compression.rb', line 179

    def trained?
      @training_done == true
    end