mirror of https://github.com/Jittor/Jittor
Merge branch 'ygy' of https://github.com/Jittor/jittor into gword
Commit 5a450a495b
@@ -14,6 +14,7 @@
#include <cuda_runtime.h>
#include <helper_cuda.h>
#include "nccl_warper.h"
#include "ops/op_register.h"
namespace jittor {

#ifndef JIT

@@ -21,13 +22,18 @@ NcclAllReduceOp::NcclAllReduceOp(Var* x) : x(x) {
    flags.set(NodeFlags::_cpu, 0);
    flags.set(NodeFlags::_cuda, 1);
    y = create_output(nullptr, x->dtype());
    ASSERT(x->dtype().is_float());
}

void NcclAllReduceOp::infer_shape() {
    y->set_shape(x->shape);
}

VarPtr NcclAllReduceOp::grad(Var* out, Var* dout, Var* v, int v_index) {
    static VarPtr(*nccl_all_reduce)(Var*) =
        get_op_info("nccl_all_reduce").get_constructor<VarPtr, Var*>();
    return nccl_all_reduce(dout);
}

void NcclAllReduceOp::jit_prepare() {
    add_jit_define("Tx", x->dtype());
    add_jit_define("XDIM", JK::hex1(x->shape.size()));

@@ -37,11 +43,17 @@ void NcclAllReduceOp::jit_prepare() {
#ifdef JIT_cuda

void NcclAllReduceOp::jit_run() {
    @define(T_NCCL,
        @if(@strcmp(@Tx,float)==0 || @strcmp(@Tx,float32)==0, ncclFloat)
        @if(@strcmp(@Tx,int)==0 || @strcmp(@Tx,int32)==0, ncclInt)
        @if(@strcmp(@Tx,float64)==0, ncclFloat64)
        @if(@strcmp(@Tx,int64)==0, ncclInt64)
    )
    @for(i, 0, XDIM, index_t xshape@i = x->shape[@i];)
    int size = 1 @for(i, 0, XDIM, * xshape@{i});
    auto* __restrict__ xp = x->ptr<Tx>();
    auto* __restrict__ yp = y->ptr<Tx>();
    checkCudaErrors(ncclAllReduce(xp, yp, size, ncclFloat, ncclSum, comm, 0));
    checkCudaErrors(ncclAllReduce(xp, yp, size, @T_NCCL, ncclSum, comm, 0));
}

#endif
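The grad() above registers nccl_all_reduce as its own backward op, and the @T_NCCL define picks the NCCL datatype from the Jittor dtype instead of the previously hard-coded ncclFloat. A minimal usage sketch (not part of this diff), assuming a CUDA build with NCCL and a launch such as mpirun -np 2 python demo.py; the shared seed makes x identical on every rank, as in the test file further down:

import jittor as jt
import numpy as np

n = 2                      # must match the -np value of the mpirun launch
jt.set_seed(3)             # same seed on every rank, so x is identical everywhere
with jt.flag_scope(use_cuda=1):
    x = jt.random([5, 5])
    y = jt.compile_extern.nccl_ops.nccl_all_reduce(x)
    assert np.allclose(y.data, (x * n).data)        # elementwise sum over all ranks
    g = jt.grad(y, x)                               # backward is another all-reduce
    assert np.allclose(g.data, np.ones([5, 5]) * n)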
@@ -18,6 +18,7 @@ struct NcclAllReduceOp : Op {
    void infer_shape() override;

    const char* name() const override { return "nccl_all_reduce"; }
    VarPtr grad(Var* out, Var* dout, Var* v, int v_index) override;
    DECLARE_jit_run;
};
@@ -14,6 +14,7 @@
#include <cuda_runtime.h>
#include <helper_cuda.h>
#include "nccl_warper.h"
#include "ops/op_register.h"
namespace jittor {

#ifndef JIT

@@ -21,13 +22,18 @@ NcclBroadcastOp::NcclBroadcastOp(Var* x, int root) : x(x), root(root) {
    flags.set(NodeFlags::_cpu, 0);
    flags.set(NodeFlags::_cuda, 1);
    y = create_output(nullptr, x->dtype());
    ASSERT(x->dtype().is_float());
}

void NcclBroadcastOp::infer_shape() {
    y->set_shape(x->shape);
}

VarPtr NcclBroadcastOp::grad(Var* out, Var* dout, Var* v, int v_index) {
    static VarPtr(*nccl_reduce)(Var*, int) =
        get_op_info("nccl_reduce").get_constructor<VarPtr, Var*, int>();
    return nccl_reduce(dout, root);
}

void NcclBroadcastOp::jit_prepare() {
    add_jit_define("Tx", x->dtype());
    add_jit_define("XDIM", JK::hex1(x->shape.size()));

@@ -37,11 +43,17 @@ void NcclBroadcastOp::jit_prepare() {
#ifdef JIT_cuda

void NcclBroadcastOp::jit_run() {
    @define(T_NCCL,
        @if(@strcmp(@Tx,float)==0 || @strcmp(@Tx,float32)==0, ncclFloat)
        @if(@strcmp(@Tx,int)==0 || @strcmp(@Tx,int32)==0, ncclInt)
        @if(@strcmp(@Tx,float64)==0, ncclFloat64)
        @if(@strcmp(@Tx,int64)==0, ncclInt64)
    )
    @for(i, 0, XDIM, index_t xshape@i = x->shape[@i];)
    int size = 1 @for(i, 0, XDIM, * xshape@{i});
    auto* __restrict__ xp = x->ptr<Tx>();
    auto* __restrict__ yp = y->ptr<Tx>();
    checkCudaErrors(ncclBroadcast(xp, yp, size, ncclFloat, root, comm, 0));
    checkCudaErrors(ncclBroadcast(xp, yp, size, @T_NCCL, root, comm, 0));
}

#endif
@@ -19,6 +19,7 @@ struct NcclBroadcastOp : Op {
    void infer_shape() override;

    const char* name() const override { return "nccl_broadcast"; }
    VarPtr grad(Var* out, Var* dout, Var* v, int v_index) override;
    DECLARE_jit_run;
};
@@ -14,6 +14,7 @@
#include <cuda_runtime.h>
#include <helper_cuda.h>
#include "nccl_warper.h"
#include "ops/op_register.h"
namespace jittor {

#ifndef JIT

@@ -21,13 +22,18 @@ NcclReduceOp::NcclReduceOp(Var* x, int root) : x(x), root(root) {
    flags.set(NodeFlags::_cpu, 0);
    flags.set(NodeFlags::_cuda, 1);
    y = create_output(nullptr, x->dtype());
    ASSERT(x->dtype().is_float());
}

void NcclReduceOp::infer_shape() {
    y->set_shape(x->shape);
}

VarPtr NcclReduceOp::grad(Var* out, Var* dout, Var* v, int v_index) {
    static VarPtr(*nccl_broadcast)(Var*, int) =
        get_op_info("nccl_broadcast").get_constructor<VarPtr, Var*, int>();
    return nccl_broadcast(dout, root);
}

void NcclReduceOp::jit_prepare() {
    add_jit_define("Tx", x->dtype());
    add_jit_define("XDIM", JK::hex1(x->shape.size()));

@@ -37,11 +43,17 @@ void NcclReduceOp::jit_prepare() {
#ifdef JIT_cuda

void NcclReduceOp::jit_run() {
    @define(T_NCCL,
        @if(@strcmp(@Tx,float)==0 || @strcmp(@Tx,float32)==0, ncclFloat)
        @if(@strcmp(@Tx,int)==0 || @strcmp(@Tx,int32)==0, ncclInt)
        @if(@strcmp(@Tx,float64)==0, ncclFloat64)
        @if(@strcmp(@Tx,int64)==0, ncclInt64)
    )
    @for(i, 0, XDIM, index_t xshape@i = x->shape[@i];)
    int size = 1 @for(i, 0, XDIM, * xshape@{i});
    auto* __restrict__ xp = x->ptr<Tx>();
    auto* __restrict__ yp = y->ptr<Tx>();
    checkCudaErrors(ncclReduce(xp, yp, size, ncclFloat, ncclSum, root, comm, 0));
    checkCudaErrors(ncclReduce(xp, yp, size, @T_NCCL, ncclSum, root, comm, 0));
}

#endif
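nccl_broadcast and nccl_reduce above are wired up as each other's gradients: broadcasting from root is differentiated by reducing back to root, and reducing to root is differentiated by broadcasting the incoming gradient from root. A rough sketch (not part of this diff) of what that means per rank, under the same mpirun launch and seed setup as the earlier sketch:

import jittor as jt
import numpy as np

root = 0
jt.set_seed(3)
with jt.flag_scope(use_cuda=1):
    x = jt.random([5, 5])
    y = jt.compile_extern.nccl_ops.nccl_reduce(x, root)   # root receives the sum over ranks
    g = jt.grad(y, x)
    # the backward broadcast from root hands every rank the same gradient of ones
    assert np.allclose(g.data, np.ones([5, 5]))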
@@ -19,6 +19,7 @@ struct NcclReduceOp : Op {
    void infer_shape() override;

    const char* name() const override { return "nccl_reduce"; }
    VarPtr grad(Var* out, Var* dout, Var* v, int v_index) override;
    DECLARE_jit_run;
};
@@ -7,10 +7,7 @@
// file 'LICENSE.txt', which is part of this source code package.
// ***************************************************************
#include "nccl_warper.h"

#ifdef HAS_CUDA
#include "event_queue.h"
#endif

const char *_cudaGetErrorEnum(ncclResult_t error) {
    return ncclGetErrorString(error);
@@ -16,7 +16,7 @@ with lock.lock_scope():
    from jittor_core import *
    from jittor_core.ops import *
    from . import compile_extern
    from .compile_extern import mkl_ops
    from .compile_extern import mkl_ops, inside_mpi, mpi_ops

import contextlib
import numpy as np

@@ -579,6 +579,15 @@ class Module:
        for p in self.parameters():
            if id(p) in self.backup_grad_state and self.backup_grad_state[id(p)]:
                p.start_grad()

    def mpi_sync(self):
        if not inside_mpi():
            return
        ps = self.parameters()
        for p in ps:
            temp = mpi_ops.mpi_broadcast(p, 0)
            p.assign(temp.detach())
            p.detach_inplace()

def make_module(func, exec_n_args=1):
    class MakeModule(Module):
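mpi_sync above broadcasts every parameter from rank 0 and assigns the detached result, so all MPI workers start from identical weights; outside mpirun it is a no-op. A short sketch of the intended call pattern (not part of this diff; the Net class is illustrative):

import jittor as jt
from jittor import nn, Module

class Net(Module):
    def __init__(self):
        self.fc = nn.Linear(10, 1)
    def execute(self, x):
        return self.fc(x)

net = Net()
net.mpi_sync()      # under mpirun: every rank now holds rank 0's weights
opt = nn.SGD(net.parameters(), 0.05)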
@@ -1,6 +1,7 @@
# ***************************************************************
# Copyright (c) 2020 Jittor. Authors:
# Guowei Yang <471184555@qq.com>
# Guoye Yang <498731903@qq.com>
# Wenyang Zhou <576825820@qq.com>
# Meng-Hao Guo <guomenghao1997@gmail.com>
# Dun Liang <randonlang@gmail.com>.

@@ -154,6 +155,9 @@ class SGD(object):
    def step(self, loss):
        ps = self.parameters
        gs = jt.grad(loss, ps)
        if jt.compile_extern.inside_mpi():
            for g in gs:
                g.assign(jt.compile_extern.mpi_ops.mpi_all_reduce(g))
        for p, g, v in zip(ps, gs, self.values):
            dp = p * self.weight_decay + g
            v.assign(self.momentum * v + dp * (1 - self.dampening))

@@ -196,6 +200,9 @@ class Adam(object):
    def step(self, loss):
        ps = self.parameters
        gs = jt.grad(loss, ps)
        if jt.compile_extern.inside_mpi():
            for g in gs:
                g.assign(jt.compile_extern.mpi_ops.mpi_all_reduce(g))
        self.adam_step += 1
        n, (b0, b1) = float(self.adam_step), self.betas
        for p, g, v, m in zip(ps, gs, self.values, self.m):
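With the change above, SGD.step and Adam.step all-reduce (sum) the per-rank gradients whenever the process runs inside MPI, so an ordinary single-process training loop becomes data-parallel just by launching it with mpirun. A hedged sketch (not part of this diff; Net and the synthetic data are illustrative, mirroring test_optimizer further down):

import numpy as np
import jittor as jt
from jittor import nn, Module

class Net(Module):
    def __init__(self):
        self.fc = nn.Linear(1, 1)
    def execute(self, x):
        return self.fc(x)

net = Net()
net.mpi_sync()                               # start all ranks from identical weights
opt = nn.SGD(net.parameters(), 0.05)
for _ in range(100):
    x = jt.float32(np.random.rand(50, 1))    # each rank draws its own batch
    y = x * x
    diff = net(x) - y
    opt.step((diff * diff).mean())           # under mpirun, gradients are summed across ranks here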
@@ -1,5 +1,6 @@
# ***************************************************************
# Copyright (c) 2020 Jittor. Authors:
# Guoye Yang <498731903@qq.com>
# Guowei Yang <471184555@qq.com>
# Dun Liang <randonlang@gmail.com>.
# All Rights Reserved.

@@ -10,53 +11,142 @@ import unittest
import os, sys
import jittor as jt
import numpy as np
from jittor import nn
from jittor import nn, Module
import copy
from jittor.test.test_log import find_log_with_re
n = 2
mpi = jt.compile_extern.mpi

def test_all_reduce():
    print("test all_reduce")
    x = jt.random([5, 5])
    y = jt.compile_extern.nccl_ops.nccl_all_reduce(x)
    assert np.allclose(y.data, (x*3).data)
    with jt.log_capture_scope(enable_tuner=1, log_silent=1,
        log_v=1, log_vprefix="op.cc=100,exe=1000"
    ) as raw_log:
        x = jt.random([5, 5])
        y = jt.compile_extern.mpi_ops.mpi_all_reduce(x)
        assert np.allclose(y.data, (x*n).data)
        g = jt.grad(y, x)
        assert np.allclose(g.data, np.ones([5,5])*n)

    logs = find_log_with_re(raw_log, "(Jit op key (not )?found: nccl_all_reduce.*)")
    assert len(logs)==2, len(logs)

def test_broadcast():
    print("test broadcast")
    mpi = jt.compile_extern.mpi
    data = jt.random([5, 5])
    if mpi.world_rank() == 0:
        x = data
    else:
        x = jt.zeros([5, 5])
    y = jt.compile_extern.nccl_ops.nccl_broadcast(x, 0)
    assert np.allclose(y.data, data.data)
    with jt.log_capture_scope(enable_tuner=1, log_silent=1,
        log_v=1, log_vprefix="op.cc=100,exe=1000"
    ) as raw_log:
        data = jt.random([5, 5])
        if mpi.world_rank() == 0:
            x = data
        else:
            x = jt.zeros([5, 5])
        y = jt.compile_extern.mpi_ops.mpi_broadcast(x, 0)
        assert np.allclose(y.data, data.data)
        g = jt.grad(y.sum(), x)
        g_ = g.data
        if mpi.world_rank() == 0:
            assert np.allclose(g_, np.ones([5,5])*n)
    logs = find_log_with_re(raw_log, "(Jit op key (not )?found: nccl_broadcast.*)")
    assert len(logs)==1, len(logs)

def test_reduce():
    print("test reduce")
    mpi = jt.compile_extern.mpi
    x = jt.random([5, 5])
    y = jt.compile_extern.nccl_ops.nccl_reduce(x, 0)
    y.sync()
    with jt.log_capture_scope(enable_tuner=1, log_silent=1,
        log_v=1, log_vprefix="op.cc=100,exe=1000"
    ) as raw_log:
        x = jt.random([5, 5])
        y = jt.compile_extern.mpi_ops.mpi_reduce(x, 0)
        y_ = y.data
        x_ = (x*n).data
        if mpi.world_rank() == 0:
            assert np.allclose(y_, x_)
        g = jt.grad(y, x)
        assert np.allclose(g.data, np.ones([5,5]))
    logs = find_log_with_re(raw_log, "(Jit op key (not )?found: nccl_reduce.*)")
    assert len(logs)==1, len(logs)

class Model(Module):
    def __init__(self):
        self.linear1 = nn.Linear(3, 3)
        self.linear2 = nn.Linear(3, 1024, False)

    def execute(self, x):
        x = self.linear1(x)
        x = nn.relu(x)
        return self.linear2(x)

def test_sync():
    print("test mpi_sync")
    net = Model()
    if mpi.world_rank() == 0:
        net.linear1.weight *= 0
        net.linear2.weight *= 0
        net.linear1.bias *= 0
        net.linear1.weight += 1
        net.linear2.weight += 1
        net.linear1.bias += 1
    net.mpi_sync()
    assert np.allclose(net.linear1.weight.data, jt.ones(net.linear1.weight.shape).data)
    assert np.allclose(net.linear2.weight.data, jt.ones(net.linear2.weight.shape).data)
    assert np.allclose(net.linear1.bias.data, jt.ones(net.linear1.bias.shape).data)

class Model2(Module):
    def __init__(self, input_size):
        self.linear1 = nn.Linear(input_size, 10)
        self.relu1 = nn.Relu()
        self.linear2 = nn.Linear(10, 1)
    def execute(self, x):
        x = self.linear1(x)
        x = self.relu1(x)
        return self.linear2(x)

def test_optimizer():
    print("test optimizer")
    def get_data(n):
        for i in range(n):
            x = np.random.rand(50, 1)
            y = x*x
            yield jt.float32(x), jt.float32(y)
    num = 2000
    model = Model2(1)
    model.mpi_sync()
    optimizer = nn.SGD(model.parameters(), 0.05)
    dataset = list(enumerate(get_data(num)))
    for i in range(mpi.world_rank(), num, n):
        id, (x, y) = dataset[i]
        pred_y = model(x)
        loss = (pred_y - y)*(pred_y - y)
        loss_mean = loss.mean()
        optimizer.step(loss_mean)
    assert loss_mean.data < 0.0025
    jt.clean()

def main():
    np.random.seed(0)
    jt.set_seed(3)
    with jt.flag_scope(use_cuda=1):
        if jt.compile_extern.nccl_ops:
            test_sync()
            test_all_reduce()
            test_broadcast()
            test_reduce()
            test_optimizer()

@unittest.skipIf(jt.compile_extern.mpi_ops is None, "no mpi found")
class TestNcclOps(unittest.TestCase):
@unittest.skipIf(mpi is None, "no inside mpirun")
class TestMpi(unittest.TestCase):
    def test(self):
        mpi = jt.compile_extern.mpi
        if mpi.world_size() == 1:
            mpirun_path = jt.compiler.env_or_try_find('mpirun_path', 'mpirun')
            cmd = f"{mpirun_path} -np 3 {sys.executable} -m jittor.test.test_nccl_ops"
            print("run cmd", cmd)
            jt.compiler.run_cmd(cmd)
        else:
            main()
        main()

@unittest.skipIf(not jt.compile_extern.has_mpi, "no mpi found")
class TestNcclOps(unittest.TestCase):
    def test_entry(self):
        if not jt.compile_extern.inside_mpi():
            mpirun_path = jt.compile_extern.mpicc_path.replace("mpicc", "mpirun")
            cmd = f"{mpirun_path} -np {n} {sys.executable} -m jittor.test.test_nccl_ops -v"
            print("run cmd:", cmd)
            assert os.system(cmd)==0, "run cmd failed: "+cmd

if __name__ == "__main__":
    unittest.main()