milvus/internal/rootcoord/task.go

// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package rootcoord
import (
"context"
"fmt"
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/proto/proxypb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/milvus-io/milvus/internal/util/typeutil"
"go.uber.org/zap"
)
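// reqTask is the interface implemented by every RootCoord request task in this file.
// A task carries its own request context, reports the message type it expects, and
// does the actual work in Execute.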
type reqTask interface {
Ctx() context.Context
Type() commonpb.MsgType
Execute(ctx context.Context) error
Core() *Core
}
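// baseReqTask holds the request context and the *Core shared by all request tasks.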
type baseReqTask struct {
ctx context.Context
core *Core
}
func (b *baseReqTask) Core() *Core {
return b.core
}
func (b *baseReqTask) Ctx() context.Context {
return b.ctx
}
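// executeTask runs t.Execute on a separate goroutine and waits until either the task
// finishes or one of the contexts (the RootCoord context or the request context) is
// canceled, in which case a "context canceled" error is returned.
//
// A minimal sketch of the calling pattern (ctx, c and req stand for a request context,
// the *Core instance and a *milvuspb.HasCollectionRequest; the actual call sites live
// outside this file):
//
//	t := &HasCollectionReqTask{
//		baseReqTask: baseReqTask{ctx: ctx, core: c},
//		Req:         req,
//	}
//	if err := executeTask(t); err != nil {
//		// handle error
//	}
//	_ = t.HasCollection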
func executeTask(t reqTask) error {
errChan := make(chan error, 1) // buffered so the goroutine below never blocks (and leaks) if we return early on context cancellation
go func() {
err := t.Execute(t.Ctx())
errChan <- err
}()
select {
case <-t.Core().ctx.Done():
return fmt.Errorf("context canceled")
case <-t.Ctx().Done():
return fmt.Errorf("context canceled")
case err := <-errChan:
return err
}
}
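// CreateCollectionReqTask creates a collection: it allocates a collection ID and a
// default partition ID, derives the virtual/physical channel names, persists the
// collection metadata together with a replayable DdOperation in etcd, and then
// broadcasts the CreateCollection and CreatePartition dd msgs on the dml channels.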
type CreateCollectionReqTask struct {
baseReqTask
Req *milvuspb.CreateCollectionRequest
}
func (t *CreateCollectionReqTask) Type() commonpb.MsgType {
return t.Req.Base.MsgType
}
func (t *CreateCollectionReqTask) Execute(ctx context.Context) error {
const defaultShardsNum = 2
if t.Type() != commonpb.MsgType_CreateCollection {
return fmt.Errorf("create collection, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
}
var schema schemapb.CollectionSchema
err := proto.Unmarshal(t.Req.Schema, &schema)
if err != nil {
return err
}
if t.Req.CollectionName != schema.Name {
return fmt.Errorf("collection name = %s, schema.Name=%s", t.Req.CollectionName, schema.Name)
}
if t.Req.ShardsNum <= 0 {
log.Debug("Set ShardsNum to default", zap.String("collection name", t.Req.CollectionName),
zap.Int32("defaultShardsNum", defaultShardsNum))
t.Req.ShardsNum = defaultShardsNum
}
for idx, field := range schema.Fields {
field.FieldID = int64(idx + StartOfUserFieldID)
}
rowIDField := &schemapb.FieldSchema{
FieldID: int64(RowIDField),
Name: RowIDFieldName,
IsPrimaryKey: false,
Description: "row id",
DataType: schemapb.DataType_Int64,
}
timeStampField := &schemapb.FieldSchema{
FieldID: int64(TimeStampField),
Name: TimeStampFieldName,
IsPrimaryKey: false,
Description: "time stamp",
DataType: schemapb.DataType_Int64,
}
schema.Fields = append(schema.Fields, rowIDField, timeStampField)
collID, _, err := t.core.IDAllocator(1)
if err != nil {
return err
}
collTs := t.Req.Base.Timestamp
partID, _, err := t.core.IDAllocator(1)
if err != nil {
return err
}
log.Debug("collection name -> id",
zap.String("collection name", t.Req.CollectionName),
zap.Int64("colletion_id", collID),
zap.Int64("default partition id", partID))
vchanNames := make([]string, t.Req.ShardsNum)
chanNames := make([]string, t.Req.ShardsNum)
for i := int32(0); i < t.Req.ShardsNum; i++ {
vchanNames[i] = fmt.Sprintf("%s_%d_%d_v", t.Req.CollectionName, collID, i)
chanNames[i] = ToPhysicalChannel(vchanNames[i])
}
collInfo := etcdpb.CollectionInfo{
ID: collID,
Schema: &schema,
CreateTime: collTs,
PartitionIDs: make([]typeutil.UniqueID, 0, 16),
FieldIndexes: make([]*etcdpb.FieldIndexInfo, 0, 16),
VirtualChannelNames: vchanNames,
PhysicalChannelNames: chanNames,
}
idxInfo := make([]*etcdpb.IndexInfo, 0, 16)
/////////////////////// ignore index param from create_collection /////////////////////////
//for _, field := range schema.Fields {
// if field.DataType == schemapb.DataType_VectorFloat || field.DataType == schemapb.DataType_VectorBinary {
// if len(field.IndexParams) > 0 {
// idxID, err := t.core.idAllocator.AllocOne()
// if err != nil {
// return err
// }
// filedIdx := &etcdpb.FieldIndexInfo{
// FiledID: field.FieldID,
// IndexID: idxID,
// }
// idx := &etcdpb.IndexInfo{
// IndexName: fmt.Sprintf("%s_index_%d", collMeta.Schema.Name, field.FieldID),
// IndexID: idxID,
// IndexParams: field.IndexParams,
// }
// idxInfo = append(idxInfo, idx)
// collMeta.FieldIndexes = append(collMeta.FieldIndexes, filedIdx)
// }
// }
//}
// the schema was modified above (RowIDField and TimeStampField were appended),
// so it needs to be marshaled again
schemaBytes, err := proto.Marshal(&schema)
if err != nil {
return err
}
ddCollReq := internalpb.CreateCollectionRequest{
Base: t.Req.Base,
DbName: t.Req.DbName,
CollectionName: t.Req.CollectionName,
DbID: 0, //TODO,not used
CollectionID: collID,
Schema: schemaBytes,
VirtualChannelNames: vchanNames,
PhysicalChannelNames: chanNames,
}
ddPartReq := internalpb.CreatePartitionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_CreatePartition,
MsgID: t.Req.Base.MsgID, //TODO, msg id
Timestamp: t.Req.Base.Timestamp + 1,
SourceID: t.Req.Base.SourceID,
},
DbName: t.Req.DbName,
CollectionName: t.Req.CollectionName,
PartitionName: Params.DefaultPartitionName,
DbID: 0, //TODO, not used
CollectionID: collInfo.ID,
PartitionID: partID,
}
// build the DdOperation and save it into etcd; if sending the ddmsg fails,
// the system can restore the ddmsg from etcd and re-send it
ddOp := func(ts typeutil.Timestamp) (string, error) {
ddCollReq.Base.Timestamp = ts
ddPartReq.Base.Timestamp = ts
return EncodeDdOperation(&ddCollReq, &ddPartReq, CreateCollectionDDType)
}
ts, err := t.core.MetaTable.AddCollection(&collInfo, partID, Params.DefaultPartitionName, idxInfo, ddOp)
if err != nil {
return err
}
// add the dml channels before sending the dd msgs
t.core.dmlChannels.AddProducerChannels(chanNames...)
err = t.core.SendDdCreateCollectionReq(ctx, &ddCollReq, chanNames)
if err != nil {
return err
}
err = t.core.SendDdCreatePartitionReq(ctx, &ddPartReq, chanNames)
if err != nil {
return err
}
t.core.SendTimeTick(ts)
// Update DDOperation in etcd
return t.core.setDdMsgSendFlag(true)
}
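// DropCollectionReqTask drops a collection: it deletes the collection metadata from
// etcd, broadcasts the DropCollection dd msg, removes the collection's dml channels,
// asks the query service to release the collection, and invalidates the proxy meta cache.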
type DropCollectionReqTask struct {
baseReqTask
Req *milvuspb.DropCollectionRequest
}
func (t *DropCollectionReqTask) Type() commonpb.MsgType {
return t.Req.Base.MsgType
}
func (t *DropCollectionReqTask) Execute(ctx context.Context) error {
if t.Type() != commonpb.MsgType_DropCollection {
return fmt.Errorf("drop collection, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
}
collMeta, err := t.core.MetaTable.GetCollectionByName(t.Req.CollectionName, 0)
if err != nil {
return err
}
ddReq := internalpb.DropCollectionRequest{
Base: t.Req.Base,
DbName: t.Req.DbName,
CollectionName: t.Req.CollectionName,
DbID: 0, //not used
CollectionID: collMeta.ID,
}
// build the DdOperation and save it into etcd; if sending the ddmsg fails,
// the system can restore the ddmsg from etcd and re-send it
ddOp := func(ts typeutil.Timestamp) (string, error) {
ddReq.Base.Timestamp = ts
return EncodeDdOperation(&ddReq, nil, DropCollectionDDType)
}
ts, err := t.core.MetaTable.DeleteCollection(collMeta.ID, ddOp)
if err != nil {
return err
}
err = t.core.SendDdDropCollectionReq(ctx, &ddReq, collMeta.PhysicalChannelNames)
if err != nil {
return err
}
t.core.SendTimeTick(ts)
// remove the dml channels after sending the dd msg
t.core.dmlChannels.RemoveProducerChannels(collMeta.PhysicalChannelNames...)
//notify query service to release collection
go func() {
if err = t.core.CallReleaseCollectionService(t.core.ctx, ts, 0, collMeta.ID); err != nil {
log.Warn("CallReleaseCollectionService failed", zap.String("error", err.Error()))
}
}()
req := proxypb.InvalidateCollMetaCacheRequest{
Base: &commonpb.MsgBase{
MsgType: 0, //TODO, msg type
MsgID: 0, //TODO, msg id
Timestamp: ts,
SourceID: t.core.session.ServerID,
},
DbName: t.Req.DbName,
CollectionName: t.Req.CollectionName,
}
// error doesn't matter here
t.core.proxyClientManager.InvalidateCollectionMetaCache(ctx, &req)
// Update DDOperation in etcd
return t.core.setDdMsgSendFlag(true)
}
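// HasCollectionReqTask checks whether a collection exists at the requested timestamp;
// the result is stored in HasCollection.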
type HasCollectionReqTask struct {
baseReqTask
Req *milvuspb.HasCollectionRequest
HasCollection bool
}
func (t *HasCollectionReqTask) Type() commonpb.MsgType {
return t.Req.Base.MsgType
}
func (t *HasCollectionReqTask) Execute(ctx context.Context) error {
if t.Type() != commonpb.MsgType_HasCollection {
return fmt.Errorf("has collection, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
}
_, err := t.core.MetaTable.GetCollectionByName(t.Req.CollectionName, t.Req.TimeStamp)
if err == nil {
t.HasCollection = true
} else {
t.HasCollection = false
}
return nil
}
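// DescribeCollectionReqTask fills Rsp with the schema, collection ID, and channel
// names of a collection, looked up by name or, when the name is empty, by ID.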
type DescribeCollectionReqTask struct {
baseReqTask
Req *milvuspb.DescribeCollectionRequest
Rsp *milvuspb.DescribeCollectionResponse
}
func (t *DescribeCollectionReqTask) Type() commonpb.MsgType {
return t.Req.Base.MsgType
}
func (t *DescribeCollectionReqTask) Execute(ctx context.Context) error {
if t.Type() != commonpb.MsgType_DescribeCollection {
return fmt.Errorf("describe collection, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
}
var collInfo *etcdpb.CollectionInfo
var err error
if t.Req.CollectionName != "" {
collInfo, err = t.core.MetaTable.GetCollectionByName(t.Req.CollectionName, t.Req.TimeStamp)
if err != nil {
return err
}
} else {
collInfo, err = t.core.MetaTable.GetCollectionByID(t.Req.CollectionID, t.Req.TimeStamp)
if err != nil {
return err
}
}
t.Rsp.Schema = proto.Clone(collInfo.Schema).(*schemapb.CollectionSchema)
t.Rsp.CollectionID = collInfo.ID
//var newField []*schemapb.FieldSchema
//for _, field := range t.Rsp.Schema.Fields {
// if field.FieldID >= StartOfUserFieldID {
// newField = append(newField, field)
// }
//}
//t.Rsp.Schema.Fields = newField
t.Rsp.VirtualChannelNames = collInfo.VirtualChannelNames
t.Rsp.PhysicalChannelNames = collInfo.PhysicalChannelNames
return nil
}
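// ShowCollectionReqTask lists the names and IDs of all collections visible at the
// requested timestamp.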
type ShowCollectionReqTask struct {
baseReqTask
Req *milvuspb.ShowCollectionsRequest
Rsp *milvuspb.ShowCollectionsResponse
}
func (t *ShowCollectionReqTask) Type() commonpb.MsgType {
return t.Req.Base.MsgType
}
func (t *ShowCollectionReqTask) Execute(ctx context.Context) error {
if t.Type() != commonpb.MsgType_ShowCollections {
return fmt.Errorf("show collection, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
}
coll, err := t.core.MetaTable.ListCollections(t.Req.TimeStamp)
if err != nil {
return err
}
for name, id := range coll {
t.Rsp.CollectionNames = append(t.Rsp.CollectionNames, name)
t.Rsp.CollectionIds = append(t.Rsp.CollectionIds, id)
}
return nil
}
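// CreatePartitionReqTask creates a partition in an existing collection, persists the
// metadata and DdOperation in etcd, broadcasts the CreatePartition dd msg, and
// invalidates the proxy meta cache.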
type CreatePartitionReqTask struct {
baseReqTask
Req *milvuspb.CreatePartitionRequest
}
func (t *CreatePartitionReqTask) Type() commonpb.MsgType {
return t.Req.Base.MsgType
}
func (t *CreatePartitionReqTask) Execute(ctx context.Context) error {
if t.Type() != commonpb.MsgType_CreatePartition {
return fmt.Errorf("create partition, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
}
collMeta, err := t.core.MetaTable.GetCollectionByName(t.Req.CollectionName, 0)
if err != nil {
return err
}
partID, _, err := t.core.IDAllocator(1)
if err != nil {
return err
}
ddReq := internalpb.CreatePartitionRequest{
Base: t.Req.Base,
DbName: t.Req.DbName,
CollectionName: t.Req.CollectionName,
PartitionName: t.Req.PartitionName,
DbID: 0, // todo, not used
CollectionID: collMeta.ID,
PartitionID: partID,
}
// build the DdOperation and save it into etcd; if sending the ddmsg fails,
// the system can restore the ddmsg from etcd and re-send it
ddOp := func(ts typeutil.Timestamp) (string, error) {
ddReq.Base.Timestamp = ts
return EncodeDdOperation(&ddReq, nil, CreatePartitionDDType)
}
ts, err := t.core.MetaTable.AddPartition(collMeta.ID, t.Req.PartitionName, partID, ddOp)
if err != nil {
return err
}
err = t.core.SendDdCreatePartitionReq(ctx, &ddReq, collMeta.PhysicalChannelNames)
if err != nil {
return err
}
t.core.SendTimeTick(ts)
req := proxypb.InvalidateCollMetaCacheRequest{
Base: &commonpb.MsgBase{
MsgType: 0, //TODO, msg type
MsgID: 0, //TODO, msg id
Timestamp: ts,
SourceID: t.core.session.ServerID,
},
DbName: t.Req.DbName,
CollectionName: t.Req.CollectionName,
}
// error doesn't matter here
t.core.proxyClientManager.InvalidateCollectionMetaCache(ctx, &req)
// Update DDOperation in etcd
return t.core.setDdMsgSendFlag(true)
}
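// DropPartitionReqTask drops a partition: it deletes the partition metadata from etcd,
// broadcasts the DropPartition dd msg, invalidates the proxy meta cache, and asks the
// query service to release the partition.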
type DropPartitionReqTask struct {
baseReqTask
Req *milvuspb.DropPartitionRequest
}
func (t *DropPartitionReqTask) Type() commonpb.MsgType {
return t.Req.Base.MsgType
}
func (t *DropPartitionReqTask) Execute(ctx context.Context) error {
if t.Type() != commonpb.MsgType_DropPartition {
return fmt.Errorf("drop partition, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
}
collInfo, err := t.core.MetaTable.GetCollectionByName(t.Req.CollectionName, 0)
if err != nil {
return err
}
partID, err := t.core.MetaTable.GetPartitionByName(collInfo.ID, t.Req.PartitionName, 0)
if err != nil {
return err
}
ddReq := internalpb.DropPartitionRequest{
Base: t.Req.Base,
DbName: t.Req.DbName,
CollectionName: t.Req.CollectionName,
PartitionName: t.Req.PartitionName,
DbID: 0, //todo,not used
CollectionID: collInfo.ID,
PartitionID: partID,
}
// build the DdOperation and save it into etcd; if sending the ddmsg fails,
// the system can restore the ddmsg from etcd and re-send it
ddOp := func(ts typeutil.Timestamp) (string, error) {
ddReq.Base.Timestamp = ts
return EncodeDdOperation(&ddReq, nil, DropPartitionDDType)
}
ts, _, err := t.core.MetaTable.DeletePartition(collInfo.ID, t.Req.PartitionName, ddOp)
if err != nil {
return err
}
err = t.core.SendDdDropPartitionReq(ctx, &ddReq, collInfo.PhysicalChannelNames)
if err != nil {
return err
}
t.core.SendTimeTick(ts)
req := proxypb.InvalidateCollMetaCacheRequest{
Base: &commonpb.MsgBase{
MsgType: 0, //TODO, msg type
MsgID: 0, //TODO, msg id
Timestamp: ts,
SourceID: t.core.session.ServerID,
},
DbName: t.Req.DbName,
CollectionName: t.Req.CollectionName,
}
// error doesn't matter here
t.core.proxyClientManager.InvalidateCollectionMetaCache(ctx, &req)
//notify query service to release partition
go func() {
if err = t.core.CallReleasePartitionService(t.core.ctx, ts, 0, collInfo.ID, []typeutil.UniqueID{partID}); err != nil {
log.Warn("CallReleaseCollectionService failed", zap.String("error", err.Error()))
}
}()
// Update DDOperation in etcd
return t.core.setDdMsgSendFlag(true)
}
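// HasPartitionReqTask checks whether a partition exists in a collection; the result is
// stored in HasPartition.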
type HasPartitionReqTask struct {
baseReqTask
Req *milvuspb.HasPartitionRequest
HasPartition bool
}
func (t *HasPartitionReqTask) Type() commonpb.MsgType {
return t.Req.Base.MsgType
}
func (t *HasPartitionReqTask) Execute(ctx context.Context) error {
if t.Type() != commonpb.MsgType_HasPartition {
return fmt.Errorf("has partition, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
}
coll, err := t.core.MetaTable.GetCollectionByName(t.Req.CollectionName, 0)
if err != nil {
return err
}
t.HasPartition = t.core.MetaTable.HasPartition(coll.ID, t.Req.PartitionName, 0)
return nil
}
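// ShowPartitionReqTask fills Rsp with the partition IDs and names of a collection,
// looked up by name or, when the name is empty, by ID.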
type ShowPartitionReqTask struct {
baseReqTask
Req *milvuspb.ShowPartitionsRequest
Rsp *milvuspb.ShowPartitionsResponse
}
func (t *ShowPartitionReqTask) Type() commonpb.MsgType {
return t.Req.Base.MsgType
}
func (t *ShowPartitionReqTask) Execute(ctx context.Context) error {
if t.Type() != commonpb.MsgType_ShowPartitions {
return fmt.Errorf("show partition, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
}
var coll *etcdpb.CollectionInfo
var err error
if t.Req.CollectionName == "" {
coll, err = t.core.MetaTable.GetCollectionByID(t.Req.CollectionID, 0)
} else {
coll, err = t.core.MetaTable.GetCollectionByName(t.Req.CollectionName, 0)
}
if err != nil {
return err
}
t.Rsp.PartitionIDs = coll.PartitionIDs
t.Rsp.PartitionNames = coll.PartitonNames
return nil
}
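// DescribeSegmentReqTask fills Rsp with the index ID, build ID, and EnableIndex flag of
// a segment's index, after checking that the segment belongs to the given collection.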
type DescribeSegmentReqTask struct {
baseReqTask
Req *milvuspb.DescribeSegmentRequest
Rsp *milvuspb.DescribeSegmentResponse //TODO,return repeated segment id in the future
}
func (t *DescribeSegmentReqTask) Type() commonpb.MsgType {
return t.Req.Base.MsgType
}
func (t *DescribeSegmentReqTask) Execute(ctx context.Context) error {
if t.Type() != commonpb.MsgType_DescribeSegment {
return fmt.Errorf("describe segment, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
}
coll, err := t.core.MetaTable.GetCollectionByID(t.Req.CollectionID, 0)
if err != nil {
return err
}
exist := false
segIDs, err := t.core.CallGetFlushedSegmentsService(ctx, t.Req.CollectionID, -1)
if err != nil {
log.Debug("get flushed segment from data coord failed", zap.String("collection_name", coll.Schema.Name), zap.Error(err))
exist = true
} else {
for _, id := range segIDs {
if id == t.Req.SegmentID {
exist = true
break
}
}
}
if !exist {
return fmt.Errorf("segment id %d not belong to collection id %d", t.Req.SegmentID, t.Req.CollectionID)
}
//TODO, get field_id and index_name from request
segIdxInfo, err := t.core.MetaTable.GetSegmentIndexInfoByID(t.Req.SegmentID, -1, "")
log.Debug("RootCoord DescribeSegmentReqTask, MetaTable.GetSegmentIndexInfoByID", zap.Any("SegmentID", t.Req.SegmentID),
zap.Any("segIdxInfo", segIdxInfo), zap.Error(err))
if err != nil {
return err
}
t.Rsp.IndexID = segIdxInfo.IndexID
t.Rsp.BuildID = segIdxInfo.BuildID
t.Rsp.EnableIndex = segIdxInfo.EnableIndex
return nil
}
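// ShowSegmentReqTask lists the flushed segment IDs of a partition, as reported by the
// data coord.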
type ShowSegmentReqTask struct {
baseReqTask
Req *milvuspb.ShowSegmentsRequest
Rsp *milvuspb.ShowSegmentsResponse
}
func (t *ShowSegmentReqTask) Type() commonpb.MsgType {
return t.Req.Base.MsgType
}
func (t *ShowSegmentReqTask) Execute(ctx context.Context) error {
if t.Type() != commonpb.MsgType_ShowSegments {
return fmt.Errorf("show segments, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
}
coll, err := t.core.MetaTable.GetCollectionByID(t.Req.CollectionID, 0)
if err != nil {
return err
}
exist := false
for _, partID := range coll.PartitionIDs {
if partID == t.Req.PartitionID {
exist = true
break
}
}
if !exist {
return fmt.Errorf("partition id = %d not belong to collection id = %d", t.Req.PartitionID, t.Req.CollectionID)
}
segIDs, err := t.core.CallGetFlushedSegmentsService(ctx, t.Req.CollectionID, t.Req.PartitionID)
if err != nil {
log.Debug("get flushed segments from data coord failed", zap.String("collection name", coll.Schema.Name), zap.Int64("partition id", t.Req.PartitionID), zap.Error(err))
return err
}
t.Rsp.SegmentIDs = append(t.Rsp.SegmentIDs, segIDs...)
return nil
}
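// CreateIndexReqTask creates an index on a vector field: it allocates an index ID,
// finds the flushed segments that are not yet indexed, triggers an index build for
// each of them, and records the resulting SegmentIndexInfo in the meta table.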
type CreateIndexReqTask struct {
baseReqTask
Req *milvuspb.CreateIndexRequest
}
func (t *CreateIndexReqTask) Type() commonpb.MsgType {
return t.Req.Base.MsgType
}
func (t *CreateIndexReqTask) Execute(ctx context.Context) error {
if t.Type() != commonpb.MsgType_CreateIndex {
return fmt.Errorf("create index, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
}
indexName := Params.DefaultIndexName //TODO, get name from request
indexID, _, err := t.core.IDAllocator(1)
log.Debug("RootCoord CreateIndexReqTask", zap.Any("indexID", indexID), zap.Error(err))
if err != nil {
return err
}
idxInfo := &etcdpb.IndexInfo{
IndexName: indexName,
IndexID: indexID,
IndexParams: t.Req.ExtraParams,
}
collMeta, err := t.core.MetaTable.GetCollectionByName(t.Req.CollectionName, 0)
if err != nil {
return err
}
segID2PartID, err := t.core.getSegments(ctx, collMeta.ID)
if err != nil {
log.Debug("get flushed segments from data coord failed", zap.String("collection_name", collMeta.Schema.Name), zap.Error(err))
return err
}
flushedSegs := make([]typeutil.UniqueID, 0, len(segID2PartID))
for k := range segID2PartID {
flushedSegs = append(flushedSegs, k)
}
segIDs, field, err := t.core.MetaTable.GetNotIndexedSegments(t.Req.CollectionName, t.Req.FieldName, idxInfo, flushedSegs)
if err != nil {
log.Debug("RootCoord CreateIndexReqTask metaTable.GetNotIndexedSegments", zap.Error(err))
return err
}
if field.DataType != schemapb.DataType_FloatVector && field.DataType != schemapb.DataType_BinaryVector {
return fmt.Errorf("field name = %s, data type = %s", t.Req.FieldName, schemapb.DataType_name[int32(field.DataType)])
}
for _, segID := range segIDs {
info := etcdpb.SegmentIndexInfo{
CollectionID: collMeta.ID,
PartitionID: segID2PartID[segID],
SegmentID: segID,
FieldID: field.FieldID,
IndexID: idxInfo.IndexID,
EnableIndex: false,
}
info.BuildID, err = t.core.BuildIndex(ctx, segID, &field, idxInfo, false)
if err != nil {
return err
}
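// only mark the index as enabled when BuildIndex actually returned a build ID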
if info.BuildID != 0 {
info.EnableIndex = true
}
if _, err := t.core.MetaTable.AddIndex(&info); err != nil {
log.Debug("Add index into meta table failed", zap.Int64("collection_id", collMeta.ID), zap.Int64("index_id", info.IndexID), zap.Int64("build_id", info.BuildID), zap.Error(err))
}
}
return nil
}
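// DescribeIndexReqTask fills Rsp with a description (name, params, index ID, and field
// name) for each index matching the request.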
type DescribeIndexReqTask struct {
baseReqTask
Req *milvuspb.DescribeIndexRequest
Rsp *milvuspb.DescribeIndexResponse
}
func (t *DescribeIndexReqTask) Type() commonpb.MsgType {
return t.Req.Base.MsgType
}
func (t *DescribeIndexReqTask) Execute(ctx context.Context) error {
if t.Type() != commonpb.MsgType_DescribeIndex {
return fmt.Errorf("describe index, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
}
coll, idx, err := t.core.MetaTable.GetIndexByName(t.Req.CollectionName, t.Req.IndexName)
if err != nil {
return err
}
for _, i := range idx {
f, err := GetFieldSchemaByIndexID(&coll, typeutil.UniqueID(i.IndexID))
if err != nil {
log.Warn("get field schema by index id failed", zap.String("collection name", t.Req.CollectionName), zap.String("index name", t.Req.IndexName), zap.Error(err))
continue
}
desc := &milvuspb.IndexDescription{
IndexName: i.IndexName,
Params: i.IndexParams,
IndexID: i.IndexID,
FieldName: f.Name,
}
t.Rsp.IndexDescriptions = append(t.Rsp.IndexDescriptions, desc)
}
return nil
}
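// DropIndexReqTask drops an index by name: it asks the index service to drop the index
// and then removes it from the meta table.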
type DropIndexReqTask struct {
baseReqTask
Req *milvuspb.DropIndexRequest
}
func (t *DropIndexReqTask) Type() commonpb.MsgType {
return t.Req.Base.MsgType
}
func (t *DropIndexReqTask) Execute(ctx context.Context) error {
if t.Type() != commonpb.MsgType_DropIndex {
return fmt.Errorf("drop index, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
}
_, info, err := t.core.MetaTable.GetIndexByName(t.Req.CollectionName, t.Req.IndexName)
if err != nil {
log.Warn("GetIndexByName failed,", zap.String("collection name", t.Req.CollectionName), zap.String("field name", t.Req.FieldName), zap.String("index name", t.Req.IndexName), zap.Error(err))
return err
}
if len(info) == 0 {
return nil
}
if len(info) != 1 {
return fmt.Errorf("len(index) = %d", len(info))
}
err = t.core.CallDropIndexService(ctx, info[0].IndexID)
if err != nil {
return err
}
_, _, _, err = t.core.MetaTable.DropIndex(t.Req.CollectionName, t.Req.FieldName, t.Req.IndexName)
return err
}