Class: NNQ::Connection

Inherits:
Object
Defined in:
lib/nnq/connection.rb

Overview

Per-pipe state: thin wrapper around Protocol::SP::Connection.

Owns no fibers itself: the recv loop is spawned by the Engine, and the send pump by the routing strategy.
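To illustrate what "owns no fibers" means in practice, here is a minimal sketch in which the caller drives the receive loop. `FakeSP` and `recv_loop` are hypothetical names invented for this example, the `EOFError` end-of-stream signal is an assumption rather than documented behavior of Protocol::SP::Connection, and `Connection` is a trimmed copy of the wrapper shown on this page.

```ruby
# Hypothetical stand-in for Protocol::SP::Connection: replays queued
# bodies, then raises EOFError (an assumed end-of-stream signal).
class FakeSP
  def initialize(bodies)
    @bodies = bodies.dup
  end

  def receive_message
    raise EOFError, "peer closed" if @bodies.empty?
    @bodies.shift
  end

  def close; end
end

# Trimmed copy of the wrapper: it only delegates and runs no loop itself.
class Connection
  def initialize(sp, endpoint: nil)
    @sp       = sp
    @endpoint = endpoint
    @closed   = false
  end

  def receive_message
    @sp.receive_message
  end

  def close
    return if @closed
    @closed = true
    @sp.close
  end

  def closed?
    @closed
  end
end

# The caller (the Engine, in the real library) owns the loop and decides
# what to do when the stream ends.
def recv_loop(conn, inbox)
  loop { inbox << conn.receive_message }
rescue EOFError
  conn.close
end

conn  = Connection.new(FakeSP.new(%w[a b c]), endpoint: "tcp://127.0.0.1:5555")
inbox = []
recv_loop(conn, inbox)
puts inbox.inspect
```

In the real library the loop would presumably run inside a fiber; a plain method call is used here only to keep the sketch free of dependencies.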

Constructor Details

#initialize(sp, endpoint: nil) ⇒ Connection

Returns a new instance of Connection.

Parameters:

  • sp (Protocol::SP::Connection)

    handshake-completed SP connection

  • endpoint (String, nil) (defaults to: nil)

    endpoint URI we connected to / accepted from


# File 'lib/nnq/connection.rb', line 22

def initialize(sp, endpoint: nil)
  @sp       = sp
  @endpoint = endpoint
  @closed   = false
end

Instance Attribute Details

#endpoint ⇒ String? (readonly)

Returns endpoint URI we connected to / accepted from.

Returns:

  • (String, nil)

    endpoint URI we connected to / accepted from



# File 'lib/nnq/connection.rb', line 17

def endpoint
  @endpoint
end

#sp ⇒ Protocol::SP::Connection (readonly)

Returns:

  • (Protocol::SP::Connection)


# File 'lib/nnq/connection.rb', line 13

def sp
  @sp
end

Instance Method Details

#close ⇒ Object

Closes the underlying SP connection. Idempotent: a second call is a no-op.



# File 'lib/nnq/connection.rb', line 93

def close
  return if @closed
  @closed = true
  @sp.close
end
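The guard on `@closed` can be checked with a small sketch. `CountingSP` is a hypothetical stand-in that only counts how often `close` reaches it, and `Connection` is a trimmed copy of the wrapper above.

```ruby
# Hypothetical stand-in for Protocol::SP::Connection that counts closes.
class CountingSP
  attr_reader :close_calls

  def initialize
    @close_calls = 0
  end

  def close
    @close_calls += 1
  end
end

# Trimmed copy of the wrapper, keeping only the close-related methods.
class Connection
  def initialize(sp, endpoint: nil)
    @sp       = sp
    @endpoint = endpoint
    @closed   = false
  end

  def close
    return if @closed   # second and later calls stop here
    @closed = true
    @sp.close
  end

  def closed?
    @closed
  end
end

sp   = CountingSP.new
conn = Connection.new(sp)
conn.close
conn.close
puts sp.close_calls   # prints 1: the SP connection was closed only once
```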

#closed? ⇒ Boolean

Returns:

  • (Boolean)

    true once #close has been called on this wrapper


# File 'lib/nnq/connection.rb', line 87

def closed?
  @closed
end

#flush ⇒ void

This method returns an undefined value.

Flushes the SP connection’s send buffer to the socket.



# File 'lib/nnq/connection.rb', line 73

def flush
  @sp.flush
end

#peer_protocol ⇒ Integer

Returns peer protocol id (e.g. Protocols::PULL_V0).

Returns:

  • (Integer)

    peer protocol id (e.g. Protocols::PULL_V0)



# File 'lib/nnq/connection.rb', line 30

def peer_protocol
  @sp.peer_protocol
end

#receive_message ⇒ String

Reads one message body off the wire. Blocks the calling fiber.

Returns:

  • (String)


# File 'lib/nnq/connection.rb', line 81

def receive_message
  @sp.receive_message
end

#send_message(body, header: nil) ⇒ void

This method returns an undefined value.

Writes one message and flushes immediately. Used by REQ/REP, where each call is request-paced and there is nothing to batch.

Parameters:

  • body (String)
  • header (String, nil) (defaults to: nil)

    optional binary prefix

Raises:

  • (ClosedError)

    if the connection has already been closed



# File 'lib/nnq/connection.rb', line 64

def send_message(body, header: nil)
  raise ClosedError, "connection closed" if @closed
  @sp.send_message(body, header: header)
end
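A request-paced exchange built on `send_message` might look like the following sketch. `LoopbackSP` and `request` are hypothetical helpers made up for this example, and the `ClosedError` definition (including its superclass) is an assumption; the real class is defined elsewhere in the library.

```ruby
# Assumed error class; its real superclass is not shown on this page.
class ClosedError < StandardError; end

# Hypothetical loopback stand-in for Protocol::SP::Connection: whatever
# is sent comes back upcased as the "reply".
class LoopbackSP
  def initialize
    @replies = []
  end

  def send_message(body, header: nil)
    @replies << "#{header}#{body}".upcase
  end

  def receive_message
    @replies.shift
  end

  def close; end
end

# Trimmed copy of the wrapper.
class Connection
  def initialize(sp, endpoint: nil)
    @sp       = sp
    @endpoint = endpoint
    @closed   = false
  end

  def send_message(body, header: nil)
    raise ClosedError, "connection closed" if @closed
    @sp.send_message(body, header: header)
  end

  def receive_message
    @sp.receive_message
  end

  def close
    return if @closed
    @closed = true
    @sp.close
  end
end

# One request, one reply: nothing to batch, so write-and-flush fits.
def request(conn, body)
  conn.send_message(body)
  conn.receive_message
end

conn = Connection.new(LoopbackSP.new)
puts request(conn, "ping")   # prints PING

conn.close
begin
  conn.send_message("late")
rescue ClosedError => e
  puts e.message             # prints connection closed
end
```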

#write_message(body, header: nil) ⇒ void

This method returns an undefined value.

Writes one message into the SP connection’s send buffer (no flush).

Parameters:

  • body (String)
  • header (String, nil) (defaults to: nil)

    optional binary prefix written between the SP length prefix and body (see Protocol::SP::Connection)

Raises:

  • (ClosedError)

    if the connection has already been closed



# File 'lib/nnq/connection.rb', line 41

def write_message(body, header: nil)
  raise ClosedError, "connection closed" if @closed
  @sp.write_message(body, header: header)
end
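The split between `write_message` (buffer only) and `flush` (push to the socket) can be sketched as below. `BufferedSP` is a hypothetical stand-in whose `wire` array plays the role of the socket; how the real Protocol::SP::Connection buffers is not shown on this page, and `ClosedError` is defined here only as an assumption.

```ruby
# Assumed error class; its real superclass is not shown on this page.
class ClosedError < StandardError; end

# Hypothetical stand-in for Protocol::SP::Connection: writes land in a
# buffer and reach `wire` only on flush.
class BufferedSP
  attr_reader :wire

  def initialize
    @buffer = []
    @wire   = []
  end

  def write_message(body, header: nil)
    @buffer << "#{header}#{body}"   # header, if any, precedes the body
  end

  def flush
    @wire.concat(@buffer)
    @buffer.clear
  end
end

# Trimmed copy of the wrapper.
class Connection
  def initialize(sp, endpoint: nil)
    @sp       = sp
    @endpoint = endpoint
    @closed   = false
  end

  def write_message(body, header: nil)
    raise ClosedError, "connection closed" if @closed
    @sp.write_message(body, header: header)
  end

  def flush
    @sp.flush
  end
end

sp   = BufferedSP.new
conn = Connection.new(sp)
conn.write_message("a")
conn.write_message("b", header: "H:")
puts sp.wire.inspect   # prints []: nothing has reached the wire yet
conn.flush
puts sp.wire.inspect   # prints ["a", "H:b"]
```

Deferring the flush lets a send pump queue several messages and pay for one socket write, which is the case `send_message` deliberately skips.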

#write_messages(bodies) ⇒ void

This method returns an undefined value.

Writes a batch of bodies under a single SP mutex acquisition. Used by the work-stealing send pump hot path.

Parameters:

  • bodies (Array<String>)

Raises:

  • (ClosedError)

    if the connection has already been closed



# File 'lib/nnq/connection.rb', line 52

def write_messages(bodies)
  raise ClosedError, "connection closed" if @closed
  @sp.write_messages(bodies)
end
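To make the "single mutex acquisition" point concrete, the sketch below counts lock acquisitions for one-by-one writes versus a batch. `LockingSP` is a hypothetical stand-in; the locking inside the real Protocol::SP::Connection is assumed to behave roughly like this, not confirmed. `ClosedError` is likewise defined only as an assumption.

```ruby
# Assumed error class; its real superclass is not shown on this page.
class ClosedError < StandardError; end

# Hypothetical stand-in for Protocol::SP::Connection that counts how
# often its mutex is taken.
class LockingSP
  attr_reader :lock_count, :buffer

  def initialize
    @mutex      = Mutex.new
    @lock_count = 0
    @buffer     = []
  end

  def write_message(body, header: nil)
    locked { @buffer << "#{header}#{body}" }
  end

  def write_messages(bodies)
    locked { @buffer.concat(bodies) }
  end

  private

  def locked
    @mutex.synchronize do
      @lock_count += 1
      yield
    end
  end
end

# Trimmed copy of the wrapper.
class Connection
  def initialize(sp, endpoint: nil)
    @sp       = sp
    @endpoint = endpoint
    @closed   = false
  end

  def write_message(body, header: nil)
    raise ClosedError, "connection closed" if @closed
    @sp.write_message(body, header: header)
  end

  def write_messages(bodies)
    raise ClosedError, "connection closed" if @closed
    @sp.write_messages(bodies)
  end
end

bodies = %w[a b c d]

one_by_one = LockingSP.new
conn = Connection.new(one_by_one)
bodies.each { |body| conn.write_message(body) }

batched = LockingSP.new
Connection.new(batched).write_messages(bodies)

puts one_by_one.lock_count   # prints 4
puts batched.lock_count      # prints 1
```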