Class: OMQ::Transport::Lz4Tcp::Lz4Connection

Inherits:
SimpleDelegator
  • Object
Defined in:
lib/omq/transport/lz4_tcp/connection.rb

Overview

Per-connection state plus encode/decode hooks. A SimpleDelegator over the ZMTP connection: send_message, write_message, write_messages, and receive_message route through compression, while everything else (write_command, close, etc.) passes through untouched.
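The delegation pattern described above can be sketched in isolation. This is a minimal illustration, not the library's code: FakeConnection and UpcasingConnection are hypothetical names, and upcasing stands in for compression.

```ruby
require "delegate"

# Hypothetical stand-in for the wrapped ZMTP connection.
class FakeConnection
  attr_reader :sent, :closed

  def initialize
    @sent   = []
    @closed = false
  end

  def send_message(parts)
    @sent << parts
  end

  def close
    @closed = true
  end
end

# Overrides only send_message; every other call falls through to the
# wrapped object via SimpleDelegator.
class UpcasingConnection < SimpleDelegator
  def send_message(parts)
    # Stand-in for compression: transform each part before forwarding.
    __getobj__.send_message(parts.map(&:upcase))
  end
end

inner   = FakeConnection.new
wrapped = UpcasingConnection.new(inner)
wrapped.send_message(["hello", "world"]) # intercepted by the wrapper
wrapped.close                            # delegated untouched
```

Only the overridden method sees the transformation; close reaches the inner object unchanged, which is the same split the overview describes.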

Constructor Details

#initialize(conn, send_dict_bytes:, max_message_size:) ⇒ Lz4Connection

Returns a new instance of Lz4Connection.



# File 'lib/omq/transport/lz4_tcp/connection.rb', line 19

def initialize(conn, send_dict_bytes:, max_message_size:)
  super(conn)
  @max_message_size  = max_message_size

  # Per-direction state. The send codec is built at construction
  # with the send-side dictionary (if any) baked in. The receive
  # codec starts dict-less; if/when a dict shipment arrives on
  # this direction, it is replaced with a dict-bound codec.
  @send_dict_bytes   = send_dict_bytes&.b
  @send_codec        = build_block_codec(@send_dict_bytes)
  @send_dict_shipped = @send_dict_bytes.nil?  # nothing to ship => "already shipped"

  @recv_codec        = build_block_codec(nil)
  @recv_dict_bytes   = nil

  @last_wire_size_in = nil
end
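The dictionary bookkeeping in the constructor can be tried on its own. The sketch below uses a hypothetical helper, dict_state, that mirrors only the `&.b` call and the "already shipped" flag; it does not build any codec.

```ruby
# Hypothetical helper mirroring the constructor's dictionary handling.
def dict_state(send_dict_bytes)
  # String#b returns a binary (ASCII-8BIT) copy; &. keeps nil as nil.
  bytes = send_dict_bytes&.b
  # With no dictionary there is nothing to ship, so the flag starts true.
  { bytes: bytes, shipped: bytes.nil? }
end

with_dict    = dict_state("sample dictionary")
without_dict = dict_state(nil)
```

Starting the flag at true when no dictionary is configured lets the send path use a single "shipped?" check instead of a separate "is there a dictionary?" check.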

Instance Attribute Details

#last_wire_size_in ⇒ Integer? (readonly)

Returns the wire bytesize of the last received message (summed across parts, compressed sentinel included), or nil if no message has been received yet.

Returns:

  • (Integer, nil)

    the wire bytesize of the last received message (summed across parts, compressed sentinel included), or nil if no message has been received yet.



# File 'lib/omq/transport/lz4_tcp/connection.rb', line 17

def last_wire_size_in
  @last_wire_size_in
end

Instance Method Details

#receive_message ⇒ Object



# File 'lib/omq/transport/lz4_tcp/connection.rb', line 59

def receive_message
  # Loop: a dict shipment is consumed silently and we read the
  # next ZMTP message. Only data messages are returned to the
  # caller. Budget tracking happens inside decode_wire_parts.
  loop do
    parts   = __getobj__.receive_message
    decoded = decode_wire_parts(parts)
    if decoded
      @last_wire_size_in = parts.sum(&:bytesize)
      return decoded
    end
  end
end
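The skip-and-retry loop above can be exercised without a real transport. In this sketch QueueSource and SkippingReader are hypothetical, and the "DICT" first-part marker is an invented framing used only for illustration; the actual wire format handled by decode_wire_parts is not shown in this page.

```ruby
# Hypothetical message source backed by an in-memory queue.
class QueueSource
  def initialize(messages)
    @messages = messages.dup
  end

  def receive_message
    @messages.shift or raise EOFError, "no more messages"
  end
end

class SkippingReader
  attr_reader :last_wire_size_in, :dict

  def initialize(source)
    @source            = source
    @dict              = nil
    @last_wire_size_in = nil
  end

  def receive_message
    loop do
      parts   = @source.receive_message
      decoded = decode(parts)
      if decoded
        # Record the on-wire size of the message that is actually returned.
        @last_wire_size_in = parts.sum(&:bytesize)
        return decoded
      end
      # decoded was nil: a dictionary shipment was consumed, read again.
    end
  end

  private

  # Invented framing: a first part of "DICT" marks a dictionary shipment.
  def decode(parts)
    if parts.first == "DICT"
      @dict = parts[1]
      return nil
    end
    parts
  end
end

source  = QueueSource.new([["DICT", "abc"], ["hello", "you"]])
reader  = SkippingReader.new(source)
message = reader.receive_message
```

The caller sees only the data message; the dictionary shipment is absorbed inside the loop and never counted in last_wire_size_in.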

#send_message(parts) ⇒ Object



# File 'lib/omq/transport/lz4_tcp/connection.rb', line 38

def send_message(parts)
  wire = encode_parts(parts)
  ship_send_dict!
  __getobj__.send_message(wire)
end
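The body of ship_send_dict! is not shown in this page. Judging by its name and the @send_dict_shipped flag set in the constructor, it plausibly sends the dictionary at most once, ahead of the first data message. A minimal sketch under that assumption, with a hypothetical ShipOnceSender and an invented "DICT" marker:

```ruby
# Hypothetical sender illustrating a ship-once dictionary handshake.
class ShipOnceSender
  attr_reader :wire

  def initialize(dict_bytes)
    @dict_bytes = dict_bytes
    @shipped    = dict_bytes.nil? # nothing to ship counts as shipped
    @wire       = []              # stands in for the underlying connection
  end

  def send_message(parts)
    ship_dict!
    @wire << parts
  end

  private

  # Assumed behavior: emit the dictionary once, then become a no-op.
  def ship_dict!
    return if @shipped

    @wire << ["DICT", @dict_bytes]
    @shipped = true
  end
end

sender = ShipOnceSender.new("abc")
sender.send_message(["one"])
sender.send_message(["two"])

plain = ShipOnceSender.new(nil)
plain.send_message(["x"])
```

In the documented method, encode_parts runs before ship_send_dict!; one plausible reason is that a failure during encoding then leaves the dictionary unshipped, though the source does not state this.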

#write_message(parts) ⇒ Object



# File 'lib/omq/transport/lz4_tcp/connection.rb', line 45

def write_message(parts)
  wire = encode_parts(parts)
  ship_send_dict!
  __getobj__.write_message(wire)
end

#write_messages(messages) ⇒ Object



# File 'lib/omq/transport/lz4_tcp/connection.rb', line 52

def write_messages(messages)
  wires = messages.map { |parts| encode_parts(parts) }
  ship_send_dict!
  __getobj__.write_messages(wires)
end