Class: Ignis::Collective::Transport::TCPTransport
- Defined in:
- lib/nvruby/collective/transport/tcp_transport.rb
Overview
This is the lowest-performance multi-node option, but it works on any network configuration.
TCP Fallback Transport: a generic TCP transport for multi-node communication, used when higher-performance transports are unavailable.
Constant Summary collapse
- DEFAULT_BUFFER_SIZE =
Default size of the host staging buffer, in bytes (16 MiB)
16 * 1024 * 1024
- CONNECT_TIMEOUT =
Connection timeout in seconds
30
Instance Attribute Summary collapse
-
#local_addr ⇒ String
readonly
Local address.
-
#local_port ⇒ Integer
readonly
Local port.
-
#mode ⇒ Symbol
readonly
Mode (:client or :server).
-
#remote_addr ⇒ String
readonly
Remote address.
-
#remote_port ⇒ Integer
readonly
Remote port.
Attributes inherited from Base
Class Method Summary collapse
-
.transport_type ⇒ Symbol
Transport type.
Instance Method Summary collapse
-
#destroy! ⇒ void
Clean up resources.
-
#estimated_bandwidth ⇒ Float
Estimated bandwidth in GB/s.
-
#initialize(local_addr:, local_port:, remote_addr:, remote_port:, mode: :client, buffer_size: DEFAULT_BUFFER_SIZE) ⇒ TCPTransport
constructor
A new instance of TCPTransport.
-
#initialize! ⇒ void
Initialize TCP transport.
-
#ready? ⇒ Boolean
Check if ready.
-
#recv(dst_ptr, size, stream: nil) ⇒ Integer
Receive data from remote.
-
#send(src_ptr, size, stream: nil) ⇒ Boolean
Send data to remote.
- #to_s ⇒ String
Methods inherited from Base
available?, #estimated_latency, #recv_async, #recv_sync, #send_async, #send_sync, #synchronize!
Constructor Details
#initialize(local_addr:, local_port:, remote_addr:, remote_port:, mode: :client, buffer_size: DEFAULT_BUFFER_SIZE) ⇒ TCPTransport
Returns a new instance of TCPTransport.
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
# File 'lib/nvruby/collective/transport/tcp_transport.rb', line 58

# Builds an unconnected TCP transport. Only records the endpoint
# configuration and resets internal state; no socket or staging buffer is
# created here.
#
# @param local_addr [String] local address
# @param local_port [Integer] local port
# @param remote_addr [String] remote address
# @param remote_port [Integer] remote port
# @param mode [Symbol] :client or :server
# @param buffer_size [Integer] staging buffer size (defaults to DEFAULT_BUFFER_SIZE)
def initialize(local_addr:, local_port:, remote_addr:, remote_port:,
               mode: :client, buffer_size: DEFAULT_BUFFER_SIZE)
  # The base transport is always given device 0 for both ends.
  super(src_device: 0, dst_device: 0)

  # Endpoint configuration supplied by the caller.
  @local_addr, @local_port = local_addr, local_port
  @remote_addr, @remote_port = remote_addr, remote_port
  @mode, @buffer_size = mode, buffer_size

  # Runtime state: everything starts out empty until the transport is
  # explicitly initialized.
  @socket = @server_socket = nil
  @send_buffer = @recv_buffer = nil
  @initialized = false
  @mutex = Mutex.new
end
Instance Attribute Details
#local_addr ⇒ String (readonly)
Returns Local address.
33 34 35 |
# File 'lib/nvruby/collective/transport/tcp_transport.rb', line 33 def local_addr @local_addr end |
#local_port ⇒ Integer (readonly)
Returns Local port.
36 37 38 |
# File 'lib/nvruby/collective/transport/tcp_transport.rb', line 36 def local_port @local_port end |
#mode ⇒ Symbol (readonly)
Returns Mode (:client or :server).
45 46 47 |
# File 'lib/nvruby/collective/transport/tcp_transport.rb', line 45 def mode @mode end |
#remote_addr ⇒ String (readonly)
Returns Remote address.
39 40 41 |
# File 'lib/nvruby/collective/transport/tcp_transport.rb', line 39 def remote_addr @remote_addr end |
#remote_port ⇒ Integer (readonly)
Returns Remote port.
42 43 44 |
# File 'lib/nvruby/collective/transport/tcp_transport.rb', line 42 def remote_port @remote_port end |
Class Method Details
.transport_type ⇒ Symbol
Returns Transport type.
48 49 50 |
# File 'lib/nvruby/collective/transport/tcp_transport.rb', line 48 def self.transport_type :tcp end |
Instance Method Details
#destroy! ⇒ void
This method returns an undefined value.
Clean up resources
182 183 184 185 186 187 188 189 190 191 192 |
# File 'lib/nvruby/collective/transport/tcp_transport.rb', line 182

# Clean up resources: close the data socket and the server socket (when
# present), release the staging buffers and mark the transport as
# uninitialized. Does nothing when the transport is not initialized.
#
# Cleanup is exception-safe: if closing one socket raises, the other socket
# is still closed, the buffers are still freed and the state is still reset
# before the exception propagates to the caller.
#
# @return [void]
def destroy!
  return unless @initialized

  begin
    @socket&.close
  ensure
    begin
      @server_socket&.close
    ensure
      # Runs even when a close above raised, so a failing socket can no
      # longer leak the staging buffers or leave stale handles behind.
      free_buffers!
      @socket = nil
      @server_socket = nil
      @initialized = false
    end
  end
end
#estimated_bandwidth ⇒ Float
Estimated bandwidth in GB/s
176 177 178 |
# File 'lib/nvruby/collective/transport/tcp_transport.rb', line 176 def estimated_bandwidth 1.25 # ~10 Gbps typical for TCP end |
#initialize! ⇒ void
This method returns an undefined value.
Initialize TCP transport
77 78 79 80 81 82 83 84 85 86 87 88 89 |
# File 'lib/nvruby/collective/transport/tcp_transport.rb', line 77

# Initialize TCP transport: allocate the staging buffers, then either start
# the server (mode :server) or connect to the remote server (any other
# mode). Calling it again on an initialized transport is a no-op.
#
# If establishing the connection fails, everything set up so far is rolled
# back before the error is re-raised. Without the rollback the buffers would
# leak: destroy! skips cleanup while the transport is not marked
# initialized, and a retry would allocate a second set of buffers.
#
# @return [void]
# @raise [StandardError] whatever start_server! / connect_to_server! raised
def initialize!
  return if @initialized

  allocate_buffers!

  begin
    if @mode == :server
      start_server!
    else
      connect_to_server!
    end
  rescue StandardError
    # Undo the partial setup so the object is back in its pristine,
    # retryable state.
    @socket&.close
    @server_socket&.close
    @socket = nil
    @server_socket = nil
    free_buffers!
    raise
  end

  @initialized = true
end
#ready? ⇒ Boolean
Check if ready
93 94 95 |
# File 'lib/nvruby/collective/transport/tcp_transport.rb', line 93

# Check if the transport is ready: it must be initialized and hold a socket
# that has not been closed.
#
# Always returns a strict true/false, matching the documented Boolean return
# type (a missing socket yields false rather than nil).
#
# @return [Boolean] true when initialized with an open socket
def ready?
  return false unless @initialized && @socket

  !@socket.closed?
end
#recv(dst_ptr, size, stream: nil) ⇒ Integer
Receive data from remote
141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 |
# File 'lib/nvruby/collective/transport/tcp_transport.rb', line 141

# Receive data from remote: read the size header, pull that many bytes from
# the socket into the receive staging buffer, then hand them to
# unstage_from_host for the copy to dst_ptr. The whole exchange runs under
# the transport mutex.
#
# The announced size comes from the peer and is validated twice before any
# payload byte is read: against the staging buffer and against the caller's
# destination size. When either check fails the payload is left unread on
# the socket.
#
# @param dst_ptr destination handed to unstage_from_host
# @param size [Integer, nil] bytes the destination can hold; nil skips the
#   destination check
# @param stream optional stream handed to unstage_from_host
# @return [Integer] number of bytes received, as announced by the peer
# @raise [TransportError] if the announced size exceeds the staging buffer
#   or the destination, or if the socket returns no data mid-transfer
def recv(dst_ptr, size, stream: nil)
  ensure_initialized!

  @mutex.synchronize do
    # Receive size header
    actual_size = recv_header

    # The header size comes from the peer; a malformed/hostile peer could
    # announce a size larger than the staging buffer, and put_bytes below
    # would then write past it (heap overflow). Bound it.
    if actual_size > @buffer_size
      raise TransportError,
            "TCP recv header announces #{actual_size} bytes, exceeding the " \
            "#{@buffer_size}-byte staging buffer (refusing to overflow)"
    end

    # Same reasoning for the destination: unstage_from_host copies
    # actual_size bytes to dst_ptr, so a peer announcing more than the
    # caller asked for would overrun the destination.
    # NOTE(review): treats `size` as the byte capacity of dst_ptr — confirm
    # against callers.
    if size && actual_size > size
      raise TransportError,
            "TCP recv header announces #{actual_size} bytes, exceeding the " \
            "#{size}-byte destination (refusing to overflow)"
    end

    # Receive data in chunks; the socket may return fewer bytes than asked.
    bytes_received = 0
    while bytes_received < actual_size
      chunk_size = [actual_size - bytes_received, @buffer_size].min
      data = @socket.recv(chunk_size)
      raise TransportError, "TCP recv failed" if data.nil? || data.empty?

      @recv_buffer.put_bytes(bytes_received, data)
      bytes_received += data.bytesize
    end

    # Copy from host buffer to GPU
    unstage_from_host(dst_ptr, actual_size, stream)

    actual_size
  end
end
#send(src_ptr, size, stream: nil) ⇒ Boolean
Send data to remote
103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 |
# File 'lib/nvruby/collective/transport/tcp_transport.rb', line 103

# Send data to remote: stage `size` bytes from src_ptr into the send buffer,
# write the size header, then push the payload over the socket until every
# byte has been accepted. Staging and transmission run under the transport
# mutex.
#
# @param src_ptr source handed to stage_to_host
# @param size [Integer] number of bytes to send; must not exceed the
#   staging buffer size
# @param stream optional stream handed to stage_to_host
# @return [Boolean] true once the whole payload has been sent
# @raise [TransportError] if size exceeds the staging buffer, or if the
#   socket reports a non-positive byte count
def send(src_ptr, size, stream: nil)
  ensure_initialized!

  # stage_to_host copies `size` bytes into a buffer of @buffer_size bytes,
  # and larger transfers are not chunked through it, so reject them up
  # front rather than overrun the buffer.
  if size > @buffer_size
    message = "TCP transfer of #{size} bytes exceeds the #{@buffer_size}-byte host staging " \
              "buffer (chunked staging not implemented); increase buffer_size or chunk the transfer"
    raise TransportError, message
  end

  @mutex.synchronize do
    # Host-side copy first, then the length prefix, then the payload.
    stage_to_host(src_ptr, size, stream)
    send_header(size)

    offset = 0
    while offset < size
      length = [size - offset, @buffer_size].min
      payload = @send_buffer.get_bytes(offset, length)

      # The socket may accept only part of the payload; advance by what it
      # actually took and go around again.
      written = @socket.send(payload, 0)
      raise TransportError, "TCP send failed" if written <= 0

      offset += written
    end

    true
  end
end
#to_s ⇒ String
195 196 197 198 |
# File 'lib/nvruby/collective/transport/tcp_transport.rb', line 195

# Human-readable summary: both endpoints plus whether the transport is
# currently connected (as reported by ready?).
#
# @return [String] description of the transport
def to_s
  status = if ready?
             "connected"
           else
             "disconnected"
           end

  "TCPTransport[#{@local_addr}:#{@local_port} <-> #{@remote_addr}:#{@remote_port}, #{status}]"
end