mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-05 05:18:52 +08:00
MS-428 Add PushTaskByDataLocality in scheduler
Former-commit-id: c8c9b7a7ef5d81f7ad40f2116f7ce01dbe13b90a
This commit is contained in:
parent
399ee6e244
commit
148caad58d
@ -73,6 +73,8 @@ public:
|
||||
virtual EngineType IndexEngineType() const = 0;
|
||||
|
||||
virtual MetricType IndexMetricType() const = 0;
|
||||
|
||||
// Location string of the index behind this engine. In this change the scheduler
// passes it to GpuCacheMgr::GetIndex() to check whether a GPU already caches the index.
virtual std::string GetLocation() const = 0;
|
||||
};
|
||||
|
||||
using ExecutionEnginePtr = std::shared_ptr<ExecutionEngine>;
|
||||
|
@ -73,6 +73,8 @@ public:
|
||||
|
||||
MetricType IndexMetricType() const override { return metric_type_; }
|
||||
|
||||
// Returns a copy of location_.
std::string GetLocation() const override { return location_; }
|
||||
|
||||
private:
|
||||
VecIndexPtr CreatetVecIndex(EngineType type);
|
||||
|
||||
|
@ -28,6 +28,27 @@ ResourceMgr::GetNumOfComputeResource() {
|
||||
return count;
|
||||
}
|
||||
|
||||
uint64_t
|
||||
ResourceMgr::GetNumGpuResource() const {
|
||||
uint64_t num = 0;
|
||||
for (auto &res : resources_) {
|
||||
if (res->Type() == ResourceType::GPU) {
|
||||
num++;
|
||||
}
|
||||
}
|
||||
return num;
|
||||
}
|
||||
|
||||
/*
 * Look up a resource by (type, device_id).
 * Scans resources_ in order and returns the first entry whose Type() and
 * DeviceId() both match; returns nullptr when nothing matches, so callers
 * must check the result before dereferencing it.
 */
ResourcePtr
ResourceMgr::GetResource(ResourceType type, uint64_t device_id) {
    for (auto &candidate : resources_) {
        // Type is tested first, exactly as before; DeviceId() is only
        // consulted for entries of the requested type.
        if (candidate->Type() != type) {
            continue;
        }
        if (candidate->DeviceId() != device_id) {
            continue;
        }
        return candidate;
    }
    return nullptr;
}
|
||||
|
||||
ResourceWPtr
|
||||
ResourceMgr::Add(ResourcePtr &&resource) {
|
||||
ResourceWPtr ret(resource);
|
||||
|
@ -35,6 +35,12 @@ public:
|
||||
return disk_resources_;
|
||||
}
|
||||
|
||||
/*
 * Return the number of resources whose Type() is ResourceType::GPU;
 */
uint64_t
|
||||
GetNumGpuResource() const;
|
||||
|
||||
/*
 * Return the first resource matching both type and device_id,
 * or nullptr if no resource matches;
 */
ResourcePtr
|
||||
GetResource(ResourceType type, uint64_t device_id);
|
||||
|
||||
/*
|
||||
* Return the count of resources that enable an executor;
|
||||
*/
|
||||
|
@ -4,6 +4,7 @@
|
||||
* Proprietary and confidential.
|
||||
******************************************************************************/
|
||||
|
||||
#include <src/cache/GpuCacheMgr.h>
|
||||
#include "Scheduler.h"
|
||||
#include "Cost.h"
|
||||
#include "action/Action.h"
|
||||
@ -116,7 +117,18 @@ Scheduler::OnCopyCompleted(const EventPtr &event) {
|
||||
switch (task_table_type) {
|
||||
case TaskLabelType::DEFAULT: {
|
||||
if (not resource->HasExecutor() && load_completed_event->task_table_item_->Move()) {
|
||||
Action::PushTaskToNeighbourRandomly(load_completed_event->task_table_item_->task, resource);
|
||||
auto task = load_completed_event->task_table_item_->task;
|
||||
auto search_task = std::static_pointer_cast<XSearchTask>(task);
|
||||
auto location = search_task->index_engine_->GetLocation();
|
||||
|
||||
for (auto i = 0; i < res_mgr_.lock()->GetNumGpuResource(); ++i) {
|
||||
auto index = zilliz::milvus::cache::GpuCacheMgr::GetInstance(i)->GetIndex(location);
|
||||
if (index != nullptr) {
|
||||
auto dest_resource = res_mgr_.lock()->GetResource(ResourceType::GPU, i);
|
||||
Action::PushTaskToResource(load_completed_event->task_table_item_->task, dest_resource);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
@ -19,6 +19,9 @@ public:
|
||||
|
||||
static void
|
||||
PushTaskToAllNeighbour(const TaskPtr &task, const ResourcePtr &self);
|
||||
|
||||
/*
 * Put task into the task table of dest.
 * dest is dereferenced without a check, so it must not be null.
 */
static void
|
||||
PushTaskToResource(const TaskPtr &task, const ResourcePtr &dest);
|
||||
};
|
||||
|
||||
|
||||
|
@ -46,6 +46,10 @@ Action::PushTaskToAllNeighbour(const TaskPtr &task, const ResourcePtr &self) {
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
Action::PushTaskToResource(const TaskPtr& task, const ResourcePtr& dest) {
|
||||
dest->task_table().Put(task);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
@ -21,7 +21,7 @@ public:
|
||||
|
||||
inline std::string
|
||||
Dump() const override {
|
||||
return "<CpuResource>";
|
||||
return "<CpuResource, name=" + name_ + ">";
|
||||
}
|
||||
|
||||
friend std::ostream &operator<<(std::ostream &out, const CpuResource &resource);
|
||||
|
@ -20,7 +20,7 @@ public:
|
||||
|
||||
inline std::string
|
||||
Dump() const override {
|
||||
return "<DiskResource>";
|
||||
return "<DiskResource, name=" + name_ + ">";
|
||||
}
|
||||
|
||||
friend std::ostream &operator<<(std::ostream &out, const DiskResource &resource);
|
||||
|
@ -20,7 +20,7 @@ public:
|
||||
|
||||
inline std::string
|
||||
Dump() const override {
|
||||
return "<GpuResource>";
|
||||
return "<GpuResource, name=" + name_ + ">";
|
||||
}
|
||||
|
||||
friend std::ostream &operator<<(std::ostream &out, const GpuResource &resource);
|
||||
|
@ -93,6 +93,11 @@ public:
|
||||
return type_;
|
||||
}
|
||||
|
||||
inline uint64_t
|
||||
DeviceId() {
|
||||
return device_id_;
|
||||
}
|
||||
|
||||
// TODO: better name?
|
||||
inline bool
|
||||
HasLoader() {
|
||||
@ -172,9 +177,8 @@ private:
|
||||
|
||||
protected:
|
||||
uint64_t device_id_;
|
||||
|
||||
private:
|
||||
std::string name_;
|
||||
private:
|
||||
ResourceType type_;
|
||||
|
||||
TaskTable task_table_;
|
||||
|
@ -4,6 +4,7 @@
|
||||
* Proprietary and confidential.
|
||||
******************************************************************************/
|
||||
|
||||
#include <src/cache/GpuCacheMgr.h>
|
||||
#include "TestTask.h"
|
||||
|
||||
|
||||
@ -11,7 +12,8 @@ namespace zilliz {
|
||||
namespace milvus {
|
||||
namespace engine {
|
||||
|
||||
TestTask::TestTask() : Task(TaskType::TestTask) {}
|
||||
|
||||
TestTask::TestTask(TableFileSchemaPtr& file) : XSearchTask(file) {}
|
||||
|
||||
void
|
||||
TestTask::Load(LoadType type, uint8_t device_id) {
|
||||
@ -27,7 +29,8 @@ TestTask::Execute() {
|
||||
|
||||
TaskPtr
|
||||
TestTask::Clone() {
|
||||
auto ret = std::make_shared<TestTask>();
|
||||
TableFileSchemaPtr dummy = nullptr;
|
||||
auto ret = std::make_shared<TestTask>(dummy);
|
||||
ret->load_count_ = load_count_;
|
||||
ret->exec_count_ = exec_count_;
|
||||
return ret;
|
||||
|
@ -5,16 +5,16 @@
|
||||
******************************************************************************/
|
||||
#pragma once
|
||||
|
||||
#include "Task.h"
|
||||
#include "SearchTask.h"
|
||||
|
||||
|
||||
namespace zilliz {
|
||||
namespace milvus {
|
||||
namespace engine {
|
||||
|
||||
class TestTask : public Task {
|
||||
class TestTask : public XSearchTask {
|
||||
public:
|
||||
TestTask();
|
||||
TestTask(TableFileSchemaPtr& file);
|
||||
|
||||
public:
|
||||
void
|
||||
|
@ -10,8 +10,9 @@ class CostTest : public ::testing::Test {
|
||||
protected:
|
||||
void
|
||||
SetUp() override {
|
||||
TableFileSchemaPtr dummy = nullptr;
|
||||
for (uint64_t i = 0; i < 8; ++i) {
|
||||
auto task = std::make_shared<TestTask>();
|
||||
auto task = std::make_shared<TestTask>(dummy);
|
||||
table_.Put(task);
|
||||
}
|
||||
table_.Get(0)->state = TaskTableItemState::INVALID;
|
||||
|
@ -14,7 +14,7 @@ TEST(normal_test, test1) {
|
||||
// auto res_mgr = std::make_shared<ResourceMgr>();
|
||||
auto res_mgr = ResMgrInst::GetInstance();
|
||||
auto disk = res_mgr->Add(ResourceFactory::Create("disk", "ssd", true, false));
|
||||
auto cpu = res_mgr->Add(ResourceFactory::Create("cpu"));
|
||||
auto cpu = res_mgr->Add(ResourceFactory::Create("cpu", "CPU", 0));
|
||||
auto gpu1 = res_mgr->Add(ResourceFactory::Create("gpu", "gpu0", false, false));
|
||||
auto gpu2 = res_mgr->Add(ResourceFactory::Create("gpu", "gpu2", false, false));
|
||||
|
||||
@ -32,9 +32,11 @@ TEST(normal_test, test1) {
|
||||
|
||||
const uint64_t NUM_TASK = 1000;
|
||||
std::vector<std::shared_ptr<TestTask>> tasks;
|
||||
TableFileSchemaPtr dummy = nullptr;
|
||||
|
||||
for (uint64_t i = 0; i < NUM_TASK; ++i) {
|
||||
if (auto observe = disk.lock()) {
|
||||
auto task = std::make_shared<TestTask>();
|
||||
auto task = std::make_shared<TestTask>(dummy);
|
||||
tasks.push_back(task);
|
||||
observe->task_table().Put(task);
|
||||
}
|
||||
|
@ -5,9 +5,9 @@
|
||||
using namespace zilliz::milvus::engine;
|
||||
|
||||
TEST(resource_factory_test, create) {
|
||||
auto disk = ResourceFactory::Create("disk");
|
||||
auto cpu = ResourceFactory::Create("cpu");
|
||||
auto gpu = ResourceFactory::Create("gpu");
|
||||
auto disk = ResourceFactory::Create("ssd", "DISK", 0);
|
||||
auto cpu = ResourceFactory::Create("cpu", "CPU", 0);
|
||||
auto gpu = ResourceFactory::Create("gpu", "GPU", 0);
|
||||
|
||||
ASSERT_TRUE(std::dynamic_pointer_cast<DiskResource>(disk));
|
||||
ASSERT_TRUE(std::dynamic_pointer_cast<CpuResource>(cpu));
|
||||
|
@ -22,9 +22,9 @@ class ResourceTest : public testing::Test {
|
||||
protected:
|
||||
void
|
||||
SetUp() override {
|
||||
disk_resource_ = ResourceFactory::Create("disk");
|
||||
cpu_resource_ = ResourceFactory::Create("cpu");
|
||||
gpu_resource_ = ResourceFactory::Create("gpu");
|
||||
disk_resource_ = ResourceFactory::Create("ssd", "DISK", 0);
|
||||
cpu_resource_ = ResourceFactory::Create("cpu", "CPU", 0);
|
||||
gpu_resource_ = ResourceFactory::Create("gpu", "GPU", 0);
|
||||
resources_.push_back(disk_resource_);
|
||||
resources_.push_back(cpu_resource_);
|
||||
resources_.push_back(gpu_resource_);
|
||||
@ -85,8 +85,9 @@ protected:
|
||||
TEST_F(ResourceTest, cpu_resource_test) {
|
||||
const uint64_t NUM = 100;
|
||||
std::vector<std::shared_ptr<TestTask>> tasks;
|
||||
TableFileSchemaPtr dummy = nullptr;
|
||||
for (uint64_t i = 0; i < NUM; ++i) {
|
||||
auto task = std::make_shared<TestTask>();
|
||||
auto task = std::make_shared<TestTask>(dummy);
|
||||
tasks.push_back(task);
|
||||
cpu_resource_->task_table().Put(task);
|
||||
}
|
||||
@ -113,8 +114,9 @@ TEST_F(ResourceTest, cpu_resource_test) {
|
||||
TEST_F(ResourceTest, gpu_resource_test) {
|
||||
const uint64_t NUM = 100;
|
||||
std::vector<std::shared_ptr<TestTask>> tasks;
|
||||
TableFileSchemaPtr dummy = nullptr;
|
||||
for (uint64_t i = 0; i < NUM; ++i) {
|
||||
auto task = std::make_shared<TestTask>();
|
||||
auto task = std::make_shared<TestTask>(dummy);
|
||||
tasks.push_back(task);
|
||||
gpu_resource_->task_table().Put(task);
|
||||
}
|
||||
|
@ -3,14 +3,159 @@
|
||||
* Unauthorized copying of this file, via any medium is strictly prohibited.
|
||||
* Proprietary and confidential.
|
||||
******************************************************************************/
|
||||
|
||||
#include "scheduler/Scheduler.h"
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include <src/scheduler/tasklabel/DefaultLabel.h>
|
||||
#include "cache/DataObj.h"
|
||||
#include "cache/GpuCacheMgr.h"
|
||||
#include "scheduler/task/TestTask.h"
|
||||
#include "scheduler/ResourceFactory.h"
|
||||
#include "scheduler/resource/Resource.h"
|
||||
#include "utils/Error.h"
|
||||
#include "wrapper/knowhere/vec_index.h"
|
||||
|
||||
namespace zilliz {
|
||||
namespace milvus {
|
||||
namespace engine {
|
||||
|
||||
class MockVecIndex : public engine::VecIndex {
|
||||
public:
|
||||
virtual server::KnowhereError BuildAll(const long &nb,
|
||||
const float *xb,
|
||||
const long *ids,
|
||||
const engine::Config &cfg,
|
||||
const long &nt = 0,
|
||||
const float *xt = nullptr) {
|
||||
|
||||
}
|
||||
|
||||
engine::VecIndexPtr Clone() override {
|
||||
return zilliz::milvus::engine::VecIndexPtr();
|
||||
}
|
||||
|
||||
int64_t GetDeviceId() override {
|
||||
return 0;
|
||||
}
|
||||
|
||||
engine::IndexType GetType() override {
|
||||
return engine::IndexType::INVALID;
|
||||
}
|
||||
|
||||
virtual server::KnowhereError Add(const long &nb,
|
||||
const float *xb,
|
||||
const long *ids,
|
||||
const engine::Config &cfg = engine::Config()) {
|
||||
|
||||
}
|
||||
|
||||
virtual server::KnowhereError Search(const long &nq,
|
||||
const float *xq,
|
||||
float *dist,
|
||||
long *ids,
|
||||
const engine::Config &cfg = engine::Config()) {
|
||||
|
||||
}
|
||||
|
||||
engine::VecIndexPtr CopyToGpu(const int64_t &device_id, const engine::Config &cfg) override {
|
||||
|
||||
}
|
||||
|
||||
engine::VecIndexPtr CopyToCpu(const engine::Config &cfg) override {
|
||||
|
||||
}
|
||||
|
||||
virtual int64_t Dimension() {
|
||||
return dimension_;
|
||||
}
|
||||
|
||||
virtual int64_t Count() {
|
||||
return ntotal_;
|
||||
}
|
||||
|
||||
virtual zilliz::knowhere::BinarySet Serialize() {
|
||||
zilliz::knowhere::BinarySet binset;
|
||||
return binset;
|
||||
}
|
||||
|
||||
virtual server::KnowhereError Load(const zilliz::knowhere::BinarySet &index_binary) {
|
||||
|
||||
}
|
||||
|
||||
public:
|
||||
int64_t dimension_ = 512;
|
||||
int64_t ntotal_ = 0;
|
||||
};
|
||||
|
||||
|
||||
class SchedulerTest : public testing::Test {
|
||||
protected:
|
||||
void
|
||||
SetUp() override {
|
||||
ResourcePtr cpu = ResourceFactory::Create("cpu", "CPU", 0, true, false);
|
||||
ResourcePtr gpu_0 = ResourceFactory::Create("gpu0", "GPU", 0);
|
||||
ResourcePtr gpu_1 = ResourceFactory::Create("gpu1", "GPU", 1);
|
||||
|
||||
res_mgr_ = std::make_shared<ResourceMgr>();
|
||||
cpu_resource_ = res_mgr_->Add(std::move(cpu));
|
||||
gpu_resource_0_ = res_mgr_->Add(std::move(gpu_0));
|
||||
gpu_resource_1_ = res_mgr_->Add(std::move(gpu_1));
|
||||
|
||||
auto PCIE = Connection("IO", 11000.0);
|
||||
res_mgr_->Connect("cpu", "gpu0", PCIE);
|
||||
res_mgr_->Connect("cpu", "gpu1", PCIE);
|
||||
|
||||
scheduler_ = std::make_shared<Scheduler>(res_mgr_);
|
||||
|
||||
res_mgr_->Start();
|
||||
scheduler_->Start();
|
||||
}
|
||||
|
||||
void
|
||||
TearDown() override {
|
||||
scheduler_->Stop();
|
||||
res_mgr_->Stop();
|
||||
}
|
||||
|
||||
ResourceWPtr cpu_resource_;
|
||||
ResourceWPtr gpu_resource_0_;
|
||||
ResourceWPtr gpu_resource_1_;
|
||||
|
||||
ResourceMgrPtr res_mgr_;
|
||||
std::shared_ptr<Scheduler> scheduler_;
|
||||
uint64_t load_count_ = 0;
|
||||
std::mutex load_mutex_;
|
||||
std::condition_variable cv_;
|
||||
};
|
||||
|
||||
void
|
||||
insert_dummy_index_into_gpu_cache(uint64_t device_id) {
|
||||
MockVecIndex* mock_index = new MockVecIndex();
|
||||
mock_index->ntotal_ = 1000;
|
||||
engine::VecIndexPtr index(mock_index);
|
||||
|
||||
cache::DataObjPtr obj = std::make_shared<cache::DataObj>(index);
|
||||
|
||||
cache::GpuCacheMgr::GetInstance(device_id)->InsertItem("location",obj);
|
||||
}
|
||||
|
||||
/*
 * An index for "location" is placed in the cache of GPU device 1, then ten
 * DefaultLabel tasks whose file schema has that location are put on the cpu
 * resource. After a fixed wait, the test expects the task table of GPU
 * device 1 to hold all ten tasks.
 */
TEST_F(SchedulerTest, OnCopyCompleted) {
    const uint64_t kTaskCount = 10;
    std::vector<std::shared_ptr<TestTask>> submitted;

    // The schema location must equal the key used by
    // insert_dummy_index_into_gpu_cache().
    TableFileSchemaPtr file_schema = std::make_shared<meta::TableFileSchema>();
    file_schema->location_ = "location";

    insert_dummy_index_into_gpu_cache(1);

    // One task is appended per iteration, so this runs exactly kTaskCount times.
    while (submitted.size() < kTaskCount) {
        auto search_task = std::make_shared<TestTask>(file_schema);
        search_task->label() = std::make_shared<DefaultLabel>();
        submitted.push_back(search_task);
        cpu_resource_.lock()->task_table().Put(search_task);
    }

    // Fixed delay to let the scheduler threads process the tasks.
    sleep(3);

    auto gpu_one = res_mgr_->GetResource(ResourceType::GPU, 1);
    ASSERT_EQ(gpu_one->task_table().Size(), kTaskCount);
}
|
||||
|
||||
}
|
||||
}
|
||||
|
@ -43,9 +43,10 @@ class TaskTableBaseTest : public ::testing::Test {
|
||||
protected:
|
||||
void
|
||||
SetUp() override {
|
||||
TableFileSchemaPtr dummy = nullptr;
|
||||
invalid_task_ = nullptr;
|
||||
task1_ = std::make_shared<TestTask>();
|
||||
task2_ = std::make_shared<TestTask>();
|
||||
task1_ = std::make_shared<TestTask>(dummy);
|
||||
task2_ = std::make_shared<TestTask>(dummy);
|
||||
}
|
||||
|
||||
TaskPtr invalid_task_;
|
||||
@ -83,8 +84,9 @@ class TaskTableAdvanceTest : public ::testing::Test {
|
||||
protected:
|
||||
void
|
||||
SetUp() override {
|
||||
TableFileSchemaPtr dummy = nullptr;
|
||||
for (uint64_t i = 0; i < 8; ++i) {
|
||||
auto task = std::make_shared<TestTask>();
|
||||
auto task = std::make_shared<TestTask>(dummy);
|
||||
table1_.Put(task);
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user