Class: OMQ::Transport::ZstdTcp::ZstdConnection

Inherits:
SimpleDelegator
  • Object
show all
Defined in:
lib/omq/transport/zstd_tcp/connection.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(conn, codec) ⇒ ZstdConnection

Returns a new instance of ZstdConnection.



14
15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/omq/transport/zstd_tcp/connection.rb', line 14

def initialize(conn, codec)
  super(conn)
  @codec              = codec
  @dict_shipped       = false
  # rzstd 0.4: FrameCodec is the decoder. Starts no-dict; when a
  # dict shipment arrives on this direction, we build a fresh
  # dict-bound FrameCodec and replace this one. Level is a
  # compression parameter only — not consulted on decompress —
  # so we don't carry the send-side level through to recv.
  @recv_codec         = RZstd::FrameCodec.new
  @recv_dict_bytes    = nil
  @last_wire_size_in  = nil
end

Instance Attribute Details

#last_wire_size_inInteger? (readonly)

Returns wire bytesize of the last received message.

Returns:

  • (Integer, nil)

    wire bytesize of the last received message



11
12
13
# File 'lib/omq/transport/zstd_tcp/connection.rb', line 11

def last_wire_size_in
  @last_wire_size_in
end

Instance Method Details

#receive_messageObject



50
51
52
53
54
55
56
57
58
59
# File 'lib/omq/transport/zstd_tcp/connection.rb', line 50

def receive_message
  loop do
    parts   = __getobj__.receive_message
    decoded = decode_parts(parts)
    if decoded
      @last_wire_size_in = parts.sum { |p| p.bytesize }
      return decoded
    end
  end
end

#respond_to?(name, include_private = false) ⇒ Boolean

Returns:

  • (Boolean)


62
63
64
65
# File 'lib/omq/transport/zstd_tcp/connection.rb', line 62

def respond_to?(name, include_private = false)
  return false if name == :write_wire
  super
end

#send_message(parts) ⇒ Object



29
30
31
32
33
# File 'lib/omq/transport/zstd_tcp/connection.rb', line 29

def send_message(parts)
  compressed = @codec.compress_parts(parts)
  ship_dict!
  __getobj__.send_message(compressed)
end

#write_message(parts) ⇒ Object



36
37
38
39
40
# File 'lib/omq/transport/zstd_tcp/connection.rb', line 36

def write_message(parts)
  compressed = @codec.compress_parts(parts)
  ship_dict!
  __getobj__.write_message(compressed)
end

#write_messages(messages) ⇒ Object



43
44
45
46
47
# File 'lib/omq/transport/zstd_tcp/connection.rb', line 43

def write_messages(messages)
  compressed = messages.map { |parts| @codec.compress_parts(parts) }
  ship_dict!
  __getobj__.write_messages(compressed)
end