Class: Ignis::Collective::Transport::RIOTransport
- Defined in: lib/nvruby/collective/transport/rio_transport.rb
Overview
Note: Requires Windows 8 or later and Winsock 2.2.
Windows Registered I/O (RIO) transport: zero-copy networking for multi-node GPU communication.
RIO provides high-performance, low-latency networking on Windows by registering buffers once and reusing them, which reduces kernel transitions on the send and receive paths.
Constant Summary
RIO constants from mswsock.h
- RIO_MSG_DONT_NOTIFY = 0x01
- RIO_MSG_DEFER = 0x02
- RIO_MSG_WAITALL = 0x04
- RIO_MSG_COMMIT_ONLY = 0x08
Default buffer sizes
- DEFAULT_BUFFER_SIZE = 64 * 1024 * 1024 (64 MB)
- DEFAULT_CQ_SIZE = 4096
- DEFAULT_RQ_SIZE = 1024
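The message flags listed above are single-bit values, so they can be combined into one mask with bitwise OR. The following is a minimal sketch of that arithmetic only: the module name `RioFlagsSketch` and its helper methods are hypothetical and not part of this class, and the constant values are copied from the summary. Which combinations Winsock actually accepts is governed by the RIO documentation, not by this sketch.

```ruby
# Hypothetical helper module; constant values are copied from the Constant Summary.
module RioFlagsSketch
  RIO_MSG_DONT_NOTIFY = 0x01
  RIO_MSG_DEFER       = 0x02
  RIO_MSG_WAITALL     = 0x04
  RIO_MSG_COMMIT_ONLY = 0x08

  DEFAULT_BUFFER_SIZE = 64 * 1024 * 1024 # 64 MB, expressed in bytes

  # Combine any number of flags into a single bit mask.
  def self.combine(*flags)
    flags.reduce(0) { |mask, flag| mask | flag }
  end

  # Return true when every bit of `flag` is set in `mask`.
  def self.set?(mask, flag)
    (mask & flag) == flag
  end
end

mask = RioFlagsSketch.combine(RioFlagsSketch::RIO_MSG_DEFER,
                              RioFlagsSketch::RIO_MSG_DONT_NOTIFY)
puts format("0x%02x", mask)               # prints 0x03
puts RioFlagsSketch::DEFAULT_BUFFER_SIZE  # prints 67108864
```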
Instance Attribute Summary
- #local_addr ⇒ String (readonly)
Local address.
- #local_port ⇒ Integer (readonly)
Local port.
- #remote_addr ⇒ String (readonly)
Remote address.
- #remote_port ⇒ Integer (readonly)
Remote port.
Attributes inherited from Base
Class Method Summary
- .transport_type ⇒ Symbol
Transport type.
Instance Method Summary
- #destroy! ⇒ void
Clean up resources.
- #estimated_bandwidth ⇒ Float
Estimated bandwidth in GB/s.
- #initialize(local_addr:, local_port:, remote_addr:, remote_port:, buffer_size: DEFAULT_BUFFER_SIZE) ⇒ RIOTransport (constructor)
A new instance of RIOTransport.
- #initialize! ⇒ void
Initialize RIO transport.
- #ready? ⇒ Boolean
Check if ready.
- #recv(dst_ptr, size, stream: nil) ⇒ Integer
Receive data from remote.
- #send(src_ptr, size, stream: nil) ⇒ Boolean
Send data to remote.
- #to_s ⇒ String
Methods inherited from Base
available?, #estimated_latency, #recv_async, #recv_sync, #send_async, #send_sync, #synchronize!
Constructor Details
#initialize(local_addr:, local_port:, remote_addr:, remote_port:, buffer_size: DEFAULT_BUFFER_SIZE) ⇒ RIOTransport
Returns a new instance of RIOTransport.
# File 'lib/nvruby/collective/transport/rio_transport.rb', line 60
def initialize(local_addr:, local_port:, remote_addr:, remote_port:, buffer_size: DEFAULT_BUFFER_SIZE)
  super(src_device: 0, dst_device: 0)
  @local_addr = local_addr
  @local_port = local_port
  @remote_addr = remote_addr
  @remote_port = remote_port
  @buffer_size = buffer_size
  @socket = nil
  @rio_function_table = nil
  @send_cq = nil
  @recv_cq = nil
  @request_queue = nil
  @registered_buffers = {}
  @initialized = false
end
Instance Attribute Details
#local_addr ⇒ String (readonly)
Returns the local address.
# File 'lib/nvruby/collective/transport/rio_transport.rb', line 39
def local_addr
  @local_addr
end
#local_port ⇒ Integer (readonly)
Returns the local port.
# File 'lib/nvruby/collective/transport/rio_transport.rb', line 42
def local_port
  @local_port
end
#remote_addr ⇒ String (readonly)
Returns the remote address.
# File 'lib/nvruby/collective/transport/rio_transport.rb', line 45
def remote_addr
  @remote_addr
end
#remote_port ⇒ Integer (readonly)
Returns the remote port.
# File 'lib/nvruby/collective/transport/rio_transport.rb', line 48
def remote_port
  @remote_port
end
Class Method Details
.transport_type ⇒ Symbol
Returns the transport type.
# File 'lib/nvruby/collective/transport/rio_transport.rb', line 51
def self.transport_type
  :rio_network
end
Instance Method Details
#destroy! ⇒ void
This method returns an undefined value.
Clean up resources
# File 'lib/nvruby/collective/transport/rio_transport.rb', line 137
def destroy!
  return unless @initialized

  cleanup_buffers!
  cleanup_rio!
  close_socket!

  @initialized = false
end
#estimated_bandwidth ⇒ Float
Estimated bandwidth in GB/s
# File 'lib/nvruby/collective/transport/rio_transport.rb', line 131
def estimated_bandwidth
  12.5 # 100 Gbps network
end
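The 12.5 figure is the 100 Gbps link speed divided by 8 bits per byte. As a rough illustration of what that estimate implies, the sketch below converts the link speed and computes an idealized transfer time for one default-sized 64 MB buffer. The helper names `gbps_to_gb_per_s` and `transfer_seconds` are hypothetical, and the calculation assumes decimal gigabytes and ignores latency, protocol overhead, and GPU staging copies.

```ruby
# Convert a link speed in gigabits per second to gigabytes per second.
def gbps_to_gb_per_s(gbps)
  gbps / 8.0
end

# Idealized seconds needed to move `bytes` at `bandwidth_gb_s` (decimal GB/s).
# Ignores latency, protocol overhead, and host/GPU staging copies.
def transfer_seconds(bytes, bandwidth_gb_s)
  bytes / (bandwidth_gb_s * 1_000_000_000.0)
end

bandwidth    = gbps_to_gb_per_s(100) # 12.5, the value estimated_bandwidth returns
buffer_bytes = 64 * 1024 * 1024      # DEFAULT_BUFFER_SIZE in bytes

puts bandwidth
puts format("%.2f ms", transfer_seconds(buffer_bytes, bandwidth) * 1000)
```

Under these assumptions a full default buffer takes a little over 5 ms on the wire, which is a lower bound rather than a measured result.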
#initialize! ⇒ void
This method returns an undefined value.
Initialize RIO transport
# File 'lib/nvruby/collective/transport/rio_transport.rb', line 78
def initialize!
  return if @initialized

  load_rio_extension!
  create_socket!
  setup_rio_queues!
  register_buffers!

  @initialized = true
end
#ready? ⇒ Boolean
Check if ready
# File 'lib/nvruby/collective/transport/rio_transport.rb', line 91
def ready?
  @initialized && @socket && !@socket.closed?
end
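Taken together, #initialize!, #ready?, and #destroy! form a guarded lifecycle: setup runs at most once, teardown is skipped when nothing was set up, and readiness depends on both the flag and the socket state. The sketch below imitates that pattern with stand-ins so it can run anywhere. `LifecycleSketch` and `FakeSocket` are hypothetical names that open no sockets and call no RIO APIs, and the `setup_calls` counter exists only to make the idempotence visible.

```ruby
# Minimal socket double exposing only close and closed?.
class FakeSocket
  def initialize
    @closed = false
  end

  def close
    @closed = true
  end

  def closed?
    @closed
  end
end

# Hypothetical stand-in that mirrors the documented lifecycle guards.
class LifecycleSketch
  attr_reader :setup_calls

  def initialize
    @socket = nil
    @initialized = false
    @setup_calls = 0
  end

  # Idempotent: a second call returns early without repeating setup.
  def initialize!
    return if @initialized

    @socket = FakeSocket.new
    @setup_calls += 1
    @initialized = true
  end

  # True only when setup has run and the socket is still open.
  def ready?
    @initialized && !@socket.nil? && !@socket.closed?
  end

  # Safe to call before initialize! or more than once.
  def destroy!
    return unless @initialized

    @socket.close
    @initialized = false
  end
end

transport = LifecycleSketch.new
transport.initialize!
transport.initialize!        # no-op, setup is not repeated
puts transport.setup_calls   # prints 1
puts transport.ready?        # prints true
transport.destroy!
puts transport.ready?        # prints false
```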
#recv(dst_ptr, size, stream: nil) ⇒ Integer
Receive data from remote
# File 'lib/nvruby/collective/transport/rio_transport.rb', line 117
def recv(dst_ptr, size, stream: nil)
  ensure_initialized!

  # Submit RIO receive
  bytes_received = submit_recv(size)

  # Copy from host buffer to GPU
  unstage_from_host(dst_ptr, bytes_received, stream)

  bytes_received
end
#send(src_ptr, size, stream: nil) ⇒ Boolean
Send data to remote
# File 'lib/nvruby/collective/transport/rio_transport.rb', line 101
def send(src_ptr, size, stream: nil)
  ensure_initialized!

  # Stage GPU data to registered host buffer if needed
  host_buffer = stage_to_host(src_ptr, size, stream)

  # Submit RIO send
  submit_send(host_buffer, size)
end
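Both data-path methods follow the same shape: #send stages the source into a host buffer and then submits it, while #recv submits a receive, copies the result to the destination, and returns the byte count. The sketch below reproduces that flow in memory so the control flow can be tried without Windows or a GPU. Everything in it is an assumption for illustration: `LoopbackSketch`, `send_bytes`, and `recv_bytes` are hypothetical names, Ruby strings stand in for device pointers, an array stands in for the network, and the size check against the buffer is a guess at how an oversized request might be rejected rather than documented behavior.

```ruby
# Hypothetical in-memory stand-in for the stage -> submit -> unstage flow.
class LoopbackSketch
  def initialize(buffer_size:)
    @buffer_size = buffer_size
    @wire = [] # stands in for the network between two peers
  end

  # Mirrors #send: stage the first `size` bytes, then submit them.
  def send_bytes(src, size)
    host_buffer = stage_to_host(src, size)
    @wire.push(host_buffer)
    true
  end

  # Mirrors #recv: take the next message, copy at most `size` bytes
  # into `dst`, and return the number of bytes received.
  def recv_bytes(dst, size)
    data = @wire.shift.byteslice(0, size)
    dst.replace(data)
    data.bytesize
  end

  private

  # Assumption: requests larger than the staging buffer are rejected.
  def stage_to_host(src, size)
    raise ArgumentError, "size #{size} exceeds buffer #{@buffer_size}" if size > @buffer_size

    src.byteslice(0, size)
  end
end

transport = LoopbackSketch.new(buffer_size: 16)
transport.send_bytes("hello world", 5)

dst = String.new
puts transport.recv_bytes(dst, 16) # prints 5
puts dst                           # prints hello
```

The sketch uses `send_bytes` rather than `send` because defining `send` on a Ruby class shadows `Object#send`; callers that need dynamic dispatch on such an object can use `__send__` or `public_send` instead.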
#to_s ⇒ String
# File 'lib/nvruby/collective/transport/rio_transport.rb', line 148
def to_s
  "RIOTransport[#{@local_addr}:#{@local_port} <-> #{@remote_addr}:#{@remote_port}]"
end