ddp 加入了更为详细的注释;修复了一个错误

This commit is contained in:
YWMditto 2022-04-09 13:59:32 +08:00
parent 4781178a5a
commit d03bc5ce14
2 changed files with 130 additions and 36 deletions

View File

@ -7,7 +7,6 @@ from time import sleep
from typing import List, Optional, Union, Dict
from functools import partial
# todo 这个等大家的 __all__ 都弄完后改为 from fastNLP.env import
from fastNLP.envs.imports import _NEED_IMPORT_TORCH
if _NEED_IMPORT_TORCH:
import torch
@ -44,20 +43,128 @@ class TorchDDPDriver(TorchDriver):
fp16: bool = False,
**kwargs
):
"""
DDP 目前考虑支持的三种启动方式
1. 用户自己不进行 ddp 的任何操作直接使用我们的 Trainer并且只运行一个 main 脚本这时是由我们自己使用 open_subprocesses 拉起
多个进程然后 TorchDDPDriver 自己 init_process_group
2. 其它情况同 1但是用户自己使用 python -m torch.distributed.launch 拉起
3. 用户自己在外面初始化 DDP并且通过 python -m torch.distributed.launch 拉起
r"""
`TorchDDPDriver` 目前支持的三种启动方式
1. 用户自己不进行 ddp 的任何操作直接使用我们的 Trainer这时是由我们自己使用 `open_subprocesses` 拉起多个进程
然后 `TorchDDPDriver` 自己通过调用 `dist.init_process_group` 来初始化 ddp 的通信组情况 A
2. 用户同样不在 Trainer 之外初始化 ddp但是用户自己使用 python -m torch.distributed.launch 拉起来创建多个进程这时我们仍旧
会通过调用 `dist.init_process_group` 来初始化 ddp 的通信组情况 B
3. 用户自己在外面初始化 DDP并且通过 python -m torch.distributed.launch 拉起这时无论是多个进程的拉起和 ddp 的通信组的建立
都由用户自己操作我们只会在 driver.setup 的时候对 `TorchDDPDriver` 设置一些必要的属性值情况 C
注意多机的启动强制要求用户在每一台机器上使用 python -m torch.distributed.launch 启动
注意多机的启动强制要求用户在每一台机器上使用 python -m torch.distributed.launch 启动因此我们不会在 `TorchDDPDriver` 中保存
任何当前有多少台机器的信息num_nodes不是 gpu 的数量
如果用户自己在外面初始化了 ddp那么
parallel_device None
data_device 表示单卡的一个参数
dist.is_initialized true
Part 1三种启动方式的具体分析
1对于用户运行的脚本中如果 `driver.setup` 只会被调用一次意味着用户的启动脚本中只初始化了一个 trainer/evaluator
`TorchDDPDriver` 在初始化以及 `setup` 函数中会做的事情分别如下所示
-> 情况 A这种情况下用户传入的 model 在一定是普通的 model没有经 `DistributedDataParallel` 包裹的model
因为 `DistributedDataParallel` 的使用一定要求 init_process_group 已经被调用用来建立当前的 ddp 通信组但是这意味着如果
用户需要使用 2 张以上的显卡那么其必然需要使用 torch.distributed.launch 来启动意味着就不是情况 A
这时我们首先会调用 `TorchDDPDriver.open_subprocess` 函数来拉起多个进程其中进程的数量等于用户传入给 trainer 的使用的 gpu
的数量例如 `Trainer` 中的参数是 device=[0, 1, 6, 7]那么我们就会使用第 0167 gpu 来拉起 4 个进程
接着我们会调用 `dist.init_process_group` 来初始化各个进程之间的通信组
这里需要注意拉起的新的进程会从前到后完整地运行一遍用户的启动脚本例如 main.py因此也都会运行这两个函数但是需要注意只有进程 0
才会去真正地运行 `TorchDDPDriver.open_subprocess`进程 0 运行到 `dist.init_process_group`pytorch 会阻塞进程 0 继续
向前运行直到其它进程也运行到这里
最后我们会设置这个进程对应的 device然后将模型迁移到对应的机器上再使用 `DistributedDataParallel` 将模型包裹
至此ddp 的环境配置过程全部完成
-> 情况 B注意这种情况我们直接限定了用户是通过 torch.distributed.launch 拉起并且没有自己建立 ddp 的通信组这时在
`TorchDDPDriver` 的初始化和 setup 函数的调用过程中与情况 A 首要的不同就在于用户在 trainer 中输入的参数 device 不再有效
这时每个进程所使用的 gpu 是我们直接通过 `torch.device("cuda:{local_rank}")` 来配置的因此如果用户想要实现使用特定 gpu
设备的目的可以通过自己设置环境变量实现例如 os.environ["CUDA_VISIBLE_DEVICE"] 来实现剩下的操作和情况 A 类似
-> 情况 C注意这种情况我们限定了用户是通过 torch.distributed.launch 拉起并且 ddp 的通信组也是由自己建立这时基本上所有的
与操作相关的操作都应当由用户自己完成包括迁移模型到对应 gpu 上以及将模型用 `DistributedDataParallel` 包裹等
2如果 `driver.setup` 函数在脚本中会被调用两次及以上意味着用户的启动脚本初始化了两个及以上的 trainer/evaluator
注意这种情况下我们是会保证前后两个 trainer/evaluator 使用的 `TorchDDPDriver` 以及其初始化方式的一致性换句话说如果 trainer1
检测到的启动方式是 '情况 A'那么我们会保证 trainer2 检测到的启动方式同样是 '情况A'即使这需要一些额外的处理因此这里我们主要讨论
我们是通过怎样的操作来保证 trainer2/3/... 检测到的启动方式是和 trainer1 一致的简单来说我们是通过使用环境变量来标记每一种不同的
启动方式来实现这一点的
我们会使用 `FASTNLP_DISTRIBUTED_CHECK` 来标记 '情况 A'使用 `fastnlp_torch_launch_not_ddp` 来标记 '情况 B'意味着我们在
使用 '情况 A' 来启动 `TorchDDPDriver` 我们会将 `FASTNLP_DISTRIBUTED_CHECK` 这一字符串注入到环境变量中 '情况 B' 时则
会将 `fastnlp_torch_launch_not_ddp` 这一字符串注入到环境变量中因此在 trainer2 `TorchDDPDriver` 的初始化和 setup 过程中
如果检测到这些特殊的环境变量我们就会将启动方式变更为其对应的启动方式即使其它的参数特征属于另外的启动方式
Part 2对应的代码细节
1. 如何判断当前的各进程之间的通信组已经被建立ddp 已经被初始化
dist.is_initialized()
2. 如何判断不同的进程是否是由 `python -m torch.distributed.launch` 拉起还是由我们的 `TorchDDPDriver.open_subprocess`
函数拉起
我们会在用户脚本 `import fastNLP` 的时候检测当前的环境变量中是否有 'LOCAL_RANK''WORLD_SIZE' 以及没有 `FASTNLP_DISTRIBUTED_CHECK`
如果满足条件则我们会向环境变量中注入特殊的值 'FASTNLP_BACKEND_LAUNCH' 来标记用户是否使用了 `python -m torch.distributed.launch`
来拉起多个进程
3. 整体的处理判断流程
___________________________________
进入 TorchDDPDriver __init__ 函数
___________________________________________________
判断不同的进程是否是由 torch.distributed.launch 拉起
或者我们自己的 open_subprocess 函数拉起 -------------->
 
是由 torch.distributed.launch 拉起 我们自己的 open_subprocess 函数拉起多个进程
 ___________________________             
检测用户是否自己初始化了 ddp              
                   
________
______ 情况 A
|情况 C|
______
-----------> |情况 B|
  
4. 为了完成全部的建立 ddp 所需要的操作三种情况都需要做的事情以及每件事情的职责归属
情况 A 情况 B 情况 C
________________________________________________________________________________________________________
配置 ddp TorchDDPDriver.open_subprocess torch.distributed.launch torch.distributed.launch
需要的环境变量
开启多个进程 TorchDDPDriver.open_subprocess torch.distributed.launch torch.distributed.launch
调用 dist.
init_process\ TorchDDPDriver.setup TorchDDPDriver.setup 用户自己调用
_group 函数
设置 TorchDDPDriver
world_size TorchDDPDriver.setup TorchDDPDriver.setup TorchDDPDriver.setup
global_rank 属性
Part 3其它的处理细节
1. 环境变量
fastNLP `TorchDDPDriver` 运行时所需要的环境变量分为两种一种是 torch ddp 运行所需要的环境变量另一种是 fastNLP 自己
的环境变量前者的配置情况如上表所示而后者中的大多数环境变量则是在用户 import fastNLP 时就设置好了
2. parallel_device, model_device data_device 的关系
parallel_device `TorchDDPDriver` 的参数model_device data_device 都为 driver 的属性
其中 data_device 仅当情况 C 时由用户自己指定如果其不为 None那么在模型 forward 的时候我们就会将数据迁移到 data_device
model_device 永远都为单独的一个 torch.device
情况 A 情况 B 情况 C
________________________________________________________________________________________________________
parallel_device 由用户传入trainer的参数 torch.device( torch.device(
device 决定必须是一个list "cuda:{local_rank}") "cuda:{local_rank}")
其中每一个对象都是 torch.device
model_device parallel_device[local_rank] parallel_device None
data_device model_device model_device 由用户传入 trainer 的参数
data_device 决定
3. _DDPWrappingModel 的作用
因为我们即需要调用模型的 `train_step``validate_step``test_step` 方法又需要通过 `DistributedDataParallel`
forward 函数来帮助我们同步各个设备上的梯度因此我们需要先将模型单独包裹一层然后在 forward 的时候其先经过 `DistributedDataParallel`
forward 方法然后再经过 `_DDPWrappingModel` forward 方法我们会在该 forward 函数中进行判断确定调用的是模型自己的
forward 函数还是 `train_step``validate_step``test_step` 方法
4. 当某一个进程出现 exception `TorchDDPDriver` 的处理
不管是什么情况`TorchDDPDriver` `setup` 函数的最后都会将所有进程的 pid 主动记录下来这样当一个进程出现 exception
driver on_exception 函数就会被 trainer 调用其会调用 os.kill 指令将其它进程 kill
"""
super(TorchDDPDriver, self).__init__(model, fp16=fp16, **kwargs)
@ -81,7 +188,8 @@ class TorchDDPDriver(TorchDriver):
# 如果用户自己在外面初始化了 DDP
self.outside_ddp = False
if dist.is_initialized() and FASTNLP_DISTRIBUTED_CHECK not in os.environ and "fastnlp_special" not in os.environ:
if dist.is_initialized() and FASTNLP_DISTRIBUTED_CHECK not in os.environ and \
"fastnlp_torch_launch_not_ddp" not in os.environ:
# 如果用户自己在外面初始化了 DDP那么我们要求用户传入的模型一定是已经由 DistributedDataParallel 包裹后的模型;
if not isinstance(model, DistributedDataParallel):
raise RuntimeError(
@ -97,7 +205,7 @@ class TorchDDPDriver(TorchDriver):
if isinstance(batch, Dict):
return auto_param_call(step_fn, batch, signature_fn=signature_fn)
else:
return self._validate_step(batch)
return step_fn(batch)
model = model.module
if hasattr(model, "train_step"):
@ -185,7 +293,7 @@ class TorchDDPDriver(TorchDriver):
backend="nccl", rank=self.global_rank, world_size=self.world_size
)
os.environ["fastnlp_special"] = "yes"
os.environ["fastnlp_torch_launch_not_ddp"] = "yes"
# 进入到这里的情况时:
# dist.is_initialized 一定为 False

View File

@ -34,20 +34,13 @@ def _select_seed_randomly(min_seed_value: int = 0, max_seed_value: int = 255) ->
def torch_seed_everything(seed: Optional[int] = None, workers: bool = False) -> int:
"""Function that sets seed for pseudo-random number generators in: pytorch, numpy, python.random In addition,
sets the following environment variables:
r"""
为伪随机数生成器设置种子的函数pytorchnumpypython.random 另外
设置以下环境变量
- `PL_GLOBAL_SEED`: will be passed to spawned subprocesses (e.g. ddp_spawn backend).
- `PL_SEED_WORKERS`: (optional) is set to 1 if ``workers=True``.
Args:
seed: the integer value seed for global random state in Lightning.
If `None`, will read seed from `PL_GLOBAL_SEED` env variable
or select it randomly.
workers: if set to ``True``, will properly configure all dataloaders passed to the
Trainer with a ``worker_init_fn``. If the user already provides such a function
for their dataloaders, setting this argument will have no influence. See also:
:func:`~pytorch_lightning.utilities.seed.pl_worker_init_function`.
:param seed: 全局随机状态的整数值种子如果为将从 "FASTNLP_GLOBAL_SEED" 环境变量中读取种子或随机选择
:param workers: 如果设置为True将正确配置所有传递给带有worker_init_fn的培训师如果用户已经提供了这样的功能对于他们的数据加载器
设置此参数将没有影响;
"""
max_seed_value = np.iinfo(np.uint32).max
min_seed_value = np.iinfo(np.uint32).min
@ -56,7 +49,6 @@ def torch_seed_everything(seed: Optional[int] = None, workers: bool = False) ->
env_seed = os.environ.get(FASTNLP_GLOBAL_SEED)
if env_seed is None:
seed = _select_seed_randomly(min_seed_value, max_seed_value)
# rank_zero_warn(f"No seed found, seed set to {seed}")
else:
try:
seed = int(env_seed)
@ -69,12 +61,8 @@ def torch_seed_everything(seed: Optional[int] = None, workers: bool = False) ->
if not (min_seed_value <= seed <= max_seed_value):
logger.warning("Your seed value is two big or two small for numpy, we will choose a random seed for you.")
# rank_zero_warn(f"{seed} is not in bounds, numpy accepts from {min_seed_value} to {max_seed_value}")
seed = _select_seed_randomly(min_seed_value, max_seed_value)
# using `log.info` instead of `rank_zero_info`,
# so users can verify the seed is properly set in distributed training.
# log.info(f"Global seed set to {seed}")
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
@ -84,11 +72,9 @@ def torch_seed_everything(seed: Optional[int] = None, workers: bool = False) ->
def reset_seed() -> None:
"""
r"""
这个函数主要是给 ddp 用的因为 ddp 会开启多个进程因此当用户在脚本中指定 seed_everything 在开启多个脚本后会在每个脚本内重新
进行随机数的设置
If :func:`pytorch_lightning.utilities.seed.seed_everything` is unused, this function will do nothing.
"""
seed = os.environ.get(FASTNLP_GLOBAL_SEED, None)
workers = os.environ.get(FASTNLP_SEED_WORKERS, "0")