all_reduce_bench.py
""" License: Apache License 2.0 https://www.apache.org/licenses/LICENSE-2.0.txt """
# this version has been derived from @jeffra's gist: https://gist.github.com/jeffra/b5e80466b4c86be00ea3b6f130fb7a36
# which in turn is derived from https://github.com/NVIDIA/nccl-tests
#
# to run on 2 GPUs of a single node:
# python -m torch.distributed.run --nproc_per_node=2 all_reduce_bench.py
#
# the printed results are already n_gpu-agnostic (i.e. averaged for the world size)
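#
# a sketch for a 2-node x 8-GPU launch (the rdzv endpoint is a placeholder;
# point it at a reachable host:port on the first node):
# python -m torch.distributed.run --nnodes=2 --nproc_per_node=8 \
#     --rdzv_backend=c10d --rdzv_endpoint=node0:6000 all_reduce_bench.py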
import fcntl
import os
import socket
import time
import torch
import torch.distributed as dist

TRIALS = 5
N = 500000
M = 2000
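# payload per rank: N * M * 4 bytes (fp32) = 4e9 bytes = 4 GB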


def printflock(*msgs):
    """print under an exclusive flock so that output from concurrent ranks
    doesn't interleave"""
    with open(__file__, "r") as fh:
        fcntl.flock(fh, fcntl.LOCK_EX)
        try:
            print(*msgs)
        finally:
            fcntl.flock(fh, fcntl.LOCK_UN)


def timed_allreduce(mat, id):
    pre = time.perf_counter()
    dist.all_reduce(mat)
    # NCCL collectives are launched asynchronously, so block until the
    # all_reduce has actually finished before stopping the clock
    torch.cuda.synchronize()
    duration = time.perf_counter() - pre

    size = M * N * 4  # 4 bytes per fp32 element
    n = dist.get_world_size()
    # algo throughput: *2 accounts for send + receive, *8 converts bytes to bits
    tput = ((size * 2) / duration) * 8
    # bus bandwidth per the nccl-tests convention for all-reduce
    busbw = (size / duration) * (2 * (n - 1) / n) * 8

    printflock(f"{id}:\n",
               f"duration: {duration:.4f} sec\n",
               f"algo throughput: {tput:.4f} bps, {tput/1e9:.4f} Gbps\n",
               f"busbw: {busbw / 1e9:.4f} Gbps")
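
# a sanity check on the 2*(n-1)/n busbw factor above (the standard nccl-tests
# convention, https://github.com/NVIDIA/nccl-tests/blob/master/doc/PERFORMANCE.md):
# a ring all-reduce moves each element 2*(n-1)/n times per rank, so the factor
# is 1.0 for n=2, 1.5 for n=4, 1.75 for n=8, and approaches 2.0 as n grows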


def run(local_rank):
    hostname = socket.gethostname()
    id = f"{hostname}:{local_rank}"
    global_rank = dist.get_rank()

    printflock(f"{id} data size: {M*N*4/1e9} GB")
    mat = torch.rand(N, M, dtype=torch.float32).cuda(local_rank)

    # the first trial may run slower due to one-time warmup costs, so treat
    # the later trials as the representative ones
    for i in range(TRIALS):
        dist.barrier()
        if global_rank == 0:
            print(f"\n\n\n-----------trial-{i}----------------")
        timed_allreduce(mat, id)


def init_processes(local_rank, fn, backend='nccl'):
    torch.cuda.set_device(local_rank)
    dist.init_process_group(backend)
    fn(local_rank)
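
# torch.distributed.run (torchrun) exports RANK, LOCAL_RANK, WORLD_SIZE,
# MASTER_ADDR and MASTER_PORT into each process' environment; the default
# env:// rendezvous of init_process_group picks these up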


if __name__ == "__main__":
    local_rank = int(os.environ["LOCAL_RANK"])
    printflock(f"local_rank: {local_rank}")
    init_processes(local_rank=local_rank, fn=run)