Class: Ignis::Collective::Transport::IPCTransport
Defined in: lib/nvruby/collective/transport/ipc_transport.rb
Overview
CUDA IPC transport for inter-process GPU memory sharing. Uses cudaIpcGetMemHandle/cudaIpcOpenMemHandle for zero-copy cross-process access. Supports both legacy IPC and the cuMem VMM API.
Constant Summary
- BANDWIDTH_GBS = 25.0
  Estimated bandwidth in GB/s (host memory throughput for handle exchange).
- LATENCY_US = 10.0
  Estimated latency in microseconds (IPC overhead).
- EXCHANGE_METHODS = [:named_pipe, :shared_memory, :socket].freeze
  Handle exchange methods.
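The two estimates above can be combined into a rough transfer-time figure. The sketch below assumes a simple linear cost model (fixed latency plus size divided by bandwidth) and 1 GB = 1e9 bytes. Neither assumption is stated by the library, and `estimated_transfer_us` is a hypothetical helper, not part of IPCTransport.

```ruby
# Values copied from the constant summary above.
BANDWIDTH_GBS = 25.0 # GB/s
LATENCY_US = 10.0    # microseconds

# Hypothetical helper (not part of IPCTransport): estimated time in
# microseconds to move size_bytes, assuming a linear cost model and
# 1 GB = 1e9 bytes.
def estimated_transfer_us(size_bytes)
  bytes_per_us = BANDWIDTH_GBS * 1e9 / 1e6
  LATENCY_US + size_bytes / bytes_per_us
end

puts estimated_transfer_us(1_000_000) # estimate for a 1 MB payload
```

A scheduler comparing transports could rank them with a figure like this, although for small payloads the fixed latency term dominates.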
Instance Attribute Summary
- #exchange_method ⇒ Symbol (readonly)
  Handle exchange method.
- #exported_handle ⇒ P2PBindings::CudaIpcMemHandle? (readonly)
  Exported handle.
Attributes inherited from Base
Class Method Summary
- .available?(src, dst) ⇒ Boolean
  Always returns true: IPC is treated as available on Windows for same-node communication.
- .transport_type ⇒ Symbol
  Transport type identifier.
Instance Method Summary
- #close_imported_handle(mapped_ptr) ⇒ void
  Close an imported handle.
- #destroy! ⇒ void
  Clean up all handles.
- #estimated_bandwidth ⇒ Float
  Bandwidth in GB/s.
- #estimated_latency ⇒ Float
  Latency in microseconds.
- #export_handle(device_ptr) ⇒ String
  Export GPU memory for sharing with another process.
- #import_handle(handle_bytes, flags: P2PBindings::IPC_MEM_LAZY_ENABLE_PEER_ACCESS) ⇒ FFI::Pointer
  Import GPU memory handle from another process.
- #initialize(src_device:, dst_device:, exchange_method: :named_pipe) ⇒ IPCTransport (constructor)
  A new instance of IPCTransport.
- #initialize! ⇒ void
  Initialize IPC transport.
- #recv_async(buffer, size, stream) ⇒ FFI::Pointer
  Receive data using IPC (import phase).
- #recv_with_handle(handle_bytes) ⇒ FFI::Pointer
  Receive using provided handle.
- #send_async(buffer, size, stream) ⇒ String
  Send data using IPC (export phase). In IPC, sending means making the buffer available to the receiver.
- #to_s ⇒ String
  Transport description.
Methods inherited from Base
#ready?, #recv_sync, #send_sync, #synchronize!
Constructor Details
#initialize(src_device:, dst_device:, exchange_method: :named_pipe) ⇒ IPCTransport
Returns a new instance of IPCTransport.
# File 'lib/nvruby/collective/transport/ipc_transport.rb', line 36
def initialize(src_device:, dst_device:, exchange_method: :named_pipe)
  super(src_device: src_device, dst_device: dst_device)
  @exchange_method = exchange_method
  @exported_handles = {} # device_ptr -> handle
  @imported_handles = {} # handle -> mapped_ptr
end
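The constructor stores exchange_method as given and does not compare it with EXCHANGE_METHODS. A caller who wants an early failure on a typo could check the symbol first. The sketch below is one way to do that. `validate_exchange_method!` is a hypothetical helper, not part of the library, and the constant is copied here only so the snippet runs on its own.

```ruby
# Copied from the constant summary so the snippet is self-contained.
EXCHANGE_METHODS = [:named_pipe, :shared_memory, :socket].freeze

# Hypothetical helper (not part of IPCTransport): returns the symbol if it
# is a known exchange method, otherwise raises ArgumentError.
def validate_exchange_method!(method)
  return method if EXCHANGE_METHODS.include?(method)

  raise ArgumentError,
        "unknown exchange method #{method.inspect}; " \
        "expected one of #{EXCHANGE_METHODS.inspect}"
end

puts validate_exchange_method!(:socket)
```

The validated symbol can then be passed as the exchange_method: keyword argument.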
Instance Attribute Details
#exchange_method ⇒ Symbol (readonly)
Returns the handle exchange method.
# File 'lib/nvruby/collective/transport/ipc_transport.rb', line 31
def exchange_method
  @exchange_method
end
#exported_handle ⇒ P2PBindings::CudaIpcMemHandle? (readonly)
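Returns the exported handle, or nil when unset. The code shown on this page populates @exported_handles (plural) and never assigns @exported_handle, so this reader may return nil.
# File 'lib/nvruby/collective/transport/ipc_transport.rb', line 28
def exported_handle
  @exported_handle
end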
Class Method Details
.available?(src, dst) ⇒ Boolean
Always returns true: IPC is treated as available on Windows for same-node communication. The src and dst arguments are not inspected.
# File 'lib/nvruby/collective/transport/ipc_transport.rb', line 157
def self.available?(src, dst)
  true
end
.transport_type ⇒ Symbol
Returns the transport type identifier.
# File 'lib/nvruby/collective/transport/ipc_transport.rb', line 14
def self.transport_type
  :cuda_ipc
end
Instance Method Details
#close_imported_handle(mapped_ptr) ⇒ void
This method returns an undefined value.
Close an imported handle and remove it from the transport's record of imported handles.
# File 'lib/nvruby/collective/transport/ipc_transport.rb', line 106
def close_imported_handle(mapped_ptr)
  ensure_initialized!

  status = P2PBindings.cudaIpcCloseMemHandle(mapped_ptr)
  P2PBindings.check_status!(status, "Close IPC memory handle")

  @imported_handles.delete_if { |_, ptr| ptr == mapped_ptr }
end
#destroy! ⇒ void
This method returns an undefined value.
Clean up all handles. Imported handles are closed on a best-effort basis: errors raised while closing are ignored.
# File 'lib/nvruby/collective/transport/ipc_transport.rb', line 163
def destroy!
  @imported_handles.each_value do |ptr|
    P2PBindings.cudaIpcCloseMemHandle(ptr)
  rescue StandardError
    # Ignore cleanup errors
  end
  @imported_handles.clear
  @exported_handles.clear
  super
end
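The difference between the strict close in #close_imported_handle and the best-effort cleanup in #destroy! can be seen without a GPU. The sketch below is a stand-in, not the library's code: `HandleRegistry` is a hypothetical class, plain integers play the role of mapped pointers, and a lambda plays the role of cudaIpcCloseMemHandle.

```ruby
# Hypothetical stand-in for the transport's import bookkeeping.
class HandleRegistry
  def initialize(closer)
    @closer = closer # callable standing in for cudaIpcCloseMemHandle
    @imported = {}   # handle_bytes -> mapped pointer
  end

  def track(handle_bytes, ptr)
    @imported[handle_bytes] = ptr
  end

  # Strict close: an error from the closer propagates to the caller and
  # the entry stays tracked; on success the entry is removed.
  def close(ptr)
    @closer.call(ptr)
    @imported.delete_if { |_, tracked| tracked == ptr }
  end

  # Best-effort close: every handle is attempted and failures are ignored.
  def close_all
    @imported.each_value do |ptr|
      begin
        @closer.call(ptr)
      rescue StandardError
        # Ignore cleanup errors, mirroring destroy!.
      end
    end
    @imported.clear
  end

  def size
    @imported.size
  end
end
```

Swallowing errors during teardown keeps one stale handle from preventing the remaining handles from being released, at the cost of hiding the failure from the caller.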
#estimated_bandwidth ⇒ Float
Returns the estimated bandwidth in GB/s.
# File 'lib/nvruby/collective/transport/ipc_transport.rb', line 144
def estimated_bandwidth
  BANDWIDTH_GBS
end
#estimated_latency ⇒ Float
Returns the estimated latency in microseconds.
# File 'lib/nvruby/collective/transport/ipc_transport.rb', line 149
def estimated_latency
  LATENCY_US
end
#export_handle(device_ptr) ⇒ String
Export GPU memory for sharing with another process. Returns the IPC handle as a binary String that can be passed to the peer process.
# File 'lib/nvruby/collective/transport/ipc_transport.rb', line 57
def export_handle(device_ptr)
  ensure_initialized!

  # Set source device context
  status = CUDA::RuntimeAPI.cudaSetDevice(@src_device)
  CUDA::RuntimeAPI.check_status!(status, "Set device for IPC export")

  handle = P2PBindings::CudaIpcMemHandle.new
  status = P2PBindings.cudaIpcGetMemHandle(handle, device_ptr)
  P2PBindings.check_status!(status, "Get IPC memory handle")

  # Cache the handle for cleanup
  handle_bytes = handle[:reserved].to_a.pack("C*")
  @exported_handles[device_ptr.address] = handle_bytes

  handle_bytes
end
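The export path serializes the handle's reserved bytes with pack("C*"), and the import path rebuilds the handle byte by byte. A minimal sketch of that round trip follows, using a plain Array in place of the FFI struct. It assumes the struct mirrors CUDA's 64-byte cudaIpcMemHandle_t. `pack_handle` and `unpack_handle` are hypothetical helper names, not part of the library.

```ruby
# Size of CUDA's cudaIpcMemHandle_t in bytes. That the Ruby struct has the
# same size is an assumption of this sketch.
HANDLE_SIZE = 64

# Hypothetical helper: turn an array of byte values into a binary String.
def pack_handle(reserved_bytes)
  unless reserved_bytes.size == HANDLE_SIZE
    raise ArgumentError, "expected #{HANDLE_SIZE} bytes, got #{reserved_bytes.size}"
  end

  reserved_bytes.pack("C*")
end

# Hypothetical helper: recover the byte values from the binary String.
def unpack_handle(handle_bytes)
  unless handle_bytes.bytesize == HANDLE_SIZE
    raise ArgumentError, "expected #{HANDLE_SIZE} bytes, got #{handle_bytes.bytesize}"
  end

  handle_bytes.unpack("C*")
end

original = (0...HANDLE_SIZE).to_a
wire = pack_handle(original)
puts wire.encoding                    # packed strings are binary
puts unpack_handle(wire) == original  # round trip preserves every byte
```

Checking the length on both sides catches a truncated handle before it reaches cudaIpcOpenMemHandle.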
#import_handle(handle_bytes, flags: P2PBindings::IPC_MEM_LAZY_ENABLE_PEER_ACCESS) ⇒ FFI::Pointer
Import a GPU memory handle exported by another process. Returns a pointer to the mapped device memory.
# File 'lib/nvruby/collective/transport/ipc_transport.rb', line 79
def import_handle(handle_bytes, flags: P2PBindings::IPC_MEM_LAZY_ENABLE_PEER_ACCESS)
  ensure_initialized!

  # Set destination device context
  status = CUDA::RuntimeAPI.cudaSetDevice(@dst_device)
  CUDA::RuntimeAPI.check_status!(status, "Set device for IPC import")

  # Reconstruct handle from bytes
  handle = P2PBindings::CudaIpcMemHandle.new
  handle_bytes.each_byte.with_index do |byte, i|
    handle[:reserved][i] = byte
  end

  # Open the handle
  mapped_ptr_ptr = FFI::MemoryPointer.new(:pointer)
  status = P2PBindings.cudaIpcOpenMemHandle(mapped_ptr_ptr, handle, flags)
  P2PBindings.check_status!(status, "Open IPC memory handle")

  mapped_ptr = mapped_ptr_ptr.read_pointer
  @imported_handles[handle_bytes] = mapped_ptr

  mapped_ptr
end
#initialize! ⇒ void
This method returns an undefined value.
Initialize the IPC transport by loading the P2P bindings and the CUDA runtime API. Calling it again after a successful initialization is a no-op.
# File 'lib/nvruby/collective/transport/ipc_transport.rb', line 45
def initialize!
  return if @initialized

  P2PBindings.ensure_loaded!
  CUDA::RuntimeAPI.ensure_loaded!

  @initialized = true
end
#recv_async(buffer, size, stream) ⇒ FFI::Pointer
Receive data using IPC (import phase). The current implementation always raises TransportError, because the receiver needs a handle from the sender; use #recv_with_handle instead.
# File 'lib/nvruby/collective/transport/ipc_transport.rb', line 130
def recv_async(buffer, size, stream)
  # IPC recv requires handle from sender - actual implementation
  # would receive handle via exchange_method and then import
  raise TransportError, "IPC recv requires handle - use recv_with_handle"
end
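The comment in recv_async leaves the handle exchange to the caller. As an illustration only, the sketch below moves a 64-byte handle through an anonymous pipe. The library's own :named_pipe, :shared_memory, and :socket mechanisms are not shown on this page, and `send_handle` and `receive_handle` are hypothetical helpers. Random bytes stand in for a real handle.

```ruby
# Assumed handle size, matching CUDA's 64-byte cudaIpcMemHandle_t.
HANDLE_SIZE = 64

# Hypothetical helper: write the handle bytes to any IO-like object.
def send_handle(io, handle_bytes)
  io.write(handle_bytes)
  io.flush
end

# Hypothetical helper: read exactly one handle, failing on a short read.
def receive_handle(io)
  data = io.read(HANDLE_SIZE)
  if data.nil? || data.bytesize != HANDLE_SIZE
    raise IOError, "short read while receiving IPC handle"
  end

  data
end

reader, writer = IO.pipe
reader.binmode
writer.binmode

fake_handle = Random.new(42).bytes(HANDLE_SIZE) # placeholder for export output
send_handle(writer, fake_handle)
puts receive_handle(reader) == fake_handle

writer.close
reader.close
```

In a real exchange the writer and reader would live in different processes, and the received bytes would be handed to #recv_with_handle.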
#recv_with_handle(handle_bytes) ⇒ FFI::Pointer
Receive using a handle provided by the sender. Delegates to #import_handle with the default flags.
# File 'lib/nvruby/collective/transport/ipc_transport.rb', line 139
def recv_with_handle(handle_bytes)
  import_handle(handle_bytes)
end
#send_async(buffer, size, stream) ⇒ String
Send data using IPC (export phase). In IPC, sending means making the buffer available to the receiver: the method exports the buffer and returns the handle bytes. The size and stream arguments are not used.
# File 'lib/nvruby/collective/transport/ipc_transport.rb', line 121
def send_async(buffer, size, stream)
  export_handle(buffer)
end
#to_s ⇒ String
Returns the transport description.
# File 'lib/nvruby/collective/transport/ipc_transport.rb', line 175
def to_s
  "IPC[#{@src_device}→#{@dst_device}, #{@exchange_method}]"
end