Class: Ignis::Collective::Transport::P2PTransport

Inherits:
Base
  • Object
Defined in:
lib/nvruby/collective/transport/p2p_transport.rb

Overview

PCIe/NVLink peer-to-peer transport for direct GPU-to-GPU transfers. Uses cudaMemcpyPeerAsync for high-bandwidth, same-process communication.

Constant Summary

BANDWIDTH_ESTIMATES =

Estimated bandwidth based on interconnect type

{
  nvlink: 900.0,    # GB/s - NVLink 4.0
  pcie_p2p: 32.0,   # GB/s - PCIe Gen4 x16
}.freeze
LATENCY_ESTIMATES =

Estimated latency

{
  nvlink: 1.0,      # microseconds
  pcie_p2p: 5.0,    # microseconds
}.freeze

Instance Attribute Summary

Attributes inherited from Base

#dst_device, #src_device

Class Method Summary

Instance Method Summary

Methods inherited from Base

#ready?, #recv_sync, #send_sync, #synchronize!

Constructor Details

#initialize(src_device:, dst_device:, interconnect_type: :pcie_p2p) ⇒ P2PTransport

Returns a new instance of P2PTransport.

Parameters:

  • src_device (Integer)

    Source GPU

  • dst_device (Integer)

    Destination GPU

  • interconnect_type (Symbol) (defaults to: :pcie_p2p)

    Detected interconnect type



# File 'lib/nvruby/collective/transport/p2p_transport.rb', line 35

def initialize(src_device:, dst_device:, interconnect_type: :pcie_p2p)
  super(src_device: src_device, dst_device: dst_device)
  @interconnect_type = interconnect_type
  @peer_access_enabled = false
end

Instance Attribute Details

#interconnect_type ⇒ Symbol (readonly)

Returns the actual interconnect type (:nvlink or :pcie_p2p).

Returns:

  • (Symbol)

    Actual interconnect type (:nvlink or :pcie_p2p)



# File 'lib/nvruby/collective/transport/p2p_transport.rb', line 30

def interconnect_type
  @interconnect_type
end

Class Method Details

.available?(src, dst) ⇒ Boolean

Check if P2P is available between two GPUs

Parameters:

  • src (Integer)

    Source GPU

  • dst (Integer)

    Destination GPU

Returns:

  • (Boolean)

    True if P2P available



# File 'lib/nvruby/collective/transport/p2p_transport.rb', line 124

def self.available?(src, dst)
  P2PBindings.ensure_loaded!

  can_access_ptr = FFI::MemoryPointer.new(:int)
  status = P2PBindings.cudaDeviceCanAccessPeer(can_access_ptr, src, dst)
  status.zero? && can_access_ptr.read_int == 1
end
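
Because the underlying check takes a source and a peer device, it can be useful to test each ordered pair separately. The following is a minimal sketch of enumerating peer-capable pairs; `peer_capable_pairs` and `fake_check` are hypothetical names, not part of this library, and the check is injected as a block so the sketch runs without a GPU. In real use the block could wrap `P2PTransport.available?`.

```ruby
# Hypothetical helper: list every ordered (src, dst) pair for which the
# supplied check returns true. The check is injected so this sketch does
# not depend on CUDA being present.
def peer_capable_pairs(device_count, &check)
  (0...device_count).to_a.permutation(2).select { |src, dst| check.call(src, dst) }
end

# Stand-in check for illustration only: pretend GPUs 0 and 1 are peers.
fake_check = ->(src, dst) { [src, dst].sort == [0, 1] }

pairs = peer_capable_pairs(3, &fake_check)
puts pairs.inspect
```

Ordered pairs are used deliberately, since access from one device to another does not have to imply access in the opposite direction.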

.transport_type ⇒ Symbol

Returns the transport type identifier.

Returns:

  • (Symbol)

    Transport type identifier



# File 'lib/nvruby/collective/transport/p2p_transport.rb', line 13

def self.transport_type
  :pcie_p2p
end

Instance Method Details

#copy_async(dst_buffer, src_buffer, size, stream) ⇒ void

This method returns an undefined value.

Copy data from the source GPU to the destination GPU asynchronously using cudaMemcpyPeerAsync

Parameters:

  • dst_buffer (FFI::Pointer)

    Destination buffer on dst_device

  • src_buffer (FFI::Pointer)

    Source buffer on src_device

  • size (Integer)

    Number of bytes to copy

  • stream (CUDA::Stream, FFI::Pointer)

    CUDA stream



# File 'lib/nvruby/collective/transport/p2p_transport.rb', line 85

def copy_async(dst_buffer, src_buffer, size, stream)
  ensure_initialized!

  status = P2PBindings.cudaMemcpyPeerAsync(
    dst_buffer,
    @dst_device,
    src_buffer,
    @src_device,
    size,
    stream_ptr(stream)
  )
  P2PBindings.check_status!(status, "P2P copy #{@src_device}->#{@dst_device}")
end

#destroy! ⇒ void

This method returns an undefined value.

Clean up by disabling peer access



# File 'lib/nvruby/collective/transport/p2p_transport.rb', line 134

def destroy!
  if @peer_access_enabled
    CUDA::RuntimeAPI.cudaSetDevice(@src_device)
    P2PBindings.cudaDeviceDisablePeerAccess(@dst_device)
    @peer_access_enabled = false
  end
  super
end
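
Since `destroy!` only disables peer access when it was previously enabled, it pairs naturally with an `ensure` block. The sketch below illustrates that pattern with a stand-in class; `FakeTransport` and `with_transport` are hypothetical and exist only so the example runs without CUDA.

```ruby
# Stand-in for illustration only; it mimics the initialize!/destroy! pair
# without touching CUDA.
class FakeTransport
  attr_reader :log

  def initialize
    @log = []
    @peer_access_enabled = false
  end

  def initialize!
    @peer_access_enabled = true
    @log << :enabled
  end

  def destroy!
    return unless @peer_access_enabled

    @peer_access_enabled = false
    @log << :disabled
  end
end

# Hypothetical helper: yield an initialized transport and always clean up,
# even when the block raises.
def with_transport(transport)
  transport.initialize!
  yield transport
ensure
  transport.destroy!
end

t = FakeTransport.new
begin
  with_transport(t) { raise "copy failed" }
rescue RuntimeError => e
  puts "rescued: #{e.message}"
end
puts t.log.inspect
```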

#estimated_bandwidth ⇒ Float

Returns the estimated bandwidth in GB/s, falling back to 12.0 for an unrecognized interconnect type.

Returns:

  • (Float)

    Bandwidth in GB/s



# File 'lib/nvruby/collective/transport/p2p_transport.rb', line 111

def estimated_bandwidth
  BANDWIDTH_ESTIMATES[@interconnect_type] || 12.0
end

#estimated_latency ⇒ Float

Returns the estimated latency in microseconds, falling back to 20.0 for an unrecognized interconnect type.

Returns:

  • (Float)

    Latency in microseconds



# File 'lib/nvruby/collective/transport/p2p_transport.rb', line 116

def estimated_latency
  LATENCY_ESTIMATES[@interconnect_type] || 20.0
end
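
The two estimates can be combined into a rough cost model of the form latency plus size divided by bandwidth. The following is a sketch under that assumption; the `TransferModel` module and `estimated_transfer_us` are hypothetical names, and nothing here confirms that the library uses this formula. The figures are copied from the constants and fallbacks documented on this page.

```ruby
# Hypothetical cost model: time = latency + bytes / bandwidth.
module TransferModel
  BANDWIDTH_GBPS = { nvlink: 900.0, pcie_p2p: 32.0 }.freeze
  LATENCY_US     = { nvlink: 1.0, pcie_p2p: 5.0 }.freeze

  module_function

  # Estimated transfer time in microseconds. Unknown interconnect types
  # fall back to 12.0 GB/s and 20.0 us, mirroring the documented defaults.
  def estimated_transfer_us(bytes, interconnect_type)
    bandwidth = BANDWIDTH_GBPS[interconnect_type] || 12.0
    latency   = LATENCY_US[interconnect_type] || 20.0
    # 1 GB/s = 1e9 bytes/s = 1e3 bytes per microsecond
    latency + bytes / (bandwidth * 1e3)
  end
end

size = 64 * 1024 * 1024 # 64 MiB
%i[nvlink pcie_p2p unknown].each do |type|
  puts format("%-9s %.1f us", type, TransferModel.estimated_transfer_us(size, type))
end
```

For small messages the latency term dominates, while for large buffers the bandwidth term does.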

#initialize! ⇒ void

This method returns an undefined value.

Initialize P2P transport by enabling peer access



# File 'lib/nvruby/collective/transport/p2p_transport.rb', line 43

def initialize!
  return if @initialized

  P2PBindings.ensure_loaded!
  CUDA::RuntimeAPI.ensure_loaded!

  # Set source device context
  status = CUDA::RuntimeAPI.cudaSetDevice(@src_device)
  CUDA::RuntimeAPI.check_status!(status, "Set device #{@src_device}")

  # Enable peer access to destination
  status = P2PBindings.cudaDeviceEnablePeerAccess(@dst_device, 0)

  # Status 0 = success, 704 = already enabled (cudaErrorPeerAccessAlreadyEnabled)
  unless status.zero? || status == 704
    P2PBindings.check_status!(status, "Enable peer access #{@src_device}->#{@dst_device}")
  end

  @peer_access_enabled = true
  @initialized = true
end
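
The status check above treats "already enabled" as success, which makes enabling peer access safe to repeat. A minimal sketch of that rule in isolation follows; the constant names and `peer_access_ok?` are hypothetical, and only the numeric codes 0 and 704 come from the method above.

```ruby
# Status codes as used in initialize! above.
CUDA_SUCCESS = 0
CUDA_ERROR_PEER_ACCESS_ALREADY_ENABLED = 704

# Hypothetical helper: true when enabling peer access can be treated as done,
# either because it just succeeded or because it was enabled earlier.
def peer_access_ok?(status)
  [CUDA_SUCCESS, CUDA_ERROR_PEER_ACCESS_ALREADY_ENABLED].include?(status)
end

[0, 704, 1].each do |status|
  puts "status #{status}: #{peer_access_ok?(status) ? 'ok' : 'error'}"
end
```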

#recv_async(buffer, size, stream) ⇒ void

This method returns an undefined value.

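Receive-side counterpart to send_async. Currently a placeholder: it performs no transfer and returns nil, because the ring allreduce manages its buffers separately.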

Parameters:

  • buffer (FFI::Pointer)

    Destination buffer

  • size (Integer)

    Bytes to receive

  • stream (CUDA::Stream, FFI::Pointer)

    CUDA stream



# File 'lib/nvruby/collective/transport/p2p_transport.rb', line 104

def recv_async(buffer, size, stream)
  # Note: For full implementation, source buffer would come from send side
  # In ring allreduce, we manage buffers differently
  nil
end

#send_async(buffer, size, stream) ⇒ void

This method returns an undefined value.

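Send-side hook for the half-duplex ring pattern. For P2P it currently performs no transfer beyond ensuring the transport is initialized, and returns nil.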

Parameters:

  • buffer (FFI::Pointer)

    Source buffer on src_device

  • size (Integer)

    Bytes to send

  • stream (CUDA::Stream, FFI::Pointer)

    CUDA stream



# File 'lib/nvruby/collective/transport/p2p_transport.rb', line 70

def send_async(buffer, size, stream)
  ensure_initialized!

  # For P2P, we use the receive buffer on dst, so this is a no-op
  # The actual transfer happens in recv_async with combined send buffer
  # This is for the half-duplex ring pattern
  nil
end
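
The comments above refer to a ring pattern, in which each GPU sends to its next neighbour and the last wraps around to the first. How this library builds its ring is not shown on this page, so the following is only a sketch of the general idea; `ring_pairs` is a hypothetical helper.

```ruby
# Hypothetical helper: for a ring of `device_count` GPUs, return the
# (src, dst) pair each transport would connect. Device i sends to
# device (i + 1) % device_count.
def ring_pairs(device_count)
  (0...device_count).map { |i| [i, (i + 1) % device_count] }
end

ring_pairs(4).each { |src, dst| puts "GPU #{src} -> GPU #{dst}" }
```

Each pair could then be used as the src_device and dst_device of one transport instance.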

#to_s ⇒ String

Returns the transport description.

Returns:

  • (String)

    Transport description



# File 'lib/nvruby/collective/transport/p2p_transport.rb', line 144

def to_s
  bw = estimated_bandwidth
  "P2P[#{@src_device}->#{@dst_device}, #{@interconnect_type}, #{bw} GB/s]"
end