Class: Ignis::Collective::NetworkDirect::RDMATransport

Inherits:
Transport::Base
Defined in:
lib/nvruby/collective/net/rdma_transport.rb

Overview

RDMA Transport for multi-node GPU communication

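Uses Windows NetworkDirect for kernel-bypass RDMA transfers. GPU buffers are staged through pinned host memory that is registered with the RDMA adapter; CUDA copies (cudaMemcpy) move data between each GPU buffer and its staging buffer.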

Workflow:

  1. Discover RDMA adapters

  2. Create queue pairs and completion queues

  3. Register GPU memory for RDMA

  4. Connect to remote peers

  5. Perform RDMA Read/Write operations

Instance Attribute Summary

Attributes inherited from Transport::Base

#dst_device, #src_device

Class Method Summary

Instance Method Summary

Methods inherited from Transport::Base

#ready?, #recv_async, #recv_sync, #send_async, #send_sync, #synchronize!, #to_s

Constructor Details

#initialize(local_address: nil, local_port: nil, **opts) ⇒ RDMATransport

Initialize the transport

Parameters:

  • local_address (String) (defaults to: nil)

    Local IP for RDMA bind

  • local_port (Integer) (defaults to: nil)

    Local port



40
41
42
43
44
45
46
47
48
49
50
# File 'lib/nvruby/collective/net/rdma_transport.rb', line 40

# Build an unconnected transport. No NetworkDirect resources are created
# here; adapter, queues, queue pair and connector stay nil until #initialize!.
#
# @param local_address [String, nil] local IP the connector binds to (no bind when nil)
# @param local_port [Integer, nil] local port to bind; a nil/false value is stored as 0
# @param opts [Hash] forwarded unchanged to Transport::Base#initialize
def initialize(local_address: nil, local_port: nil, **opts)
  super(**opts)

  @local_address = local_address
  @local_port = local_port || 0

  # RDMA handles, populated by #initialize!.
  @adapter, @send_cq, @recv_cq, @qp, @connector = nil

  # Registered buffers keyed by the GPU pointer's integer address.
  @memory_regions = {}
end

Class Method Details

.available? ⇒ Boolean

Check if RDMA is available

Returns:

  • (Boolean)

    True if RDMA hardware present



288
289
290
# File 'lib/nvruby/collective/net/rdma_transport.rb', line 288

class << self
  # Report whether the NetworkDirect bindings can be used on this host.
  #
  # @return [Boolean] whatever Bindings.available? reports
  def available?
    Bindings.available?
  end
end

.transport_type ⇒ Symbol

Returns Transport type identifier.

Returns:

  • (Symbol)

    Transport type identifier



23
24
25
# File 'lib/nvruby/collective/net/rdma_transport.rb', line 23

class << self
  # Identifier for this transport kind.
  #
  # @return [Symbol] always :rdma
  def transport_type
    :rdma
  end
end

Instance Method Details

#accept(private_data: nil) ⇒ void

This method returns an undefined value.

Accept incoming connection (server mode)

Parameters:

  • private_data (String, nil) (defaults to: nil)

    Response private data



113
114
115
116
117
# File 'lib/nvruby/collective/net/rdma_transport.rb', line 113

# Server side: accept a pending incoming connection onto this transport's
# queue pair. Guarded by ensure_initialized! before touching the connector.
#
# @param private_data [String, nil] optional private data returned to the peer
# @return [void] (the connector's return value is passed through)
def accept(private_data: nil)
  ensure_initialized!

  accept_args = { qp: @qp, private_data: private_data }
  @connector.accept(**accept_args)
end

#connect(remote_address:, remote_port:, private_data: nil) ⇒ void

This method returns an undefined value.

Connect to remote peer (client mode)

Parameters:

  • remote_address (String)

    Remote IP address

  • remote_port (Integer)

    Remote port

  • private_data (String, nil) (defaults to: nil)

    Connection private data



99
100
101
102
103
104
105
106
107
108
# File 'lib/nvruby/collective/net/rdma_transport.rb', line 99

# Client side: connect this transport's queue pair to a listening peer.
# Guarded by ensure_initialized! before touching the connector.
#
# @param remote_address [String] peer IP address
# @param remote_port [Integer] peer port
# @param private_data [String, nil] optional private data sent with the request
# @return [void] (the connector's return value is passed through)
def connect(remote_address:, remote_port:, private_data: nil)
  ensure_initialized!

  request = {
    qp: @qp,
    remote_address: remote_address,
    remote_port: remote_port,
    private_data: private_data
  }
  @connector.connect(**request)
end

#deregister_gpu_memory(gpu_ptr) ⇒ void

This method returns an undefined value.

Deregister GPU memory

Parameters:

  • gpu_ptr (FFI::Pointer)

    GPU device pointer



148
149
150
151
152
153
154
# File 'lib/nvruby/collective/net/rdma_transport.rb', line 148

# Release the RDMA memory registration and the pinned host staging buffer
# that #register_gpu_memory created for gpu_ptr. Pointers that were never
# registered (or were already released) are ignored.
#
# The bookkeeping entry is removed only after deregister! succeeds. If
# deregistration raises, the entry stays in @memory_regions, so the call (or
# #destroy!) can be retried instead of silently leaking the registration and
# its host buffer.
#
# @param gpu_ptr [FFI::Pointer] GPU device pointer previously passed to #register_gpu_memory
# @return [void]
def deregister_gpu_memory(gpu_ptr)
  key = gpu_ptr.address
  info = @memory_regions[key]
  return unless info

  info[:memory_region].deregister!
  # Drop the entry as soon as the region is deregistered, so a later retry
  # never calls deregister! again on an already-released region, even if
  # freeing the host buffer below fails.
  @memory_regions.delete(key)
  free_pinned_host(info[:host_buffer])
end

#destroy! ⇒ void

This method returns an undefined value.

Cleanup



294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
# File 'lib/nvruby/collective/net/rdma_transport.rb', line 294

# Tear down everything #initialize! and #register_gpu_memory created, then
# mark the transport uninitialized.
#
# Memory-region release and NetworkDirect shutdown are best-effort: a failure
# releasing one region does not stop the others, and NdCleanup errors are
# ignored. Each handle ivar is cleared before its close! is called, so
# calling destroy! a second time does not close the same object twice.
#
# @return [void]
def destroy!
  # Iterate over a snapshot of the keys: deregister_gpu_memory deletes
  # entries from @memory_regions, and the live hash must not be mutated
  # while it is being iterated.
  @memory_regions.keys.each do |ptr_addr|
    ptr = FFI::Pointer.new(:uint8, ptr_addr)
    begin
      deregister_gpu_memory(ptr)
    rescue StandardError
      # Best-effort: one failed region must not stop the rest from being released.
    end
  end

  # Close the objects created from the adapter before the adapter itself.
  connector, @connector = @connector, nil
  connector&.close!
  qp, @qp = @qp, nil
  qp&.close!
  send_cq, @send_cq = @send_cq, nil
  send_cq&.close!
  recv_cq, @recv_cq = @recv_cq, nil
  recv_cq&.close!
  adapter, @adapter = @adapter, nil
  adapter&.close!

  begin
    Bindings.NdCleanup
  rescue StandardError
    # Best-effort shutdown; errors here are deliberately ignored.
  end

  @initialized = false
end

#estimated_bandwidth ⇒ Float

Returns Estimated bandwidth (GB/s).

Returns:

  • (Float)

    Estimated bandwidth (GB/s)



28
29
30
# File 'lib/nvruby/collective/net/rdma_transport.rb', line 28

# Nominal bandwidth of the RDMA link, in gigabytes per second.
#
# The estimate assumes a 100 Gbit/s line rate (e.g. a Mellanox ConnectX-6
# port). The documented unit is GB/s, so the line rate is divided by
# 8 bits/byte: 100 Gbit/s = 12.5 GB/s. Returning the raw 100.0 would
# overstate the bandwidth eightfold.
#
# @return [Float] estimated bandwidth (GB/s)
def estimated_bandwidth
  100.0 / 8 # 100 Gbit/s line rate -> 12.5 GB/s
end

#estimated_latency ⇒ Float

Returns Estimated latency (microseconds).

Returns:

  • (Float)

    Estimated latency (microseconds)



33
34
35
# File 'lib/nvruby/collective/net/rdma_transport.rb', line 33

# Nominal latency estimate for this transport.
#
# @return [Float] latency in microseconds (RDMA is typically below 2 us)
def estimated_latency
  3 / 2.0
end

#initialize! ⇒ void

This method returns an undefined value.

Initialize RDMA resources



54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/nvruby/collective/net/rdma_transport.rb', line 54

# Create the NetworkDirect resources this transport needs:
# 1. start the provider (NdStartup, version 2);
# 2. open an adapter;
# 3. create send/receive completion queues (depth 256);
# 4. create a queue pair on them (64 send / 64 receive entries, 4 SGEs);
# 5. create a connector, bound to @local_address/@local_port when a local
#    address was given.
# Calling it again after a successful run is a no-op.
#
# If any step after a successful NdStartup raises, the partially created
# resources are released through #destroy! before the original error is
# re-raised. A failed initialization therefore does not leak handles or
# leave NetworkDirect started.
#
# @return [void]
# @raise [RDMAError] if the NetworkDirect bindings are not available
#   (errors from Bindings.check_status! and the adapter calls propagate as-is)
def initialize!
  return if @initialized

  Bindings.ensure_loaded!

  unless Bindings.available?
    raise RDMAError, "NetworkDirect not available: #{Bindings.load_error}"
  end

  # Start NetworkDirect
  status = Bindings.NdStartup(2)  # Version 2
  Bindings.check_status!(status, "NdStartup")

  begin
    # Open first available adapter
    @adapter = open_adapter

    # Create completion queues
    @send_cq = @adapter.create_completion_queue(depth: 256)
    @recv_cq = @adapter.create_completion_queue(depth: 256)

    # Create queue pair
    @qp = @adapter.create_queue_pair(
      send_cq: @send_cq,
      recv_cq: @recv_cq,
      send_depth: 64,
      recv_depth: 64,
      sge_count: 4
    )

    # Create connector
    @connector = @adapter.create_connector

    # Bind to local address
    @connector.bind(address: @local_address, port: @local_port) if @local_address
  rescue StandardError => e
    begin
      destroy!
    rescue StandardError
      # Rollback is best-effort; the setup failure is the error worth reporting.
    end
    raise e
  end

  @initialized = true
end

#rdma_read(local_buffer:, remote_address:, remote_token:, size:) ⇒ void

This method returns an undefined value.

RDMA Read (one-sided): reads from the remote buffer into the local pinned host staging buffer, then copies the data into the GPU buffer.

Parameters:

  • local_buffer (FFI::Pointer)

    Local GPU buffer

  • remote_address (Integer)

    Remote buffer address

  • remote_token (Integer)

    Remote memory token

  • size (Integer)

    Size in bytes

Raises:

  • (RDMAError)

    If local_buffer has not been registered with #register_gpu_memory


258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
# File 'lib/nvruby/collective/net/rdma_transport.rb', line 258

# RDMA Read (one-sided): pull `size` bytes from a remote registered buffer
# into the pinned host staging buffer paired with local_buffer, wait for the
# completion on the send CQ, then copy the bytes into the GPU buffer.
#
# @param local_buffer [FFI::Pointer] GPU buffer previously registered with #register_gpu_memory
# @param remote_address [Integer] remote buffer address
# @param remote_token [Integer] remote memory token
# @param size [Integer] bytes to transfer; must lie within 0..registered size
# @return [void]
# @raise [RDMAError] if local_buffer is not registered, or size does not fit
#   the registered region (a larger transfer would overrun the staging buffer)
def rdma_read(local_buffer:, remote_address:, remote_token:, size:)
  ensure_initialized!

  info = @memory_regions[local_buffer.address]
  raise RDMAError, "Buffer not registered" unless info
  unless (0..info[:size]).cover?(size)
    raise RDMAError, "Transfer size #{size} is outside the registered region of #{info[:size]} bytes"
  end

  # RDMA Read
  @qp.rdma_read(
    remote_address: remote_address,
    remote_token: remote_token,
    sge_list: [{
      buffer: info[:host_buffer],
      length: size,
      token: info[:memory_region].token
    }]
  )

  poll_completion(@send_cq)

  # Stage to GPU
  # NOTE(review): cudaMemcpy's return status is not checked here; confirm
  # whether CUDA::RuntimeAPI raises on failure or returns an error code.
  CUDA::RuntimeAPI.cudaMemcpy(
    local_buffer,
    info[:host_buffer],
    size,
    CUDA::RuntimeAPI::MEMCPY_HOST_TO_DEVICE
  )
end

#rdma_write(local_buffer:, remote_address:, remote_token:, size:) ⇒ void

This method returns an undefined value.

RDMA Write (one-sided): copies the GPU buffer into its pinned host staging buffer, then writes that data to the remote buffer.

Parameters:

  • local_buffer (FFI::Pointer)

    Local GPU buffer

  • remote_address (Integer)

    Remote buffer address

  • remote_token (Integer)

    Remote memory token

  • size (Integer)

    Size in bytes

Raises:

  • (RDMAError)

    If local_buffer has not been registered with #register_gpu_memory


224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
# File 'lib/nvruby/collective/net/rdma_transport.rb', line 224

# RDMA Write (one-sided): copy `size` bytes from the GPU buffer into its
# pinned host staging buffer, write them to the remote registered buffer,
# and wait for the completion on the send CQ.
#
# @param local_buffer [FFI::Pointer] GPU buffer previously registered with #register_gpu_memory
# @param remote_address [Integer] remote buffer address
# @param remote_token [Integer] remote memory token
# @param size [Integer] bytes to transfer; must lie within 0..registered size
# @return [void]
# @raise [RDMAError] if local_buffer is not registered, or size does not fit
#   the registered region (a larger copy would overrun the staging buffer)
def rdma_write(local_buffer:, remote_address:, remote_token:, size:)
  ensure_initialized!

  info = @memory_regions[local_buffer.address]
  raise RDMAError, "Buffer not registered" unless info
  unless (0..info[:size]).cover?(size)
    raise RDMAError, "Transfer size #{size} is outside the registered region of #{info[:size]} bytes"
  end

  # Stage to host
  # NOTE(review): cudaMemcpy's return status is not checked here; confirm
  # whether CUDA::RuntimeAPI raises on failure or returns an error code.
  CUDA::RuntimeAPI.cudaMemcpy(
    info[:host_buffer],
    local_buffer,
    size,
    CUDA::RuntimeAPI::MEMCPY_DEVICE_TO_HOST
  )

  # RDMA Write
  @qp.rdma_write(
    remote_address: remote_address,
    remote_token: remote_token,
    sge_list: [{
      buffer: info[:host_buffer],
      length: size,
      token: info[:memory_region].token
    }]
  )

  poll_completion(@send_cq)
end

#receive(buffer, size) ⇒ void

This method returns an undefined value.

RDMA Receive (two-sided)

Parameters:

  • buffer (FFI::Pointer)

    Destination buffer (GPU)

  • size (Integer)

    Size in bytes

Raises:

  • (RDMAError)

    If buffer has not been registered with #register_gpu_memory


191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
# File 'lib/nvruby/collective/net/rdma_transport.rb', line 191

# RDMA Receive (two-sided): post a receive into the pinned host staging
# buffer paired with `buffer`, wait for its completion on the receive CQ,
# then copy `size` bytes into the GPU buffer.
#
# @param buffer [FFI::Pointer] destination GPU buffer, previously registered with #register_gpu_memory
# @param size [Integer] bytes to receive; must lie within 0..registered size
# @return [void]
# @raise [RDMAError] if buffer is not registered, or size does not fit the
#   registered region (a larger receive would overrun the staging buffer)
def receive(buffer, size)
  ensure_initialized!

  info = @memory_regions[buffer.address]
  raise RDMAError, "Buffer not registered" unless info
  unless (0..info[:size]).cover?(size)
    raise RDMAError, "Transfer size #{size} is outside the registered region of #{info[:size]} bytes"
  end

  # Post RDMA receive
  @qp.post_receive(
    sge_list: [{
      buffer: info[:host_buffer],
      length: size,
      token: info[:memory_region].token
    }]
  )

  # Wait for completion
  poll_completion(@recv_cq)

  # Copy Host -> GPU
  # NOTE(review): cudaMemcpy's return status is not checked here; confirm
  # whether CUDA::RuntimeAPI raises on failure or returns an error code.
  CUDA::RuntimeAPI.cudaMemcpy(
    buffer,
    info[:host_buffer],
    size,
    CUDA::RuntimeAPI::MEMCPY_HOST_TO_DEVICE
  )
end

#register_gpu_memory(gpu_ptr, size) ⇒ Hash

Register GPU memory for RDMA

Parameters:

  • gpu_ptr (FFI::Pointer)

    GPU device pointer

  • size (Integer)

    Size in bytes

Returns:

  • (Hash)

    Remote access info :token, :size



123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# File 'lib/nvruby/collective/net/rdma_transport.rb', line 123

# Make a GPU buffer usable with this transport. The method allocates a pinned
# host staging buffer of `size` bytes and registers it with the RDMA adapter.
# Data moves between the GPU buffer and the staging buffer via cudaMemcpy in
# #send, #receive, #rdma_read and #rdma_write.
#
# Registering a pointer that is already registered first releases the
# previous registration instead of overwriting (and leaking) it. If
# registration with the adapter fails, the freshly allocated staging buffer
# is freed before the error propagates.
#
# @param gpu_ptr [FFI::Pointer] GPU device pointer (tracked by its #address)
# @param size [Integer] size in bytes
# @return [Hash] the memory region's remote access info
def register_gpu_memory(gpu_ptr, size)
  ensure_initialized!

  # Overwriting an existing entry would leak its memory region and host buffer.
  deregister_gpu_memory(gpu_ptr) if @memory_regions.key?(gpu_ptr.address)

  # GPU memory is staged through pinned host memory, which is more portable
  # than exporting the allocation (e.g. via cuMemExportToShareableHandle).
  host_ptr = allocate_pinned_host(size)

  begin
    mr = @adapter.register_memory(host_ptr, size)
  rescue StandardError
    free_pinned_host(host_ptr)
    raise
  end

  @memory_regions[gpu_ptr.address] = {
    memory_region: mr,
    host_buffer: host_ptr,
    size: size,
    gpu_ptr: gpu_ptr
  }

  mr.remote_access_info
end

#send(buffer, size) ⇒ void

This method returns an undefined value.

RDMA Send (two-sided)

Parameters:

  • buffer (FFI::Pointer)

    Source buffer (GPU)

  • size (Integer)

    Size in bytes

Raises:

  • (RDMAError)

    If buffer has not been registered with #register_gpu_memory


160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
# File 'lib/nvruby/collective/net/rdma_transport.rb', line 160

# RDMA Send (two-sided): copy `size` bytes from the GPU buffer into its
# pinned host staging buffer, post a send from that buffer, and wait for the
# completion on the send CQ.
#
# NOTE: this method overrides Object#send for transport instances, so
# reflective dispatch on a transport must use #__send__ or #public_send.
#
# @param buffer [FFI::Pointer] source GPU buffer, previously registered with #register_gpu_memory
# @param size [Integer] bytes to send; must lie within 0..registered size
# @return [void]
# @raise [RDMAError] if buffer is not registered, or size does not fit the
#   registered region (a larger copy would overrun the staging buffer)
def send(buffer, size)
  ensure_initialized!

  info = @memory_regions[buffer.address]
  raise RDMAError, "Buffer not registered" unless info
  unless (0..info[:size]).cover?(size)
    raise RDMAError, "Transfer size #{size} is outside the registered region of #{info[:size]} bytes"
  end

  # Copy GPU -> Host (staging)
  # NOTE(review): cudaMemcpy's return status is not checked here; confirm
  # whether CUDA::RuntimeAPI raises on failure or returns an error code.
  CUDA::RuntimeAPI.cudaMemcpy(
    info[:host_buffer],
    buffer,
    size,
    CUDA::RuntimeAPI::MEMCPY_DEVICE_TO_HOST
  )

  # Post RDMA send
  @qp.post_send(
    sge_list: [{
      buffer: info[:host_buffer],
      length: size,
      token: info[:memory_region].token
    }]
  )

  # Wait for completion
  poll_completion(@send_cq)
end