Class: Ignis::Collective::Transport::RIOTransport

Inherits:
Base
  • Object
Defined in:
lib/nvruby/collective/transport/rio_transport.rb

Overview

Note:

Requires Windows 8+ and Winsock 2.2

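Windows Registered I/O (RIO) transport: zero-copy networking for multi-node GPU communication.

RIO provides high-throughput, low-latency networking on Windows. Buffers are registered once, up front, instead of being locked and mapped on every operation. Send and receive requests go through RIO request and completion queues, completions can be polled from user mode, and deferred requests let many operations share a single kernel transition. The zero-copy path applies to the registered host buffers: GPU data is staged into a registered host buffer before sending and copied back to the GPU after receiving (see #send and #recv).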

Examples:

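Create a RIO transport, send a GPU buffer, and release its resources

transport = Ignis::Collective::Transport::RIOTransport.new(
  local_addr: "192.168.1.100",
  local_port: 50000,
  remote_addr: "192.168.1.101",
  remote_port: 50000
)
begin
  transport.initialize!
  # gpu_buffer: FFI::Pointer to device memory; size: number of bytes to send
  transport.send(gpu_buffer, size)
ensure
  transport.destroy! # no-op if initialize! did not complete
end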

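Constant Summary

RIO flag constants (values from mswsock.h):

RIO_MSG_DONT_NOTIFY = 0x01
    The request's completion should not trigger RIONotify.
RIO_MSG_DEFER = 0x02
    The request is queued but need not be executed immediately.
RIO_MSG_WAITALL = 0x04
    A receive does not complete until its buffer is full, the connection closes, or the request fails or is canceled.
RIO_MSG_COMMIT_ONLY = 0x08
    Commits previously deferred requests without adding a new one.

Default buffer and queue sizes:

DEFAULT_BUFFER_SIZE = 64 * 1024 * 1024
    64 MB
DEFAULT_CQ_SIZE = 4096
    Completion queue entries
DEFAULT_RQ_SIZE = 1024
    Request queue entries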
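The RIO_MSG_* values are single bits, so they combine with `|` and are tested with `&`. The sketch below decodes a flags value and builds the flag sequence for a deferred batch of sends: each send is queued with RIO_MSG_DEFER, then one call with RIO_MSG_COMMIT_ONLY (passed on its own) commits them. `decode_flags` and `batch_flags` are hypothetical helpers, not part of RIOTransport, and the source does not show whether the transport batches sends this way.

```ruby
# Values mirror the RIO_MSG_* constants listed above (mswsock.h).
RIO_MSG_DONT_NOTIFY = 0x01
RIO_MSG_DEFER       = 0x02
RIO_MSG_WAITALL     = 0x04
RIO_MSG_COMMIT_ONLY = 0x08

FLAG_NAMES = {
  RIO_MSG_DONT_NOTIFY => :dont_notify,
  RIO_MSG_DEFER       => :defer,
  RIO_MSG_WAITALL     => :waitall,
  RIO_MSG_COMMIT_ONLY => :commit_only
}.freeze

# Hypothetical helper: list the names of the bits set in a flags value.
def decode_flags(flags)
  FLAG_NAMES.select { |bit, _| (flags & bit) != 0 }.values
end

# Hypothetical helper: flags for `count` deferred sends followed by one
# commit-only call. With notify: false, each send also suppresses RIONotify.
def batch_flags(count, notify: true)
  per_send = RIO_MSG_DEFER
  per_send |= RIO_MSG_DONT_NOTIFY unless notify
  Array.new(count, per_send) + [RIO_MSG_COMMIT_ONLY]
end

p decode_flags(RIO_MSG_DEFER | RIO_MSG_DONT_NOTIFY) # => [:dont_notify, :defer]
p batch_flags(3)                                     # => [2, 2, 2, 8]
```

Keeping RIO_MSG_COMMIT_ONLY in its own final entry reflects the rule that it is not combined with other flags.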

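Instance Attribute Summary

  • #local_addr ⇒ String (readonly): local address.
  • #local_port ⇒ Integer (readonly): local port.
  • #remote_addr ⇒ String (readonly): remote address.
  • #remote_port ⇒ Integer (readonly): remote port.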

Attributes inherited from Base

#dst_device, #src_device

Class Method Summary

  • .transport_type ⇒ Symbol: transport type.

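Instance Method Summary

  • #initialize(local_addr:, local_port:, remote_addr:, remote_port:, buffer_size: DEFAULT_BUFFER_SIZE) ⇒ RIOTransport: constructor.
  • #destroy! ⇒ void: clean up resources.
  • #estimated_bandwidth ⇒ Float: estimated bandwidth in GB/s.
  • #initialize! ⇒ void: initialize the RIO transport.
  • #ready? ⇒ Boolean: check whether the transport is ready.
  • #recv(dst_ptr, size, stream: nil) ⇒ Integer: receive data from the remote peer.
  • #send(src_ptr, size, stream: nil) ⇒ Boolean: send data to the remote peer.
  • #to_s ⇒ String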

Methods inherited from Base

available?, #estimated_latency, #recv_async, #recv_sync, #send_async, #send_sync, #synchronize!

Constructor Details

#initialize(local_addr:, local_port:, remote_addr:, remote_port:, buffer_size: DEFAULT_BUFFER_SIZE) ⇒ RIOTransport

Returns a new instance of RIOTransport.

Parameters:

  • local_addr (String)

    Local IP address

  • local_port (Integer)

    Local port

  • remote_addr (String)

    Remote IP address

  • remote_port (Integer)

    Remote port

  • buffer_size (Integer) (defaults to: DEFAULT_BUFFER_SIZE)

    Size of the registered buffer in bytes (default 64 MB)



# File 'lib/nvruby/collective/transport/rio_transport.rb', line 60

def initialize(local_addr:, local_port:, remote_addr:, remote_port:, buffer_size: DEFAULT_BUFFER_SIZE)
  super(src_device: 0, dst_device: 0)
  @local_addr = local_addr
  @local_port = local_port
  @remote_addr = remote_addr
  @remote_port = remote_port
  @buffer_size = buffer_size
  @socket = nil
  @rio_function_table = nil
  @send_cq = nil
  @recv_cq = nil
  @request_queue = nil
  @registered_buffers = {}
  @initialized = false
end

Instance Attribute Details

#local_addr ⇒ String (readonly)

Returns the local address.

Returns:

  • (String)

    Local address



# File 'lib/nvruby/collective/transport/rio_transport.rb', line 39

def local_addr
  @local_addr
end

#local_port ⇒ Integer (readonly)

Returns the local port.

Returns:

  • (Integer)

    Local port



# File 'lib/nvruby/collective/transport/rio_transport.rb', line 42

def local_port
  @local_port
end

#remote_addr ⇒ String (readonly)

Returns the remote address.

Returns:

  • (String)

    Remote address



# File 'lib/nvruby/collective/transport/rio_transport.rb', line 45

def remote_addr
  @remote_addr
end

#remote_port ⇒ Integer (readonly)

Returns the remote port.

Returns:

  • (Integer)

    Remote port



# File 'lib/nvruby/collective/transport/rio_transport.rb', line 48

def remote_port
  @remote_port
end

Class Method Details

.transport_type ⇒ Symbol

Returns the transport type identifier.

Returns:

  • (Symbol)

    Transport type



# File 'lib/nvruby/collective/transport/rio_transport.rb', line 51

def self.transport_type
  :rio_network
end

Instance Method Details

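#destroy! ⇒ void

This method returns an undefined value.

Releases the registered buffers, tears down the RIO queues, and closes the socket. Does nothing if the transport is not initialized, so calling it more than once is safe.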



# File 'lib/nvruby/collective/transport/rio_transport.rb', line 137

def destroy!
  return unless @initialized

  cleanup_buffers!
  cleanup_rio!
  close_socket!

  @initialized = false
end

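#estimated_bandwidth ⇒ Float

Estimated bandwidth in GB/s. Returns a fixed 12.5, which corresponds to a 100 Gbps link (100 Gbps / 8 bits per byte); it is not measured.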

Returns:

  • (Float)


# File 'lib/nvruby/collective/transport/rio_transport.rb', line 131

def estimated_bandwidth
  12.5  # 100 Gbps network
end
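The 12.5 GB/s figure is a 100 Gbps line rate divided by 8 bits per byte. A rough transfer-time estimate adds a fixed latency to bytes divided by bandwidth; the sketch below does that arithmetic for one full default buffer (64 * 1024 * 1024 bytes). `bandwidth_gb_per_s` and `transfer_seconds` are hypothetical helpers, and `latency_s` defaults to 0 because the value of the inherited #estimated_latency is not shown here.

```ruby
# Assumption: a 100 Gbps link, the figure behind estimated_bandwidth.
LINK_GBPS = 100.0

# Line rate in GB/s (decimal gigabytes): bits per second / 8.
def bandwidth_gb_per_s(link_gbps)
  link_gbps / 8.0
end

# Hypothetical estimate: fixed latency plus bytes / bandwidth.
def transfer_seconds(bytes, gb_per_s:, latency_s: 0.0)
  latency_s + bytes / (gb_per_s * 1e9)
end

bw = bandwidth_gb_per_s(LINK_GBPS)
t  = transfer_seconds(64 * 1024 * 1024, gb_per_s: bw) # one default-size buffer
puts bw                         # prints 12.5
puts format("%.2f ms", t * 1e3) # prints 5.37 ms
```

Line rate is an upper bound; protocol overhead and staging copies would make real transfers slower than this estimate.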

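#initialize! ⇒ void

This method returns an undefined value.

Initializes the RIO transport: loads the RIO extension functions, creates the socket, sets up the RIO queues, and registers buffers. Does nothing if the transport is already initialized.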



# File 'lib/nvruby/collective/transport/rio_transport.rb', line 78

def initialize!
  return if @initialized

  load_rio_extension!
  create_socket!
  setup_rio_queues!
  register_buffers!

  @initialized = true
end

#ready? ⇒ Boolean

Checks whether the transport is initialized and its socket is open.

Returns:

  • (Boolean)


# File 'lib/nvruby/collective/transport/rio_transport.rb', line 91

def ready?
  @initialized && @socket && !@socket.closed?
end
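#initialize! and #destroy! are both guarded by @initialized, so repeated calls are harmless and #ready? reflects the current state. The sketch below mirrors those guards in pure Ruby; `LifecycleSketch` is a hypothetical stand-in that records each step name in a log instead of making RIO calls.

```ruby
# Hypothetical stand-in for RIOTransport's lifecycle guards: each RIO step
# is replaced by a log entry so the @initialized checks are easy to see.
class LifecycleSketch
  attr_reader :log

  def initialize
    @initialized = false
    @socket_open = false
    @log = []
  end

  # Mirrors #initialize!: performs setup once; later calls return early.
  def initialize!
    return if @initialized

    @log.concat(%i[load_rio_extension create_socket setup_rio_queues register_buffers])
    @socket_open = true
    @initialized = true
  end

  # Mirrors #ready?: initialized and the socket is still open.
  def ready?
    @initialized && @socket_open
  end

  # Mirrors #destroy!: tears down once; later calls return early.
  def destroy!
    return unless @initialized

    @log.concat(%i[cleanup_buffers cleanup_rio close_socket])
    @socket_open = false
    @initialized = false
  end
end

t = LifecycleSketch.new
t.initialize!
t.initialize! # no-op: already initialized
p t.ready?    # => true
t.destroy!
t.destroy!    # no-op: already torn down
p t.ready?    # => false
p t.log.size  # => 7
```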

#recv(dst_ptr, size, stream: nil) ⇒ Integer

Receives data from the remote peer into a registered host buffer, then copies it to dst_ptr.

Parameters:

  • dst_ptr (FFI::Pointer)

    Destination buffer pointer

  • size (Integer)

    Size in bytes

  • stream (FFI::Pointer, nil) (defaults to: nil)

    CUDA stream used for the host-to-GPU copy (unstaging)

Returns:

  • (Integer)

    Bytes received; may be less than size



# File 'lib/nvruby/collective/transport/rio_transport.rb', line 117

def recv(dst_ptr, size, stream: nil)
  ensure_initialized!

  # Submit RIO receive
  bytes_received = submit_recv(size)

  # Copy from host buffer to GPU
  unstage_from_host(dst_ptr, bytes_received, stream)

  bytes_received
end
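#recv returns bytes_received rather than assuming size bytes arrived, and only that many bytes are copied to dst_ptr. On a stream socket a receive completion can report fewer bytes than the buffer holds (RIO_MSG_WAITALL exists to request otherwise), so callers should check the return value. The sketch below models the two steps in pure Ruby; `fake_submit_recv` and `fake_unstage` are hypothetical stand-ins for the private submit_recv and unstage_from_host helpers, whose implementations are not shown, and binary Strings stand in for the registered buffer and GPU memory.

```ruby
# Stand-in for submit_recv: the peer delivered `incoming`, and at most
# `size` bytes of it land in the registered buffer. Returns the byte count.
def fake_submit_recv(registered, incoming, size)
  n = [incoming.bytesize, size, registered.bytesize].min
  registered[0, n] = incoming.byteslice(0, n)
  n
end

# Stand-in for unstage_from_host: copy only the bytes actually received.
def fake_unstage(dst, registered, n)
  dst[0, n] = registered[0, n]
  dst
end

registered = ("\0" * 16).b # registered host buffer (allocated once)
dst        = ("." * 8).b   # destination ("GPU") buffer
n = fake_submit_recv(registered, "hello".b, 8)
fake_unstage(dst, registered, n)
p n   # => 5
p dst # => "hello..."
```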

#send(src_ptr, size, stream: nil) ⇒ Boolean

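Sends data to the remote peer. The data at src_ptr is staged into a registered host buffer if needed, then submitted as a RIO send.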

Parameters:

  • src_ptr (FFI::Pointer)

    Source buffer pointer

  • size (Integer)

    Size in bytes

  • stream (FFI::Pointer, nil) (defaults to: nil)

    CUDA stream used for the GPU-to-host copy (staging)

Returns:

  • (Boolean)

    true on success



# File 'lib/nvruby/collective/transport/rio_transport.rb', line 101

def send(src_ptr, size, stream: nil)
  ensure_initialized!

  # Stage GPU data to registered host buffer if needed
  host_buffer = stage_to_host(src_ptr, size, stream)

  # Submit RIO send
  submit_send(host_buffer, size)
end
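#send copies (stages) the source data into a registered host buffer and then submits a RIO send that points at that buffer, so each send costs a copy plus a queue entry rather than a fresh memory registration. The sketch below models that flow in pure Ruby under stated assumptions: `StagedSender` and `send_bytes` are hypothetical names, a binary String stands in for the registered buffer, and rejecting oversized messages is an assumption (the source does not show how sizes larger than buffer_size are handled).

```ruby
# Hypothetical model of #send: stage into one pre-allocated ("registered")
# host buffer, then submit that region. No RIO or CUDA calls are made.
class StagedSender
  attr_reader :submitted

  def initialize(buffer_size)
    @registered = ("\0" * buffer_size).b # allocated once and reused
    @submitted = []                      # stands in for the RIO request queue
  end

  # Named send_bytes so the sketch does not override Object#send.
  def send_bytes(src, size)
    if size > @registered.bytesize || size > src.bytesize
      raise ArgumentError, "size #{size} exceeds buffer or source"
    end

    @registered[0, size] = src.byteslice(0, size) # staging copy (GPU-to-host in the real class)
    @submitted << @registered.byteslice(0, size)  # snapshot of the submitted region
    true
  end
end

s = StagedSender.new(8)
p s.send_bytes("abcdef".b, 4) # => true
p s.submitted                 # => ["abcd"]
```

Two points the sketch glosses over. A real RIO send references the registered memory rather than copying it, so a single staging buffer must not be overwritten until that send's completion is dequeued; the snapshot above only keeps the example observable. And because RIOTransport defines `send`, it shadows Object#send, so code that dispatches dynamically on a transport should use `public_send` or `__send__`.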

#to_s ⇒ String

Returns:

  • (String)


# File 'lib/nvruby/collective/transport/rio_transport.rb', line 148

def to_s
  "RIOTransport[#{@local_addr}:#{@local_port} <-> #{@remote_addr}:#{@remote_port}]"
end