Class: NNQ::Transport::ZstdTcp::ZstdConnection

Inherits:
SimpleDelegator
  • Object
Defined in:
lib/nnq/transport/zstd_tcp/connection.rb

Overview

Wraps an NNQ::Connection, transparently compressing every outbound body through the engine-scoped Codec and decompressing every inbound wire message. Dictionary (dict) frames are shipped in-band, ahead of the first message encoded after training, and are installed silently on receive.

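Instance Method Summary

#initialize(conn, codec) ⇒ ZstdConnection (constructor)
#receive_message ⇒ Object
#send_message(body, header: nil) ⇒ Object
#write_message(body, header: nil) ⇒ Object
#write_messages(bodies) ⇒ Object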

Constructor Details

#initialize(conn, codec) ⇒ ZstdConnection

Returns a new instance of ZstdConnection.



# File 'lib/nnq/transport/zstd_tcp/connection.rb', line 13

def initialize(conn, codec)
  super(conn)
  @codec = codec
end
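
Because the class inherits from SimpleDelegator, any method it does not override is forwarded to the wrapped connection, and overridden methods reach the wrapped object through `__getobj__`. The sketch below illustrates that mechanism only. `FakeConnection`, its `closed?` method, and `DelegationSketch` are hypothetical stand-ins, not part of NNQ.

```ruby
require "delegate"

# Hypothetical stand-in for NNQ::Connection; exists only to show delegation.
class FakeConnection
  def closed?
    false
  end

  def write_message(body)
    "inner: #{body}"
  end
end

# Same constructor shape as above: wrap the connection, keep the codec.
class DelegationSketch < SimpleDelegator
  def initialize(conn, codec)
    super(conn)
    @codec = codec
  end

  # An overridden method still reaches the wrapped object via __getobj__.
  def write_message(body)
    __getobj__.write_message(body.upcase)
  end
end

wrapped = DelegationSketch.new(FakeConnection.new, :codec_placeholder)
forwarded   = wrapped.closed?             # not defined on the wrapper, so it is forwarded
intercepted = wrapped.write_message("hi") # handled by the wrapper first
```

In this sketch the upcasing merely marks where the real class would call into its codec.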

Instance Method Details

#receive_message ⇒ Object

Loops until a real payload arrives. Dict frames are installed silently and discarded so the caller only sees plaintext.



# File 'lib/nnq/transport/zstd_tcp/connection.rb', line 48

def receive_message
  loop do
    wire    = __getobj__.receive_message
    decoded = @codec.decode(wire)
    return decoded unless decoded.nil?
  end
end
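
A minimal sketch of this loop, assuming only what the method body shows: `decode` returns nil for a dict frame and the payload otherwise. `QueueConnection`, `TaggingCodec`, and the `DICT:`/`MSG:` prefixes are hypothetical and perform no real compression; the actual wire format is not described here.

```ruby
require "delegate"

# Hypothetical in-memory connection that replays queued wire messages.
class QueueConnection
  def initialize(frames)
    @frames = frames.dup
  end

  def receive_message
    @frames.shift or raise EOFError, "no more frames"
  end
end

# Hypothetical codec: frames tagged "DICT:" are recorded and yield nil,
# everything else is returned as the decoded payload.
class TaggingCodec
  attr_reader :installed

  def initialize
    @installed = []
  end

  def decode(wire)
    if wire.start_with?("DICT:")
      @installed << wire.delete_prefix("DICT:")
      nil
    else
      wire.delete_prefix("MSG:")
    end
  end
end

class ReceiveSketch < SimpleDelegator
  def initialize(conn, codec)
    super(conn)
    @codec = codec
  end

  # Same loop as above: keep reading until decode yields a non-nil payload.
  def receive_message
    loop do
      wire    = __getobj__.receive_message
      decoded = @codec.decode(wire)
      return decoded unless decoded.nil?
    end
  end
end

codec  = TaggingCodec.new
conn   = ReceiveSketch.new(QueueConnection.new(["DICT:d1", "MSG:hello", "MSG:world"]), codec)
first  = conn.receive_message
second = conn.receive_message
```

Since the loop tests `nil?` rather than truthiness, an empty-string payload would still be returned to the caller.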

#send_message(body, header: nil) ⇒ Object



# File 'lib/nnq/transport/zstd_tcp/connection.rb', line 27

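def send_message(body, header: nil)
  # Compress header and body together as a single payload.
  combined = header ? (header + body) : body
  # encode returns the wire message plus a dict frame, if any.
  wire, dict_frame = @codec.encode(combined)
  # Ship the dict frame ahead of the message.
  __getobj__.write_message(dict_frame) if dict_frame
  __getobj__.send_message(wire)
end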
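
To make the ordering concrete, the following sketch records what reaches the wrapped connection. It assumes only what the method body shows: `encode` returns a `[wire, dict_frame]` pair and `dict_frame` may be nil. `RecordingConnection` and `OneShotDictCodec` are hypothetical, and the angle brackets stand in for compression.

```ruby
require "delegate"

# Hypothetical connection that records what it is asked to put on the wire.
class RecordingConnection
  attr_reader :log

  def initialize
    @log = []
  end

  def write_message(body)
    @log << [:write, body]
  end

  def send_message(body)
    @log << [:send, body]
  end
end

# Hypothetical codec: hands out a dict frame on the first encode only.
class OneShotDictCodec
  def initialize
    @pending_dict = "DICT"
  end

  def encode(body)
    dict_frame, @pending_dict = @pending_dict, nil
    ["<#{body}>", dict_frame]
  end
end

class SendSketch < SimpleDelegator
  def initialize(conn, codec)
    super(conn)
    @codec = codec
  end

  def send_message(body, header: nil)
    combined = header ? (header + body) : body
    wire, dict_frame = @codec.encode(combined)
    __getobj__.write_message(dict_frame) if dict_frame
    __getobj__.send_message(wire)
  end
end

inner = RecordingConnection.new
conn  = SendSketch.new(inner, OneShotDictCodec.new)
conn.send_message("body", header: "hdr:")
conn.send_message("next")
sent = inner.log
```

Here the first call produces a dict frame followed by the encoded `hdr:body`, while the second call produces only the encoded message.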

#write_message(body, header: nil) ⇒ Object



# File 'lib/nnq/transport/zstd_tcp/connection.rb', line 19

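def write_message(body, header: nil)
  # Compress header and body together as a single payload.
  combined = header ? (header + body) : body
  # encode returns the wire message plus a dict frame, if any.
  wire, dict_frame = @codec.encode(combined)
  # Ship the dict frame ahead of the message.
  __getobj__.write_message(dict_frame) if dict_frame
  __getobj__.write_message(wire)
end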

#write_messages(bodies) ⇒ Object



# File 'lib/nnq/transport/zstd_tcp/connection.rb', line 35

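def write_messages(bodies)
  batch = []
  bodies.each do |body|
    wire, dict_frame = @codec.encode(body)
    # Queue any dict frame immediately before the message it was returned with.
    batch << dict_frame if dict_frame
    batch << wire
  end
  # Hand the whole batch to the wrapped connection in one call.
  __getobj__.write_messages(batch)
end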
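
As a rough illustration of the batching behavior, the sketch below shows a dict frame being spliced into the batch ahead of the message that produced it, with the wrapped connection receiving a single `write_messages` call. `BatchRecorder` and `FirstCallDictCodec` are hypothetical stand-ins, and the bracketed strings are not real compressed output.

```ruby
require "delegate"

# Hypothetical connection that records each batch it receives.
class BatchRecorder
  attr_reader :batches

  def initialize
    @batches = []
  end

  def write_messages(batch)
    @batches << batch
  end
end

# Hypothetical codec: returns a dict frame alongside the first encode only.
class FirstCallDictCodec
  def initialize
    @pending_dict = "DICT"
  end

  def encode(body)
    dict_frame, @pending_dict = @pending_dict, nil
    ["<#{body}>", dict_frame]
  end
end

class BatchSketch < SimpleDelegator
  def initialize(conn, codec)
    super(conn)
    @codec = codec
  end

  def write_messages(bodies)
    batch = []
    bodies.each do |body|
      wire, dict_frame = @codec.encode(body)
      batch << dict_frame if dict_frame
      batch << wire
    end
    __getobj__.write_messages(batch)
  end
end

inner = BatchRecorder.new
BatchSketch.new(inner, FirstCallDictCodec.new).write_messages(%w[a b])
batches = inner.batches
```

Unlike `#send_message` and `#write_message`, this method takes no `header:` argument, so each body is encoded as given.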