create megatron core

This commit is contained in:
fengliangjun 2023-07-24 15:00:57 +08:00 committed by dilililiwhy
parent b559dc6385
commit 260e8eea8f
52 changed files with 509 additions and 468 deletions

View File

@ -30,6 +30,7 @@ from ascendspeed import (get_args,
print_rank_0,
update_num_microbatches,
utils)
from ascendspeed.core import parallel_state
_CHECKPOINT_VERSION = None
@ -96,15 +97,15 @@ def get_checkpoint_name(checkpoints_path, iteration,
else:
directory = 'iter_{:07d}'.format(iteration)
# Use both the tensor and pipeline MP rank.
if mpu.get_pipeline_model_parallel_world_size() == 1:
if parallel_state.get_pipeline_model_parallel_world_size() == 1:
return os.path.join(checkpoints_path, directory,
'mp_rank_{:02d}'.format(
mpu.get_tensor_model_parallel_rank()),
parallel_state.get_tensor_model_parallel_rank()),
'model_optim_rng.pt')
return os.path.join(checkpoints_path, directory,
'mp_rank_{:02d}_{:03d}'.format(
mpu.get_tensor_model_parallel_rank(),
mpu.get_pipeline_model_parallel_rank()),
parallel_state.get_tensor_model_parallel_rank(),
parallel_state.get_pipeline_model_parallel_rank()),
'model_optim_rng.pt')
@ -125,7 +126,7 @@ def save_checkpoint(iteration, model, optimizer, lr_scheduler):
print_rank_0('saving checkpoint at iteration {:7d} to {}'.format(
iteration, args.save))
if not torch.distributed.is_initialized() or mpu.get_data_parallel_rank() == 0 \
if not torch.distributed.is_initialized() or parallel_state.get_data_parallel_rank() == 0 \
or args.deepspeed:
# Arguments, iteration, and model.
@ -141,7 +142,7 @@ def save_checkpoint(iteration, model, optimizer, lr_scheduler):
state_dict['model'] = model[0].state_dict_for_save_checkpoint()
else:
for i in range(len(model)):
mpu.set_virtual_pipeline_model_parallel_rank(i)
parallel_state.set_virtual_pipeline_model_parallel_rank(i)
state_dict['model%d' % i] = model[i].state_dict_for_save_checkpoint()
# Optimizer stuff.
@ -387,7 +388,7 @@ def load_checkpoint(model, optimizer, lr_scheduler, load_arg='load', strict=True
model[0].load_state_dict(state_dict['model'], strict=strict)
else:
for i in range(len(model)):
mpu.set_virtual_pipeline_model_parallel_rank(i)
parallel_state.set_virtual_pipeline_model_parallel_rank(i)
model[i].load_state_dict(state_dict['model%d' % i], strict=strict)
# Fix up query/key/value matrix ordering if needed
@ -457,7 +458,7 @@ def load_biencoder_checkpoint(model, only_query_model=False,
iteration = int(f.read().strip())
checkpoint_name = get_checkpoint_name(load_path, iteration, False)
if mpu.get_data_parallel_rank() == 0:
if parallel_state.get_data_parallel_rank() == 0:
print('global rank {} is loading checkpoint {}'.format(
torch.distributed.get_rank(), checkpoint_name))
@ -473,7 +474,7 @@ def load_biencoder_checkpoint(model, only_query_model=False,
model[0].load_state_dict(ret_state_dict)
torch.distributed.barrier()
if mpu.get_data_parallel_rank() == 0:
if parallel_state.get_data_parallel_rank() == 0:
print(' successfully loaded {}'.format(checkpoint_name))
return model

View File

@ -0,0 +1,12 @@
import ascendspeed.core.parallel_state
import ascendspeed.core.tensor_parallel
import ascendspeed.core.utils
# Alias parallel_state as mpu, its legacy name
mpu = parallel_state
__all__ = [
"parallel_state",
"tensor_parallel",
"utils",
]
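
The package aliases parallel_state to its legacy name mpu, so either handle resolves to the same module object. A minimal usage sketch, assuming torch.distributed and model parallelism have already been initialized (the snippet is illustrative, not part of this commit):

# both names point at the same module
from ascendspeed import core

assert core.mpu is core.parallel_state

# after initialize_model_parallel() has run, either handle works
tp_rank = core.parallel_state.get_tensor_model_parallel_rank()
assert tp_rank == core.mpu.get_tensor_model_parallel_rank()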

View File

@ -1,5 +1,5 @@
# coding=utf-8
# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
# Copyright (c) 2020, HUAWEI CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -20,7 +20,6 @@ import torch
from .utils import ensure_divisibility, GlobalMemoryBuffer
# Intra-layer model parallel group that the current rank belongs to.
_TENSOR_MODEL_PARALLEL_GROUP = None
# Inter-layer model parallel group that the current rank belongs to.
@ -48,6 +47,7 @@ _PIPELINE_GLOBAL_RANKS = None
# Memory buffers to avoid dynamic memory allocation
_GLOBAL_MEMORY_BUFFER = None
def is_unitialized():
"""Useful for code segments that may be accessed with or without mpu initialization"""
return _DATA_PARALLEL_GROUP is None
@ -84,23 +84,16 @@ def initialize_model_parallel(tensor_model_parallel_size_=1,
tensor_model_parallel_size_))
print('> initializing pipeline model parallel with size {}'.format(
pipeline_model_parallel_size_))
# Get world size and rank. Ensure some consistencies.
# Make sure torch.distributed has already been initialized
assert torch.distributed.is_initialized()
world_size = torch.distributed.get_world_size() # total number of global processes
world_size = torch.distributed.get_world_size()
tensor_model_parallel_size = min(tensor_model_parallel_size_, world_size)
pipeline_model_parallel_size = min(pipeline_model_parallel_size_, world_size)
# TP_size * PP_size is the number of GPUs one full model occupies; world_size must be divisible by it
ensure_divisibility(world_size,
tensor_model_parallel_size * pipeline_model_parallel_size)
# Derive DP_size from TP_size and PP_size
data_parallel_size = world_size // (tensor_model_parallel_size *
pipeline_model_parallel_size)
# Number of TP, PP and DP groups
num_tensor_model_parallel_groups = world_size // tensor_model_parallel_size
num_pipeline_model_parallel_groups = world_size // pipeline_model_parallel_size
num_data_parallel_groups = world_size // data_parallel_size
@ -111,10 +104,9 @@ def initialize_model_parallel(tensor_model_parallel_size_=1,
_VIRTUAL_PIPELINE_MODEL_PARALLEL_RANK = 0
_VIRTUAL_PIPELINE_MODEL_PARALLEL_WORLD_SIZE = virtual_pipeline_model_parallel_size_
# Get the global rank of the current process
rank = torch.distributed.get_rank()
# Build the data-parallel groups. (Set up the DP groups.)
# Build the data-parallel groups.
global _DATA_PARALLEL_GROUP
assert _DATA_PARALLEL_GROUP is None, \
'data parallel group is already initialized'
@ -130,7 +122,7 @@ def initialize_model_parallel(tensor_model_parallel_size_=1,
if rank in ranks:
_DATA_PARALLEL_GROUP = group
# Build the model-parallel groups. (Set up the MP groups.)
# Build the model-parallel groups.
global _MODEL_PARALLEL_GROUP
assert _MODEL_PARALLEL_GROUP is None, \
'model parallel group is already initialized'
@ -141,7 +133,7 @@ def initialize_model_parallel(tensor_model_parallel_size_=1,
if rank in ranks:
_MODEL_PARALLEL_GROUP = group
# Build the tensor model-parallel groups. (Set up the TP groups.)
# Build the tensor model-parallel groups.
global _TENSOR_MODEL_PARALLEL_GROUP
assert _TENSOR_MODEL_PARALLEL_GROUP is None, \
'tensor model parallel group is already initialized'
@ -154,7 +146,6 @@ def initialize_model_parallel(tensor_model_parallel_size_=1,
# Build the pipeline model-parallel groups and embedding groups
# (first and last rank in each pipeline model-parallel group).
# Set up the PP groups and the embedding group
global _PIPELINE_MODEL_PARALLEL_GROUP
global _PIPELINE_GLOBAL_RANKS
assert _PIPELINE_MODEL_PARALLEL_GROUP is None, \
@ -165,7 +156,7 @@ def initialize_model_parallel(tensor_model_parallel_size_=1,
for i in range(num_pipeline_model_parallel_groups):
ranks = range(i, world_size,
num_pipeline_model_parallel_groups)
group = torch.distributed.new_group(ranks) # set up the PP group
group = torch.distributed.new_group(ranks)
if rank in ranks:
_PIPELINE_MODEL_PARALLEL_GROUP = group
_PIPELINE_GLOBAL_RANKS = ranks
@ -175,10 +166,10 @@ def initialize_model_parallel(tensor_model_parallel_size_=1,
embedding_ranks = [ranks[0], ranks[-1]]
else:
embedding_ranks = ranks
group = torch.distributed.new_group(embedding_ranks) # set up the embedding group
group = torch.distributed.new_group(embedding_ranks)
if rank in embedding_ranks:
_EMBEDDING_GROUP = group
# Initialize global memory buffer
# This isn't really "parallel state" but there isn't another good place to
# put this. If we end up with a more generic initialization of megatron-core
@ -189,8 +180,8 @@ def initialize_model_parallel(tensor_model_parallel_size_=1,
def model_parallel_is_initialized():
"""Check if model and data parallel groups are initialized."""
if _TENSOR_MODEL_PARALLEL_GROUP is None or \
_PIPELINE_MODEL_PARALLEL_GROUP is None or \
_DATA_PARALLEL_GROUP is None:
_PIPELINE_MODEL_PARALLEL_GROUP is None or \
_DATA_PARALLEL_GROUP is None:
return False
return True
@ -300,7 +291,7 @@ def is_pipeline_first_stage(ignore_virtual=False):
"""Return True if in the first pipeline model-parallel stage, False otherwise."""
if not ignore_virtual:
if get_virtual_pipeline_model_parallel_world_size() is not None and \
get_virtual_pipeline_model_parallel_rank() != 0:
get_virtual_pipeline_model_parallel_rank() != 0:
return False
return get_pipeline_model_parallel_rank() == 0
@ -311,11 +302,11 @@ def is_pipeline_last_stage(ignore_virtual=False):
virtual_pipeline_model_parallel_world_size = \
get_virtual_pipeline_model_parallel_world_size()
if virtual_pipeline_model_parallel_world_size is not None and \
get_virtual_pipeline_model_parallel_rank() != (
get_virtual_pipeline_model_parallel_rank() != (
virtual_pipeline_model_parallel_world_size - 1):
return False
return get_pipeline_model_parallel_rank() == (
get_pipeline_model_parallel_world_size() - 1)
get_pipeline_model_parallel_world_size() - 1)
def get_virtual_pipeline_model_parallel_rank():
@ -356,6 +347,7 @@ def get_pipeline_model_parallel_last_rank():
last_rank_local = get_pipeline_model_parallel_world_size() - 1
return _PIPELINE_GLOBAL_RANKS[last_rank_local]
def get_pipeline_model_parallel_next_rank():
assert _PIPELINE_GLOBAL_RANKS is not None, \
"Pipeline parallel group is not initialized"

View File

@ -0,0 +1,2 @@
from .cross_entropy import vocab_parallel_cross_entropy
from .utils import VocabUtility

View File

@ -1,5 +1,5 @@
# coding=utf-8
# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
# Copyright (c) 2020, HUAWEI CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -16,22 +16,26 @@
import torch
from .initialize import get_tensor_model_parallel_group
from .initialize import get_tensor_model_parallel_rank
from .initialize import get_tensor_model_parallel_world_size
from ascendspeed.core.parallel_state import (
get_tensor_model_parallel_group,
get_tensor_model_parallel_rank,
get_tensor_model_parallel_world_size
)
from .utils import VocabUtility
class _VocabParallelCrossEntropy(torch.autograd.Function):
@staticmethod
def forward(ctx, vocab_parallel_logits, target):
def forward(ctx, vocab_parallel_logits, target, label_smoothing=0.0):
# Maximum value along vocab dimension across all GPUs.
logits_max = torch.max(vocab_parallel_logits, dim=-1)[0]
torch.distributed.all_reduce(logits_max,
op=torch.distributed.ReduceOp.MAX,
group=get_tensor_model_parallel_group())
# Subtract the maximum value.
vocab_parallel_logits.sub_(logits_max.unsqueeze(dim=-1))
@ -77,6 +81,27 @@ class _VocabParallelCrossEntropy(torch.autograd.Function):
# Store softmax, target-mask and masked-target for backward pass.
exp_logits.div_(sum_exp_logits.unsqueeze(dim=-1))
vocab_size = exp_logits.size(-1)
if label_smoothing > 0:
"""
We'd like to assign alpha / (K - 1) probability mass to every index that is not the ground truth.
= (1 - alpha) * y_gt + alpha * mean(y_{i for i != gt})
= (1 - alpha) * y_gt + (alpha / (K - 1)) * \sum_{i != gt} y_i
= ((K - 1) * (1 - alpha) / (K - 1)) * y_gt + (alpha / (K - 1)) * \sum_{i != gt} y_i
= ((K * (1 - alpha) - 1) / (K - 1)) * y_gt + (alpha / (K - 1)) * \sum_{i} y_i
= (1 - (alpha * K) / (K - 1)) * y_gt + ( (alpha * K) / (K - 1) ) * \sum_{i} y_i / K
From: https://github.com/NVIDIA/NeMo/blob/main/nemo/collections/common/losses/smoothed_cross_entropy.py
"""
assert 1.0 > label_smoothing > 0.0
smoothing = label_smoothing * vocab_size / (vocab_size - 1)
# Exp logits at this point are normalized probabilities. So we can just take the log to get log-probs.
log_probs = torch.log(exp_logits)
mean_log_probs = log_probs.mean(dim=-1)
loss = (1.0 - smoothing) * loss - smoothing * mean_log_probs
ctx.label_smoothing, ctx.vocab_size = label_smoothing, vocab_size
ctx.save_for_backward(exp_logits, target_mask, masked_target_1d)
return loss
@ -86,6 +111,7 @@ class _VocabParallelCrossEntropy(torch.autograd.Function):
# Retrieve tensors from the forward path.
softmax, target_mask, masked_target_1d = ctx.saved_tensors
label_smoothing, vocab_size = ctx.label_smoothing, ctx.vocab_size
# All the inputs have softmax as their gradient.
grad_input = softmax
@ -96,15 +122,34 @@ class _VocabParallelCrossEntropy(torch.autograd.Function):
# Add the gradient from matching classes.
arange_1d = torch.arange(start=0, end=grad_2d.size()[0],
device=grad_2d.device)
grad_2d[arange_1d, masked_target_1d.long()] -= (
1.0 - target_mask.view(-1).float())
softmax_update = 1.0 - target_mask.view(-1).float()
if label_smoothing > 0:
smoothing = label_smoothing * vocab_size / (vocab_size - 1)
grad_2d[arange_1d, masked_target_1d] -= (1.0 - smoothing) * softmax_update
average_grad = 1 / vocab_size
grad_2d[arange_1d, :] -= smoothing * average_grad
else:
grad_2d[arange_1d, masked_target_1d] -= softmax_update
# Finally elementwise multiplication with the output gradients.
grad_input.mul_(grad_output.unsqueeze(dim=-1))
return grad_input, None
return grad_input, None, None
def vocab_parallel_cross_entropy(vocab_parallel_logits, target):
"""Helper function for the cross entropy."""
return _VocabParallelCrossEntropy.apply(vocab_parallel_logits, target)
def vocab_parallel_cross_entropy(vocab_parallel_logits, target, label_smoothing=0.0):
"""
Performs cross entropy loss when logits are split across tensor parallel ranks
Arguments:
vocab_parallel_logits: logits split across tensor parallel ranks
dimension is [sequence_length, batch_size, hidden_size]
target: correct vocab ids of dimension [sequence_length, micro_batch_size]
label_smoothing: smoothing factor, must be in range [0.0, 1.0)
default is no smoothing (=0.0)
"""
return _VocabParallelCrossEntropy.apply(vocab_parallel_logits, target, label_smoothing)
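
With the new label_smoothing argument the call site stays a one-liner. A minimal sketch, assuming model parallelism is already initialized and using illustrative shapes (a 32000-token vocabulary split across 2 tensor-parallel ranks):

import torch
from ascendspeed.core.tensor_parallel import vocab_parallel_cross_entropy

seq_len, micro_batch, vocab_partition = 1024, 4, 32000 // 2
logits = torch.randn(seq_len, micro_batch, vocab_partition, requires_grad=True)  # this rank's logit shard
labels = torch.randint(0, 32000, (seq_len, micro_batch))                         # global vocab ids

loss = vocab_parallel_cross_entropy(logits, labels)                              # no smoothing
smoothed = vocab_parallel_cross_entropy(logits, labels, label_smoothing=0.1)     # smoothed variant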

View File

@ -0,0 +1,26 @@
# Copyright (c) 2022, HUAWEI CORPORATION. All rights reserved.
from typing import List, Sequence
from ascendspeed.core.utils import divide
class VocabUtility:
""" Split the vocabulary into `world_size` chunks and return the first
and last index of the vocabulary belonging to the `rank`
partition: Note that indices in [first, last)
"""
@staticmethod
def vocab_range_from_per_partition_vocab_size(
per_partition_vocab_size: int, rank, world_size: int
) -> Sequence[int]:
index_f = rank * per_partition_vocab_size
index_l = index_f + per_partition_vocab_size
return index_f, index_l
@staticmethod
def vocab_range_from_global_vocab_size(global_vocab_size: int, rank: int, world_size: int) -> Sequence[int]:
per_partition_vocab_size = divide(global_vocab_size, world_size)
return VocabUtility.vocab_range_from_per_partition_vocab_size(
per_partition_vocab_size, rank, world_size
)
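
VocabUtility simply carves [0, global_vocab_size) into equal contiguous slices, one per tensor-parallel rank. A small illustration with assumed numbers:

from ascendspeed.core.tensor_parallel import VocabUtility

vocab_size, world_size = 32000, 4
for rank in range(world_size):
    first, last = VocabUtility.vocab_range_from_global_vocab_size(vocab_size, rank, world_size)
    # rank 0 -> [0, 8000), rank 1 -> [8000, 16000), rank 2 -> [16000, 24000), rank 3 -> [24000, 32000)
    print(rank, first, last)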

View File

@ -1,28 +1,15 @@
# coding=utf-8
# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utility functions used throughout AscendSpeed core"""
from functools import reduce
import operator
import torch
def ensure_divisibility(numerator, denominator):
"""Ensure that numerator is divisible by the denominator."""
assert numerator % denominator == 0, '{} is not divisible by {}'.format(
numerator, denominator)
assert numerator % denominator == 0, "{} is not divisible by {}".format(
numerator, denominator
)
def divide(numerator, denominator):
@ -31,11 +18,11 @@ def divide(numerator, denominator):
ensure_divisibility(numerator, denominator)
return numerator // denominator
class GlobalMemoryBuffer:
"""Global buffer to avoid dynamic memory allocations.
Caller should ensure that buffers of the same name
are not used concurrently.
"""
are not used concurrently."""
def __init__(self):
self.buffer = {}
@ -52,6 +39,7 @@ class GlobalMemoryBuffer:
return self.buffer[(name, dtype)][0:required_len].view(*tensor_shape)
def _kernel_make_viewless_tensor(inp, requires_grad):
'''Make a viewless tensor.
@ -71,6 +59,7 @@ def _kernel_make_viewless_tensor(inp, requires_grad):
out.set_(inp.data)
return out
class MakeViewlessTensor(torch.autograd.Function):
'''
Autograd function to make a viewless tensor.
@ -87,6 +76,7 @@ class MakeViewlessTensor(torch.autograd.Function):
def backward(ctx, grad_output):
return grad_output, None
def make_viewless_tensor(inp, requires_grad, keep_graph):
'''
Entry-point for creating viewless tensors.
@ -107,6 +97,7 @@ def make_viewless_tensor(inp, requires_grad, keep_graph):
else:
return _kernel_make_viewless_tensor(inp, requires_grad)
def split_tensor_along_last_dim(tensor, num_partitions,
contiguous_split_chunks=False):
"""Split a tensor along its last dimension.
@ -125,23 +116,4 @@ def split_tensor_along_last_dim(tensor, num_partitions,
if contiguous_split_chunks:
return tuple(chunk.contiguous() for chunk in tensor_list)
return tensor_list
class VocabUtility:
"""Split the vocabulary into `world_size` chunks amd return the
first and last index of the vocabulary belonging to the `rank`
partition: Note that indecies in [fist, last)"""
@staticmethod
def vocab_range_from_per_partition_vocab_size(per_partition_vocab_size,
rank, world_size):
index_f = rank * per_partition_vocab_size
index_l = index_f + per_partition_vocab_size
return index_f, index_l
@staticmethod
def vocab_range_from_global_vocab_size(global_vocab_size, rank, world_size):
per_partition_vocab_size = divide(global_vocab_size, world_size)
return VocabUtility.vocab_range_from_per_partition_vocab_size(
per_partition_vocab_size, rank, world_size)
return tensor_list
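
divide() and split_tensor_along_last_dim() are the helpers the attention layers below rely on. A short sketch with illustrative shapes:

import torch
from ascendspeed.core.utils import divide, split_tensor_along_last_dim

per_partition = divide(4096, 8)                    # 512; ensure_divisibility() asserts first
x = torch.randn(2, 3, 12)
q, k, v = split_tensor_along_last_dim(x, 3)        # three views of shape [2, 3, 4]
q, k, v = split_tensor_along_last_dim(x, 3, contiguous_split_chunks=True)  # contiguous copies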

View File

@ -7,6 +7,7 @@ import torch
from ascendspeed import get_args, get_tokenizer, mpu, print_rank_0
from ascendspeed.data.dataset_utils import create_masked_lm_predictions, \
pad_and_convert_to_numpy
from ascendspeed.core import parallel_state
from ascendspeed.data.data_samplers import MegatronPretrainingSampler
from deepspeed.accelerator import get_accelerator
def make_attention_mask(source_block, target_block):
@ -36,8 +37,8 @@ def get_one_epoch_dataloader(dataset, micro_batch_size=None):
total_samples=len(dataset),
consumed_samples=0,
micro_batch_size=args.micro_batch_size,
data_parallel_rank=mpu.get_data_parallel_rank(),
data_parallel_size=mpu.get_data_parallel_world_size(),
data_parallel_rank=parallel_state.get_data_parallel_rank(),
data_parallel_size=parallel_state.get_data_parallel_world_size(),
drop_last=False)
return torch.utils.data.DataLoader(dataset,
@ -147,7 +148,7 @@ def get_block_samples_mapping(block_dataset, title_dataset, data_prefix, num_epo
indexmap_filename += '.npy'
# Build the indexed mapping if not exist.
if mpu.get_data_parallel_rank() == 0 and \
if parallel_state.get_data_parallel_rank() == 0 and \
not os.path.isfile(indexmap_filename):
print(' > WARNING: could not find index map file {}, building '
'the indices on rank 0 ...'.format(indexmap_filename))
@ -188,9 +189,9 @@ def get_block_samples_mapping(block_dataset, title_dataset, data_prefix, num_epo
# device_index=rank which is not the case for model
# parallel case
counts = get_accelerator().LongTensor([1])
torch.distributed.all_reduce(counts, group=mpu.get_data_parallel_group())
torch.distributed.all_reduce(counts, group=parallel_state.get_data_parallel_group())
assert counts[0].item() == torch.distributed.get_world_size(
group=mpu.get_data_parallel_group())
group=parallel_state.get_data_parallel_group())
# Load indexed dataset.
print_rank_0(' > loading indexed mapping from {}'.format(

View File

@ -19,7 +19,7 @@
import torch
import random
from ascendspeed import get_args
from ascendspeed import mpu
from ascendspeed.core import parallel_state
def build_pretraining_data_loader(dataset, consumed_samples):
@ -36,22 +36,22 @@ def build_pretraining_data_loader(dataset, consumed_samples):
total_samples=len(dataset),
consumed_samples=consumed_samples,
micro_batch_size=args.micro_batch_size,
data_parallel_rank=mpu.get_data_parallel_rank(),
data_parallel_size=mpu.get_data_parallel_world_size())
data_parallel_rank=parallel_state.get_data_parallel_rank(),
data_parallel_size=parallel_state.get_data_parallel_world_size())
else:
batch_sampler = MegatronPretrainingSampler(
total_samples=len(dataset),
consumed_samples=consumed_samples,
micro_batch_size=args.micro_batch_size,
data_parallel_rank=mpu.get_data_parallel_rank(),
data_parallel_size=mpu.get_data_parallel_world_size())
data_parallel_rank=parallel_state.get_data_parallel_rank(),
data_parallel_size=parallel_state.get_data_parallel_world_size())
elif args.dataloader_type == 'cyclic':
batch_sampler = MegatronPretrainingRandomSampler(
total_samples=len(dataset),
consumed_samples=consumed_samples,
micro_batch_size=args.micro_batch_size,
data_parallel_rank=mpu.get_data_parallel_rank(),
data_parallel_size=mpu.get_data_parallel_world_size())
data_parallel_rank=parallel_state.get_data_parallel_rank(),
data_parallel_size=parallel_state.get_data_parallel_world_size())
else:
raise Exception('{} dataloader type is not supported.'.format(
args.dataloader_type))

View File

@ -28,9 +28,9 @@ import torch
from ascendspeed import (
get_args,
mpu,
print_rank_0
)
from ascendspeed.core import parallel_state
from ascendspeed.data.blendable_dataset import BlendableDataset
from ascendspeed.data.indexed_dataset import make_dataset as make_indexed_dataset
from deepspeed.accelerator import get_accelerator
@ -729,11 +729,11 @@ def get_samples_mapping(indexed_dataset,
# parallel case
if get_accelerator().device_count() > 0: # Skip when CPU-only
counts = get_accelerator().LongTensor([1])
torch.distributed.all_reduce(counts, group=mpu.get_data_parallel_group())
torch.distributed.all_reduce(counts, group=mpu.get_pipeline_model_parallel_group())
torch.distributed.all_reduce(counts, group=parallel_state.get_data_parallel_group())
torch.distributed.all_reduce(counts, group=parallel_state.get_pipeline_model_parallel_group())
assert counts[0].item() == (
torch.distributed.get_world_size() //
torch.distributed.get_world_size(group=mpu.get_tensor_model_parallel_group()))
torch.distributed.get_world_size(group=parallel_state.get_tensor_model_parallel_group()))
# Load indexed dataset.
print_rank_0(' > loading indexed mapping from {}'.format(

View File

@ -4,7 +4,8 @@ import time
import numpy as np
import torch
from ascendspeed import print_rank_0, mpu, logging
from ascendspeed import print_rank_0, logging
from ascendspeed.core import parallel_state
from ascendspeed.data.blendable_dataset import BlendableDataset
from ascendspeed.data.dataset_utils import get_datasets_weights_and_num_samples, get_split_by_range_, \
get_train_valid_test_split_
@ -441,11 +442,11 @@ def _build_index_mappings(
# device_index=rank which is not the case for model
# parallel case
counts = torch.cuda.LongTensor([1])
torch.distributed.all_reduce(counts, group=mpu.get_data_parallel_group())
torch.distributed.all_reduce(counts, group=mpu.get_pipeline_model_parallel_group())
torch.distributed.all_reduce(counts, group=parallel_state.get_data_parallel_group())
torch.distributed.all_reduce(counts, group=parallel_state.get_pipeline_model_parallel_group())
assert counts[0].item() == (
torch.distributed.get_world_size() //
torch.distributed.get_world_size(group=mpu.get_tensor_model_parallel_group()))
torch.distributed.get_world_size(group=parallel_state.get_tensor_model_parallel_group()))
# Load mappings.
start_time = time.time()

View File

@ -22,7 +22,8 @@ import numpy as np
import torch
from deepspeed.accelerator import get_accelerator
from ascendspeed import mpu, is_rank_0, print_rank_0, get_args
from ascendspeed import is_rank_0, print_rank_0, get_args
from ascendspeed.core import parallel_state
from ascendspeed.data.blendable_dataset import BlendableDataset
from ascendspeed.data.dataset_utils import get_datasets_weights_and_num_samples
from ascendspeed.data.dataset_utils import get_train_valid_test_split_, get_split_by_range_
@ -408,11 +409,11 @@ def _build_index_mappings(name, data_prefix, documents, sizes,
# device_index=rank which is not the case for model
# parallel case
counts = get_accelerator().LongTensor([1])
torch.distributed.all_reduce(counts, group=mpu.get_data_parallel_group())
torch.distributed.all_reduce(counts, group=mpu.get_pipeline_model_parallel_group())
torch.distributed.all_reduce(counts, group=parallel_state.get_data_parallel_group())
torch.distributed.all_reduce(counts, group=parallel_state.get_pipeline_model_parallel_group())
assert counts[0].item() == (
torch.distributed.get_world_size() //
torch.distributed.get_world_size(group=mpu.get_tensor_model_parallel_group()))
torch.distributed.get_world_size(group=parallel_state.get_tensor_model_parallel_group()))
# Load mappings.
start_time = time.time()

View File

@ -4,8 +4,7 @@ import time
import numpy as np
import torch
from ascendspeed import mpu, print_rank_0
from ascendspeed.data.dataset_utils import create_masked_lm_predictions, pad_and_convert_to_numpy
from ascendspeed.core import parallel_state
from ascendspeed import get_args, get_tokenizer, print_rank_0, mpu
from deepspeed.accelerator import get_accelerator
@ -13,8 +12,8 @@ def get_one_epoch_dataloader(dataset, micro_batch_size=None):
"""Specifically one epoch to be used in an indexing job."""
args = get_args()
world_size = mpu.get_data_parallel_world_size()
rank = mpu.get_data_parallel_rank()
world_size = parallel_state.get_data_parallel_world_size()
rank = parallel_state.get_data_parallel_rank()
if micro_batch_size is None:
micro_batch_size = args.micro_batch_size
global_batch_size = micro_batch_size * world_size
@ -137,7 +136,7 @@ def get_block_samples_mapping(block_dataset, title_dataset, data_prefix, num_epo
indexmap_filename += '.npy'
# Build the indexed mapping if not exist.
if mpu.get_data_parallel_rank() == 0 and \
if parallel_state.get_data_parallel_rank() == 0 and \
not os.path.isfile(indexmap_filename):
print(' > WARNING: could not find index map file {}, building '
'the indices on rank 0 ...'.format(indexmap_filename))
@ -178,9 +177,9 @@ def get_block_samples_mapping(block_dataset, title_dataset, data_prefix, num_epo
# device_index=rank which is not the case for model
# parallel case
counts = get_accelerator().LongTensor([1])
torch.distributed.all_reduce(counts, group=mpu.get_data_parallel_group())
torch.distributed.all_reduce(counts, group=parallel_state.get_data_parallel_group())
assert counts[0].item() == torch.distributed.get_world_size(
group=mpu.get_data_parallel_group())
group=parallel_state.get_data_parallel_group())
# Load indexed dataset.
print_rank_0(' > loading indexed mapping from {}'.format(

View File

@ -7,7 +7,7 @@ import numpy as np
import torch
from ascendspeed import get_args
from ascendspeed import mpu
from ascendspeed.core import parallel_state
def detach(tensor):
@ -50,10 +50,10 @@ class OpenRetreivalDataStore(object):
def load_from_file(self):
"""Populate members from instance saved to file"""
if mpu.is_unitialized() or mpu.get_data_parallel_rank() == 0:
if parallel_state.is_unitialized() or parallel_state.get_data_parallel_rank() == 0:
print("\n> Unpickling BlockData", flush=True)
state_dict = pickle.load(open(self.embedding_path, 'rb'))
if mpu.is_unitialized() or mpu.get_data_parallel_rank() == 0:
if parallel_state.is_unitialized() or parallel_state.get_data_parallel_rank() == 0:
print(">> Finished unpickling BlockData\n", flush=True)
self.embed_data = state_dict['embed_data']
@ -137,7 +137,7 @@ class FaissMIPSIndex(object):
except ImportError:
raise Exception("Error: Please install faiss to use FaissMIPSIndex")
if mpu.is_unitialized() or mpu.get_data_parallel_rank() == 0:
if parallel_state.is_unitialized() or parallel_state.get_data_parallel_rank() == 0:
print("\n> Building index", flush=True)
cpu_index = faiss.IndexFlatIP(self.embed_size)
@ -149,12 +149,12 @@ class FaissMIPSIndex(object):
config.useFloat16 = True
gpu_index = faiss.index_cpu_to_all_gpus(cpu_index, co=config)
self.mips_index = faiss.IndexIDMap(gpu_index)
if mpu.is_unitialized() or mpu.get_data_parallel_rank() == 0:
if parallel_state.is_unitialized() or parallel_state.get_data_parallel_rank() == 0:
print(">> Initialized index on GPU", flush=True)
else:
# CPU index supports IDs so wrap with IDMap
self.mips_index = faiss.IndexIDMap(cpu_index)
if mpu.is_unitialized() or mpu.get_data_parallel_rank() == 0:
if parallel_state.is_unitialized() or parallel_state.get_data_parallel_rank() == 0:
print(">> Initialized index on CPU", flush=True)
# if we were constructed with a BlockData, then automatically load it
@ -199,7 +199,7 @@ class FaissMIPSIndex(object):
self.mips_index.add_with_ids(embeds_arr, indices_arr)
if mpu.is_unitialized() or mpu.get_data_parallel_rank() == 0:
if parallel_state.is_unitialized() or parallel_state.get_data_parallel_rank() == 0:
print(">>> Finished adding block data to index", flush=True)
def search_mips_index(self, query_embeds, top_k, reconstruct=True):

View File

@ -3,7 +3,7 @@ import torch
import torch.distributed as dist
from ascendspeed import get_args
from ascendspeed import mpu
from ascendspeed.core import parallel_state
from ascendspeed.checkpointing import load_biencoder_checkpoint
from ascendspeed.data.orqa_wiki_dataset import get_open_retrieval_wiki_dataset
from ascendspeed.data.orqa_wiki_dataset import get_open_retrieval_batch
@ -35,8 +35,8 @@ class IndexBuilder(object):
self.batch_size = args.indexer_batch_size
self.load_attributes()
self.is_main_builder = mpu.get_data_parallel_rank() == 0
self.num_total_builders = mpu.get_data_parallel_world_size()
self.is_main_builder = parallel_state.get_data_parallel_rank() == 0
self.num_total_builders = parallel_state.get_data_parallel_world_size()
self.iteration = self.total_processed = 0
def load_attributes(self):

View File

@ -27,9 +27,10 @@ from ascendspeed import get_adlr_autoresume
from ascendspeed import get_args
from ascendspeed import get_tensorboard_writer
from ascendspeed import mpu
from ascendspeed.core import parallel_state
from ascendspeed.global_vars import set_global_variables
from ascendspeed.mpu import (set_tensor_model_parallel_rank,
set_tensor_model_parallel_world_size)
from ascendspeed.core.parallel_state import (set_tensor_model_parallel_rank,
set_tensor_model_parallel_world_size)
def initialize_megatron(extra_args_provider=None, args_defaults=None,
@ -47,7 +48,7 @@ def initialize_megatron(extra_args_provider=None, args_defaults=None,
if not allow_no_cuda:
# Make sure cuda is available.
assert get_accelerator().is_available(), 'ascendspeed requires accelerator.'
assert get_accelerator().is_available(), 'AscendSpeed requires accelerator.'
# Parse args, build tokenizer, and set adlr-autoresume,
# tensorboard-writer, and timers.
@ -60,7 +61,7 @@ def initialize_megatron(extra_args_provider=None, args_defaults=None,
args = get_args()
# Pytorch distributed.
_initialize_distributed()
# Random seeds for reproducibility.
if args.rank == 0:
print('> setting random seeds to {} ...'.format(args.seed))
@ -68,13 +69,13 @@ def initialize_megatron(extra_args_provider=None, args_defaults=None,
args = get_args()
if args.lazy_mpu_init:
args.use_cpu_initialization=True
args.use_cpu_initialization = True
# delayed initialization of DDP-related stuff
# We only set basic DDP globals
set_tensor_model_parallel_world_size(args.tensor_model_parallel_size)
# and return function for external DDP manager
# to call when it has DDP initialized
set_tensor_model_parallel_rank(args.rank)
set_tensor_model_parallel_rank(args.rank)
return finish_mpu_init
else:
# MPU is the master. Complete initialization right away.
@ -82,7 +83,7 @@ def initialize_megatron(extra_args_provider=None, args_defaults=None,
# Initialize memory buffers.
_initialize_mem_buffs()
# Auto resume.
_init_autoresume()
@ -165,13 +166,13 @@ def _initialize_distributed():
else:
args.local_rank = device
get_accelerator().set_device(device) # only do so when device_count > 0
get_accelerator().set_device(device) # only do so when device_count > 0
# Call the init process
# Set up the global process group
init_method = 'tcp://'
master_ip = os.getenv('MASTER_ADDR', 'localhost') # get the IP of the rank-0 process
master_port = os.getenv('MASTER_PORT', '6000') # get the port of the rank-0 process
master_ip = os.getenv('MASTER_ADDR', 'localhost')  # get the IP of the rank-0 process
master_port = os.getenv('MASTER_PORT', '6000')  # get the port of the rank-0 process
init_method += master_ip + ':' + master_port
if args.deepspeed or args.ds_inference:
@ -186,12 +187,12 @@ def _initialize_distributed():
# data-parallel communicators.
# 2. Decide the DP/TP/PP grouping strategy and set up the process subgroups
if device_count > 0:
if mpu.model_parallel_is_initialized():
if parallel_state.model_parallel_is_initialized():
print('model parallel is already initialized')
else:
mpu.initialize_model_parallel(args.tensor_model_parallel_size,
args.pipeline_model_parallel_size,
args.virtual_pipeline_model_parallel_size)
parallel_state.initialize_model_parallel(args.tensor_model_parallel_size,
args.pipeline_model_parallel_size,
args.virtual_pipeline_model_parallel_size)
# Set up DeepSpeed ZeRO-R to optimize activations
if args.deepspeed and args.deepspeed_activation_checkpointing:
@ -215,7 +216,7 @@ def _set_random_seed(seed_):
if get_accelerator().device_count() == 0:
seed = seed_
else:
seed = seed_ + (100 * mpu.get_pipeline_model_parallel_rank())
seed = seed_ + (100 * parallel_state.get_pipeline_model_parallel_rank())
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
@ -243,12 +244,12 @@ def _initialize_mem_buffs():
if args.distribute_checkpointed_activations:
mpu.init_checkpointed_activations_memory_buffer()
def _is_rank_0():
"""Check whether it is rank 0. For AML, check if it is rank 0 of a node"""
if torch.distributed.is_initialized():
if torch.distributed.get_rank() == 0 or (
'AZUREML_EXPERIMENT_ID' in os.environ and torch.distributed.get_rank() % get_accelerator().device_count() == 0
):
if torch.distributed.get_rank() == 0 or \
torch.distributed.get_rank() % get_accelerator().device_count() == 0:
return True
else:
return False
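
The distributed bootstrap above builds a plain TCP rendezvous string from the standard environment variables and offsets the random seed by pipeline stage so that different stages draw different random streams. A condensed sketch with example values (not defaults mandated by this commit):

import os

master_ip = os.getenv('MASTER_ADDR', 'localhost')
master_port = os.getenv('MASTER_PORT', '6000')
init_method = 'tcp://' + master_ip + ':' + master_port   # e.g. 'tcp://localhost:6000'

base_seed = 1234                                          # illustrative --seed value
pipeline_rank = 3                                         # illustrative stage index
seed = base_seed + 100 * pipeline_rank                    # 1534; one stream per pipeline stage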

View File

@ -20,9 +20,9 @@ import torch
from torch._utils import _flatten_dense_tensors, _unflatten_dense_tensors
from ascendspeed import get_args
from ascendspeed import mpu
from .module import MegatronModule
from deepspeed.accelerator import get_accelerator
from ascendspeed.core import parallel_state
class MemoryBuffer:
@ -191,9 +191,9 @@ class DistributedDataParallel(DistributedDataParallelBase):
if self._grad_buffers is not None:
for _, buffer_ in self._grad_buffers.items():
buffer_.data /= mpu.get_data_parallel_world_size()
buffer_.data /= parallel_state.get_data_parallel_world_size()
torch.distributed.all_reduce(
buffer_.data, group=mpu.get_data_parallel_group())
buffer_.data, group=parallel_state.get_data_parallel_group())
else:
# Otherwise, bucketize and all-reduce
buckets = {}
@ -211,9 +211,9 @@ class DistributedDataParallel(DistributedDataParallelBase):
bucket = buckets[tp]
grads = [param.grad.data for param in bucket]
coalesced = _flatten_dense_tensors(grads)
coalesced /= mpu.get_data_parallel_world_size()
coalesced /= parallel_state.get_data_parallel_world_size()
torch.distributed.all_reduce(
coalesced, group=mpu.get_data_parallel_group())
coalesced, group=parallel_state.get_data_parallel_group())
for buf, synced in zip(grads, _unflatten_dense_tensors(
coalesced, grads)):
buf.copy_(synced)

View File

@ -18,7 +18,6 @@
with some changes. """
import numbers
from ascendspeed.mpu.utils import make_viewless_tensor
import torch
from torch.nn.parameter import Parameter
from torch.nn import init

View File

@ -19,7 +19,7 @@ from functools import partial
import torch
from ascendspeed import get_args
from ascendspeed import mpu
from ascendspeed.core import tensor_parallel, parallel_state
from .module import MegatronModule, fp32_to_float16
from .enums import AttnMaskType
@ -58,9 +58,9 @@ def post_language_model_processing(lm_output, labels, logit_weights,
else:
if fp16_lm_cross_entropy:
assert output.dtype == torch.half
loss = mpu.vocab_parallel_cross_entropy(output, labels)
loss = tensor_parallel.vocab_parallel_cross_entropy(output, labels)
else:
loss = mpu.vocab_parallel_cross_entropy(output.float(), labels)
loss = tensor_parallel.vocab_parallel_cross_entropy(output.float(), labels)
return loss
@ -183,7 +183,7 @@ def get_cross_entropy(is_prefix: bool):
args = get_args()
losses = mpu.vocab_parallel_cross_entropy(output.contiguous().float(), labels)
losses = tensor_parallel.vocab_parallel_cross_entropy(output.contiguous().float(), labels)
if is_prefix:
micro_batch_size, sequence_length = loss_mask.shape
@ -318,9 +318,9 @@ class GPTModelPipe(PipelineModule,MegatronModule):
interval = 0
from deepspeed.runtime.pipe.topology import PipeModelDataParallelTopology
topo = PipeModelDataParallelTopology(num_pp=mpu.get_pipeline_model_parallel_world_size(),
num_mp=mpu.get_tensor_model_parallel_world_size(),
num_dp=mpu.get_data_parallel_world_size())
topo = PipeModelDataParallelTopology(num_pp=parallel_state.get_pipeline_model_parallel_world_size(),
num_mp=parallel_state.get_tensor_model_parallel_world_size(),
num_dp=parallel_state.get_data_parallel_world_size())
# here one can extend the regex to include more layers to be counted towards partitioning,
# e.g. 'type:transformer|embedding' will add up all the transformer blocks and also the first

View File

@ -27,7 +27,6 @@ from ascendspeed.model.utils import get_linear_layer
from ascendspeed.model.utils import init_method_normal, scaled_init_method_normal
from ascendspeed.mpu.mappings import gather_from_sequence_parallel_region
from ascendspeed.mpu.mappings import scatter_to_sequence_parallel_region
from ascendspeed.mpu.initialize import get_global_memory_buffer
def parallel_lm_logits(input_, word_embeddings_weight, parallel_output,

View File

@ -28,6 +28,7 @@ from deepspeed.pipe import PipelineModule, LayerSpec
from ascendspeed import get_args
from ascendspeed import mpu
from ascendspeed.core import tensor_parallel, parallel_state, utils
from ascendspeed.model.module import MegatronModule, float16_to_fp32, fp32_to_float16
from ascendspeed.model.enums import AttnMaskType, AttnType
from ascendspeed.model.utils import init_method_normal, scaled_init_method_normal, attention_mask_func
@ -315,12 +316,12 @@ class Llama2ParallelAttention(MegatronModule):
projection_size = args.kv_channels * args.num_attention_heads
# Per attention head and per partition values.
world_size = mpu.get_tensor_model_parallel_world_size()
self.hidden_size_per_partition = mpu.divide(projection_size,
world_size = parallel_state.get_tensor_model_parallel_world_size()
self.hidden_size_per_partition = utils.divide(projection_size,
world_size)
self.hidden_size_per_attention_head = mpu.divide(
self.hidden_size_per_attention_head = utils.divide(
projection_size, args.num_attention_heads)
self.num_attention_heads_per_partition = mpu.divide(
self.num_attention_heads_per_partition = utils.divide(
args.num_attention_heads, world_size)
# Strided linear layer.
@ -385,7 +386,7 @@ class Llama2ParallelAttention(MegatronModule):
# [sq, b, 3 * h] --> 3 [sq, b, h]
(query_layer,
key_layer,
value_layer) = mpu.split_tensor_along_last_dim(mixed_x_layer, 3)
value_layer) = utils.split_tensor_along_last_dim(mixed_x_layer, 3)
# ==================================
# Rotary Position Embedding
@ -596,9 +597,9 @@ class Llama2ParallelTransformer(MegatronModule):
self.checkpoint_num_layers = args.checkpoint_num_layers
# Number of layers.
assert args.num_layers % mpu.get_pipeline_model_parallel_world_size() == 0, \
assert args.num_layers % parallel_state.get_pipeline_model_parallel_world_size() == 0, \
'num_layers must be divisible by pipeline_model_parallel_size'
self.num_layers = args.num_layers // mpu.get_pipeline_model_parallel_world_size()
self.num_layers = args.num_layers // parallel_state.get_pipeline_model_parallel_world_size()
# Transformer layers.
def build_layer(layer_number):
@ -622,12 +623,12 @@ class Llama2ParallelTransformer(MegatronModule):
# layers to stages like (each list is a model chunk):
# Stage 0: [0, 1] [4, 5]
# Stage 1: [2, 3] [6, 7]
offset = mpu.get_virtual_pipeline_model_parallel_rank() * (
offset = parallel_state.get_virtual_pipeline_model_parallel_rank() * (
args.num_layers // args.virtual_pipeline_model_parallel_size) + \
(mpu.get_pipeline_model_parallel_rank() * self.num_layers)
(parallel_state.get_pipeline_model_parallel_rank() * self.num_layers)
else:
# Each stage gets a contiguous set of layers.
offset = mpu.get_pipeline_model_parallel_rank() * self.num_layers
offset = parallel_state.get_pipeline_model_parallel_rank() * self.num_layers
self.layers = []
# Build the layers
@ -748,7 +749,7 @@ def CrossEntropy(output, labels):
labels, loss_mask = labels[0], labels[1]
args = get_args()
losses = mpu.vocab_parallel_cross_entropy(output.contiguous().float(), labels)
losses = tensor_parallel.vocab_parallel_cross_entropy(output.contiguous().float(), labels)
loss_mask = loss_mask.view(-1)
loss = torch.sum(losses.view(-1) * loss_mask) / loss_mask.sum()
return loss
@ -813,9 +814,9 @@ class Llama2ModelPipe(PipelineModule, MegatronModule):
interval = 0
from deepspeed.runtime.pipe.topology import PipeModelDataParallelTopology
topo = PipeModelDataParallelTopology(num_pp=mpu.get_pipeline_model_parallel_world_size(),
num_mp=mpu.get_tensor_model_parallel_world_size(),
num_dp=mpu.get_data_parallel_world_size())
topo = PipeModelDataParallelTopology(num_pp=parallel_state.get_pipeline_model_parallel_world_size(),
num_mp=parallel_state.get_tensor_model_parallel_world_size(),
num_dp=parallel_state.get_data_parallel_world_size())
super().__init__(layers=self.specs,
loss_fn=CrossEntropy,
@ -898,9 +899,9 @@ class Llama2Model(MegatronModule):
else:
if self.fp16_lm_cross_entropy:
assert hidden_states.dtype == torch.half
loss = mpu.vocab_parallel_cross_entropy(hidden_states, labels)
loss = tensor_parallel.vocab_parallel_cross_entropy(hidden_states, labels)
else:
loss = mpu.vocab_parallel_cross_entropy(hidden_states.float(), labels)
loss = tensor_parallel.vocab_parallel_cross_entropy(hidden_states.float(), labels)
return loss
return hidden_states

View File

@ -26,6 +26,7 @@ import torch.nn.functional as F
from ascendspeed import get_args
from ascendspeed import mpu
from ascendspeed.core import tensor_parallel, parallel_state, utils
from ascendspeed.model.module import MegatronModule, float16_to_fp32, fp32_to_float16
from ascendspeed.model.enums import AttnMaskType, LayerType, AttnType
from ascendspeed.model.utils import get_linear_layer, init_method_normal, scaled_init_method_normal, attention_mask_func, \
@ -318,12 +319,12 @@ class LlamaParallelAttention(MegatronModule):
projection_size = args.kv_channels * args.num_attention_heads
# Per attention head and per partition values.
world_size = mpu.get_tensor_model_parallel_world_size()
self.hidden_size_per_partition = mpu.divide(projection_size,
world_size = parallel_state.get_tensor_model_parallel_world_size()
self.hidden_size_per_partition = utils.divide(projection_size,
world_size)
self.hidden_size_per_attention_head = mpu.divide(
self.hidden_size_per_attention_head = utils.divide(
projection_size, args.num_attention_heads)
self.num_attention_heads_per_partition = mpu.divide(
self.num_attention_heads_per_partition = utils.divide(
args.num_attention_heads, world_size)
# Strided linear layer.
@ -388,7 +389,7 @@ class LlamaParallelAttention(MegatronModule):
# [sq, b, 3 * h] --> 3 [sq, b, h]
(query_layer,
key_layer,
value_layer) = mpu.split_tensor_along_last_dim(mixed_x_layer, 3)
value_layer) = utils.split_tensor_along_last_dim(mixed_x_layer, 3)
# ==================================
# Rotary Position Embedding
@ -599,9 +600,9 @@ class LlamaParallelTransformer(MegatronModule):
self.checkpoint_num_layers = args.checkpoint_num_layers
# Number of layers.
assert args.num_layers % mpu.get_pipeline_model_parallel_world_size() == 0, \
assert args.num_layers % parallel_state.get_pipeline_model_parallel_world_size() == 0, \
'num_layers must be divisible by pipeline_model_parallel_size'
self.num_layers = args.num_layers // mpu.get_pipeline_model_parallel_world_size()
self.num_layers = args.num_layers // parallel_state.get_pipeline_model_parallel_world_size()
# Transformer layers.
def build_layer(layer_number):
@ -625,12 +626,12 @@ class LlamaParallelTransformer(MegatronModule):
# layers to stages like (each list is a model chunk):
# Stage 0: [0, 1] [4, 5]
# Stage 1: [2, 3] [6, 7]
offset = mpu.get_virtual_pipeline_model_parallel_rank() * (
offset = parallel_state.get_virtual_pipeline_model_parallel_rank() * (
args.num_layers // args.virtual_pipeline_model_parallel_size) + \
(mpu.get_pipeline_model_parallel_rank() * self.num_layers)
(parallel_state.get_pipeline_model_parallel_rank() * self.num_layers)
else:
# Each stage gets a contiguous set of layers.
offset = mpu.get_pipeline_model_parallel_rank() * self.num_layers
offset = parallel_state.get_pipeline_model_parallel_rank() * self.num_layers
self.layers = []
# Build the layers
@ -751,7 +752,7 @@ def CrossEntropy(output, labels):
labels, loss_mask = labels[0], labels[1]
args = get_args()
losses = mpu.vocab_parallel_cross_entropy(output.contiguous().float(), labels)
losses = tensor_parallel.vocab_parallel_cross_entropy(output.contiguous().float(), labels)
loss_mask = loss_mask.view(-1)
loss = torch.sum(losses.view(-1) * loss_mask) / loss_mask.sum()
return loss
@ -816,9 +817,9 @@ class LlamaModelPipe(PipelineModule, MegatronModule):
interval = 0
from deepspeed.runtime.pipe.topology import PipeModelDataParallelTopology
topo = PipeModelDataParallelTopology(num_pp=mpu.get_pipeline_model_parallel_world_size(),
num_mp=mpu.get_tensor_model_parallel_world_size(),
num_dp=mpu.get_data_parallel_world_size())
topo = PipeModelDataParallelTopology(num_pp=parallel_state.get_pipeline_model_parallel_world_size(),
num_mp=parallel_state.get_tensor_model_parallel_world_size(),
num_dp=parallel_state.get_data_parallel_world_size())
super().__init__(layers=self.specs,
loss_fn=CrossEntropy,
@ -901,9 +902,9 @@ class LlamaModel(MegatronModule):
else:
if self.fp16_lm_cross_entropy:
assert hidden_states.dtype == torch.half
loss = mpu.vocab_parallel_cross_entropy(hidden_states, labels)
loss = tensor_parallel.vocab_parallel_cross_entropy(hidden_states, labels)
else:
loss = mpu.vocab_parallel_cross_entropy(hidden_states.float(), labels)
loss = tensor_parallel.vocab_parallel_cross_entropy(hidden_states.float(), labels)
return loss
return hidden_states

View File

@ -21,6 +21,7 @@ from deepspeed.accelerator import get_accelerator
from ascendspeed import get_args
from ascendspeed import mpu
from ascendspeed.core import parallel_state
_FLOAT_TYPES = (torch.FloatTensor, get_accelerator().FloatTensor)
@ -50,9 +51,9 @@ class MegatronModule(torch.nn.Module):
def word_embeddings_weight(self):
if mpu.is_pipeline_first_stage(ignore_virtual=True):
if parallel_state.is_pipeline_first_stage(ignore_virtual=True):
return self.language_model.embedding.word_embeddings.weight
if mpu.is_pipeline_last_stage(ignore_virtual=True):
if parallel_state.is_pipeline_last_stage(ignore_virtual=True):
if not self.share_word_embeddings:
raise Exception('word_embeddings_weight() called for last '
'stage, but share_word_embeddings is false')
@ -85,8 +86,8 @@ class MegatronModule(torch.nn.Module):
# 3. In the training loop, before an all-reduce between the grads of
# the two word_embeddings layers to ensure that every applied weight
# update is the same on both stages.
if mpu.is_pipeline_last_stage():
assert not mpu.is_pipeline_first_stage()
if parallel_state.is_pipeline_last_stage():
assert not parallel_state.is_pipeline_first_stage()
self._word_embeddings_for_head_key = 'word_embeddings_for_head'
# set word_embeddings weights to 0 here, then copy first
# stage's weights using all_reduce below.
@ -99,9 +100,9 @@ class MegatronModule(torch.nn.Module):
# Ensure that first and last stages have the same initial parameter
# values.
if torch.distributed.is_initialized():
if mpu.is_pipeline_first_stage() or mpu.is_pipeline_last_stage():
if parallel_state.is_pipeline_first_stage() or parallel_state.is_pipeline_last_stage():
torch.distributed.all_reduce(self.word_embeddings_weight().data,
group=mpu.get_embedding_group())
group=parallel_state.get_embedding_group())
else:
print("WARNING! Distributed processes aren't initialized, so "
"word embeddings in the last layer are not initialized. "
@ -164,10 +165,10 @@ class Float16Module(MegatronModule):
def forward(self, *inputs, **kwargs):
if mpu.is_pipeline_first_stage():
if parallel_state.is_pipeline_first_stage():
inputs = fp32_to_float16(inputs, self.float16_convertor)
outputs = self.module(*inputs, **kwargs)
if mpu.is_pipeline_last_stage():
if parallel_state.is_pipeline_last_stage():
outputs = float16_to_fp32(outputs)
return outputs
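
The comments above describe how the tied word embeddings on the first and last pipeline stages are kept identical. A simplified sketch of that synchronization, using the hypothetical helper name sync_tied_word_embeddings and assuming the module exposes word_embeddings_weight() as shown above:

import torch
from ascendspeed.core import parallel_state

def sync_tied_word_embeddings(module):
    # Only the first and last pipeline stages hold a copy of the word embeddings;
    # an all-reduce over the embedding group keeps the two copies in sync.
    if parallel_state.is_pipeline_first_stage() or parallel_state.is_pipeline_last_stage():
        torch.distributed.all_reduce(module.word_embeddings_weight().data,
                                     group=parallel_state.get_embedding_group())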

View File

@ -1,6 +1,5 @@
# coding=utf-8
# Copyright (c) 2023, HUAWEI CORPORATION. All rights reserved.
# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
# Extracted from: https://github.com/EleutherAI/gpt-neox
#
# Licensed under the Apache License, Version 2.0 (the "License");

View File

@ -27,6 +27,7 @@ from deepspeed.moe.layer import MoE
from ascendspeed import get_args
from ascendspeed import mpu
from ascendspeed.core import utils, parallel_state
from ascendspeed.enums import PositionEmbeddingType
from ascendspeed.model import LayerNorm
from ascendspeed.model.enums import AttnMaskType, LayerType, AttnType
@ -149,12 +150,12 @@ class ParallelAttention(MegatronModule):
projection_size = args.kv_channels * args.num_attention_heads
# Per attention head and per partition values.
world_size = mpu.get_tensor_model_parallel_world_size()
self.hidden_size_per_partition = mpu.divide(projection_size,
world_size = parallel_state.get_tensor_model_parallel_world_size()
self.hidden_size_per_partition = utils.divide(projection_size,
world_size)
self.hidden_size_per_attention_head = mpu.divide(
self.hidden_size_per_attention_head = utils.divide(
projection_size, args.num_attention_heads)
self.num_attention_heads_per_partition = mpu.divide(
self.num_attention_heads_per_partition = utils.divide(
args.num_attention_heads, world_size)
# Strided linear layer.
@ -242,7 +243,7 @@ class ParallelAttention(MegatronModule):
# [sq, b, np, 3 * hn] --> 3 [sq, b, np, hn]
(query_layer,
key_layer,
value_layer) = mpu.split_tensor_along_last_dim(mixed_x_layer, 3)
value_layer) = utils.split_tensor_along_last_dim(mixed_x_layer, 3)
else:
# Attention heads [sk, b, h] --> [sk, b, (np * 2 * hn)]
mixed_kv_layer, _ = self.key_value(encoder_output)
@ -255,7 +256,7 @@ class ParallelAttention(MegatronModule):
# [sk, b, np, 2 * hn] --> 2 [sk, b, np, hn]
(key_layer,
value_layer) = mpu.split_tensor_along_last_dim(mixed_kv_layer, 2)
value_layer) = utils.split_tensor_along_last_dim(mixed_kv_layer, 2)
# Attention head [sq, b, h] --> [sq, b, hp]
query_layer, _ = self.query(hidden_states)
@ -630,8 +631,8 @@ class ParallelTransformerLayer(MegatronModule):
num_attention_heads, -1, -1)
# Select the part of the tensor that corresponds to our tensor parallel index.
tp_world_size = mpu.get_tensor_model_parallel_world_size()
tp_index = mpu.get_tensor_model_parallel_rank()
tp_world_size = parallel_state.get_tensor_model_parallel_world_size()
tp_index = parallel_state.get_tensor_model_parallel_rank()
alibi = alibi.reshape((tp_world_size, -1, *alibi.shape[1:]))[tp_index]
alibi = alibi.repeat(batch_size, 1, 1)
@ -699,9 +700,9 @@ class ParallelTransformer(MegatronModule):
self.checkpoint_num_layers = args.checkpoint_num_layers
# Number of layers.
assert args.num_layers % mpu.get_pipeline_model_parallel_world_size() == 0, \
assert args.num_layers % parallel_state.get_pipeline_model_parallel_world_size() == 0, \
'num_layers must be divisible by pipeline_model_parallel_size'
self.num_layers = args.num_layers // mpu.get_pipeline_model_parallel_world_size()
self.num_layers = args.num_layers // parallel_state.get_pipeline_model_parallel_world_size()
# Transformer layers.
def build_layer(layer_number, n_e):
@ -728,12 +729,12 @@ class ParallelTransformer(MegatronModule):
# layers to stages like (each list is a model chunk):
# Stage 0: [0, 1] [4, 5]
# Stage 1: [2, 3] [6, 7]
offset = mpu.get_virtual_pipeline_model_parallel_rank() * (
offset = parallel_state.get_virtual_pipeline_model_parallel_rank() * (
args.num_layers // args.virtual_pipeline_model_parallel_size) + \
(mpu.get_pipeline_model_parallel_rank() * self.num_layers)
(parallel_state.get_pipeline_model_parallel_rank() * self.num_layers)
else:
# Each stage gets a contiguous set of layers.
offset = mpu.get_pipeline_model_parallel_rank() * self.num_layers
offset = parallel_state.get_pipeline_model_parallel_rank() * self.num_layers
assert len(num_experts) == 1 or len(num_experts) == args.num_layers // args.expert_interval, \
'num_experts must be either a single value or a list of the same length as the number of MoE layers'
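
The per-partition sizes computed above are plain divisions of the full attention projection by the tensor-parallel world size, and each pipeline stage's layer offset is its rank times the per-stage layer count. A numeric illustration under assumed model sizes:

from ascendspeed.core.utils import divide

kv_channels, num_attention_heads, tp_world_size = 128, 32, 4
projection_size = kv_channels * num_attention_heads                             # 4096

hidden_size_per_partition = divide(projection_size, tp_world_size)              # 1024
hidden_size_per_attention_head = divide(projection_size, num_attention_heads)   # 128
num_attention_heads_per_partition = divide(num_attention_heads, tp_world_size)  # 8

num_layers, pp_world_size, pp_rank = 32, 4, 1
layers_per_stage = num_layers // pp_world_size                                  # 8
offset = pp_rank * layers_per_stage                                             # stage 1 builds layers 8..15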

View File

@ -1,5 +1,5 @@
# coding=utf-8
# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
# Copyright (c) 2020, HUAWEI CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -15,35 +15,8 @@
"""Model parallel utility interface."""
from .cross_entropy import vocab_parallel_cross_entropy
from .data import broadcast_data
from .initialize import is_unitialized
from .initialize import destroy_model_parallel
from .initialize import get_data_parallel_group
from .initialize import get_data_parallel_rank
from .initialize import get_data_parallel_world_size
from .initialize import get_embedding_group
from .initialize import get_model_parallel_group
from .initialize import get_tensor_model_parallel_group
from .initialize import get_pipeline_model_parallel_group
from .initialize import get_tensor_model_parallel_rank, set_tensor_model_parallel_rank
from .initialize import get_pipeline_model_parallel_rank, set_pipeline_model_parallel_rank
from .initialize import is_pipeline_first_stage, is_pipeline_last_stage
from .initialize import get_tensor_model_parallel_src_rank
from .initialize import get_pipeline_model_parallel_first_rank
from .initialize import get_pipeline_model_parallel_last_rank
from .initialize import get_pipeline_model_parallel_next_rank
from .initialize import get_pipeline_model_parallel_prev_rank
from .initialize import get_tensor_model_parallel_world_size, set_tensor_model_parallel_world_size
from .initialize import get_pipeline_model_parallel_world_size, set_pipeline_model_parallel_world_size
from .initialize import get_virtual_pipeline_model_parallel_rank, set_virtual_pipeline_model_parallel_rank
from .initialize import initialize_model_parallel
from .initialize import model_parallel_is_initialized
from .initialize import get_model_parallel_world_size, get_model_parallel_rank
from .initialize import is_pipeline_stage_after_split
from .layers import ColumnParallelLinear
from .layers import RowParallelLinear
from .layers import VocabParallelEmbedding
@ -64,5 +37,5 @@ from .random import reset_checkpointed_activations_memory_buffer
from .random import gather_split_1d_tensor
from .random import split_tensor_into_1d_equal_chunks
from .utils import divide
from .utils import split_tensor_along_last_dim
from ascendspeed.core.parallel_state import *

View File

@ -1,5 +1,5 @@
# coding=utf-8
# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
# Copyright (c) 2020, HUAWEI CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -15,9 +15,9 @@
import torch
from .initialize import get_tensor_model_parallel_group
from .initialize import get_tensor_model_parallel_rank
from .initialize import get_tensor_model_parallel_src_rank
from ascendspeed.core.parallel_state import get_tensor_model_parallel_group
from ascendspeed.core.parallel_state import get_tensor_model_parallel_rank
from ascendspeed.core.parallel_state import get_tensor_model_parallel_src_rank
from deepspeed.accelerator import get_accelerator
_MAX_DATA_DIM = 5

View File

@ -1,5 +1,5 @@
# coding=utf-8
# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
# Copyright (c) 2020, HUAWEI CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -27,18 +27,21 @@ import torch.nn.init as init
from torch.nn.parameter import Parameter
from deepspeed.accelerator import get_accelerator
from ascendspeed import get_args, mpu
from ascendspeed.mpu.initialize import get_tensor_model_parallel_rank
from ascendspeed.mpu.initialize import get_tensor_model_parallel_group
from ascendspeed.mpu.initialize import get_tensor_model_parallel_world_size
from ascendspeed.mpu.initialize import get_global_memory_buffer
from ascendspeed import get_args
from ascendspeed.core import parallel_state
from ascendspeed.core.parallel_state import get_tensor_model_parallel_rank
from ascendspeed.core.parallel_state import get_tensor_model_parallel_group
from ascendspeed.core.parallel_state import get_tensor_model_parallel_world_size
from ascendspeed.core.parallel_state import get_global_memory_buffer
from ascendspeed.mpu.mappings import copy_to_tensor_model_parallel_region
from ascendspeed.mpu.mappings import gather_from_tensor_model_parallel_region
from ascendspeed.mpu.mappings import reduce_from_tensor_model_parallel_region
from ascendspeed.mpu.mappings import scatter_to_tensor_model_parallel_region
from ascendspeed.mpu.mappings import reduce_scatter_to_sequence_parallel_region
from ascendspeed.mpu.random import get_cuda_rng_tracker
from ascendspeed.mpu.utils import divide, split_tensor_along_last_dim, VocabUtility
from ascendspeed.core.utils import divide
from ascendspeed.core.tensor_parallel import VocabUtility
from ascendspeed.model.fused_layer_norm import MixedFusedLayerNorm as LayerNorm
@ -169,7 +172,7 @@ class VocabParallelEmbedding(torch.nn.Module):
# Allocate weights and initialize.
args = get_args()
if mpu.is_pipeline_first_stage() and args.embed_layernorm:
if parallel_state.is_pipeline_first_stage() and args.embed_layernorm:
self.norm = LayerNorm(embedding_dim)
if args.use_cpu_initialization:
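A minimal sketch, in plain Python with hypothetical sizes (no ascendspeed imports), of the vocabulary partitioning that divide and VocabUtility above provide for VocabParallelEmbedding: the vocabulary is split into equal, contiguous id ranges, one per tensor-parallel rank.

vocab_size, tp_world_size = 32000, 4
per_partition_vocab_size = vocab_size // tp_world_size    # divide() additionally asserts divisibility
for rank in range(tp_world_size):
    start = rank * per_partition_vocab_size               # first token id owned by this rank
    end = start + per_partition_vocab_size                # one past the last owned id
    print(f"rank {rank}: token ids [{start}, {end})")     # e.g. rank 0: [0, 8000)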

View File

@ -1,5 +1,5 @@
# coding=utf-8
# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
# Copyright (c) 2020, HUAWEI CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -15,8 +15,10 @@
import torch
from .initialize import get_tensor_model_parallel_group, get_tensor_model_parallel_world_size, get_tensor_model_parallel_rank
from .utils import split_tensor_along_last_dim
from ascendspeed.core.parallel_state import get_tensor_model_parallel_group
from ascendspeed.core.parallel_state import get_tensor_model_parallel_world_size
from ascendspeed.core.parallel_state import get_tensor_model_parallel_rank
from ascendspeed.core.utils import split_tensor_along_last_dim
def _reduce(input_):
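A minimal sketch, using plain torch instead of the ascendspeed helper, of the role split_tensor_along_last_dim (imported above) plays in these mappings: the last dimension is carved into one equal chunk per tensor-parallel rank.

import torch

x = torch.arange(12).reshape(3, 4)               # toy [seq, hidden] tensor
tp_world_size = 2
chunks = torch.chunk(x, tp_world_size, dim=-1)   # equivalent split for evenly divisible sizes
print([tuple(c.shape) for c in chunks])          # [(3, 2), (3, 2)]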

View File

@ -1,5 +1,5 @@
# coding=utf-8
# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
# Copyright (c) 2020, HUAWEI CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -27,10 +27,10 @@ from torch.utils.checkpoint import detach_variable
from ascendspeed import get_args
from ascendspeed.memory import allocate_mem_buff
from .initialize import get_data_parallel_rank
from .initialize import get_tensor_model_parallel_group
from .initialize import get_tensor_model_parallel_rank
from .initialize import get_tensor_model_parallel_world_size
from ascendspeed.core.parallel_state import get_data_parallel_rank
from ascendspeed.core.parallel_state import get_tensor_model_parallel_group
from ascendspeed.core.parallel_state import get_tensor_model_parallel_rank
from ascendspeed.core.parallel_state import get_tensor_model_parallel_world_size
# Default name for the model parallel rng tracker.

View File

@ -29,10 +29,9 @@ if get_accelerator().device_name() == 'cuda':
import amp_C
from ascendspeed import mpu
from ascendspeed.core import parallel_state
from ascendspeed.model.module import param_is_not_shared
from ascendspeed.mpu.layers import param_is_not_tensor_parallel_duplicate
from deepspeed.accelerator import get_accelerator
def clip_grad_norm_fp32(parameters, max_norm, norm_type=2):
"""Clips gradient norm of an iterable of parameters whose gradients
@ -87,7 +86,7 @@ def clip_grad_norm_fp32(parameters, max_norm, norm_type=2):
# Take max across all model-parallel GPUs.
torch.distributed.all_reduce(total_norm_cuda,
op=torch.distributed.ReduceOp.MAX,
group=mpu.get_model_parallel_group())
group=parallel_state.get_model_parallel_group())
total_norm = total_norm_cuda[0].item()
else:
for grad in grads_for_norm:
@ -97,7 +96,7 @@ def clip_grad_norm_fp32(parameters, max_norm, norm_type=2):
# Sum across all model-parallel GPUs.
torch.distributed.all_reduce(total_norm,
op=torch.distributed.ReduceOp.SUM,
group=mpu.get_model_parallel_group())
group=parallel_state.get_model_parallel_group())
total_norm = total_norm.item() ** (1.0 / norm_type)
# Scale.
@ -131,7 +130,7 @@ def count_zeros_fp32(parameters):
# Sum across all model-parallel GPUs.
torch.distributed.all_reduce(total_num_zeros,
op=torch.distributed.ReduceOp.SUM,
group=mpu.get_model_parallel_group())
group=parallel_state.get_model_parallel_group())
total_num_zeros = total_num_zeros.item()
return total_num_zeros
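A minimal sketch, using plain torch with no process groups, of the reduction pattern the clip_grads hunks above encode: with model parallelism each rank holds only a shard of the gradients, so the global 2-norm is the all-reduced SUM of local squared norms followed by a root, while the inf-norm is the all-reduced MAX of local maxima.

import torch

shards = [torch.tensor([3.0, 4.0]), torch.tensor([12.0])]    # pretend per-rank gradient shards
local_sq = [g.norm(2) ** 2 for g in shards]                  # each rank computes this locally
total_norm = sum(local_sq) ** 0.5                            # all_reduce(SUM), then ** (1 / norm_type)
assert torch.isclose(total_norm, torch.cat(shards).norm(2))  # 13.0 either way

inf_norm = max(g.abs().max() for g in shards)                # all_reduce(MAX) for the inf-norm case
assert inf_norm == torch.cat(shards).abs().max()             # 12.0 either way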

View File

@ -1,5 +1,5 @@
# coding=utf-8
# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
# Copyright (c) 2020, HUAWEI CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -23,6 +23,7 @@ import torch
from ascendspeed import get_timers
from ascendspeed import mpu
from ascendspeed.core import parallel_state
from ascendspeed import print_rank_0
from deepspeed.accelerator import get_accelerator
from .clip_grads import clip_grad_norm_fp32, count_zeros_fp32
@ -343,7 +344,7 @@ class Float16OptimizerWithFloat16Params(MegatronOptimizer):
# Update across all model parallel instances.
torch.distributed.all_reduce(self.found_inf,
op=torch.distributed.ReduceOp.MAX,
group=mpu.get_model_parallel_group())
group=parallel_state.get_model_parallel_group())
# Check for nan.
found_inf_flag = (self.found_inf.item() > 0)

View File

@ -19,6 +19,7 @@ import torch
from deepspeed.accelerator import get_accelerator
from ascendspeed import get_args
from ascendspeed import mpu
from ascendspeed.core import parallel_state
def _communicate(tensor_send_next, tensor_send_prev, recv_prev, recv_next,
@ -58,12 +59,12 @@ def _communicate(tensor_send_next, tensor_send_prev, recv_prev, recv_next,
else (args.seq_length, args.micro_batch_size, args.hidden_size)
if args.sequence_parallel:
seq_length = args.seq_length // mpu.get_tensor_model_parallel_world_size()
seq_length = args.seq_length // parallel_state.get_tensor_model_parallel_world_size()
tensor_shape = (seq_length, args.micro_batch_size, args.hidden_size)
if args.scatter_gather_tensors_in_pipeline and not args.sequence_parallel:
tensor_chunk_shape = reduce(operator.mul, tensor_shape, 1) // \
mpu.get_tensor_model_parallel_world_size()
parallel_state.get_tensor_model_parallel_world_size()
else:
tensor_chunk_shape = tensor_shape
dtype = args.params_dtype
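A minimal sketch, plain Python with hypothetical sizes, of the shape arithmetic in the hunk above: under scatter-gather each pipeline send moves only a 1/tp_world_size flat slice, and under sequence parallelism the sequence dimension itself is already split.

from functools import reduce
import operator

seq_length, micro_batch_size, hidden_size, tp_world_size = 4096, 2, 5120, 8

tensor_shape = (seq_length, micro_batch_size, hidden_size)
tensor_chunk_shape = reduce(operator.mul, tensor_shape, 1) // tp_world_size
print(tensor_chunk_shape)            # 5242880 elements per rank instead of 41943040

print(seq_length // tp_world_size)   # 512: per-rank sequence length when sequence_parallel is on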
@ -94,18 +95,18 @@ def _communicate(tensor_send_next, tensor_send_prev, recv_prev, recv_next,
tensor_recv_prev=tensor_recv_prev,
tensor_send_next=tensor_send_next,
tensor_recv_next=tensor_recv_next,
group=mpu.get_pipeline_model_parallel_group())
group=parallel_state.get_pipeline_model_parallel_group())
else:
ops = []
if tensor_send_prev is not None:
send_prev_op = torch.distributed.P2POp(
torch.distributed.isend, tensor_send_prev,
mpu.get_pipeline_model_parallel_prev_rank())
parallel_state.get_pipeline_model_parallel_prev_rank())
ops.append(send_prev_op)
if tensor_recv_prev is not None:
recv_prev_op = torch.distributed.P2POp(
torch.distributed.irecv, tensor_recv_prev,
mpu.get_pipeline_model_parallel_prev_rank())
parallel_state.get_pipeline_model_parallel_prev_rank())
ops.append(recv_prev_op)
if args.num_layers_per_virtual_pipeline_stage is None:
@ -113,28 +114,28 @@ def _communicate(tensor_send_next, tensor_send_prev, recv_prev, recv_next,
if tensor_recv_next is not None:
recv_next_op = torch.distributed.P2POp(
torch.distributed.irecv, tensor_recv_next,
mpu.get_pipeline_model_parallel_next_rank())
parallel_state.get_pipeline_model_parallel_next_rank())
ops.append(recv_next_op)
if tensor_send_next is not None:
send_next_op = torch.distributed.P2POp(
torch.distributed.isend, tensor_send_next,
mpu.get_pipeline_model_parallel_next_rank())
parallel_state.get_pipeline_model_parallel_next_rank())
ops.append(send_next_op)
else:
# vp
if tensor_send_next is not None:
send_next_op = torch.distributed.P2POp(
torch.distributed.isend, tensor_send_next,
mpu.get_pipeline_model_parallel_next_rank())
parallel_state.get_pipeline_model_parallel_next_rank())
ops.append(send_next_op)
if tensor_recv_next is not None:
recv_next_op = torch.distributed.P2POp(
torch.distributed.irecv, tensor_recv_next,
mpu.get_pipeline_model_parallel_next_rank())
parallel_state.get_pipeline_model_parallel_next_rank())
ops.append(recv_next_op)
if (args.num_layers_per_virtual_pipeline_stage is not None) \
and (mpu.get_pipeline_model_parallel_rank() % 2 == 1):
and (parallel_state.get_pipeline_model_parallel_rank() % 2 == 1):
ops.reverse()
if len(ops) > 0:
@ -159,7 +160,7 @@ def _communicate(tensor_send_next, tensor_send_prev, recv_prev, recv_next,
def recv_forward(timers=None, recv_tensor_shape=None):
"""Receive tensor from previous rank in pipeline (forward receive)."""
if mpu.is_pipeline_first_stage():
if parallel_state.is_pipeline_first_stage():
input_tensor = None
else:
if timers is not None:
@ -177,7 +178,7 @@ def recv_forward(timers=None, recv_tensor_shape=None):
def recv_backward(timers=None, recv_tensor_shape=None):
"""Receive tensor from next rank in pipeline (backward receive)."""
if mpu.is_pipeline_last_stage():
if parallel_state.is_pipeline_last_stage():
output_tensor_grad = None
else:
if timers is not None:
@ -195,7 +196,7 @@ def recv_backward(timers=None, recv_tensor_shape=None):
def send_forward(output_tensor, timers=None):
"""Send tensor to next rank in pipeline (forward send)."""
if not mpu.is_pipeline_last_stage():
if not parallel_state.is_pipeline_last_stage():
if timers is not None:
timers('forward-send', log_level=2).start()
_communicate(
@ -209,7 +210,7 @@ def send_forward(output_tensor, timers=None):
def send_backward(input_tensor_grad, timers=None):
"""Send tensor to previous rank in pipeline (backward send)."""
if not mpu.is_pipeline_first_stage():
if not parallel_state.is_pipeline_first_stage():
if timers is not None:
timers('backward-send', log_level=2).start()
_communicate(
@ -223,7 +224,7 @@ def send_backward(input_tensor_grad, timers=None):
def send_forward_recv_backward(output_tensor, timers=None, recv_tensor_shape=None):
"""Batched send and recv with next rank in pipeline."""
if mpu.is_pipeline_last_stage():
if parallel_state.is_pipeline_last_stage():
output_tensor_grad = None
else:
if timers is not None:
@ -241,7 +242,7 @@ def send_forward_recv_backward(output_tensor, timers=None, recv_tensor_shape=Non
def send_backward_recv_forward(input_tensor_grad, timers=None, recv_tensor_shape=None):
"""Batched send and recv with previous rank in pipeline."""
if mpu.is_pipeline_first_stage():
if parallel_state.is_pipeline_first_stage():
input_tensor = None
else:
if timers is not None:

View File

@ -21,9 +21,9 @@ from torch.nn.parallel.distributed import DistributedDataParallel as torchDDP
from ascendspeed import get_args
from ascendspeed import get_num_microbatches
from ascendspeed import get_timers
from ascendspeed import mpu
from ascendspeed import p2p_communication
from ascendspeed import print_rank_0
from ascendspeed.core import parallel_state
from ascendspeed.utils import unwrap_model
from ascendspeed.model import DistributedDataParallel as LocalDDP
from ascendspeed.model import Float16Module
@ -49,7 +49,7 @@ def set_npu_overflow_flag():
def get_forward_backward_func():
args = get_args()
if mpu.get_pipeline_model_parallel_world_size() > 1:
if parallel_state.get_pipeline_model_parallel_world_size() > 1:
if args.virtual_pipeline_model_parallel_size is not None:
forward_backward_func = forward_backward_pipelining_with_interleaving
else:
@ -84,7 +84,7 @@ def forward_step(forward_step_func, data_iterator, model, input_tensor, losses_r
# Changing this API requires changing it in all models/tasks. Instead,
# it's recommended to use args to pass additional arguments.
output_tensor, loss_func = forward_step_func(data_iterator, model)
if mpu.is_pipeline_last_stage():
if parallel_state.is_pipeline_last_stage():
output_tensor = loss_func(output_tensor)
loss, loss_reduced = output_tensor
if not args.no_pipeline_parallel:
@ -195,8 +195,8 @@ def forward_backward_pipelining_with_interleaving(forward_step_func, data_iterat
if not forward_only:
output_tensor_grads = [[] for _ in range(len(model))]
pipeline_parallel_size = mpu.get_pipeline_model_parallel_world_size()
pipeline_parallel_rank = mpu.get_pipeline_model_parallel_rank()
pipeline_parallel_size = parallel_state.get_pipeline_model_parallel_world_size()
pipeline_parallel_rank = parallel_state.get_pipeline_model_parallel_rank()
# Compute number of warmup and remaining microbatches.
num_model_chunks = len(model)
@ -237,9 +237,9 @@ def forward_backward_pipelining_with_interleaving(forward_step_func, data_iterat
(run set_virtual_pipeline_model_parallel_rank() before calling
forward_step())."""
model_chunk_id = get_model_chunk_id(microbatch_id, forward=True)
mpu.set_virtual_pipeline_model_parallel_rank(model_chunk_id)
parallel_state.set_virtual_pipeline_model_parallel_rank(model_chunk_id)
if mpu.is_pipeline_first_stage():
if parallel_state.is_pipeline_first_stage():
if len(input_tensors[model_chunk_id]) == \
len(output_tensors[model_chunk_id]):
input_tensors[model_chunk_id].append(None)
@ -257,9 +257,9 @@ def forward_backward_pipelining_with_interleaving(forward_step_func, data_iterat
(run set_virtual_pipeline_model_parallel_rank() before calling
backward_step())."""
model_chunk_id = get_model_chunk_id(microbatch_id, forward=False)
mpu.set_virtual_pipeline_model_parallel_rank(model_chunk_id)
parallel_state.set_virtual_pipeline_model_parallel_rank(model_chunk_id)
if mpu.is_pipeline_last_stage():
if parallel_state.is_pipeline_last_stage():
if len(output_tensor_grads[model_chunk_id]) == 0:
output_tensor_grads[model_chunk_id].append(None)
input_tensor = input_tensors[model_chunk_id].pop(0)
@ -274,7 +274,7 @@ def forward_backward_pipelining_with_interleaving(forward_step_func, data_iterat
return input_tensor_grad
# Run warmup forward passes.
mpu.set_virtual_pipeline_model_parallel_rank(0)
parallel_state.set_virtual_pipeline_model_parallel_rank(0)
input_tensors[0].append(
p2p_communication.recv_forward(timers))
for k in range(num_warmup_microbatches):
@ -283,14 +283,14 @@ def forward_backward_pipelining_with_interleaving(forward_step_func, data_iterat
# Determine if tensor should be received from previous stage.
next_forward_model_chunk_id = get_model_chunk_id(k+1, forward=True)
recv_prev = True
if mpu.is_pipeline_first_stage(ignore_virtual=True):
if parallel_state.is_pipeline_first_stage(ignore_virtual=True):
if next_forward_model_chunk_id == 0:
recv_prev = False
if k == (num_microbatches - 1):
recv_prev = False
# Don't send tensor downstream if on last stage.
if mpu.is_pipeline_last_stage():
if parallel_state.is_pipeline_last_stage():
output_tensor = None
# Send and receive tensors as appropriate (send tensors computed
@ -299,7 +299,7 @@ def forward_backward_pipelining_with_interleaving(forward_step_func, data_iterat
not all_warmup_microbatches:
input_tensor_grad = None
recv_next = True
if mpu.is_pipeline_last_stage(ignore_virtual=True):
if parallel_state.is_pipeline_last_stage(ignore_virtual=True):
recv_next = False
input_tensor, output_tensor_grad = \
p2p_communication.send_forward_backward_recv_forward_backward(
@ -329,19 +329,19 @@ def forward_backward_pipelining_with_interleaving(forward_step_func, data_iterat
# Determine if current stage has anything to send in either direction,
# otherwise set tensor to None.
forward_model_chunk_id = get_model_chunk_id(forward_k, forward=True)
mpu.set_virtual_pipeline_model_parallel_rank(forward_model_chunk_id)
if mpu.is_pipeline_last_stage():
parallel_state.set_virtual_pipeline_model_parallel_rank(forward_model_chunk_id)
if parallel_state.is_pipeline_last_stage():
output_tensor = None
backward_model_chunk_id = get_model_chunk_id(backward_k, forward=False)
mpu.set_virtual_pipeline_model_parallel_rank(backward_model_chunk_id)
if mpu.is_pipeline_first_stage():
parallel_state.set_virtual_pipeline_model_parallel_rank(backward_model_chunk_id)
if parallel_state.is_pipeline_first_stage():
input_tensor_grad = None
# Determine if peers are sending, and where in data structure to put
# received tensors.
recv_prev = True
if mpu.is_pipeline_first_stage(ignore_virtual=True):
if parallel_state.is_pipeline_first_stage(ignore_virtual=True):
# First stage is ahead of last stage by (pipeline_parallel_size - 1).
next_forward_model_chunk_id = get_model_chunk_id(
forward_k - (pipeline_parallel_size - 1), forward=True)
@ -353,7 +353,7 @@ def forward_backward_pipelining_with_interleaving(forward_step_func, data_iterat
forward=True)
recv_next = True
if mpu.is_pipeline_last_stage(ignore_virtual=True):
if parallel_state.is_pipeline_last_stage(ignore_virtual=True):
# Last stage is ahead of first stage by (pipeline_parallel_size - 1).
next_backward_model_chunk_id = get_model_chunk_id(
backward_k - (pipeline_parallel_size - 1), forward=False)
@ -393,7 +393,7 @@ def forward_backward_pipelining_with_interleaving(forward_step_func, data_iterat
input_tensor_grad = backward_step_helper(k)
next_backward_model_chunk_id = get_model_chunk_id(k+1, forward=False)
recv_next = True
if mpu.is_pipeline_last_stage(ignore_virtual=True):
if parallel_state.is_pipeline_last_stage(ignore_virtual=True):
if next_backward_model_chunk_id == (num_model_chunks - 1):
recv_next = False
if k == (num_microbatches - 1):
@ -421,8 +421,8 @@ def forward_backward_pipelining_without_interleaving(forward_step_func, data_ite
# Compute number of warmup microbatches.
num_microbatches = get_num_microbatches()
num_warmup_microbatches = \
(mpu.get_pipeline_model_parallel_world_size() -
mpu.get_pipeline_model_parallel_rank() - 1)
(parallel_state.get_pipeline_model_parallel_world_size() -
parallel_state.get_pipeline_model_parallel_rank() - 1)
num_warmup_microbatches = min(
num_warmup_microbatches,
num_microbatches)
@ -530,8 +530,8 @@ def optimized_forward_backward_pipelining(forward_step_func, data_iterator,
# Compute number of warmup microbatches.
num_microbatches = get_num_microbatches()
num_warmup_microbatches = \
(mpu.get_pipeline_model_parallel_world_size() -
mpu.get_pipeline_model_parallel_rank() - 1)
(parallel_state.get_pipeline_model_parallel_world_size() -
parallel_state.get_pipeline_model_parallel_rank() - 1)
num_warmup_microbatches = min(
num_warmup_microbatches,
num_microbatches)
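A minimal sketch, plain Python with hypothetical sizes, of the warmup arithmetic above for the non-interleaved 1F1B schedule: earlier pipeline stages run more forward-only warmup microbatches, and the last stage runs none.

pipeline_parallel_size = 4
num_microbatches = 8
for rank in range(pipeline_parallel_size):
    num_warmup = min(pipeline_parallel_size - rank - 1, num_microbatches)
    steady = num_microbatches - num_warmup
    print(f"stage {rank}: {num_warmup} warmup, {steady} steady-state 1F1B microbatches")
# stage 0: 3 warmup, 5 steady-state ... stage 3: 0 warmup, 8 steady-state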

View File

@ -27,7 +27,7 @@ from torch.nn.parallel.distributed import DistributedDataParallel as torchDDP
from ascendspeed import get_args
from ascendspeed import get_tokenizer
from ascendspeed import mpu
from ascendspeed.core import parallel_state
from ascendspeed.utils import get_ltor_masks_and_position_ids, unwrap_model
from ascendspeed.p2p_communication import recv_forward, send_forward
from ascendspeed.model import DistributedDataParallel as LocalDDP
@ -93,7 +93,7 @@ def generate_samples_input_from_file(model):
# Read the sample file and open the output file.
assert args.sample_input_file is not None, \
'sample input file is not provided.'
if mpu.is_pipeline_first_stage() and mpu.get_tensor_model_parallel_rank() == 0:
if parallel_state.is_pipeline_first_stage() and parallel_state.get_tensor_model_parallel_rank() == 0:
fname = open(args.sample_input_file, "r")
all_raw_text = fname.readlines()
input_count = len(all_raw_text)
@ -113,8 +113,8 @@ def generate_samples_input_from_file(model):
terminate_runs = 0
raw_text_len = 0
if mpu.is_pipeline_first_stage() \
and mpu.get_tensor_model_parallel_rank() == 0:
if parallel_state.is_pipeline_first_stage() \
and parallel_state.get_tensor_model_parallel_rank() == 0:
raw_text = all_raw_text[input_pos]
input_pos += 1
if input_pos == input_count:
@ -139,7 +139,7 @@ def generate_samples_input_from_file(model):
input_info = [terminate_runs, raw_text_len, context_length]
input_info_tensor = get_accelerator().LongTensor(input_info)
torch.distributed.all_reduce(input_info_tensor,
group=mpu.get_model_parallel_group())
group=parallel_state.get_model_parallel_group())
terminate_runs = input_info_tensor[0].item()
raw_text_len = input_info_tensor[1].item()
context_length = input_info_tensor[2].item()
@ -149,16 +149,16 @@ def generate_samples_input_from_file(model):
# For pipeline parallel we send context tokens to other stages
# so they get the lengths correct
if mpu.get_tensor_model_parallel_rank() == 0 \
if parallel_state.get_tensor_model_parallel_rank() == 0 \
and args.pipeline_model_parallel_size > 1:
if mpu.is_pipeline_first_stage():
src = mpu.get_pipeline_model_parallel_first_rank()
group = mpu.get_pipeline_model_parallel_group()
if parallel_state.is_pipeline_first_stage():
src = parallel_state.get_pipeline_model_parallel_first_rank()
group = parallel_state.get_pipeline_model_parallel_group()
context_tokens_tensor = get_accelerator().LongTensor(context_tokens)
torch.distributed.broadcast(context_tokens_tensor, src, group)
else:
src = mpu.get_pipeline_model_parallel_first_rank()
group = mpu.get_pipeline_model_parallel_group()
src = parallel_state.get_pipeline_model_parallel_first_rank()
group = parallel_state.get_pipeline_model_parallel_group()
context_tokens_tensor = torch.empty(context_length,
dtype=torch.int64,
device=get_accelerator().current_device_name())
@ -169,8 +169,8 @@ def generate_samples_input_from_file(model):
for _, decode_tokens in enumerate(token_stream):
pass
if mpu.get_tensor_model_parallel_rank() == 0:
if mpu.is_pipeline_first_stage():
if parallel_state.get_tensor_model_parallel_rank() == 0:
if parallel_state.is_pipeline_first_stage():
os.system('clear')
print("\nContext:", raw_text, flush=True)
@ -234,8 +234,8 @@ def generate_samples_interactive(model, print_frequency=24):
terminate_runs = 0
raw_text_len = 0
if mpu.is_pipeline_first_stage() \
and mpu.get_tensor_model_parallel_rank() == 0:
if parallel_state.is_pipeline_first_stage() \
and parallel_state.get_tensor_model_parallel_rank() == 0:
os.system('clear')
raw_text = input("\nContext prompt (stop to exit) >>> ")
while not raw_text:
@ -261,7 +261,7 @@ def generate_samples_interactive(model, print_frequency=24):
input_info = [terminate_runs, raw_text_len, context_length]
input_info_tensor = get_accelerator().LongTensor(input_info)
torch.distributed.all_reduce(input_info_tensor,
group=mpu.get_model_parallel_group())
group=parallel_state.get_model_parallel_group())
terminate_runs = input_info_tensor[0].item()
raw_text_len = input_info_tensor[1].item()
context_length = input_info_tensor[2].item()
@ -271,16 +271,16 @@ def generate_samples_interactive(model, print_frequency=24):
# For pipeline parallel we send context tokens to other stages
# so they get the lengths correct
if mpu.get_tensor_model_parallel_rank() == 0 \
if parallel_state.get_tensor_model_parallel_rank() == 0 \
and args.pipeline_model_parallel_size > 1:
if mpu.is_pipeline_first_stage():
src = mpu.get_pipeline_model_parallel_first_rank()
group = mpu.get_pipeline_model_parallel_group()
if parallel_state.is_pipeline_first_stage():
src = parallel_state.get_pipeline_model_parallel_first_rank()
group = parallel_state.get_pipeline_model_parallel_group()
context_tokens_tensor = get_accelerator().LongTensor(context_tokens)
torch.distributed.broadcast(context_tokens_tensor, src, group)
else:
src = mpu.get_pipeline_model_parallel_first_rank()
group = mpu.get_pipeline_model_parallel_group()
src = parallel_state.get_pipeline_model_parallel_first_rank()
group = parallel_state.get_pipeline_model_parallel_group()
context_tokens_tensor = torch.empty(context_length,
dtype=torch.int64,
device=torch.device(get_accelerator().device_name()))
@ -291,8 +291,8 @@ def generate_samples_interactive(model, print_frequency=24):
for counter, decode_tokens in enumerate(token_stream):
if counter % print_frequency != 0 \
or mpu.get_tensor_model_parallel_rank() != 0 \
or not mpu.is_pipeline_first_stage():
or parallel_state.get_tensor_model_parallel_rank() != 0 \
or not parallel_state.is_pipeline_first_stage():
continue
os.system('clear')
@ -304,8 +304,8 @@ def generate_samples_interactive(model, print_frequency=24):
decode_tokens)[raw_text_len:]
print("\nAscendSpeed:", trim_decode_tokens, flush=True)
if mpu.is_pipeline_first_stage() \
and mpu.get_tensor_model_parallel_rank() == 0:
if parallel_state.is_pipeline_first_stage() \
and parallel_state.get_tensor_model_parallel_rank() == 0:
os.system('clear')
print("\nContext:", raw_text, flush=True)
@ -341,8 +341,8 @@ def generate_samples_unconditional(model, latencies=[], model_latencies=[], sing
get_accelerator().synchronize()
latencies.append(time.time() - start_time)
start_time = time.time()
if mpu.is_pipeline_last_stage() and \
mpu.get_tensor_model_parallel_rank() == 0:
if parallel_state.is_pipeline_last_stage() and \
parallel_state.get_tensor_model_parallel_rank() == 0:
#if ctr % args.log_interval == 0:
# print('Avg s/batch:',
# (time.time() - start_time) / min(args.log_interval, ctr + 1))
@ -376,8 +376,8 @@ def generate_and_write_samples_unconditional(model, latencies=[], single_token_l
assert args.genfile is not None
with open(args.genfile, 'w') as f:
for datum in generate_samples_unconditional(model, latencies=latencies, model_latencies=model_latencies, single_token_latency=single_token_latency):
if mpu.is_pipeline_last_stage() and \
mpu.get_tensor_model_parallel_rank() == 0:
if parallel_state.is_pipeline_last_stage() and \
parallel_state.get_tensor_model_parallel_rank() == 0:
f.write(json.dumps(datum) + '\n')
@ -404,11 +404,11 @@ def get_token_stream(model, context_tokens, model_latencies=[], single_token_lat
context_length_tensor = get_accelerator().LongTensor(context_lengths)
torch.distributed.broadcast(context_length_tensor,
mpu.get_tensor_model_parallel_src_rank(),
group=mpu.get_tensor_model_parallel_group())
parallel_state.get_tensor_model_parallel_src_rank(),
group=parallel_state.get_tensor_model_parallel_group())
torch.distributed.broadcast(context_tokens_tensor,
mpu.get_tensor_model_parallel_src_rank(),
group=mpu.get_tensor_model_parallel_group())
parallel_state.get_tensor_model_parallel_src_rank(),
group=parallel_state.get_tensor_model_parallel_group())
context_length = context_length_tensor.min().item()
tokens, attention_mask, position_ids = get_batch(context_tokens_tensor)
@ -522,7 +522,7 @@ def sample_sequence_batch(model, context_tokens, context_lengths,
attention_mask,
tokentype_ids=type_ids,
forward_method_parallel_output=False)
if mpu.is_pipeline_last_stage():
if parallel_state.is_pipeline_last_stage():
assert output is not None
logits = output[:, context_length - 1, :]
else:
@ -547,11 +547,11 @@ def sample_sequence_batch(model, context_tokens, context_lengths,
get_key_value=True,
tokentype_ids=types2use,
forward_method_parallel_output=False, model_latencies=model_latencies)
if mpu.is_pipeline_last_stage():
if parallel_state.is_pipeline_last_stage():
assert output is not None
logits = output[:, -1].view(batch_size, -1).contiguous()
if mpu.is_pipeline_last_stage():
if parallel_state.is_pipeline_last_stage():
if args.greedy:
prev = torch.argmax(logits, dim=-1).view(-1)
else:
@ -567,8 +567,8 @@ def sample_sequence_batch(model, context_tokens, context_lengths,
new_tokens = switch(
tokens[:, context_length].view(-1), prev, started)
tokens[:, context_length] = new_tokens
src = mpu.get_pipeline_model_parallel_last_rank()
group = mpu.get_embedding_group()
src = parallel_state.get_pipeline_model_parallel_last_rank()
group = parallel_state.get_embedding_group()
torch.distributed.broadcast(new_tokens, src, group)
done_token = (prev == eos_id).byte() & started.byte()
@ -577,15 +577,15 @@ def sample_sequence_batch(model, context_tokens, context_lengths,
is_done = is_done | done_token
done = torch.all(is_done)
src = mpu.get_pipeline_model_parallel_last_rank()
group = mpu.get_pipeline_model_parallel_group()
src = parallel_state.get_pipeline_model_parallel_last_rank()
group = parallel_state.get_pipeline_model_parallel_group()
torch.distributed.broadcast(done, src, group)
yield tokens, lengths
else:
if mpu.is_pipeline_first_stage():
src = mpu.get_pipeline_model_parallel_last_rank()
group = mpu.get_embedding_group()
if parallel_state.is_pipeline_first_stage():
src = parallel_state.get_pipeline_model_parallel_last_rank()
group = parallel_state.get_embedding_group()
new_tokens = torch.empty_like(tokens[:, context_length])
torch.distributed.broadcast(new_tokens, src, group)
tokens[:, context_length] = new_tokens
@ -594,8 +594,8 @@ def sample_sequence_batch(model, context_tokens, context_lengths,
yield None, None
done = get_accelerator().ByteTensor([0])
src = mpu.get_pipeline_model_parallel_last_rank()
group = mpu.get_pipeline_model_parallel_group()
src = parallel_state.get_pipeline_model_parallel_last_rank()
group = parallel_state.get_pipeline_model_parallel_group()
torch.distributed.broadcast(done, src, group)
context_length += 1
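A minimal sketch, plain torch with made-up token ids, of the early-stopping bookkeeping in the sampling loop above: a sequence counts as finished once it has left its prompt and emits eos_id, and generation stops only when every sequence in the batch is done.

import torch

eos_id = 2
prev = torch.tensor([2, 7, 2])               # tokens just produced for 3 sequences
started = torch.tensor([True, True, False])  # the 3rd sequence is still inside its prompt
is_done = torch.zeros(3, dtype=torch.uint8)

done_token = (prev == eos_id).byte() & started.byte()   # finished this step: [1, 0, 0]
is_done = is_done | done_token
done = torch.all(is_done)
print(done_token, is_done, bool(done))                  # batch not done yet: keep generating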

View File

@ -39,6 +39,7 @@ from ascendspeed import update_num_microbatches
from ascendspeed import mpu
from ascendspeed import print_rank_0
from ascendspeed import print_rank_last
from ascendspeed.core import parallel_state
from ascendspeed.checkpointing import load_checkpoint
from ascendspeed.checkpointing import save_checkpoint
from ascendspeed.model import Float16Module
@ -75,7 +76,7 @@ def _initialize_optimized_pipeline():
# An example config when pipeline-model-parallel-size is 4.
# This theoretically reduces near 20% pipeline bubble.
assert args.micro_batch_size == 4
assert args.global_batch_size // mpu.get_data_parallel_world_size() == 64
assert args.global_batch_size // parallel_state.get_data_parallel_world_size() == 64
assert args.pipeline_model_parallel_size == 4
args.manual_mbs = [1, 2, 3, 4, 5, 5, 5, 5, 5, 5, \
5, 5, 5, 4, 3, 2]
@ -83,7 +84,7 @@ def _initialize_optimized_pipeline():
# An example config when pipeline-model-parallel-size is 8
# # This theoretically reduces near 30% pipeline bubble.
assert args.micro_batch_size == 4
assert args.global_batch_size // mpu.get_data_parallel_world_size() == 96
assert args.global_batch_size // parallel_state.get_data_parallel_world_size() == 96
assert args.pipeline_model_parallel_size == 8
args.manual_mbs = [1, 2, 2, 3, 4, 5, 5, 5, 5, 5, 5, 5, \
5, 5, 5, 5, 5, 5, 4, 4, 3, 3, 3, 2]
@ -100,9 +101,9 @@ def _initialize_optimized_pipeline():
# sanity check
assert isinstance(args.manual_mbs, list), 'A proper manual-mbs has to be provided'
assert len(args.manual_mbs) == args.global_batch_size // mpu.get_data_parallel_world_size() \
assert len(args.manual_mbs) == args.global_batch_size // parallel_state.get_data_parallel_world_size() \
// args.micro_batch_size, 'Check number of micro batches.'
assert sum(args.manual_mbs) * mpu.get_data_parallel_world_size() == args.global_batch_size, \
assert sum(args.manual_mbs) * parallel_state.get_data_parallel_world_size() == args.global_batch_size, \
'Check either micro batch sizes or global batch sizes.'
@ -336,22 +337,22 @@ def get_model(model_provider_func):
args = get_args()
# Build model.
if mpu.get_pipeline_model_parallel_world_size() > 1 and \
if parallel_state.get_pipeline_model_parallel_world_size() > 1 and \
args.virtual_pipeline_model_parallel_size is not None:
model = []
for i in range(args.virtual_pipeline_model_parallel_size):
mpu.set_virtual_pipeline_model_parallel_rank(i)
parallel_state.set_virtual_pipeline_model_parallel_rank(i)
# Set pre_process and post_process only after virtual rank is set.
pre_process = mpu.is_pipeline_first_stage()
post_process = mpu.is_pipeline_last_stage()
pre_process = parallel_state.is_pipeline_first_stage()
post_process = parallel_state.is_pipeline_last_stage()
this_model = model_provider_func(
pre_process=pre_process,
post_process=post_process
)
model.append(this_model)
else:
pre_process = mpu.is_pipeline_first_stage()
post_process = mpu.is_pipeline_last_stage()
pre_process = parallel_state.is_pipeline_first_stage()
post_process = parallel_state.is_pipeline_last_stage()
model = model_provider_func(
pre_process=pre_process,
post_process=post_process
@ -369,11 +370,11 @@ def get_model(model_provider_func):
mpu.set_defaults_if_not_set_tensor_model_parallel_attributes(param)
# Print number of parameters.
if mpu.get_data_parallel_rank() == 0:
if parallel_state.get_data_parallel_rank() == 0:
print(' > number of parameters on (tensor, pipeline) '
'model parallel rank ({}, {}): {}'.format(
mpu.get_tensor_model_parallel_rank(),
mpu.get_pipeline_model_parallel_rank(),
parallel_state.get_tensor_model_parallel_rank(),
parallel_state.get_pipeline_model_parallel_rank(),
sum([sum([p.ds_numel if hasattr(p,'ds_id') else p.nelement() for p in model_module.parameters()])
for model_module in model])), flush=True)
@ -393,7 +394,7 @@ def get_model(model_provider_func):
if args.DDP_impl == 'torch':
i = get_accelerator().current_device()
model = [torchDDP(model_module, device_ids=[i], output_device=i,
process_group=mpu.get_data_parallel_group())
process_group=parallel_state.get_data_parallel_group())
for model_module in model]
return model
@ -537,12 +538,12 @@ def setup_model_and_optimizer(model_provider_func,
if args.deepspeed:
print_rank_0("DeepSpeed is enabled.")
pp = mpu.get_pipeline_model_parallel_world_size()
pp = parallel_state.get_pipeline_model_parallel_world_size()
if args.data_efficiency_curriculum_learning and build_train_valid_test_datasets_provider is not None:
train_ds = None
# Only need to build dataset on tp rank 0 since ascendspeed has the
# broadcast_data() function that broadcast data from tp rank 0.
if mpu.get_tensor_model_parallel_rank() == 0:
if parallel_state.get_tensor_model_parallel_rank() == 0:
# Number of train/valid/test samples.
if args.train_samples:
train_samples = args.train_samples
@ -585,9 +586,9 @@ def setup_model_and_optimizer(model_provider_func,
# hack to get batch_fn from pretrain_gpt.py
model.set_batch_fn(model.module._megatron_batch_fn)
assert model.grid.get_pipe_parallel_rank() == mpu.get_pipeline_model_parallel_rank()
assert model.grid.get_slice_parallel_rank() == mpu.get_tensor_model_parallel_rank()
assert model.grid.get_data_parallel_rank() == mpu.get_data_parallel_rank()
assert model.grid.get_pipe_parallel_rank() == parallel_state.get_pipeline_model_parallel_rank()
assert model.grid.get_slice_parallel_rank() == parallel_state.get_tensor_model_parallel_rank()
assert model.grid.get_data_parallel_rank() == parallel_state.get_data_parallel_rank()
model = [model]
# Compression has its own checkpoint loading path (e.g., loading both teacher and student models). So if compression is enabled, we skip the following checkpoint loading.
@ -609,7 +610,7 @@ def setup_model_and_optimizer(model_provider_func,
model[0].global_steps = student_global_steps
# We only support local DDP with multiple micro-batches.
if len(model) > 1 or mpu.get_pipeline_model_parallel_world_size() > 1:
if len(model) > 1 or parallel_state.get_pipeline_model_parallel_world_size() > 1:
assert args.DDP_impl == 'local'
# get model without FP16 and/or TorchDDP wrappers
@ -652,7 +653,7 @@ def train_step(forward_step_func, data_iterator,
timers('forward-backward', log_level=1).start(
barrier=args.barrier_with_L1_time)
if mpu.get_pipeline_model_parallel_world_size() > 1:
if parallel_state.get_pipeline_model_parallel_world_size() > 1:
if args.virtual_pipeline_model_parallel_size is not None:
forward_backward_func = forward_backward_pipelining_with_interleaving
assert get_num_microbatches() % args.pipeline_model_parallel_size == 0, \
@ -690,12 +691,12 @@ def train_step(forward_step_func, data_iterator,
# (BERT and GPT-2).
timers('backward-embedding-all-reduce', log_level=1).start(barrier=args.barrier_with_L1_time)
if not args.deepspeed:
if (mpu.is_pipeline_first_stage(ignore_virtual=True) or
mpu.is_pipeline_last_stage(ignore_virtual=True)) and \
mpu.get_pipeline_model_parallel_world_size() > 1:
if mpu.is_pipeline_first_stage(ignore_virtual=True):
if (parallel_state.is_pipeline_first_stage(ignore_virtual=True) or
parallel_state.is_pipeline_last_stage(ignore_virtual=True)) and \
parallel_state.get_pipeline_model_parallel_world_size() > 1:
if parallel_state.is_pipeline_first_stage(ignore_virtual=True):
unwrapped_model = model[0]
elif mpu.is_pipeline_last_stage(ignore_virtual=True):
elif parallel_state.is_pipeline_last_stage(ignore_virtual=True):
unwrapped_model = model[-1]
unwrapped_model = unwrap_model(
unwrapped_model, (torchDDP, LocalDDP, Float16Module))
@ -706,7 +707,7 @@ def train_step(forward_step_func, data_iterator,
grad = word_embeddings_weight.main_grad
else:
grad = word_embeddings_weight.grad
torch.distributed.all_reduce(grad, group=mpu.get_embedding_group())
torch.distributed.all_reduce(grad, group=parallel_state.get_embedding_group())
timers('backward-embedding-all-reduce').stop()
# Update parameters.
@ -742,7 +743,7 @@ def train_step(forward_step_func, data_iterator,
else:
skipped_iter = 1
if mpu.is_pipeline_last_stage(ignore_virtual=True):
if parallel_state.is_pipeline_last_stage(ignore_virtual=True):
# Average loss across microbatches.
loss_reduced = {}
for key in losses_reduced[0]:
@ -920,24 +921,24 @@ def training_log(loss_dict, total_loss_dict, learning_rate, iteration,
if args.zero_stage > 0:
# ZeRO partitions optimizer states
opt_stats = get_accelerator().FloatTensor(opt_stats)
torch.distributed.all_reduce(opt_stats, group=mpu.get_data_parallel_group())
torch.distributed.all_reduce(opt_stats, group=parallel_state.get_data_parallel_group())
opt_stats_2 = get_accelerator().FloatTensor(opt_stats_2)
torch.distributed.all_reduce(opt_stats_2, op=torch.distributed.ReduceOp.MAX,
group=mpu.get_data_parallel_group())
group=parallel_state.get_data_parallel_group())
if args.tensor_model_parallel_size > 1:
opt_stats = get_accelerator().FloatTensor(opt_stats)
torch.distributed.all_reduce(opt_stats, group=mpu.get_tensor_model_parallel_group())
torch.distributed.all_reduce(opt_stats, group=parallel_state.get_tensor_model_parallel_group())
opt_stats_2 = get_accelerator().FloatTensor(opt_stats_2)
torch.distributed.all_reduce(opt_stats_2, op=torch.distributed.ReduceOp.MAX,
group=mpu.get_tensor_model_parallel_group())
group=parallel_state.get_tensor_model_parallel_group())
if args.pipeline_model_parallel_size > 1:
opt_stats = get_accelerator().FloatTensor(opt_stats)
torch.distributed.all_reduce(opt_stats, group=mpu.get_pipeline_model_parallel_group())
torch.distributed.all_reduce(opt_stats, group=parallel_state.get_pipeline_model_parallel_group())
opt_stats_2 = get_accelerator().FloatTensor(opt_stats_2)
torch.distributed.all_reduce(opt_stats_2, op=torch.distributed.ReduceOp.MAX,
group=mpu.get_pipeline_model_parallel_group())
group=parallel_state.get_pipeline_model_parallel_group())
# print('step {} rank {} after sync opt_stats {}, {}'.format(iteration, torch.distributed.get_rank(), opt_stats_2, opt_stats))
if writer and is_last_rank():
@ -1093,7 +1094,7 @@ def train(forward_step_func, model, optimizer, lr_scheduler,
update_num_microbatches(args.consumed_train_samples)
if args.deepspeed:
# inform deepspeed of any batch size changes
global_batch_size = mpu.get_data_parallel_world_size() * \
global_batch_size = parallel_state.get_data_parallel_world_size() * \
args.micro_batch_size * \
get_num_microbatches()
model[0].set_train_batch_size(global_batch_size)
@ -1109,7 +1110,7 @@ def train(forward_step_func, model, optimizer, lr_scheduler,
lr_scheduler)
iteration += 1
args.iteration = iteration
new_samples = mpu.get_data_parallel_world_size() * \
new_samples = parallel_state.get_data_parallel_world_size() * \
args.micro_batch_size * \
get_num_microbatches()
args.consumed_train_samples += new_samples
@ -1125,7 +1126,7 @@ def train(forward_step_func, model, optimizer, lr_scheduler,
if hasattr(args, 'data_efficiency_curriculum_learning_numel'):
act_mbsz = args.data_efficiency_curriculum_learning_numel / args.curriculum_seqlen
act_token = act_mbsz * args.actual_seq_length
args.consumed_train_tokens += mpu.get_data_parallel_world_size() * \
args.consumed_train_tokens += parallel_state.get_data_parallel_world_size() * \
get_num_microbatches() * act_token
else:
args.consumed_train_tokens += new_samples * args.actual_seq_length
@ -1229,7 +1230,7 @@ def evaluate(forward_step_func, data_iterator, model, verbose=False):
print_rank_0('Evaluating iter {}/{}'.format(iteration,
args.eval_iters))
if mpu.get_pipeline_model_parallel_world_size() > 1:
if parallel_state.get_pipeline_model_parallel_world_size() > 1:
if args.virtual_pipeline_model_parallel_size is not None:
forward_backward_func = forward_backward_pipelining_with_interleaving
elif args.optimized_pipeline:
@ -1249,7 +1250,7 @@ def evaluate(forward_step_func, data_iterator, model, verbose=False):
forward_step_func, data_iterator, model, optimizer=None,
timers=None, forward_only=True)
if mpu.is_pipeline_last_stage(ignore_virtual=True):
if parallel_state.is_pipeline_last_stage(ignore_virtual=True):
# Reduce across processes.
for loss_dict in loss_dicts:
for key in loss_dict:
@ -1257,7 +1258,7 @@ def evaluate(forward_step_func, data_iterator, model, verbose=False):
total_loss_dict[key] = total_loss_dict.get(
key, get_accelerator().FloatTensor([0.0])) + loss_dict[key]
args.consumed_valid_samples += mpu.get_data_parallel_world_size() \
args.consumed_valid_samples += parallel_state.get_data_parallel_world_size() \
* args.micro_batch_size \
* get_num_microbatches()
# Move model back to the train mode.
@ -1345,7 +1346,7 @@ def build_train_valid_test_data_iterators(
args.eval_iters * args.global_batch_size
# Data loader only on rank 0 of each model parallel group.
if mpu.get_tensor_model_parallel_rank() == 0:
if parallel_state.get_tensor_model_parallel_rank() == 0:
# Number of train/valid/test samples.
if args.train_samples:
@ -1401,8 +1402,8 @@ def build_train_valid_test_data_iterators(
# Broadcast num tokens.
torch.distributed.broadcast(flags,
mpu.get_tensor_model_parallel_src_rank(),
group=mpu.get_tensor_model_parallel_group())
parallel_state.get_tensor_model_parallel_src_rank(),
group=parallel_state.get_tensor_model_parallel_group())
args.do_train = flags[0].item()
num_valid_ds = flags[1].item()
num_test_ds = flags[2].item()
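A minimal sketch, plain Python with hypothetical sizes, of the batch-size bookkeeping used throughout the training hunks above: every iteration consumes data_parallel_world_size * micro_batch_size * num_microbatches samples.

data_parallel_world_size = 8
micro_batch_size = 4
num_microbatches = 16

global_batch_size = data_parallel_world_size * micro_batch_size * num_microbatches
consumed_train_samples = 0
for _ in range(10):                                  # ten training iterations
    consumed_train_samples += global_batch_size
print(global_batch_size, consumed_train_samples)     # 512 5120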

View File

@ -28,7 +28,7 @@ if get_accelerator().device_name() == 'cuda':
from ascendspeed import get_args
from ascendspeed import print_rank_0
from ascendspeed import get_adlr_autoresume
from ascendspeed import mpu
from ascendspeed.core import parallel_state
from ascendspeed.model.module import param_is_not_shared
from ascendspeed.mpu.layers import param_is_not_tensor_parallel_duplicate
from ascendspeed import get_num_microbatches
@ -82,7 +82,7 @@ def calc_params_l2_norm(model):
# Sum across all model-parallel GPUs.
torch.distributed.all_reduce(norm_2,
op=torch.distributed.ReduceOp.SUM,
group=mpu.get_model_parallel_group())
group=parallel_state.get_model_parallel_group())
return norm_2.item() ** 0.5
@ -91,9 +91,9 @@ def average_losses_across_data_parallel_group(losses):
averaged_losses = torch.cat(
[loss.clone().detach().view(1) for loss in losses])
torch.distributed.all_reduce(averaged_losses,
group=mpu.get_data_parallel_group())
group=parallel_state.get_data_parallel_group())
averaged_losses = averaged_losses / \
torch.distributed.get_world_size(group=mpu.get_data_parallel_group())
torch.distributed.get_world_size(group=parallel_state.get_data_parallel_group())
return averaged_losses
@ -110,7 +110,7 @@ def report_memory(name):
get_accelerator().memory_reserved() / mega_bytes)
string += ' | max reserved: {}'.format(
get_accelerator().max_memory_reserved() / mega_bytes)
if mpu.get_data_parallel_rank() == 0:
if parallel_state.get_data_parallel_rank() == 0:
print("[Rank {}] {}".format(torch.distributed.get_rank(), string),
flush=True)
@ -252,7 +252,7 @@ def get_ltor_masks_and_position_ids(data,
def get_parameters_in_billions(model):
gpus_per_model = torch.distributed.get_world_size(group=mpu.get_model_parallel_group())
gpus_per_model = torch.distributed.get_world_size(group=parallel_state.get_model_parallel_group())
approx_parameters_in_billions = sum([sum([p.ds_numel if hasattr(p,'ds_id') else p.nelement() for p in model_module.parameters()])
for model_module in model])
@ -260,7 +260,7 @@ def get_parameters_in_billions(model):
return approx_parameters_in_billions*gpus_per_model/(1e9)
def throughput_calculator(model, args, iteration_time, total_iterations):
gpus_per_model = torch.distributed.get_world_size(group = mpu.get_model_parallel_group())
gpus_per_model = torch.distributed.get_world_size(group = parallel_state.get_model_parallel_group())
batch_size = args.micro_batch_size * get_num_microbatches() * args.data_parallel_size
samples_per_model = batch_size * args.seq_length
model_replica_count = torch.distributed.get_world_size() / gpus_per_model
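A minimal sketch, plain Python with hypothetical counts, of the parameter accounting above: each rank only stores its model-parallel shard, so the local count is scaled by the number of GPUs in the model-parallel group to approximate the full model size.

local_parameters = 1_625_000_000          # parameters held by this rank's shard (hypothetical)
gpus_per_model = 8                        # tensor-parallel x pipeline-parallel group size
approx_parameters_in_billions = local_parameters * gpus_per_model / 1e9
print(approx_parameters_in_billions)      # 13.0, i.e. a ~13B-parameter model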

View File

@ -15,7 +15,6 @@ WORLD_SIZE=$(($NPUS_PER_NODE*$NNODES))
DATA_PATH=./dataset/llama_text_document
CHECKPOINT_PATH=./ckpt
rm -rf ./ckpt/*
DISTRIBUTED_ARGS="--nproc_per_node $NPUS_PER_NODE --nnodes $NNODES --node_rank $NODE_RANK --master_addr $MASTER_ADDR --master_port $MASTER_PORT"
# Main script

View File

@ -31,6 +31,7 @@ from ascendspeed import print_rank_0
from ascendspeed import get_timers
from ascendspeed import get_tokenizer
from ascendspeed import mpu
from ascendspeed.core import parallel_state
from ascendspeed.data.gpt_dataset import build_train_valid_test_datasets, build_dataset_group
from ascendspeed.model import GPTModel, GPTModelPipe, ModelType
from ascendspeed.enums import AttnMaskType
@ -46,11 +47,11 @@ def model_provider(pre_process=True, post_process=True):
see_memory_usage(f"Before Building Model", force=True)
args = get_args()
with deepspeed.zero.Init(data_parallel_group=mpu.get_data_parallel_group(),
with deepspeed.zero.Init(data_parallel_group=parallel_state.get_data_parallel_group(),
remote_device=None if args.remote_device == 'none' else args.remote_device,
config_dict_or_path=args.deepspeed_config,
enabled=args.zero_stage == 3,
mpu=mpu):
mpu=parallel_state):
if args.deepspeed:
args.pretrain_causal_attention = True
model = GPTModelPipe(

View File

@ -32,6 +32,7 @@ from ascendspeed import print_rank_0
from ascendspeed import get_timers
from ascendspeed import get_tokenizer
from ascendspeed import mpu
from ascendspeed.core import parallel_state, tensor_parallel
from ascendspeed.data.gpt_dataset import build_train_valid_test_datasets
from ascendspeed.model import GPTModel, GPTModelPipe
from ascendspeed.training import pretrain
@ -49,11 +50,11 @@ def model_provider(pre_process=True, post_process=True):
see_memory_usage(f"Before Building Model", force=True)
args = get_args()
with deepspeed.zero.Init(data_parallel_group=mpu.get_data_parallel_group(),
with deepspeed.zero.Init(data_parallel_group=parallel_state.get_data_parallel_group(),
remote_device=None if args.remote_device == 'none' else args.remote_device,
config_dict_or_path=args.deepspeed_config,
enabled=args.zero_stage == 3,
mpu=mpu):
mpu=parallel_state):
if args.deepspeed and not args.no_pipeline_parallel:
model = GPTModelPipe(
num_tokentypes=0,
@ -260,7 +261,7 @@ def forward_step(data_iterator, model):
if args.curriculum_learning_legacy and args.curriculum_seqlen < args.seq_length:
assert args.curriculum_seqlen is not None
labels = labels[:, :args.curriculum_seqlen].contiguous()
output_tensor = mpu.vocab_parallel_cross_entropy(stu_output.contiguous().float(), labels)
output_tensor = tensor_parallel.vocab_parallel_cross_entropy(stu_output.contiguous().float(), labels)
else:
output_tensor, *other_losses = model(tokens, position_ids, attention_mask,
labels=labels)

View File

@ -32,6 +32,7 @@ from ascendspeed import print_rank_0
from ascendspeed import get_timers
from ascendspeed import get_tokenizer
from ascendspeed import mpu
from ascendspeed.core import parallel_state
from ascendspeed.data.gpt_dataset import build_train_valid_test_datasets
from ascendspeed.model import LlamaModel, LlamaModelPipe
from ascendspeed.training import pretrain
@ -46,11 +47,11 @@ def model_provider(pre_process=True, post_process=True):
see_memory_usage(f"Before Building Model ...", force=True)
args = get_args()
with deepspeed.zero.Init(data_parallel_group=mpu.get_data_parallel_group(),
with deepspeed.zero.Init(data_parallel_group=parallel_state.get_data_parallel_group(),
remote_device=None if args.remote_device == 'none' else args.remote_device,
config_dict_or_path=args.deepspeed_config,
enabled=args.zero_stage == 3,
mpu=mpu):
mpu=parallel_state):
if args.deepspeed and not args.no_pipeline_parallel:
model = LlamaModelPipe(parallel_output=True)
# This is a hack to give us a reference to get_batch_pipe from within training.py
@ -215,6 +216,7 @@ def train_valid_test_datasets_provider(train_val_test_num_samples):
return train_ds, valid_ds, test_ds
if __name__ == "__main__":
torch.npu.set_compile_mode(jit_compile=True)
pretrain(train_valid_test_datasets_provider,
model_provider,
forward_step,

View File

@ -13,7 +13,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Pretrain Llama."""
"""Pretrain Llama2."""
import math
from functools import partial
@ -32,8 +32,9 @@ from ascendspeed import print_rank_0
from ascendspeed import get_timers
from ascendspeed import get_tokenizer
from ascendspeed import mpu
from ascendspeed.core import parallel_state
from ascendspeed.data.gpt_dataset import build_train_valid_test_datasets
from ascendspeed.model import LlamaModel, LlamaModelPipe
from ascendspeed.model import Llama2Model, Llama2ModelPipe
from ascendspeed.training import pretrain
from ascendspeed.utils import get_ltor_masks_and_position_ids
from ascendspeed.utils import average_losses_across_data_parallel_group
@ -46,13 +47,13 @@ def model_provider(pre_process=True, post_process=True):
see_memory_usage(f"Before Building Model ...", force=True)
args = get_args()
with deepspeed.zero.Init(data_parallel_group=mpu.get_data_parallel_group(),
with deepspeed.zero.Init(data_parallel_group=parallel_state.get_data_parallel_group(),
remote_device=None if args.remote_device == 'none' else args.remote_device,
config_dict_or_path=args.deepspeed_config,
enabled=args.zero_stage == 3,
mpu=mpu):
mpu=parallel_state):
if args.deepspeed and not args.no_pipeline_parallel:
model = LlamaModelPipe(parallel_output=True)
model = Llama2ModelPipe(parallel_output=True)
# This is a hack to give us a reference to get_batch_pipe from within training.py
# We need to call model.set_batch_fn after deepspeed.initialize
model._megatron_batch_fn = get_batch_pipe
@ -75,7 +76,7 @@ def model_provider(pre_process=True, post_process=True):
# Attention mask must be bool.
args.attn_mask = attention_mask.to(torch.bool)
else:
model = LlamaModel(
model = Llama2Model(
parallel_output=True,
add_pooler=False,
pre_process=pre_process,
@ -210,11 +211,12 @@ def train_valid_test_datasets_provider(train_val_test_num_samples):
seq_length=args.seq_length,
seed=args.seed,
skip_warmup=(not args.mmap_warmup))
print_rank_0("> finished creating llama datasets ...")
print_rank_0("> finished creating llama2 datasets ...")
return train_ds, valid_ds, test_ds
if __name__ == "__main__":
torch.npu.set_compile_mode(jit_compile=True)
pretrain(train_valid_test_datasets_provider,
model_provider,
forward_step,

View File

@ -23,7 +23,7 @@ import torch
from ascendSpeed import get_args
from ascendSpeed import print_rank_0
from ascendSpeed import get_tokenizer
from ascendSpeed import mpu
from ascendspeed.core import parallel_state
from ascendSpeed.training import setup_model_and_optimizer, get_model
from ascendSpeed.mpu.mappings import gather_from_tensor_model_parallel_region
@ -57,14 +57,14 @@ class EvalHarnessAdaptor(GPT2LM):
self.is_main = args.rank == 0
self.is_local_main = args.local_rank == 0
self._device = get_accelerator().current_device_name()
self.is_model_parallel = mpu.get_tensor_model_parallel_world_size() > 1
self.is_pipe_parallel = mpu.get_pipeline_model_parallel_world_size() > 1
self.is_data_parallel = mpu.get_data_parallel_world_size() > 1
self.is_model_parallel = parallel_state.get_tensor_model_parallel_world_size() > 1
self.is_pipe_parallel = parallel_state.get_pipeline_model_parallel_world_size() > 1
self.is_data_parallel = parallel_state.get_data_parallel_world_size() > 1
self.adaptive_seq_len = args.adaptive_seq_len
if self.is_data_parallel and args.moe_expert_parallel_size == 1: # For MoE model, allow a "fake data parallel" in order to partition model into multiple gpus
raise NotImplementedError("Data parallelism is currently not supported for evaluation")
self.is_last_stage = True if not self.is_pipe_parallel else mpu.is_pipeline_last_stage() # only the last stage of the pipeline model will receive the logits
self.is_last_stage = True if not self.is_pipe_parallel else parallel_state.is_pipeline_last_stage() # only the last stage of the pipeline model will receive the logits
@property
def max_length(self):
@ -179,7 +179,7 @@ class EvalHarnessAdaptor(GPT2LM):
self.cache_hook.add_partial("loglikelihood", cache_key, answer)
res.append(answer)
if not mpu.is_pipeline_last_stage():
if not parallel_state.is_pipeline_last_stage():
# @HACK: To make the eval harness happy on threads that don't have access to the results.
# We just randomly generate some data.
res = [(np.random.rand(), np.random.rand()>0.5) for _ in requests]
@ -271,7 +271,7 @@ class EvalHarnessAdaptor(GPT2LM):
output = self.model(*self.create_model_inputs(inps)[0])
send_forward(output)
if mpu.is_pipeline_last_stage():
if parallel_state.is_pipeline_last_stage():
return gather_from_tensor_model_parallel_region(output)[..., :self.tokenizer.vocab_size]
else:
return None
@ -378,7 +378,7 @@ def load_ds_checkpoint_and_setup_megatron(extra_args_provider):
else:
model = get_model(model_provider)[0]
# Initialize ascendspeed model using the parsed state dict.
sd = _create_rank_checkpoint(ds_checkpoint, None, mpu.get_tensor_model_parallel_rank(), mpu.get_pipeline_model_parallel_rank(), True)
sd = _create_rank_checkpoint(ds_checkpoint, None, parallel_state.get_tensor_model_parallel_rank(), parallel_state.get_pipeline_model_parallel_rank(), True)
model.load_state_dict(sd['model'], strict=True)
@ -423,7 +423,7 @@ def main():
adaptor = EvalHarnessAdaptor(model, tokenizer)
results = evaluator.evaluate(adaptor, task_dict, False, args.num_fewshot, None)
if mpu.is_pipeline_last_stage() and mpu.get_tensor_model_parallel_rank() == 0:
if parallel_state.is_pipeline_last_stage() and parallel_state.get_tensor_model_parallel_rank() == 0:
print(json.dumps(results, indent=2))
with open(args.results_path, 'w') as outfile:
json.dump(results, outfile, indent = 4)

View File

@ -23,7 +23,7 @@ import torch
from ascendspeed import get_args
from ascendspeed import print_rank_last, is_last_rank
from ascendspeed import mpu
from ascendspeed.core import parallel_state
from ascendspeed.schedules import get_forward_backward_func
from tasks.finetune_utils import build_data_loader
from tasks.finetune_utils import process_batch
@ -41,7 +41,7 @@ def accuracy_func_provider(single_dataset_provider):
dataset = single_dataset_provider(datapath)
dataloader = build_data_loader(
dataset, args.orig_micro_batch_size, num_workers=args.num_workers,
drop_last=(mpu.get_data_parallel_world_size() > 1))
drop_last=(parallel_state.get_data_parallel_world_size() > 1))
dataloaders.append((dataset.dataset_name, dataloader))
def metrics_func(model, epoch, output_predictions=False):
@ -49,7 +49,7 @@ def accuracy_func_provider(single_dataset_provider):
correct = 0
total = 0
if output_predictions:
assert mpu.get_data_parallel_world_size() == 1
assert parallel_state.get_data_parallel_world_size() == 1
named_predictions = []
names = 'predictions'
for name, dataloader in dataloaders:
@ -155,7 +155,7 @@ def calculate_correct_answers(name, model, dataloader,
predicted = []
if output_predictions:
# This option is only possible when data parallel size is 1.
assert mpu.get_data_parallel_world_size() == 1
assert parallel_state.get_data_parallel_world_size() == 1
softmaxes = []
labels = []
ids = []
@ -190,26 +190,26 @@ def calculate_correct_answers(name, model, dataloader,
args.global_batch_size = saved_global_batch_size
# Reduce.
if mpu.is_pipeline_last_stage():
if parallel_state.is_pipeline_last_stage():
if args.finetune and args.task in ['CoLA', 'STS-B']:
if args.task == 'CoLA':
labels = get_accelerator().LongTensor(labels)
predicted = get_accelerator().LongTensor(predicted)
labels_gather = [torch.zeros(len(labels), dtype=torch.long,
device=labels.device) for _ in range(mpu.get_data_parallel_world_size())]
device=labels.device) for _ in range(parallel_state.get_data_parallel_world_size())]
predicted_gather = [torch.zeros(len(predicted), dtype=torch.long,
device=predicted.device) for _ in range(mpu.get_data_parallel_world_size())]
device=predicted.device) for _ in range(parallel_state.get_data_parallel_world_size())]
else:
labels = get_accelerator().FloatTensor(labels)
predicted = get_accelerator().FloatTensor(predicted)
labels_gather = [torch.zeros(len(labels), dtype=torch.float,
device=labels.device) for _ in range(mpu.get_data_parallel_world_size())]
device=labels.device) for _ in range(parallel_state.get_data_parallel_world_size())]
predicted_gather = [torch.zeros(len(predicted), dtype=torch.float,
device=predicted.device) for _ in range(mpu.get_data_parallel_world_size())]
device=predicted.device) for _ in range(parallel_state.get_data_parallel_world_size())]
torch.distributed.all_gather(labels_gather, labels,
group=mpu.get_data_parallel_group())
group=parallel_state.get_data_parallel_group())
torch.distributed.all_gather(predicted_gather, predicted,
group=mpu.get_data_parallel_group())
group=parallel_state.get_data_parallel_group())
labels_gather = sum([x.data.cpu().numpy().tolist() for x in labels_gather], [])
predicted_gather = sum([x.data.cpu().numpy().tolist() for x in predicted_gather], [])
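The gathers above collect per-rank label and prediction vectors so correlation metrics (STS-B, CoLA) can be computed over the full evaluation split. A condensed sketch of the same pattern, assuming an initialized data-parallel group and a hypothetical helper name:

# Sketch: all-gather same-shaped 1-D tensors from every data-parallel rank, then flatten.
import torch
import torch.distributed as dist
from ascendspeed.core import parallel_state

def gather_across_data_parallel(values: torch.Tensor) -> list:
    group = parallel_state.get_data_parallel_group()
    world_size = parallel_state.get_data_parallel_world_size()
    buckets = [torch.zeros_like(values) for _ in range(world_size)]
    dist.all_gather(buckets, values, group=group)
    return [v for bucket in buckets for v in bucket.cpu().tolist()]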
@ -239,7 +239,7 @@ def calculate_correct_answers(name, model, dataloader,
else:
unreduced = get_accelerator().LongTensor([correct, total])
torch.distributed.all_reduce(unreduced,
group=mpu.get_data_parallel_group())
group=parallel_state.get_data_parallel_group())
# Print on screen.
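The closing reduction sums the local [correct, total] counters over the data-parallel group so every rank reports the same accuracy. A minimal sketch of that step, with global_accuracy as a hypothetical helper:

# Sketch: reduce per-rank counters to a global accuracy (percent).
import torch
import torch.distributed as dist
from ascendspeed.core import parallel_state

def global_accuracy(correct: int, total: int, device) -> float:
    counters = torch.tensor([correct, total], dtype=torch.long, device=device)
    dist.all_reduce(counters, group=parallel_state.get_data_parallel_group())
    return 100.0 * counters[0].item() / counters[1].item()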

View File

@ -22,7 +22,7 @@ import torch
from ascendspeed import get_args
from ascendspeed import print_rank_0
from ascendspeed import get_timers
from ascendspeed import mpu
from ascendspeed.core import parallel_state
from ascendspeed.checkpointing import load_checkpoint
from ascendspeed.checkpointing import save_checkpoint
from ascendspeed.training import evaluate_and_print_results
@ -126,8 +126,8 @@ def build_data_loader(dataset, micro_batch_size, num_workers, drop_last):
"""Data loader. Note that batch-size is the local (per GPU) batch-size."""
# Sampler.
world_size = mpu.get_data_parallel_world_size()
rank = mpu.get_data_parallel_rank()
world_size = parallel_state.get_data_parallel_world_size()
rank = parallel_state.get_data_parallel_rank()
sampler = torch.utils.data.distributed.DistributedSampler(
dataset, num_replicas=world_size, rank=rank)
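build_data_loader shards the evaluation set across data-parallel replicas only; ranks inside a replica (tensor or pipeline parallel) read the same shard, and drop_last is enabled whenever more than one replica exists so all replicas run the same number of steps. A short sketch with placeholder batch-size and worker defaults:

# Sketch: shard a dataset across data-parallel ranks and wrap it in a DataLoader.
import torch
from ascendspeed.core import parallel_state

def build_eval_loader(dataset, micro_batch_size=8, num_workers=2, drop_last=True):
    """Placeholder defaults; real values come from the parsed args."""
    world_size = parallel_state.get_data_parallel_world_size()
    rank = parallel_state.get_data_parallel_rank()
    sampler = torch.utils.data.distributed.DistributedSampler(
        dataset, num_replicas=world_size, rank=rank)
    return torch.utils.data.DataLoader(
        dataset, batch_size=micro_batch_size, sampler=sampler,
        num_workers=num_workers, drop_last=drop_last, pin_memory=True)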

View File

@ -19,7 +19,7 @@ import os
import torch
from ascendspeed import get_args
from ascendspeed import print_rank_0
from ascendspeed import mpu
from ascendspeed.core import parallel_state
from tasks.vision.finetune_utils import build_data_loader
from tasks.vision.finetune_utils import process_batch
from torchvision import datasets, transforms
@ -49,7 +49,7 @@ def accuracy_func_provider():
dataset,
args.micro_batch_size,
num_workers=args.num_workers,
drop_last=(mpu.get_data_parallel_world_size() > 1),
drop_last=(parallel_state.get_data_parallel_world_size() > 1),
)
def metrics_func(model, epoch):
@ -87,7 +87,7 @@ def calculate_correct_answers(model, dataloader, epoch):
# Reduce.
unreduced = get_accelerator().LongTensor([correct, total])
torch.distributed.all_reduce(unreduced, group=mpu.get_data_parallel_group())
torch.distributed.all_reduce(unreduced, group=parallel_state.get_data_parallel_group())
# Print on screen.
correct_ans = unreduced[0].item()

View File

@ -20,7 +20,7 @@ import torch.nn.functional as F
from ascendspeed import get_args
from ascendspeed import print_rank_0
from ascendspeed import get_timers
from ascendspeed import mpu
from ascendspeed.core import parallel_state
from ascendspeed.checkpointing import load_checkpoint
from ascendspeed.checkpointing import save_checkpoint
from ascendspeed.training import evaluate_and_print_results
@ -68,8 +68,8 @@ def build_data_loader(dataset, micro_batch_size, num_workers, drop_last):
"""Data loader. Note that batch-size is the local (per GPU) batch-size."""
# Sampler.
world_size = mpu.get_data_parallel_world_size()
rank = mpu.get_data_parallel_rank()
world_size = parallel_state.get_data_parallel_world_size()
rank = parallel_state.get_data_parallel_rank()
sampler = torch.utils.data.distributed.DistributedSampler(
dataset, num_replicas=world_size, rank=rank
)

View File

@ -22,7 +22,7 @@ import torch
from ascendspeed import get_args
from ascendspeed import print_rank_0, is_last_rank
from ascendspeed import get_tokenizer
from ascendspeed import mpu
from ascendspeed.core import parallel_state, tensor_parallel
from ascendspeed.checkpointing import load_checkpoint
from ascendspeed.model import GPTModel
from ascendspeed.training import get_model
@ -103,10 +103,10 @@ def forward_step(batch, model, eval_metric):
send_forward(output)
if mpu.is_pipeline_last_stage():
if parallel_state.is_pipeline_last_stage():
# For loss, return the unreduced loss.
if eval_metric == 'loss':
losses = mpu.vocab_parallel_cross_entropy(
losses = tensor_parallel.vocab_parallel_cross_entropy(
output.contiguous().float(), labels.contiguous())
loss = torch.sum(
losses.view(-1) * loss_mask.contiguous().view(-1).float())
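tensor_parallel.vocab_parallel_cross_entropy computes the loss directly on vocabulary-sharded logits, so the full softmax is never materialized on a single rank; it returns a per-token loss that is then masked and summed. A hedged sketch of that reduction, reusing the names from the hunk above:

# Sketch: per-token cross entropy on vocab-sharded logits, masked and summed.
import torch
from ascendspeed.core import tensor_parallel

def masked_lm_loss(output, labels, loss_mask):
    losses = tensor_parallel.vocab_parallel_cross_entropy(
        output.contiguous().float(), labels.contiguous())        # [batch, seq]
    return torch.sum(losses.view(-1) * loss_mask.contiguous().view(-1).float())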
@ -142,9 +142,9 @@ def evaluate(data_loader, model, eval_metric):
output = forward_step(batch, model, eval_metric)
# Reduce across processes.
if mpu.is_pipeline_last_stage():
if parallel_state.is_pipeline_last_stage():
torch.distributed.all_reduce(output,
group=mpu.get_data_parallel_group())
group=parallel_state.get_data_parallel_group())
total_output += output

View File

@ -22,7 +22,7 @@ import torch
from ascendspeed import get_args
from ascendspeed import print_rank_0, is_last_rank, print_rank_last
from ascendspeed import get_tokenizer
from ascendspeed import mpu
from ascendspeed.core import parallel_state, tensor_parallel
from ascendspeed.checkpointing import load_checkpoint
from ascendspeed.model import GPTModel, LlamaModel
from ascendspeed.training import get_model
@ -61,11 +61,11 @@ def get_llama_model_provider(eval_metric):
'is not supported.'.format(eval_metric))
args = get_args()
with deepspeed.zero.Init(data_parallel_group=mpu.get_data_parallel_group(),
with deepspeed.zero.Init(data_parallel_group=parallel_state.get_data_parallel_group(),
remote_device=None if args.remote_device == 'none' else args.remote_device,
config_dict_or_path=args.deepspeed_config,
enabled=args.zero_stage == 3,
mpu=mpu):
mpu=parallel_state):
model = LlamaModel(
parallel_output=parallel_output,
add_pooler=False,
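Passing mpu=parallel_state keeps DeepSpeed's ZeRO-3 partitioning aware of the model-parallel topology through the same accessors the old mpu alias exposed. A trimmed sketch of the construction pattern, with build_zero3_model as a hypothetical wrapper:

# Sketch: build a model under ZeRO-3 partitioning with model-parallel groups exposed.
import deepspeed
from ascendspeed.core import parallel_state

def build_zero3_model(model_cls, args, **model_kwargs):
    with deepspeed.zero.Init(
            data_parallel_group=parallel_state.get_data_parallel_group(),
            remote_device=None if args.remote_device == 'none' else args.remote_device,
            config_dict_or_path=args.deepspeed_config,
            enabled=args.zero_stage == 3,
            mpu=parallel_state):
        return model_cls(**model_kwargs)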
@ -120,10 +120,10 @@ def forward_step(batch, model, eval_metric):
send_forward(output)
if mpu.is_pipeline_last_stage():
if parallel_state.is_pipeline_last_stage():
# For loss, return the unreduced loss.
if eval_metric == 'loss':
losses = mpu.vocab_parallel_cross_entropy(
losses = tensor_parallel.vocab_parallel_cross_entropy(
output.contiguous().float(), labels.contiguous())
loss = torch.sum(
losses.view(-1) * loss_mask.contiguous().view(-1).float())
@ -198,7 +198,7 @@ def custom_forwrad(batch, model):
)
send_forward(outputs)
outputs = outputs.reshape(-1, mutil_choice_number, outputs.shape[-2], outputs.shape[-1])
if mpu.is_pipeline_last_stage():
if parallel_state.is_pipeline_last_stage():
for i, logits in enumerate(outputs):
preds = []
for choice in range(mutil_choice_number):
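The custom forward reshapes the logits to [batch, num_choices, seq, vocab] and scores each choice on the last pipeline stage. The exact scoring code is not shown in this hunk; one common approach, given here purely as an illustrative assumption, is to sum the log-likelihood of each choice's tokens and predict the argmax:

# Illustrative assumption only; the in-tree scoring may differ in detail.
import torch

def score_choices(choice_logits, choice_labels, choice_masks):
    """choice_logits: [num_choices, seq, vocab]; labels/masks: [num_choices, seq]."""
    log_probs = torch.log_softmax(choice_logits.float(), dim=-1)
    token_ll = torch.gather(log_probs, -1, choice_labels.unsqueeze(-1)).squeeze(-1)
    per_choice = (token_ll * choice_masks.float()).sum(dim=-1)   # [num_choices]
    return int(per_choice.argmax())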
@ -233,9 +233,9 @@ def evaluate(data_loader, model, eval_metric):
output = custom_forwrad(batch, model)
# Reduce across processes.
if mpu.is_pipeline_last_stage():
if parallel_state.is_pipeline_last_stage():
torch.distributed.all_reduce(output,
group=mpu.get_data_parallel_group())
group=parallel_state.get_data_parallel_group())
total_output += output

View File

@ -25,7 +25,7 @@ from wrapt_timeout_decorator import timeout
from ascendspeed import get_args
from ascendspeed import print_rank_0
from ascendspeed import get_timers
from ascendspeed import mpu
from ascendspeed.core import parallel_state
from ascendspeed.model import GPTModel, GPTModelPipe
from ascendspeed.training import pretrain
from ascendspeed.utils import average_losses_across_data_parallel_group
@ -43,11 +43,11 @@ def model_provider(pre_process=True, post_process=True):
see_memory_usage(f"Before Building Model", force=True)
args = get_args()
with deepspeed.zero.Init(data_parallel_group=mpu.get_data_parallel_group(),
with deepspeed.zero.Init(data_parallel_group=parallel_state.get_data_parallel_group(),
remote_device=None if args.remote_device == 'none' else args.remote_device,
config_dict_or_path=args.deepspeed_config,
enabled=args.zero_stage == 3,
mpu=mpu):
mpu=parallel_state):
if args.deepspeed:
args.pretrain_causal_attention = True
model = GPTModelPipe(

View File

@ -30,6 +30,7 @@ from ascendspeed import print_rank_0
from ascendspeed import get_timers
from ascendspeed import get_tokenizer
from ascendspeed import mpu
from ascendspeed.core import parallel_state, tensor_parallel
from ascendspeed.data.gpt_dataset import build_train_valid_test_datasets
from ascendspeed.model import GPTModel, GPTModelPipe
from ascendspeed.training import pretrain
@ -51,11 +52,11 @@ def model_provider(pre_process=True, post_process=True):
see_memory_usage(f"Before Building Model", force=True)
args = get_args()
with deepspeed.zero.Init(data_parallel_group=mpu.get_data_parallel_group(),
with deepspeed.zero.Init(data_parallel_group=parallel_state.get_data_parallel_group(),
remote_device=None if args.remote_device == 'none' else args.remote_device,
config_dict_or_path=args.deepspeed_config,
enabled=args.zero_stage == 3,
mpu=mpu):
mpu=parallel_state):
if args.deepspeed and not args.no_pipeline_parallel:
model = GPTModelPipe(
num_tokentypes=0,
@ -267,7 +268,7 @@ def forward_step(data_iterator, model):
if args.curriculum_learning_legacy and args.curriculum_seqlen < args.seq_length:
assert args.curriculum_seqlen is not None
labels = labels[:, :args.curriculum_seqlen].contiguous()
output_tensor = mpu.vocab_parallel_cross_entropy(stu_output.contiguous().float(), labels)
output_tensor = tensor_parallel.vocab_parallel_cross_entropy(stu_output.contiguous().float(), labels)
else:
output_tensor, *other_losses = model(tokens, position_ids, attention_mask,
labels=labels)

View File

@ -31,6 +31,7 @@ from ascendspeed import print_rank_0
from ascendspeed import get_timers
from ascendspeed import get_tokenizer
from ascendspeed import mpu
from ascendspeed.core import parallel_state
from ascendspeed.data.gpt_dataset import build_train_valid_test_datasets
from ascendspeed.model import LlamaModel, LlamaModelPipe
from ascendspeed.training import pretrain
@ -49,11 +50,11 @@ def model_provider(pre_process=True, post_process=True):
see_memory_usage(f"Before Building Model", force=True)
args = get_args()
with deepspeed.zero.Init(data_parallel_group=mpu.get_data_parallel_group(),
with deepspeed.zero.Init(data_parallel_group=parallel_state.get_data_parallel_group(),
remote_device=None if args.remote_device == 'none' else args.remote_device,
config_dict_or_path=args.deepspeed_config,
enabled=args.zero_stage == 3,
mpu=mpu):
mpu=parallel_state):
if args.deepspeed and not args.no_pipeline_parallel:
model = LlamaModelPipe(parallel_output=True)
# This is a hack to give us a reference to get_batch_pipe from within training.py