Class: NNQ::Zstd::Wrapper

Inherits:
Object
Defined in:
lib/nnq/zstd/wrapper.rb

Overview

Socket decorator that transparently runs every outbound body through the Codec's encoder and every inbound wire message through its decoder. Delegates any unknown method to the wrapped socket, so the wrapper quacks like an `NNQ::Socket`.
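The decorator idea can be illustrated with a minimal, self-contained sketch. Everything here is hypothetical: `LoopbackSocket` and `DeflateWrapper` are made-up names, and stdlib `Zlib` stands in for the zstd Codec purely so the example runs without extra gems. It only shows the shape of "encode on send, decode on receive", not the library's actual wire format.

```ruby
require "zlib"

# Hypothetical in-memory socket: whatever is sent comes back on receive.
class LoopbackSocket
  attr_reader :queue

  def initialize
    @queue = []
  end

  def send(wire)
    @queue << wire
  end

  def receive
    @queue.shift
  end
end

# Toy decorator (assumption: Zlib stands in for the real zstd codec).
class DeflateWrapper
  def initialize(socket)
    @sock = socket
  end

  # Compress the body before it reaches the wrapped socket.
  def send(body)
    @sock.send(Zlib::Deflate.deflate(body))
  end

  # Decompress whatever the wrapped socket hands back; nil passes through.
  def receive
    raw = @sock.receive
    raw && Zlib::Inflate.inflate(raw)
  end
end

payload = "hello " * 100
sock    = LoopbackSocket.new
wrapped = DeflateWrapper.new(sock)

wrapped.send(payload)
wire_bytes = sock.queue.first.bytesize # size of what actually hit the socket
body       = wrapped.receive           # caller sees plaintext again
```

The caller only ever handles plaintext bodies; the compressed form exists solely between the wrapper and the wrapped socket.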


Constructor Details

#initialize(socket, level:, dicts:) ⇒ Wrapper

Returns a new instance of Wrapper.



# File 'lib/nnq/zstd/wrapper.rb', line 13

def initialize(socket, level:, dicts:)
  @sock  = socket
  @codec = Codec.new(
    level:         level,
    dicts:         dicts,
    recv_max_size: recv_max_size_from(socket),
  )
  start_dict_monitor!
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method.

#method_missing(name, *args, **kwargs, &block) ⇒ Object



# File 'lib/nnq/zstd/wrapper.rb', line 83

def method_missing(name, *args, **kwargs, &block)
  if @sock.respond_to?(name)
    @sock.public_send(name, *args, **kwargs, &block)
  else
    super
  end
end
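To see how this delegation behaves from the caller's side, here is a small standalone sketch. `FakeSocket` and `ForwardingWrapper` are hypothetical stand-ins, not part of NNQ; the two hook methods simply mirror the pattern shown in this class.

```ruby
# Hypothetical stand-in for the wrapped socket.
class FakeSocket
  def bind(endpoint)
    "bound to #{endpoint}"
  end
end

# Minimal wrapper that forwards anything the inner object understands.
class ForwardingWrapper
  def initialize(socket)
    @sock = socket
  end

  # Keeps respond_to? honest for forwarded methods.
  def respond_to_missing?(name, include_private = false)
    @sock.respond_to?(name, include_private)
  end

  # Forward known methods; fall back to the default NoMethodError otherwise.
  def method_missing(name, *args, **kwargs, &block)
    if @sock.respond_to?(name)
      @sock.public_send(name, *args, **kwargs, &block)
    else
      super
    end
  end
end

wrapper   = ForwardingWrapper.new(FakeSocket.new)
forwarded = wrapper.bind("inproc://demo") # handled by FakeSocket#bind
known     = wrapper.respond_to?(:bind)    # answered via respond_to_missing?

unknown_error =
  begin
    wrapper.no_such_method
    nil
  rescue NoMethodError => e
    e
  end
```

Pairing `method_missing` with `respond_to_missing?` matters: without the latter, `respond_to?(:bind)` would return false even though the call succeeds.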

Instance Attribute Details

#codec ⇒ Object (readonly)

Returns the value of attribute codec.



# File 'lib/nnq/zstd/wrapper.rb', line 10

def codec
  @codec
end

Instance Method Details

#close ⇒ Object



# File 'lib/nnq/zstd/wrapper.rb', line 68

def close
  begin
    @monitor_task&.stop
  rescue StandardError
    # Monitor task may already be gone; fine.
  end
  @sock.close
end

#receive ⇒ Object

Loops internally until a real payload arrives or the socket closes. Dict frames are silently installed and discarded.



# File 'lib/nnq/zstd/wrapper.rb', line 58

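def receive
  loop do
    raw = @sock.receive
    # nil from the socket means it closed; pass that through.
    return raw if raw.nil?
    decoded = @codec.decode(raw)
    # A nil decode means a dict frame was installed; keep waiting.
    return decoded unless decoded.nil?
  end
end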
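The loop's behavior can be exercised in isolation with stubs. In this sketch `ScriptedSocket`, `DictAwareCodec`, and `receive_payload` are hypothetical, and the `"DICT:"` prefix is an invented marker for dict frames; the real codec's frame format is not described here.

```ruby
# Hypothetical socket that yields a fixed list of wire messages, then nil.
class ScriptedSocket
  def initialize(messages)
    @messages = messages.dup
  end

  def receive
    @messages.shift
  end
end

# Hypothetical codec: "DICT:"-prefixed messages are installed and decode to
# nil; everything else is returned as the payload. The prefix is made up.
class DictAwareCodec
  attr_reader :installed

  def initialize
    @installed = []
  end

  def decode(raw)
    if raw.start_with?("DICT:")
      @installed << raw.delete_prefix("DICT:")
      nil
    else
      raw
    end
  end
end

# Same control flow as the receive method above, as a free function.
def receive_payload(sock, codec)
  loop do
    raw = sock.receive
    return raw if raw.nil?             # socket closed
    decoded = codec.decode(raw)
    return decoded unless decoded.nil? # skip dict frames
  end
end

sock   = ScriptedSocket.new(["DICT:d1", "DICT:d2", "payload"])
codec  = DictAwareCodec.new
first  = receive_payload(sock, codec) # dict frames consumed, payload returned
second = receive_payload(sock, codec) # socket exhausted
```

The caller never observes the two dict frames; they are consumed as a side effect of the first call.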

#respond_to_missing?(name, include_private = false) ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/nnq/zstd/wrapper.rb', line 78

def respond_to_missing?(name, include_private = false)
  @sock.respond_to?(name, include_private)
end

#send(body) ⇒ Object



# File 'lib/nnq/zstd/wrapper.rb', line 24

def send(body)
  send_with_codec(body) { |wire| @sock.send(wire) }
end

#send_reply(body) ⇒ Object



# File 'lib/nnq/zstd/wrapper.rb', line 29

def send_reply(body)
  send_with_codec(body) { |wire| @sock.send_reply(wire) }
end

#send_request(body) ⇒ Object

REQ is cooked: send_request sends and returns the matching reply body. We must decode that reply through the codec too, otherwise the caller sees the raw wire (NUL preamble or zstd magic) instead of the plaintext.



# File 'lib/nnq/zstd/wrapper.rb', line 43

def send_request(body)
  wire, dict_frames = @codec.encode(body)
  # Dict frames can't be interleaved on a cooked REQ (strict
  # alternation), so ship them as separate one-shot requests.
  # The peer installs them and replies with an empty body which
  # we discard.
  dict_frames.each { |df| @sock.send_request(df) }
  reply = @sock.send_request(wire)
  return nil if reply.nil?
  @codec.decode(reply)
end
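The ordering of requests this produces can be checked with a stubbed REQ socket. The names `EchoReqSocket`, `PrefixCodec`, and `request_via_codec` are hypothetical, as are the `"DICT:"` and `"Z:"` markers, which only make the frames distinguishable in the sketch.

```ruby
# Hypothetical REQ socket: records every request; echoes payloads back as the
# reply and answers dict frames with an empty body.
class EchoReqSocket
  attr_reader :requests

  def initialize
    @requests = []
  end

  def send_request(wire)
    @requests << wire
    wire.start_with?("DICT:") ? "" : wire
  end
end

# Hypothetical codec whose encode returns [wire, dict_frames]. Pending
# dictionaries are shipped once, alongside the first encoded body.
class PrefixCodec
  def initialize(pending_dicts)
    @pending = pending_dicts
  end

  def encode(body)
    frames   = @pending.map { |d| "DICT:#{d}" }
    @pending = []
    ["Z:#{body}", frames]
  end

  def decode(wire)
    wire.delete_prefix("Z:")
  end
end

# Same flow as send_request above, as a free function.
def request_via_codec(sock, codec, body)
  wire, dict_frames = codec.encode(body)
  dict_frames.each { |df| sock.send_request(df) } # replies discarded
  reply = sock.send_request(wire)
  return nil if reply.nil?
  codec.decode(reply)
end

sock   = EchoReqSocket.new
codec  = PrefixCodec.new(["d1"])
reply1 = request_via_codec(sock, codec, "ping")
reply2 = request_via_codec(sock, codec, "pong")
```

Each dict frame costs one extra request/reply round trip, which is the price of keeping the strict REQ alternation intact.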

#send_survey(body) ⇒ Object



# File 'lib/nnq/zstd/wrapper.rb', line 34

def send_survey(body)
  send_with_codec(body) { |wire| @sock.send_survey(wire) }
end