Class: NNQ::Transport::ZstdTcp::ZstdConnection

Inherits:
SimpleDelegator
  • Object
Defined in:
lib/nnq/transport/zstd_tcp/connection.rb

Overview

Wraps an NNQ::Connection, transparently compressing every outbound body through the engine-scoped Codec and decompressing every inbound wire message. Dict frames are shipped in-band, ahead of the first payload encoded after training, and are installed silently on receive.
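The listings on this page rely on a two-method codec contract: encode returns a [wire, dict_frame] pair, and decode returns nil for a dict frame. The following is a minimal sketch of that contract, not the real Codec. ToyCodec, its tag bytes, and the installed reader are hypothetical, and Zlib stands in for Zstd so the example runs on the standard library alone; the dictionary is only shipped here, not used for compression.

```ruby
require "zlib"

# Hypothetical stand-in for the engine-scoped codec (assumption, not NNQ API).
class ToyCodec
  DICT_TAG    = "D".b   # invented marker for dict frames
  PAYLOAD_TAG = "P".b   # invented marker for payload frames

  attr_reader :installed

  def initialize(dict = nil)
    @dict         = dict&.b
    @dict_pending = !dict.nil?   # a "trained" dict that has not been shipped yet
    @installed    = nil
  end

  # Returns [wire, dict_frame]; dict_frame is nil except on the first call
  # made while a dictionary is still pending.
  def encode(body)
    dict_frame = nil
    if @dict_pending
      dict_frame    = DICT_TAG + @dict
      @dict_pending = false
    end
    [PAYLOAD_TAG + Zlib::Deflate.deflate(body), dict_frame]
  end

  # Returns the plaintext for payload frames and nil for dict frames,
  # which are installed as a side effect.
  def decode(wire)
    if wire.start_with?(DICT_TAG)
      @installed = wire.byteslice(1..-1)
      return nil
    end
    Zlib::Inflate.inflate(wire.byteslice(1..-1))
  end
end

sender   = ToyCodec.new("shared-dictionary")
receiver = ToyCodec.new

wire, dict_frame = sender.encode("hello")
receiver.decode(dict_frame)   # installs the dictionary, returns nil
receiver.decode(wire)         # returns the original body
```

Only the first encode call yields a dict frame in this sketch; every later call returns nil in the second slot, so the connection has nothing extra to ship.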

Constructor Details

#initialize(conn, codec) ⇒ ZstdConnection

Returns a new instance of ZstdConnection.



# File 'lib/nnq/transport/zstd_tcp/connection.rb', line 21

def initialize(conn, codec)
  super(conn)
  @codec             = codec
  @last_wire_size_in = nil
end

Instance Attribute Details

#last_wire_size_in ⇒ Integer? (readonly)

Returns the compressed byte count of the last payload frame decoded by #receive_message; dict-only frames do not update it. The engine’s recv loop reads it to attach wire_size: to :message_received verbose monitor events. The name mirrors the analogous hook in omq-zstd.

Returns:

  • (Integer, nil)

    the compressed size in bytes, or nil if no payload frame has been decoded yet.



# File 'lib/nnq/transport/zstd_tcp/connection.rb', line 18

def last_wire_size_in
  @last_wire_size_in
end
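As a rough illustration of how a recv loop might consume this attribute, the sketch below probes for the hook and attaches wire_size: only when a value is available. The recv_once helper, the event hash layout, and both stub connection types are hypothetical; only the :message_received name and the wire_size: key come from the description above.

```ruby
# Hypothetical stubs: one connection exposing the hook, one without it.
CompressedStub = Struct.new(:body, :last_wire_size_in) do
  def receive_message
    body
  end
end

PlainStub = Struct.new(:body) do
  def receive_message
    body
  end
end

# Hypothetical recv step: reads one message and records a monitor event.
def recv_once(conn, events)
  body  = conn.receive_message
  event = { type: :message_received, size: body.bytesize }

  # Connections without the hook (or with no payload decoded yet) leave
  # the event without a wire_size: key.
  if conn.respond_to?(:last_wire_size_in) && (wire_size = conn.last_wire_size_in)
    event[:wire_size] = wire_size
  end

  events << event
  body
end

events = []
recv_once(CompressedStub.new("hello world", 7), events)
recv_once(PlainStub.new("hello world"), events)
# events[0] carries wire_size: 7, events[1] has no wire_size: key
```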

Instance Method Details

#receive_message ⇒ Object

Loops until a real payload arrives. Dict frames are installed silently and discarded so the caller only sees plaintext.



# File 'lib/nnq/transport/zstd_tcp/connection.rb', line 57

def receive_message
  loop do
    wire    = __getobj__.receive_message
    decoded = @codec.decode(wire)
    next if decoded.nil?

    @last_wire_size_in = wire.bytesize
    return decoded
  end
end
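The skip-until-payload behaviour can be exercised in isolation with stand-ins. In this sketch QueueConn, TagCodec, and SkippingReceiver are hypothetical names, and the one-byte "D"/"P" tags are an invented framing used only to tell dict frames from payloads.

```ruby
require "delegate"

# Hypothetical connection that hands out pre-queued wire frames.
QueueConn = Struct.new(:frames) do
  def receive_message
    frames.shift or raise EOFError, "no more frames"
  end
end

# Hypothetical codec: nil for dict frames, the remaining bytes otherwise.
class TagCodec
  def decode(wire)
    wire.start_with?("D") ? nil : wire[1..-1]
  end
end

class SkippingReceiver < SimpleDelegator
  attr_reader :last_wire_size_in

  def initialize(conn, codec)
    super(conn)
    @codec             = codec
    @last_wire_size_in = nil
  end

  def receive_message
    loop do
      wire    = __getobj__.receive_message
      decoded = @codec.decode(wire)
      next if decoded.nil?               # dict frame: nothing to hand back

      @last_wire_size_in = wire.bytesize # size on the wire, not of the plaintext
      return decoded
    end
  end
end

rx = SkippingReceiver.new(QueueConn.new(["Ddict-bytes", "Phello"]), TagCodec.new)
rx.receive_message     # => "hello"
rx.last_wire_size_in   # => 6
```

Because the loop skips only on nil, an empty payload (an empty string) is still returned to the caller rather than being mistaken for a dict frame.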

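#send_message(body, header: nil) ⇒ Object

Concatenates header and body when a header is given, encodes the result through the codec, writes any pending dict frame via the wrapped connection’s write_message, then passes the compressed payload to its send_message.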



# File 'lib/nnq/transport/zstd_tcp/connection.rb', line 36

def send_message(body, header: nil)
  combined = header ? (header + body) : body
  wire, dict_frame = @codec.encode(combined)
  __getobj__.write_message(dict_frame) if dict_frame
  __getobj__.send_message(wire)
end
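To make the call order on the wrapped connection visible, the sketch below records each delegated call. RecordingConn, FirstCallDictCodec, and SendSketch are hypothetical stand-ins, and the string-tagged frames are invented for readability; the send_message body is copied from the listing above.

```ruby
require "delegate"

# Hypothetical connection that records which method received which frame.
class RecordingConn
  attr_reader :calls

  def initialize
    @calls = []
  end

  def write_message(frame)
    @calls << [:write_message, frame]
  end

  def send_message(frame)
    @calls << [:send_message, frame]
  end
end

# Hypothetical codec: emits a dict frame on the first encode only.
class FirstCallDictCodec
  def initialize
    @pending = true
  end

  def encode(body)
    dict_frame = @pending ? "D:dict" : nil
    @pending   = false
    ["P:#{body}", dict_frame]
  end
end

class SendSketch < SimpleDelegator
  def initialize(conn, codec)
    super(conn)
    @codec = codec
  end

  def send_message(body, header: nil)
    combined = header ? (header + body) : body   # header is encoded with the body
    wire, dict_frame = @codec.encode(combined)
    __getobj__.write_message(dict_frame) if dict_frame
    __getobj__.send_message(wire)
  end
end

conn = RecordingConn.new
tx   = SendSketch.new(conn, FirstCallDictCodec.new)
tx.send_message("body", header: "HDR:")
tx.send_message("next")
conn.calls
# => [[:write_message, "D:dict"], [:send_message, "P:HDR:body"], [:send_message, "P:next"]]
```

The header and body go through the codec as one unit, so the receiving side gets them back as a single decoded string.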

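#write_message(body, header: nil) ⇒ Object

Encodes the body (prefixed with header when one is given) exactly as #send_message does, but hands both the optional dict frame and the compressed payload to the wrapped connection’s write_message.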



# File 'lib/nnq/transport/zstd_tcp/connection.rb', line 28

def write_message(body, header: nil)
  combined = header ? (header + body) : body
  wire, dict_frame = @codec.encode(combined)
  __getobj__.write_message(dict_frame) if dict_frame
  __getobj__.write_message(wire)
end

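#write_messages(bodies) ⇒ Object

Encodes each body in order and forwards the resulting frames to the wrapped connection in a single write_messages call. A dict frame, when produced, is placed immediately before the payload whose encode call returned it. Unlike #write_message and #send_message, this method takes no header: keyword.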



# File 'lib/nnq/transport/zstd_tcp/connection.rb', line 44

def write_messages(bodies)
  batch = []
  bodies.each do |body|
    wire, dict_frame = @codec.encode(body)
    batch << dict_frame if dict_frame
    batch << wire
  end
  __getobj__.write_messages(batch)
end
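The batch layout produced by this loop can be checked without a connection. The sketch below extracts the loop into a hypothetical build_batch helper and pairs it with a hypothetical OneShotDictCodec that emits a dict frame on its first encode only; the string-tagged frames are invented for readability.

```ruby
# Hypothetical codec: the first encode call also returns a dict frame.
class OneShotDictCodec
  def initialize
    @pending = true
  end

  def encode(body)
    dict_frame = @pending ? "D:dict" : nil
    @pending   = false
    ["P:#{body}", dict_frame]
  end
end

# Hypothetical helper mirroring the loop in #write_messages: each dict
# frame lands directly before the payload that triggered it.
def build_batch(bodies, codec)
  bodies.each_with_object([]) do |body, batch|
    wire, dict_frame = codec.encode(body)
    batch << dict_frame if dict_frame
    batch << wire
  end
end

build_batch(%w[a b c], OneShotDictCodec.new)
# => ["D:dict", "P:a", "P:b", "P:c"]
```

The batch can therefore be longer than the list of bodies, which matters to any caller that counts frames on the wrapped connection.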