Class: Ignis::Collective::Transport::IPCTransport

Inherits:
Base
  • Object
show all
Defined in:
lib/nvruby/collective/transport/ipc_transport.rb

Overview

CUDA IPC transport for inter-process GPU memory sharing Uses cudaIpcGetMemHandle/cudaIpcOpenMemHandle for zero-copy cross-process access Supports both legacy IPC and cuMem VMM API

Constant Summary collapse

BANDWIDTH_GBS =

Estimated bandwidth (host memory throughput for handle exchange)

25.0
LATENCY_US =

Estimated latency (IPC overhead)

10.0
EXCHANGE_METHODS =

Handle exchange methods

[:named_pipe, :shared_memory, :socket].freeze

Instance Attribute Summary collapse

Attributes inherited from Base

#dst_device, #src_device

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Base

#ready?, #recv_sync, #send_sync, #synchronize!

Constructor Details

#initialize(src_device:, dst_device:, exchange_method: :named_pipe) ⇒ IPCTransport

Returns a new instance of IPCTransport.

Parameters:

  • src_device (Integer)

    Source GPU

  • dst_device (Integer)

    Destination GPU

  • exchange_method (Symbol) (defaults to: :named_pipe)

    Method for handle exchange



36
37
38
39
40
41
# File 'lib/nvruby/collective/transport/ipc_transport.rb', line 36

def initialize(src_device:, dst_device:, exchange_method: :named_pipe)
  super(src_device: src_device, dst_device: dst_device)
  @exchange_method = exchange_method
  @exported_handles = {}  # device_ptr -> handle
  @imported_handles = {}  # handle -> mapped_ptr
end

Instance Attribute Details

#exchange_methodSymbol (readonly)

Returns Handle exchange method.

Returns:

  • (Symbol)

    Handle exchange method



31
32
33
# File 'lib/nvruby/collective/transport/ipc_transport.rb', line 31

def exchange_method
  @exchange_method
end

#exported_handleP2PBindings::CudaIpcMemHandle? (readonly)

Returns Exported handle.

Returns:



28
29
30
# File 'lib/nvruby/collective/transport/ipc_transport.rb', line 28

def exported_handle
  @exported_handle
end

Class Method Details

.available?(src, dst) ⇒ Boolean

IPC is always available on Windows for same-node communication

Parameters:

  • src (Integer)

    Source GPU

  • dst (Integer)

    Destination GPU

Returns:

  • (Boolean)

    True (always available)



157
158
159
# File 'lib/nvruby/collective/transport/ipc_transport.rb', line 157

def self.available?(src, dst)
  true
end

.transport_typeSymbol

Returns Transport type identifier.

Returns:

  • (Symbol)

    Transport type identifier



14
15
16
# File 'lib/nvruby/collective/transport/ipc_transport.rb', line 14

def self.transport_type
  :cuda_ipc
end

Instance Method Details

#close_imported_handle(mapped_ptr) ⇒ void

This method returns an undefined value.

Close an imported handle

Parameters:

  • mapped_ptr (FFI::Pointer)

    Previously imported pointer



106
107
108
109
110
111
112
113
# File 'lib/nvruby/collective/transport/ipc_transport.rb', line 106

def close_imported_handle(mapped_ptr)
  ensure_initialized!

  status = P2PBindings.cudaIpcCloseMemHandle(mapped_ptr)
  P2PBindings.check_status!(status, "Close IPC memory handle")

  @imported_handles.delete_if { |_, ptr| ptr == mapped_ptr }
end

#destroy!void

This method returns an undefined value.

Clean up all handles



163
164
165
166
167
168
169
170
171
172
# File 'lib/nvruby/collective/transport/ipc_transport.rb', line 163

def destroy!
  @imported_handles.each_value do |ptr|
    P2PBindings.cudaIpcCloseMemHandle(ptr)
  rescue StandardError
    # Ignore cleanup errors
  end
  @imported_handles.clear
  @exported_handles.clear
  super
end

#estimated_bandwidthFloat

Returns Bandwidth in GB/s.

Returns:

  • (Float)

    Bandwidth in GB/s



144
145
146
# File 'lib/nvruby/collective/transport/ipc_transport.rb', line 144

def estimated_bandwidth
  BANDWIDTH_GBS
end

#estimated_latencyFloat

Returns Latency in microseconds.

Returns:

  • (Float)

    Latency in microseconds



149
150
151
# File 'lib/nvruby/collective/transport/ipc_transport.rb', line 149

def estimated_latency
  LATENCY_US
end

#export_handle(device_ptr) ⇒ String

Export GPU memory for sharing with another process

Parameters:

  • device_ptr (FFI::Pointer)

    GPU memory pointer to export

Returns:

  • (String)

    Binary handle (64 bytes) for transfer to other process



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/nvruby/collective/transport/ipc_transport.rb', line 57

def export_handle(device_ptr)
  ensure_initialized!

  # Set source device context
  status = CUDA::RuntimeAPI.cudaSetDevice(@src_device)
  CUDA::RuntimeAPI.check_status!(status, "Set device for IPC export")

  handle = P2PBindings::CudaIpcMemHandle.new
  status = P2PBindings.cudaIpcGetMemHandle(handle, device_ptr)
  P2PBindings.check_status!(status, "Get IPC memory handle")

  # Cache the handle for cleanup
  handle_bytes = handle[:reserved].to_a.pack("C*")
  @exported_handles[device_ptr.address] = handle_bytes

  handle_bytes
end

#import_handle(handle_bytes, flags: P2PBindings::IPC_MEM_LAZY_ENABLE_PEER_ACCESS) ⇒ FFI::Pointer

Import GPU memory handle from another process

Parameters:

  • handle_bytes (String)

    64-byte binary handle

  • flags (Integer) (defaults to: P2PBindings::IPC_MEM_LAZY_ENABLE_PEER_ACCESS)

    IPC memory flags

Returns:

  • (FFI::Pointer)

    Mapped device pointer usable in this process



79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/nvruby/collective/transport/ipc_transport.rb', line 79

def import_handle(handle_bytes, flags: P2PBindings::IPC_MEM_LAZY_ENABLE_PEER_ACCESS)
  ensure_initialized!

  # Set destination device context
  status = CUDA::RuntimeAPI.cudaSetDevice(@dst_device)
  CUDA::RuntimeAPI.check_status!(status, "Set device for IPC import")

  # Reconstruct handle from bytes
  handle = P2PBindings::CudaIpcMemHandle.new
  handle_bytes.each_byte.with_index do |byte, i|
    handle[:reserved][i] = byte
  end

  # Open the handle
  mapped_ptr_ptr = FFI::MemoryPointer.new(:pointer)
  status = P2PBindings.cudaIpcOpenMemHandle(mapped_ptr_ptr, handle, flags)
  P2PBindings.check_status!(status, "Open IPC memory handle")

  mapped_ptr = mapped_ptr_ptr.read_pointer
  @imported_handles[handle_bytes] = mapped_ptr

  mapped_ptr
end

#initialize!void

This method returns an undefined value.

Initialize IPC transport



45
46
47
48
49
50
51
52
# File 'lib/nvruby/collective/transport/ipc_transport.rb', line 45

def initialize!
  return if @initialized

  P2PBindings.ensure_loaded!
  CUDA::RuntimeAPI.ensure_loaded!

  @initialized = true
end

#recv_async(buffer, size, stream) ⇒ FFI::Pointer

Receive data using IPC (import phase)

Parameters:

  • buffer (FFI::Pointer)

    NOT USED - returns new mapped pointer

  • size (Integer)

    Expected size

  • stream (Object)

    CUDA stream

Returns:

  • (FFI::Pointer)

    Mapped device pointer

Raises:



130
131
132
133
134
# File 'lib/nvruby/collective/transport/ipc_transport.rb', line 130

def recv_async(buffer, size, stream)
  # IPC recv requires handle from sender - actual implementation
  # would receive handle via exchange_method and then import
  raise TransportError, "IPC recv requires handle - use recv_with_handle"
end

#recv_with_handle(handle_bytes) ⇒ FFI::Pointer

Receive using provided handle

Parameters:

  • handle_bytes (String)

    Handle from sender

Returns:

  • (FFI::Pointer)

    Mapped device pointer



139
140
141
# File 'lib/nvruby/collective/transport/ipc_transport.rb', line 139

def recv_with_handle(handle_bytes)
  import_handle(handle_bytes)
end

#send_async(buffer, size, stream) ⇒ String

Send data using IPC (export phase) In IPC, send means making buffer available to receiver

Parameters:

  • buffer (FFI::Pointer)

    Buffer to share

  • size (Integer)

    Buffer size (for validation)

  • stream (Object)

    CUDA stream (unused for export, sync only)

Returns:

  • (String)

    Handle bytes to transmit to receiver



121
122
123
# File 'lib/nvruby/collective/transport/ipc_transport.rb', line 121

def send_async(buffer, size, stream)
  export_handle(buffer)
end

#to_sString

Returns Transport description.

Returns:

  • (String)

    Transport description



175
176
177
# File 'lib/nvruby/collective/transport/ipc_transport.rb', line 175

def to_s
  "IPC[#{@src_device}#{@dst_device}, #{@exchange_method}]"
end