Class: NNQ::Zstd::Codec
- Inherits: Object
- Defined in: lib/nnq/zstd/codec.rb
Overview
Pure state machine: no socket references. `encode(body)` returns `[wire, dict_frames]`, where `dict_frames` are any dict frames that MUST precede the wire payload on the wire. `decode(wire)` returns a plaintext String, or `nil` if the wire message was a dict frame that has been silently installed into the receive-side dictionary store.
Constant Summary
- MAX_DECOMPRESSED_SIZE = 16 * 1024 * 1024
- MAX_DICTS = 32
- MAX_DICTS_TOTAL_BYTES = 128 * 1024
- DICT_CAPACITY = 8 * 1024
- TRAIN_MAX_SAMPLES = 1000
- TRAIN_MAX_BYTES = 100 * 1024
- TRAIN_MAX_SAMPLE_LEN = 1024
- MIN_COMPRESS_NO_DICT = 512
- MIN_COMPRESS_WITH_DICT = 64
- NUL_PREAMBLE = ("\x00" * 4).b.freeze
- ZSTD_MAGIC = "\x28\xB5\x2F\xFD".b.freeze
- ZDICT_MAGIC = "\x37\xA4\x30\xEC".b.freeze
Instance Method Summary
- #active_send_dict_id ⇒ Integer?
  Id of the dict currently used for compression.
- #decode(wire) ⇒ Object
  Decodes a wire message.
- #encode(body) ⇒ Object
  Encodes `body` into a wire message.
- #initialize(level:, dicts: [], recv_max_size: nil) ⇒ Codec (constructor)
  A new instance of Codec.
- #recv_dict_ids ⇒ Array<Integer>
  Ids of dicts in the recv-side store.
- #requeue_all_dicts_for_shipping! ⇒ Object
  Resets the shipped-tracker so the next encode calls will re-emit every known dict before the next real payload.
- #send_dict_ids ⇒ Array<Integer>
  Ids of dicts in the send-side store.
Constructor Details
#initialize(level:, dicts: [], recv_max_size: nil) ⇒ Codec
Returns a new instance of Codec.
    # File 'lib/nnq/zstd/codec.rb', line 27

    def initialize(level:, dicts: [], recv_max_size: nil)
      @level = level
      @recv_max_size = [recv_max_size || MAX_DECOMPRESSED_SIZE, MAX_DECOMPRESSED_SIZE].min

      @send_dicts = {}
      @send_dict_bytes = {}
      @send_dict_order = []
      @pending_ship = []
      @shipped_peers = Set.new
      @active_send_id = nil

      @recv_dicts = {}
      @recv_total_bytes = 0

      @training = dicts.empty?
      @train_samples = []
      @train_bytes = 0

      dicts.each { |db| install_send_dict(db.b) }
    end
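
The `recv_max_size` expression in the constructor is easy to misread, so here is a minimal sketch of what it evaluates to. The module and method names (`ClampSketch`, `effective_recv_max_size`) are hypothetical and exist only for illustration; the expression itself is copied from the listing above.

```ruby
module ClampSketch
  # Value copied from the Constant Summary.
  MAX_DECOMPRESSED_SIZE = 16 * 1024 * 1024

  # Hypothetical helper mirroring the expression in #initialize:
  # nil falls back to the ceiling, and any larger request is clamped to it.
  def self.effective_recv_max_size(recv_max_size)
    [recv_max_size || MAX_DECOMPRESSED_SIZE, MAX_DECOMPRESSED_SIZE].min
  end
end

ClampSketch.effective_recv_max_size(nil)     # => 16777216
ClampSketch.effective_recv_max_size(4096)    # => 4096
ClampSketch.effective_recv_max_size(1 << 30) # => 16777216
```

In other words, a caller can lower the receive limit but, going by this expression, cannot raise it above `MAX_DECOMPRESSED_SIZE`.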
Instance Method Details
#active_send_dict_id ⇒ Integer?
Returns id of the dict currently used for compression.
    # File 'lib/nnq/zstd/codec.rb', line 50

    def active_send_dict_id
      @active_send_id
    end
#decode(wire) ⇒ Object
Decodes a wire message. Returns the plaintext String, or `nil` if this message was a dict frame (installed into the receive store, not surfaced to the caller).

    # File 'lib/nnq/zstd/codec.rb', line 91

    def decode(wire)
      raise ProtocolError, "wire message too short" if wire.bytesize < 4
      head = wire.byteslice(0, 4)
      case head
      when NUL_PREAMBLE
        wire.byteslice(4, wire.bytesize - 4) || "".b
      when ZSTD_MAGIC
        decode_zstd_frame(wire)
      when ZDICT_MAGIC
        install_recv_dict(wire)
        nil
      else
        raise ProtocolError, "unrecognized preamble: #{head.unpack1('H*')}"
      end
    end
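
The preamble dispatch can be tried in isolation with a standalone sketch. It does not call the real codec: `PreambleSketch`, `classify`, and `plain_body` are hypothetical names, `ArgumentError` stands in for `ProtocolError`, and the input is forced to binary with `String#b` on the assumption that callers pass binary strings. Only the three constants are taken from the Constant Summary.

```ruby
module PreambleSketch
  # Values copied from the Constant Summary.
  NUL_PREAMBLE = ("\x00" * 4).b.freeze
  ZSTD_MAGIC   = "\x28\xB5\x2F\xFD".b.freeze
  ZDICT_MAGIC  = "\x37\xA4\x30\xEC".b.freeze

  # Hypothetical helper, not part of NNQ::Zstd::Codec: names the kind of
  # wire message from its first four bytes.
  def self.classify(wire)
    wire = wire.b # binary copy, so the non-ASCII magic bytes compare equal
    raise ArgumentError, "wire message too short" if wire.bytesize < 4

    case wire.byteslice(0, 4)
    when NUL_PREAMBLE then :plain
    when ZSTD_MAGIC   then :zstd_frame
    when ZDICT_MAGIC  then :dict_frame
    else raise ArgumentError, "unrecognized preamble"
    end
  end

  # Hypothetical helper: strips the 4-byte NUL preamble from a plain message,
  # using the same byteslice expression as the listing above.
  def self.plain_body(wire)
    wire = wire.b
    wire.byteslice(4, wire.bytesize - 4) || "".b
  end
end

PreambleSketch.classify("\x00\x00\x00\x00hello")   # => :plain
PreambleSketch.plain_body("\x00\x00\x00\x00hello") # => "hello"
```

Read as little-endian 32-bit integers (`unpack1("V")`), `ZSTD_MAGIC` and `ZDICT_MAGIC` are `0xFD2FB528` and `0xEC30A437`, the frame and dictionary magic numbers of the Zstandard format.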
#encode(body) ⇒ Object
Encodes `body` into a wire message. Returns `[wire, dict_frames]` where `dict_frames` is an array of wire messages that MUST be sent strictly before `wire`.

    # File 'lib/nnq/zstd/codec.rb', line 78

    def encode(body)
      body = body.b
      maybe_train!(body)
      dict_frames = drain_pending_dict_frames
      wire = compress_or_plain(body)
      [wire, dict_frames]
    end
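
The ordering requirement falls on the caller, so a sketch of a send path may help. Everything here is hypothetical: `StubCodec` only imitates the return shape of `encode` (it never compresses), `send_message` is an illustrative helper, and a plain Array stands in for the transport.

```ruby
# Hypothetical stand-in that imitates only the return shape of Codec#encode.
class StubCodec
  def initialize(pending_dict_frames)
    @pending = pending_dict_frames
  end

  # Returns [wire, dict_frames]; pending dict frames are handed out once.
  def encode(body)
    frames = @pending
    @pending = []
    [("\x00" * 4).b + body.b, frames]
  end
end

# Hypothetical helper: writes every dict frame first, then the payload.
def send_message(codec, transport, body)
  wire, dict_frames = codec.encode(body)
  dict_frames.each { |frame| transport << frame }
  transport << wire
end

transport = [] # stands in for a socket or queue
codec = StubCodec.new(["\x37\xA4\x30\xEC-dict-bytes".b])
send_message(codec, transport, "hello")
send_message(codec, transport, "world")
# transport now holds: the dict frame, then "hello", then "world"
```

Because the codec holds no socket references, the dict frames and the payload can go through whatever send primitive the wrapper uses, as long as their relative order is kept.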
#recv_dict_ids ⇒ Array<Integer>
Returns ids of dicts in the recv-side store.
    # File 'lib/nnq/zstd/codec.rb', line 62

    def recv_dict_ids
      @recv_dicts.keys
    end
#requeue_all_dicts_for_shipping! ⇒ Object
Resets the shipped-tracker so the next encode calls will re-emit every known dict before the next real payload. Called by the wrapper when a new peer connects.
    # File 'lib/nnq/zstd/codec.rb', line 70

    def requeue_all_dicts_for_shipping!
      @pending_ship = @send_dict_order.dup
    end
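
To illustrate the effect of the reset, here is a miniature model of the shipping queue. It is a sketch under assumptions: `ShipQueueSketch` is a hypothetical class, only the two instance variables visible in the listing are modeled, and the `drain` method is a guess at how pending dicts are handed out, since `drain_pending_dict_frames` is not shown in this page.

```ruby
# Hypothetical miniature of the shipping queue, for illustration only.
class ShipQueueSketch
  def initialize(dict_ids)
    @send_dict_order = dict_ids
    @pending_ship = dict_ids.dup
  end

  # Assumed drain behavior: hand over everything pending, then clear.
  def drain
    drained = @pending_ship
    @pending_ship = []
    drained
  end

  # Same body as the method in the listing above.
  def requeue_all_dicts_for_shipping!
    @pending_ship = @send_dict_order.dup
  end
end

queue = ShipQueueSketch.new([101, 102])
first  = queue.drain # => [101, 102]
second = queue.drain # => []
queue.requeue_all_dicts_for_shipping!
third  = queue.drain # => [101, 102]
```

The `dup` matters: the pending list is a copy, so consuming it leaves `@send_dict_order` intact for the next peer.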
#send_dict_ids ⇒ Array<Integer>
Returns ids of dicts in the send-side store.
    # File 'lib/nnq/zstd/codec.rb', line 56

    def send_dict_ids
      @send_dict_order.dup
    end