Class: NNQ::Zstd::Codec

Inherits:
Object
Defined in:
lib/nnq/zstd/codec.rb

Overview

Pure state machine: no socket references. `encode(body)` returns `[wire, dict_frames]`, where `dict_frames` are any dict frames that MUST be sent before `wire`. `decode(wire)` returns a plaintext String, or `nil` if the wire message was a dict frame that has been silently installed into the receive-side dictionary store.

Constant Summary

MAX_DECOMPRESSED_SIZE = 16 * 1024 * 1024
MAX_DICTS = 32
MAX_DICTS_TOTAL_BYTES = 128 * 1024
DICT_CAPACITY = 8 * 1024
TRAIN_MAX_SAMPLES = 1000
TRAIN_MAX_BYTES = 100 * 1024
TRAIN_MAX_SAMPLE_LEN = 1024
MIN_COMPRESS_NO_DICT = 512
MIN_COMPRESS_WITH_DICT = 64
NUL_PREAMBLE = ("\x00" * 4).b.freeze
ZSTD_MAGIC = "\x28\xB5\x2F\xFD".b.freeze
ZDICT_MAGIC = "\x37\xA4\x30\xEC".b.freeze

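The two magic constants appear to be the little-endian encodings of the Zstandard frame magic number (0xFD2FB528) and the Zstandard dictionary magic number (0xEC30A437). The following check copies the constants locally so it runs without the gem; it is a sanity check, not part of the library.

```ruby
# Local copies of the constants listed above, so this runs standalone.
ZSTD_MAGIC  = "\x28\xB5\x2F\xFD".b.freeze
ZDICT_MAGIC = "\x37\xA4\x30\xEC".b.freeze

# "V" packs a 32-bit unsigned integer in little-endian byte order.
zstd_matches  = [0xFD2FB528].pack("V") == ZSTD_MAGIC
zdict_matches = [0xEC30A437].pack("V") == ZDICT_MAGIC

puts "zstd frame magic matches: #{zstd_matches}"
puts "zstd dict magic matches:  #{zdict_matches}"
```
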
Constructor Details

#initialize(level:, dicts: [], recv_max_size: nil) ⇒ Codec

Returns a new instance of Codec.



# File 'lib/nnq/zstd/codec.rb', line 27

def initialize(level:, dicts: [], recv_max_size: nil)
  @level           = level
  @recv_max_size   = [recv_max_size || MAX_DECOMPRESSED_SIZE, MAX_DECOMPRESSED_SIZE].min

  @send_dicts      = {}
  @send_dict_bytes = {}
  @send_dict_order = []
  @pending_ship    = []
  @shipped_peers   = Set.new
  @active_send_id  = nil

  @recv_dicts      = {}
  @recv_total_bytes = 0

  @training        = dicts.empty?
  @train_samples   = []
  @train_bytes     = 0

  dicts.each { |db| install_send_dict(db.b) }
end
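
The constructor clamps `recv_max_size` to `MAX_DECOMPRESSED_SIZE`, so a caller can lower the receive limit but not raise it. A minimal standalone sketch of that clamp follows; `effective_recv_max` is a hypothetical helper name used only for illustration.

```ruby
# Local copy of the constant so the sketch runs without the gem.
MAX_DECOMPRESSED_SIZE = 16 * 1024 * 1024

# Hypothetical helper mirroring the clamp in #initialize:
# nil falls back to the ceiling, larger requests are capped at it.
def effective_recv_max(recv_max_size)
  [recv_max_size || MAX_DECOMPRESSED_SIZE, MAX_DECOMPRESSED_SIZE].min
end

effective_recv_max(nil)               # => 16777216
effective_recv_max(1024)              # => 1024
effective_recv_max(64 * 1024 * 1024)  # => 16777216
```

Note also that `@training` is set to `dicts.empty?`, so passing one or more dicts to the constructor disables training.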

Instance Method Details

#active_send_dict_id ⇒ Integer?

Returns id of the dict currently used for compression.

Returns:

  • (Integer, nil)

    id of the dict currently used for compression



# File 'lib/nnq/zstd/codec.rb', line 50

def active_send_dict_id
  @active_send_id
end

#decode(wire) ⇒ Object

Decodes a wire message. Returns the plaintext String, or `nil` if this message was a dict frame (installed into the receive store, not surfaced to the caller).

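Raises:

  • (ProtocolError)

    if the wire message is shorter than 4 bytes or begins with an unrecognized preamble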



# File 'lib/nnq/zstd/codec.rb', line 91

def decode(wire)
  raise ProtocolError, "wire message too short" if wire.bytesize < 4
  head = wire.byteslice(0, 4)
  case head
  when NUL_PREAMBLE
    wire.byteslice(4, wire.bytesize - 4) || "".b
  when ZSTD_MAGIC
    decode_zstd_frame(wire)
  when ZDICT_MAGIC
    install_recv_dict(wire)
    nil
  else
    raise ProtocolError, "unrecognized preamble: #{head.unpack1('H*')}"
  end
end
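
The dispatch above keys entirely on the first four bytes of the message. As a rough standalone illustration, the sketch below classifies a wire message the same way without touching any dictionary or decompression state. `classify_wire` is a hypothetical helper, not part of the Codec API, and the constants are local copies.

```ruby
# Local copies of the preamble constants so the sketch runs without the gem.
NUL_PREAMBLE = ("\x00" * 4).b.freeze
ZSTD_MAGIC   = "\x28\xB5\x2F\xFD".b.freeze
ZDICT_MAGIC  = "\x37\xA4\x30\xEC".b.freeze

# Hypothetical helper: reports which branch of #decode a message would take.
def classify_wire(wire)
  wire = wire.b
  return :too_short if wire.bytesize < 4

  case wire.byteslice(0, 4)
  when NUL_PREAMBLE then :plain
  when ZSTD_MAGIC   then :zstd_frame
  when ZDICT_MAGIC  then :dict_frame
  else                   :unrecognized
  end
end

# An uncompressed message is four NUL bytes followed by the body.
plain = NUL_PREAMBLE + "hello".b
kind  = classify_wire(plain)                     # => :plain
body  = plain.byteslice(4, plain.bytesize - 4)   # => "hello"
```

In the real method the `:too_short` and `:unrecognized` cases raise `ProtocolError` rather than returning a symbol.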

#encode(body) ⇒ Object

Encodes `body` into a wire message. Returns `[wire, dict_frames]` where `dict_frames` is an array of wire messages that MUST be sent strictly before `wire`.



# File 'lib/nnq/zstd/codec.rb', line 78

def encode(body)
  body = body.b
  maybe_train!(body)

  dict_frames = drain_pending_dict_frames
  wire = compress_or_plain(body)
  [wire, dict_frames]
end
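
Because the codec holds no socket, the caller is responsible for honoring the ordering contract. The sketch below shows one way a wrapper might do that. `FakeCodec`, `send_body`, and the array used as a transport are all hypothetical stand-ins so the example runs on its own; only the `[wire, dict_frames]` return shape comes from the documentation above.

```ruby
# Hypothetical stand-in for a Codec, used only to make the sketch runnable.
# It hands out one fake dict frame on the first call and none afterwards.
class FakeCodec
  def initialize
    @pending = ["DICT".b]
  end

  def encode(body)
    dict_frames = @pending
    @pending = []
    [("\x00" * 4).b + body.b, dict_frames]
  end
end

# Hypothetical wrapper method: `transport` is anything that responds to <<.
def send_body(codec, transport, body)
  wire, dict_frames = codec.encode(body)
  dict_frames.each { |frame| transport << frame } # dict frames go first
  transport << wire                               # then the payload
end

sent  = []
codec = FakeCodec.new
send_body(codec, sent, "a")
send_body(codec, sent, "b")
# sent now holds the dict frame, then the two payloads, in that order.
```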

#recv_dict_ids ⇒ Array<Integer>

Returns ids of dicts in the recv-side store.

Returns:

  • (Array<Integer>)

    ids of dicts in the recv-side store



# File 'lib/nnq/zstd/codec.rb', line 62

def recv_dict_ids
  @recv_dicts.keys
end

#requeue_all_dicts_for_shipping! ⇒ Object

Re-queues every known send-side dict for shipping, so that subsequent `encode` calls re-emit them before the next real payload. Called by the wrapper when a new peer connects.



# File 'lib/nnq/zstd/codec.rb', line 70

def requeue_all_dicts_for_shipping!
  @pending_ship = @send_dict_order.dup
end
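
The `dup` matters here: the pending queue can be drained without disturbing the record of which dicts exist. The miniature below illustrates that under stated assumptions. `ShipQueue` and its `drain` method are hypothetical; the real `drain_pending_dict_frames` is not shown in this page and may behave differently.

```ruby
# Hypothetical miniature of the shipping queue, for illustration only.
class ShipQueue
  def initialize(ids)
    @send_dict_order = ids
    @pending_ship    = []
  end

  # Same assignment as #requeue_all_dicts_for_shipping! above.
  def requeue_all!
    @pending_ship = @send_dict_order.dup
  end

  # Assumed drain behavior: hand back everything pending and empty the queue.
  def drain
    drained = @pending_ship
    @pending_ship = []
    drained
  end

  def send_dict_ids
    @send_dict_order.dup
  end
end

queue = ShipQueue.new([7, 9])
queue.requeue_all!       # a new peer connected
first  = queue.drain     # => [7, 9]
second = queue.drain     # => []
queue.send_dict_ids      # => [7, 9] (unchanged by draining)
```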

#send_dict_ids ⇒ Array<Integer>

Returns ids of dicts in the send-side store.

Returns:

  • (Array<Integer>)

    ids of dicts in the send-side store



# File 'lib/nnq/zstd/codec.rb', line 56

def send_dict_ids
  @send_dict_order.dup
end