Add etcd, metaTable and scheduler in master

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
bigsheeper 2020-10-29 12:39:41 +08:00 committed by yefu.chen
parent bbecf51f8c
commit 9131a5554a
7 changed files with 821 additions and 156 deletions

View File

@ -7,6 +7,7 @@ import (
"github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
"github.com/zilliztech/milvus-distributed/internal/proto/servicepb"
"log"
"strconv"
)
@ -15,14 +16,33 @@ const collectionMetaPrefix = "collection/"
type createCollectionTask struct {
baseTask
req *internalpb.CreateCollectionRequest
req *internalpb.CreateCollectionRequest
}
type dropCollectionTask struct {
baseTask
req *internalpb.DropCollectionRequest
req *internalpb.DropCollectionRequest
}
type hasCollectionTask struct {
baseTask
hasCollection bool
req *internalpb.HasCollectionRequest
}
type describeCollectionTask struct {
baseTask
description *servicepb.CollectionDescription
req *internalpb.DescribeCollectionRequest
}
type showCollectionsTask struct {
baseTask
stringListResponse *servicepb.StringListResponse
req *internalpb.ShowCollectionRequest
}
//////////////////////////////////////////////////////////////////////////
func (t *createCollectionTask) Type() internalpb.ReqType {
if t.req == nil {
log.Printf("null request")
@ -38,15 +58,17 @@ func (t *createCollectionTask) Ts() (Timestamp, error) {
return Timestamp(t.req.Timestamp), nil
}
func (t *createCollectionTask) Execute() commonpb.Status {
func (t *createCollectionTask) Execute() error {
if t.req == nil {
_ = t.Notify()
return errors.New("null request")
}
var schema schemapb.CollectionSchema
err0 := json.Unmarshal(t.req.Schema.Value, &schema)
if err0 != nil {
t.Notify()
return commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: "Unmarshal CollectionSchema failed",
}
err := json.Unmarshal(t.req.Schema.Value, &schema)
if err != nil {
_ = t.Notify()
return errors.New("unmarshal CollectionSchema failed")
}
// TODO: allocate collection id
@ -64,30 +86,25 @@ func (t *createCollectionTask) Execute() commonpb.Status {
PartitionTags: make([]string, 0),
}
collectionJson, err1 := json.Marshal(&collection)
if err1 != nil {
t.Notify()
return commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: "Marshal collection failed",
}
collectionJson, err := json.Marshal(&collection)
if err != nil {
_ = t.Notify()
return errors.New("marshal collection failed")
}
err2 := (*t.kvBase).Save(collectionMetaPrefix+strconv.FormatUint(collectionId, 10), string(collectionJson))
if err2 != nil {
t.Notify()
return commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: "Save collection failed",
}
err = (*t.kvBase).Save(collectionMetaPrefix+strconv.FormatUint(collectionId, 10), string(collectionJson))
if err != nil {
_ = t.Notify()
return errors.New("save collection failed")
}
t.Notify()
return commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
}
t.mt.collMeta[collectionId] = collection
_ = t.Notify()
return nil
}
//////////////////////////////////////////////////////////////////////////
func (t *dropCollectionTask) Type() internalpb.ReqType {
if t.req == nil {
log.Printf("null request")
@ -103,8 +120,143 @@ func (t *dropCollectionTask) Ts() (Timestamp, error) {
return Timestamp(t.req.Timestamp), nil
}
func (t *dropCollectionTask) Execute() commonpb.Status {
return commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
func (t *dropCollectionTask) Execute() error {
if t.req == nil {
_ = t.Notify()
return errors.New("null request")
}
collectionName := t.req.CollectionName.CollectionName
collectionMeta, err := t.mt.GetCollectionByName(collectionName)
if err != nil {
_ = t.Notify()
return err
}
collectionId := collectionMeta.Id
err = (*t.kvBase).Remove(collectionMetaPrefix + strconv.FormatUint(collectionId, 10))
if err != nil {
_ = t.Notify()
return errors.New("save collection failed")
}
delete(t.mt.collMeta, collectionId)
_ = t.Notify()
return nil
}
//////////////////////////////////////////////////////////////////////////
func (t *hasCollectionTask) Type() internalpb.ReqType {
if t.req == nil {
log.Printf("null request")
return 0
}
return t.req.ReqType
}
func (t *hasCollectionTask) Ts() (Timestamp, error) {
if t.req == nil {
return 0, errors.New("null request")
}
return Timestamp(t.req.Timestamp), nil
}
func (t *hasCollectionTask) Execute() error {
if t.req == nil {
_ = t.Notify()
return errors.New("null request")
}
collectionName := t.req.CollectionName.CollectionName
_, err := t.mt.GetCollectionByName(collectionName)
if err == nil {
t.hasCollection = true
}
_ = t.Notify()
return nil
}
//////////////////////////////////////////////////////////////////////////
func (t *describeCollectionTask) Type() internalpb.ReqType {
if t.req == nil {
log.Printf("null request")
return 0
}
return t.req.ReqType
}
func (t *describeCollectionTask) Ts() (Timestamp, error) {
if t.req == nil {
return 0, errors.New("null request")
}
return Timestamp(t.req.Timestamp), nil
}
func (t *describeCollectionTask) Execute() error {
if t.req == nil {
_ = t.Notify()
return errors.New("null request")
}
collectionName := t.req.CollectionName
collection, err := t.mt.GetCollectionByName(collectionName.CollectionName)
if err != nil {
_ = t.Notify()
return err
}
description := servicepb.CollectionDescription{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
},
Schema: collection.Schema,
}
t.description = &description
_ = t.Notify()
return nil
}
//////////////////////////////////////////////////////////////////////////
func (t *showCollectionsTask) Type() internalpb.ReqType {
if t.req == nil {
log.Printf("null request")
return 0
}
return t.req.ReqType
}
func (t *showCollectionsTask) Ts() (Timestamp, error) {
if t.req == nil {
return 0, errors.New("null request")
}
return Timestamp(t.req.Timestamp), nil
}
func (t *showCollectionsTask) Execute() error {
if t.req == nil {
_ = t.Notify()
return errors.New("null request")
}
collections := make([]string, 0)
for _, collection := range t.mt.collMeta {
collections = append(collections, collection.Schema.Name)
}
stringListResponse := servicepb.StringListResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
},
Values: collections,
}
t.stringListResponse = &stringListResponse
_ = t.Notify()
return nil
}

View File

@ -15,23 +15,20 @@ import (
"time"
)
const slowThreshold = 5 * time.Millisecond
func (ms Master) CreateCollection(ctx context.Context, in *internalpb.CreateCollectionRequest) (*commonpb.Status, error) {
func (s *Master) CreateCollection(ctx context.Context, in *internalpb.CreateCollectionRequest) (*commonpb.Status, error) {
var t task = &createCollectionTask{
req: in,
baseTask: baseTask{
kvBase: &ms.kvBase,
mt: &ms.mt,
kvBase: &s.kvBase,
mt: &s.mt,
cv: make(chan int),
},
}
var status = ms.scheduler.Enqueue(&t)
if status.ErrorCode != commonpb.ErrorCode_SUCCESS {
var err = s.scheduler.Enqueue(&t)
if err != nil {
err := errors.New("Enqueue failed")
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
@ -39,8 +36,8 @@ func (ms Master) CreateCollection(ctx context.Context, in *internalpb.CreateColl
}, err
}
status = t.WaitToFinish(ctx)
if status.ErrorCode != commonpb.ErrorCode_SUCCESS {
err = t.WaitToFinish(ctx)
if err != nil {
err := errors.New("WaitToFinish failed")
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
@ -48,91 +45,306 @@ func (ms Master) CreateCollection(ctx context.Context, in *internalpb.CreateColl
}, err
}
return &status, nil
}
func (ms Master) DropCollection(ctx context.Context, in *internalpb.DropCollectionRequest) (*commonpb.Status, error) {
return &commonpb.Status{
ErrorCode: 0,
Reason: "",
ErrorCode: commonpb.ErrorCode_SUCCESS,
}, nil
}
func (ms Master) HasCollection(ctx context.Context, in *internalpb.HasCollectionRequest) (*servicepb.BoolResponse, error) {
func (s *Master) DropCollection(ctx context.Context, in *internalpb.DropCollectionRequest) (*commonpb.Status, error) {
var t task = &dropCollectionTask{
req: in,
baseTask: baseTask{
kvBase: &s.kvBase,
mt: &s.mt,
cv: make(chan int),
},
}
var err = s.scheduler.Enqueue(&t)
if err != nil {
err := errors.New("Enqueue failed")
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: "Enqueue failed",
}, err
}
err = t.WaitToFinish(ctx)
if err != nil {
err := errors.New("WaitToFinish failed")
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: "WaitToFinish failed",
}, err
}
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
}, nil
}
func (s *Master) HasCollection(ctx context.Context, in *internalpb.HasCollectionRequest) (*servicepb.BoolResponse, error) {
var t task = &hasCollectionTask{
req: in,
baseTask: baseTask{
kvBase: &s.kvBase,
mt: &s.mt,
cv: make(chan int),
},
hasCollection: false,
}
var err = s.scheduler.Enqueue(&t)
if err != nil {
err := errors.New("Enqueue failed")
return &servicepb.BoolResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: "Enqueue failed",
},
Value: t.(*hasCollectionTask).hasCollection,
}, err
}
err = t.WaitToFinish(ctx)
if err != nil {
err := errors.New("WaitToFinish failed")
return &servicepb.BoolResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: "WaitToFinish failed",
},
Value: t.(*hasCollectionTask).hasCollection,
}, err
}
return &servicepb.BoolResponse{
Status: &commonpb.Status{
ErrorCode: 0,
Reason: "",
ErrorCode: commonpb.ErrorCode_SUCCESS,
},
Value: true,
Value: t.(*hasCollectionTask).hasCollection,
}, nil
}
func (ms Master) DescribeCollection(ctx context.Context, in *internalpb.DescribeCollectionRequest) (*servicepb.CollectionDescription, error) {
return &servicepb.CollectionDescription{
Status: &commonpb.Status{
ErrorCode: 0,
Reason: "",
func (s *Master) DescribeCollection(ctx context.Context, in *internalpb.DescribeCollectionRequest) (*servicepb.CollectionDescription, error) {
var t task = &describeCollectionTask{
req: in,
baseTask: baseTask{
kvBase: &s.kvBase,
mt: &s.mt,
cv: make(chan int),
},
}, nil
description: nil,
}
var err = s.scheduler.Enqueue(&t)
if err != nil {
err := errors.New("Enqueue failed")
return t.(*describeCollectionTask).description, err
}
err = t.WaitToFinish(ctx)
if err != nil {
err := errors.New("WaitToFinish failed")
return t.(*describeCollectionTask).description, err
}
return t.(*describeCollectionTask).description, nil
}
func (ms Master) ShowCollections(ctx context.Context, in *internalpb.ShowCollectionRequest) (*servicepb.StringListResponse, error) {
return &servicepb.StringListResponse{
Status: &commonpb.Status{
ErrorCode: 0,
Reason: "",
func (s *Master) ShowCollections(ctx context.Context, in *internalpb.ShowCollectionRequest) (*servicepb.StringListResponse, error) {
var t task = &showCollectionsTask{
req: in,
baseTask: baseTask{
kvBase: &s.kvBase,
mt: &s.mt,
cv: make(chan int),
},
}, nil
stringListResponse: nil,
}
var err = s.scheduler.Enqueue(&t)
if err != nil {
err := errors.New("Enqueue failed")
return t.(*showCollectionsTask).stringListResponse, err
}
err = t.WaitToFinish(ctx)
if err != nil {
err := errors.New("WaitToFinish failed")
return t.(*showCollectionsTask).stringListResponse, err
}
return t.(*showCollectionsTask).stringListResponse, nil
}
func (ms Master) CreatePartition(ctx context.Context, in *internalpb.CreatePartitionRequest) (*commonpb.Status, error) {
//////////////////////////////////////////////////////////////////////////
func (s *Master) CreatePartition(ctx context.Context, in *internalpb.CreatePartitionRequest) (*commonpb.Status, error) {
var t task = &createPartitionTask{
req: in,
baseTask: baseTask{
kvBase: &s.kvBase,
mt: &s.mt,
cv: make(chan int),
},
}
var err = s.scheduler.Enqueue(&t)
if err != nil {
err := errors.New("Enqueue failed")
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: "Enqueue failed",
}, err
}
err = t.WaitToFinish(ctx)
if err != nil {
err := errors.New("WaitToFinish failed")
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: "WaitToFinish failed",
}, err
}
return &commonpb.Status{
ErrorCode: 0,
Reason: "",
ErrorCode: commonpb.ErrorCode_SUCCESS,
}, nil
}
func (ms Master) DropPartition(ctx context.Context, in *internalpb.DropPartitionRequest) (*commonpb.Status, error) {
func (s *Master) DropPartition(ctx context.Context, in *internalpb.DropPartitionRequest) (*commonpb.Status, error) {
var t task = &dropPartitionTask{
req: in,
baseTask: baseTask{
kvBase: &s.kvBase,
mt: &s.mt,
cv: make(chan int),
},
}
var err = s.scheduler.Enqueue(&t)
if err != nil {
err := errors.New("Enqueue failed")
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: "Enqueue failed",
}, err
}
err = t.WaitToFinish(ctx)
if err != nil {
err := errors.New("WaitToFinish failed")
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: "WaitToFinish failed",
}, err
}
return &commonpb.Status{
ErrorCode: 0,
Reason: "",
ErrorCode: commonpb.ErrorCode_SUCCESS,
}, nil
}
func (ms Master) HasPartition(ctx context.Context, in *internalpb.HasPartitionRequest) (*servicepb.BoolResponse, error) {
func (s *Master) HasPartition(ctx context.Context, in *internalpb.HasPartitionRequest) (*servicepb.BoolResponse, error) {
var t task = &hasPartitionTask{
req: in,
baseTask: baseTask{
kvBase: &s.kvBase,
mt: &s.mt,
cv: make(chan int),
},
hasPartition: false,
}
var err = s.scheduler.Enqueue(&t)
if err != nil {
err := errors.New("Enqueue failed")
return &servicepb.BoolResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: "Enqueue failed",
},
Value: t.(*hasPartitionTask).hasPartition,
}, err
}
err = t.WaitToFinish(ctx)
if err != nil {
err := errors.New("WaitToFinish failed")
return &servicepb.BoolResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: "WaitToFinish failed",
},
Value: t.(*hasPartitionTask).hasPartition,
}, err
}
return &servicepb.BoolResponse{
Status: &commonpb.Status{
ErrorCode: 0,
Reason: "",
ErrorCode: commonpb.ErrorCode_SUCCESS,
},
Value: true,
Value: t.(*hasPartitionTask).hasPartition,
}, nil
}
func (ms Master) DescribePartition(ctx context.Context, in *internalpb.DescribePartitionRequest) (*servicepb.PartitionDescription, error) {
return &servicepb.PartitionDescription{
Status: &commonpb.Status{
ErrorCode: 0,
Reason: "",
func (s *Master) DescribePartition(ctx context.Context, in *internalpb.DescribePartitionRequest) (*servicepb.PartitionDescription, error) {
var t task = &describePartitionTask{
req: in,
baseTask: baseTask{
kvBase: &s.kvBase,
mt: &s.mt,
cv: make(chan int),
},
}, nil
description: nil,
}
var err = s.scheduler.Enqueue(&t)
if err != nil {
err := errors.New("Enqueue failed")
return t.(*describePartitionTask).description, err
}
err = t.WaitToFinish(ctx)
if err != nil {
err := errors.New("WaitToFinish failed")
return t.(*describePartitionTask).description, err
}
return t.(*describePartitionTask).description, nil
}
func (ms Master) ShowPartitions(ctx context.Context, in *internalpb.ShowPartitionRequest) (*servicepb.StringListResponse, error) {
return &servicepb.StringListResponse{
Status: &commonpb.Status{
ErrorCode: 0,
Reason: "",
func (s *Master) ShowPartitions(ctx context.Context, in *internalpb.ShowPartitionRequest) (*servicepb.StringListResponse, error) {
var t task = &showPartitionTask{
req: in,
baseTask: baseTask{
kvBase: &s.kvBase,
mt: &s.mt,
cv: make(chan int),
},
}, nil
stringListResponse: nil,
}
var err = s.scheduler.Enqueue(&t)
if err != nil {
err := errors.New("Enqueue failed")
return t.(*showPartitionTask).stringListResponse, err
}
err = t.WaitToFinish(ctx)
if err != nil {
err := errors.New("WaitToFinish failed")
return t.(*showPartitionTask).stringListResponse, err
}
return t.(*showPartitionTask).stringListResponse, nil
}
//----------------------------------------Internal GRPC Service--------------------------------
// Tso implements gRPC PDServer.
func (ms *Master) Tso(stream masterpb.Master_TsoServer) error {
func (s *Master) Tso(stream masterpb.Master_TsoServer) error {
for {
request, err := stream.Recv()
if err == io.EOF {
@ -144,7 +356,7 @@ func (ms *Master) Tso(stream masterpb.Master_TsoServer) error {
start := time.Now()
count := request.GetCount()
ts, err := ms.tsoAllocator.GenerateTSO(count)
ts, err := s.tsoAllocator.GenerateTSO(count)
if err != nil {
return status.Errorf(codes.Unknown, err.Error())
}
@ -162,4 +374,4 @@ func (ms *Master) Tso(stream masterpb.Master_TsoServer) error {
return errors.WithStack(err)
}
}
}
}

View File

@ -16,19 +16,19 @@ package master
import (
"context"
"fmt"
"github.com/apache/pulsar-client-go/pulsar"
"github.com/golang/protobuf/proto"
"github.com/zilliztech/milvus-distributed/internal/conf"
"github.com/zilliztech/milvus-distributed/internal/master/controller"
"github.com/zilliztech/milvus-distributed/internal/master/informer"
"github.com/zilliztech/milvus-distributed/internal/kv"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
"github.com/zilliztech/milvus-distributed/internal/master/controller"
"github.com/apache/pulsar-client-go/pulsar"
"google.golang.org/grpc"
"log"
"math/rand"
"net"
"strconv"
"github.com/golang/protobuf/proto"
"sync"
"sync/atomic"
"time"
@ -37,7 +37,6 @@ import (
"go.etcd.io/etcd/clientv3"
)
// Server is the pd server.
type Master struct {
// Server state.
@ -52,7 +51,7 @@ type Master struct {
serverLoopWg sync.WaitGroup
//grpc server
grpcServer * grpc.Server
grpcServer *grpc.Server
// for tso.
tsoAllocator tso.Allocator
@ -63,9 +62,9 @@ type Master struct {
// chans
ssChan chan internalpb.SegmentStatistics
kvBase kv.Base
scheduler *ddRequestScheduler
mt metaTable
kvBase kv.Base
scheduler *ddRequestScheduler
mt metaTable
// Add callback functions at different stages
startCallbacks []func()
closeCallbacks []func()
@ -90,9 +89,9 @@ func CreateServer(ctx context.Context) (*Master, error) {
m := &Master{
ctx: ctx,
startTimestamp: time.Now().Unix(),
kvBase: newKvBase(),
ssChan: make(chan internalpb.SegmentStatistics, 10),
pc : informer.NewPulsarClient(),
kvBase: newKvBase(),
ssChan: make(chan internalpb.SegmentStatistics, 10),
pc: informer.NewPulsarClient(),
}
m.grpcServer = grpc.NewServer()
@ -117,13 +116,11 @@ func (s *Master) startServer(ctx context.Context) error {
return nil
}
// AddCloseCallback adds a callback in the Close phase.
func (s *Master) AddCloseCallback(callbacks ...func()) {
s.closeCallbacks = append(s.closeCallbacks, callbacks...)
}
// Close closes the server.
func (s *Master) Close() {
if !atomic.CompareAndSwapInt64(&s.isServing, 1, 0) {
@ -152,7 +149,6 @@ func (s *Master) IsClosed() bool {
return atomic.LoadInt64(&s.isServing) == 0
}
// Run runs the pd server.
func (s *Master) Run() error {
@ -189,13 +185,11 @@ func (s *Master) stopServerLoop() {
s.serverLoopWg.Wait()
}
// StartTimestamp returns the start timestamp of this server
func (s *Master) StartTimestamp() int64 {
return s.startTimestamp
}
func (s *Master) grpcLoop() {
defer s.serverLoopWg.Done()
@ -236,7 +230,7 @@ func (s *Master) pulsarLoop() {
log.Fatal(err)
return
}
defer func (){
defer func() {
if err := consumer.Unsubscribe(); err != nil {
log.Fatal(err)
}
@ -259,23 +253,49 @@ func (s *Master) pulsarLoop() {
return
}
}
}
func (s *Master) tasksExecutionLoop() {
defer s.serverLoopWg.Done()
ctx, _ := context.WithCancel(s.serverLoopCtx)
for {
select {
case task := <-s.scheduler.reqQueue:
timeStamp, err := (*task).Ts()
if err != nil {
log.Println(err)
} else {
if timeStamp < s.scheduler.scheduleTimeStamp {
_ = (*task).NotifyTimeout()
} else {
s.scheduler.scheduleTimeStamp = timeStamp
err := (*task).Execute()
if err != nil {
log.Println("request execution failed caused by error:", err)
}
}
}
case <-ctx.Done():
log.Print("server is closed, exit task execution loop")
return
}
}
}
func (s *Master) segmentStatisticsLoop() {
defer s.serverLoopWg.Done()
defer s.serverLoopWg.Done()
ctx, cancel := context.WithCancel(s.serverLoopCtx)
defer cancel()
ctx, cancel := context.WithCancel(s.serverLoopCtx)
defer cancel()
for {
select {
case ss := <-s.ssChan:
controller.ComputeCloseTime(ss, s.kvBase)
case <-ctx.Done():
log.Print("server is closed, exit etcd leader loop")
return
}
for {
select {
case ss := <-s.ssChan:
controller.ComputeCloseTime(ss, s.kvBase)
case <-ctx.Done():
log.Print("server is closed, exit etcd leader loop")
return
}
}
}

View File

@ -7,20 +7,61 @@ import (
)
type metaTable struct {
client kv.Base // client of a reliable kv service, i.e. etcd client
rootPath string // this metaTable's working root path on the reliable kv service
tenantMeta map[int64]etcdpb.TenantMeta // tenant id to tenant meta
proxyMeta map[int64]etcdpb.ProxyMeta // proxy id to proxy meta
collMeta map[int64]etcdpb.CollectionMeta // collection id to collection meta
segMeta map[int64]etcdpb.SegmentMeta // segment id to segment meta
client kv.Base // client of a reliable kv service, i.e. etcd client
rootPath string // this metaTable's working root path on the reliable kv service
tenantMeta map[int64]etcdpb.TenantMeta // tenant id to tenant meta
proxyMeta map[int64]etcdpb.ProxyMeta // proxy id to proxy meta
collMeta map[uint64]etcdpb.CollectionMeta // collection id to collection meta
segMeta map[int64]etcdpb.SegmentMeta // segment id to segment meta
}
func (mt *metaTable) getCollectionMetaByName(name string) (*etcdpb.CollectionMeta, error) {
func (mt *metaTable) GetCollectionByName(collectionName string) (*etcdpb.CollectionMeta, error) {
for _, v := range mt.collMeta {
if v.Schema.Name == name {
if v.Schema.Name == collectionName {
return &v, nil
}
}
return nil, errors.New("Cannot found collection: " + name)
return nil, errors.New("Cannot found collection: " + collectionName)
}
func (mt *metaTable) HasPartition(partitionTag, collectionName string) bool {
var hasPartition = false
for _, v := range mt.collMeta {
if v.Schema.Name == collectionName {
for _, tag := range v.PartitionTags {
if tag == partitionTag {
hasPartition = true
}
}
}
}
return hasPartition
}
func (mt *metaTable) DeletePartition(partitionTag, collectionName string) error {
var tmpPartitionTags = make([]string, 0)
var hasPartition = false
for _, v := range mt.collMeta {
if v.Schema.Name == collectionName {
for _, tag := range v.PartitionTags {
if tag == partitionTag {
hasPartition = true
} else {
tmpPartitionTags = append(tmpPartitionTags, tag)
}
}
if !hasPartition {
return errors.New("Cannot found partition: " + partitionTag + " in collection: " + collectionName)
} else {
v.PartitionTags = tmpPartitionTags
return nil
}
}
}
return errors.New("Cannot found collection: " + collectionName)
}

View File

@ -0,0 +1,251 @@
package master
import (
"encoding/json"
"errors"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
"github.com/zilliztech/milvus-distributed/internal/proto/servicepb"
"log"
"strconv"
)
const partitionMetaPrefix = "partition/"
type createPartitionTask struct {
baseTask
req *internalpb.CreatePartitionRequest
}
type dropPartitionTask struct {
baseTask
req *internalpb.DropPartitionRequest
}
type hasPartitionTask struct {
baseTask
hasPartition bool
req *internalpb.HasPartitionRequest
}
type describePartitionTask struct {
baseTask
description *servicepb.PartitionDescription
req *internalpb.DescribePartitionRequest
}
type showPartitionTask struct {
baseTask
stringListResponse *servicepb.StringListResponse
req *internalpb.ShowPartitionRequest
}
//////////////////////////////////////////////////////////////////////////
func (t *createPartitionTask) Type() internalpb.ReqType {
if t.req == nil {
log.Printf("null request")
return 0
}
return t.req.ReqType
}
func (t *createPartitionTask) Ts() (Timestamp, error) {
if t.req == nil {
return 0, errors.New("null request")
}
return Timestamp(t.req.Timestamp), nil
}
func (t *createPartitionTask) Execute() error {
if t.req == nil {
_ = t.Notify()
return errors.New("null request")
}
partitionName := t.req.PartitionName
collectionName := partitionName.CollectionName
collectionMeta, err := t.mt.GetCollectionByName(collectionName)
if err != nil {
_ = t.Notify()
return err
}
collectionMeta.PartitionTags = append(collectionMeta.PartitionTags, partitionName.Tag)
collectionJson, err := json.Marshal(&collectionMeta)
if err != nil {
_ = t.Notify()
return errors.New("marshal collection failed")
}
collectionId := collectionMeta.Id
err = (*t.kvBase).Save(partitionMetaPrefix+strconv.FormatUint(collectionId, 10), string(collectionJson))
if err != nil {
_ = t.Notify()
return errors.New("save collection failed")
}
_ = t.Notify()
return nil
}
//////////////////////////////////////////////////////////////////////////
func (t *dropPartitionTask) Type() internalpb.ReqType {
if t.req == nil {
log.Printf("null request")
return 0
}
return t.req.ReqType
}
func (t *dropPartitionTask) Ts() (Timestamp, error) {
if t.req == nil {
return 0, errors.New("null request")
}
return Timestamp(t.req.Timestamp), nil
}
func (t *dropPartitionTask) Execute() error {
if t.req == nil {
_ = t.Notify()
return errors.New("null request")
}
partitionName := t.req.PartitionName
collectionName := partitionName.CollectionName
collectionMeta, err := t.mt.GetCollectionByName(collectionName)
if err != nil {
_ = t.Notify()
return err
}
err = t.mt.DeletePartition(partitionName.Tag, collectionName)
if err != nil {
return err
}
collectionJson, err := json.Marshal(&collectionMeta)
if err != nil {
_ = t.Notify()
return errors.New("marshal collection failed")
}
collectionId := collectionMeta.Id
err = (*t.kvBase).Save(partitionMetaPrefix+strconv.FormatUint(collectionId, 10), string(collectionJson))
if err != nil {
_ = t.Notify()
return errors.New("save collection failed")
}
_ = t.Notify()
return nil
}
//////////////////////////////////////////////////////////////////////////
func (t *hasPartitionTask) Type() internalpb.ReqType {
if t.req == nil {
log.Printf("null request")
return 0
}
return t.req.ReqType
}
func (t *hasPartitionTask) Ts() (Timestamp, error) {
if t.req == nil {
return 0, errors.New("null request")
}
return Timestamp(t.req.Timestamp), nil
}
func (t *hasPartitionTask) Execute() error {
if t.req == nil {
_ = t.Notify()
return errors.New("null request")
}
partitionName := t.req.PartitionName
collectionName := partitionName.CollectionName
t.hasPartition = t.mt.HasPartition(partitionName.Tag, collectionName)
_ = t.Notify()
return nil
}
//////////////////////////////////////////////////////////////////////////
func (t *describePartitionTask) Type() internalpb.ReqType {
if t.req == nil {
log.Printf("null request")
return 0
}
return t.req.ReqType
}
func (t *describePartitionTask) Ts() (Timestamp, error) {
if t.req == nil {
return 0, errors.New("null request")
}
return Timestamp(t.req.Timestamp), nil
}
func (t *describePartitionTask) Execute() error {
if t.req == nil {
_ = t.Notify()
return errors.New("null request")
}
partitionName := t.req.PartitionName
description := servicepb.PartitionDescription {
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
},
Name: partitionName,
}
t.description = &description
_ = t.Notify()
return nil
}
//////////////////////////////////////////////////////////////////////////
func (t *showPartitionTask) Type() internalpb.ReqType {
if t.req == nil {
log.Printf("null request")
return 0
}
return t.req.ReqType
}
func (t *showPartitionTask) Ts() (Timestamp, error) {
if t.req == nil {
return 0, errors.New("null request")
}
return Timestamp(t.req.Timestamp), nil
}
func (t *showPartitionTask) Execute() error {
if t.req == nil {
_ = t.Notify()
return errors.New("null request")
}
partitions := make([]string, 0)
for _, collection := range t.mt.collMeta {
for _, partition := range collection.PartitionTags {
partitions = append(partitions, partition)
}
}
stringListResponse := servicepb.StringListResponse {
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
},
Values: partitions,
}
t.stringListResponse = &stringListResponse
_ = t.Notify()
return nil
}

View File

@ -1,11 +1,8 @@
package master
import (
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
)
type ddRequestScheduler struct {
reqQueue chan *task
reqQueue chan *task
scheduleTimeStamp Timestamp
}
func NewDDRequestScheduler() *ddRequestScheduler {
@ -17,14 +14,7 @@ func NewDDRequestScheduler() *ddRequestScheduler {
return &rs
}
func (rs *ddRequestScheduler) Enqueue(task *task) commonpb.Status {
func (rs *ddRequestScheduler) Enqueue(task *task) error {
rs.reqQueue <- task
return commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
}
}
func (rs *ddRequestScheduler) schedule() *task {
t := <- rs.reqQueue
return t
return nil
}

View File

@ -2,8 +2,8 @@ package master
import (
"context"
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/kv"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
)
@ -19,30 +19,29 @@ type baseTask struct {
type task interface {
Type() internalpb.ReqType
Ts() (Timestamp, error)
Execute() commonpb.Status
WaitToFinish(ctx context.Context) commonpb.Status
Notify() commonpb.Status
Execute() error
WaitToFinish(ctx context.Context) error
Notify() error
NotifyTimeout() error
}
func (bt *baseTask) Notify() commonpb.Status {
func (bt *baseTask) Notify() error {
bt.cv <- 0
return commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
}
return nil
}
func (bt *baseTask) WaitToFinish(ctx context.Context) commonpb.Status {
func (bt *baseTask) NotifyTimeout() error {
bt.cv <- 0
return errors.New("request timeout")
}
func (bt *baseTask) WaitToFinish(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return commonpb.Status{
// TODO: if to return unexpected error
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
}
return nil
case <-bt.cv:
return commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
}
return nil
}
}
}