Class: Ignis::Collective::Transport::TCPTransport

Inherits:
Base
  • Object
show all
Defined in:
lib/nvruby/collective/transport/tcp_transport.rb

Overview

Note:

This is the lowest-performance multi-node option, but it works on any network configuration.

TCP fallback transport: a generic TCP transport for multi-node communication, used when higher-performance transports are unavailable.

Examples:

Create TCP transport

transport = TCPTransport.new(
  local_addr: "192.168.1.100",
  local_port: 50000,
  remote_addr: "192.168.1.101",
  remote_port: 50000,
  mode: :client  # or :server
)

Constant Summary collapse

DEFAULT_BUFFER_SIZE =

Default buffer size for staging

16 * 1024 * 1024
CONNECT_TIMEOUT =

Connection timeout in seconds

30

Instance Attribute Summary collapse

Attributes inherited from Base

#dst_device, #src_device

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Base

available?, #estimated_latency, #recv_async, #recv_sync, #send_async, #send_sync, #synchronize!

Constructor Details

#initialize(local_addr:, local_port:, remote_addr:, remote_port:, mode: :client, buffer_size: DEFAULT_BUFFER_SIZE) ⇒ TCPTransport

Returns a new instance of TCPTransport.

Parameters:

  • local_addr (String)

    Local IP address

  • local_port (Integer)

    Local port

  • remote_addr (String)

    Remote IP address

  • remote_port (Integer)

    Remote port

  • mode (Symbol) (defaults to: :client)

    :client or :server

  • buffer_size (Integer) (defaults to: DEFAULT_BUFFER_SIZE)

    Staging buffer size



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/nvruby/collective/transport/tcp_transport.rb', line 58

# Record the endpoint configuration for a TCP transport. Nothing is
# allocated or connected here; sockets and staging buffers start out unset
# and the transport is marked as not yet initialized.
#
# @param local_addr [String] local IP address
# @param local_port [Integer] local port
# @param remote_addr [String] remote IP address
# @param remote_port [Integer] remote port
# @param mode [Symbol] :client or :server
# @param buffer_size [Integer] staging buffer size in bytes
def initialize(local_addr:, local_port:, remote_addr:, remote_port:,
               mode: :client, buffer_size: DEFAULT_BUFFER_SIZE)
  # Both device indices passed to the base transport are fixed at 0.
  super(src_device: 0, dst_device: 0)

  # Endpoint configuration, stored exactly as supplied.
  @mode = mode
  @buffer_size = buffer_size
  @local_addr = local_addr
  @local_port = local_port
  @remote_addr = remote_addr
  @remote_port = remote_port

  # Runtime resources: none exist until the transport is brought up.
  @socket = @server_socket = nil
  @send_buffer = @recv_buffer = nil

  @mutex = Mutex.new
  @initialized = false
end

Instance Attribute Details

#local_addr ⇒ String (readonly)

Returns Local address.

Returns:

  • (String)

    Local address



33
34
35
# File 'lib/nvruby/collective/transport/tcp_transport.rb', line 33

# Reader for the local address stored by the constructor.
#
# @return [String] local address
def local_addr
  @local_addr
end

#local_port ⇒ Integer (readonly)

Returns Local port.

Returns:

  • (Integer)

    Local port



36
37
38
# File 'lib/nvruby/collective/transport/tcp_transport.rb', line 36

# Reader for the local port stored by the constructor.
#
# @return [Integer] local port
def local_port
  @local_port
end

#mode ⇒ Symbol (readonly)

Returns Mode (:client or :server).

Returns:

  • (Symbol)

    Mode (:client or :server)



45
46
47
# File 'lib/nvruby/collective/transport/tcp_transport.rb', line 45

# Reader for the connection role stored by the constructor.
#
# @return [Symbol] mode (:client or :server)
def mode
  @mode
end

#remote_addr ⇒ String (readonly)

Returns Remote address.

Returns:

  • (String)

    Remote address



39
40
41
# File 'lib/nvruby/collective/transport/tcp_transport.rb', line 39

# Reader for the remote address stored by the constructor.
#
# @return [String] remote address
def remote_addr
  @remote_addr
end

#remote_port ⇒ Integer (readonly)

Returns Remote port.

Returns:

  • (Integer)

    Remote port



42
43
44
# File 'lib/nvruby/collective/transport/tcp_transport.rb', line 42

# Reader for the remote port stored by the constructor.
#
# @return [Integer] remote port
def remote_port
  @remote_port
end

Class Method Details

.transport_type ⇒ Symbol

Returns Transport type.

Returns:

  • (Symbol)

    Transport type



48
49
50
# File 'lib/nvruby/collective/transport/tcp_transport.rb', line 48

# Identifier for this transport implementation.
#
# @return [Symbol] always :tcp
def self.transport_type
  :tcp
end

Instance Method Details

#destroy! ⇒ void

This method returns an undefined value.

Clean up resources



182
183
184
185
186
187
188
189
190
191
192
# File 'lib/nvruby/collective/transport/tcp_transport.rb', line 182

# Tear the transport down: close the data socket and the server socket
# (whichever exist), release the staging buffers, and mark the transport as
# uninitialized. Does nothing when the transport was never initialized.
#
# @return [void]
def destroy!
  return unless @initialized

  # Data socket first, then the server socket; either may be absent.
  [@socket, @server_socket].each { |endpoint| endpoint&.close }
  free_buffers!

  @socket = @server_socket = nil
  @initialized = false
end

#estimated_bandwidth ⇒ Float

Estimated bandwidth in GB/s

Returns:

  • (Float)


176
177
178
# File 'lib/nvruby/collective/transport/tcp_transport.rb', line 176

# Nominal bandwidth figure for this transport. The value is a fixed
# constant, not a measurement of the actual link.
#
# @return [Float] estimated bandwidth in GB/s
def estimated_bandwidth
  1.25  # ~10 Gbps typical for TCP (10 Gbit/s / 8 = 1.25 GB/s)
end

#initialize! ⇒ void

This method returns an undefined value.

Initialize TCP transport



77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/nvruby/collective/transport/tcp_transport.rb', line 77

# Bring the transport up: allocate the staging buffers, then either start
# the server side or connect to the remote server, depending on the mode.
# Any mode other than :server takes the client path.
#
# Calling this on an already-initialized transport is a no-op.
#
# If establishing the connection raises, everything acquired so far is
# released before the error propagates. Without that rollback the buffers
# (and any half-open socket) would leak: @initialized stays false on
# failure, and #destroy! returns early for an uninitialized transport.
#
# @return [void]
# @raise [StandardError] whatever the server/client setup step raised
def initialize!
  return if @initialized

  allocate_buffers!

  begin
    if @mode == :server
      start_server!
    else
      connect_to_server!
    end
  rescue StandardError
    # Roll back partial setup so a failed attempt leaves nothing behind
    # and a later retry starts from a clean state.
    @socket&.close
    @server_socket&.close
    @socket = nil
    @server_socket = nil
    free_buffers!
    raise
  end

  @initialized = true
end

#ready? ⇒ Boolean

Check if ready

Returns:

  • (Boolean)


93
94
95
# File 'lib/nvruby/collective/transport/tcp_transport.rb', line 93

# Whether the transport can be used right now: it has been initialized, a
# socket is present, and that socket has not been closed.
#
# @return [Boolean] truthy only when all three conditions hold
def ready?
  # Falls through with the falsy value itself when either piece is missing.
  live_socket = @initialized && @socket
  live_socket && !live_socket.closed?
end

#recv(dst_ptr, size, stream: nil) ⇒ Integer

Receive data from remote

Parameters:

  • dst_ptr (FFI::Pointer)

    Destination GPU buffer pointer

  • size (Integer)

    Expected size in bytes

  • stream (FFI::Pointer, nil) (defaults to: nil)

    CUDA stream

Returns:

  • (Integer)

    Bytes received



141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
# File 'lib/nvruby/collective/transport/tcp_transport.rb', line 141

# Receive one message from the remote peer into a GPU buffer.
#
# A size header is read first, then that many bytes are read from the socket
# into the host-side receive buffer, and finally the staged bytes are copied
# out to +dst_ptr+.
#
# The announced size comes from the peer and is therefore untrusted. It is
# rejected when it is negative, when it exceeds the staging buffer, or when
# it exceeds +size+ (the capacity the caller declared for +dst_ptr+), since
# copying more than +size+ bytes would write past the destination buffer.
#
# @param dst_ptr [FFI::Pointer] destination GPU buffer pointer
# @param size [Integer] capacity of the destination buffer in bytes; the
#   peer may send fewer bytes, never more
# @param stream [FFI::Pointer, nil] CUDA stream forwarded to the final copy
# @return [Integer] number of bytes actually received
# @raise [TransportError] if the announced size is invalid or too large, or
#   the socket yields no data before the message is complete
def recv(dst_ptr, size, stream: nil)
  ensure_initialized!
  @mutex.synchronize do
    # Receive size header
    actual_size = recv_header

    if actual_size.negative?
      raise TransportError,
            "TCP recv header announces invalid size #{actual_size}"
    end

    # A malformed/hostile peer could announce a size larger than the staging
    # buffer, and put_bytes below would then write past it. Bound it.
    if actual_size > @buffer_size
      raise TransportError,
            "TCP recv header announces #{actual_size} bytes, exceeding the " \
            "#{@buffer_size}-byte staging buffer (refusing to overflow)"
    end

    # Same reasoning for the caller's destination buffer: the final copy
    # moves actual_size bytes into a buffer declared to hold only +size+.
    if actual_size > size
      raise TransportError,
            "TCP recv header announces #{actual_size} bytes, exceeding the " \
            "#{size}-byte destination buffer (refusing to overflow)"
    end

    # Receive data in chunks; the socket may return less than requested.
    bytes_received = 0
    while bytes_received < actual_size
      chunk_size = [actual_size - bytes_received, @buffer_size].min
      data = @socket.recv(chunk_size)
      # nil or an empty read means the stream ended before the message did.
      raise TransportError, "TCP recv failed" if data.nil? || data.empty?

      @recv_buffer.put_bytes(bytes_received, data)
      bytes_received += data.bytesize
    end

    # Copy from host buffer to GPU
    unstage_from_host(dst_ptr, actual_size, stream)

    actual_size
  end
end

#send(src_ptr, size, stream: nil) ⇒ Boolean

Send data to remote

Parameters:

  • src_ptr (FFI::Pointer)

    Source GPU buffer pointer

  • size (Integer)

    Size in bytes

  • stream (FFI::Pointer, nil) (defaults to: nil)

    CUDA stream

Returns:

  • (Boolean)

    Success



103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/nvruby/collective/transport/tcp_transport.rb', line 103

# Transmit +size+ bytes from a GPU buffer to the remote peer.
#
# The payload is first staged into the host-side send buffer, then a size
# header is written, then the staged bytes are pushed through the socket
# until all of them have been accepted.
#
# @param src_ptr [FFI::Pointer] source GPU buffer pointer
# @param size [Integer] number of bytes to transfer
# @param stream [FFI::Pointer, nil] CUDA stream forwarded to the staging copy
# @return [Boolean] true once every byte has been handed to the socket
# @raise [TransportError] if +size+ exceeds the staging buffer, or the socket
#   reports that no bytes were written
def send(src_ptr, size, stream: nil)
  ensure_initialized!

  # The staging copy writes +size+ bytes into a buffer of @buffer_size bytes,
  # so a larger transfer has to be rejected up front; chunked staging for
  # such transfers is not implemented.
  if size > @buffer_size
    raise TransportError,
          "TCP transfer of #{size} bytes exceeds the #{@buffer_size}-byte host staging " \
          "buffer (chunked staging not implemented); increase buffer_size or chunk the transfer"
  end

  @mutex.synchronize do
    stage_to_host(src_ptr, size, stream)
    send_header(size)

    # The socket may accept fewer bytes than offered, so keep pushing from
    # the current offset until the whole payload is out.
    offset = 0
    until offset >= size
      remaining = size - offset
      window = remaining < @buffer_size ? remaining : @buffer_size
      written = @socket.send(@send_buffer.get_bytes(offset, window), 0)
      raise TransportError, "TCP send failed" unless written.positive?

      offset += written
    end

    true
  end
end

#to_s ⇒ String

Returns:

  • (String)


195
196
197
198
# File 'lib/nvruby/collective/transport/tcp_transport.rb', line 195

# Human-readable summary of the transport: both endpoints and whether the
# connection is currently usable according to #ready?.
#
# @return [String]
def to_s
  local_endpoint = "#{@local_addr}:#{@local_port}"
  remote_endpoint = "#{@remote_addr}:#{@remote_port}"
  link_state =
    if ready?
      "connected"
    else
      "disconnected"
    end

  "TCPTransport[#{local_endpoint} <-> #{remote_endpoint}, #{link_state}]"
end