修改了 trainer 和 evaluator 的 torch_kwargs

This commit is contained in:
YWMditto 2022-05-09 14:35:11 +08:00
parent 484b932772
commit bcc66b43cf
7 changed files with 146 additions and 57 deletions

View File

@ -51,23 +51,25 @@ class Evaluator:
False那么我们会将 batch 直接透传给 forward 函数注意上述逻辑同样应用于 `train_step`, `evaluate_step` `test_step`
:param fp16: 是否使用 fp16
:param verbose: 是否打印 evaluate 的结果
:param \**kwargs:
See below
:kwargs:
* *model_use_eval_mode* (``bool``) --
是否在 evaluate 的时候将 model 的状态设置成 eval 状态 eval 状态下model
dropout batch normalization 将会关闭默认为True如果为 FalsefastNLP 不会对 model evaluate 状态做任何设置无论
该值是什么fastNLP 都会在 evaluate 接受后将 model 的状态设置为 train
* *use_dist_sampler* --
是否使用分布式evaluate的方式仅当 driver 为分布式类型时该参数才有效默认为根据 driver 是否支持
分布式进行设置如果为True将使得每个进程上的 dataloader 自动使用不同数据所有进程的数据并集是整个数据集
* *output_from_new_proc* --
应当为一个字符串表示在多进程的 driver 中其它进程的输出流应当被做如何处理其值应当为以下之一
["all", "ignore", "only_error"]当该参数的值不是以上值时该值应当表示一个文件夹的名字我们会将其他 rank 的输出流重定向到
log 文件中然后将 log 文件保存在通过该参数值设定的文件夹中默认为 "only_error"
* *progress_bar* --
evaluate 的时候显示的 progress bar 目前支持三种 [None, 'raw', 'rich', 'auto'], auto 表示如果检测
到当前terminal为交互型则使用 rich否则使用 raw
* *torch_kwargs* -- 用于在指定 ``driver`` 'torch' 时设定具体 driver 实例的一些参数
* ddp_kwargs -- 用于在使用 ``TorchDDPDriver`` 时指定 ``DistributedDataParallel`` 初始化时的参数例如传入
{'find_unused_parameters': True} 来解决有有参数不参与前向运算导致的报错等
* torch_non_blocking -- 表示用于 pytorch tensor to 方法的参数 non_blocking
* *model_use_eval_mode* (``bool``) --
是否在 evaluate 的时候将 model 的状态设置成 eval 状态 eval 状态下model
dropout batch normalization 将会关闭默认为True如果为 FalsefastNLP 不会对 model evaluate 状态做任何设置无论
该值是什么fastNLP 都会在 evaluate 接受后将 model 的状态设置为 train
* *use_dist_sampler* --
是否使用分布式evaluate的方式仅当 driver 为分布式类型时该参数才有效默认为根据 driver 是否支持
分布式进行设置如果为True将使得每个进程上的 dataloader 自动使用不同数据所有进程的数据并集是整个数据集
* *output_from_new_proc* --
应当为一个字符串表示在多进程的 driver 中其它进程的输出流应当被做如何处理其值应当为以下之一
["all", "ignore", "only_error"]当该参数的值不是以上值时该值应当表示一个文件夹的名字我们会将其他 rank 的输出流重定向到
log 文件中然后将 log 文件保存在通过该参数值设定的文件夹中默认为 "only_error"
* *progress_bar* --
evaluate 的时候显示的 progress bar 目前支持三种 [None, 'raw', 'rich', 'auto'], auto 表示如果检测
到当前terminal为交互型则使用 rich否则使用 raw
"""
self.model = model

View File

@ -67,20 +67,28 @@ class Trainer(TrainerEventTrigger):
要自己实现模型部分而将训练层面的逻辑完全地交给 fastNLP
:param model: 训练所需要的模型目前支持 pytorch
:param driver: 训练模型所使用的具体的驱动模式应当为以下选择中的一个["torch", "torch_ddp", ]之后我们会加入 jittorpaddle
国产框架的训练模式其中 "torch" 表示使用 cpu 或者单张 gpu 进行训练
:param driver: 训练模型所使用的具体的驱动模式应当为以下选择中的一个["torch",]之后我们会加入 jittorpaddle
国产框架的训练模式其中 "torch" 表示使用 ``TorchSingleDriver`` 或者 ``TorchDDPDriver``具体使用哪一种取决于参数 ``device``
的设置
:param train_dataloader: 训练数据集注意其必须是单独的一个数据集不能是 List 或者 Dict
:param optimizers: 训练所需要的优化器可以是单独的一个优化器实例也可以是多个优化器组成的 List
:param device: 该参数用来指定具体训练时使用的机器注意当该参数为 None fastNLP 不会将模型和数据进行设备之间的移动处理但是你
可以通过参数 `input_mapping` `output_mapping` 来实现设备之间数据迁移的工作通过这两个参数传入两个处理数据的函数同时你也
可以通过在 kwargs 添加参数 "data_device" 来让我们帮助您将数据迁移到指定的机器上注意这种情况理应只出现在用户在 Trainer 实例化前
自己构造 DDP 的多进程场景
device 的可选输入如下所示
1. 可选输入str: ['cpu', 'cuda', 'cuda:0', 'cuda:1', ...] 依次为'cpu', 可见的第一个GPU中, 可见的第一个GPU中, 可见的第二个GPU中
2. torch.device将模型装载到torch.device上
3. int 将使用device_id为该值的gpu进行训练如果值为 -1那么默认使用全部的显卡此时是 `TorchDDPDriver`
4. list(int)如果多于1个device应当通过该种方式进行设定 `device` 为一个 list 我们默认使用 `TorchDDPDriver`
5. None 为None则不对模型进行任何处理
:param device: 该参数用来指定具体训练时使用的机器注意当该参数仅当您通过 `torch.distributed.launch/run` 启动时可以为 None
此时 fastNLP 不会对模型和数据进行设备之间的移动处理但是你可以通过参数 `input_mapping` `output_mapping` 来实现设备之间
数据迁移的工作通过这两个参数传入两个处理数据的函数同时你也可以通过在 kwargs 添加参数 "data_device" 来让我们帮助您将数据
迁移到指定的机器上注意这种情况理应只出现在用户在 Trainer 实例化前自己构造 DDP 的场景
device 的可选输入如下所示
* *str*: 例如 'cpu', 'cuda', 'cuda:0', 'cuda:1'
* *torch.device*: 将模型装载到 ``torch.device``
* *int*: 将使用 ``device_id`` 为该值的 ``gpu`` 进行训练如果值为 -1那么默认使用全部的显卡此时使用的 driver 实例是 `TorchDDPDriver`
* *list(int)*: 如果多于 1 个device应当通过该种方式进行设定注意此时我们一定会使用 ``TorchDDPDriver``不管您传入的列表的长度是 1 还是其它值
* *None*: 为None则不对模型进行任何处理
.. node::
如果希望使用 ``TorchDDPDriver``
:param n_epochs: 训练总共的 epoch 的数量默认为 20
:param evaluate_dataloaders: 验证数据集其可以是单独的一个数据集也可以是多个数据集当为多个数据集时注意其必须是 Dict默认
@ -124,12 +132,13 @@ class Trainer(TrainerEventTrigger):
:param marker: 用于标记一个 Trainer 实例从而在用户调用 `Trainer.on` 函数时标记该 callback 函数属于哪一个具体的 'trainer' 实例默认为 None
:param kwargs: 一些其它的可能需要的参数见下方的说明
:kwargs:
* *torch_non_blocking* -- 表示用于 pytorch tensor to 方法的参数 non_blocking
* *torch_kwargs* -- 用于在指定 ``driver`` 'torch' 时设定具体 driver 实例的一些参数
* ddp_kwargs -- 用于在使用 ``TorchDDPDriver`` 时指定 ``DistributedDataParallel`` 初始化时的参数例如传入
{'find_unused_parameters': True} 来解决有有参数不参与前向运算导致的报错等
* set_grad_to_none -- 是否在训练过程中在每一次 optimizer 更新后将 grad 置为 None
* torch_non_blocking -- 表示用于 pytorch tensor to 方法的参数 non_blocking
* *data_device* -- 表示如果用户的模型 device Driver 中对应为参数 model_device None 我们会将数据迁移到 data_device
注意如果 model_device None那么 data_device 不会起作用
* *torch_ddp_kwargs* -- 用于配置 pytorch DistributedDataParallel 初始化时的参数仅用于 pytorch ddp 训练例如传入
{'find_unused_parameters': True} 来解决有有参数不参与前向运算导致的报错等
* *set_grad_to_none* -- 是否在训练过程中在每一次 optimizer 更新后将 grad 置为 None
注意如果 model_device None那么 data_device 不会起作用
* *use_dist_sampler* -- 表示是否使用分布式的 sampler 在多卡时分布式 sampler 将自动决定每张卡上读取的 sample 使得一个epoch
内所有卡的 sample 加起来为一整个数据集的 sample默认会根据 driver 是否为分布式进行设置
* *evaluate_use_dist_sampler* -- 表示在 Evaluator 中在使用 分布式 的时候是否将 dataloader sampler 替换为分布式的 sampler默认为 True
@ -143,6 +152,8 @@ class Trainer(TrainerEventTrigger):
* *train_output_mapping* -- output_mapping 一致但是只用于 train output_mapping 互斥
* *evaluate_input_mapping* -- input_mapping 一致但是只用于 evaluate input_mapping 互斥
* *evaluate_output_mapping* -- output_mapping 一致但是只用于 evaluate output_mapping 互斥
"""
self.model = model
self.marker = marker
@ -279,7 +290,8 @@ class Trainer(TrainerEventTrigger):
self.dataloader = self.driver.set_dist_repro_dataloader(dataloader=self.train_dataloader, dist=_dist_sampler,
reproducible=self.callback_manager._need_reproducible_sampler)
self.set_grad_to_none = kwargs.get("set_grad_to_none", True)
_torch_kwargs = kwargs.get("torch_kwargs", {})
self.set_grad_to_none = _torch_kwargs.get("set_grad_to_none", True)
self.evaluate_batch_step_fn = evaluate_batch_step_fn
self.kwargs = kwargs

View File

@ -17,7 +17,7 @@ def choose_driver(model, driver: Union[str, Driver], device: Optional[Union[int,
if isinstance(driver, Driver):
return driver
if driver in {"torch", "torch_ddp", "fairscale"}:
if driver in {"torch", "fairscale"}:
from fastNLP.core.drivers.torch_driver.initialize_torch_driver import initialize_torch_driver
return initialize_torch_driver(driver, device, model, **kwargs)
elif driver in {"jittor"}:
@ -27,5 +27,5 @@ def choose_driver(model, driver: Union[str, Driver], device: Optional[Union[int,
from fastNLP.core.drivers.paddle_driver.initialize_paddle_driver import initialize_paddle_driver
return initialize_paddle_driver(driver, device, model, **kwargs)
else:
raise ValueError("Parameter `driver` can only be one of these values: ['torch', 'torch_ddp', 'fairscale', "
raise ValueError("Parameter `driver` can only be one of these values: ['torch', 'fairscale', "
"'jittor', 'paddle', 'fleet'].")

View File

@ -220,7 +220,7 @@ class TorchDDPDriver(TorchDriver):
self.world_size = None # int(os.environ.get("WORLD_SIZE")) len(self.parallel_device)
self.global_rank = 0
self._ddp_kwargs = kwargs.get("torch_ddp_kwargs", {})
self._ddp_kwargs = self._torch_kwargs.get("ddp_kwargs", {})
check_user_specific_params(self._ddp_kwargs, DistributedDataParallel.__init__)
if len(self.model._buffers) != 0 and self._ddp_kwargs.get("broadcast_buffers", None) is None:
logger.info("Notice your model has buffers and you are using `TorchDDPDriver`, but you do not set "

View File

@ -32,7 +32,7 @@ def initialize_torch_driver(driver: str, device: Optional[Union[str, "torch.devi
"`os.environ['LOCAL_RANK']`.")
return TorchDDPDriver(model, torch.device(f"cuda:{os.environ['LOCAL_RANK']}"), True, **kwargs)
if driver not in {"torch", "torch_ddp", "fairscale"}:
if driver not in {"torch", "fairscale"}:
raise ValueError("Parameter `driver` can only be one of these values: ['torch', 'torch_ddp', 'fairscale'].")
_could_use_device_num = torch.cuda.device_count()
@ -64,19 +64,6 @@ def initialize_torch_driver(driver: str, device: Optional[Union[str, "torch.devi
if driver == "torch":
if not isinstance(device, List):
return TorchSingleDriver(model, device, **kwargs)
else:
logger.info("Notice you are using `torch` driver but your chosen `device` are multi gpus, we will use "
"`TorchDDPDriver` by default. But if you mean using `TorchDDPDriver`, you should choose parameter"
"`driver` as `TorchDDPDriver`.")
return TorchDDPDriver(model, device, **kwargs)
elif driver == "torch_ddp":
if device is not None and not isinstance(device, List):
if device.type == 'cpu':
raise ValueError("You are using `torch_ddp` driver, but your chosen `device` is 'cpu'.")
logger.info("Notice you are using `torch_ddp` driver, but your chosen `device` is only one gpu, we will "
"still use `TorchDDPDriver` for you, but if you mean using `torch_ddp`, you should "
"choose `torch` driver.")
return TorchDDPDriver(model, [device], **kwargs)
else:
return TorchDDPDriver(model, device, **kwargs)
elif driver == "fairscale":

View File

@ -49,8 +49,9 @@ class TorchDriver(Driver):
self.auto_cast, _grad_scaler = _build_fp16_env(dummy=not fp16)
self.grad_scaler = _grad_scaler()
self._torch_kwargs = kwargs.get("torch_kwargs", {})
# 用来设置 `torch_move_data_to_device` 中的 `non_blocking` 参数;
self.non_blocking = kwargs.get("torch_non_blocking", True)
self.non_blocking = self._torch_kwargs.get("torch_non_blocking", True)
# 用来设置是否关闭 auto_param_call 中的参数匹配问题;
self.wo_auto_param_call = kwargs.get("model_wo_auto_param_call", False)

View File

@ -13,11 +13,13 @@ from tests.helpers.datasets.torch_data import TorchNormalDataset_Classification,
from tests.helpers.callbacks.helper_callbacks import RecordLossCallback, RecordMetricCallback
from tests.helpers.utils import magic_argv_env_context
from fastNLP.envs.imports import _NEED_IMPORT_TORCH
if _NEED_IMPORT_TORCH:
from torch.optim import SGD
from torch.utils.data import DataLoader
import torch.distributed as dist
@dataclass
class NormalClassificationTrainTorchConfig:
num_labels: int = 2
@ -101,7 +103,8 @@ def model_and_optimizers(request):
# 测试一下普通的情况;
@pytest.mark.torch
@pytest.mark.parametrize("driver,device", [("torch", "cpu"), ("torch", 1), ("torch", [0, 1])]) # ("torch", "cpu"), ("torch", 1), ("torch", [0, 1])
@pytest.mark.parametrize("driver,device", [("torch", "cpu"), ("torch", 1),
("torch", [0, 1])]) # ("torch", "cpu"), ("torch", 1), ("torch", [0, 1])
@pytest.mark.parametrize("evaluate_every", [-3, -1, 100])
@magic_argv_env_context
def test_trainer_torch_with_evaluator(
@ -173,6 +176,7 @@ def test_trainer_torch_with_evaluator_fp16_accumulation_steps(
if dist.is_initialized():
dist.destroy_process_group()
@pytest.mark.torch
@pytest.mark.parametrize("driver,device", [("torch", 'cpu')]) # ("torch", [0, 1]),("torch", 1)
@magic_argv_env_context
@ -182,7 +186,6 @@ def test_trainer_validate_every(
device,
n_epochs=6,
):
def validate_every(trainer):
if trainer.global_forward_batches % 10 == 0:
print("\nfastNLP test validate every.\n")
@ -234,7 +237,7 @@ def test_trainer_on(
device=device,
optimizers=model_and_optimizers.optimizers,
train_dataloader=model_and_optimizers.train_dataloader,
evaluate_dataloaders={"dl":model_and_optimizers.evaluate_dataloaders},
evaluate_dataloaders={"dl": model_and_optimizers.evaluate_dataloaders},
input_mapping=model_and_optimizers.input_mapping,
output_mapping=model_and_optimizers.output_mapping,
metrics=model_and_optimizers.metrics,
@ -243,10 +246,94 @@ def test_trainer_on(
evaluate_every=-1
)
trainer.run()
@pytest.mark.torch
@pytest.mark.parametrize("driver,device", [("torch", 'cpu'), ("torch", 0)]) # ("torch", [0, 1]),("torch", 1)
@magic_argv_env_context
def test_trainer_specific_params_1(
model_and_optimizers: TrainerParameters,
driver,
device,
n_epochs=2,
):
"""
测试一些特殊的参数是否能够正确地传递
"""
trainer = Trainer(
model=model_and_optimizers.model,
driver=driver,
device=device,
optimizers=model_and_optimizers.optimizers,
train_dataloader=model_and_optimizers.train_dataloader,
evaluate_dataloaders={"dl": model_and_optimizers.evaluate_dataloaders},
input_mapping=model_and_optimizers.input_mapping,
output_mapping=model_and_optimizers.output_mapping,
metrics=model_and_optimizers.metrics,
n_epochs=n_epochs,
output_from_new_proc="all",
evaluate_every=-1,
model_wo_auto_param_call=True,
torch_kwargs={
"torch_non_blocking": False,
"set_grad_to_none": True
}
)
assert trainer.set_grad_to_none is True
assert trainer.driver.non_blocking is False
assert trainer.driver.wo_auto_param_call is True
@pytest.mark.torch
@pytest.mark.parametrize("driver,device", [("torch", [0, 1])]) # ("torch", [0, 1]),("torch", 1)
@magic_argv_env_context
def test_trainer_specific_params_2(
model_and_optimizers: TrainerParameters,
driver,
device,
n_epochs=2,
):
"""
测试一些特殊的参数是否能够正确地传递
"""
trainer = Trainer(
model=model_and_optimizers.model,
driver=driver,
device=device,
optimizers=model_and_optimizers.optimizers,
train_dataloader=model_and_optimizers.train_dataloader,
evaluate_dataloaders={"dl": model_and_optimizers.evaluate_dataloaders},
input_mapping=model_and_optimizers.input_mapping,
output_mapping=model_and_optimizers.output_mapping,
metrics=model_and_optimizers.metrics,
n_epochs=n_epochs,
output_from_new_proc="all",
evaluate_every=-1,
model_wo_auto_param_call=True,
torch_kwargs={
"ddp_kwargs": {
"broadcast_buffers": True,
"find_unused_parameters": True
},
"torch_non_blocking": False,
"set_grad_to_none": True
}
)
assert trainer.set_grad_to_none is True
assert trainer.driver.non_blocking is False
assert trainer.driver.wo_auto_param_call is True
assert trainer.driver.output_from_new_proc == "all"
_ddp_kwargs = trainer.driver._ddp_kwargs
assert _ddp_kwargs.get("broadcast_buffers") is True
assert _ddp_kwargs.get("find_unused_parameters") is True