diff --git a/tests/core/callbacks/test_checkpoint_callback_torch.py b/tests/core/callbacks/test_checkpoint_callback_torch.py index 5f7d553f..eff7b420 100644 --- a/tests/core/callbacks/test_checkpoint_callback_torch.py +++ b/tests/core/callbacks/test_checkpoint_callback_torch.py @@ -75,131 +75,129 @@ def model_and_optimizers(request): @pytest.mark.torch @pytest.mark.parametrize("driver,device", [("torch", [0, 1])]) # ("torch", "cpu"), ("torch", [0, 1]), ("torch", 1) -@pytest.mark.parametrize("version", [0, 1]) -@pytest.mark.parametrize("only_state_dict", [True, False]) @magic_argv_env_context(timeout=100) def test_model_checkpoint_callback_1( model_and_optimizers: TrainerParameters, driver, - device, - version, - only_state_dict + device ): - try: - path = Path.cwd().joinpath(f"test_model_checkpoint") - path.mkdir(exist_ok=True, parents=True) + for version in [0, 1]: + for only_state_dict in [True, False]: + try: + path = Path.cwd().joinpath(f"test_model_checkpoint") + path.mkdir(exist_ok=True, parents=True) - if version == 0: - callbacks = [ - CheckpointCallback(folder=path, every_n_epochs=1, every_n_batches=123, last=False, on_exceptions=None, topk=0, - monitor=None, only_state_dict=only_state_dict, save_object='model') - ] - elif version == 1: - callbacks = [ - CheckpointCallback(folder=path, every_n_epochs=3, every_n_batches=None, last=True, on_exceptions=None, topk=2, - monitor="acc", only_state_dict=only_state_dict, save_object='model') - ] + if version == 0: + callbacks = [ + CheckpointCallback(folder=path, every_n_epochs=1, every_n_batches=123, last=False, on_exceptions=None, topk=0, + monitor=None, only_state_dict=only_state_dict, save_object='model') + ] + elif version == 1: + callbacks = [ + CheckpointCallback(folder=path, every_n_epochs=3, every_n_batches=None, last=True, on_exceptions=None, topk=2, + monitor="acc", only_state_dict=only_state_dict, save_object='model') + ] - trainer = Trainer( - model=model_and_optimizers.model, - driver=driver, - device=device, - optimizers=model_and_optimizers.optimizers, - train_dataloader=model_and_optimizers.train_dataloader, - evaluate_dataloaders=model_and_optimizers.evaluate_dataloaders, - input_mapping=model_and_optimizers.input_mapping, - output_mapping=model_and_optimizers.output_mapping, - metrics=model_and_optimizers.metrics, - n_epochs=10, - callbacks=callbacks, - output_from_new_proc="all" - ) + trainer = Trainer( + model=model_and_optimizers.model, + driver=driver, + device=device, + optimizers=model_and_optimizers.optimizers, + train_dataloader=model_and_optimizers.train_dataloader, + evaluate_dataloaders=model_and_optimizers.evaluate_dataloaders, + input_mapping=model_and_optimizers.input_mapping, + output_mapping=model_and_optimizers.output_mapping, + metrics=model_and_optimizers.metrics, + n_epochs=10, + callbacks=callbacks, + output_from_new_proc="all" + ) - trainer.run() - print("Finish train") - all_saved_model_paths = {w.name: w for w in path.joinpath(os.environ[FASTNLP_LAUNCH_TIME]).iterdir()} - # 检查生成保存模型文件的数量是不是正确的; - if version == 0: + trainer.run() + print("Finish train") + all_saved_model_paths = {w.name: w for w in path.joinpath(os.environ[FASTNLP_LAUNCH_TIME]).iterdir()} + # 检查生成保存模型文件的数量是不是正确的; + if version == 0: - if not isinstance(device, list): - assert "model-epoch_10" in all_saved_model_paths - assert "model-epoch_4-batch_123" in all_saved_model_paths + if not isinstance(device, list): + assert "model-epoch_10" in all_saved_model_paths + assert "model-epoch_4-batch_123" in all_saved_model_paths - 
epoch_save_path = all_saved_model_paths["model-epoch_10"] - step_save_path = all_saved_model_paths["model-epoch_4-batch_123"] + epoch_save_path = all_saved_model_paths["model-epoch_10"] + step_save_path = all_saved_model_paths["model-epoch_4-batch_123"] - assert len(all_saved_model_paths) == 12 - # ddp 下的文件名不同,因为同样的数据,ddp 用了更少的步数跑完; - else: - assert "model-epoch_6" in all_saved_model_paths - assert "model-epoch_9-batch_123" in all_saved_model_paths + assert len(all_saved_model_paths) == 12 + # ddp 下的文件名不同,因为同样的数据,ddp 用了更少的步数跑完; + else: + assert "model-epoch_6" in all_saved_model_paths + assert "model-epoch_9-batch_123" in all_saved_model_paths - epoch_save_path = all_saved_model_paths["model-epoch_6"] - step_save_path = all_saved_model_paths["model-epoch_9-batch_123"] + epoch_save_path = all_saved_model_paths["model-epoch_6"] + step_save_path = all_saved_model_paths["model-epoch_9-batch_123"] - assert len(all_saved_model_paths) == 11 - all_state_dicts = [epoch_save_path, step_save_path] + assert len(all_saved_model_paths) == 11 + all_state_dicts = [epoch_save_path, step_save_path] - elif version == 1: + elif version == 1: - pattern = re.compile("model-epoch_[0-9]+-batch_[0-9]+-[a-zA-Z#]+_[0-9]*.?[0-9]*") + pattern = re.compile("model-epoch_[0-9]+-batch_[0-9]+-[a-zA-Z#]+_[0-9]*.?[0-9]*") - if not isinstance(device, list): - assert "model-epoch_9" in all_saved_model_paths - assert "model-last" in all_saved_model_paths - aLL_topk_folders = [] - for each_folder_name in all_saved_model_paths: - each_folder_name = pattern.findall(each_folder_name) - if len(each_folder_name) != 0: - aLL_topk_folders.append(each_folder_name[0]) - assert len(aLL_topk_folders) == 2 + if not isinstance(device, list): + assert "model-epoch_9" in all_saved_model_paths + assert "model-last" in all_saved_model_paths + aLL_topk_folders = [] + for each_folder_name in all_saved_model_paths: + each_folder_name = pattern.findall(each_folder_name) + if len(each_folder_name) != 0: + aLL_topk_folders.append(each_folder_name[0]) + assert len(aLL_topk_folders) == 2 - epoch_save_path = all_saved_model_paths["model-epoch_9"] - last_save_path = all_saved_model_paths["model-last"] - topk_save_path = all_saved_model_paths[aLL_topk_folders[0]] + epoch_save_path = all_saved_model_paths["model-epoch_9"] + last_save_path = all_saved_model_paths["model-last"] + topk_save_path = all_saved_model_paths[aLL_topk_folders[0]] - assert len(all_saved_model_paths) == 6 - # ddp 下的文件名不同,因为同样的数据,ddp 用了更少的步数跑完; - else: - assert "model-epoch_9" in all_saved_model_paths - assert "model-last" in all_saved_model_paths + assert len(all_saved_model_paths) == 6 + # ddp 下的文件名不同,因为同样的数据,ddp 用了更少的步数跑完; + else: + assert "model-epoch_9" in all_saved_model_paths + assert "model-last" in all_saved_model_paths - aLL_topk_folders = [] - for each_folder_name in all_saved_model_paths: - each_folder_name = pattern.findall(each_folder_name) - if len(each_folder_name) != 0: - aLL_topk_folders.append(each_folder_name[0]) - assert len(aLL_topk_folders) == 2 + aLL_topk_folders = [] + for each_folder_name in all_saved_model_paths: + each_folder_name = pattern.findall(each_folder_name) + if len(each_folder_name) != 0: + aLL_topk_folders.append(each_folder_name[0]) + assert len(aLL_topk_folders) == 2 - epoch_save_path = all_saved_model_paths["model-epoch_9"] - last_save_path = all_saved_model_paths["model-last"] - topk_save_path = all_saved_model_paths[aLL_topk_folders[0]] + epoch_save_path = all_saved_model_paths["model-epoch_9"] + last_save_path = 
all_saved_model_paths["model-last"] + topk_save_path = all_saved_model_paths[aLL_topk_folders[0]] - assert len(all_saved_model_paths) == 6 + assert len(all_saved_model_paths) == 6 - all_state_dicts = [epoch_save_path, last_save_path, topk_save_path] + all_state_dicts = [epoch_save_path, last_save_path, topk_save_path] - for folder in all_state_dicts: - trainer = Trainer( - model=model_and_optimizers.model, - driver=driver, - device=device, - optimizers=model_and_optimizers.optimizers, - train_dataloader=model_and_optimizers.train_dataloader, - evaluate_dataloaders=model_and_optimizers.evaluate_dataloaders, - input_mapping=model_and_optimizers.input_mapping, - output_mapping=model_and_optimizers.output_mapping, - metrics=model_and_optimizers.metrics, + for folder in all_state_dicts: + trainer = Trainer( + model=model_and_optimizers.model, + driver=driver, + device=device, + optimizers=model_and_optimizers.optimizers, + train_dataloader=model_and_optimizers.train_dataloader, + evaluate_dataloaders=model_and_optimizers.evaluate_dataloaders, + input_mapping=model_and_optimizers.input_mapping, + output_mapping=model_and_optimizers.output_mapping, + metrics=model_and_optimizers.metrics, - n_epochs=2, - output_from_new_proc="all" - ) - trainer.load_model(folder, only_state_dict=only_state_dict) + n_epochs=2, + output_from_new_proc="all" + ) + trainer.load_model(folder, only_state_dict=only_state_dict) - trainer.run() - trainer.driver.barrier() - finally: - rank_zero_rm(path) + trainer.run() + trainer.driver.barrier() + finally: + rank_zero_rm(path) if dist.is_initialized(): dist.destroy_process_group() @@ -207,92 +205,91 @@ def test_model_checkpoint_callback_1( @pytest.mark.torch @pytest.mark.parametrize("driver,device", [("torch", "cpu"), ("torch", [0, 1]), ("torch", 1)]) # ("torch", "cpu"), ("torch", [0, 1]), ("torch", 1) -@pytest.mark.parametrize("only_state_dict", [True]) @magic_argv_env_context(timeout=100) def test_model_checkpoint_callback_2( model_and_optimizers: TrainerParameters, driver, - device, - only_state_dict + device ): - try: - path = Path.cwd().joinpath("test_model_checkpoint") - path.mkdir(exist_ok=True, parents=True) + for only_state_dict in [True, False]: + try: + path = Path.cwd().joinpath("test_model_checkpoint") + path.mkdir(exist_ok=True, parents=True) - from fastNLP.core.callbacks.callback_event import Event + from fastNLP.core.callbacks.callback_event import Event - @Trainer.on(Event.on_train_epoch_end()) - def raise_exception(trainer): - if trainer.driver.get_local_rank() == 0 and trainer.cur_epoch_idx == 4: - raise NotImplementedError + @Trainer.on(Event.on_train_epoch_end()) + def raise_exception(trainer): + if trainer.driver.get_local_rank() == 0 and trainer.cur_epoch_idx == 4: + raise NotImplementedError - callbacks = [ - CheckpointCallback(folder=path, every_n_epochs=None, every_n_batches=None, last=False, - on_exceptions=NotImplementedError, topk=None, monitor=None, only_state_dict=only_state_dict, - save_object='model'), - ] + callbacks = [ + CheckpointCallback(folder=path, every_n_epochs=None, every_n_batches=None, last=False, + on_exceptions=NotImplementedError, topk=None, monitor=None, only_state_dict=only_state_dict, + save_object='model'), + ] - with pytest.raises(NotImplementedError): - trainer = Trainer( - model=model_and_optimizers.model, - driver=driver, - device=device, - optimizers=model_and_optimizers.optimizers, - train_dataloader=model_and_optimizers.train_dataloader, - evaluate_dataloaders=model_and_optimizers.evaluate_dataloaders, - 
input_mapping=model_and_optimizers.input_mapping, - output_mapping=model_and_optimizers.output_mapping, - metrics=model_and_optimizers.metrics, + with pytest.raises(NotImplementedError): + trainer = Trainer( + model=model_and_optimizers.model, + driver=driver, + device=device, + optimizers=model_and_optimizers.optimizers, + train_dataloader=model_and_optimizers.train_dataloader, + evaluate_dataloaders=model_and_optimizers.evaluate_dataloaders, + input_mapping=model_and_optimizers.input_mapping, + output_mapping=model_and_optimizers.output_mapping, + metrics=model_and_optimizers.metrics, - n_epochs=10, - callbacks=callbacks, - output_from_new_proc="all" - ) + n_epochs=10, + callbacks=callbacks, + output_from_new_proc="all" + ) - trainer.run() + trainer.run() - if dist.is_initialized(): - dist.destroy_process_group() - if FASTNLP_DISTRIBUTED_CHECK in os.environ: - os.environ.pop(FASTNLP_DISTRIBUTED_CHECK) + if dist.is_initialized(): + dist.destroy_process_group() + if FASTNLP_DISTRIBUTED_CHECK in os.environ: + os.environ.pop(FASTNLP_DISTRIBUTED_CHECK) - # 检查生成保存模型文件的数量是不是正确的; - all_saved_model_paths = {w.name: w for w in path.joinpath(os.environ[FASTNLP_LAUNCH_TIME]).iterdir()} + # 检查生成保存模型文件的数量是不是正确的; + all_saved_model_paths = {w.name: w for w in path.joinpath(os.environ[FASTNLP_LAUNCH_TIME]).iterdir()} - if not isinstance(device, list): - assert "model-epoch_4-batch_100-exception_NotImplementedError" in all_saved_model_paths - exception_model_path = all_saved_model_paths["model-epoch_4-batch_100-exception_NotImplementedError"] - # ddp 下的文件名不同,因为同样的数据,ddp 用了更少的步数跑完; - else: - assert "model-epoch_4-batch_52-exception_NotImplementedError" in all_saved_model_paths - exception_model_path = all_saved_model_paths["model-epoch_4-batch_52-exception_NotImplementedError"] + if not isinstance(device, list): + assert "model-epoch_4-batch_100-exception_NotImplementedError" in all_saved_model_paths + exception_model_path = all_saved_model_paths["model-epoch_4-batch_100-exception_NotImplementedError"] + # ddp 下的文件名不同,因为同样的数据,ddp 用了更少的步数跑完; + else: + assert "model-epoch_4-batch_52-exception_NotImplementedError" in all_saved_model_paths + exception_model_path = all_saved_model_paths["model-epoch_4-batch_52-exception_NotImplementedError"] - assert len(all_saved_model_paths) == 1 - all_state_dicts = [exception_model_path] + assert len(all_saved_model_paths) == 1 + all_state_dicts = [exception_model_path] - for folder in all_state_dicts: - trainer = Trainer( - model=model_and_optimizers.model, - driver="torch", - device=4, - optimizers=model_and_optimizers.optimizers, - train_dataloader=model_and_optimizers.train_dataloader, - evaluate_dataloaders=model_and_optimizers.evaluate_dataloaders, - input_mapping=model_and_optimizers.input_mapping, - output_mapping=model_and_optimizers.output_mapping, - metrics=model_and_optimizers.metrics, + for folder in all_state_dicts: + trainer = Trainer( + model=model_and_optimizers.model, + driver="torch", + device=4, + optimizers=model_and_optimizers.optimizers, + train_dataloader=model_and_optimizers.train_dataloader, + evaluate_dataloaders=model_and_optimizers.evaluate_dataloaders, + input_mapping=model_and_optimizers.input_mapping, + output_mapping=model_and_optimizers.output_mapping, + metrics=model_and_optimizers.metrics, - n_epochs=2, - output_from_new_proc="all" - ) + n_epochs=2, + output_from_new_proc="all" + ) - trainer.load_model(folder, only_state_dict=only_state_dict) - trainer.run() - trainer.driver.barrier() + trainer.load_model(folder, 
only_state_dict=only_state_dict) + trainer.run() + trainer.driver.barrier() - finally: - rank_zero_rm(path) - # pass + finally: + rank_zero_rm(path) + # pass if dist.is_initialized(): dist.destroy_process_group() @@ -300,130 +297,128 @@ def test_model_checkpoint_callback_2( @pytest.mark.torch @pytest.mark.parametrize("driver,device", [("torch", "cpu"), ("torch", [0, 1]), ("torch", 0)]) # ("torch", "cpu"), ("torch", [0, 1]), ("torch", 1) -@pytest.mark.parametrize("version", [0, 1]) -@pytest.mark.parametrize("only_state_dict", [True, False]) @magic_argv_env_context(timeout=100) def test_trainer_checkpoint_callback_1( model_and_optimizers: TrainerParameters, driver, - device, - version, - only_state_dict + device ): - try: - path = Path.cwd().joinpath(f"test_model_checkpoint") - path.mkdir(exist_ok=True, parents=True) + for version in [0, 1]: + for only_state_dict in [True, False]: + try: + path = Path.cwd().joinpath(f"test_model_checkpoint") + path.mkdir(exist_ok=True, parents=True) - if version == 0: - callbacks = [ - CheckpointCallback(folder=path, every_n_epochs=7, every_n_batches=123, last=False, on_exceptions=None, topk=0, - monitor=None, only_state_dict=only_state_dict, save_object='trainer') - ] - elif version == 1: - callbacks = [ - CheckpointCallback(folder=path, every_n_epochs=None, every_n_batches=None, last=True, on_exceptions=None, - topk=2, monitor="acc", only_state_dict=only_state_dict, save_object='trainer') - ] + if version == 0: + callbacks = [ + CheckpointCallback(folder=path, every_n_epochs=7, every_n_batches=123, last=False, on_exceptions=None, topk=0, + monitor=None, only_state_dict=only_state_dict, save_object='trainer') + ] + elif version == 1: + callbacks = [ + CheckpointCallback(folder=path, every_n_epochs=None, every_n_batches=None, last=True, on_exceptions=None, + topk=2, monitor="acc", only_state_dict=only_state_dict, save_object='trainer') + ] - trainer = Trainer( - model=model_and_optimizers.model, - driver=driver, - device=device, - optimizers=model_and_optimizers.optimizers, - train_dataloader=model_and_optimizers.train_dataloader, - evaluate_dataloaders=model_and_optimizers.evaluate_dataloaders, - input_mapping=model_and_optimizers.input_mapping, - output_mapping=model_and_optimizers.output_mapping, - metrics=model_and_optimizers.metrics, + trainer = Trainer( + model=model_and_optimizers.model, + driver=driver, + device=device, + optimizers=model_and_optimizers.optimizers, + train_dataloader=model_and_optimizers.train_dataloader, + evaluate_dataloaders=model_and_optimizers.evaluate_dataloaders, + input_mapping=model_and_optimizers.input_mapping, + output_mapping=model_and_optimizers.output_mapping, + metrics=model_and_optimizers.metrics, - n_epochs=10, - callbacks=callbacks, - output_from_new_proc="all" - ) + n_epochs=10, + callbacks=callbacks, + output_from_new_proc="all" + ) - trainer.run() + trainer.run() - all_saved_model_paths = {w.name: w for w in path.joinpath(os.environ[FASTNLP_LAUNCH_TIME]).iterdir()} - # 检查生成保存模型文件的数量是不是正确的; - if version == 0: + all_saved_model_paths = {w.name: w for w in path.joinpath(os.environ[FASTNLP_LAUNCH_TIME]).iterdir()} + # 检查生成保存模型文件的数量是不是正确的; + if version == 0: - if not isinstance(device, list): - assert "trainer-epoch_7" in all_saved_model_paths - assert "trainer-epoch_4-batch_123" in all_saved_model_paths + if not isinstance(device, list): + assert "trainer-epoch_7" in all_saved_model_paths + assert "trainer-epoch_4-batch_123" in all_saved_model_paths - epoch_save_path = all_saved_model_paths["trainer-epoch_7"] - 
step_save_path = all_saved_model_paths["trainer-epoch_4-batch_123"] + epoch_save_path = all_saved_model_paths["trainer-epoch_7"] + step_save_path = all_saved_model_paths["trainer-epoch_4-batch_123"] - assert len(all_saved_model_paths) == 3 - # ddp 下的文件名不同,因为同样的数据,ddp 用了更少的步数跑完; - else: - assert "trainer-epoch_7" in all_saved_model_paths - assert "trainer-epoch_9-batch_123" in all_saved_model_paths + assert len(all_saved_model_paths) == 3 + # ddp 下的文件名不同,因为同样的数据,ddp 用了更少的步数跑完; + else: + assert "trainer-epoch_7" in all_saved_model_paths + assert "trainer-epoch_9-batch_123" in all_saved_model_paths - epoch_save_path = all_saved_model_paths["trainer-epoch_7"] - step_save_path = all_saved_model_paths["trainer-epoch_9-batch_123"] + epoch_save_path = all_saved_model_paths["trainer-epoch_7"] + step_save_path = all_saved_model_paths["trainer-epoch_9-batch_123"] - assert len(all_saved_model_paths) == 2 - all_state_dicts = [epoch_save_path, step_save_path] + assert len(all_saved_model_paths) == 2 + all_state_dicts = [epoch_save_path, step_save_path] - elif version == 1: + elif version == 1: - pattern = re.compile("trainer-epoch_[0-9]+-batch_[0-9]+-[a-zA-Z#]+_[0-9]*.?[0-9]*") + pattern = re.compile("trainer-epoch_[0-9]+-batch_[0-9]+-[a-zA-Z#]+_[0-9]*.?[0-9]*") - # all_saved_model_paths = {w.name: w for w in path.joinpath(os.environ[FASTNLP_LAUNCH_TIME]).iterdir()} - if not isinstance(device, list): - assert "trainer-last" in all_saved_model_paths - aLL_topk_folders = [] - for each_folder_name in all_saved_model_paths: - each_folder_name = pattern.findall(each_folder_name) - if len(each_folder_name) != 0: - aLL_topk_folders.append(each_folder_name[0]) - assert len(aLL_topk_folders) == 2 + # all_saved_model_paths = {w.name: w for w in path.joinpath(os.environ[FASTNLP_LAUNCH_TIME]).iterdir()} + if not isinstance(device, list): + assert "trainer-last" in all_saved_model_paths + aLL_topk_folders = [] + for each_folder_name in all_saved_model_paths: + each_folder_name = pattern.findall(each_folder_name) + if len(each_folder_name) != 0: + aLL_topk_folders.append(each_folder_name[0]) + assert len(aLL_topk_folders) == 2 - last_save_path = all_saved_model_paths["trainer-last"] - topk_save_path = all_saved_model_paths[aLL_topk_folders[0]] + last_save_path = all_saved_model_paths["trainer-last"] + topk_save_path = all_saved_model_paths[aLL_topk_folders[0]] - assert len(all_saved_model_paths) == 3 - # ddp 下的文件名不同,因为同样的数据,ddp 用了更少的步数跑完; - else: - assert "trainer-last" in all_saved_model_paths + assert len(all_saved_model_paths) == 3 + # ddp 下的文件名不同,因为同样的数据,ddp 用了更少的步数跑完; + else: + assert "trainer-last" in all_saved_model_paths - aLL_topk_folders = [] - for each_folder_name in all_saved_model_paths: - each_folder_name = pattern.findall(each_folder_name) - if len(each_folder_name) != 0: - aLL_topk_folders.append(each_folder_name[0]) - assert len(aLL_topk_folders) == 2 + aLL_topk_folders = [] + for each_folder_name in all_saved_model_paths: + each_folder_name = pattern.findall(each_folder_name) + if len(each_folder_name) != 0: + aLL_topk_folders.append(each_folder_name[0]) + assert len(aLL_topk_folders) == 2 - last_save_path = all_saved_model_paths["trainer-last"] - topk_save_path = all_saved_model_paths[aLL_topk_folders[0]] + last_save_path = all_saved_model_paths["trainer-last"] + topk_save_path = all_saved_model_paths[aLL_topk_folders[0]] - assert len(all_saved_model_paths) == 3 + assert len(all_saved_model_paths) == 3 - all_state_dicts = [last_save_path, topk_save_path] + all_state_dicts = [last_save_path, 
topk_save_path] - for folder in all_state_dicts: - trainer = Trainer( - model=model_and_optimizers.model, - driver=driver, - device=device, - optimizers=model_and_optimizers.optimizers, - train_dataloader=model_and_optimizers.train_dataloader, - evaluate_dataloaders=model_and_optimizers.evaluate_dataloaders, - input_mapping=model_and_optimizers.input_mapping, - output_mapping=model_and_optimizers.output_mapping, - metrics=model_and_optimizers.metrics, + for folder in all_state_dicts: + trainer = Trainer( + model=model_and_optimizers.model, + driver=driver, + device=device, + optimizers=model_and_optimizers.optimizers, + train_dataloader=model_and_optimizers.train_dataloader, + evaluate_dataloaders=model_and_optimizers.evaluate_dataloaders, + input_mapping=model_and_optimizers.input_mapping, + output_mapping=model_and_optimizers.output_mapping, + metrics=model_and_optimizers.metrics, - n_epochs=13, - output_from_new_proc="all" - ) - trainer.load_checkpoint(folder, only_state_dict=only_state_dict) + n_epochs=13, + output_from_new_proc="all" + ) + trainer.load_checkpoint(folder, only_state_dict=only_state_dict) - trainer.run() - trainer.driver.barrier() + trainer.run() + trainer.driver.barrier() - finally: - rank_zero_rm(path) + finally: + rank_zero_rm(path) if dist.is_initialized(): dist.destroy_process_group() diff --git a/tests/core/callbacks/test_load_best_model_callback_torch.py b/tests/core/callbacks/test_load_best_model_callback_torch.py index c607bb87..9ce5c99d 100644 --- a/tests/core/callbacks/test_load_best_model_callback_torch.py +++ b/tests/core/callbacks/test_load_best_model_callback_torch.py @@ -72,47 +72,45 @@ def model_and_optimizers(request): @pytest.mark.torch -@pytest.mark.parametrize("driver,device", [("torch", [4, 5]), ("torch", 1), ("torch", "cpu")]) # ("torch", "cpu"), ("torch", [0, 1]), ("torch", 1) -@pytest.mark.parametrize("save_folder", ['save_models', None]) -@pytest.mark.parametrize("only_state_dict", [True, False]) +@pytest.mark.parametrize("driver,device", [("torch", [0, 1]), ("torch", 1), ("torch", "cpu")]) # ("torch", "cpu"), ("torch", [0, 1]), ("torch", 1) @magic_argv_env_context def test_load_best_model_callback( model_and_optimizers: TrainerParameters, driver, - device, - save_folder, - only_state_dict + device ): - callbacks = [LoadBestModelCallback(monitor='acc')] + for save_folder in ['save_models', None]: + for only_state_dict in [True, False]: + callbacks = [LoadBestModelCallback(monitor='acc')] - trainer = Trainer( - model=model_and_optimizers.model, - driver=driver, - device=device, - optimizers=model_and_optimizers.optimizers, - train_dataloader=model_and_optimizers.train_dataloader, - evaluate_dataloaders=model_and_optimizers.evaluate_dataloaders, - input_mapping=model_and_optimizers.input_mapping, - output_mapping=lambda output: output if ('loss' in output) else {'pred':output['preds'], 'target': output['target']}, - metrics=model_and_optimizers.metrics, - n_epochs=3, - callbacks=callbacks, - output_from_new_proc="all" - ) + trainer = Trainer( + model=model_and_optimizers.model, + driver=driver, + device=device, + optimizers=model_and_optimizers.optimizers, + train_dataloader=model_and_optimizers.train_dataloader, + evaluate_dataloaders=model_and_optimizers.evaluate_dataloaders, + input_mapping=model_and_optimizers.input_mapping, + output_mapping=lambda output: output if ('loss' in output) else {'pred':output['preds'], 'target': output['target']}, + metrics=model_and_optimizers.metrics, + n_epochs=3, + callbacks=callbacks, + 
output_from_new_proc="all" + ) - trainer.run(num_eval_sanity_batch=0) + trainer.run(num_eval_sanity_batch=0) - driver = TorchSingleDriver(model_and_optimizers.model, device=torch.device('cuda')) - evaluator = Evaluator(model_and_optimizers.model, driver=driver, device=device, - dataloaders={'dl1': model_and_optimizers.evaluate_dataloaders}, - metrics={'acc': Accuracy(aggregate_when_get_metric=False)}, - output_mapping=lambda output: output if ('loss' in output) else {'pred':output['preds'], 'target': output['target']}, - progress_bar='rich', use_dist_sampler=False) - results = evaluator.run() - assert np.allclose(callbacks[0].monitor_value, results['acc#acc#dl1']) - if save_folder: - import shutil - shutil.rmtree(save_folder, ignore_errors=True) + driver = TorchSingleDriver(model_and_optimizers.model, device=torch.device('cuda')) + evaluator = Evaluator(model_and_optimizers.model, driver=driver, device=device, + dataloaders={'dl1': model_and_optimizers.evaluate_dataloaders}, + metrics={'acc': Accuracy(aggregate_when_get_metric=False)}, + output_mapping=lambda output: output if ('loss' in output) else {'pred':output['preds'], 'target': output['target']}, + progress_bar='rich', use_dist_sampler=False) + results = evaluator.run() + assert np.allclose(callbacks[0].monitor_value, results['acc#acc#dl1']) + if save_folder: + import shutil + shutil.rmtree(save_folder, ignore_errors=True) if dist.is_initialized(): dist.destroy_process_group() diff --git a/tests/core/callbacks/test_more_evaluate_callback.py b/tests/core/callbacks/test_more_evaluate_callback.py index 4fd9d0d3..e49d9f88 100644 --- a/tests/core/callbacks/test_more_evaluate_callback.py +++ b/tests/core/callbacks/test_more_evaluate_callback.py @@ -93,84 +93,82 @@ def model_and_optimizers(request): @pytest.mark.torch @pytest.mark.parametrize("driver,device", [("torch", "cpu"), ("torch", [0, 1]), ("torch", 1)]) # ("torch", "cpu"), ("torch", [0, 1]), ("torch", 1) -@pytest.mark.parametrize("version", [0, 1]) -@pytest.mark.parametrize("only_state_dict", [True, False]) @magic_argv_env_context def test_model_more_evaluate_callback_1( model_and_optimizers: TrainerParameters, driver, device, - version, - only_state_dict ): - try: - path = Path.cwd().joinpath(f"test_model_checkpoint") - path.mkdir(exist_ok=True, parents=True) + for only_state_dict in [True, False]: + for version in [0, 1]: + try: + path = Path.cwd().joinpath(f"test_model_checkpoint") + path.mkdir(exist_ok=True, parents=True) - if version == 0: - callbacks = [ - MoreEvaluateCallback(dataloaders=model_and_optimizers.evaluate_dataloaders, - metrics=model_and_optimizers.more_metrics, - evaluate_every=-1, - folder=path, topk=-1, - topk_monitor='acc', only_state_dict=only_state_dict, save_object='model') - ] - elif version == 1: - callbacks = [ - MoreEvaluateCallback(dataloaders=model_and_optimizers.evaluate_dataloaders, - metrics=model_and_optimizers.more_metrics, - evaluate_every=None, watch_monitor='loss', watch_monitor_larger_better=False, - folder=path, topk=1, topk_monitor='acc', only_state_dict=only_state_dict, - save_object='model') - ] - n_epochs = 5 - trainer = Trainer( - model=model_and_optimizers.model, - driver=driver, - device=device, - optimizers=model_and_optimizers.optimizers, - train_dataloader=model_and_optimizers.train_dataloader, - evaluate_dataloaders=model_and_optimizers.evaluate_dataloaders, - input_mapping=model_and_optimizers.input_mapping, - output_mapping=model_and_optimizers.output_mapping, - metrics=model_and_optimizers.metrics, - n_epochs=n_epochs, - 
callbacks=callbacks, - output_from_new_proc="all", - evaluate_fn='train_step' - ) + if version == 0: + callbacks = [ + MoreEvaluateCallback(dataloaders=model_and_optimizers.evaluate_dataloaders, + metrics=model_and_optimizers.more_metrics, + evaluate_every=-1, + folder=path, topk=-1, + topk_monitor='acc', only_state_dict=only_state_dict, save_object='model') + ] + elif version == 1: + callbacks = [ + MoreEvaluateCallback(dataloaders=model_and_optimizers.evaluate_dataloaders, + metrics=model_and_optimizers.more_metrics, + evaluate_every=None, watch_monitor='loss', watch_monitor_larger_better=False, + folder=path, topk=1, topk_monitor='acc', only_state_dict=only_state_dict, + save_object='model') + ] + n_epochs = 5 + trainer = Trainer( + model=model_and_optimizers.model, + driver=driver, + device=device, + optimizers=model_and_optimizers.optimizers, + train_dataloader=model_and_optimizers.train_dataloader, + evaluate_dataloaders=model_and_optimizers.evaluate_dataloaders, + input_mapping=model_and_optimizers.input_mapping, + output_mapping=model_and_optimizers.output_mapping, + metrics=model_and_optimizers.metrics, + n_epochs=n_epochs, + callbacks=callbacks, + output_from_new_proc="all", + evaluate_fn='train_step' + ) - trainer.run() + trainer.run() - all_saved_model_paths = {w.name: w for w in path.joinpath(os.environ[FASTNLP_LAUNCH_TIME]).iterdir()} - # 检查生成保存模型文件的数量是不是正确的; - if version == 0: - assert len(all_saved_model_paths) == n_epochs - elif version == 1: - assert len(all_saved_model_paths) == 1 + all_saved_model_paths = {w.name: w for w in path.joinpath(os.environ[FASTNLP_LAUNCH_TIME]).iterdir()} + # 检查生成保存模型文件的数量是不是正确的; + if version == 0: + assert len(all_saved_model_paths) == n_epochs + elif version == 1: + assert len(all_saved_model_paths) == 1 - for folder in all_saved_model_paths: - trainer = Trainer( - model=model_and_optimizers.model, - driver=driver, - device=device, - optimizers=model_and_optimizers.optimizers, - train_dataloader=model_and_optimizers.train_dataloader, - evaluate_dataloaders=model_and_optimizers.evaluate_dataloaders, - input_mapping=model_and_optimizers.input_mapping, - output_mapping=model_and_optimizers.output_mapping, - metrics=model_and_optimizers.metrics, - n_epochs=2, - output_from_new_proc="all", - evaluate_fn='train_step' - ) - folder = path.joinpath(os.environ[FASTNLP_LAUNCH_TIME]).joinpath(folder) - trainer.load_model(folder, only_state_dict=only_state_dict) + for folder in all_saved_model_paths: + trainer = Trainer( + model=model_and_optimizers.model, + driver=driver, + device=device, + optimizers=model_and_optimizers.optimizers, + train_dataloader=model_and_optimizers.train_dataloader, + evaluate_dataloaders=model_and_optimizers.evaluate_dataloaders, + input_mapping=model_and_optimizers.input_mapping, + output_mapping=model_and_optimizers.output_mapping, + metrics=model_and_optimizers.metrics, + n_epochs=2, + output_from_new_proc="all", + evaluate_fn='train_step' + ) + folder = path.joinpath(os.environ[FASTNLP_LAUNCH_TIME]).joinpath(folder) + trainer.load_model(folder, only_state_dict=only_state_dict) - trainer.run() - trainer.driver.barrier() - finally: - rank_zero_rm(path) + trainer.run() + trainer.driver.barrier() + finally: + rank_zero_rm(path) if dist.is_initialized(): dist.destroy_process_group() @@ -178,85 +176,83 @@ def test_model_more_evaluate_callback_1( @pytest.mark.torch @pytest.mark.parametrize("driver,device", [("torch", "cpu"), ("torch", [0, 1]), ("torch", 0)]) # ("torch", "cpu"), ("torch", [0, 1]), ("torch", 1) 
-@pytest.mark.parametrize("version", [0, 1]) -@pytest.mark.parametrize("only_state_dict", [True, False]) @magic_argv_env_context def test_trainer_checkpoint_callback_1( model_and_optimizers: TrainerParameters, driver, - device, - version, - only_state_dict + device ): - try: - path = Path.cwd().joinpath(f"test_model_checkpoint") - path.mkdir(exist_ok=True, parents=True) + for version in [0, 1]: + for only_state_dict in [True, False]: + try: + path = Path.cwd().joinpath(f"test_model_checkpoint") + path.mkdir(exist_ok=True, parents=True) - if version == 0: - callbacks = [ - MoreEvaluateCallback(dataloaders=model_and_optimizers.evaluate_dataloaders, - metrics=model_and_optimizers.more_metrics, - evaluate_every=-1, - folder=path, topk=-1, - topk_monitor='acc', only_state_dict=only_state_dict, save_object='trainer') - ] - elif version == 1: - callbacks = [ - MoreEvaluateCallback(dataloaders=model_and_optimizers.evaluate_dataloaders, - metrics=model_and_optimizers.more_metrics, - evaluate_every=None, watch_monitor='loss', watch_monitor_larger_better=False, - folder=path, topk=1, topk_monitor='acc', only_state_dict=only_state_dict, - save_object='trainer') - ] - n_epochs = 5 - trainer = Trainer( - model=model_and_optimizers.model, - driver=driver, - device=device, - optimizers=model_and_optimizers.optimizers, - train_dataloader=model_and_optimizers.train_dataloader, - evaluate_dataloaders=model_and_optimizers.evaluate_dataloaders, - input_mapping=model_and_optimizers.input_mapping, - output_mapping=model_and_optimizers.output_mapping, - metrics=model_and_optimizers.metrics, - n_epochs=n_epochs, - callbacks=callbacks, - output_from_new_proc="all", - evaluate_fn='train_step' - ) + if version == 0: + callbacks = [ + MoreEvaluateCallback(dataloaders=model_and_optimizers.evaluate_dataloaders, + metrics=model_and_optimizers.more_metrics, + evaluate_every=-1, + folder=path, topk=-1, + topk_monitor='acc', only_state_dict=only_state_dict, save_object='trainer') + ] + elif version == 1: + callbacks = [ + MoreEvaluateCallback(dataloaders=model_and_optimizers.evaluate_dataloaders, + metrics=model_and_optimizers.more_metrics, + evaluate_every=None, watch_monitor='loss', watch_monitor_larger_better=False, + folder=path, topk=1, topk_monitor='acc', only_state_dict=only_state_dict, + save_object='trainer') + ] + n_epochs = 5 + trainer = Trainer( + model=model_and_optimizers.model, + driver=driver, + device=device, + optimizers=model_and_optimizers.optimizers, + train_dataloader=model_and_optimizers.train_dataloader, + evaluate_dataloaders=model_and_optimizers.evaluate_dataloaders, + input_mapping=model_and_optimizers.input_mapping, + output_mapping=model_and_optimizers.output_mapping, + metrics=model_and_optimizers.metrics, + n_epochs=n_epochs, + callbacks=callbacks, + output_from_new_proc="all", + evaluate_fn='train_step' + ) - trainer.run() + trainer.run() - all_saved_model_paths = {w.name: w for w in path.joinpath(os.environ[FASTNLP_LAUNCH_TIME]).iterdir()} - # 检查生成保存模型文件的数量是不是正确的; - if version == 0: - assert len(all_saved_model_paths) == n_epochs - elif version == 1: - assert len(all_saved_model_paths) == 1 + all_saved_model_paths = {w.name: w for w in path.joinpath(os.environ[FASTNLP_LAUNCH_TIME]).iterdir()} + # 检查生成保存模型文件的数量是不是正确的; + if version == 0: + assert len(all_saved_model_paths) == n_epochs + elif version == 1: + assert len(all_saved_model_paths) == 1 - for folder in all_saved_model_paths: - trainer = Trainer( - model=model_and_optimizers.model, - driver=driver, - device=device, - 
optimizers=model_and_optimizers.optimizers, - train_dataloader=model_and_optimizers.train_dataloader, - evaluate_dataloaders=model_and_optimizers.evaluate_dataloaders, - input_mapping=model_and_optimizers.input_mapping, - output_mapping=model_and_optimizers.output_mapping, - metrics=model_and_optimizers.metrics, - n_epochs=7, - output_from_new_proc="all", - evaluate_fn='train_step' - ) - folder = path.joinpath(os.environ[FASTNLP_LAUNCH_TIME]).joinpath(folder) - trainer.load_checkpoint(folder, only_state_dict=only_state_dict) + for folder in all_saved_model_paths: + trainer = Trainer( + model=model_and_optimizers.model, + driver=driver, + device=device, + optimizers=model_and_optimizers.optimizers, + train_dataloader=model_and_optimizers.train_dataloader, + evaluate_dataloaders=model_and_optimizers.evaluate_dataloaders, + input_mapping=model_and_optimizers.input_mapping, + output_mapping=model_and_optimizers.output_mapping, + metrics=model_and_optimizers.metrics, + n_epochs=7, + output_from_new_proc="all", + evaluate_fn='train_step' + ) + folder = path.joinpath(os.environ[FASTNLP_LAUNCH_TIME]).joinpath(folder) + trainer.load_checkpoint(folder, only_state_dict=only_state_dict) - trainer.run() - trainer.driver.barrier() + trainer.run() + trainer.driver.barrier() - finally: - rank_zero_rm(path) + finally: + rank_zero_rm(path) if dist.is_initialized(): dist.destroy_process_group() diff --git a/tests/core/callbacks/test_progress_callback_torch.py b/tests/core/callbacks/test_progress_callback_torch.py index 75d3dbda..d2f2f59b 100644 --- a/tests/core/callbacks/test_progress_callback_torch.py +++ b/tests/core/callbacks/test_progress_callback_torch.py @@ -82,37 +82,37 @@ def model_and_optimizers(request): @pytest.mark.torch @pytest.mark.parametrize('device', ['cpu', [0, 1]]) -@pytest.mark.parametrize('progress_bar', ['rich', 'auto', None, 'raw', 'tqdm']) @magic_argv_env_context -def test_run( model_and_optimizers: TrainerParameters, device, progress_bar): +def test_run( model_and_optimizers: TrainerParameters, device): if device != 'cpu' and not torch.cuda.is_available(): pytest.skip(f"No cuda for device:{device}") n_epochs = 5 - trainer = Trainer( - model=model_and_optimizers.model, - driver='torch', - device=device, - optimizers=model_and_optimizers.optimizers, - train_dataloader=model_and_optimizers.train_dataloader, - evaluate_dataloaders=model_and_optimizers.evaluate_dataloaders, - input_mapping=model_and_optimizers.input_mapping, - output_mapping=model_and_optimizers.output_mapping, - metrics=model_and_optimizers.metrics, - n_epochs=n_epochs, - callbacks=None, - progress_bar=progress_bar, - output_from_new_proc="all", - evaluate_fn='train_step', - larger_better=False - ) + for progress_bar in ['rich', 'auto', None, 'raw', 'tqdm']: + trainer = Trainer( + model=model_and_optimizers.model, + driver='torch', + device=device, + optimizers=model_and_optimizers.optimizers, + train_dataloader=model_and_optimizers.train_dataloader, + evaluate_dataloaders=model_and_optimizers.evaluate_dataloaders, + input_mapping=model_and_optimizers.input_mapping, + output_mapping=model_and_optimizers.output_mapping, + metrics=model_and_optimizers.metrics, + n_epochs=n_epochs, + callbacks=None, + progress_bar=progress_bar, + output_from_new_proc="all", + evaluate_fn='train_step', + larger_better=False + ) - trainer.run() + trainer.run() - evaluator = Evaluator(model=model_and_optimizers.model, dataloaders=model_and_optimizers.train_dataloader, - driver=trainer.driver, metrics=model_and_optimizers.metrics, - 
progress_bar=progress_bar, evaluate_fn='train_step') - evaluator.run() + evaluator = Evaluator(model=model_and_optimizers.model, dataloaders=model_and_optimizers.train_dataloader, + driver=trainer.driver, metrics=model_and_optimizers.metrics, + progress_bar=progress_bar, evaluate_fn='train_step') + evaluator.run() if dist.is_initialized(): dist.destroy_process_group()
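
The change repeated across these test files replaces the per-parameter pytest.mark.parametrize decorators (version, only_state_dict, save_folder, progress_bar) with loops inside the test body, so each driver/device case launches its (possibly distributed) environment once via magic_argv_env_context instead of once per parameter combination, while the try/finally cleanup and the dist.destroy_process_group() teardown stay at the end. The sketch below only illustrates that shape; the helper names run_one_case and cleanup_artifacts are hypothetical, and the real tests inline the Trainer construction, checkpoint assertions, and rank_zero_rm cleanup shown in the diff above.

# A minimal sketch of the refactor applied in this patch, using hypothetical
# helpers; it is not part of the patch itself.
import shutil
from pathlib import Path


def run_one_case(version: int, only_state_dict: bool, path: Path) -> None:
    # Stand-in for: build the CheckpointCallback(s), run the Trainer, assert on
    # the saved checkpoint folders, then reload and continue training.
    path.mkdir(exist_ok=True, parents=True)


def cleanup_artifacts(path: Path) -> None:
    # Stand-in for rank_zero_rm(path) used by the real tests.
    shutil.rmtree(path, ignore_errors=True)


# Before: every (version, only_state_dict) pair is a separate pytest case, so
# magic_argv_env_context re-launches the environment for each combination:
#
# @pytest.mark.parametrize("version", [0, 1])
# @pytest.mark.parametrize("only_state_dict", [True, False])
# def test_checkpoint(version, only_state_dict): ...

# After: only driver/device stay parametrized; the remaining combinations are
# iterated inside a single test body.
def test_checkpoint():
    path = Path.cwd().joinpath("test_model_checkpoint")
    for version in [0, 1]:
        for only_state_dict in [True, False]:
            try:
                run_one_case(version, only_state_dict, path)
            finally:
                cleanup_artifacts(path)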