added func code

wiyr 2023-06-12 10:39:51 +08:00 committed by dilililiwhy
parent 37ba281c40
commit 6d07c029fe
11 changed files with 398 additions and 120 deletions

View File

@ -13,11 +13,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import copy
from functools import wraps
import torch
import copy
import torch_npu
from functools import wraps
from torch_npu.contrib import transfer_to_npu
from torch import distributed as dist
from deepspeed.accelerator import get_accelerator
from .package_info import (
__description__,
@ -53,17 +55,20 @@ def wrapper_type(fn):
return decorated
torch.Tensor.type = wrapper_type(torch.Tensor.type) # fix for torch.Tensor.type()
torch.Tensor.type = wrapper_type(torch.Tensor.type)
def wrapper_dist_long2int(fn):
@wraps(fn)
def wrapper(*args, **kwargs):
if not args:
args = (kwargs.pop('tensor'),)
if args[0].dtype == torch.long and not kwargs.get('async_op', False):
new_args = list(copy.deepcopy(args))
new_args[0] = new_args[0].int()
fn(*new_args, **kwargs)
args[0].copy_(new_args[0].long())
return None
return
return fn(*args, **kwargs)
return wrapper
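For context, the wrapper above downcasts int64 tensors to int32 before a synchronous collective (the NPU backend does not handle long tensors), then copies the result back into the caller's tensor. A minimal usage sketch follows; exactly which collectives get patched is an assumption, not shown in this hunk.
# Hypothetical sketch (same module): patch synchronous collectives so that
# int64 tensors are transparently run through an int32 copy.
dist.broadcast = wrapper_dist_long2int(dist.broadcast)
dist.all_reduce = wrapper_dist_long2int(dist.all_reduce)
# Caller code is unchanged:
#   t = torch.ones(4, dtype=torch.long).npu()
#   dist.all_reduce(t)   # runs on an int32 copy, result copied back into t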

View File

@ -20,6 +20,7 @@ import os
import torch
import deepspeed
from ascendspeed.enums import PositionEmbeddingType
def parse_args(extra_args_provider=None, defaults={},
@ -229,6 +230,15 @@ def parse_args(extra_args_provider=None, defaults={},
assert args.encoder_seq_length is not None
args.seq_length = args.encoder_seq_length
if args.position_embedding_type == PositionEmbeddingType.absolute or args.position_embedding_type == PositionEmbeddingType.alibi:
assert args.max_position_embeddings is not None
if args.seq_length is not None:
assert args.max_position_embeddings >= args.seq_length
if args.decoder_seq_length is not None:
assert args.max_position_embeddings >= args.decoder_seq_length
else:
assert args.max_position_embeddings is None
if args.seq_length is not None:
assert args.max_position_embeddings >= args.seq_length
if args.decoder_seq_length is not None:
@ -307,12 +317,21 @@ def _add_network_size_args(parser):
'attention. This is set to '
' args.hidden_size // args.num_attention_heads '
'if not provided.')
group.add_argument('--embed-layernorm', action='store_true',
help='use layernorm for embedding')
group.add_argument('--max-position-embeddings', type=int, default=None,
help='Maximum number of position embeddings to use. '
'This is the size of position embedding.')
group.add_argument('--position-embedding-type', type=lambda x: PositionEmbeddingType[x],
choices=list(PositionEmbeddingType), default=PositionEmbeddingType.absolute,
help='Define position embedding type ("absolute" | "rotary" | "alibi"). "absolute" by default.')
group.add_argument('--make-vocab-size-divisible-by', type=int, default=128,
help='Pad the vocab size to be divisible by this value. '
'This is added for computational efficiency reasons.')
group.add_argument('--pad-vocab-size-to', type=int, default=None,
help='Pad the vocab size to this value. '
'This value must be greater than the initial size of the tokenizer '
'and needs to be divisible by the TP size and `make-vocab-size-divisible-by`.')
group.add_argument('--layernorm-epsilon', type=float, default=1e-5,
help='Layer norm epsilon.')
group.add_argument('--apply-residual-connection-post-layernorm',
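The new vocab padding flag interacts with `--make-vocab-size-divisible-by` and the tensor-parallel degree roughly as sketched below; the helper name and the `tensor_model_parallel_size` attribute are assumptions, since the actual padding code is outside this diff.
# Hypothetical sketch of the padding rule implied by the help text above.
def _padded_vocab_size(orig_vocab_size, args):
    multiple = args.make_vocab_size_divisible_by * args.tensor_model_parallel_size
    if args.pad_vocab_size_to is not None:
        if args.pad_vocab_size_to < orig_vocab_size:
            raise ValueError('pad-vocab-size-to is smaller than the tokenizer vocab size')
        if args.pad_vocab_size_to % multiple != 0:
            raise ValueError('pad-vocab-size-to must be divisible by the TP size '
                             'and make-vocab-size-divisible-by')
        return args.pad_vocab_size_to
    # Default behaviour: round up to the next multiple.
    return ((orig_vocab_size + multiple - 1) // multiple) * multiple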
@ -340,6 +359,24 @@ def _add_logging_args(parser):
help='If set, calculate and log parameters norm.')
group.add_argument('--log-num-zeros-in-grad', action='store_true',
help='If set, calculate and log the number of zeros in gradient.')
group.add_argument('--timing-log-level', type=int,
default=0, choices=range(0,3),
help='Granularity level to measure and report timing. '
' 0: report only iteration time and make sure timing '
' does not introduce extra overhead.'
' 1: report timing for operations that are executed '
' very limited times (basically once) during '
' each iteration (such as gradient all-reduce) '
' 2: report timing for operations that might be '
' executed numerous times during each iteration. '
'Note that setting the level to 1 or 2 might '
'cause an increase in iteration time.')
group.add_argument('--timing-log-option', type=str, default='minmax',
choices=['max', 'minmax', 'all'],
help='Options for logging timing:'
' max: report the max timing across all ranks'
' minmax: report min and max timings across all ranks'
' all: report timings of all ranks.')
group.add_argument('--tensorboard-log-interval', type=int, default=1,
help='Report to tensorboard interval.')
group.add_argument('--tensorboard-queue-size', type=int, default=1000,
@ -740,8 +777,12 @@ def _add_data_args(parser):
default=None,
choices=['BertWordPieceLowerCase',
'BertWordPieceCase',
'GPT2BPETokenizer'],
'GPT2BPETokenizer',
'PretrainedFromHF'],
help='What type of tokenizer to use.')
group.add_argument("--tokenizer-name-or-path", type=str, default=None,
help="Name or path of the huggingface tokenizer.")
group.add_argument('--data-impl', type=str, default='infer',
choices=['lazy', 'cached', 'mmap', 'infer'],
help='Implementation of indexed datasets.')
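With the new 'PretrainedFromHF' choice, `--tokenizer-name-or-path` identifies a HuggingFace tokenizer. A hypothetical sketch of how a builder could consume these two arguments (the real builder lives in ascendspeed/tokenizer.py, outside this diff), assuming the transformers package is available:
# Hypothetical sketch, not the code added by this commit.
from transformers import AutoTokenizer

def _build_hf_tokenizer(args):
    assert args.tokenizer_name_or_path is not None, \
        'PretrainedFromHF requires --tokenizer-name-or-path'
    return AutoTokenizer.from_pretrained(args.tokenizer_name_or_path)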
@ -752,6 +793,8 @@ def _add_data_args(parser):
'end-of-document token.')
group.add_argument('--eod-mask-loss', action='store_true',
help='Mask loss for the end of document tokens.')
group.add_argument('--loss-on-targets-only', action='store_true',
help='Mask loss on input sequence.')
group.add_argument('--train-data-exact-num-epochs', type=int, default=None,
help='When building the train dataset, force it to be '
'an exact number of epochs of the raw data')
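The `--loss-on-targets-only` flag above implies a per-token loss mask over the prompt portion of each sample; a hypothetical sketch of the masking it enables (the dataset-side implementation is not part of this hunk, and `is_target` / `per_token_loss` are assumed names):
# Hypothetical: zero out the loss contribution of input/prompt tokens so the
# cross-entropy is averaged over target tokens only.
loss_mask = loss_mask * is_target.float()                      # [b, seq_length]
loss = (per_token_loss * loss_mask).sum() / loss_mask.sum().clamp(min=1.0)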

View File

@ -21,6 +21,7 @@ import sys
import numpy as np
from deepspeed.accelerator import get_accelerator
import torch
from ascendspeed.enums import PositionEmbeddingType
from ascendspeed import (get_args,
@ -63,7 +64,12 @@ def check_checkpoint_args(checkpoint_args):
_compare('num_layers')
_compare('hidden_size')
_compare('num_attention_heads')
_compare('max_position_embeddings')
_compare('position_embedding_type')
# with alibi we can change `max_position_embeddings`
if args.position_embedding_type != PositionEmbeddingType.alibi:
_compare('max_position_embeddings')
if args.vocab_file:
_compare('make_vocab_size_divisible_by')
_compare('padded_vocab_size')
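`_compare` is a closure defined just above this hunk; a rough reconstruction of its shape, for context only (it is not part of this diff):
# Reconstruction: compare one field between the checkpoint's saved args and
# the currently parsed args, and fail loudly on a mismatch.
def _compare(arg_name):
    checkpoint_value = getattr(checkpoint_args, arg_name)
    args_value = getattr(args, arg_name)
    error_message = '{} value from checkpoint ({}) is not equal to the ' \
                    'input argument value ({}).'.format(arg_name,
                                                        checkpoint_value,
                                                        args_value)
    assert checkpoint_value == args_value, error_message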

View File

@ -220,20 +220,11 @@ def _build_index_mappings(name, data_prefix, documents, sizes,
sample_idx_filename = _filename + '_sample_idx.npy'
shuffle_idx_filename = _filename + '_shuffle_idx.npy'
if name == 'train':
# force to use certain index files
if args.train_doc_idx_path is not None:
doc_idx_filename = args.train_doc_idx_path
if args.train_sample_idx_path is not None:
sample_idx_filename = args.train_sample_idx_path
if args.train_shuffle_idx_path is not None:
shuffle_idx_filename = args.train_shuffle_idx_path
# Build the indexed mapping if not exist.
if is_rank_0():
if (not os.path.isfile(doc_idx_filename)) or \
(not os.path.isfile(sample_idx_filename)) or \
(not os.path.isfile(shuffle_idx_filename)):
(not os.path.isfile(sample_idx_filename)) or \
(not os.path.isfile(shuffle_idx_filename)):
print_rank_0(' > WARNING: could not find index map files, building '
'the indices on rank 0 ...')

View File

@ -11,6 +11,7 @@
# An empty sentence no longer separates documents.
# Some of the fixes/improvements are adopted from
# https://github.com/bigscience-workshop/AscendSpeed/blob/main/megatron/data/indexed_dataset.py
from functools import lru_cache
import os

View File

@ -17,7 +17,6 @@
import os
import sys
import time
import torch
@ -25,6 +24,8 @@ from ascendspeed.tokenizer import build_tokenizer
from .arguments import parse_args
from .microbatches import build_num_microbatches_calculator
from deepspeed.accelerator import get_accelerator
from .timers import Timers
_GLOBAL_ARGS = None
_GLOBAL_NUM_MICROBATCHES_CALCULATOR = None
_GLOBAL_TOKENIZER = None
@ -83,11 +84,11 @@ def set_global_variables(extra_args_provider=None, args_defaults={},
defaults=args_defaults,
ignore_unknown_args=ignore_unknown_args)
_build_num_microbatches_calculator(args)
if args.vocab_file:
if args.vocab_file or args.tokenizer_name_or_path:
_ = _build_tokenizer(args)
_set_tensorboard_writer(args)
_set_adlr_autoresume(args)
_set_timers()
_set_timers(args)
def _parse_args(extra_args_provider=None, defaults={},
@ -163,11 +164,11 @@ def _set_adlr_autoresume(args):
_GLOBAL_ADLR_AUTORESUME = AutoResume
def _set_timers():
def _set_timers(args):
"""Initialize timers."""
global _GLOBAL_TIMERS
_ensure_var_is_not_initialized(_GLOBAL_TIMERS, 'timers')
_GLOBAL_TIMERS = Timers()
_GLOBAL_TIMERS = Timers(args.timing_log_level, args.timing_log_option)
def _ensure_var_is_initialized(var, name):
@ -177,86 +178,4 @@ def _ensure_var_is_initialized(var, name):
def _ensure_var_is_not_initialized(var, name):
"""Make sure the input variable is not None."""
assert var is None, '{} is already initialized.'.format(name)
class _Timer:
"""Timer."""
def __init__(self, name):
self.name_ = name
self.elapsed_ = 0.0
self.started_ = False
self.start_time = time.time()
def start(self):
"""Start the timer."""
assert not self.started_, 'timer has already been started'
get_accelerator().synchronize()
self.start_time = time.time()
self.started_ = True
def stop(self):
"""Stop the timer."""
assert self.started_, 'timer is not started'
get_accelerator().synchronize()
self.elapsed_ += (time.time() - self.start_time)
self.started_ = False
def reset(self):
"""Reset timer."""
self.elapsed_ = 0.0
self.started_ = False
def elapsed(self, reset=True):
"""Calculate the elapsed time."""
started_ = self.started_
# If the timing is in progress, end it first.
if self.started_:
self.stop()
# Get the elapsed time.
elapsed_ = self.elapsed_
# Reset the elapsed time
if reset:
self.reset()
# If timing was in progress, set it back.
if started_:
self.start()
return elapsed_
class Timers:
"""Group of timers."""
def __init__(self):
self.timers = {}
def __call__(self, name):
if name not in self.timers:
self.timers[name] = _Timer(name)
return self.timers[name]
def write(self, names, writer, iteration, normalizer=1.0, reset=False):
"""Write timers to a tensorboard writer"""
# currently when using add_scalars,
# torch.utils.add_scalars makes each timer its own run, which
# pollutes the runs list, so we just add each as a scalar
assert normalizer > 0.0
for name in names:
value = self.timers[name].elapsed(reset=reset) / normalizer
writer.add_scalar(name + '-time', value, iteration)
def log(self, names, normalizer=1.0, reset=True):
"""Log a group of timers."""
assert normalizer > 0.0
string = 'time (ms)'
for name in names:
elapsed_time = self.timers[name].elapsed(
reset=reset) * 1000.0 / normalizer
string += ' | {}: {:.2f}'.format(name, elapsed_time)
if torch.distributed.is_initialized():
if torch.distributed.get_rank() == (
torch.distributed.get_world_size() - 1):
print(string, flush=True)
else:
print(string, flush=True)

View File

@ -22,3 +22,4 @@ from .gpt_model import GPTModel, GPTModelPipe
from .llama_model import LlamaModel, LlamaModelPipe
from .language_model import get_language_model
from .module import Float16Module
from .enums import ModelType

View File

@ -15,6 +15,10 @@
import enum
class ModelType(enum.Enum):
encoder_or_decoder = 1
encoder_and_decoder = 2
class LayerType(enum.Enum):
encoder = 1
decoder = 2
@ -25,4 +29,6 @@ class AttnType(enum.Enum):
class AttnMaskType(enum.Enum):
padding = 1
causal = 2
causal = 2 # Overrides `attention_mask` to be a lower triangular matrix
prefix = 3
custom = 4 # Forces one to pass an `attention_mask` that's 1 if we need to mask. Tensor that can be broadcast to [micro_batch_size, n_head, seq_length, seq_length]
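The comments above fix the two mask conventions: `causal` lets the attention layer build the lower-triangular pattern itself, while `custom` expects the caller to supply a mask broadcastable to [micro_batch_size, n_head, seq_length, seq_length] where 1 marks positions to mask. A small sketch of both shapes:
# Sketch: the two mask shapes described by the comments above.
import torch

seq_length = 8

# Causal pattern in the "1 means masked" convention: mask out the strict
# upper triangle, i.e. every future position.
causal_mask = torch.triu(
    torch.ones(1, 1, seq_length, seq_length), diagonal=1).bool()

# Custom: any 0/1 mask broadcastable to [micro_batch_size, n_head, seq, seq];
# here a mask that hides nothing.
custom_mask = torch.zeros(1, 1, seq_length, seq_length, dtype=torch.bool)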

View File

@ -18,12 +18,19 @@
with some changes. """
import numbers
from ascendspeed.mpu.utils import make_viewless_tensor
import torch
from torch.nn.parameter import Parameter
from torch.nn import init
import importlib
from torch.nn import functional as F
try:
from apex.contrib.layer_norm.layer_norm import FastLayerNormFN
HAVE_PERSIST_LAYER_NORM = True
except:
HAVE_PERSIST_LAYER_NORM = False
global fused_mix_prec_layer_norm_cuda
fused_mix_prec_layer_norm_cuda = None
@ -62,21 +69,30 @@ class FusedLayerNormAffineFunction(torch.autograd.Function):
class MixedFusedLayerNorm(torch.nn.Module):
def __init__(self, normalized_shape, eps=1e-5):
super(MixedFusedLayerNorm, self).__init__()
if isinstance(normalized_shape, numbers.Integral):
normalized_shape = (normalized_shape,)
self.normalized_shape = torch.Size(normalized_shape)
self.eps = eps
self.weight = Parameter(torch.Tensor(*normalized_shape))
self.bias = Parameter(torch.Tensor(*normalized_shape))
self.reset_parameters()
def __init__(self, normalized_shape, eps=1e-5, no_persist_layer_norm=True, sequence_parallel=False):
super(MixedFusedLayerNorm, self).__init__()
if isinstance(normalized_shape, numbers.Integral):
normalized_shape = (normalized_shape,)
self.normalized_shape = torch.Size(normalized_shape)
self.eps = eps
self.weight = torch.nn.parameter.Parameter(torch.Tensor(*normalized_shape))
self.bias = torch.nn.parameter.Parameter(torch.Tensor(*normalized_shape))
self.reset_parameters()
self.no_persist_layer_norm = True
self.sequence_parallel = sequence_parallel
# set sequence parallelism flag on weight and bias parameters
setattr(self.weight, 'sequence_parallel', self.sequence_parallel)
setattr(self.bias, 'sequence_parallel', self.sequence_parallel)
def reset_parameters(self):
init.ones_(self.weight)
init.zeros_(self.bias)
def forward(self, input):
return torch.nn.functional.layer_norm(input, self.normalized_shape, self.weight, self.bias, self.eps)
if self.no_persist_layer_norm:
return torch.nn.functional.layer_norm(input, self.normalized_shape, self.weight, self.bias, self.eps)
else:
output = FastLayerNormFN.apply(input, self.weight, self.bias, self.eps)
output = make_viewless_tensor(inp=output, requires_grad=input.requires_grad, keep_graph=True)
return output
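A short usage sketch of the extended layer norm; note that, as written, `no_persist_layer_norm` is pinned to True inside `__init__`, so on this build `forward` always falls through to `torch.nn.functional.layer_norm` and the apex `FastLayerNormFN` branch is not taken.
# Sketch: construct and apply the layer norm as a transformer layer would.
hidden_size = 1024
layernorm = MixedFusedLayerNorm(hidden_size, eps=1e-5, sequence_parallel=False)

x = torch.randn(2, 16, hidden_size)   # [batch, seq, hidden]
y = layernorm(x)                      # uses torch.nn.functional.layer_norm here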

283
ascendspeed/timers.py Normal file
View File

@ -0,0 +1,283 @@
# Copyright (c) 2022, NVIDIA CORPORATION. All rights reserved.
"""Megatron timers."""
from abc import ABC
from abc import abstractmethod
import time
import torch
class TimerBase(ABC):
def __init__(self, name):
self.name = name
@abstractmethod
def start(self, barrier=False):
pass
@abstractmethod
def stop(self, barrier=False):
pass
@abstractmethod
def reset(self):
pass
@abstractmethod
def elapsed(self, reset=True, barrier=False):
pass
class DummyTimer(TimerBase):
def __init__(self):
super().__init__('dummy timer')
def start(self, barrier=False):
return
def stop(self, barrier=False):
return
def reset(self):
return
def elapsed(self, reset=True, barrier=False):
raise Exception('dummy timer should not be used to '
'calculate elapsed time')
class Timer(TimerBase):
"""
Comment on using `barrier`: If this flag is passed, then all
the caller processes will wait till all reach the timing routine.
It is up to the user to make sure all the ranks in `barrier_group`
call it; otherwise, it will result in a hang.
Comment on `barrier_group`: By default it is set to None, which
in torch distributed land means the global communicator is used.
"""
def __init__(self, name):
super().__init__(name)
self._elapsed = 0.0
self._started = False
# Note that None will default to the global process group
self._barrier_group = None
self._start_time = time.time()
def set_barrier_group(self, barrier_group):
self._barrier_group = barrier_group
def start(self, barrier=False):
"""Start the timer."""
assert not self._started, 'timer has already been started'
if barrier:
torch.distributed.barrier(group=self._barrier_group)
torch.cuda.synchronize()
self._start_time = time.time()
self._started = True
def stop(self, barrier=False):
"""Stop the timer."""
assert self._started, 'timer is not started'
if barrier:
torch.distributed.barrier(group=self._barrier_group)
torch.cuda.synchronize()
self._elapsed += (time.time() - self._start_time)
self._started = False
def reset(self):
"""Reset timer."""
self._elapsed = 0.0
self._started = False
def elapsed(self, reset=True, barrier=False):
"""Calculate the elapsed time."""
_started = self._started
# If the timing is in progress, end it first.
if self._started:
self.stop(barrier=barrier)
# Get the elapsed time.
_elapsed = self._elapsed
# Reset the elapsed time
if reset:
self.reset()
# If timing was in progress, set it back.
if _started:
self.start(barrier=barrier)
return _elapsed
class Timers:
"""Group of timers."""
def __init__(self, log_level, log_option):
self._log_level = log_level
self._log_option = log_option
self._timers = {}
self._log_levels = {}
self._dummy_timer = DummyTimer()
self._max_log_level = 2
def __call__(self, name, log_level=None):
if name not in self._timers:
self._timers[name] = Timer(name=name)
return self._timers[name]
def _get_elapsed_time_all_ranks(self, names, reset, barrier):
"""
Assumptions:
- All the ranks call this function.
- `names` are identical on all ranks.
If the above assumptions are not met, calling this function will
result in a hang.
Arguments:
- names: list of timer names
- reset: reset the timer after recording the elapsed time
- barrier: if set, do a global barrier before time measurements
"""
# First make sure all the callers are in sync.
if barrier:
torch.distributed.barrier()
world_size = torch.distributed.get_world_size()
rank = torch.distributed.get_rank()
# Here we could use gather on the rank we want to print the
# timing from; however, there is no gather_base support in
# pytorch yet. It is simpler to deal with a single tensor
# and since we are only gathering a small amount of data,
# it should be ok to use all-gather instead of gather.
rank_name_to_time = torch.zeros((world_size, len(names)),
dtype=torch.float,
device=torch.cuda.current_device())
for i, name in enumerate(names):
if name in self._timers:
# Here we don't need to pass the barrier flag as all
# the processes are already in sync. This avoids the
# issue of different timers having different barrier
# groups inside their class.
rank_name_to_time[rank, i] = self._timers[name].elapsed(
reset=reset)
# See the note above for why we are not using gather.
torch.distributed._all_gather_base(rank_name_to_time.view(-1),
rank_name_to_time[rank, :].view(-1))
return rank_name_to_time
def _get_global_min_max_time(self, names, reset, barrier, normalizer):
"""Report only min and max times across all ranks."""
rank_name_to_time = self._get_elapsed_time_all_ranks(names, reset,
barrier)
name_to_min_max_time = {}
for i, name in enumerate(names):
rank_to_time = rank_name_to_time[:, i]
# filter out the ones we did not have any timings for
rank_to_time = rank_to_time[rank_to_time > 0.0]
# If the timer exists:
if rank_to_time.numel() > 0:
name_to_min_max_time[name] = (
rank_to_time.min().item() / normalizer,
rank_to_time.max().item() / normalizer)
return name_to_min_max_time
def _get_global_min_max_time_string(self, names, reset, barrier,
normalizer, max_only):
name_to_min_max_time = self._get_global_min_max_time(
names, reset, barrier, normalizer)
if not name_to_min_max_time:
return None
output_string = '(min, max) time across ranks (ms):'
for name in name_to_min_max_time:
min_time, max_time = name_to_min_max_time[name]
if max_only:
output_string += '\n {}: {:.2f}'.format(
(name+' ').ljust(48, '.'), max_time)
else:
output_string += '\n {}: ({:.2f}, {:.2f})'.format(
(name+' ').ljust(48, '.'), min_time, max_time)
return output_string
def _get_all_ranks_time_string(self, names, reset, barrier, normalizer):
"""Report times across all ranks."""
rank_name_to_time = self._get_elapsed_time_all_ranks(names, reset,
barrier)
output_string = 'times across ranks (ms):'
no_reported_timing = True
for i, name in enumerate(names):
not_yet_found = True
for rank in range(torch.distributed.get_world_size()):
if rank_name_to_time[rank, i] > 0:
no_reported_timing = False
if not_yet_found:
not_yet_found = False
output_string += '\n {}:'.format(name)
output_string += '\n rank {:2d}: {:.2f}'.format(
rank, rank_name_to_time[rank, i] / normalizer)
if no_reported_timing:
return None
return output_string
def log(self, names, rank=None, normalizer=1.0, reset=True, barrier=False):
"""Log a group of timers."""
# Print.
assert normalizer > 0.0
if self._log_option in ['max', 'minmax']:
max_only = False
if self._log_option == 'max':
max_only = True
output_string = self._get_global_min_max_time_string(
names, reset, barrier, normalizer/1000.0, max_only)
elif self._log_option == 'all':
output_string = self._get_all_ranks_time_string(names,
reset, barrier,
normalizer/1000.0)
else:
raise Exception('unknown timing log option {}'.format(
self._log_option))
# If no input rank is provided, log on last rank.
if rank is None:
rank = torch.distributed.get_world_size() - 1
if rank == torch.distributed.get_rank() and output_string is not None:
print(output_string, flush=True)
def write(self, names, writer, iteration, normalizer=1.0,
reset=False, barrier=False):
"""Write timers to a tensorboard writer
Note that we only report maximum time across ranks to tensorboard.
"""
# currently when using add_scalars,
# torch.utils.add_scalars makes each timer its own run, which
# pollutes the runs list, so we just add each as a scalar
assert normalizer > 0.0
name_to_min_max_time = self._get_global_min_max_time(
names, reset, barrier, normalizer)
if writer is not None:
for name in name_to_min_max_time:
_, max_time = name_to_min_max_time[name]
writer.add_scalar(name + '-time', max_time, iteration)
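A short usage sketch of the new timers; the timer name and normalizer are illustrative, and `log`/`write` assume `torch.distributed` has already been initialized, since they all-gather timings across ranks:
# Sketch: typical lifecycle of the Timers API defined above.
timers = Timers(log_level=0, log_option='minmax')

timers('forward-compute').start()
# ... run the forward pass ...
timers('forward-compute').stop()

# Prints '(min, max) time across ranks (ms): ...' on the last rank by default.
timers.log(['forward-compute'])

# Optionally push the max-across-ranks time to a tensorboard writer as well:
# timers.write(['forward-compute'], writer, iteration)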

View File

@ -185,6 +185,8 @@ def pretrain(train_valid_test_dataset_provider,
print_rank_0('training ...')
iteration = 0
save_checkpoint(iteration, model, optimizer, lr_scheduler)
if args.do_train and args.train_iters > 0:
iteration = train(forward_step_func,
model, optimizer, lr_scheduler,
@ -328,7 +330,9 @@ def get_model(model_provider_func):
# GPU allocation.
for model_module in model:
model_module.to(get_accelerator().current_device_name())
device_name = get_accelerator().current_device_name()
print_rank_0(f"model to {device_name}")
model_module.to(device_name)
# Fp16 conversion.
@ -490,6 +494,7 @@ def setup_model_and_optimizer(model_provider_func, teacher=False,
# Number of train/valid/test samples.
if args.train_samples:
train_samples = args.train_samples
update_train_iters(args)
else:
train_samples = args.train_iters * args.global_batch_size
# eval_iters and test_iters here are not actually used, only for
@ -523,6 +528,7 @@ def setup_model_and_optimizer(model_provider_func, teacher=False,
lr_scheduler=lr_scheduler,
mpu=mpu if args.no_pipeline_parallel else None
)
assert model.fp16_enabled() == args.fp16, "megatron fp16 config does not match deepspeed"
if isinstance(model, deepspeed.PipelineEngine):
# hack to get batch_fn from pretrain_gpt.py
model.set_batch_fn(model.module._megatron_batch_fn)
@ -729,7 +735,7 @@ def training_log(loss_dict, total_loss_dict, learning_rate, iteration,
timers_to_log = []
def add_to_logging(name):
if name in timers.timers:
if name in timers._timers:
timers_to_log.append(name)
add_to_logging('forward-compute')
add_to_logging('forward-recv')
@ -1274,6 +1280,7 @@ def build_train_valid_test_data_iterators(
# Number of train/valid/test samples.
if args.train_samples:
train_samples = args.train_samples
update_train_iters(args)
else:
train_samples = args.train_iters * args.global_batch_size
eval_iters = (args.train_iters // args.eval_interval + 1) * \