Class: Ignis::Collective::Algorithms::Tree

Inherits:
Object
  • Object
Defined in:
lib/nvruby/collective/algorithms/tree.rb

Overview

Tree-based algorithms for broadcast and reduce operations

Binary tree algorithms are optimal for small messages where latency dominates:

  • Latency complexity: O(log N) steps

  • Bandwidth complexity: O(data_size) per step, because every level forwards the full message; this is not optimal for large messages

Best for: Small messages (<1KB) where latency is critical
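
The O(log N) step count follows from the tree shape. The sketch below is a minimal illustration, assuming a heap-style binary tree rotated so that the chosen root sits at position 0. The `TreeNode` struct and `build_binary_tree` are hypothetical stand-ins whose field names mirror the accessors used in the listings on this page (`rank`, `gpu_id`, `depth`, `children`); the library's own `build_tree` is not shown here and may lay the tree out differently.

```ruby
# Hypothetical stand-in for Tree::TreeNode (field names taken from the
# accessors used in the broadcast/reduce listings).
TreeNode = Struct.new(:rank, :gpu_id, :depth, :children, keyword_init: true)

# Assumed layout: heap-style binary tree over positions 0..n-1, with
# position `pos` mapped to rank (pos + root) % n so that `root` is the top.
def build_binary_tree(gpu_ids, root: 0)
  n = gpu_ids.size
  Array.new(n) do |pos|
    rank = (pos + root) % n
    children = [2 * pos + 1, 2 * pos + 2]
               .select { |c| c < n }
               .map { |c| (c + root) % n }
    # Depth of heap position pos is floor(log2(pos + 1)).
    depth = (pos + 1).bit_length - 1
    TreeNode.new(rank: rank, gpu_id: gpu_ids[rank], depth: depth, children: children)
  end
end

tree = build_binary_tree([0, 1, 2, 3, 4, 5, 6, 7], root: 0)
steps = tree.map(&:depth).max # => 3 levels of sends for 8 GPUs
```

Under this assumed layout, doubling the GPU count adds one level, which is why latency grows logarithmically while each level still moves the whole message.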

Defined Under Namespace

Classes: TreeNode

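Instance Attribute Summary

  • #gpu_ids ⇒ Array<Integer> (readonly)

    GPU IDs.

  • #n_gpus ⇒ Integer (readonly)

    Number of GPUs.

  • #transport_selector ⇒ TransportSelector (readonly)

    Transport selector.

  • #tree ⇒ Array<TreeNode> (readonly)

    Tree structure.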

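Instance Method Summary

  • #broadcast(buffer:, buffers:, size:, root:, streams:) ⇒ void

    Broadcast data from root to all GPUs.

  • #initialize(gpu_ids:, transport_selector:) ⇒ Tree (constructor)

    A new instance of Tree.

  • #reduce(buffers:, sizes:, dtype:, op:, root:, streams:) ⇒ void

    Reduce data from all GPUs to root.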

Constructor Details

#initialize(gpu_ids:, transport_selector:) ⇒ Tree

Returns a new instance of Tree.

Parameters:

  • gpu_ids (Array<Integer>)

    GPU device IDs

  • transport_selector (TransportSelector)

    Transport selector



# File 'lib/nvruby/collective/algorithms/tree.rb', line 31

def initialize(gpu_ids:, transport_selector:)
  @gpu_ids = gpu_ids.dup.freeze
  @n_gpus = gpu_ids.size
  @transport_selector = transport_selector
  @tree = build_tree
end
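
The constructor stores `gpu_ids.dup.freeze` rather than the caller's array. The snippet below is a small standalone illustration of what that buys, using plain Ruby arrays only (no library classes): later changes to the caller's array do not reach the stored copy, and the stored copy itself rejects mutation.

```ruby
ids = [0, 1, 2, 3]
stored = ids.dup.freeze # same expression the constructor applies to gpu_ids

ids << 4                # the caller mutates its own array afterwards
stored.size             # => 4, the stored copy is unaffected
stored.frozen?          # => true

error = nil
begin
  stored << 5           # mutating the frozen copy raises
rescue FrozenError => e # FrozenError is available in Ruby 2.5 and later
  error = e.class
end
```

One consequence is that `n_gpus` and the tree built in the constructor stay consistent with `gpu_ids` for the lifetime of the instance.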

Instance Attribute Details

#gpu_ids ⇒ Array<Integer> (readonly)

Returns GPU IDs.

Returns:

  • (Array<Integer>)

    GPU IDs



# File 'lib/nvruby/collective/algorithms/tree.rb', line 18

def gpu_ids
  @gpu_ids
end

#n_gpus ⇒ Integer (readonly)

Returns the number of GPUs.

Returns:

  • (Integer)

    Number of GPUs



# File 'lib/nvruby/collective/algorithms/tree.rb', line 21

def n_gpus
  @n_gpus
end

#transport_selector ⇒ TransportSelector (readonly)

Returns the transport selector.

Returns:

  • (TransportSelector)

    Transport selector


# File 'lib/nvruby/collective/algorithms/tree.rb', line 24

def transport_selector
  @transport_selector
end

#tree ⇒ Array<TreeNode> (readonly)

Returns the tree structure.

Returns:

  • (Array<TreeNode>)

    Tree structure


# File 'lib/nvruby/collective/algorithms/tree.rb', line 27

def tree
  @tree
end

Instance Method Details

#broadcast(buffer:, buffers:, size:, root:, streams:) ⇒ void

This method returns an undefined value.

Broadcast data from root to all GPUs

Parameters:

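  • buffer (FFI::Pointer)

    Source buffer on root GPU. The method body shown here never reads this argument; each level sends from buffers[rank], starting at buffers[root], so the payload must already be in buffers[root]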

  • buffers (Array<FFI::Pointer>)

    Destination buffers on all GPUs

  • size (Integer)

    Buffer size in bytes

  • root (Integer)

    Root rank (index in gpu_ids)

  • streams (Array<CUDA::Stream, FFI::Pointer>)

    CUDA streams



# File 'lib/nvruby/collective/algorithms/tree.rb', line 46

def broadcast(buffer:, buffers:, size:, root:, streams:)
  return if @n_gpus == 1

  validate_root!(root)

  # Build tree rooted at specified root
  tree = build_tree(root: root)

  # Broadcast down the tree: log2(N) steps
  depth = tree_depth(tree)

  depth.times do |d|
    # At each depth level, nodes at depth d send to their children
    tree.each do |node|
      next unless node.depth == d
      next if node.children.empty?

      src_rank = node.rank
      src_gpu = node.gpu_id
      src_buffer = buffers[src_rank]
      stream_ptr = get_stream_ptr(streams[src_rank])

      node.children.each do |child_rank|
        dst_gpu = @gpu_ids[child_rank]
        dst_buffer = buffers[child_rank]

        transport = @transport_selector.select_transport(src_gpu, dst_gpu)
        move!(transport, dst_buffer, src_buffer, size, stream_ptr)
      end
    end

    # Synchronize after each level
    synchronize_all_streams!(streams)
  end
end
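
The level-by-level schedule above can be traced without any GPU by replacing device copies with plain Ruby assignments. This is a sketch under stated assumptions: `Node`, `build_binary_tree`, and `simulate_broadcast` are hypothetical names, the tree is an assumed heap-style layout rotated to the root, and the number of levels is taken as the deepest node's depth (the real `tree_depth` helper is not shown on this page).

```ruby
Node = Struct.new(:rank, :depth, :children)

# Assumed layout: heap-style binary tree, position pos -> rank (pos + root) % n.
def build_binary_tree(n, root)
  Array.new(n) do |pos|
    kids = [2 * pos + 1, 2 * pos + 2].select { |c| c < n }.map { |c| (c + root) % n }
    Node.new((pos + root) % n, (pos + 1).bit_length - 1, kids)
  end
end

# Mirrors the loop structure of #broadcast: at level d, every node at depth d
# copies its buffer to each child; levels run one after another.
def simulate_broadcast(buffers, root)
  tree = build_binary_tree(buffers.size, root)
  levels = tree.map(&:depth).max
  schedule = []
  levels.times do |d|
    step = []
    tree.each do |node|
      next unless node.depth == d

      node.children.each do |child|
        buffers[child] = buffers[node.rank].dup # stands in for move!
        step << [node.rank, child]
      end
    end
    schedule << step # one entry per synchronized level
  end
  schedule
end

buffers = Array.new(6) { |rank| rank == 2 ? "payload" : nil }
schedule = simulate_broadcast(buffers, 2)
# schedule => [[[2, 3], [2, 4]], [[3, 5], [3, 0], [4, 1]]]
```

Each inner array is one level: all of its sends can be issued before the per-level synchronization, and six GPUs need only two levels.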

#reduce(buffers:, sizes:, dtype:, op:, root:, streams:) ⇒ void

This method returns an undefined value.

Reduce data from all GPUs to root

Parameters:

  • buffers (Array<FFI::Pointer>)

    Source buffers on all GPUs

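  • sizes (Array<Integer>)

    Buffer sizes in bytes. Only sizes[0] is read, so all buffers are treated as having that size

  • dtype (Symbol)

    Element data type; its element size is used to convert the byte size into an element count

  • op (Symbol)

    Reduction operation

  • root (Integer)

    Root rank (index in gpu_ids)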

  • streams (Array<CUDA::Stream, FFI::Pointer>)

    CUDA streams



# File 'lib/nvruby/collective/algorithms/tree.rb', line 91

def reduce(buffers:, sizes:, dtype:, op:, root:, streams:)
  return if @n_gpus == 1

  validate_root!(root)

  size = sizes[0]
  elem_size = dtype_elem_size(dtype)
  elem_count = size / elem_size

  # Build tree rooted at root
  tree = build_tree(root: root)

  # Allocate temp buffers
  recv_buffers = allocate_recv_buffers(size)

  begin
    # Reduce up the tree: log2(N) steps
    depth = tree_depth(tree)

    # Start from leaves, work up to root
    (depth - 1).downto(0) do |d|
      tree.each do |node|
        next unless node.depth == d
        next if node.children.empty?

        dst_rank = node.rank
        dst_gpu = node.gpu_id
        dst_buffer = buffers[dst_rank]
        stream_ptr = get_stream_ptr(streams[dst_rank])

        # Set device for reduction
        CUDA::RuntimeAPI.cudaSetDevice(dst_gpu)

        # For each child: copy its buffer into recv, then reduce it into
        # the parent's local buffer before the next child overwrites recv.
        # Copying every child first and reducing afterwards would leave only
        # the last child's data in recv, so that child would be counted once
        # per child and the others would be dropped.
        recv_buffer = recv_buffers[dst_rank]
        node.children.each do |child_rank|
          src_gpu = @gpu_ids[child_rank]
          src_buffer = buffers[child_rank]

          transport = @transport_selector.select_transport(src_gpu, dst_gpu)
          move!(transport, recv_buffer, src_buffer, size, stream_ptr)
          synchronize_stream!(streams[dst_rank])

          ReductionOps.execute(op, dst_buffer, recv_buffer, dst_buffer,
                               elem_count, dtype, stream_ptr)
          synchronize_stream!(streams[dst_rank])
        end
      end

      synchronize_all_streams!(streams)
    end
  ensure
    free_recv_buffers(recv_buffers)
  end
end
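
The copy-then-reduce ordering inside the child loop can be checked on the CPU with ordinary arrays. The sketch below is illustrative only: `Node`, `build_binary_tree`, and `simulate_reduce` are hypothetical names, the tree layout is an assumed heap-style binary tree rotated to the root, the reduction is fixed to element-wise sum, and the level count is taken as the deepest node's depth.

```ruby
Node = Struct.new(:rank, :depth, :children)

# Assumed layout: heap-style binary tree, position pos -> rank (pos + root) % n.
def build_binary_tree(n, root)
  Array.new(n) do |pos|
    kids = [2 * pos + 1, 2 * pos + 2].select { |c| c < n }.map { |c| (c + root) % n }
    Node.new((pos + root) % n, (pos + 1).bit_length - 1, kids)
  end
end

# Mirrors the loop structure of #reduce: walk levels from the deepest parents
# up to the root; for each child, receive its buffer and fold it into the
# parent's buffer before the next child reuses the receive slot.
def simulate_reduce(buffers, root)
  tree = build_binary_tree(buffers.size, root)
  levels = tree.map(&:depth).max
  recv = Array.new(buffers.size) # one receive slot per rank
  (levels - 1).downto(0) do |d|
    tree.each do |node|
      next unless node.depth == d

      node.children.each do |child|
        recv[node.rank] = buffers[child].dup # stands in for move!
        buffers[node.rank] =
          buffers[node.rank].zip(recv[node.rank]).map { |a, b| a + b } # sum op
      end
    end
  end
  buffers[root]
end

buffers = Array.new(5) { |rank| [rank + 1, 10 * (rank + 1)] }
result = simulate_reduce(buffers, 0)
# result => [15, 150], the element-wise sum over all five ranks
```

Because each child is folded in immediately after it is received, a single receive slot per parent is enough, at the cost of serializing the children of one parent.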