Class: NNQ::Zstd::Codec
- Inherits:
-
Object
- Object
- NNQ::Zstd::Codec
- Defined in:
- lib/nnq/zstd/codec.rb
Overview
Pure state machine: no socket references. ‘encode(body)` returns `[wire, dict_frame]` where `dict_frame` is a single dict message that MUST precede `wire` on the wire, or nil if no dict needs shipping. `decode(wire)` returns a plaintext String, or `nil` if the wire message was a dict frame that has been silently installed into the receive-side slot.
Constant Summary collapse
- MAX_DECOMPRESSED_SIZE =
16 * 1024 * 1024
- MAX_DICT_SIZE =
32 * 1024
- DICT_CAPACITY =
8 * 1024
- TRAIN_MAX_SAMPLES =
1000- TRAIN_MAX_BYTES =
100 * 1024
- TRAIN_MAX_SAMPLE_LEN =
1024- MIN_COMPRESS_NO_DICT =
512- MIN_COMPRESS_WITH_DICT =
64- NUL_PREAMBLE =
("\x00" * 4).b.freeze
- ZSTD_MAGIC =
"\x28\xB5\x2F\xFD".b.freeze
- ZDICT_MAGIC =
"\x37\xA4\x30\xEC".b.freeze
Instance Method Summary collapse
-
#decode(wire) ⇒ Object
Decodes a wire message.
-
#encode(body) ⇒ Object
Encodes ‘body` into a wire message.
-
#initialize(level:, dict: nil, recv_max_size: nil) ⇒ Codec
constructor
A new instance of Codec.
-
#reset_for_reconnect! ⇒ Object
Resets send-side state after a reconnect so the dict is re-shipped on the new connection.
Constructor Details
#initialize(level:, dict: nil, recv_max_size: nil) ⇒ Codec
Returns a new instance of Codec.
26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
# File 'lib/nnq/zstd/codec.rb', line 26 def initialize(level:, dict: nil, recv_max_size: nil) @level = level @recv_max_size = [recv_max_size || MAX_DECOMPRESSED_SIZE, MAX_DECOMPRESSED_SIZE].min @send_dict = nil @send_dict_bytes = nil @recv_dict = nil @dict_shipped = false @training = dict.nil? @train_samples = [] @train_bytes = 0 install_send_dict(dict.b) if dict end |
Instance Method Details
#decode(wire) ⇒ Object
Decodes a wire message. Returns the plaintext String, or ‘nil` if this message was a dict frame (installed into the receive slot, not surfaced to the caller).
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/nnq/zstd/codec.rb', line 68 def decode(wire) raise ProtocolError, "wire message too short" if wire.bytesize < 4 head = wire.byteslice(0, 4) case head when NUL_PREAMBLE wire.byteslice(4, wire.bytesize - 4) || "".b when ZSTD_MAGIC decode_zstd_frame(wire) when ZDICT_MAGIC install_recv_dict(wire) nil else raise ProtocolError, "unrecognized preamble: #{head.unpack1('H*')}" end end |
#encode(body) ⇒ Object
Encodes ‘body` into a wire message. Returns `[wire, dict_frame]` where `dict_frame` is a wire message that MUST be sent strictly before `wire`, or nil.
55 56 57 58 59 60 61 62 |
# File 'lib/nnq/zstd/codec.rb', line 55 def encode(body) body = body.b maybe_train!(body) dict_frame = pending_dict_frame wire = compress_or_plain(body) [wire, dict_frame] end |
#reset_for_reconnect! ⇒ Object
Resets send-side state after a reconnect so the dict is re-shipped on the new connection. Does NOT clear @recv_dict —the new peer’s dict overwrites it naturally when it arrives, and clearing it here would race with the monitor event.
47 48 49 |
# File 'lib/nnq/zstd/codec.rb', line 47 def reset_for_reconnect! @dict_shipped = false end |