1. 继续完善部分文档;2.删除 paddle 多卡下的 data_device 功能 3. 将 paddle_utils 下的 get_device_from_visible 函数更名为 _convert_data_device 并进行修改

This commit is contained in:
x54-729 2022-05-10 10:58:40 +00:00
parent 9a15af88d7
commit d79de6b008
22 changed files with 222 additions and 187 deletions

View File

@ -304,8 +304,7 @@ class Trainer(TrainerEventTrigger):
1. driver 实例的 ``model_device`` 不为 None 该参数无效
2. 对于 pytorch仅当用户自己通过 ``python -m torch.distributed.launch`` 并且自己初始化 ``init_process_group``
driver 实例的 ``model_device`` 才会为 None
3. 对于 paddle仅当用户自己通过 ``python -m paddle.distributed.launch`` 并且自己初始化 :func:`~init_parallel_env`
:meth:`fleet.init` driver 实例的 ``model_device`` 才会为 None
3. 对于 paddle该参数无效
* *use_dist_sampler* -- 表示是否使用分布式的 ``sampler``在多卡时分布式 ``sampler`` 将自动决定每张卡上读取的 sample 使得一个 epoch
内所有卡的 sample 加起来为一整个数据集的 sample默认会根据 driver 是否为分布式进行设置

View File

@ -164,7 +164,7 @@ class PaddleDataLoader(DataLoader):
"""
获取当前 ``batch`` 中每条数据对应的索引
:return: 当前 ``batch`` 数据的索引
:return: 当前 ``batch`` 数据的索引
"""
return self.cur_batch_indices

View File

@ -172,7 +172,7 @@ class TorchDataLoader(DataLoader):
"""
获取当前 ``batch`` 中每条数据对应的索引
:return: 当前 ``batch`` 数据的索引
:return: 当前 ``batch`` 数据的索引
"""
return self.cur_batch_indices

View File

@ -400,16 +400,22 @@ class DataSet:
new_field_name: str = None, num_proc: int = 0,
progress_desc: str = None, show_progress_bar: bool = True):
r"""
:class:`~DataSet` 每个 ``instance`` 中为 ``field_name`` ``field`` 传给函数 ``func``并获取函数的返回值
:class:`~DataSet` 每个 ``instance`` 中为 ``field_name`` ``field`` 传给函数 ``func``并写入到 ``new_field_name``
:param field_name: 传入 ``func`` ``field`` 名称
:param func: 一个函数其输入是 ``instance`` 中名为 ``field_name`` ``field`` 的内容
:param new_field_name: ``func`` 返回的内容放入到 ``new_field_name`` 对应的 ``field`` 如果名称与已有的 ``field`` 相同
则进行覆盖如果为 ``None`` 则不会覆盖和创建 ``field``
:param num_proc: 使用进程的数量请注意由于 ``python`` 语言的特性使用了多少进程就会导致多少倍内存的增长
:param progress_desc: 进度条的描述字符默认为 ``Main``
:param show_progress_bar: 是否展示进度条默认为展示
:return: 从函数 ``func`` 中得到的返回值
:param field_name: 传入 ``func`` ``field`` 名称
:param func: 对指定 ``field`` 进行处理的函数注意其输入应为 ``instance`` 中名为 ``field_name`` ``field`` 的内容
:param new_field_name: 函数执行结果写入的 ``field`` 名称该函数会将 ``func`` 返回的内容放入到 ``new_field_name``
应的 ``field`` 注意如果名称与已有的 ``field`` 相同则会进行覆盖如果为 ``None`` 则不会覆盖和创建 ``field``
:param num_proc: 使用进程的数量
.. note::
由于 ``python`` 语言的特性设置该参数后会导致相应倍数的内存增长这可能会对您程序的执行带来一定的影响
:param progress_desc: 进度条的描述字符默认为 ``Main``
:param show_progress_bar: 是否在处理过程中展示进度条
:return: 从函数 ``func`` 中得到的返回值
"""
assert len(self) != 0, "Null DataSet cannot use apply_field()."
if not self.has_field(field_name=field_name):

View File

@ -7,18 +7,22 @@ from fastNLP.envs.imports import _NEED_IMPORT_JITTOR
if _NEED_IMPORT_JITTOR:
import jittor
__all__ = []
def initialize_jittor_driver(driver: str, device: Union[str, int, List[int]], model: jittor.Module, **kwargs) -> JittorDriver:
r"""
用来根据参数 `driver` `device` 来确定并且初始化一个具体的 `Driver` 实例然后返回回去
在这个函数中我们会根据用户设置的device来确定JittorDriver的mode
用来根据参数 ``device`` 来确定并且初始化一个具体的 ``Driver`` 实例然后返回回去
:param driver: 该参数的值应为以下之一["jittor"]
:param device: jittor运行的设备
.. todo::
创建多卡的 driver
:param driver: 该参数的值应为以下之一``["jittor"]``
:param device: ``jittor`` 运行的设备
:param model: 训练或者评测的具体的模型
:param kwargs:
:return: 返回一个元组元组的第一个值是具体的基于 jittor `Driver` 实例元组的第二个值是该 driver 的名字用于检测一个脚本中
先后 driver 的次序的正确问题
:return: :class:`~fastNLP.core.JittorSingleDriver` :class:`~fastNLP.core.JittorMPIDriver` 实例
"""
if driver not in {"jittor"}:

View File

@ -24,7 +24,17 @@ if _NEED_IMPORT_JITTOR:
class JittorDriver(Driver):
r"""
Jittor 框架的 Driver
``Jittor`` 框架的 ``Driver``
.. note::
这是一个正在开发中的功能敬请期待
.. todo::
实现 fp16 的设置且支持 cpu gpu 的切换
实现用于断点重训的 save load 函数
"""
def __init__(self, model, fp16: bool = False, **kwargs):

View File

@ -13,6 +13,14 @@ __all__ = [
]
class JittorMPIDriver(JittorDriver):
"""
执行 ``Jittor`` 框架下分布式训练的 ``Driver``
.. note::
这是一个正在开发中的功能敬请期待
"""
def __init__(
self,
model,

View File

@ -16,8 +16,17 @@ __all__ = [
class JittorSingleDriver(JittorDriver):
r"""
用于 cpu 单卡 gpu 运算
TODO: jittor fp16
``Jittor`` 框架下用于 ``cpu`` 和单卡 ``gpu`` 运算的 ``Driver``
.. note::
这是一个正在开发中的功能敬请期待
.. todo::
支持 cpu gpu 的切换
实现断点重训中替换 dataloader set_dist_repro_dataloader 函数
"""
def __init__(self, model, device=None, fp16: bool = False, **kwargs):
@ -30,11 +39,6 @@ class JittorSingleDriver(JittorDriver):
self.world_size = 1
def step(self):
"""
jittor optimizers 的step函数可以传入参数loss
此时会同时进行 zero_grad backward
为了统一这里暂不使用这样的方式
"""
for optimizer in self.optimizers:
optimizer.step()

View File

@ -5,10 +5,11 @@ from fastNLP.envs.imports import _NEED_IMPORT_JITTOR
if _NEED_IMPORT_JITTOR:
import jittor
__all__ = []
class DummyGradScaler:
"""
用于仿造的GradScaler对象防止重复写大量的if判断
"""
def __init__(self, *args, **kwargs):
pass

View File

@ -1,8 +1,6 @@
import os
from typing import List, Union, Optional, Dict, Tuple, Callable
from fastNLP.core.utils.paddle_utils import get_device_from_visible
from .paddle_driver import PaddleDriver
from .fleet_launcher import FleetLauncher
from .utils import (
@ -21,6 +19,7 @@ from fastNLP.core.utils import (
is_in_paddle_dist,
get_paddle_device_id,
)
from fastNLP.core.utils.paddle_utils import _convert_data_device
from fastNLP.envs.distributed import rank_zero_rm
from fastNLP.core.samplers import (
ReproduceBatchSampler,
@ -221,25 +220,6 @@ class PaddleFleetDriver(PaddleDriver):
"you initialize the paddle distribued process out of our control.")
self.outside_fleet = True
# 用户只有将模型上传到对应机器上后才能用 DataParallel 包裹,因此如果用户在外面初始化了 Fleet那么在 PaddleFleetDriver 中
# 我们就直接将 model_device 置为 None
self._model_device = None
# 当参数 `device` 为 None 时并且该参数不为 None表示将对应的数据移到指定的机器上
self._data_device = kwargs.get("data_device", None)
if self._data_device is not None:
if isinstance(self._data_device, int):
if self._data_device < 0:
raise ValueError("Parameter `data_device` can not be smaller than 0.")
_could_use_device_num = paddle.device.cuda.device_count()
if self._data_device >= _could_use_device_num:
raise ValueError("The gpu device that parameter `device` specifies is not existed.")
self._data_device = f"gpu:{self._data_device}"
elif not isinstance(self._data_device, str):
raise ValueError("Parameter `device` is wrong type, please check our documentation for the right use.")
# if self.outside_fleet and paddle.device.get_device() != self._data_device:
# logger.warning("`Parameter data_device` is not equal to paddle.deivce.get_device(), "
# "please keep them equal to avoid some potential bugs.")
self.world_size = None
self.global_rank = 0
@ -419,8 +399,6 @@ class PaddleFleetDriver(PaddleDriver):
@property
def data_device(self):
if self.outside_fleet:
return self._data_device
return self.model_device
def model_call(self, batch, fn: Callable, signature_fn: Optional[Callable]) -> Dict:
@ -574,7 +552,7 @@ class PaddleFleetDriver(PaddleDriver):
def broadcast_object(self, obj, src:int=0, group=None, **kwargs):
# 因为设置了CUDA_VISIBLE_DEVICES可能会引起错误
device = get_device_from_visible(self.data_device)
device = _convert_data_device(self.data_device)
return fastnlp_paddle_broadcast_object(obj, src, device=device, group=group)
def all_gather(self, obj, group=None) -> List:

View File

@ -14,20 +14,24 @@ from fastNLP.core.log import logger
if _NEED_IMPORT_PADDLE:
import paddle
__all__ = []
def initialize_paddle_driver(driver: str, device: Optional[Union[str, int, List[int]]],
model: "paddle.nn.Layer", **kwargs) -> PaddleDriver:
r"""
用来根据参数 `driver` `device` 来确定并且初始化一个具体的 `Driver` 实例然后返回回去
1如果检测到当前进程为用户通过 `python -m paddle.distributed.launch xxx.py` 方式拉起的则将
设备自动设置为用户指定的设备由于我们在引入 fastNLP 进行了特殊的设置因此可以通过 `CUDA_VISIBLE_DEVICES` 获取
2如果检测到输入的 `driver` `paddle` `device` 包含了多个设备那么我们会给出警告并且自动返回多卡的 Driver
3如果检测到输入的 `driver` `fleet` `device` 仅有一个设备那么我们会给出警告但仍旧返回多卡的 Driver
用来根据参数 ``device`` 来确定并且初始化一个具体的 ``Driver`` 实例
:param driver: 使用的 ``driver`` 类型在这个函数中仅支持 ``paddle``
:param device: 该参数的格式与 `Trainer` 对参数 `device` 的要求一致
1. 如果检测到当前进程为用户通过 ``python -m paddle.distributed.launch xxx.py`` 方式拉起的则将
设备自动设置为用户指定的设备由于我们要求分布式训练必须进行 ``backend`` 的设置因此可以通过 ``CUDA_VISIBLE_DEVICES`` 获取
2. 如果 ``device`` 包含了多个设备则返回一个 :class:`~fastNLP.core.PaddleFleetDriver` 实例否则返回
单卡的 :class:`~fastNLP.core.PaddleSingleDriver` 实例
:param driver: 使用的 ``driver`` 类型在这个函数中仅支持 ``paddle``
:param device: 该参数的格式与 ``Trainer`` 对参数 ``device`` 的要求一致
:param model: 训练或者评测的具体的模型
:return: 返回构造的 `Driver` 实例
:return: 一个 :class:`~fastNLP.core.PaddleSingleDriver` :class:`~fastNLP.core.PaddleFleetDriver` 实例
"""
if driver != "paddle":
raise ValueError("When initialize PaddleDriver, parameter `driver` must be 'paddle'.")

View File

@ -12,7 +12,8 @@ from fastNLP.envs.env import USER_CUDA_VISIBLE_DEVICES
from .utils import _build_fp16_env, optimizer_state_to_device, DummyGradScaler
from fastNLP.envs.imports import _NEED_IMPORT_PADDLE
from fastNLP.core.drivers.driver import Driver
from fastNLP.core.utils import apply_to_collection, paddle_move_data_to_device, get_device_from_visible
from fastNLP.core.utils import apply_to_collection, paddle_move_data_to_device
from fastNLP.core.utils.paddle_utils import _convert_data_device
from fastNLP.envs import (
FASTNLP_SEED_WORKERS,
FASTNLP_MODEL_FILENAME,
@ -371,10 +372,7 @@ class PaddleDriver(Driver):
:return: 将移动到指定机器上的 batch 对象返回
"""
if USER_CUDA_VISIBLE_DEVICES in os.environ:
device = get_device_from_visible(self.data_device)
else:
device = self.data_device
device = _convert_data_device(self.data_device)
return paddle_move_data_to_device(batch, device)
@staticmethod

View File

@ -8,10 +8,10 @@ from fastNLP.envs.imports import _NEED_IMPORT_PADDLE
from fastNLP.envs.env import USER_CUDA_VISIBLE_DEVICES
from fastNLP.core.utils import (
auto_param_call,
get_device_from_visible,
get_paddle_gpu_str,
get_paddle_device_id,
)
from fastNLP.core.utils.paddle_utils import _convert_data_device
from fastNLP.core.utils.utils import _get_fun_msg
from fastNLP.core.samplers import (
ReproducibleBatchSampler,
@ -64,10 +64,7 @@ class PaddleSingleDriver(PaddleDriver):
r"""
该函数用来初始化训练环境用于设置当前训练的设备并将模型迁移到对应设备上
"""
if USER_CUDA_VISIBLE_DEVICES in os.environ:
device = get_device_from_visible(self.data_device)
else:
device = self.data_device
device = _convert_data_device(self.data_device)
paddle.device.set_device(device)
with contextlib.redirect_stdout(None):

View File

@ -10,19 +10,18 @@ from .ddp import TorchDDPDriver
from fastNLP.core.log import logger
from fastNLP.envs import FASTNLP_BACKEND_LAUNCH
__all__ = []
def initialize_torch_driver(driver: str, device: Optional[Union[str, "torch.device", int, List[int]]],
model: "torch.nn.Module", **kwargs) -> TorchDriver:
r"""
用来根据参数 `driver` `device` 来确定并且初始化一个具体的 `Driver` 实例然后返回回去
注意如果输入的 `device` 如果和 `driver` 对应不上就直接报错
用来根据参数 ``driver` ``device`` 来确定并且初始化一个具体的 ``Driver`` 实例然后返回回去
:param driver: 该参数的值应为以下之一["torch", "torch_ddp", "fairscale"]
:param device: 该参数的格式与 `Trainer` 对参数 `device` 的要求一致
:param driver: 该参数的值应为以下之一``["torch", "fairscale"]``
:param device: 该参数的格式与 ``Trainer`` 对参数 ``device`` 的要求一致
:param model: 训练或者评测的具体的模型
:return: 返回一个元组元组的第一个值是具体的基于 pytorch `Driver` 实例元组的第二个值是该 driver 的名字用于检测一个脚本中
先后 driver 的次序的正确问题
:return: 返回一个 :class:`~fastNLP.core.TorchSingleDriver` :class:`~fastNLP.core.TorchDDPDriver` 实例
"""
# world_size 和 rank
if FASTNLP_BACKEND_LAUNCH in os.environ:

View File

@ -4,7 +4,7 @@ from typing import List, Any
import numpy as np
from fastNLP.core.metrics.backend import Backend
from fastNLP.core.utils.paddle_utils import paddle_to, get_device_from_visible
from fastNLP.core.utils.paddle_utils import paddle_to, _convert_data_device
from fastNLP.core.metrics.utils import AggregateMethodError
from fastNLP.core.drivers.paddle_driver.dist_utils import fastnlp_paddle_all_gather
from fastNLP.envs.imports import _NEED_IMPORT_PADDLE
@ -81,8 +81,7 @@ class PaddleBackend(Backend):
raise ValueError(f"tensor: {tensor} can not convert to ndarray!")
def move_tensor_to_device(self, tensor, device):
if USER_CUDA_VISIBLE_DEVICES in os.environ:
device = get_device_from_visible(device)
device = _convert_data_device(device)
return paddle_to(tensor, device)
def all_gather_object(self, obj, group=None) -> List:

View File

@ -2,7 +2,6 @@ __all__ = [
'cache_results',
'is_jittor_dataset',
'jittor_collate_wraps',
'get_device_from_visible',
'paddle_to',
'paddle_move_data_to_device',
'get_paddle_device_id',
@ -28,7 +27,7 @@ __all__ = [
from .cache_results import cache_results
from .jittor_utils import is_jittor_dataset, jittor_collate_wraps
from .paddle_utils import get_device_from_visible, paddle_to, paddle_move_data_to_device, get_paddle_device_id, get_paddle_gpu_str, is_in_paddle_dist, \
from .paddle_utils import paddle_to, paddle_move_data_to_device, get_paddle_device_id, get_paddle_gpu_str, is_in_paddle_dist, \
is_in_fnlp_paddle_dist, is_in_paddle_launch_dist
from .rich_progress import f_rich_progress
from .torch_utils import torch_move_data_to_device

View File

@ -15,6 +15,12 @@ from fastNLP.core.dataset import Instance
def is_jittor_dataset(dataset) -> bool:
"""
判断传入的 ``dataset`` 是否是 :class:`jittor.dataset.Dataset` 类型
:param dataset: 数据集
:return: 当前 ``dataset`` 是否为 ``jittor`` 的数据集类型
"""
try:
if isinstance(dataset, jt.dataset.Dataset):
return True
@ -26,7 +32,8 @@ def is_jittor_dataset(dataset) -> bool:
def jittor_collate_wraps(func, auto_collator: Callable):
"""
对jittor的collate_fn进行wrap封装, 如果数据集为mapping类型那么采用auto_collator否则还是采用jittor自带的collate_batch
``jittor`` ``collate_fn`` 进行 ``wrap`` 封装,如果数据集为 ``mapping`` 类型那么采用 ``auto_collator`` 否则
还是采用 ``jittor`` ``collate_batch``
:param func:
:param auto_collator:

View File

@ -1,5 +1,4 @@
__all__ = [
"get_device_from_visible",
"paddle_to",
"paddle_move_data_to_device",
"get_paddle_gpu_str",
@ -21,55 +20,71 @@ if _NEED_IMPORT_PADDLE:
from .utils import apply_to_collection
def get_device_from_visible(device: Union[str, int]) -> str:
def _convert_data_device(device: Union[str, int]) -> str:
"""
在有 ``CUDA_VISIBLE_DEVICES`` 的情况下获取对应的设备
CUDA_VISIBLE_DEVICES=2,3 device=3 则返回1
用于转换 ``driver`` ``data_device`` 的函数如果用户设置了 ``FASTNLP_BACKEND=paddle``那么 ``fastNLP`` 会将
可见的设备保存在 ``USER_CUDA_VISIBLE_DEVICES`` 并且将 ``CUDA_VISIBLE_DEVICES`` 设置为可见的第一张显卡这是为
了顺利执行 ``paddle`` 的分布式训练而设置的
:param device: 未转化的设备名
:return: 转化后的设备格式为 ``gpu:x``
在这种情况下单纯使用 ``driver.data_device`` 是无效的比如在分布式训练中将设备设置为 ``[0,2,3]`` 且用户设置了
``CUDA_VISIBLE_DEVICES=3,4,5,6`` 那么在 ``rank1``的进程中有::
os.environ["CUDA_VISIBLE_DEVICES"] = "5"
os.environ["USER_CUDA_VISIBLE_DEVICES"] = "3,4,5,6"
driver.data_device = "gpu:2" # 为了向用户正确地反映他们设置的设备减少歧义,因此这里没有设置为 "gpu:5"
此时我们便需要通过这个函数将 ``data_device`` 转换为 ``gpu:0``具体过程便是通过索引 **2** ``USER_CUDA_VISIBLE_DEVICES``
找到设备 **5**然后在 ``CUDA_VISIBLE_DEVICES`` 中找到设备 **5** 的索引 **0** 返回
.. note::
在分布式单进程仅支持单卡的情况下中这个函数实际等同于直接转换为 ``gpu:0`` 返回
:param device: 未转化的设备
:return: 转化后的设备格式为 ``gpu:x``
"""
if device == "cpu":
return device
cuda_visible_devices = os.getenv("CUDA_VISIBLE_DEVICES")
try:
user_visible_devices = os.getenv(USER_CUDA_VISIBLE_DEVICES)
if cuda_visible_devices is not None:
idx = get_paddle_device_id(device)
if user_visible_devices is not None:
# 此时一定发生在分布式的情况下,利用 USER_CUDA_VISIBLDE_DEVICES 获取用户期望的设备
idx = user_visible_devices.split(",")[idx]
else:
idx = str(idx)
cuda_visible_devices_list = cuda_visible_devices.split(',')
if idx not in cuda_visible_devices_list:
raise ValueError(f"Can't find your devices {idx} in CUDA_VISIBLE_DEVICES[{cuda_visible_devices}]. ")
return f"gpu:{cuda_visible_devices_list.index(idx)}"
else:
if device == "cpu" or user_visible_devices is None:
# 传入的是 CPU或者没有设置 USER_CUDA_VISIBLE_DEVICES
# 此时不需要进行转换
return get_paddle_gpu_str(device)
def paddle_to(data, device: Union[str, int]):
"""
`data` 迁移到指定的 `device`
idx = get_paddle_device_id(device)
idx = user_visible_devices.split(",")[idx]
# 此时 CUDA_VISIBLE_DEVICES 一定不是 None
cuda_visible_devices_list = os.getenv("CUDA_VISIBLE_DEVICES").split(',')
return f"gpu:{cuda_visible_devices_list.index(idx)}"
except Exception as e:
raise ValueError(f"Can't convert device {device} when USER_CUDA_VISIBLE_DEVICES={user_visible_devices} "
"and CUDA_VISIBLE_DEVICES={cuda_visible_devices}. If this situation happens, please report this bug to us.")
:param data: 要迁移的张量
:param device: 目标设备可以是 `str` `int`
:return: 迁移后的张量
def paddle_to(data: "paddle.Tensor", device: Union[str, int]) -> "paddle.Tensor":
"""
``data`` 迁移到指定的 ``device`` ``paddle.Tensor`` 没有类似 ``torch.Tensor`` ``to`` 函数该函数
只是集成了 :func:`paddle.Tensor.cpu` :func:`paddle.Tensor.cuda` 两个函数
:param data: 要迁移的张量
:param device: 目标设备可以是 ``str`` ``int`` 类型
:return: 迁移后的张量
"""
if device == "cpu":
return data.cpu()
else:
# device = get_device_from_visible(device, output_type=int)
return data.cuda(get_paddle_device_id(device))
def get_paddle_gpu_str(device: Union[str, int]) -> str:
"""
获得 `gpu:x` 类型的设备名
获得 ``gpu:x`` 格式的设备名::
:param device: 设备编号或设备名
:return: 返回对应的 `gpu:x` 格式的设备名
>>> get_paddle_gpu_str(1)
'gpu:1'
>>> get_paddle_gpu_str("cuda:1")
'gpu:1'
:param device: 设备编号或设备名
:return: 返回对应的 ``gpu:x`` 格式的设备名
"""
if isinstance(device, str):
return device.replace("cuda", "gpu")
@ -78,10 +93,17 @@ def get_paddle_gpu_str(device: Union[str, int]) -> str:
def get_paddle_device_id(device: Union[str, int]) -> int:
"""
获得 gpu 的设备id
获得 ``device`` 的设备编号::
:param: device: 设备编号或设备名
:return: 设备对应的编号
>>> get_paddle_device_id("gpu:1")
1
>>> get_paddle_device_id("gpu")
0
请注意不要向这个函数中传入 ``cpu``
:param: device: 设备编号或设备名
:return: 设备对应的编号
"""
if isinstance(device, int):
return device
@ -103,20 +125,16 @@ def get_paddle_device_id(device: Union[str, int]) -> int:
return device_id
def paddle_move_data_to_device(batch: Any, device: Optional[str] = None,
data_device: Optional[str] = None) -> Any:
def paddle_move_data_to_device(batch: Any, device: Optional[Union[str, int]]) -> Any:
r"""
数据集合传输到给定设备只有paddle.Tensor对象会被传输到设备中其余保持不变
``paddle`` 数据集合传输到给定设备只有 :class:`paddle.Tensor` 对象会被传输到设备中其余保持不变
:param batch:
:param device: `cpu`, `gpu` or `gpu:x`
:param data_device:
:return: 相同的集合但所有包含的张量都驻留在新设备上
:param batch: 需要进行迁移的数据集合
:param device: 目标设备可以是显卡设备的编号或是``cpu``, ``gpu`` ``gpu:x`` 格式的字符串当这个参数
`None`` 不会执行任何操作
:return: 迁移到新设备上的数据集合
"""
if device is None:
if data_device is not None:
device = data_device
else:
return batch
def batch_to(data: Any) -> Any:
@ -125,22 +143,22 @@ def paddle_move_data_to_device(batch: Any, device: Optional[str] = None,
return apply_to_collection(batch, dtype=paddle.Tensor, function=batch_to)
def is_in_paddle_dist():
def is_in_paddle_dist() -> bool:
"""
判断是否处于分布式的进程下使用 global_rank selected_gpus 判断
判断是否处于 ``paddle`` 分布式的进程下使用 ``PADDLE_RANK_IN_NODE`` ``FLAGS_selected_gpus`` 判断
"""
return ('PADDLE_RANK_IN_NODE' in os.environ and 'FLAGS_selected_gpus' in os.environ)
def is_in_fnlp_paddle_dist():
def is_in_fnlp_paddle_dist() -> bool:
"""
判断是否处于 FastNLP 拉起的分布式进程中
判断是否处于 ``fastNLP`` 拉起的 ``paddle`` 分布式进程中
"""
return FASTNLP_DISTRIBUTED_CHECK in os.environ
def is_in_paddle_launch_dist():
def is_in_paddle_launch_dist() -> bool:
"""
判断是否处于 launch 启动的分布式进程中
判断是否处于 ``python -m paddle.distributed.launch`` 方法启动的 ``paddle`` 分布式进程中
"""
return FASTNLP_BACKEND_LAUNCH in os.environ

View File

@ -44,12 +44,12 @@ class TorchTransferableDataType(ABC):
def torch_move_data_to_device(batch: Any, device: Optional[Union[str, "torch.device"]] = None,
non_blocking: Optional[bool] = True) -> Any:
r"""
将数据集合传输到给定设备任何定义方法 to(device) 的对象都将被移动并且集合中的所有其他对象将保持不变
``pytorch`` 将数据集合 ``batch`` 传输到给定设备任何定义方法 ``to(device)`` 的对象都将被移动并且集合中的所有其他对象将保持不变
:param batch: 应当迁移的数据
:param device: 数据应当迁移到的设备当该参数的值为 None 表示迁移数据的操作由用户自己完成我们不需要经管
:param non_blocking: pytorch 迁移数据方法 `to` 的参数
:return: 相同的集合但所有包含的张量都驻留在新设备上
:param batch: 需要迁移的数据
:param device: 数据应当迁移到的设备当该参数的值为 ``None`` 时则不执行任何操作
:param non_blocking: ``pytorch`` 数据迁移方法 ``to`` 的参数
:return: 迁移到新设备上的数据集合
"""
if device is None:
return batch

View File

@ -38,10 +38,16 @@ __all__ = [
def get_fn_arg_names(fn: Callable) -> List[str]:
r"""
返回一个函数所有参数的名字
该函数可以返回一个函数所有参数的名字::
:param fn: 需要查询的函数
:return: 一个列表其中的元素是函数 ``fn`` 参数的字符串名字
>>> def function(a, b=1):
... return a
...
>>> get_fn_arg_names(function)
['a', 'b']
:param fn: 需要查询的函数
:return: 包含函数 ``fn`` 参数名的列表
"""
return list(inspect.signature(fn).parameters)
@ -49,7 +55,7 @@ def get_fn_arg_names(fn: Callable) -> List[str]:
def auto_param_call(fn: Callable, *args, signature_fn: Optional[Callable] = None,
mapping: Optional[Dict[AnyStr, AnyStr]] = None) -> Any:
r"""
该函数会根据输入函数的形参名从 ``*args`` 因此都需要是 ``dict`` 类型中找到匹配的值进行调用如果传入的数据与 ``fn`` 的形参不匹配可以通过
该函数会根据输入函数的形参名从 ``*args`` 均为 ``dict`` 类型中找到匹配的值进行调用如果传入的数据与 ``fn`` 的形参不匹配可以通过
``mapping`` 参数进行转换``mapping`` 参数中的一对 ``(key, value)`` 表示在 ``*args`` 中找到 ``key`` 对应的值并将这个值传递给形参中名为
``value`` 的参数
@ -161,13 +167,13 @@ def _get_keys(args:List[Dict]) -> List[List[str]]:
def _get_fun_msg(fn, with_fp=True)->str:
"""
获取函数的基本信息帮助报错
ex:
print(_get_fun_msg(_get_fun_msg))
# `_get_fun_msg(fn) -> str`(In file:/Users/hnyan/Desktop/projects/fastNLP/fastNLP/fastNLP/core/utils/utils.py)
获取函数的基本信息帮助报错::
>>>> print(_get_fun_msg(_get_fun_msg))
`_get_fun_msg(fn) -> str`(In file:/Users/hnyan/Desktop/projects/fastNLP/fastNLP/fastNLP/core/utils/utils.py)
:param callable fn:
:param with_fp: 是否包含函数所在的文件信息
:param with_fp: 是否包含函数所在的文件信息
:return:
"""
if isinstance(fn, functools.partial):
@ -224,7 +230,7 @@ def _check_valid_parameters_number(fn, expected_params:List[str], fn_name=None):
def check_user_specific_params(user_params: Dict, fn: Callable):
"""
该函数使用用户的输入来对指定函数的参数进行赋值主要用于一些用户无法直接调用函数的情况
该函数主要作用在于帮助检查用户对使用函数 ``fn`` 的参数输入是否有误
主要作用在于帮助检查用户对使用函数 ``fn`` 的参数输入是否有误
:param user_params: 用户指定的参数的值应当是一个字典其中 ``key`` 表示每一个参数的名字
``value`` 为每一个参数的值
@ -241,7 +247,7 @@ def check_user_specific_params(user_params: Dict, fn: Callable):
def dataclass_to_dict(data: "dataclasses.dataclass") -> Dict:
"""
将传入的 `dataclass` 实例转换为字典
将传入的 ``dataclass`` 实例转换为字典
"""
if not is_dataclass(data):
raise TypeError(f"Parameter `data` can only be `dataclass` type instead of {type(data)}.")
@ -253,11 +259,11 @@ def dataclass_to_dict(data: "dataclasses.dataclass") -> Dict:
def match_and_substitute_params(mapping: Optional[Union[Callable, Dict]] = None, data: Optional[Any] = None) -> Any:
r"""
用来实现将输入的 ``batch``或者输出的 ``outputs``通过 ``mapping`` 将键值进行更换的功能
用来实现将输入的 ``batch`` 或者输出的 ``outputs`` 通过 ``mapping`` 将键值进行更换的功能
该函数应用于 ``input_mapping`` ``output_mapping``
对于 ``input_mapping``该函数会在 :class:`~fastNLP.core.controllers.TrainBatchLoop` 中取完数据后立刻被调用
对于 ``output_mapping``该函数会在 :class:`~fastNLP.core.Trainer` :meth:`~fastNLP.core.Trainer.train_step`
* 对于 ``input_mapping``该函数会在 :class:`~fastNLP.core.controllers.TrainBatchLoop` 中取完数据后立刻被调用
* 对于 ``output_mapping``该函数会在 :class:`~fastNLP.core.Trainer` :meth:`~fastNLP.core.Trainer.train_step`
以及 :class:`~fastNLP.core.Evaluator` :meth:`~fastNLP.core.Evaluator.train_step` 中得到结果后立刻被调用
转换的逻辑按优先级依次为
@ -277,9 +283,9 @@ def match_and_substitute_params(mapping: Optional[Union[Callable, Dict]] = None,
然后使用 ``mapping`` 对这个 ``Dict`` 进行转换如果没有匹配上 ``mapping`` 中的 ``key`` 则保持 ``\'\_number\'`` 这个形式。
:param mapping: 用于转换的字典或者函数``mapping`` 是函数时返回值必须为字典类型
:param mapping: 用于转换的字典或者函数 ``mapping`` 是函数时返回值必须为字典类型
:param data: 需要被转换的对象
:return: 返回转换的结果
:return: 返回转换的结果
"""
if mapping is None:
return data
@ -331,19 +337,19 @@ def apply_to_collection(
**kwargs: Any,
) -> Any:
"""
使用函数 ``function`` 递归地在 ``data`` 中的元素执行但是仅在满足元素为 ``dtype`` 时执行
递归地对 ``data`` 中的元素执行函数 ``function``仅在满足元素为 ``dtype`` 时执行
该函数参考了 `pytorch-lightning <https://github.com/PyTorchLightning/pytorch-lightning>`_ 的实现
:param data: 需要进行处理的数据集合或数据
:param dtype: 数据的类型函数 ``function`` 只会被应用于 ``data`` 中类型为 ``dtype`` 的数据
:param function: 对数据进行处理的函数
:param args: ``function`` 所需要的其它参数
:param data: 需要进行处理的数据集合或数据
:param dtype: 数据的类型函数 ``function`` 只会被应用于 ``data`` 中类型为 ``dtype`` 的数据
:param function: 对数据进行处理的函数
:param args: ``function`` 所需要的其它参数
:param wrong_dtype: ``function`` 一定不会生效的数据类型如果数据既是 ``wrong_dtype`` 类型又是 ``dtype`` 类型
那么也不会生效
:param include_none: 是否包含执行结果为 ``None`` 的数据默认为 ``True``
:param kwargs: ``function`` 所需要的其它参数
:return: 经过 ``function`` 处理后的数据集合
那么也不会生效
:param include_none: 是否包含执行结果为 ``None`` 的数据默认为 ``True``
:param kwargs: ``function`` 所需要的其它参数
:return: 经过 ``function`` 处理后的数据集合
"""
# Breaking condition
if isinstance(data, dtype) and (wrong_dtype is None or not isinstance(data, wrong_dtype)):
@ -411,20 +417,20 @@ def apply_to_collection(
@contextmanager
def nullcontext():
r"""
实现一个什么都不做的上下文环境
实现一个什么都不做的上下文环境
"""
yield
def sub_column(string: str, c: int, c_size: int, title: str) -> str:
r"""
对传入的字符串进行截断方便在命令行中显示
对传入的字符串进行截断方便在命令行中显示
:param string: 要被截断的字符串
:param c: 命令行列数
:param c_size: :class:`~fastNLP.core.Instance` :class:`fastNLP.core.DataSet` ``field`` 数目
:param title: 列名
:return: 对一个过长的列进行截断的结果
:param string: 要被截断的字符串
:param c: 命令行列数
:param c_size: :class:`~fastNLP.core.Instance` :class:`fastNLP.core.DataSet` ``field`` 数目
:param title: 列名
:return: 对一个过长的列进行截断的结果
"""
avg = max(int(c / c_size / 2), len(title))
string = str(string)
@ -453,7 +459,7 @@ def _is_iterable(value):
def pretty_table_printer(dataset_or_ins) -> PrettyTable:
r"""
``fastNLP`` 中展示数据的函数::
用于``fastNLP`` 中展示数据的函数::
>>> ins = Instance(field_1=[1, 1, 1], field_2=[2, 2, 2], field_3=["a", "b", "c"])
+-----------+-----------+-----------------+
@ -462,8 +468,8 @@ def pretty_table_printer(dataset_or_ins) -> PrettyTable:
| [1, 1, 1] | [2, 2, 2] | ['a', 'b', 'c'] |
+-----------+-----------+-----------------+
:param dataset_or_ins: 要展示的 :class:`~fastNLP.core.DataSet` 或者 :class:`~fastNLP.core.Instance`
:return: 根据 ``terminal`` 大小进行自动截断的数据表格
:param dataset_or_ins: 要展示的 :class:`~fastNLP.core.DataSet` 或者 :class:`~fastNLP.core.Instance` 实例
:return: 根据命令行大小进行自动截断的数据表格
"""
x = PrettyTable()
try:
@ -529,7 +535,7 @@ def deprecated(help_message: Optional[str] = None):
"""
标记当前功能已经过时的装饰器
:param help_message: 一段指引信息告知用户如何将代码切换为当前版本提倡的用法
:param help_message: 一段指引信息告知用户如何将代码切换为当前版本提倡的用法
"""
def decorator(deprecated_function: Callable):
@ -578,10 +584,10 @@ def seq_len_to_mask(seq_len, max_len: Optional[int]):
>>>print(mask.size())
torch.Size([14, 100])
:param seq_len: 大小为 ``(B,)`` 的长度序列
:param int max_len: 将长度 ``pad`` ``max_len``默认情况 ``None``使用的是 ``seq_len`` 中最长的长度
:param seq_len: 大小为 ``(B,)`` 的长度序列
:param int max_len: 将长度补齐或截断``max_len``默认情况 ``None``使用的是 ``seq_len`` 中最长的长度
但在 :class:`torch.nn.DataParallel` 等分布式的场景下可能不同卡的 ``seq_len`` 会有区别所以需要传入
一个 ``max_len`` 使得 ``mask`` 长度 ``pad`` 到该长度
一个 ``max_len`` 使得 ``mask`` 补齐或截断到该长度
:return: 大小为 ``(B, max_len)`` ``mask`` 元素类型为 ``bool`` ``uint8``
"""
if isinstance(seq_len, np.ndarray):

View File

@ -67,7 +67,6 @@ def test_trainer_fleet(
validate_dataloaders = val_dataloader
validate_every = MNISTTrainFleetConfig.validate_every
metrics = {"acc": Accuracy()}
data_device = f'gpu:{os.environ["USER_CUDA_VISIBLE_DEVICES"].split(",").index(os.environ["CUDA_VISIBLE_DEVICES"])}'
trainer = Trainer(
model=model,
driver=driver,
@ -83,7 +82,6 @@ def test_trainer_fleet(
n_epochs=n_epochs,
callbacks=callbacks,
# output_from_new_proc="logs",
data_device=data_device
)
trainer.run()

View File

@ -2,7 +2,7 @@ import os
import pytest
from fastNLP.core.utils.paddle_utils import get_device_from_visible, paddle_to, paddle_move_data_to_device
from fastNLP.core.utils.paddle_utils import _convert_data_device, paddle_to, paddle_move_data_to_device
from fastNLP.envs.imports import _NEED_IMPORT_PADDLE
if _NEED_IMPORT_PADDLE:
import paddle
@ -11,24 +11,24 @@ if _NEED_IMPORT_PADDLE:
("user_visible_devices, cuda_visible_devices, device, correct"),
(
(None, None, 1, "gpu:1"),
(None, "2,4,5,6", 5, "gpu:2"),
(None, "3,4,5", 4, "gpu:1"),
("0,1,2,3,4,5,6,7", "0", "cpu", "cpu"),
(None, "2,4,5,6", 2, "gpu:2"),
(None, "3,4,5", 1, "gpu:1"),
("0,1,2,3,4,5,6,7", "0", "cpu", "cpu"),
("3,4,5,6,7", "0", "cpu", "cpu"),
("0,1,2,3,4,5,6,7", "3,4,5", "gpu:4", "gpu:1"),
("0,1,2,3,4,5,6,7", "3,4,5", "gpu:5", "gpu:2"),
("3,4,5,6", "3,5", 0, "gpu:0"),
("3,6,7,8", "6,7,8", "gpu:2", "gpu:1"),
)
)
def test_get_device_from_visible(user_visible_devices, cuda_visible_devices, device, correct):
def test_convert_data_device(user_visible_devices, cuda_visible_devices, device, correct):
_cuda_visible_devices = os.getenv("CUDA_VISIBLE_DEVICES")
_user_visible_devices = os.getenv("USER_CUDA_VISIBLE_DEVICES")
if cuda_visible_devices is not None:
os.environ["CUDA_VISIBLE_DEVICES"] = cuda_visible_devices
if user_visible_devices is not None:
os.environ["USER_CUDA_VISIBLE_DEVICES"] = user_visible_devices
res = get_device_from_visible(device)
res = _convert_data_device(device)
assert res == correct
# 还原环境变量