From ead42d0aad33bd148cd954101366b67e1d565d8d Mon Sep 17 00:00:00 2001
From: ForFishes <2282912238@qq.com>
Date: Wed, 29 Jun 2022 09:42:56 +0000
Subject: [PATCH 1/6] fix dataset

---
 .../pipeline_parallel_cn.rst | 79 +++++++++++--------
 1 file changed, 44 insertions(+), 35 deletions(-)

diff --git a/docs/guides/06_distributed_training/pipeline_parallel_cn.rst b/docs/guides/06_distributed_training/pipeline_parallel_cn.rst
index 8a2dd31cd87..c757c7f1260 100644
--- a/docs/guides/06_distributed_training/pipeline_parallel_cn.rst
+++ b/docs/guides/06_distributed_training/pipeline_parallel_cn.rst
@@ -64,6 +64,38 @@
     import paddle.nn.functional as F
     import paddle.distributed as dist
     import random
+    from paddle.io import Dataset, BatchSampler, DataLoader
+
+
+创建数据集
+
+.. code-block:: python
+    BATCH_NUM = 20
+    BATCH_SIZE = 16
+    EPOCH_NUM = 4
+
+    IMAGE_SIZE = 784
+    CLASS_NUM = 10
+    MICRO_BATCH_SIZE = 2
+
+    class RandomDataset(Dataset):
+        def __init__(self, num_samples):
+            self.num_samples = num_samples
+
+        def __getitem__(self, idx):
+            image = np.random.random([1, 28, 28]).astype('float32')
+            label = np.random.randint(0, CLASS_NUM - 1, (1, )).astype('int64')
+            return image, label
+
+        def __len__(self):
+            return self.num_samples
+
+    dataset = RandomDataset(BATCH_NUM * BATCH_SIZE)
+    train_reader = DataLoader(dataset,
+                        batch_size=BATCH_SIZE,
+                        shuffle=True,
+                        drop_last=True,
+                        num_workers=2)

 构建一个可以运行流水线的模型,模型的layer需要被LayerDesc或者继承了LayerDesc的SharedLayerDesc包裹,这里因为不需要共享参数,所以就使用LayerDesc

@@ -77,8 +109,9 @@
         def forward(self, x):
             return x.reshape(shape=self.shape)

+
     class AlexNetPipeDesc(PipelineLayer):
-        def __init__(self, num_classes=10, **kwargs):
+        def __init__(self, num_classes=CLASS_NUM, **kwargs):
             self.num_classes = num_classes
             decs = [
                 LayerDesc(
@@ -108,14 +141,11 @@
             ]
             super(AlexNetPipeDesc, self).__init__(
                 layers=decs, loss_fn=nn.CrossEntropyLoss(), **kwargs)
-
+
 然后初始化分布式环境,这一步主要是构建流水线通信组的拓扑

 .. code-block:: python

-    batch_size = 4
-    micro_batch_size = 2
-
     strategy = fleet.DistributedStrategy()
     model_parallel_size = 1
     data_parallel_size = 1
@@ -126,12 +156,11 @@
         "pp_degree": pipeline_parallel_size
     }
     strategy.pipeline_configs = {
-        "accumulate_steps": batch_size // micro_batch_size,
-        "micro_batch_size": micro_batch_size
+        "accumulate_steps": BATCH_SIZE // MICRO_BATCH_SIZE,
+        "micro_batch_size": MICRO_BATCH_SIZE
     }
-
-
-    fleet.init(is_collective=True, strategy=strategy)
+
+    fleet.init(is_collective=True, strategy=strategy)

 为了保证流水线并行参数初始化和普通模型初始化一致,需要在不同卡间设置不同的seed。

@@ -162,7 +191,6 @@ fleet.distributed_optimizer(...):这一步则是为优化器添加分布式属

 .. code-block:: python

-
     class ReshapeHelp(Layer):
         def __init__(self, shape):
             super(ReshapeHelp, self).__init__()
@@ -214,35 +242,16 @@ fleet.distributed_optimizer(...):这一步则是为优化器添加分布式属

     optimizer = fleet.distributed_optimizer(optimizer)

-创建mnist数据集
-
-.. code-block:: python
-
-    train_reader = paddle.batch(
-        paddle.dataset.mnist.train(), batch_size=batch_size, drop_last=True
-    )
-
 开始训练

 model.train_batch(...):这一步主要就是执行1F1B的流水线并行方式

 .. code-block:: python

-    for step_id, data in enumerate(train_reader()):
-        x_data = np.array([x[0] for x in data]).astype("float32").reshape(
-            batch_size, 1, 28, 28
-        )
-        y_data = np.array([x[1] for x in data]).astype("int64").reshape(
-            batch_size, 1
-        )
-        img = paddle.to_tensor(x_data)
-        label = paddle.to_tensor(y_data)
-        img.stop_gradient = True
-        label.stop_gradient = True
-        if step_id >= 5:
-            break
-
-        loss = model.train_batch([img, label], optimizer, scheduler)
+    for i, (image, label) in enumerate(train_reader()):
+        if i >= 5:
+            break
+        loss = model.train_batch([image, label], optimizer, scheduler)
         print("pp_loss: ", loss.numpy())

 运行方式(需要保证当前机器有两张GPU):

 .. code-block:: bash

     export CUDA_VISIBLE_DEVICES=0,1
     python -m paddle.distributed.launch alexnet_dygraph_pipeline.py # alexnet_dygraph_pipeline.py是用户运行动态图流水线的python文件

-基于AlexNet的流水线并行动态图代码:`alex `_。
+基于AlexNet的完整的流水线并行动态图代码:`alex `_。

 控制台输出信息如下:
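The patch above swaps the MNIST reader for a synthetic RandomDataset served through paddle.io.DataLoader and ties accumulate_steps to BATCH_SIZE // MICRO_BATCH_SIZE. Below is a minimal single-process sketch of just that data path, useful for checking shapes before launching the two-GPU run; RandomDataset and the constants mirror the patch, while the asserts and the loader size are illustrative additions rather than part of the tutorial.

.. code-block:: python

    # Standalone sanity check of the data pipeline used by the revised tutorial.
    import numpy as np
    from paddle.io import Dataset, DataLoader

    BATCH_SIZE = 16
    MICRO_BATCH_SIZE = 2
    CLASS_NUM = 10

    class RandomDataset(Dataset):
        def __init__(self, num_samples):
            self.num_samples = num_samples

        def __getitem__(self, idx):
            image = np.random.random([1, 28, 28]).astype('float32')
            # The patch uses CLASS_NUM - 1 as the upper bound; np.random.randint
            # excludes it, so that variant never produces the last class label.
            label = np.random.randint(0, CLASS_NUM, (1, )).astype('int64')
            return image, label

        def __len__(self):
            return self.num_samples

    loader = DataLoader(RandomDataset(320), batch_size=BATCH_SIZE, drop_last=True)
    image, label = next(iter(loader))

    # model.train_batch later splits each DataLoader batch into
    # BATCH_SIZE // MICRO_BATCH_SIZE micro-batches (the accumulate_steps value).
    assert list(image.shape) == [BATCH_SIZE, 1, 28, 28]
    assert BATCH_SIZE % MICRO_BATCH_SIZE == 0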
From 9a8d109b419381f0168280ad94c963c8f7c087a8 Mon Sep 17 00:00:00 2001
From: ForFishes <2282912238@qq.com>
Date: Wed, 29 Jun 2022 13:41:04 +0000
Subject: [PATCH 2/6] add doc

---
 docs/api/api_label                       |  4 ++++
 docs/api/paddle/batch_isend_irecv_cn.rst | 21 ++++++++++++++++++++
 docs/api/paddle/distributed/irecv_cn.rst | 23 ++++++++++++++++++++++
 docs/api/paddle/distributed/isend_cn.rst | 23 ++++++++++++++++++++++
 docs/api/paddle/reduce_scatter_cn.rst    | 25 ++++++++++++++++++++++++
 5 files changed, 96 insertions(+)
 create mode 100644 docs/api/paddle/batch_isend_irecv_cn.rst
 create mode 100644 docs/api/paddle/distributed/irecv_cn.rst
 create mode 100644 docs/api/paddle/distributed/isend_cn.rst
 create mode 100644 docs/api/paddle/reduce_scatter_cn.rst

diff --git a/docs/api/api_label b/docs/api/api_label
index fb622f16792..6006a1c1469 100644
--- a/docs/api/api_label
+++ b/docs/api/api_label
@@ -41,6 +41,10 @@ paddle.distributed.all_reduce .. _api_paddle_distributed_all_reduce:
 paddle.distributed.scatter .. _api_paddle_distributed_scatter:
 paddle.distributed.alltoall .. _api_paddle_distributed_alltoall:
 paddle.distributed.send .. _api_paddle_distributed_send:
+paddle.distributed.isend .. _api_paddle_distributed_isend:
+paddle.distributed.irecv .. _api_paddle_distributed_irecv:
+paddle.distributed.batch_isend_irecv .. _api_paddle_distributed_batch_isend_irecv:
+paddle.distributed.reduce_scatter .. _api_paddle_distributed_reduce_scatter:
 paddle.distributed.QueueDataset .. _api_paddle_distributed_QueueDataset:
 paddle.distributed.barrier .. _api_paddle_distributed_barrier:
 paddle.distributed.CountFilterEntry .. _api_paddle_distributed_CountFilterEntry:
diff --git a/docs/api/paddle/batch_isend_irecv_cn.rst b/docs/api/paddle/batch_isend_irecv_cn.rst
new file mode 100644
index 00000000000..c4067c7d658
--- /dev/null
+++ b/docs/api/paddle/batch_isend_irecv_cn.rst
@@ -0,0 +1,21 @@
+.. _cn_api_paddle_distributed_batch_isend_irecv:
+
+batch_isend_irecv
+-------------------------------
+
+
+.. py:function:: paddle.distributed.batch_isend_irecv(p2p_op_list)
+异步发送或接收一批张量并返回请求列表
+
+
+参数
+:::::::::
+    - p2p_op_list – 点对点操作列表(每个操作符的类型为 ``paddle.distributed.P2POp``)。列表中 ``isend``/ ``irecv`` 的顺序需要与远程端对应的 ``isend`` / ``irecv`` 匹配。
+
+返回
+:::::::::
+返回Task列表。
+
+代码示例
+:::::::::
+COPY-FROM: paddle.distributed.batch_isend_irecv
\ No newline at end of file
diff --git a/docs/api/paddle/distributed/irecv_cn.rst b/docs/api/paddle/distributed/irecv_cn.rst
new file mode 100644
index 00000000000..766c29a13f0
--- /dev/null
+++ b/docs/api/paddle/distributed/irecv_cn.rst
@@ -0,0 +1,23 @@
+.. _cn_api_paddle_distributed_irecv:
+
+irecv
+-------------------------------
+
+
+.. py:function:: paddle.distributed.irecv(tensor, src=None, group=None)
+异步接收发送来的tensor。
+
+参数
+:::::::::
+    - tensor (Tensor) - 要接收的张量。其数据类型应为 float16、float32、float64、int32 或 int64。
+    - src (int) - 接受的rank号。
+    - group (Group,可选) - new_group返回的Group实例,或者设置为None表示默认的全局组。默认值:None。
+
+
+返回
+:::::::::
+返回Task。
+
+代码示例
+:::::::::
+COPY-FROM: paddle.distributed.irecv
\ No newline at end of file
diff --git a/docs/api/paddle/distributed/isend_cn.rst b/docs/api/paddle/distributed/isend_cn.rst
new file mode 100644
index 00000000000..612ff05b5b7
--- /dev/null
+++ b/docs/api/paddle/distributed/isend_cn.rst
@@ -0,0 +1,23 @@
+.. _cn_api_paddle_distributed_isend:
+
+isend
+-------------------------------
+
+
+.. py:function:: paddle.distributed.isend(tensor, dst, group=None)
+异步的将 ``tensor`` 发送到指定的rank进程上。
+
+参数
+:::::::::
+    - tensor (Tensor) - 要发送的张量。其数据类型应为 float16、float32、float64、int32 或 int64。
+    - dst (int) - 目的的rank号。。
+    - group (Group,可选) - new_group返回的Group实例,或者设置为None表示默认的全局组。默认值:None。
+
+
+返回
+:::::::::
+返回Task。
+
+代码示例
+:::::::::
+COPY-FROM: paddle.distributed.isend
\ No newline at end of file
diff --git a/docs/api/paddle/reduce_scatter_cn.rst b/docs/api/paddle/reduce_scatter_cn.rst
new file mode 100644
index 00000000000..a2ba1fa7316
--- /dev/null
+++ b/docs/api/paddle/reduce_scatter_cn.rst
@@ -0,0 +1,25 @@
+.. _cn_api_paddle_distributed_reduce_scatter:
+
+reduce_scatter
+-------------------------------
+
+
+.. py:function:: paddle.distributed.reduce_scatter(output, input_list, op=ReduceOp.SUM, group=None, use_calc_stream=True)
+规约,然后将张量列表分散到组中的所有进程上
+
+参数
+:::::::::
+    - output (Tensor) – 输出的张量。
+    - input_list (list(Tensor)) – 归约和切分的张量列表。
+    - op (ReduceOp.SUM|ReduceOp.MAX|ReduceOp.MIN|ReduceOp.PROD) – 操作类型,默认ReduceOp.SUM。
+    - group: (Group, optional) – 通信组;如果是None,则使用默认通信组。
+    - use_calc_stream: (bool, optional) – 决定是在计算流还是通信流上做该通信操作;默认为True,表示在计算流。
+
+
+返回
+:::::::::
+返回Task。
+
+代码示例
+:::::::::
+COPY-FROM: paddle.distributed.reduce_scatter
\ No newline at end of file

From ef36236380b3714c2861455108fc61df7e5d91ad Mon Sep 17 00:00:00 2001
From: ForFishes <2282912238@qq.com>
Date: Wed, 6 Jul 2022 12:04:02 +0000
Subject: [PATCH 3/6] fix doc

---
 docs/api/paddle/batch_isend_irecv_cn.rst | 4 ++++
 docs/api/paddle/distributed/irecv_cn.rst | 4 ++++
 docs/api/paddle/distributed/isend_cn.rst | 5 +++++
 docs/api/paddle/reduce_scatter_cn.rst    | 4 ++++
 4 files changed, 17 insertions(+)

diff --git a/docs/api/paddle/batch_isend_irecv_cn.rst b/docs/api/paddle/batch_isend_irecv_cn.rst
index c4067c7d658..bb3e66ef067 100644
--- a/docs/api/paddle/batch_isend_irecv_cn.rst
+++ b/docs/api/paddle/batch_isend_irecv_cn.rst
@@ -16,6 +16,10 @@ batch_isend_irecv
 :::::::::
 返回Task列表。

+注意
+:::::::::
+当前只支持动态图
+
 代码示例
 :::::::::
 COPY-FROM: paddle.distributed.batch_isend_irecv
\ No newline at end of file
diff --git a/docs/api/paddle/distributed/irecv_cn.rst b/docs/api/paddle/distributed/irecv_cn.rst
index 766c29a13f0..8e6adaa2fb4 100644
--- a/docs/api/paddle/distributed/irecv_cn.rst
+++ b/docs/api/paddle/distributed/irecv_cn.rst
@@ -18,6 +18,10 @@ irecv
 :::::::::
 返回Task。

+注意
+:::::::::
+当前只支持动态图
+
 代码示例
 :::::::::
 COPY-FROM: paddle.distributed.irecv
\ No newline at end of file
diff --git a/docs/api/paddle/distributed/isend_cn.rst b/docs/api/paddle/distributed/isend_cn.rst
index 612ff05b5b7..daf71ef5246 100644
--- a/docs/api/paddle/distributed/isend_cn.rst
+++ b/docs/api/paddle/distributed/isend_cn.rst
@@ -18,6 +18,11 @@ isend
 :::::::::
 返回Task。

+
+注意
+:::::::::
+当前只支持动态图
+
 代码示例
 :::::::::
 COPY-FROM: paddle.distributed.isend
\ No newline at end of file
diff --git a/docs/api/paddle/reduce_scatter_cn.rst b/docs/api/paddle/reduce_scatter_cn.rst
index a2ba1fa7316..93ad397337d 100644
--- a/docs/api/paddle/reduce_scatter_cn.rst
+++ b/docs/api/paddle/reduce_scatter_cn.rst
@@ -20,6 +20,10 @@ reduce_scatter
 :::::::::
 返回Task。

+注意
+:::::::::
+当前只支持动态图
+
 代码示例
 :::::::::
 COPY-FROM: paddle.distributed.reduce_scatter
\ No newline at end of file
From f6fd791792dc4d919e24fef300f9aeccca8831c7 Mon Sep 17 00:00:00 2001
From: ForFishes <2282912238@qq.com>
Date: Wed, 6 Jul 2022 12:49:10 +0000
Subject: [PATCH 4/6] fix doc

---
 docs/api/api_label                       |  1 -
 docs/api/paddle/batch_isend_irecv_cn.rst | 25 ------------------------
 2 files changed, 26 deletions(-)
 delete mode 100644 docs/api/paddle/batch_isend_irecv_cn.rst

diff --git a/docs/api/api_label b/docs/api/api_label
index 6006a1c1469..6e3be163a2c 100644
--- a/docs/api/api_label
+++ b/docs/api/api_label
@@ -43,7 +43,6 @@ paddle.distributed.alltoall .. _api_paddle_distributed_alltoall:
 paddle.distributed.send .. _api_paddle_distributed_send:
 paddle.distributed.isend .. _api_paddle_distributed_isend:
 paddle.distributed.irecv .. _api_paddle_distributed_irecv:
-paddle.distributed.batch_isend_irecv .. _api_paddle_distributed_batch_isend_irecv:
 paddle.distributed.reduce_scatter .. _api_paddle_distributed_reduce_scatter:
 paddle.distributed.QueueDataset .. _api_paddle_distributed_QueueDataset:
 paddle.distributed.barrier .. _api_paddle_distributed_barrier:
diff --git a/docs/api/paddle/batch_isend_irecv_cn.rst b/docs/api/paddle/batch_isend_irecv_cn.rst
deleted file mode 100644
index bb3e66ef067..00000000000
--- a/docs/api/paddle/batch_isend_irecv_cn.rst
+++ /dev/null
@@ -1,25 +0,0 @@
-.. _cn_api_paddle_distributed_batch_isend_irecv:
-
-batch_isend_irecv
--------------------------------
-
-
-.. py:function:: paddle.distributed.batch_isend_irecv(p2p_op_list)
-异步发送或接收一批张量并返回请求列表
-
-
-参数
-:::::::::
-    - p2p_op_list – 点对点操作列表(每个操作符的类型为 ``paddle.distributed.P2POp``)。列表中 ``isend``/ ``irecv`` 的顺序需要与远程端对应的 ``isend`` / ``irecv`` 匹配。
-
-返回
-:::::::::
-返回Task列表。
-
-注意
-:::::::::
-当前只支持动态图
-
-代码示例
-:::::::::
-COPY-FROM: paddle.distributed.batch_isend_irecv
\ No newline at end of file
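The next patch renames the reduce_scatter parameters to tensor and tensor_list. The call below is positional, so it matches either spelling of the signature; it is only a hedged two-GPU sketch of the collective described in reduce_scatter_cn.rst, and the file name, launch command and printed values are assumptions rather than part of these patches.

.. code-block:: python

    # Hypothetical reduce_scatter_demo.py; launch with:
    #   python -m paddle.distributed.launch --gpus 0,1 reduce_scatter_demo.py
    import paddle
    import paddle.distributed as dist

    dist.init_parallel_env()
    rank = dist.get_rank()

    # Every rank contributes one input tensor per rank in the group.
    t0 = paddle.to_tensor([rank + 0.0, rank + 0.0])
    t1 = paddle.to_tensor([rank + 10.0, rank + 10.0])
    out = paddle.zeros_like(t0)

    # With the default ReduceOp.SUM, rank i ends up holding the element-wise
    # sum of every rank's i-th input tensor.
    dist.reduce_scatter(out, [t0, t1])
    print(rank, out.numpy())  # expected: rank 0 -> [1. 1.], rank 1 -> [21. 21.]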
From 0a8836ad91cc2dc06180cba09f93941e4e90b8b6 Mon Sep 17 00:00:00 2001
From: ForFishes <2282912238@qq.com>
Date: Wed, 6 Jul 2022 12:51:42 +0000
Subject: [PATCH 5/6] fix doc

---
 docs/api/paddle/reduce_scatter_cn.rst | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/docs/api/paddle/reduce_scatter_cn.rst b/docs/api/paddle/reduce_scatter_cn.rst
index 93ad397337d..09fa00c339d 100644
--- a/docs/api/paddle/reduce_scatter_cn.rst
+++ b/docs/api/paddle/reduce_scatter_cn.rst
@@ -4,13 +4,13 @@ reduce_scatter
 -------------------------------


-.. py:function:: paddle.distributed.reduce_scatter(output, input_list, op=ReduceOp.SUM, group=None, use_calc_stream=True)
+.. py:function:: paddle.distributed.reduce_scatter(tensor, tensor_list, op=ReduceOp.SUM, group=None, use_calc_stream=True)
 规约,然后将张量列表分散到组中的所有进程上

 参数
 :::::::::
-    - output (Tensor) – 输出的张量。
-    - input_list (list(Tensor)) – 归约和切分的张量列表。
+    - tensor (Tensor) – 输出的张量。
+    - tensor_list (list(Tensor)) – 归约和切分的张量列表。
     - op (ReduceOp.SUM|ReduceOp.MAX|ReduceOp.MIN|ReduceOp.PROD) – 操作类型,默认ReduceOp.SUM。
     - group: (Group, optional) – 通信组;如果是None,则使用默认通信组。
     - use_calc_stream: (bool, optional) – 决定是在计算流还是通信流上做该通信操作;默认为True,表示在计算流。

From ca800c4e8ac40735265683a87bf8b5b49438c0cf Mon Sep 17 00:00:00 2001
From: ForFishes <2282912238@qq.com>
Date: Mon, 11 Jul 2022 08:24:57 +0000
Subject: [PATCH 6/6] fix doc of rank message

---
 docs/api/paddle/distributed/irecv_cn.rst | 2 +-
 docs/api/paddle/distributed/isend_cn.rst | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/docs/api/paddle/distributed/irecv_cn.rst b/docs/api/paddle/distributed/irecv_cn.rst
index 8e6adaa2fb4..62663348311 100644
--- a/docs/api/paddle/distributed/irecv_cn.rst
+++ b/docs/api/paddle/distributed/irecv_cn.rst
@@ -10,7 +10,7 @@ irecv
 参数
 :::::::::
     - tensor (Tensor) - 要接收的张量。其数据类型应为 float16、float32、float64、int32 或 int64。
-    - src (int) - 接受的rank号。
+    - src (int) - 接受节点的全局rank号。
     - group (Group,可选) - new_group返回的Group实例,或者设置为None表示默认的全局组。默认值:None。


diff --git a/docs/api/paddle/distributed/isend_cn.rst b/docs/api/paddle/distributed/isend_cn.rst
index daf71ef5246..3d63fbbd3b7 100644
--- a/docs/api/paddle/distributed/isend_cn.rst
+++ b/docs/api/paddle/distributed/isend_cn.rst
@@ -10,7 +10,7 @@ isend
 参数
 :::::::::
     - tensor (Tensor) - 要发送的张量。其数据类型应为 float16、float32、float64、int32 或 int64。
-    - dst (int) - 目的的rank号。。
+    - dst (int) - 目标节点的全局rank号。
     - group (Group,可选) - new_group返回的Group实例,或者设置为None表示默认的全局组。默认值:None。
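The COPY-FROM directives in the pages above pull the runnable samples from the English docs. As a quick reference, here is a hedged two-process sketch consistent with the isend/irecv signatures and the global-rank wording introduced by PATCH 6/6; the file name and launch command are assumptions.

.. code-block:: python

    # Hypothetical p2p_demo.py; launch with:
    #   python -m paddle.distributed.launch --gpus 0,1 p2p_demo.py
    import paddle
    import paddle.distributed as dist

    dist.init_parallel_env()

    if dist.get_rank() == 0:
        data = paddle.to_tensor([7, 8, 9])
        # dst is the global rank of the destination process.
        task = dist.isend(data, dst=1)
    else:
        data = paddle.to_tensor([1, 2, 3])
        # src is the global rank of the sending process.
        task = dist.irecv(data, src=0)

    # Both calls return immediately; wait() blocks until the transfer completes.
    task.wait()
    print(dist.get_rank(), data.numpy())  # both ranks end up holding [7 8 9]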