mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-04 12:59:23 +08:00
Merge pull request #165 from fishpenguin/branch-0.5.1-yk
#164 - Add CPU version for building index Former-commit-id: 2ed2de91ff4992c2c43c6a3f9abd48e35b86cd3c
This commit is contained in:
commit
a81450175f
@ -14,6 +14,7 @@ Please mark all change in change log and use the ticket from JIRA.
|
||||
- \#115 - Using new structure for tasktable
|
||||
- \#139 - New config option use_gpu_threshold
|
||||
- \#146 - Add only GPU and only CPU version for IVF_SQ8 and IVF_FLAT
|
||||
- \#164 - Add CPU version for building index
|
||||
|
||||
## Improvement
|
||||
- \#64 - Improvement dump function in scheduler
|
||||
|
@ -104,20 +104,25 @@ JobMgr::build_task(const JobPtr& job) {
|
||||
|
||||
void
|
||||
JobMgr::calculate_path(const TaskPtr& task) {
|
||||
if (task->type_ != TaskType::SearchTask) {
|
||||
return;
|
||||
}
|
||||
if (task->type_ == TaskType::SearchTask) {
|
||||
if (task->label()->Type() != TaskLabelType::SPECIFIED_RESOURCE) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (task->label()->Type() != TaskLabelType::SPECIFIED_RESOURCE) {
|
||||
return;
|
||||
std::vector<std::string> path;
|
||||
auto spec_label = std::static_pointer_cast<SpecResLabel>(task->label());
|
||||
auto src = res_mgr_->GetDiskResources()[0];
|
||||
auto dest = spec_label->resource();
|
||||
ShortestPath(src.lock(), dest.lock(), res_mgr_, path);
|
||||
task->path() = Path(path, path.size() - 1);
|
||||
} else if (task->type_ == TaskType::BuildIndexTask) {
|
||||
auto spec_label = std::static_pointer_cast<SpecResLabel>(task->label());
|
||||
auto src = res_mgr_->GetDiskResources()[0];
|
||||
auto dest = spec_label->resource();
|
||||
std::vector<std::string> path;
|
||||
ShortestPath(src.lock(), dest.lock(), res_mgr_, path);
|
||||
task->path() = Path(path, path.size() - 1);
|
||||
}
|
||||
|
||||
std::vector<std::string> path;
|
||||
auto spec_label = std::static_pointer_cast<SpecResLabel>(task->label());
|
||||
auto src = res_mgr_->GetDiskResources()[0];
|
||||
auto dest = spec_label->resource();
|
||||
ShortestPath(src.lock(), dest.lock(), res_mgr_, path);
|
||||
task->path() = Path(path, path.size() - 1);
|
||||
}
|
||||
|
||||
} // namespace scheduler
|
||||
|
@ -55,8 +55,8 @@ load_simple_config() {
|
||||
// get resources
|
||||
auto gpu_ids = get_gpu_pool();
|
||||
|
||||
int32_t build_gpu_id;
|
||||
config.GetResourceConfigIndexBuildDevice(build_gpu_id);
|
||||
int32_t index_build_device_id;
|
||||
config.GetResourceConfigIndexBuildDevice(index_build_device_id);
|
||||
|
||||
// create and connect
|
||||
ResMgrInst::GetInstance()->Add(ResourceFactory::Create("disk", "DISK", 0, true, false));
|
||||
@ -70,15 +70,15 @@ load_simple_config() {
|
||||
for (auto& gpu_id : gpu_ids) {
|
||||
ResMgrInst::GetInstance()->Add(ResourceFactory::Create(std::to_string(gpu_id), "GPU", gpu_id, true, true));
|
||||
ResMgrInst::GetInstance()->Connect("cpu", std::to_string(gpu_id), pcie);
|
||||
if (build_gpu_id == gpu_id) {
|
||||
if (index_build_device_id == gpu_id) {
|
||||
find_build_gpu_id = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (not find_build_gpu_id) {
|
||||
if (not find_build_gpu_id && index_build_device_id != server::CPU_DEVICE_ID) {
|
||||
ResMgrInst::GetInstance()->Add(
|
||||
ResourceFactory::Create(std::to_string(build_gpu_id), "GPU", build_gpu_id, true, true));
|
||||
ResMgrInst::GetInstance()->Connect("cpu", std::to_string(build_gpu_id), pcie);
|
||||
ResourceFactory::Create(std::to_string(index_build_device_id), "GPU", index_build_device_id, true, true));
|
||||
ResMgrInst::GetInstance()->Connect("cpu", std::to_string(index_build_device_id), pcie);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -106,7 +106,6 @@ class OptimizerInst {
|
||||
has_cpu = true;
|
||||
}
|
||||
}
|
||||
|
||||
std::vector<PassPtr> pass_list;
|
||||
pass_list.push_back(std::make_shared<LargeSQ8HPass>());
|
||||
pass_list.push_back(std::make_shared<HybridPass>());
|
||||
|
@ -70,8 +70,15 @@ TaskCreator::Create(const DeleteJobPtr& job) {
|
||||
std::vector<TaskPtr>
|
||||
TaskCreator::Create(const BuildIndexJobPtr& job) {
|
||||
std::vector<TaskPtr> tasks;
|
||||
// TODO(yukun): remove "disk" hardcode here
|
||||
ResourcePtr res_ptr = ResMgrInst::GetInstance()->GetResource("disk");
|
||||
server::Config& config = server::Config::GetInstance();
|
||||
int32_t build_index_id;
|
||||
Status stat = config.GetResourceConfigIndexBuildDevice(build_index_id);
|
||||
ResourcePtr res_ptr;
|
||||
if (build_index_id == server::CPU_DEVICE_ID) {
|
||||
res_ptr = ResMgrInst::GetInstance()->GetResource("cpu");
|
||||
} else {
|
||||
res_ptr = ResMgrInst::GetInstance()->GetResource(ResourceType::GPU, build_index_id);
|
||||
}
|
||||
|
||||
for (auto& to_index_file : job->to_index_files()) {
|
||||
auto label = std::make_shared<SpecResLabel>(std::weak_ptr<Resource>(res_ptr));
|
||||
|
@ -138,73 +138,41 @@ Action::SpecifiedResourceLabelTaskScheduler(const ResourceMgrPtr& res_mgr, Resou
|
||||
std::shared_ptr<LoadCompletedEvent> event) {
|
||||
auto task_item = event->task_table_item_;
|
||||
auto task = event->task_table_item_->task;
|
||||
if (resource->type() == ResourceType::DISK) {
|
||||
// step 1: calculate shortest path per resource, from disk to compute resource
|
||||
auto compute_resources = res_mgr->GetComputeResources();
|
||||
std::vector<std::vector<std::string>> paths;
|
||||
std::vector<uint64_t> transport_costs;
|
||||
for (auto& res : compute_resources) {
|
||||
std::vector<std::string> path;
|
||||
uint64_t transport_cost = ShortestPath(resource, res, res_mgr, path);
|
||||
transport_costs.push_back(transport_cost);
|
||||
paths.emplace_back(path);
|
||||
}
|
||||
// if (task->job_.lock()->type() == JobType::SEARCH) {
|
||||
// auto label = task->label();
|
||||
// auto spec_label = std::static_pointer_cast<SpecResLabel>(label);
|
||||
// if (spec_label->resource().lock()->type() == ResourceType::CPU) {
|
||||
// std::vector<std::string> spec_path;
|
||||
// spec_path.push_back(spec_label->resource().lock()->name());
|
||||
// spec_path.push_back(resource->name());
|
||||
// task->path() = Path(spec_path, spec_path.size() - 1);
|
||||
// } else {
|
||||
// // step 2: select min cost, cost(resource) = avg_cost * task_to_do + transport_cost
|
||||
// uint64_t min_cost = std::numeric_limits<uint64_t>::max();
|
||||
// uint64_t min_cost_idx = 0;
|
||||
// for (uint64_t i = 0; i < compute_resources.size(); ++i) {
|
||||
// if (compute_resources[i]->TotalTasks() == 0) {
|
||||
// min_cost_idx = i;
|
||||
// break;
|
||||
// }
|
||||
// uint64_t cost = compute_resources[i]->TaskAvgCost() *
|
||||
// compute_resources[i]->NumOfTaskToExec() +
|
||||
// transport_costs[i];
|
||||
// if (min_cost > cost) {
|
||||
// min_cost = cost;
|
||||
// min_cost_idx = i;
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// // step 3: set path in task
|
||||
// Path task_path(paths[min_cost_idx], paths[min_cost_idx].size() - 1);
|
||||
// task->path() = task_path;
|
||||
// }
|
||||
//
|
||||
// } else
|
||||
if (task->job_.lock()->type() == JobType::BUILD) {
|
||||
// step2: Read device id in config
|
||||
// get build index gpu resource
|
||||
server::Config& config = server::Config::GetInstance();
|
||||
int32_t build_index_gpu;
|
||||
Status stat = config.GetResourceConfigIndexBuildDevice(build_index_gpu);
|
||||
|
||||
bool find_gpu_res = false;
|
||||
if (res_mgr->GetResource(ResourceType::GPU, build_index_gpu) != nullptr) {
|
||||
for (uint64_t i = 0; i < compute_resources.size(); ++i) {
|
||||
if (compute_resources[i]->name() ==
|
||||
res_mgr->GetResource(ResourceType::GPU, build_index_gpu)->name()) {
|
||||
find_gpu_res = true;
|
||||
Path task_path(paths[i], paths[i].size() - 1);
|
||||
task->path() = task_path;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (not find_gpu_res) {
|
||||
task->path() = Path(paths[0], paths[0].size() - 1);
|
||||
}
|
||||
}
|
||||
}
|
||||
// if (resource->type() == ResourceType::DISK) {
|
||||
// // step 1: calculate shortest path per resource, from disk to compute resource
|
||||
// auto compute_resources = res_mgr->GetComputeResources();
|
||||
// std::vector<std::vector<std::string>> paths;
|
||||
// std::vector<uint64_t> transport_costs;
|
||||
// for (auto& res : compute_resources) {
|
||||
// std::vector<std::string> path;
|
||||
// uint64_t transport_cost = ShortestPath(resource, res, res_mgr, path);
|
||||
// transport_costs.push_back(transport_cost);
|
||||
// paths.emplace_back(path);
|
||||
// }
|
||||
// if (task->job_.lock()->type() == JobType::BUILD) {
|
||||
// // step2: Read device id in config
|
||||
// // get build index gpu resource
|
||||
// server::Config& config = server::Config::GetInstance();
|
||||
// int32_t build_index_gpu;
|
||||
// Status stat = config.GetResourceConfigIndexBuildDevice(build_index_gpu);
|
||||
//
|
||||
// bool find_gpu_res = false;
|
||||
// if (res_mgr->GetResource(ResourceType::GPU, build_index_gpu) != nullptr) {
|
||||
// for (uint64_t i = 0; i < compute_resources.size(); ++i) {
|
||||
// if (compute_resources[i]->name() ==
|
||||
// res_mgr->GetResource(ResourceType::GPU, build_index_gpu)->name()) {
|
||||
// find_gpu_res = true;
|
||||
// Path task_path(paths[i], paths[i].size() - 1);
|
||||
// task->path() = task_path;
|
||||
// break;
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// if (not find_gpu_res) {
|
||||
// task->path() = Path(paths[0], paths[0].size() - 1);
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
|
||||
if (resource->name() == task->path().Last()) {
|
||||
resource->WakeupExecutor();
|
||||
|
@ -46,7 +46,7 @@ OnlyGPUPass::Run(const TaskPtr& task) {
|
||||
auto label = std::make_shared<SpecResLabel>(std::weak_ptr<Resource>(res_ptr));
|
||||
task->label() = label;
|
||||
|
||||
specified_gpu_id_ = specified_gpu_id_++ % gpu_id.size();
|
||||
specified_gpu_id_ = (specified_gpu_id_ + 1) % gpu_id.size();
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -590,15 +590,18 @@ Config::CheckCacheConfigGpuCacheCapacity(const std::string& value) {
|
||||
return Status(SERVER_INVALID_ARGUMENT, msg);
|
||||
} else {
|
||||
uint64_t gpu_cache_capacity = std::stoi(value) * GB;
|
||||
int gpu_index;
|
||||
Status s = GetResourceConfigIndexBuildDevice(gpu_index);
|
||||
int device_id;
|
||||
Status s = GetResourceConfigIndexBuildDevice(device_id);
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
|
||||
if (device_id == server::CPU_DEVICE_ID)
|
||||
return Status::OK();
|
||||
|
||||
size_t gpu_memory;
|
||||
if (!ValidationUtil::GetGpuMemory(gpu_index, gpu_memory).ok()) {
|
||||
std::string msg = "Fail to get GPU memory for GPU device: " + std::to_string(gpu_index);
|
||||
if (!ValidationUtil::GetGpuMemory(device_id, gpu_memory).ok()) {
|
||||
std::string msg = "Fail to get GPU memory for GPU device: " + std::to_string(device_id);
|
||||
return Status(SERVER_UNEXPECTED_ERROR, msg);
|
||||
} else if (gpu_cache_capacity >= gpu_memory) {
|
||||
std::string msg = "Invalid gpu cache capacity: " + value +
|
||||
@ -1013,7 +1016,12 @@ Config::GetResourceConfigIndexBuildDevice(int32_t& value) {
|
||||
return s;
|
||||
}
|
||||
|
||||
value = std::stoi(str.substr(3));
|
||||
if (str == "cpu") {
|
||||
value = CPU_DEVICE_ID;
|
||||
} else {
|
||||
value = std::stoi(str.substr(3));
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
|
@ -95,6 +95,8 @@ static const char* CONFIG_RESOURCE_SEARCH_RESOURCES = "search_resources";
|
||||
static const char* CONFIG_RESOURCE_INDEX_BUILD_DEVICE = "index_build_device";
|
||||
static const char* CONFIG_RESOURCE_INDEX_BUILD_DEVICE_DEFAULT = "gpu0";
|
||||
|
||||
const int32_t CPU_DEVICE_ID = -1;
|
||||
|
||||
class Config {
|
||||
public:
|
||||
static Config&
|
||||
|
Loading…
Reference in New Issue
Block a user