Class: Ignis::Collective::Transport::P2PTransport
- Defined in:
- lib/nvruby/collective/transport/p2p_transport.rb
Overview
PCIe/NVLink peer-to-peer transport for direct GPU-to-GPU transfers. Uses cudaMemcpyPeerAsync for high-bandwidth same-process communication.
Constant Summary
- BANDWIDTH_ESTIMATES =
Estimated bandwidth based on interconnect type
{
  nvlink: 900.0,   # GB/s - NVLink 4.0
  pcie_p2p: 32.0,  # GB/s - PCIe Gen4 x16
}.freeze
- LATENCY_ESTIMATES =
Estimated latency
{
  nvlink: 1.0,    # microseconds
  pcie_p2p: 5.0,  # microseconds
}.freeze
Instance Attribute Summary
-
#interconnect_type ⇒ Symbol
readonly
Actual interconnect type (:nvlink or :pcie_p2p).
Attributes inherited from Base
Class Method Summary
-
.available?(src, dst) ⇒ Boolean
Check if P2P is available between two GPUs.
-
.transport_type ⇒ Symbol
Transport type identifier.
Instance Method Summary
-
#copy_async(dst_buffer, src_buffer, size, stream) ⇒ void
Copy data from the source GPU to the destination GPU asynchronously.
-
#destroy! ⇒ void
Clean up by disabling peer access.
-
#estimated_bandwidth ⇒ Float
Bandwidth in GB/s.
-
#estimated_latency ⇒ Float
Latency in microseconds.
-
#initialize(src_device:, dst_device:, interconnect_type: :pcie_p2p) ⇒ P2PTransport
constructor
A new instance of P2PTransport.
-
#initialize! ⇒ void
Initialize P2P transport by enabling peer access.
-
#recv_async(buffer, size, stream) ⇒ void
Receive-side placeholder; currently a no-op that returns nil (copy_async is the only method here that issues a transfer).
-
#send_async(buffer, size, stream) ⇒ void
Send-side hook for the half-duplex ring pattern; checks that the transport is initialized and otherwise does nothing.
-
#to_s ⇒ String
Transport description.
Methods inherited from Base
#ready?, #recv_sync, #send_sync, #synchronize!
Constructor Details
#initialize(src_device:, dst_device:, interconnect_type: :pcie_p2p) ⇒ P2PTransport
Returns a new instance of P2PTransport.
# File 'lib/nvruby/collective/transport/p2p_transport.rb', line 35

def initialize(src_device:, dst_device:, interconnect_type: :pcie_p2p)
  super(src_device: src_device, dst_device: dst_device)
  @interconnect_type = interconnect_type
  @peer_access_enabled = false
end
Instance Attribute Details
#interconnect_type ⇒ Symbol (readonly)
Returns Actual interconnect type (:nvlink or :pcie_p2p).
# File 'lib/nvruby/collective/transport/p2p_transport.rb', line 30

def interconnect_type
  @interconnect_type
end
Class Method Details
.available?(src, dst) ⇒ Boolean
Check if P2P is available between two GPUs
# File 'lib/nvruby/collective/transport/p2p_transport.rb', line 124

def self.available?(src, dst)
  P2PBindings.ensure_loaded!

  can_access_ptr = FFI::MemoryPointer.new(:int)
  status = P2PBindings.cudaDeviceCanAccessPeer(can_access_ptr, src, dst)
  status.zero? && can_access_ptr.read_int == 1
end
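Because cudaDeviceCanAccessPeer is queried for an ordered (src, dst) pair, a caller may want to check each direction separately. The following is a minimal sketch of that idea, not part of the library: `p2p_capable_pairs` is a hypothetical helper, and the lambda stands in for `P2PTransport.available?` so the example runs without a GPU.

```ruby
# Hypothetical helper (not part of P2PTransport): returns every ordered
# [src, dst] pair for which the supplied check returns true.
def p2p_capable_pairs(device_ids, &available)
  device_ids.permutation(2).select { |src, dst| available.call(src, dst) }
end

# Stand-in for P2PTransport.available? so the sketch runs without CUDA:
# pretend devices 0 and 1 can reach each other and device 2 is isolated.
fake_available = ->(src, dst) { [src, dst].sort == [0, 1] }

pairs = p2p_capable_pairs([0, 1, 2], &fake_available)
```

With the real class loaded, the block would presumably be `{ |src, dst| P2PTransport.available?(src, dst) }`.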
.transport_type ⇒ Symbol
Returns Transport type identifier.
# File 'lib/nvruby/collective/transport/p2p_transport.rb', line 13

def self.transport_type
  :pcie_p2p
end
Instance Method Details
#copy_async(dst_buffer, src_buffer, size, stream) ⇒ void
This method returns an undefined value.
Copy size bytes from src_buffer on the source GPU to dst_buffer on the destination GPU, asynchronously on the given stream.
# File 'lib/nvruby/collective/transport/p2p_transport.rb', line 85

def copy_async(dst_buffer, src_buffer, size, stream)
  ensure_initialized!

  status = P2PBindings.cudaMemcpyPeerAsync(
    dst_buffer,
    @dst_device,
    src_buffer,
    @src_device,
    size,
    stream_ptr(stream)
  )
  P2PBindings.check_status!(status, "P2P copy #{@src_device}→#{@dst_device}")
end
#destroy! ⇒ void
This method returns an undefined value.
Clean up by disabling peer access
# File 'lib/nvruby/collective/transport/p2p_transport.rb', line 134

def destroy!
  if @peer_access_enabled
    CUDA::RuntimeAPI.cudaSetDevice(@src_device)
    P2PBindings.cudaDeviceDisablePeerAccess(@dst_device)
    @peer_access_enabled = false
  end
  super
end
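Since destroy! undoes what initialize! sets up, callers may want to pair the two so that cleanup runs even when a transfer raises. A minimal sketch of one way to do that follows; `with_transport` and `FakeTransport` are hypothetical names introduced only for illustration, and the fake exists so the example runs without CUDA.

```ruby
# Hypothetical helper (not part of the library): works with any object that
# responds to initialize! and destroy!, as P2PTransport does.
def with_transport(transport)
  transport.initialize!
  begin
    yield transport
  ensure
    # Runs whether the block returns normally or raises.
    transport.destroy!
  end
end

# Minimal fake so the sketch runs without a GPU; records calls in order.
class FakeTransport
  attr_reader :calls

  def initialize
    @calls = []
  end

  def initialize!
    @calls << :initialize!
  end

  def destroy!
    @calls << :destroy!
  end
end

fake = FakeTransport.new
with_transport(fake) { |t| t.calls << :work }
```

initialize! is called outside the begin block on purpose: if enabling peer access fails, there is nothing for destroy! to undo.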
#estimated_bandwidth ⇒ Float
Returns Bandwidth in GB/s.
# File 'lib/nvruby/collective/transport/p2p_transport.rb', line 111

def estimated_bandwidth
  BANDWIDTH_ESTIMATES[@interconnect_type] || 12.0
end
#estimated_latency ⇒ Float
Returns Latency in microseconds.
# File 'lib/nvruby/collective/transport/p2p_transport.rb', line 116

def estimated_latency
  LATENCY_ESTIMATES[@interconnect_type] || 20.0
end
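The two estimates can be combined into a rough transfer-time figure using the usual latency-plus-size-over-bandwidth model. The sketch below is illustrative only: `estimated_transfer_time_us` is a hypothetical helper, the tables are copied from the constants documented on this page, the fallbacks mirror the 12.0 and 20.0 defaults in the two methods, and 1 GB is assumed to mean 10^9 bytes.

```ruby
# Values copied from BANDWIDTH_ESTIMATES and LATENCY_ESTIMATES.
SKETCH_BANDWIDTH_GBPS = { nvlink: 900.0, pcie_p2p: 32.0 }.freeze
SKETCH_LATENCY_US     = { nvlink: 1.0, pcie_p2p: 5.0 }.freeze

# Hypothetical helper (not part of P2PTransport).
# Returns an estimated transfer time in microseconds.
def estimated_transfer_time_us(bytes, interconnect_type)
  bandwidth_gbps = SKETCH_BANDWIDTH_GBPS.fetch(interconnect_type, 12.0)
  latency_us     = SKETCH_LATENCY_US.fetch(interconnect_type, 20.0)
  # Assumption: 1 GB = 10^9 bytes, so 1 GB/s = 1_000 bytes per microsecond.
  latency_us + bytes / (bandwidth_gbps * 1_000.0)
end

pcie_us   = estimated_transfer_time_us(64_000_000, :pcie_p2p)
nvlink_us = estimated_transfer_time_us(64_000_000, :nvlink)
```

For small messages the latency term dominates; for large ones the bandwidth term does, which is why both estimates are exposed.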
#initialize! ⇒ void
This method returns an undefined value.
Initialize P2P transport by enabling peer access
# File 'lib/nvruby/collective/transport/p2p_transport.rb', line 43

def initialize!
  return if @initialized

  P2PBindings.ensure_loaded!
  CUDA::RuntimeAPI.ensure_loaded!

  # Set source device context
  status = CUDA::RuntimeAPI.cudaSetDevice(@src_device)
  CUDA::RuntimeAPI.check_status!(status, "Set device #{@src_device}")

  # Enable peer access to destination
  status = P2PBindings.cudaDeviceEnablePeerAccess(@dst_device, 0)

  # Status 0 = success, 704 = already enabled (cudaErrorPeerAccessAlreadyEnabled)
  unless status.zero? || status == 704
    P2PBindings.check_status!(status, "Enable peer access #{@src_device}→#{@dst_device}")
  end

  @peer_access_enabled = true
  @initialized = true
end
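The method treats two status codes as acceptable: 0 (success) and 704 (cudaErrorPeerAccessAlreadyEnabled). A small sketch of that classification, kept separate from any CUDA call so it can run anywhere, might look like the following; `classify_peer_access_status` is a hypothetical name, not a library function.

```ruby
# Hypothetical helper (not part of the library): maps the status returned by
# cudaDeviceEnablePeerAccess to a symbol.
def classify_peer_access_status(status)
  case status
  when 0   then :enabled          # this call enabled peer access
  when 704 then :already_enabled  # cudaErrorPeerAccessAlreadyEnabled
  else          :error            # anything else should be reported
  end
end
```

Distinguishing the two acceptable cases could matter for cleanup: as written, initialize! sets @peer_access_enabled in both, so destroy! will disable peer access even when some other code enabled it first.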
#recv_async(buffer, size, stream) ⇒ void
This method returns an undefined value.
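Receive-side placeholder. The current implementation ignores its arguments and returns nil; copy_async is the only method here that issues a transfer.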
# File 'lib/nvruby/collective/transport/p2p_transport.rb', line 104

def recv_async(buffer, size, stream)
  # Note: For full implementation, source buffer would come from send side
  # In ring allreduce, we manage buffers differently
  nil
end
#send_async(buffer, size, stream) ⇒ void
This method returns an undefined value.
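Send-side hook for the half-duplex ring pattern. The current implementation only checks that the transport is initialized and then returns nil without transferring data.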
# File 'lib/nvruby/collective/transport/p2p_transport.rb', line 70

def send_async(buffer, size, stream)
  ensure_initialized!

  # For P2P, we use the receive buffer on dst, so this is a no-op
  # The actual transfer happens in recv_async with combined send buffer
  # This is for the half-duplex ring pattern
  nil
end
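The comments in send_async and recv_async refer to a ring pattern, but this page does not show how ranks are arranged. As a purely illustrative sketch under that assumption, each rank in a ring sends to its successor and receives from its predecessor; `ring_neighbors` is a hypothetical helper and not part of the library.

```ruby
# Hypothetical helper (not part of the library): for a ring of world_size
# ranks, returns the rank this one receives from and the rank it sends to.
def ring_neighbors(rank, world_size)
  unless world_size.positive? && (0...world_size).cover?(rank)
    raise ArgumentError, "rank #{rank} is outside 0...#{world_size}"
  end

  {
    recv_from: (rank - 1) % world_size, # Ruby's % wraps -1 to world_size - 1
    send_to: (rank + 1) % world_size
  }
end

first = ring_neighbors(0, 4)
last  = ring_neighbors(3, 4)
```

Under this layout, a transport from rank r would presumably be built with src_device r and dst_device equal to the `send_to` value.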
#to_s ⇒ String
Returns Transport description.
# File 'lib/nvruby/collective/transport/p2p_transport.rb', line 144

def to_s
  bw = estimated_bandwidth
  "P2P[#{@src_device}→#{@dst_device}, #{@interconnect_type}, #{bw} GB/s]"
end