Initial release: NCCL Mesh Plugin for direct-connect RDMA topologies

- Enables NCCL over multi-subnet mesh topologies
- 8+ GB/s bandwidth over 100Gbps RDMA
- Successfully tested with distributed LLM inference (Mistral-7B)
- Custom subnet-aware NIC selection
- Background handshake thread for deadlock-free connection setup
autoscriptlabs 2026-01-09 14:09:33 -05:00
commit 031bc48953
13 changed files with 3074 additions and 0 deletions

LICENSE (new file)
@@ -0,0 +1,21 @@
MIT License

Copyright (c) 2026

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

Makefile (new file)
@@ -0,0 +1,56 @@
# NCCL Mesh Plugin Makefile

CC      = gcc
CFLAGS  = -Wall -Wextra -O2 -fPIC -g
CFLAGS += -I. -I./include
LDFLAGS = -shared -libverbs -lpthread

# Targets
TARGET      = libnccl-net.so
TARGET_MESH = libnccl-net-mesh.so

# Sources
SRCS = src/mesh_plugin.c
OBJS = $(SRCS:.c=.o)

# Default target
all: $(TARGET) $(TARGET_MESH)

$(TARGET): $(OBJS)
	$(CC) $(OBJS) -o $@ $(LDFLAGS)

$(TARGET_MESH): $(TARGET)
	ln -sf $(TARGET) $(TARGET_MESH)

%.o: %.c
	$(CC) $(CFLAGS) -c $< -o $@

# Install to a standard location
PREFIX ?= /usr/local
install: all
	install -d $(PREFIX)/lib
	install -m 755 $(TARGET) $(PREFIX)/lib/
	ln -sf $(TARGET) $(PREFIX)/lib/$(TARGET_MESH)

# Clean
clean:
	rm -f $(OBJS) $(TARGET) $(TARGET_MESH)

# Test build (requires libibverbs-dev)
test-deps:
	@echo "Checking dependencies..."
	@pkg-config --exists libibverbs || (echo "ERROR: libibverbs-dev not found" && exit 1)
	@echo "All dependencies found."

# Debug build
debug: CFLAGS += -DDEBUG -g3 -O0
debug: clean all

# Print configuration
info:
	@echo "CC      = $(CC)"
	@echo "CFLAGS  = $(CFLAGS)"
	@echo "LDFLAGS = $(LDFLAGS)"
	@echo "TARGET  = $(TARGET)"

.PHONY: all clean install test-deps debug info

README.md (new file)
@@ -0,0 +1,244 @@
# NCCL Mesh Plugin
**Custom NCCL network plugin enabling distributed ML over direct-connect RDMA mesh topologies.**
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)
## 🎯 What This Does
This plugin enables NCCL (NVIDIA Collective Communications Library) to work with **direct-connect mesh topologies** where each node pair is on a different subnet. Standard NCCL plugins assume either:
- A switched InfiniBand fabric (all nodes on same subnet)
- TCP/IP networking (slow, high latency)
Neither works for direct-cabled RDMA meshes. This plugin does.
## 🔧 The Problem We Solved
```
        ┌─────────────┐
        │   Spark-A   │
        │  (titanic)  │
        └──────┬──────┘
192.168.101.x  │  192.168.100.x
  (100Gbps)    │   (100Gbps)
        ┌──────┴──────┐
        │             │
  ┌─────┴─────┐ ┌─────┴─────┐
  │  Spark-B  │ │  Spark-C  │
  │ (iceberg) │ │(carpathia)│
  └─────┬─────┘ └─────┬─────┘
        │             │
        └──────┬──────┘
        192.168.102.x
          (100Gbps)
```
**Three DGX Spark workstations** connected in a triangle mesh with direct 100Gbps RDMA cables. Each link is on a **different subnet** - a configuration NVIDIA never intended to support.
## 🚀 Results
| Metric | Value |
|--------|-------|
| Effective Bandwidth | **8+ GB/s** |
| Line Rate Utilization | ~64% |
| Topology | 3-node triangle mesh |
| Link Speed | 100 Gbps per link |
Successfully ran **distributed LLM inference** (Mistral-7B) across all 3 nodes using NCCL over this custom topology.
## 🏗️ Architecture
### Key Innovations
1. **Multi-Address Handle Exchange**
   - Each node advertises ALL of its subnet IPs in the NCCL handle
   - The connector searches for a reachable address by subnet matching
2. **Subnet-Aware NIC Selection**
   - `connect()` finds the local NIC on the same subnet as the peer
   - Automatic routing without IP forwarding or bridges
3. **Background Handshake Thread**
   - Eliminates the deadlock when both ranks call `connect()` simultaneously
   - TCP-based QP info exchange runs asynchronously
4. **Bidirectional QP Exchange**
   - Each connection creates fresh Queue Pairs on both sides
   - No QP reuse across multiple NCCL channels
### RDMA Implementation
- Raw InfiniBand Verbs API (libibverbs); a minimal setup sketch follows this list
- Reliable Connected (RC) Queue Pairs
- RoCE v2 over Ethernet
- Host memory staging (GPU→Host→RDMA→Host→GPU)
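
If you have not used the verbs API before, here is a minimal sketch (assuming one device, default attributes, and no error handling) of the per-NIC resource chain listed above: device context, protection domain, completion queue, and an RC queue pair.

```c
#include <infiniband/verbs.h>

/* Sketch only: open the first RDMA device and build the verbs objects
 * the plugin allocates per NIC. Real code checks every return value
 * and picks the device that matches the chosen subnet. */
struct ibv_qp *create_rc_qp(void) {
    int n;
    struct ibv_device **devs = ibv_get_device_list(&n);
    struct ibv_context *ctx = ibv_open_device(devs[0]);
    struct ibv_pd *pd = ibv_alloc_pd(ctx);            /* protection domain */
    struct ibv_cq *cq = ibv_create_cq(ctx, 256, NULL, NULL, 0);

    struct ibv_qp_init_attr attr = {
        .send_cq = cq,
        .recv_cq = cq,
        .qp_type = IBV_QPT_RC,                        /* Reliable Connected */
        .cap = { .max_send_wr = 128, .max_recv_wr = 128,
                 .max_send_sge = 1, .max_recv_sge = 1 },
    };
    return ibv_create_qp(pd, &attr);                  /* starts in RESET */
}
```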
## 📦 Installation
### Prerequisites
```bash
# Ubuntu/Debian
sudo apt-get install libibverbs-dev librdmacm-dev
# Verify RDMA devices
ibv_devices
```
### Build
```bash
git clone https://github.com/yourusername/nccl-mesh-plugin.git
cd nccl-mesh-plugin
make
```
### Use
```bash
export LD_LIBRARY_PATH=$(pwd):$LD_LIBRARY_PATH
export NCCL_NET_PLUGIN=mesh
export NCCL_DEBUG=INFO # or WARN for less output
# Run your distributed job
python your_distributed_script.py
```
## 🧪 Testing
### Basic All-Reduce Test
```python
import torch
import torch.distributed as dist

dist.init_process_group('nccl', rank=RANK, world_size=3,
                        init_method='tcp://MASTER_IP:29500')
t = torch.ones(1000, device='cuda')
dist.all_reduce(t)
print(f'Result: {t[0]}')  # Should print 3.0
dist.destroy_process_group()
```
### Bandwidth Benchmark
```python
import torch
import torch.distributed as dist
import time

dist.init_process_group('nccl', rank=RANK, world_size=3,
                        init_method='tcp://MASTER_IP:29500')
t = torch.ones(1024*1024*64, device='cuda')  # 64M float32 elements = 256 MB

# Warmup
for _ in range(5):
    dist.all_reduce(t)
torch.cuda.synchronize()

# Benchmark
start = time.time()
for _ in range(20):
    dist.all_reduce(t)
torch.cuda.synchronize()
elapsed = time.time() - start

# 256 MB x 20 iterations, reported as algorithm bandwidth (GB through each rank / s)
print(f'Bandwidth: {(256*20/1024)/elapsed:.2f} GB/s')
```
## 🔬 How It Works
### Connection Flow
```
Rank 0 (listen)                  Rank 1 (connect)
      │                                │
      ▼                                │
  listen()                             │
   ├─ Create QPs on ALL NICs           │
   ├─ Start handshake thread           │
   ├─ Return handle with all IPs       │
      │                                │
      │◄────── handle exchange ───────►│
      │                                │
      │                                ▼
      │                            connect()
      │                             ├─ Find matching subnet
      │                             ├─ Create QP on that NIC
      │                             ├─ TCP handshake ─────────►┐
      │                             │                          │
      │◄──────────────────────────── QP info ──────────────────┤
      │                             │                          │
      ▼                             ▼                          ▼
  accept()                     Connect QP              [handshake thread]
   ├─ Get QP from queue        to peer's QP             ├─ Accept TCP
   └─ Return recv_comm              │                   ├─ Create new QP
                                    │                   ├─ Connect QPs
                                    │                   └─ Queue for accept()
                               ┌────┴────┐
                               │ RDMA OK │
                               └─────────┘
```
### Subnet Matching
```c
// For each peer address in the handle, find a local NIC on the same subnet
struct mesh_nic *selected_nic = NULL;
for (int i = 0; i < handle->num_addrs && !selected_nic; i++) {
    uint32_t peer_ip = handle->addrs[i].ip;
    for (int j = 0; j < num_nics; j++) {
        if ((peer_ip & nic[j].netmask) == nic[j].subnet) {
            selected_nic = &nic[j];  // Found a matching NIC
            break;
        }
    }
}
```
## ⚙️ Configuration
| Environment Variable | Default | Description |
|---------------------|---------|-------------|
| `NCCL_NET_PLUGIN` | - | Set to `mesh` to use this plugin |
| `NCCL_DEBUG` | `WARN` | Set to `INFO` for detailed logs |
| `NCCL_MESH_GID_INDEX` | `3` | RoCE GID index to use |
| `NCCL_MESH_DEBUG` | `0` | Enable plugin debug output |
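
For illustration, the plugin-specific variables could be consumed at init time roughly like this (a sketch; the actual parsing lives in `src/mesh_plugin.c`, whose diff is not shown here):

```c
#include <stdlib.h>
#include "mesh_plugin.h"

/* Illustrative: read tunables from the environment during init().
 * Defaults mirror the table above. */
static void mesh_read_config(struct mesh_plugin_state *st) {
    const char *s;
    st->gid_index = (s = getenv("NCCL_MESH_GID_INDEX")) ? atoi(s) : 3;
    st->debug     = (s = getenv("NCCL_MESH_DEBUG"))     ? atoi(s) : 0;
}
```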
## 🚧 Limitations
- **Host memory staging**: GPU memory goes through host (no GPUDirect RDMA yet)
- **Single QP per connection**: No multi-rail aggregation
- **No relay routing**: Non-adjacent nodes can't communicate (fine for fully-connected mesh)
- **RoCE v2 only**: No InfiniBand support (Ethernet only)
## 🗺️ Roadmap
- [ ] GPUDirect RDMA support (bypass host memory)
- [ ] Multi-QP per connection for higher bandwidth
- [ ] Adaptive routing for partial meshes
- [ ] Performance tuning (inline data, signaling)
## 📚 References
- [NCCL Documentation](https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/)
- [RDMA Aware Networks Programming User Manual](https://www.mellanox.com/related-docs/prod_software/RDMA_Aware_Programming_user_manual.pdf)
- [InfiniBand Verbs API](https://github.com/linux-rdma/rdma-core)
## 📄 License
MIT License - see [LICENSE](LICENSE) file.
## 🙏 Acknowledgments
Built to connect three DGX Spark workstations that NVIDIA never intended to be clustered. Sometimes the best solutions come from ignoring "supported configurations."
---
*"The future of distributed AI computing is here."* - Mistral-7B, running on this very plugin

docs/ARCHITECTURE.md (new file)
@@ -0,0 +1,337 @@
# NCCL Mesh Plugin Architecture
This document provides a deep dive into the architecture and implementation of the NCCL Mesh Plugin.
## Overview
The NCCL Mesh Plugin is a custom network transport that enables NCCL to work with direct-connect RDMA mesh topologies where each node pair is on a different subnet. This is a configuration that standard NCCL plugins cannot handle.
## The Problem
### Standard NCCL Networking
NCCL's built-in network plugins assume one of two scenarios:
1. **InfiniBand Fabric**: All nodes connected through IB switches, sharing a single subnet
2. **TCP/IP Sockets**: Standard IP networking with routing
### Our Topology
```
     Node A (192.168.100.2, 192.168.101.2)
          /                    \
  192.168.100.x            192.168.101.x
        /                        \
   Node C                      Node B
(192.168.100.3,           (192.168.101.3,
 192.168.102.3)            192.168.102.2)
        \                        /
         \     192.168.102.x    /
          \                    /
           \------------------/
```
Each link is on a **different subnet**:
- A↔B: 192.168.101.0/24
- A↔C: 192.168.100.0/24
- B↔C: 192.168.102.0/24
This means:
- No single IP can reach all peers
- Standard IB plugin fails (expects single subnet)
- TCP socket plugin would need IP routing (adds latency)
## Solution Architecture
### Key Insight
Each node has **multiple NICs**, each on a different subnet. When connecting to a peer, we must:
1. Determine which subnet the peer is on
2. Use the local NIC on that same subnet
3. Establish RDMA connection over that specific NIC pair
### Handle Structure
The NCCL handle is expanded to advertise **all** local addresses:
```c
struct mesh_handle {
    uint32_t magic;              // Validation
    uint8_t num_addrs;           // Number of addresses
    uint16_t handshake_port;     // TCP port for QP exchange
    struct mesh_addr_entry {
        uint32_t ip;             // IP address (network order)
        uint32_t mask;           // Subnet mask
        uint32_t qp_num;         // Queue Pair number
        uint8_t nic_idx;         // Index into local NIC array
    } addrs[MESH_MAX_ADDRS];
};
```
### Connection Flow
#### Phase 1: Listen
```c
ncclResult_t mesh_listen(int dev, void *handle, void **listenComm) {
    // 1. Create QPs on ALL local NICs
    for (int i = 0; i < num_nics; i++) {
        create_qp_on_nic(&nics[i]);
    }
    // 2. Start the background handshake thread
    pthread_create(&thread, NULL, handshake_thread_func, lcomm);
    // 3. Fill the handle with ALL addresses
    for (int i = 0; i < num_nics; i++) {
        handle->addrs[i].ip = nics[i].ip_addr;
        handle->addrs[i].mask = nics[i].netmask;
        handle->addrs[i].qp_num = qps[i]->qp_num;
    }
}
```
#### Phase 2: Connect
```c
ncclResult_t mesh_connect(int dev, void *handle, void **sendComm) {
    // 1. Search the peer's addresses for one we can reach directly
    for (int i = 0; i < handle->num_addrs && !selected_nic; i++) {
        uint32_t peer_subnet = handle->addrs[i].ip & handle->addrs[i].mask;
        // Find a local NIC on the same subnet
        for (int j = 0; j < num_local_nics; j++) {
            if (local_nics[j].subnet == peer_subnet) {
                selected_nic = &local_nics[j];
                selected_peer_addr = &handle->addrs[i];
                break;
            }
        }
    }
    // 2. Create a QP on the selected NIC
    create_qp_on_nic(selected_nic);
    // 3. Exchange QP info via TCP handshake
    send_handshake(peer_ip, peer_port, &local_qp_info, &remote_qp_info);
    // 4. Connect our QP to the peer's QP
    connect_qp(local_qp, remote_qp_info);
}
```
#### Phase 3: Accept
```c
ncclResult_t mesh_accept(void *listenComm, void **recvComm) {
    // Get a pre-connected QP from the handshake thread's queue
    pthread_mutex_lock(&queue_mutex);
    while (queue_empty) {
        pthread_cond_wait(&queue_cond, &queue_mutex);
    }
    entry = dequeue();
    pthread_mutex_unlock(&queue_mutex);
    // Return the ready connection
    rcomm->qp = entry->local_qp;
    rcomm->nic = entry->nic;
}
```
### Background Handshake Thread
The handshake thread solves a critical deadlock problem:
**Without thread:**
```
Rank 0: connect() → TCP connect to Rank 1 → blocks waiting for accept()
Rank 1: connect() → TCP connect to Rank 0 → blocks waiting for accept()
// DEADLOCK: Neither can call accept() because both stuck in connect()
```
**With thread:**
```
Rank 0: listen() starts thread → thread waits for TCP connections
Rank 1: listen() starts thread → thread waits for TCP connections
Rank 0: connect() → TCP connects to Rank 1's thread → gets response → returns
Rank 1: connect() → TCP connects to Rank 0's thread → gets response → returns
Rank 0: accept() → gets QP from queue (filled by thread) → returns
Rank 1: accept() → gets QP from queue (filled by thread) → returns
// SUCCESS: Thread handles incoming connections asynchronously
```
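
A condensed sketch of the thread body (error handling, timeouts, and shutdown are omitted; the types and helpers come from `include/mesh_plugin.h`, and the byte-order handling here is an assumption):

```c
#include <arpa/inet.h>
#include <sys/socket.h>
#include <unistd.h>
#include "mesh_plugin.h"

/* Sketch: accept one TCP handshake at a time, swap mesh_qp_info with
 * the dialing peer, bring up a fresh QP, and queue it for accept(). */
static void *handshake_thread_func(void *arg) {
    struct mesh_listen_comm *lcomm = arg;
    while (!lcomm->thread_stop) {
        int fd = accept(lcomm->handshake_sock, NULL, NULL);
        if (fd < 0) continue;

        struct handshake_entry e = { .valid = 1 };
        recv(fd, &e.remote_info, sizeof(e.remote_info), MSG_WAITALL);

        /* Subnet-aware: answer on the NIC that can reach the peer */
        e.nic = mesh_find_nic_for_ip(ntohl(e.remote_info.ip));
        mesh_create_qp(e.nic, &e.local_qp, &e.local_cq);

        struct mesh_qp_info mine = { .qp_num = htonl(e.local_qp->qp_num) };
        send(fd, &mine, sizeof(mine), 0);   /* reply with our QP info */
        close(fd);

        pthread_mutex_lock(&lcomm->queue_mutex);
        lcomm->handshake_queue[lcomm->queue_tail] = e;
        lcomm->queue_tail = (lcomm->queue_tail + 1) % HANDSHAKE_QUEUE_SIZE;
        pthread_cond_signal(&lcomm->queue_cond);    /* wake accept() */
        pthread_mutex_unlock(&lcomm->queue_mutex);
    }
    return NULL;
}
```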
### RDMA Queue Pair Setup
Each connection requires proper QP state transitions:
```
RESET → INIT → RTR → RTS
```
```c
int mesh_connect_qp(struct ibv_qp *qp, struct mesh_nic *nic,
                    struct mesh_handle *remote) {
    // RESET → INIT
    qp_attr.qp_state = IBV_QPS_INIT;
    qp_attr.pkey_index = 0;
    qp_attr.port_num = nic->port_num;
    qp_attr.qp_access_flags = IBV_ACCESS_REMOTE_WRITE |
                              IBV_ACCESS_REMOTE_READ |
                              IBV_ACCESS_LOCAL_WRITE;
    ibv_modify_qp(qp, &qp_attr, ...);
    // INIT → RTR (Ready to Receive)
    qp_attr.qp_state = IBV_QPS_RTR;
    qp_attr.path_mtu = IBV_MTU_4096;
    qp_attr.dest_qp_num = remote->qp_num;
    qp_attr.rq_psn = remote->psn;
    qp_attr.ah_attr.dlid = remote->lid;        // 0 for RoCE
    qp_attr.ah_attr.grh.dgid = remote->gid;    // Peer's GID
    ibv_modify_qp(qp, &qp_attr, ...);
    // RTR → RTS (Ready to Send)
    qp_attr.qp_state = IBV_QPS_RTS;
    qp_attr.sq_psn = local_psn;
    qp_attr.timeout = 14;
    qp_attr.retry_cnt = 7;
    qp_attr.rnr_retry = 7;
    ibv_modify_qp(qp, &qp_attr, ...);
}
```
### Data Transfer
#### Send Path
```c
ncclResult_t mesh_isend(void *sendComm, void *data, int size,
                        void *mhandle, void **request) {
    struct ibv_send_wr wr = {
        .wr_id = (uint64_t)req,
        .sg_list = &sge,
        .num_sge = 1,
        .opcode = IBV_WR_SEND,
        .send_flags = IBV_SEND_SIGNALED,
    };
    sge.addr = (uint64_t)data;
    sge.length = size;
    sge.lkey = mr->lkey;
    ibv_post_send(comm->qp, &wr, &bad_wr);
}
```
#### Receive Path
```c
ncclResult_t mesh_irecv(void *recvComm, int n, void **data,
                        int *sizes, void **mhandles, void **request) {
    struct ibv_recv_wr wr = {
        .wr_id = (uint64_t)req,
        .sg_list = &sge,
        .num_sge = 1,
    };
    sge.addr = (uint64_t)data[0];
    sge.length = sizes[0];
    sge.lkey = mr->lkey;
    ibv_post_recv(comm->qp, &wr, &bad_wr);
}
```
#### Completion Polling
```c
ncclResult_t mesh_test(void *request, int *done, int *sizes) {
    struct ibv_wc wc;
    int ret = ibv_poll_cq(req->cq, 1, &wc);
    if (ret > 0) {
        if (wc.status == IBV_WC_SUCCESS) {
            *done = 1;
            if (sizes) *sizes = wc.byte_len;
        } else {
            // Handle error
        }
    } else {
        *done = 0;  // Not complete yet
    }
}
```
## Memory Registration
RDMA requires memory to be registered with the NIC:
```c
ncclResult_t mesh_regMr(void *comm, void *data, size_t size,
                        int type, void **mhandle) {
    int access = IBV_ACCESS_LOCAL_WRITE |
                 IBV_ACCESS_REMOTE_WRITE |
                 IBV_ACCESS_REMOTE_READ;
    mrh->mr = ibv_reg_mr(nic->pd, data, size, access);
    *mhandle = mrh;
}
```
**Note**: Current implementation uses host memory staging. GPU memory is copied to host, sent via RDMA, then copied back to GPU on the receiver. GPUDirect RDMA would eliminate these copies.
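
Concretely, the staged send path looks roughly like the sketch below, where `stage` is assumed to be a pinned host buffer already registered with `ibv_reg_mr()` (the real plugin pipelines the copy and the send):

```c
#include <cuda_runtime.h>
#include "mesh_plugin.h"

/* Sketch of host staging: GPU buffer -> pinned host buffer -> RDMA send.
 * 'stage' is assumed to come from cudaMallocHost() and to be registered
 * already; GPUDirect RDMA would remove this copy entirely. */
static int staged_send(struct mesh_send_comm *comm, const void *gpu_buf,
                       size_t size, void *stage, struct mesh_mr_handle *mrh,
                       struct mesh_request *req) {
    cudaMemcpy(stage, gpu_buf, size, cudaMemcpyDeviceToHost); /* D2H copy */
    return mesh_post_send(comm, stage, size, mrh, req);       /* RDMA send */
}
```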
## Performance Considerations
### Current Bottlenecks
1. **Host Memory Staging**: GPU↔Host copies add latency
2. **Single QP**: One Queue Pair per connection limits parallelism
3. **Completion Signaling**: Every operation signals completion
### Achieved Performance
- **8+ GB/s** effective bandwidth
- **~64%** of 100 Gbps line rate
- Sufficient for distributed ML workloads
### Future Optimizations
1. **GPUDirect RDMA**: Register GPU memory directly
2. **Multi-QP**: Multiple QPs per connection
3. **Selective Signaling**: Signal only every Nth operation (sketched below)
4. **Inline Data**: Small messages in WQE
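
As a sketch of item 3, selective signaling requests a completion only on every Nth work request, so only one CQE per batch has to be polled (`SIGNAL_EVERY` is an illustrative knob; the current plugin signals every send):

```c
#include <infiniband/verbs.h>
#include <stdint.h>

#define SIGNAL_EVERY 16  /* illustrative batch size */

/* Sketch: request a CQE only every SIGNAL_EVERY-th send. Unsignaled WRs
 * still execute; their send-queue slots are reclaimed when the next
 * signaled completion is polled. */
static int post_send_batched(struct ibv_qp *qp, struct ibv_sge *sge,
                             uint64_t wr_id, unsigned *counter) {
    struct ibv_send_wr wr = {
        .wr_id      = wr_id,
        .sg_list    = sge,
        .num_sge    = 1,
        .opcode     = IBV_WR_SEND,
        .send_flags = (++*counter % SIGNAL_EVERY == 0) ? IBV_SEND_SIGNALED : 0,
    };
    struct ibv_send_wr *bad_wr;
    return ibv_post_send(qp, &wr, &bad_wr);
}
```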
## File Structure
```
nccl-mesh-plugin/
├── src/
│   └── mesh_plugin.c      # Main implementation (~1400 lines)
├── include/
│   └── mesh_plugin.h      # Data structures and declarations
├── nccl/
│   ├── net.h              # NCCL net plugin interface
│   ├── net_v8.h           # v8 properties structure
│   └── err.h              # NCCL error codes
└── Makefile
```
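
NCCL finds the transport by `dlopen()`ing `libnccl-net-mesh.so` and resolving a versioned symbol. A sketch of that wiring (the `mesh_*` entry-point names are assumptions; the real definitions are in `src/mesh_plugin.c`, whose diff is suppressed in this commit view):

```c
#include "nccl/net.h"

/* Assumed entry points, defined in src/mesh_plugin.c. */
extern ncclResult_t mesh_init(ncclDebugLogger_t logFunction);
extern ncclResult_t mesh_devices(int *ndev);
extern ncclResult_t mesh_listen(int dev, void *handle, void **listenComm);

/* NCCL resolves this symbol (ncclNetPlugin_v8) after dlopen(). */
const ncclNet_v8_t ncclNetPlugin_v8 = {
    .name    = "mesh",
    .init    = mesh_init,
    .devices = mesh_devices,
    .listen  = mesh_listen,
    /* ...remaining v8 entry points (getProperties, connect, accept,
     * regMr, isend, irecv, test, close*) wired the same way... */
};
```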
## Debugging
Enable debug output:
```bash
export NCCL_DEBUG=INFO
export NCCL_MESH_DEBUG=1
```
Common issues:
1. **"No local NIC found"**: Subnet mismatch, check IP configuration
2. **"Handshake timeout"**: Firewall blocking TCP, check ports
3. **"QP transition failed"**: GID index wrong, try different `NCCL_MESH_GID_INDEX`
4. **"WC error status=12"**: Transport retry exceeded, check RDMA connectivity
## Conclusion
The NCCL Mesh Plugin demonstrates that with careful engineering, NCCL can be extended to support unconventional network topologies. The key innovations—multi-address handles, subnet-aware NIC selection, and asynchronous handshaking—provide a template for other custom NCCL transports.

docs/SETUP.md (new file)
@@ -0,0 +1,249 @@
# Hardware Setup Guide
This guide covers setting up a direct-connect RDMA mesh topology with multiple nodes.
## Overview
Our reference setup uses three NVIDIA DGX Spark workstations connected in a triangle mesh topology. Each pair of nodes has a dedicated 100 Gbps RDMA link on its own subnet.
## Hardware Requirements
- 3+ nodes with RDMA-capable NICs (ConnectX-6/7 recommended)
- Direct-attach cables (QSFP56 for 100GbE)
- Each node needs N-1 RDMA ports for an N-node fully-connected mesh (a dual-port NIC covers the 3-node case)
## Network Topology
### Triangle Mesh (3 Nodes)
```
           Node A
          /      \
       NIC1      NIC2
        |          |
192.168.101.x  192.168.100.x
        |          |
       NIC1      NIC1
        |          |
     Node B ---- Node C
            NIC2
       192.168.102.x
```
### IP Address Assignment
| Link | Subnet | Node A | Node B | Node C |
|------|--------|--------|--------|--------|
| A↔B | 192.168.101.0/24 | .2 | .3 | - |
| A↔C | 192.168.100.0/24 | .2 | - | .3 |
| B↔C | 192.168.102.0/24 | - | .2 | .3 |
## Network Configuration
### 1. Identify NICs
```bash
# List RDMA devices
ibv_devices
# List network interfaces with RDMA
ls -la /sys/class/infiniband/*/device/net/
```
### 2. Configure IP Addresses
On **Node A** (example):
```bash
# Link to Node B
sudo ip addr add 192.168.101.2/24 dev enp1s0f0np0
sudo ip link set enp1s0f0np0 up
# Link to Node C
sudo ip addr add 192.168.100.2/24 dev enp1s0f1np1
sudo ip link set enp1s0f1np1 up
```
On **Node B**:
```bash
# Link to Node A
sudo ip addr add 192.168.101.3/24 dev enp1s0f0np0
sudo ip link set enp1s0f0np0 up
# Link to Node C
sudo ip addr add 192.168.102.2/24 dev enp1s0f1np1
sudo ip link set enp1s0f1np1 up
```
On **Node C**:
```bash
# Link to Node A
sudo ip addr add 192.168.100.3/24 dev enp1s0f0np0
sudo ip link set enp1s0f0np0 up
# Link to Node B
sudo ip addr add 192.168.102.3/24 dev enp1s0f1np1
sudo ip link set enp1s0f1np1 up
```
### 3. Make Configuration Persistent
Create netplan config (Ubuntu):
```yaml
# /etc/netplan/99-rdma-mesh.yaml
network:
  version: 2
  ethernets:
    enp1s0f0np0:
      addresses:
        - 192.168.101.2/24   # Adjust per node
    enp1s0f1np1:
      addresses:
        - 192.168.100.2/24   # Adjust per node
```
Apply:
```bash
sudo netplan apply
```
## Verify Connectivity
### 1. Ping Test
From Node A:
```bash
ping 192.168.101.3 # Node B
ping 192.168.100.3 # Node C
```
### 2. RDMA Test
```bash
# On Node B (server)
ib_send_bw -d rocep1s0f0 -x 3
# On Node A (client)
ib_send_bw -d rocep1s0f0 -x 3 192.168.101.3
```
Expected output: ~12 GB/s for 100GbE
### 3. Verify GID Index
```bash
# Show GID table
show_gids
# Find RoCE v2 GID (usually index 3)
ibv_devinfo -v | grep -A5 GID
```
## RoCE Configuration
### Enable RoCE v2
```bash
# Check current mode
cat /sys/class/infiniband/rocep*/ports/1/gid_attrs/types/*
# Enable RoCE v2 (if needed)
echo "RoCE v2" | sudo tee /sys/class/infiniband/rocep1s0f0/ports/1/gid_attrs/types/0
```
### Configure ECN (Optional but Recommended)
```bash
# Enable ECN for TCP on the host (RoCE ECN/DCQCN itself is configured on the NIC)
sudo sysctl -w net.ipv4.tcp_ecn=1
# Configure PFC (Priority Flow Control) on the switch, if applicable
```
## Firewall Configuration
Open ports for NCCL communication:
```bash
# TCP ports for handshake (dynamic, 40000-50000 range)
sudo ufw allow 40000:50000/tcp
# Or disable firewall for mesh interfaces
sudo ufw allow in on enp1s0f0np0
sudo ufw allow in on enp1s0f1np1
```
## Troubleshooting
### No RDMA Devices Found
```bash
# Load kernel modules
sudo modprobe ib_core
sudo modprobe mlx5_core
sudo modprobe mlx5_ib
# Check dmesg
dmesg | grep -i mlx
```
### Link Not Coming Up
```bash
# Check physical connection
ethtool enp1s0f0np0
# Check for errors
ip -s link show enp1s0f0np0
```
### RDMA Connection Fails
```bash
# Verify GID is populated
cat /sys/class/infiniband/rocep1s0f0/ports/1/gids/3
# Check RDMA CM
rdma link show
```
### Wrong GID Index
Try different GID indices:
```bash
export NCCL_MESH_GID_INDEX=0 # or 1, 2, 3...
```
## Scaling Beyond 3 Nodes
For N nodes in a fully-connected mesh:
- Each node needs N-1 RDMA ports
- Total links: N*(N-1)/2
- Each link on unique subnet
For 4 nodes:
```
  A
 /|\
B-+-C
 \|/
  D
```
- 6 links, 6 subnets
- Each node needs 3 RDMA ports
For larger clusters, consider a **partial mesh** or **fat-tree** topology with relay routing (not yet implemented in this plugin).
## Reference: DGX Spark Mesh
Our tested configuration:
| Hostname | Management IP | Mesh IPs |
|----------|--------------|----------|
| titanic (A) | 10.0.0.170 | 192.168.100.2, 192.168.101.2 |
| iceberg (B) | 10.0.0.171 | 192.168.101.3, 192.168.102.2 |
| carpathia (C) | 10.0.0.172 | 192.168.100.3, 192.168.102.3 |

benchmark_bandwidth.py (new file)
@@ -0,0 +1,87 @@
#!/usr/bin/env python3
"""
Bandwidth benchmark for NCCL Mesh Plugin
Usage:
# On each node (adjust --rank):
python benchmark_bandwidth.py --rank 0 --world-size 3 --master-ip 10.0.0.170
"""
import argparse
import time
import torch
import torch.distributed as dist
def benchmark_allreduce(size_mb: int, iterations: int, warmup: int = 5):
    """Benchmark all-reduce bandwidth."""
    # Create tensor (float32 = 4 bytes per element)
    num_elements = (size_mb * 1024 * 1024) // 4
    tensor = torch.ones(num_elements, device='cuda', dtype=torch.float32)

    # Warmup
    for _ in range(warmup):
        dist.all_reduce(tensor)
    torch.cuda.synchronize()

    # Benchmark
    start = time.perf_counter()
    for _ in range(iterations):
        dist.all_reduce(tensor)
    torch.cuda.synchronize()
    elapsed = time.perf_counter() - start

    # Report algorithm bandwidth: input size x iterations / time.
    # (A ring all-reduce moves 2*(N-1)/N x size per rank on the wire,
    # so bus bandwidth is about 1.33x this figure for N=3.)
    total_data_gb = (size_mb * iterations) / 1024
    bandwidth_gbs = total_data_gb / elapsed
    return bandwidth_gbs, elapsed
def main():
    parser = argparse.ArgumentParser(description='Benchmark NCCL bandwidth')
    parser.add_argument('--rank', type=int, required=True)
    parser.add_argument('--world-size', type=int, default=3)
    parser.add_argument('--master-ip', type=str, default='10.0.0.170')
    parser.add_argument('--master-port', type=int, default=29500)
    parser.add_argument('--iterations', type=int, default=20)
    args = parser.parse_args()

    # Initialize
    init_method = f'tcp://{args.master_ip}:{args.master_port}'
    dist.init_process_group('nccl', rank=args.rank, world_size=args.world_size,
                            init_method=init_method)

    if args.rank == 0:
        print(f'\n{"="*60}')
        print('NCCL Mesh Plugin Bandwidth Benchmark')
        print(f'World size: {args.world_size}')
        print(f'Iterations per size: {args.iterations}')
        print(f'{"="*60}\n')
        print(f'{"Size":<12} {"Bandwidth":<15} {"Time":<12}')
        print(f'{"-"*12} {"-"*15} {"-"*12}')

    # Test different sizes
    sizes_mb = [1, 4, 16, 64, 128, 256, 512]
    for size_mb in sizes_mb:
        bandwidth, elapsed = benchmark_allreduce(size_mb, args.iterations)
        if args.rank == 0:
            print(f'{size_mb:>6} MB   {bandwidth:>8.2f} GB/s   {elapsed:>6.3f} s')
        # Sync between sizes
        dist.barrier()

    if args.rank == 0:
        print(f'\n{"="*60}')
        print('Benchmark complete!')
        print(f'{"="*60}\n')

    dist.destroy_process_group()


if __name__ == '__main__':
    main()

distributed_llm.py (new file)
@@ -0,0 +1,84 @@
#!/usr/bin/env python3
"""
Distributed LLM Inference with NCCL Mesh Plugin
This example demonstrates loading and running inference on a large language
model distributed across multiple GPUs using the NCCL Mesh Plugin.
Usage:
# On each node (adjust --rank):
python distributed_llm.py --rank 0 --world-size 3 --master-ip 10.0.0.170
Environment setup (run on each node):
cd ~/nccl-mesh-plugin
export LD_LIBRARY_PATH=$(pwd):$LD_LIBRARY_PATH
export NCCL_NET_PLUGIN=mesh
export NCCL_DEBUG=WARN
"""
import argparse
import os

import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
from accelerate import Accelerator
def main():
    parser = argparse.ArgumentParser(description='Distributed LLM Inference')
    parser.add_argument('--rank', type=int, required=True)
    parser.add_argument('--world-size', type=int, default=3)
    parser.add_argument('--master-ip', type=str, default='10.0.0.170')
    parser.add_argument('--master-port', type=int, default=29500)
    parser.add_argument('--model', type=str, default='mistralai/Mistral-7B-Instruct-v0.2',
                        help='Model to load (default: Mistral-7B)')
    parser.add_argument('--prompt', type=str,
                        default='The future of distributed AI computing is',
                        help='Prompt for generation')
    parser.add_argument('--max-tokens', type=int, default=100,
                        help='Maximum tokens to generate')
    args = parser.parse_args()

    # Export the distributed environment expected by Accelerate/torch.distributed,
    # so the script can be launched directly with --rank instead of a launcher.
    os.environ.setdefault('RANK', str(args.rank))
    os.environ.setdefault('LOCAL_RANK', '0')
    os.environ.setdefault('WORLD_SIZE', str(args.world_size))
    os.environ.setdefault('MASTER_ADDR', args.master_ip)
    os.environ.setdefault('MASTER_PORT', str(args.master_port))

    # Initialize accelerator
    accelerator = Accelerator()

    print(f'Rank {accelerator.process_index}: Loading tokenizer...')
    tokenizer = AutoTokenizer.from_pretrained(args.model)

    print(f'Rank {accelerator.process_index}: Loading model...')
    model = AutoModelForCausalLM.from_pretrained(
        args.model,
        torch_dtype=torch.bfloat16,
        device_map='auto',
    )
    print(f'Rank {accelerator.process_index}: Model loaded!')

    # Only rank 0 generates
    if accelerator.is_main_process:
        print('\nGenerating text...')
        print(f'Prompt: "{args.prompt}"\n')
        inputs = tokenizer(args.prompt, return_tensors='pt').to('cuda')
        outputs = model.generate(
            **inputs,
            max_new_tokens=args.max_tokens,
            do_sample=True,
            temperature=0.7,
            top_p=0.9,
        )
        result = tokenizer.decode(outputs[0], skip_special_tokens=True)
        print('=' * 60)
        print('Generated Text:')
        print('=' * 60)
        print(result)
        print('=' * 60)

    # Wait for all ranks
    accelerator.wait_for_everyone()
    print(f'Rank {accelerator.process_index}: Done!')


if __name__ == '__main__':
    main()

test_allreduce.py (new file)
@@ -0,0 +1,65 @@
#!/usr/bin/env python3
"""
Basic all-reduce test for NCCL Mesh Plugin
Usage:
# On rank 0:
python test_allreduce.py --rank 0 --world-size 3 --master-ip 10.0.0.170
# On rank 1:
python test_allreduce.py --rank 1 --world-size 3 --master-ip 10.0.0.170
# On rank 2:
python test_allreduce.py --rank 2 --world-size 3 --master-ip 10.0.0.170
"""
import argparse
import torch
import torch.distributed as dist
def main():
    parser = argparse.ArgumentParser(description='Test NCCL all-reduce')
    parser.add_argument('--rank', type=int, required=True, help='Rank of this process')
    parser.add_argument('--world-size', type=int, default=3, help='Total number of processes')
    parser.add_argument('--master-ip', type=str, default='10.0.0.170', help='Master node IP')
    parser.add_argument('--master-port', type=int, default=29500, help='Master node port')
    args = parser.parse_args()

    # Initialize process group
    init_method = f'tcp://{args.master_ip}:{args.master_port}'
    print(f'Rank {args.rank}: Initializing with {init_method}')
    dist.init_process_group(
        backend='nccl',
        rank=args.rank,
        world_size=args.world_size,
        init_method=init_method
    )
    print(f'Rank {args.rank}: Process group initialized')

    # Create tensor on GPU
    tensor = torch.ones(1000, device='cuda')
    print(f'Rank {args.rank}: Created tensor with sum = {tensor.sum().item()}')

    # All-reduce (sum)
    dist.all_reduce(tensor, op=dist.ReduceOp.SUM)
    result = tensor[0].item()
    expected = float(args.world_size)
    print(f'Rank {args.rank}: After all-reduce, tensor[0] = {result}')
    if abs(result - expected) < 0.001:
        print(f'Rank {args.rank}: ✓ SUCCESS! Result matches expected value {expected}')
    else:
        print(f'Rank {args.rank}: ✗ FAILED! Expected {expected}, got {result}')

    # Cleanup
    dist.destroy_process_group()
    print(f'Rank {args.rank}: Done')


if __name__ == '__main__':
    main()

include/mesh_plugin.h (new file)
@@ -0,0 +1,257 @@
/*
 * NCCL Mesh Plugin - Subnet-aware RDMA transport
 *
 * Enables NCCL to work with direct-connect mesh topologies where
 * each node pair is on a different subnet.
 */
#ifndef NCCL_MESH_PLUGIN_H
#define NCCL_MESH_PLUGIN_H
#include <stdint.h>
#include <pthread.h>
#include <infiniband/verbs.h>
#define MESH_MAX_NICS 8
#define MESH_MAX_QPS 256
#define MESH_MAX_MRS 1024
#define MESH_HANDLE_MAGIC 0x4D455348 // "MESH"
// Forward declarations
struct mesh_plugin_state;
struct mesh_nic;
struct mesh_comm;
/*
 * Represents one RDMA-capable NIC with its subnet information
 */
struct mesh_nic {
    // RDMA resources
    struct ibv_context *context;
    struct ibv_pd *pd;
    int port_num;
    int gid_index;

    // Network addressing
    uint32_t ip_addr;        // Host byte order
    uint32_t netmask;        // Host byte order
    uint32_t subnet;         // ip_addr & netmask

    // Device identification
    char dev_name[64];       // RDMA device name (e.g., "rocep1s0f1")
    char if_name[64];        // Network interface name (e.g., "enp1s0f1np1")
    char pci_path[256];      // PCI bus path

    // Capabilities
    int max_qp;
    int max_cq;
    int max_mr;
    int max_sge;
    uint64_t max_mr_size;
    int gdr_supported;       // GPUDirect RDMA support

    // Statistics
    uint64_t bytes_sent;
    uint64_t bytes_recv;
    uint64_t connections;
};
/*
 * Address entry for multi-homed hosts
 */
#define MESH_MAX_ADDRS 6
struct mesh_addr_entry {
    uint32_t ip;             // IP address (network byte order)
    uint32_t mask;           // Subnet mask (network byte order)
    uint16_t qp_num;         // QP number for this NIC
    uint8_t nic_idx;         // Index into our NIC array
    uint8_t gid_index;       // GID index for this NIC
};
/*
 * Connection handle - exchanged between peers during setup
 * Must fit within NCCL_NET_HANDLE_MAXSIZE (128 bytes)
 */
struct mesh_handle {
    uint32_t magic;          // MESH_HANDLE_MAGIC
    uint8_t num_addrs;       // Number of valid addresses
    uint8_t selected_idx;    // Which address was selected (set by connect)
    uint16_t lid;            // IB LID (0 for RoCE)
    uint16_t qp_num;         // QP number (for compat with mesh_connect_qp)
    uint16_t handshake_port; // TCP port for QP handshake
    uint8_t port_num;        // Port number (usually 1)
    uint8_t mtu;             // MTU setting
    uint32_t psn;            // Packet sequence number
    uint32_t handshake_ip;   // IP address for handshake (network byte order)
    union ibv_gid gid;       // GID (16 bytes)
    struct mesh_addr_entry addrs[MESH_MAX_ADDRS]; // 12 bytes each
    // Total: 4+1+1+2+2+2+1+1 (+2 bytes padding before psn) +4+4+16 = 40,
    // plus 6*12 = 72 for addrs => sizeof == 112 bytes (fits in 128)
};
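
/*
 * Suggested compile-time guard (illustrative addition; not in the original
 * header): keep the handle, padding included, within NCCL's 128-byte
 * NCCL_NET_HANDLE_MAXSIZE (see nccl/err.h).
 */
_Static_assert(sizeof(struct mesh_handle) <= 128,
               "mesh_handle must fit within NCCL_NET_HANDLE_MAXSIZE");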
/*
 * Listen state - waiting for incoming connections
 * Creates QPs on ALL NICs so any peer can connect
 */
#define HANDSHAKE_QUEUE_SIZE 16
/*
 * QP info exchanged during handshake
 */
struct mesh_qp_info {
    uint32_t qp_num;         // Network byte order
    uint32_t psn;            // Network byte order
    uint8_t gid[16];         // Raw GID
    uint32_t ip;             // Network byte order
    uint8_t gid_index;
    uint8_t nic_idx;         // Which NIC on the listener
    uint8_t reserved[2];
};
struct handshake_entry {
    struct mesh_qp_info remote_info;
    struct ibv_qp *local_qp;
    struct ibv_cq *local_cq;
    struct mesh_nic *nic;
    int valid;
};
struct mesh_listen_comm {
    int num_qps;
    struct {
        struct mesh_nic *nic;
        struct ibv_qp *qp;
        struct ibv_cq *cq;
    } qps[MESH_MAX_NICS];
    uint32_t psn;
    int ready;

    // Handshake socket for QP info exchange
    int handshake_sock;
    uint16_t handshake_port;
    uint32_t handshake_ip;

    // Background handshake thread
    pthread_t handshake_thread;
    int thread_running;
    int thread_stop;

    // Queue of received handshakes for accept() to consume
    struct handshake_entry handshake_queue[HANDSHAKE_QUEUE_SIZE];
    int queue_head;
    int queue_tail;
    pthread_mutex_t queue_mutex;
    pthread_cond_t queue_cond;
};
/*
 * Send/Receive communication state
 */
struct mesh_send_comm {
    struct mesh_nic *nic;
    struct ibv_qp *qp;
    struct ibv_cq *cq;
    uint32_t remote_qp_num;
    union ibv_gid remote_gid;
    int connected;

    // Request tracking
    struct mesh_request *requests[MESH_MAX_QPS];
    int num_requests;
};
struct mesh_recv_comm {
    struct mesh_nic *nic;
    struct ibv_qp *qp;
    struct ibv_cq *cq;
    int connected;

    // Request tracking
    struct mesh_request *requests[MESH_MAX_QPS];
    int num_requests;
};
/*
 * Memory registration handle
 */
struct mesh_mr_handle {
    struct ibv_mr *mr;
    struct mesh_nic *nic;
    void *addr;
    size_t size;
};
/*
 * Async request state
 */
struct mesh_request {
    int used;
    int done;
    size_t size;
    struct ibv_cq *cq;       // CQ to poll for completion
    struct ibv_wc wc;
};
/*
 * Global plugin state
 */
struct mesh_plugin_state {
    struct mesh_nic nics[MESH_MAX_NICS];
    int num_nics;
    int initialized;

    // Configuration
    int gid_index;           // From NCCL_MESH_GID_INDEX
    int debug;               // From NCCL_MESH_DEBUG

    // Logging (provided by NCCL)
    void (*log_fn)(int level, unsigned long flags, const char *file,
                   int line, const char *fmt, ...);
};
// Global state (singleton)
extern struct mesh_plugin_state g_mesh_state;
/*
 * Internal functions
 */
// Initialization
int mesh_init_nics(void);
int mesh_discover_nic_ips(void);
int mesh_setup_nic(struct mesh_nic *nic, struct ibv_device *device);
// Routing
struct mesh_nic* mesh_find_nic_for_ip(uint32_t peer_ip);
struct mesh_nic* mesh_find_nic_by_name(const char *name);
int mesh_get_nic_index(struct mesh_nic *nic);
// RDMA operations
int mesh_create_qp(struct mesh_nic *nic, struct ibv_qp **qp, struct ibv_cq **cq);
int mesh_connect_qp(struct ibv_qp *qp, struct mesh_nic *nic, struct mesh_handle *remote);
int mesh_post_send(struct mesh_send_comm *comm, void *data, size_t size,
                   struct mesh_mr_handle *mr, struct mesh_request *req);
int mesh_post_recv(struct mesh_recv_comm *comm, void *data, size_t size,
                   struct mesh_mr_handle *mr, struct mesh_request *req);
int mesh_poll_cq(struct ibv_cq *cq, struct mesh_request *req);
// Utilities
uint32_t mesh_ip_to_uint(const char *ip_str);
void mesh_uint_to_ip(uint32_t ip, char *buf, size_t len);
int mesh_get_interface_ip(const char *if_name, uint32_t *ip, uint32_t *mask);
const char* mesh_find_netdev_for_rdma(const char *rdma_dev);
// Logging macros
#define MESH_LOG(level, fmt, ...) \
    do { \
        if (g_mesh_state.log_fn) { \
            g_mesh_state.log_fn(level, 0, __FILE__, __LINE__, fmt, ##__VA_ARGS__); \
        } \
    } while (0)

#define MESH_INFO(fmt, ...) MESH_LOG(NCCL_LOG_INFO, "MESH " fmt, ##__VA_ARGS__)
#define MESH_WARN(fmt, ...) MESH_LOG(NCCL_LOG_WARN, "MESH " fmt, ##__VA_ARGS__)
#define MESH_DEBUG(fmt, ...) \
    do { if (g_mesh_state.debug) MESH_LOG(NCCL_LOG_TRACE, "MESH " fmt, ##__VA_ARGS__); } while (0)
#endif // NCCL_MESH_PLUGIN_H

nccl/err.h (new file)
@@ -0,0 +1,47 @@
/*
 * NCCL error codes - extracted from NCCL headers
 */
#ifndef NCCL_ERR_H
#define NCCL_ERR_H
typedef enum {
    ncclSuccess            = 0,
    ncclUnhandledCudaError = 1,
    ncclSystemError        = 2,
    ncclInternalError      = 3,
    ncclInvalidArgument    = 4,
    ncclInvalidUsage       = 5,
    ncclRemoteError        = 6,
    ncclInProgress         = 7,
    ncclNumResults         = 8
} ncclResult_t;
// Logging levels
#define NCCL_LOG_NONE 0
#define NCCL_LOG_VERSION 1
#define NCCL_LOG_WARN 2
#define NCCL_LOG_INFO 3
#define NCCL_LOG_ABORT 4
#define NCCL_LOG_TRACE 5
// Debug logger function type
typedef void (*ncclDebugLogger_t)(int level, unsigned long flags,
                                  const char *file, int line, const char *fmt, ...);
// Pointer support flags
#define NCCL_PTR_HOST 0x1
#define NCCL_PTR_CUDA 0x2
#define NCCL_PTR_DMABUF 0x4
// Maximum handle size
#define NCCL_NET_HANDLE_MAXSIZE 128
// Net device types
#define NCCL_NET_DEVICE_HOST 0
#define NCCL_NET_DEVICE_INVALID_VERSION 0
// Maximum sizes
#define NCCL_MAX_NET_SIZE_BYTES (1ULL << 31)
#endif // NCCL_ERR_H

nccl/net.h (new file)
@@ -0,0 +1,18 @@
/*
 * NCCL Net Plugin API - main header
 */
#ifndef NCCL_NET_H
#define NCCL_NET_H
#include "err.h"
#include "net_v8.h"
// Maximum number of outstanding requests
#define NCCL_NET_MAX_REQUESTS 32
// Use v8 as current version
typedef ncclNet_v8_t ncclNet_t;
typedef ncclNetProperties_v8_t ncclNetProperties_t;
#endif // NCCL_NET_H

nccl/net_v8.h (new file)
@@ -0,0 +1,101 @@
/*
 * NCCL Net Plugin API v8 - extracted from NCCL headers
 */
#ifndef NCCL_NET_V8_H
#define NCCL_NET_V8_H
#include "err.h"
#include <stdint.h>
#include <stddef.h>
// Network device handle (opaque to NCCL)
typedef void* ncclNetDeviceHandle_t;
// Network properties structure (v8)
typedef struct {
    char* name;              // Used mostly for logging
    char* pciPath;           // Path to the PCI device in /sys
    uint64_t guid;           // Unique identifier for the NIC chip
    int ptrSupport;          // NCCL_PTR_HOST or NCCL_PTR_HOST|NCCL_PTR_CUDA
    int speed;               // Port speed in Mbps
    int port;                // Port number
    float latency;           // Network latency in microseconds
    int maxComms;            // Maximum number of comms we can create
    int maxRecvs;            // Maximum number of grouped receives
    int netDeviceType;       // Network device type
    int netDeviceVersion;    // Network device version
    uint64_t maxP2pBytes;    // Maximum P2P transfer size
} ncclNetProperties_v8_t;
// Net plugin structure v8
typedef struct {
    // Name of the network (mainly for logs)
    const char* name;

    // Initialize the network
    ncclResult_t (*init)(ncclDebugLogger_t logFunction);

    // Return the number of adapters
    ncclResult_t (*devices)(int* ndev);

    // Get various device properties
    ncclResult_t (*getProperties)(int dev, ncclNetProperties_v8_t* props);

    // Create a receiving object and provide a handle to connect to it.
    // The handle can be up to NCCL_NET_HANDLE_MAXSIZE bytes and will be
    // exchanged between ranks to create a connection
    ncclResult_t (*listen)(int dev, void* handle, void** listenComm);

    // Connect to a handle and return a sending comm object for that peer.
    // This call must not block for the connection to be established, and
    // instead should return ncclSuccess with sendComm == NULL if the
    // connection is not established yet
    ncclResult_t (*connect)(int dev, void* handle, void** sendComm,
                            ncclNetDeviceHandle_t** sendDevComm);

    // Finalize connection establishment after remote peer has called connect.
    // This call must not block for the connection to be established, and
    // instead should return ncclSuccess with recvComm == NULL if the
    // connection is not established yet
    ncclResult_t (*accept)(void* listenComm, void** recvComm,
                           ncclNetDeviceHandle_t** recvDevComm);

    // Register/deregister memory for use with send/recv
    ncclResult_t (*regMr)(void* comm, void* data, size_t size, int type,
                          void** mhandle);
    ncclResult_t (*regMrDmaBuf)(void* comm, void* data, size_t size, int type,
                                uint64_t offset, int fd, void** mhandle);
    ncclResult_t (*deregMr)(void* comm, void* mhandle);

    // Asynchronous send to a peer.
    // May return ncclInProgress if the operation cannot be posted immediately
    ncclResult_t (*isend)(void* sendComm, void* data, int size, int tag,
                          void* mhandle, void** request);

    // Asynchronous receive from a peer.
    // May return ncclInProgress if the operation cannot be posted immediately
    ncclResult_t (*irecv)(void* recvComm, int n, void** data, int* sizes,
                          int* tags, void** mhandles, void** request);

    // Flush data received through irecv
    ncclResult_t (*iflush)(void* recvComm, int n, void** data, int* sizes,
                           void** mhandles, void** request);

    // Test whether a request has completed
    ncclResult_t (*test)(void* request, int* done, int* sizes);

    // Close and free send/recv comm objects
    ncclResult_t (*closeSend)(void* sendComm);
    ncclResult_t (*closeRecv)(void* recvComm);
    ncclResult_t (*closeListen)(void* listenComm);

    // Get device-side memory handle for registered memory
    ncclResult_t (*getDeviceMr)(void* comm, void* mhandle, void** dptr_mhandle);

    // Notify that irecv has been consumed
    ncclResult_t (*irecvConsumed)(void* recvComm, int n, void* request);
} ncclNet_v8_t;
#endif // NCCL_NET_V8_H

src/mesh_plugin.c (new file)
Diff suppressed because it is too large (1508 lines).