Revert "Use unified catalog interface to reconstruct metastore (#17042)" (#17480)

This reverts commit 218326bafe.

Signed-off-by: yun.zhang <yun.zhang@zilliz.com>
jaime 2022-06-10 13:10:08 +08:00 committed by GitHub
parent 2d9a52206d
commit b64a467311
28 changed files with 1434 additions and 2157 deletions


@@ -627,6 +627,7 @@ _proxyMetaBlob_, _collectionInfoBlob_, _partitionInfoBlob_, _IndexInfoBlob_, _se
type metaTable struct {
txn kv.TxnKV // client of a reliable txnkv service, i.e. etcd client
snapshot kv.SnapShotKV // client of a reliable snapshotkv service, i.e. etcd client
proxyID2Meta map[typeutil.UniqueID]pb.ProxyMeta // proxy id to proxy meta
collID2Meta map[typeutil.UniqueID]pb.CollectionInfo // collection_id -> meta
collName2ID map[string]typeutil.UniqueID // collection name to collection id
collAlias2ID map[string]typeutil.UniqueID // collection alias to collection id
@@ -640,6 +641,7 @@ type metaTable struct {
func NewMetaTable(kv kv.SnapShotKV) (*metaTable, error)
func (mt *metaTable) AddProxy(po *pb.ProxyMeta) (typeutil.Timestamp, error)
func (mt *metaTable) AddCollection(coll *pb.CollectionInfo, part *pb.PartitionInfo, idx []*pb.IndexInfo, ddOpStr func(ts typeutil.Timestamp) (string, error)) (typeutil.Timestamp, error)
func (mt *metaTable) DeleteCollection(collID typeutil.UniqueID, ddOpStr func(ts typeutil.Timestamp) (string, error)) (typeutil.Timestamp, error)
func (mt *metaTable) HasCollection(collID typeutil.UniqueID, ts typeutil.Timestamp) bool
@@ -690,6 +692,7 @@ type timetickSync struct {
func newTimeTickSync(core *Core) *timetickSync
func (t *timetickSync) UpdateTimeTick(in *internalpb.ChannelTimeTickMsg) error
func (t *timetickSync) AddProxy(sess *sessionutil.Session)
func (t *timetickSync) DelProxy(sess *sessionutil.Session)
func (t *timetickSync) GetProxy(sess []*sessionutil.Session)
func (t *timetickSync) StartWatch()

go.mod

@@ -20,7 +20,6 @@ require (
github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4 // indirect
github.com/gin-gonic/gin v1.7.7
github.com/go-basic/ipv4 v1.0.0
github.com/go-sql-driver/mysql v1.6.0 // indirect
github.com/gofrs/flock v0.8.1
github.com/golang-jwt/jwt/v4 v4.4.1 // indirect
github.com/golang/mock v1.5.0

go.sum

@@ -235,8 +235,6 @@ github.com/go-playground/validator/v10 v10.4.1 h1:pH2c5ADXtd66mxoE0Zm9SUhxE20r7a
github.com/go-playground/validator/v10 v10.4.1/go.mod h1:nlOn6nFhuKACm19sB/8EGNn9GlaMV7XkbRSipzJ0Ii4=
github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE=
github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/goccy/go-json v0.7.10 h1:ulhbuNe1JqE68nMRXXTJRrUu0uhouf0VevLINxQq4Ec=
github.com/goccy/go-json v0.7.10/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=


@@ -26,8 +26,6 @@ import (
"testing"
"time"
"github.com/milvus-io/milvus/internal/metastore/model"
clientv3 "go.etcd.io/etcd/client/v3"
"github.com/golang/protobuf/proto"
@@ -36,6 +34,7 @@ import (
rcc "github.com/milvus-io/milvus/internal/distributed/rootcoord/client"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/proto/proxypb"
@@ -193,7 +192,7 @@ func TestGrpcService(t *testing.T) {
var binlogLock sync.Mutex
binlogPathArray := make([]string, 0, 16)
core.CallBuildIndexService = func(ctx context.Context, segID typeutil.UniqueID, binlog []string, field *model.Field, idxInfo *model.Index, numRows int64) (typeutil.UniqueID, error) {
core.CallBuildIndexService = func(ctx context.Context, segID typeutil.UniqueID, binlog []string, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo, numRows int64) (typeutil.UniqueID, error) {
binlogLock.Lock()
defer binlogLock.Unlock()
binlogPathArray = append(binlogPathArray, binlog...)
@@ -474,7 +473,7 @@ func TestGrpcService(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, rsp.Status.ErrorCode)
assert.Equal(t, collName, rsp.Schema.Name)
assert.Equal(t, collMeta.CollectionID, rsp.CollectionID)
assert.Equal(t, collMeta.ID, rsp.CollectionID)
})
t.Run("show collection", func(t *testing.T) {
@@ -511,8 +510,8 @@ func TestGrpcService(t *testing.T) {
assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
collMeta, err := core.MetaTable.GetCollectionByName(collName, 0)
assert.Nil(t, err)
assert.Equal(t, 2, len(collMeta.Partitions))
partName2, err := core.MetaTable.GetPartitionNameByID(collMeta.CollectionID, collMeta.Partitions[1].PartitionID, 0)
assert.Equal(t, 2, len(collMeta.PartitionIDs))
partName2, err := core.MetaTable.GetPartitionNameByID(collMeta.ID, collMeta.PartitionIDs[1], 0)
assert.Nil(t, err)
assert.Equal(t, partName, partName2)
assert.Equal(t, 1, len(collectionMetaCache))
@@ -548,7 +547,7 @@ func TestGrpcService(t *testing.T) {
},
DbName: "testDb",
CollectionName: collName,
CollectionID: coll.CollectionID,
CollectionID: coll.ID,
}
rsp, err := cli.ShowPartitions(ctx, req)
assert.Nil(t, err)
@@ -560,8 +559,8 @@ func TestGrpcService(t *testing.T) {
t.Run("show segment", func(t *testing.T) {
coll, err := core.MetaTable.GetCollectionByName(collName, 0)
assert.Nil(t, err)
partID := coll.Partitions[1].PartitionID
_, err = core.MetaTable.GetPartitionNameByID(coll.CollectionID, partID, 0)
partID := coll.PartitionIDs[1]
_, err = core.MetaTable.GetPartitionNameByID(coll.ID, partID, 0)
assert.Nil(t, err)
segLock.Lock()
@@ -575,7 +574,7 @@ func TestGrpcService(t *testing.T) {
Timestamp: 170,
SourceID: 170,
},
CollectionID: coll.CollectionID,
CollectionID: coll.ID,
PartitionID: partID,
}
rsp, err := cli.ShowSegments(ctx, req)
@@ -635,7 +634,7 @@ func TestGrpcService(t *testing.T) {
Timestamp: 190,
SourceID: 190,
},
CollectionID: coll.CollectionID,
CollectionID: coll.ID,
SegmentID: 1000,
}
rsp, err := cli.DescribeSegment(ctx, req)
@@ -667,8 +666,8 @@ func TestGrpcService(t *testing.T) {
t.Run("flush segment", func(t *testing.T) {
coll, err := core.MetaTable.GetCollectionByName(collName, 0)
assert.Nil(t, err)
partID := coll.Partitions[1].PartitionID
_, err = core.MetaTable.GetPartitionNameByID(coll.CollectionID, partID, 0)
partID := coll.PartitionIDs[1]
_, err = core.MetaTable.GetPartitionNameByID(coll.ID, partID, 0)
assert.Nil(t, err)
segLock.Lock()
@@ -681,7 +680,7 @@ func TestGrpcService(t *testing.T) {
},
Segment: &datapb.SegmentInfo{
ID: segID,
CollectionID: coll.CollectionID,
CollectionID: coll.ID,
PartitionID: partID,
},
}
@@ -756,8 +755,8 @@ func TestGrpcService(t *testing.T) {
assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
collMeta, err := core.MetaTable.GetCollectionByName(collName, 0)
assert.Nil(t, err)
assert.Equal(t, 1, len(collMeta.Partitions))
partName, err := core.MetaTable.GetPartitionNameByID(collMeta.CollectionID, collMeta.Partitions[0].PartitionID, 0)
assert.Equal(t, 1, len(collMeta.PartitionIDs))
partName, err := core.MetaTable.GetPartitionNameByID(collMeta.ID, collMeta.PartitionIDs[0], 0)
assert.Nil(t, err)
assert.Equal(t, rootcoord.Params.CommonCfg.DefaultPartitionName, partName)
assert.Equal(t, 2, len(collectionMetaCache))


@@ -1,37 +0,0 @@
package metastore
import (
"context"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
type Catalog interface {
CreateCollection(ctx context.Context, collectionInfo *model.Collection, ts typeutil.Timestamp) error
GetCollectionByID(ctx context.Context, collectionID typeutil.UniqueID, ts typeutil.Timestamp) (*model.Collection, error)
GetCollectionByName(ctx context.Context, collectionName string, ts typeutil.Timestamp) (*model.Collection, error)
ListCollections(ctx context.Context, ts typeutil.Timestamp) (map[string]*model.Collection, error)
CollectionExists(ctx context.Context, collectionID typeutil.UniqueID, ts typeutil.Timestamp) bool
DropCollection(ctx context.Context, collectionInfo *model.Collection, ts typeutil.Timestamp) error
CreatePartition(ctx context.Context, coll *model.Collection, ts typeutil.Timestamp) error
DropPartition(ctx context.Context, collectionInfo *model.Collection, partitionID typeutil.UniqueID, ts typeutil.Timestamp) error
CreateIndex(ctx context.Context, col *model.Collection, index *model.Index) error
AlterIndex(ctx context.Context, oldIndex *model.Index, newIndex *model.Index) error
DropIndex(ctx context.Context, collectionInfo *model.Collection, dropIdxID typeutil.UniqueID, ts typeutil.Timestamp) error
ListIndexes(ctx context.Context) ([]*model.Index, error)
CreateAlias(ctx context.Context, collection *model.Collection, ts typeutil.Timestamp) error
DropAlias(ctx context.Context, collectionID typeutil.UniqueID, alias string, ts typeutil.Timestamp) error
AlterAlias(ctx context.Context, collection *model.Collection, ts typeutil.Timestamp) error
ListAliases(ctx context.Context) ([]*model.Collection, error)
GetCredential(ctx context.Context, username string) (*model.Credential, error)
CreateCredential(ctx context.Context, credential *model.Credential) error
DropCredential(ctx context.Context, username string) error
ListCredentials(ctx context.Context) ([]string, error)
Close()
}
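
For context on what this revert removes: a minimal sketch of how a caller could drive this interface. The helper name createAndReload is hypothetical and not part of the commit; it only uses methods declared above.

// Hypothetical caller, for illustration only: create a collection, then
// read it back at the same timestamp through any Catalog implementation.
func createAndReload(ctx context.Context, c Catalog, coll *model.Collection, ts typeutil.Timestamp) (*model.Collection, error) {
	if err := c.CreateCollection(ctx, coll, ts); err != nil {
		return nil, err
	}
	// Reads are timestamp-aware: ts selects the snapshot view to load from.
	return c.GetCollectionByID(ctx, coll.CollectionID, ts)
}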


@@ -1,24 +0,0 @@
package kv
const (
// ComponentPrefix prefix for rootcoord component
ComponentPrefix = "root-coord"
// CollectionMetaPrefix prefix for collection meta
CollectionMetaPrefix = ComponentPrefix + "/collection"
// SegmentIndexMetaPrefix prefix for segment index meta
SegmentIndexMetaPrefix = ComponentPrefix + "/segment-index"
// IndexMetaPrefix prefix for index meta
IndexMetaPrefix = ComponentPrefix + "/index"
// CollectionAliasMetaPrefix prefix for collection alias meta
CollectionAliasMetaPrefix = ComponentPrefix + "/collection-alias"
// UserSubPrefix subpath for credential user
UserSubPrefix = "/credential/users"
// CredentialPrefix prefix for credential user
CredentialPrefix = ComponentPrefix + UserSubPrefix
)
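
To make the key layout concrete, an illustration of the etcd keys these constants expand to (mirroring the fmt.Sprintf calls in the kv catalog further down; the collection ID 42 and user "alice" are made-up values):

collKey := fmt.Sprintf("%s/%d", CollectionMetaPrefix, 42)
// -> "root-coord/collection/42"
credKey := fmt.Sprintf("%s/%s", CredentialPrefix, "alice")
// -> "root-coord/credential/users/alice"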


@@ -1,529 +0,0 @@
package kv
import (
"bytes"
"context"
"encoding/json"
"fmt"
"path"
"strconv"
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metastore/model"
pb "github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/milvus-io/milvus/internal/util/typeutil"
"go.uber.org/zap"
)
type Catalog struct {
Txn kv.TxnKV
Snapshot kv.SnapShotKV
}
func (kc *Catalog) CreateCollection(ctx context.Context, coll *model.Collection, ts typeutil.Timestamp) error {
k1 := fmt.Sprintf("%s/%d", CollectionMetaPrefix, coll.CollectionID)
collInfo := model.ConvertToCollectionPB(coll)
v1, err := proto.Marshal(collInfo)
if err != nil {
log.Error("create collection marshal fail", zap.String("key", k1), zap.Error(err))
return err
}
// save ddOpStr into etcd
kvs := map[string]string{k1: string(v1)}
for k, v := range coll.Extra {
kvs[k] = v
}
err = kc.Snapshot.MultiSave(kvs, ts)
if err != nil {
log.Error("create collection persist meta fail", zap.String("key", k1), zap.Error(err))
return err
}
return nil
}
func (kc *Catalog) CreatePartition(ctx context.Context, coll *model.Collection, ts typeutil.Timestamp) error {
k1 := fmt.Sprintf("%s/%d", CollectionMetaPrefix, coll.CollectionID)
collInfo := model.ConvertToCollectionPB(coll)
v1, err := proto.Marshal(collInfo)
if err != nil {
log.Error("create partition marshal fail", zap.String("key", k1), zap.Error(err))
return err
}
kvs := map[string]string{k1: string(v1)}
err = kc.Snapshot.MultiSave(kvs, ts)
if err != nil {
log.Error("create partition persist meta fail", zap.String("key", k1), zap.Error(err))
return err
}
// save ddOpStr into etcd
err = kc.Txn.MultiSave(coll.Extra)
if err != nil {
// will not panic, missing create msg
log.Warn("create partition persist ddop meta fail", zap.Int64("collectionID", coll.CollectionID), zap.Error(err))
return err
}
return nil
}
func (kc *Catalog) CreateIndex(ctx context.Context, col *model.Collection, index *model.Index) error {
k1 := path.Join(CollectionMetaPrefix, strconv.FormatInt(col.CollectionID, 10))
v1, err := proto.Marshal(model.ConvertToCollectionPB(col))
if err != nil {
log.Error("create index marshal fail", zap.String("key", k1), zap.Error(err))
return err
}
k2 := path.Join(IndexMetaPrefix, strconv.FormatInt(index.IndexID, 10))
v2, err := proto.Marshal(model.ConvertToIndexPB(index))
if err != nil {
log.Error("create index marshal fail", zap.String("key", k2), zap.Error(err))
return err
}
meta := map[string]string{k1: string(v1), k2: string(v2)}
err = kc.Txn.MultiSave(meta)
if err != nil {
log.Error("create index persist meta fail", zap.String("key", k1), zap.Error(err))
return err
}
return nil
}
func (kc *Catalog) AlterIndex(ctx context.Context, oldIndex *model.Index, newIndex *model.Index) error {
kvs := make(map[string]string, len(newIndex.SegmentIndexes))
for _, segmentIndex := range newIndex.SegmentIndexes {
segment := segmentIndex.Segment
k := fmt.Sprintf("%s/%d/%d/%d/%d", SegmentIndexMetaPrefix, newIndex.CollectionID, newIndex.IndexID, segment.PartitionID, segment.SegmentID)
segIdxInfo := &pb.SegmentIndexInfo{
CollectionID: newIndex.CollectionID,
PartitionID: segment.PartitionID,
SegmentID: segment.SegmentID,
BuildID: segmentIndex.BuildID,
EnableIndex: segmentIndex.EnableIndex,
FieldID: newIndex.FieldID,
IndexID: newIndex.IndexID,
}
v, err := proto.Marshal(segIdxInfo)
if err != nil {
log.Error("alter index marshal fail", zap.String("key", k), zap.Error(err))
return err
}
kvs[k] = string(v)
}
err := kc.Txn.MultiSave(kvs)
if err != nil {
log.Error("alter index persist meta fail", zap.Any("segmentIndex", newIndex.SegmentIndexes), zap.Error(err))
return err
}
return nil
}
func (kc *Catalog) CreateAlias(ctx context.Context, collection *model.Collection, ts typeutil.Timestamp) error {
k := fmt.Sprintf("%s/%s", CollectionAliasMetaPrefix, collection.Aliases[0])
v, err := proto.Marshal(&pb.CollectionInfo{ID: collection.CollectionID, Schema: &schemapb.CollectionSchema{Name: collection.Aliases[0]}})
if err != nil {
log.Error("create alias marshal fail", zap.String("key", k), zap.Error(err))
return err
}
err = kc.Snapshot.Save(k, string(v), ts)
if err != nil {
log.Error("create alias persist meta fail", zap.String("key", k), zap.Error(err))
return err
}
return nil
}
func (kc *Catalog) CreateCredential(ctx context.Context, credential *model.Credential) error {
k := fmt.Sprintf("%s/%s", CredentialPrefix, credential.Username)
v, err := json.Marshal(&internalpb.CredentialInfo{EncryptedPassword: credential.EncryptedPassword})
if err != nil {
log.Error("create credential marshal fail", zap.String("key", k), zap.Error(err))
return err
}
err = kc.Txn.Save(k, string(v))
if err != nil {
log.Error("create credential persist meta fail", zap.String("key", k), zap.Error(err))
return err
}
return nil
}
func (kc *Catalog) GetCollectionByID(ctx context.Context, collectionID typeutil.UniqueID, ts typeutil.Timestamp) (*model.Collection, error) {
collKey := fmt.Sprintf("%s/%d", CollectionMetaPrefix, collectionID)
collVal, err := kc.Snapshot.Load(collKey, ts)
if err != nil {
log.Error("get collection meta fail", zap.String("key", collKey), zap.Error(err))
return nil, err
}
collMeta := &pb.CollectionInfo{}
err = proto.Unmarshal([]byte(collVal), collMeta)
if err != nil {
log.Error("collection meta marshal fail", zap.String("key", collKey), zap.Error(err))
return nil, err
}
return model.ConvertCollectionPBToModel(collMeta, map[string]string{}), nil
}
func (kc *Catalog) CollectionExists(ctx context.Context, collectionID typeutil.UniqueID, ts typeutil.Timestamp) bool {
_, err := kc.GetCollectionByID(ctx, collectionID, ts)
return err == nil
}
func (kc *Catalog) GetCredential(ctx context.Context, username string) (*model.Credential, error) {
k := fmt.Sprintf("%s/%s", CredentialPrefix, username)
v, err := kc.Txn.Load(k)
if err != nil {
log.Warn("get credential meta fail", zap.String("key", k), zap.Error(err))
return nil, err
}
credentialInfo := internalpb.CredentialInfo{}
err = json.Unmarshal([]byte(v), &credentialInfo)
if err != nil {
return nil, fmt.Errorf("unmarshal credential info err:%w", err)
}
return &model.Credential{Username: username, EncryptedPassword: credentialInfo.EncryptedPassword}, nil
}
func (kc *Catalog) AlterAlias(ctx context.Context, collection *model.Collection, ts typeutil.Timestamp) error {
return kc.CreateAlias(ctx, collection, ts)
}
func (kc *Catalog) DropCollection(ctx context.Context, collectionInfo *model.Collection, ts typeutil.Timestamp) error {
delMetakeysSnap := []string{
fmt.Sprintf("%s/%d", CollectionMetaPrefix, collectionInfo.CollectionID),
}
for _, alias := range collectionInfo.Aliases {
delMetakeysSnap = append(delMetakeysSnap,
fmt.Sprintf("%s/%s", CollectionAliasMetaPrefix, alias),
)
}
err := kc.Snapshot.MultiSaveAndRemoveWithPrefix(map[string]string{}, delMetakeysSnap, ts)
if err != nil {
log.Error("drop collection update meta fail", zap.Int64("collectionID", collectionInfo.CollectionID), zap.Error(err))
return err
}
// Txn operation
kvs := map[string]string{}
for k, v := range collectionInfo.Extra {
kvs[k] = v
}
delMetaKeysTxn := []string{
fmt.Sprintf("%s/%d", SegmentIndexMetaPrefix, collectionInfo.CollectionID),
fmt.Sprintf("%s/%d", IndexMetaPrefix, collectionInfo.CollectionID),
}
err = kc.Txn.MultiSaveAndRemoveWithPrefix(kvs, delMetaKeysTxn)
if err != nil {
log.Warn("drop collection update meta fail", zap.Int64("collectionID", collectionInfo.CollectionID), zap.Error(err))
return err
}
return nil
}
func (kc *Catalog) DropPartition(ctx context.Context, collectionInfo *model.Collection, partitionID typeutil.UniqueID, ts typeutil.Timestamp) error {
collMeta := model.ConvertToCollectionPB(collectionInfo)
k := path.Join(CollectionMetaPrefix, strconv.FormatInt(collectionInfo.CollectionID, 10))
v, err := proto.Marshal(collMeta)
if err != nil {
log.Error("drop partition marshal fail", zap.String("key", k), zap.Error(err))
return err
}
err = kc.Snapshot.Save(k, string(v), ts)
if err != nil {
log.Error("drop partition update collection meta fail",
zap.Int64("collectionID", collectionInfo.CollectionID),
zap.Int64("partitionID", partitionID),
zap.Error(err))
return err
}
var delMetaKeys []string
for _, idxInfo := range collMeta.FieldIndexes {
k := fmt.Sprintf("%s/%d/%d/%d", SegmentIndexMetaPrefix, collMeta.ID, idxInfo.IndexID, partitionID)
delMetaKeys = append(delMetaKeys, k)
}
// Txn operation
metaTxn := map[string]string{}
for k, v := range collectionInfo.Extra {
metaTxn[k] = v
}
err = kc.Txn.MultiSaveAndRemoveWithPrefix(metaTxn, delMetaKeys)
if err != nil {
log.Warn("drop partition update meta fail",
zap.Int64("collectionID", collectionInfo.CollectionID),
zap.Int64("partitionID", partitionID),
zap.Error(err))
return err
}
return nil
}
func (kc *Catalog) DropIndex(ctx context.Context, collectionInfo *model.Collection, dropIdxID typeutil.UniqueID, ts typeutil.Timestamp) error {
collMeta := model.ConvertToCollectionPB(collectionInfo)
k := path.Join(CollectionMetaPrefix, strconv.FormatInt(collectionInfo.CollectionID, 10))
v, err := proto.Marshal(collMeta)
if err != nil {
log.Error("drop index marshal fail", zap.String("key", k), zap.Error(err))
return err
}
saveMeta := map[string]string{k: string(v)}
delMeta := []string{
fmt.Sprintf("%s/%d/%d", SegmentIndexMetaPrefix, collectionInfo.CollectionID, dropIdxID),
fmt.Sprintf("%s/%d/%d", IndexMetaPrefix, collectionInfo.CollectionID, dropIdxID),
}
err = kc.Txn.MultiSaveAndRemoveWithPrefix(saveMeta, delMeta)
if err != nil {
log.Error("drop partition update meta fail",
zap.Int64("collectionID", collectionInfo.CollectionID),
zap.Int64("indexID", dropIdxID),
zap.Error(err))
return err
}
return nil
}
func (kc *Catalog) DropCredential(ctx context.Context, username string) error {
k := fmt.Sprintf("%s/%s", CredentialPrefix, username)
err := kc.Txn.Remove(k)
if err != nil {
log.Error("drop credential update meta fail", zap.String("key", k), zap.Error(err))
return err
}
return nil
}
func (kc *Catalog) DropAlias(ctx context.Context, collectionID typeutil.UniqueID, alias string, ts typeutil.Timestamp) error {
delMetakeys := []string{
fmt.Sprintf("%s/%s", CollectionAliasMetaPrefix, alias),
}
meta := make(map[string]string)
err := kc.Snapshot.MultiSaveAndRemoveWithPrefix(meta, delMetakeys, ts)
if err != nil {
log.Error("drop alias update meta fail", zap.String("alias", alias), zap.Error(err))
return err
}
return nil
}
func (kc *Catalog) GetCollectionByName(ctx context.Context, collectionName string, ts typeutil.Timestamp) (*model.Collection, error) {
_, vals, err := kc.Snapshot.LoadWithPrefix(CollectionMetaPrefix, ts)
if err != nil {
log.Warn("get collection meta fail", zap.String("collectionName", collectionName), zap.Error(err))
return nil, err
}
for _, val := range vals {
colMeta := pb.CollectionInfo{}
err = proto.Unmarshal([]byte(val), &colMeta)
if err != nil {
log.Warn("get collection meta unmarshal fail", zap.String("collectionName", collectionName), zap.Error(err))
continue
}
if colMeta.Schema.Name == collectionName {
return model.ConvertCollectionPBToModel(&colMeta, map[string]string{}), nil
}
}
return nil, fmt.Errorf("can't find collection: %s, at timestamp = %d", collectionName, ts)
}
func (kc *Catalog) ListCollections(ctx context.Context, ts typeutil.Timestamp) (map[string]*model.Collection, error) {
_, vals, err := kc.Snapshot.LoadWithPrefix(CollectionMetaPrefix, ts)
if err != nil {
log.Error("get collections meta fail",
zap.String("prefix", CollectionMetaPrefix),
zap.Uint64("timestamp", ts),
zap.Error(err))
return nil, nil
}
colls := make(map[string]*model.Collection)
for _, val := range vals {
collMeta := pb.CollectionInfo{}
err := proto.Unmarshal([]byte(val), &collMeta)
if err != nil {
log.Warn("unmarshal collection info failed", zap.Error(err))
continue
}
colls[collMeta.Schema.Name] = model.ConvertCollectionPBToModel(&collMeta, map[string]string{})
}
return colls, nil
}
func (kc *Catalog) ListAliases(ctx context.Context) ([]*model.Collection, error) {
_, values, err := kc.Snapshot.LoadWithPrefix(CollectionAliasMetaPrefix, 0)
if err != nil {
log.Error("get aliases meta fail", zap.String("prefix", CollectionAliasMetaPrefix), zap.Error(err))
return nil, err
}
var colls []*model.Collection
for _, value := range values {
aliasInfo := pb.CollectionInfo{}
err = proto.Unmarshal([]byte(value), &aliasInfo)
if err != nil {
log.Warn("unmarshal aliases failed", zap.Error(err))
continue
}
colls = append(colls, model.ConvertCollectionPBToModel(&aliasInfo, map[string]string{}))
}
return colls, nil
}
func (kc *Catalog) listSegmentIndexes(ctx context.Context) (map[int64]*model.Index, error) {
_, values, err := kc.Txn.LoadWithPrefix(SegmentIndexMetaPrefix)
if err != nil {
log.Error("list segment index meta fail", zap.String("prefix", SegmentIndexMetaPrefix), zap.Error(err))
return nil, err
}
indexes := make(map[int64]*model.Index, len(values))
for _, value := range values {
if bytes.Equal([]byte(value), SuffixSnapshotTombstone) {
// backward compatibility, IndexMeta used to be in SnapshotKV
continue
}
segmentIndexInfo := pb.SegmentIndexInfo{}
err = proto.Unmarshal([]byte(value), &segmentIndexInfo)
if err != nil {
log.Warn("unmarshal segment index info failed", zap.Error(err))
continue
}
newIndex := model.ConvertSegmentIndexPBToModel(&segmentIndexInfo)
oldIndex, ok := indexes[segmentIndexInfo.IndexID]
if ok {
for segID, segmentIdxInfo := range newIndex.SegmentIndexes {
oldIndex.SegmentIndexes[segID] = segmentIdxInfo
}
} else {
indexes[segmentIndexInfo.IndexID] = newIndex
}
}
return indexes, nil
}
func (kc *Catalog) listIndexMeta(ctx context.Context) (map[int64]*model.Index, error) {
_, values, err := kc.Txn.LoadWithPrefix(IndexMetaPrefix)
if err != nil {
log.Error("list index meta fail", zap.String("prefix", IndexMetaPrefix), zap.Error(err))
return nil, err
}
indexes := make(map[int64]*model.Index, len(values))
for _, value := range values {
if bytes.Equal([]byte(value), SuffixSnapshotTombstone) {
// backward compatibility, IndexMeta used to be in SnapshotKV
continue
}
meta := pb.IndexInfo{}
err = proto.Unmarshal([]byte(value), &meta)
if err != nil {
log.Warn("unmarshal index info failed", zap.Error(err))
continue
}
index := model.ConvertIndexPBToModel(&meta)
if _, ok := indexes[meta.IndexID]; ok {
log.Warn("duplicated index id exists in index meta", zap.Int64("index id", meta.IndexID))
}
indexes[meta.IndexID] = index
}
return indexes, nil
}
func (kc *Catalog) ListIndexes(ctx context.Context) ([]*model.Index, error) {
indexMeta, err := kc.listIndexMeta(ctx)
if err != nil {
return nil, err
}
segmentIndexMeta, err := kc.listSegmentIndexes(ctx)
if err != nil {
return nil, err
}
var indexes []*model.Index
//merge index and segment index
for indexID, index := range indexMeta {
segmentIndex, ok := segmentIndexMeta[indexID]
if ok {
index = model.MergeIndexModel(index, segmentIndex)
delete(segmentIndexMeta, indexID)
}
indexes = append(indexes, index)
}
// add remain segmentIndexMeta
for _, index := range segmentIndexMeta {
indexes = append(indexes, index)
}
return indexes, nil
}
func (kc *Catalog) ListCredentials(ctx context.Context) ([]string, error) {
keys, _, err := kc.Txn.LoadWithPrefix(CredentialPrefix)
if err != nil {
log.Error("list all credential usernames fail", zap.String("prefix", CredentialPrefix), zap.Error(err))
return nil, err
}
var usernames []string
for _, path := range keys {
username := typeutil.After(path, UserSubPrefix+"/")
if len(username) == 0 {
log.Warn("no username extract from path:", zap.String("path", path))
continue
}
usernames = append(usernames, username)
}
return usernames, nil
}
func (kc *Catalog) Close() {
// do nothing
}
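
A note on the two stores this catalog straddles: collection and alias writes go through Snapshot (kv.SnapShotKV), which versions keys by timestamp so historical reads keep working, while index, segment-index, and credential state lives in the plain Txn (kv.TxnKV) store. A minimal wiring sketch; the constructor below is hypothetical, not code from this commit:

// Hypothetical helper: assemble the kv catalog from its two backing stores.
func newKVCatalog(txn kv.TxnKV, snap kv.SnapShotKV) *Catalog {
	// Txn holds non-versioned meta (indexes, credentials, ddOp bookkeeping);
	// Snapshot holds timestamp-versioned collection and alias meta.
	return &Catalog{Txn: txn, Snapshot: snap}
}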


@@ -1,24 +0,0 @@
package model
import (
"github.com/milvus-io/milvus/internal/proto/commonpb"
)
type Collection struct {
TenantID string
CollectionID int64
Partitions []*Partition
Name string
Description string
AutoID bool
Fields []*Field
FieldIndexes []*Index
VirtualChannelNames []string
PhysicalChannelNames []string
ShardsNum int32
StartPositions []*commonpb.KeyDataPair
CreateTime uint64
ConsistencyLevel commonpb.ConsistencyLevel
Aliases []string
Extra map[string]string // extra kvs
}


@@ -1,6 +0,0 @@
package model
type Credential struct {
Username string
EncryptedPassword string
}


@@ -1,17 +0,0 @@
package model
import (
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
)
type Field struct {
FieldID int64
Name string
IsPrimaryKey bool
Description string
DataType schemapb.DataType
TypeParams []*commonpb.KeyValuePair
IndexParams []*commonpb.KeyValuePair
AutoID bool
}


@@ -1,13 +0,0 @@
package model
import "github.com/milvus-io/milvus/internal/proto/commonpb"
type Index struct {
CollectionID int64
FieldID int64
IndexID int64
IndexName string
IndexParams []*commonpb.KeyValuePair
SegmentIndexes map[int64]SegmentIndex //segmentID -> segmentIndex
Extra map[string]string
}


@@ -1,237 +0,0 @@
package model
import (
pb "github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
)
func ConvertToFieldSchemaPB(field *Field) *schemapb.FieldSchema {
return &schemapb.FieldSchema{
FieldID: field.FieldID,
Name: field.Name,
IsPrimaryKey: field.IsPrimaryKey,
Description: field.Description,
DataType: field.DataType,
TypeParams: field.TypeParams,
IndexParams: field.IndexParams,
AutoID: field.AutoID,
}
}
func BatchConvertToFieldSchemaPB(fields []*Field) []*schemapb.FieldSchema {
fieldSchemas := make([]*schemapb.FieldSchema, len(fields))
for idx, field := range fields {
fieldSchemas[idx] = ConvertToFieldSchemaPB(field)
}
return fieldSchemas
}
func ConvertFieldPBToModel(fieldSchema *schemapb.FieldSchema) *Field {
return &Field{
FieldID: fieldSchema.FieldID,
Name: fieldSchema.Name,
IsPrimaryKey: fieldSchema.IsPrimaryKey,
Description: fieldSchema.Description,
DataType: fieldSchema.DataType,
TypeParams: fieldSchema.TypeParams,
IndexParams: fieldSchema.IndexParams,
AutoID: fieldSchema.AutoID,
}
}
func BatchConvertFieldPBToModel(fieldSchemas []*schemapb.FieldSchema) []*Field {
fields := make([]*Field, len(fieldSchemas))
for idx, fieldSchema := range fieldSchemas {
fields[idx] = ConvertFieldPBToModel(fieldSchema)
}
return fields
}
func ConvertCollectionPBToModel(coll *pb.CollectionInfo, extra map[string]string) *Collection {
partitions := make([]*Partition, len(coll.PartitionIDs))
for idx := range coll.PartitionIDs {
partitions[idx] = &Partition{
PartitionID: coll.PartitionIDs[idx],
PartitionName: coll.PartitionNames[idx],
PartitionCreatedTimestamp: coll.PartitionCreatedTimestamps[idx],
}
}
indexes := make([]*Index, len(coll.FieldIndexes))
for idx, fieldIndexInfo := range coll.FieldIndexes {
indexes[idx] = &Index{
FieldID: fieldIndexInfo.FiledID,
IndexID: fieldIndexInfo.IndexID,
}
}
return &Collection{
CollectionID: coll.ID,
Name: coll.Schema.Name,
Description: coll.Schema.Description,
AutoID: coll.Schema.AutoID,
Fields: BatchConvertFieldPBToModel(coll.Schema.Fields),
Partitions: partitions,
FieldIndexes: indexes,
VirtualChannelNames: coll.VirtualChannelNames,
PhysicalChannelNames: coll.PhysicalChannelNames,
ShardsNum: coll.ShardsNum,
ConsistencyLevel: coll.ConsistencyLevel,
CreateTime: coll.CreateTime,
StartPositions: coll.StartPositions,
Extra: extra,
}
}
func CloneCollectionModel(coll Collection) *Collection {
return &Collection{
TenantID: coll.TenantID,
CollectionID: coll.CollectionID,
Name: coll.Name,
Description: coll.Description,
AutoID: coll.AutoID,
Fields: coll.Fields,
Partitions: coll.Partitions,
FieldIndexes: coll.FieldIndexes,
VirtualChannelNames: coll.VirtualChannelNames,
PhysicalChannelNames: coll.PhysicalChannelNames,
ShardsNum: coll.ShardsNum,
ConsistencyLevel: coll.ConsistencyLevel,
CreateTime: coll.CreateTime,
StartPositions: coll.StartPositions,
Aliases: coll.Aliases,
Extra: coll.Extra,
}
}
func ConvertToCollectionPB(coll *Collection) *pb.CollectionInfo {
fields := make([]*schemapb.FieldSchema, len(coll.Fields))
for idx, field := range coll.Fields {
fields[idx] = &schemapb.FieldSchema{
FieldID: field.FieldID,
Name: field.Name,
IsPrimaryKey: field.IsPrimaryKey,
Description: field.Description,
DataType: field.DataType,
TypeParams: field.TypeParams,
IndexParams: field.IndexParams,
AutoID: field.AutoID,
}
}
collSchema := &schemapb.CollectionSchema{
Name: coll.Name,
Description: coll.Description,
AutoID: coll.AutoID,
Fields: fields,
}
partitionIDs := make([]int64, len(coll.Partitions))
partitionNames := make([]string, len(coll.Partitions))
partitionCreatedTimestamps := make([]uint64, len(coll.Partitions))
for idx, partition := range coll.Partitions {
partitionIDs[idx] = partition.PartitionID
partitionNames[idx] = partition.PartitionName
partitionCreatedTimestamps[idx] = partition.PartitionCreatedTimestamp
}
fieldIndexes := make([]*pb.FieldIndexInfo, len(coll.FieldIndexes))
for idx, index := range coll.FieldIndexes {
fieldIndexes[idx] = &pb.FieldIndexInfo{
FiledID: index.FieldID,
IndexID: index.IndexID,
}
}
return &pb.CollectionInfo{
ID: coll.CollectionID,
Schema: collSchema,
PartitionIDs: partitionIDs,
PartitionNames: partitionNames,
FieldIndexes: fieldIndexes,
CreateTime: coll.CreateTime,
VirtualChannelNames: coll.VirtualChannelNames,
PhysicalChannelNames: coll.PhysicalChannelNames,
ShardsNum: coll.ShardsNum,
PartitionCreatedTimestamps: partitionCreatedTimestamps,
ConsistencyLevel: coll.ConsistencyLevel,
StartPositions: coll.StartPositions,
}
}
func MergeIndexModel(a *Index, b *Index) *Index {
if b.SegmentIndexes != nil {
if a.SegmentIndexes == nil {
a.SegmentIndexes = b.SegmentIndexes
} else {
for segID, segmentIndex := range b.SegmentIndexes {
a.SegmentIndexes[segID] = segmentIndex
}
}
}
if a.CollectionID == 0 && b.CollectionID != 0 {
a.CollectionID = b.CollectionID
}
if a.FieldID == 0 && b.FieldID != 0 {
a.FieldID = b.FieldID
}
if a.IndexID == 0 && b.IndexID != 0 {
a.IndexID = b.IndexID
}
if a.IndexName == "" && b.IndexName != "" {
a.IndexName = b.IndexName
}
if a.IndexParams == nil && b.IndexParams != nil {
a.IndexParams = b.IndexParams
}
if a.Extra == nil && b.Extra != nil {
a.Extra = b.Extra
}
return a
}
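
The merge is asymmetric: it mutates and returns a, folding b's segment indexes in and back-filling only a's zero-valued fields. A small illustrative call with made-up values:

a := &Index{IndexID: 5, SegmentIndexes: map[int64]SegmentIndex{
	100: {Segment: Segment{SegmentID: 100, PartitionID: 1}},
}}
b := &Index{IndexName: "vec_idx", SegmentIndexes: map[int64]SegmentIndex{
	101: {Segment: Segment{SegmentID: 101, PartitionID: 1}},
}}
merged := MergeIndexModel(a, b)
// merged == a: it now holds segments 100 and 101, and takes IndexName
// "vec_idx" from b because a's name was empty.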
func ConvertSegmentIndexPBToModel(segIndex *pb.SegmentIndexInfo) *Index {
return &Index{
CollectionID: segIndex.CollectionID,
SegmentIndexes: map[int64]SegmentIndex{
segIndex.SegmentID: {
Segment: Segment{
SegmentID: segIndex.SegmentID,
PartitionID: segIndex.PartitionID,
},
BuildID: segIndex.BuildID,
EnableIndex: segIndex.EnableIndex,
},
},
FieldID: segIndex.FieldID,
IndexID: segIndex.IndexID,
}
}
func ConvertIndexPBToModel(indexInfo *pb.IndexInfo) *Index {
return &Index{
IndexName: indexInfo.IndexName,
IndexID: indexInfo.IndexID,
IndexParams: indexInfo.IndexParams,
}
}
func ConvertToIndexPB(index *Index) *pb.IndexInfo {
return &pb.IndexInfo{
IndexName: index.IndexName,
IndexID: index.IndexID,
IndexParams: index.IndexParams,
}
}
func ConvertToCredentialPB(cred *Credential) *internalpb.CredentialInfo {
if cred == nil {
return nil
}
return &internalpb.CredentialInfo{
Username: cred.Username,
EncryptedPassword: cred.EncryptedPassword,
}
}
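
These converters are what the kv catalog marshals through, and the mapping is lossy in one direction: ConvertToCollectionPB never writes Extra, TenantID, or Aliases into the proto. A round-trip sketch, assuming coll is a *Collection populated elsewhere:

pbInfo := ConvertToCollectionPB(coll) // Collection -> etcdpb.CollectionInfo
restored := ConvertCollectionPBToModel(pbInfo, map[string]string{})
// restored matches coll field-for-field except Extra (reset to the map
// passed in), TenantID, and Aliases, which the proto does not carry.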


@@ -1,8 +0,0 @@
package model
type Partition struct {
PartitionID int64
PartitionName string
PartitionCreatedTimestamp uint64
Extra map[string]string
}


@@ -1,25 +0,0 @@
package model
import "github.com/milvus-io/milvus/internal/proto/commonpb"
type Segment struct {
SegmentID int64
PartitionID int64
NumRows int64
MemSize int64
DmChannel string
CompactionFrom []int64
CreatedByCompaction bool
SegmentState commonpb.SegmentState
IndexInfos []*SegmentIndex
ReplicaIds []int64
NodeIds []int64
}
type SegmentIndex struct {
Segment
EnableIndex bool
BuildID int64
IndexSize uint64
IndexFilePaths []string
}


@@ -1,91 +0,0 @@
package table
import (
"context"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
type Catalog struct {
}
func (tc *Catalog) CreateCollection(ctx context.Context, collectionInfo *model.Collection, ts typeutil.Timestamp) error {
return nil
}
func (tc *Catalog) GetCollectionByID(ctx context.Context, collectionID typeutil.UniqueID, ts typeutil.Timestamp) (*model.Collection, error) {
return nil, nil
}
func (tc *Catalog) GetCollectionByName(ctx context.Context, collectionName string, ts typeutil.Timestamp) (*model.Collection, error) {
return nil, nil
}
func (tc *Catalog) ListCollections(ctx context.Context, ts typeutil.Timestamp) (map[string]*model.Collection, error) {
return nil, nil
}
func (tc *Catalog) CollectionExists(ctx context.Context, collectionID typeutil.UniqueID, ts typeutil.Timestamp) bool {
return false
}
func (tc *Catalog) DropCollection(ctx context.Context, collectionInfo *model.Collection, ts typeutil.Timestamp) error {
return nil
}
func (tc *Catalog) CreatePartition(ctx context.Context, coll *model.Collection, ts typeutil.Timestamp) error {
return nil
}
func (tc *Catalog) DropPartition(ctx context.Context, collectionInfo *model.Collection, partitionID typeutil.UniqueID, ts typeutil.Timestamp) error {
return nil
}
func (tc *Catalog) AlterIndex(ctx context.Context, index *model.Index) error {
return nil
}
func (tc *Catalog) DropIndex(ctx context.Context, collectionInfo *model.Collection, dropIdxID typeutil.UniqueID, ts typeutil.Timestamp) error {
return nil
}
func (tc *Catalog) ListIndexes(ctx context.Context) ([]*model.Index, error) {
return nil, nil
}
func (tc *Catalog) CreateAlias(ctx context.Context, collection *model.Collection, ts typeutil.Timestamp) error {
return nil
}
func (tc *Catalog) DropAlias(ctx context.Context, collectionID typeutil.UniqueID, alias string, ts typeutil.Timestamp) error {
return nil
}
func (tc *Catalog) AlterAlias(ctx context.Context, collection *model.Collection, ts typeutil.Timestamp) error {
return nil
}
func (tc *Catalog) ListAliases(ctx context.Context) ([]*model.Collection, error) {
return nil, nil
}
func (tc *Catalog) GetCredential(ctx context.Context, username string) (*model.Credential, error) {
return nil, nil
}
func (tc *Catalog) CreateCredential(ctx context.Context, credential *model.Credential) error {
return nil
}
func (tc *Catalog) DropCredential(ctx context.Context, username string) error {
return nil
}
func (tc *Catalog) ListCredentials(ctx context.Context) ([]string, error) {
return nil, nil
}
func (tc *Catalog) Close() {
}


@@ -14,7 +14,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package kv
package rootcoord
import (
"context"
@@ -40,7 +40,7 @@ type rtPair struct {
ts typeutil.Timestamp
}
type MetaSnapshot struct {
type metaSnapshot struct {
cli *clientv3.Client
root string
tsKey string
@@ -52,11 +52,11 @@ type MetaSnapshot struct {
numTs int
}
func NewMetaSnapshot(cli *clientv3.Client, root, tsKey string, bufSize int) (*MetaSnapshot, error) {
func newMetaSnapshot(cli *clientv3.Client, root, tsKey string, bufSize int) (*metaSnapshot, error) {
if bufSize <= 0 {
bufSize = 1024
}
ms := &MetaSnapshot{
ms := &metaSnapshot{
cli: cli,
root: root,
tsKey: tsKey,
@@ -72,7 +72,7 @@ func NewMetaSnapshot(cli *clientv3.Client, root, tsKey string, bufSize int) (*Me
return ms, nil
}
func (ms *MetaSnapshot) loadTs() error {
func (ms *metaSnapshot) loadTs() error {
ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout)
defer cancel()
@@ -115,12 +115,12 @@ func (ms *MetaSnapshot) loadTs() error {
return nil
}
if curVer == version {
log.Debug("Snapshot found save version with different revision", zap.Int64("revision", revision), zap.Int64("version", version))
log.Debug("snapshot found save version with different revision", zap.Int64("revision", revision), zap.Int64("version", version))
}
strTs := string(resp.Kvs[0].Value)
if strTs == "0" {
//#issue 7150, index building inserted "0", skipping
//this is a special fix for backward compatibility, the previous version will put 0 ts into the Snapshot building index
//this is a special fix for backward compatibility, the previous version will put 0 ts into the snapshot building index
continue
}
curTs, err := strconv.ParseUint(strTs, 10, 64)
@@ -139,16 +139,16 @@ func (ms *MetaSnapshot) loadTs() error {
return nil
}
func (ms *MetaSnapshot) maxTs() typeutil.Timestamp {
func (ms *metaSnapshot) maxTs() typeutil.Timestamp {
return ms.ts2Rev[ms.maxPos].ts
}
func (ms *MetaSnapshot) minTs() typeutil.Timestamp {
func (ms *metaSnapshot) minTs() typeutil.Timestamp {
return ms.ts2Rev[ms.minPos].ts
}
func (ms *MetaSnapshot) initTs(rev int64, ts typeutil.Timestamp) {
log.Debug("init meta Snapshot ts", zap.Int64("rev", rev), zap.Uint64("ts", ts))
func (ms *metaSnapshot) initTs(rev int64, ts typeutil.Timestamp) {
log.Debug("init meta snapshot ts", zap.Int64("rev", rev), zap.Uint64("ts", ts))
if ms.numTs == 0 {
ms.maxPos = len(ms.ts2Rev) - 1
ms.minPos = len(ms.ts2Rev) - 1
@@ -163,7 +163,7 @@ func (ms *MetaSnapshot) initTs(rev int64, ts typeutil.Timestamp) {
}
}
func (ms *MetaSnapshot) putTs(rev int64, ts typeutil.Timestamp) {
func (ms *metaSnapshot) putTs(rev int64, ts typeutil.Timestamp) {
log.Debug("put meta snapshto ts", zap.Int64("rev", rev), zap.Uint64("ts", ts))
ms.maxPos++
if ms.maxPos == len(ms.ts2Rev) {
@@ -182,7 +182,7 @@ func (ms *MetaSnapshot) putTs(rev int64, ts typeutil.Timestamp) {
}
}
func (ms *MetaSnapshot) searchOnCache(ts typeutil.Timestamp, start, length int) int64 {
func (ms *metaSnapshot) searchOnCache(ts typeutil.Timestamp, start, length int) int64 {
if length == 1 {
return ms.ts2Rev[start].rev
}
@@ -208,7 +208,7 @@ func (ms *MetaSnapshot) searchOnCache(ts typeutil.Timestamp, start, length int)
}
}
func (ms *MetaSnapshot) getRevOnCache(ts typeutil.Timestamp) int64 {
func (ms *metaSnapshot) getRevOnCache(ts typeutil.Timestamp) int64 {
if ms.numTs == 0 {
return 0
}
@@ -236,7 +236,7 @@ func (ms *MetaSnapshot) getRevOnCache(ts typeutil.Timestamp) int64 {
return 0
}
func (ms *MetaSnapshot) getRevOnEtcd(ts typeutil.Timestamp, rev int64) int64 {
func (ms *metaSnapshot) getRevOnEtcd(ts typeutil.Timestamp, rev int64) int64 {
if rev < 2 {
return 0
}
@@ -265,7 +265,7 @@ func (ms *MetaSnapshot) getRevOnEtcd(ts typeutil.Timestamp, rev int64) int64 {
return 0
}
func (ms *MetaSnapshot) getRev(ts typeutil.Timestamp) (int64, error) {
func (ms *metaSnapshot) getRev(ts typeutil.Timestamp) (int64, error) {
rev := ms.getRevOnCache(ts)
if rev > 0 {
return rev, nil
@@ -278,7 +278,7 @@ func (ms *MetaSnapshot) getRev(ts typeutil.Timestamp) (int64, error) {
return 0, fmt.Errorf("can't find revision on ts=%d", ts)
}
func (ms *MetaSnapshot) Save(key, value string, ts typeutil.Timestamp) error {
func (ms *metaSnapshot) Save(key, value string, ts typeutil.Timestamp) error {
ms.lock.Lock()
defer ms.lock.Unlock()
ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout)
@@ -297,7 +297,7 @@ func (ms *MetaSnapshot) Save(key, value string, ts typeutil.Timestamp) error {
return nil
}
func (ms *MetaSnapshot) Load(key string, ts typeutil.Timestamp) (string, error) {
func (ms *metaSnapshot) Load(key string, ts typeutil.Timestamp) (string, error) {
ms.lock.RLock()
defer ms.lock.RUnlock()
ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout)
@@ -327,7 +327,7 @@ func (ms *MetaSnapshot) Load(key string, ts typeutil.Timestamp) (string, error)
return string(resp.Kvs[0].Value), nil
}
func (ms *MetaSnapshot) MultiSave(kvs map[string]string, ts typeutil.Timestamp) error {
func (ms *metaSnapshot) MultiSave(kvs map[string]string, ts typeutil.Timestamp) error {
ms.lock.Lock()
defer ms.lock.Unlock()
ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout)
@@ -348,7 +348,7 @@ func (ms *MetaSnapshot) MultiSave(kvs map[string]string, ts typeutil.Timestamp)
return nil
}
func (ms *MetaSnapshot) LoadWithPrefix(key string, ts typeutil.Timestamp) ([]string, []string, error) {
func (ms *metaSnapshot) LoadWithPrefix(key string, ts typeutil.Timestamp) ([]string, []string, error) {
ms.lock.RLock()
defer ms.lock.RUnlock()
ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout)
@@ -385,7 +385,7 @@ func (ms *MetaSnapshot) LoadWithPrefix(key string, ts typeutil.Timestamp) ([]str
return keys, values, nil
}
func (ms *MetaSnapshot) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
func (ms *metaSnapshot) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
ms.lock.Lock()
defer ms.lock.Unlock()
ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout)


@@ -14,32 +14,21 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package kv
package rootcoord
import (
"context"
"fmt"
"math/rand"
"os"
"path"
"testing"
"time"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/etcd"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/stretchr/testify/assert"
)
var Params paramtable.ComponentParam
func TestMain(m *testing.M) {
Params.Init()
code := m.Run()
os.Exit(code)
}
func TestMetaSnapshot(t *testing.T) {
rand.Seed(time.Now().UnixNano())
randVal := rand.Int()
@@ -57,7 +46,7 @@ func TestMetaSnapshot(t *testing.T) {
return vtso
}
ms, err := NewMetaSnapshot(etcdCli, rootPath, tsKey, 4)
ms, err := newMetaSnapshot(etcdCli, rootPath, tsKey, 4)
assert.Nil(t, err)
assert.NotNil(t, ms)
@@ -71,13 +60,13 @@ func TestMetaSnapshot(t *testing.T) {
assert.Nil(t, err)
}
ms, err = NewMetaSnapshot(etcdCli, rootPath, tsKey, 4)
ms, err = newMetaSnapshot(etcdCli, rootPath, tsKey, 4)
assert.Nil(t, err)
assert.NotNil(t, ms)
}
func TestSearchOnCache(t *testing.T) {
ms := &MetaSnapshot{}
ms := &metaSnapshot{}
for i := 0; i < 8; i++ {
ms.ts2Rev = append(ms.ts2Rev,
rtPair{
@@ -98,7 +87,7 @@ func TestSearchOnCache(t *testing.T) {
}
func TestGetRevOnCache(t *testing.T) {
ms := &MetaSnapshot{}
ms := &metaSnapshot{}
ms.ts2Rev = make([]rtPair, 7)
ms.initTs(7, 16)
ms.initTs(6, 14)
@@ -192,7 +181,7 @@ func TestGetRevOnEtcd(t *testing.T) {
assert.Nil(t, err)
defer etcdCli.Close()
ms := MetaSnapshot{
ms := metaSnapshot{
cli: etcdCli,
root: rootPath,
tsKey: tsKey,
@@ -241,7 +230,7 @@ func TestLoad(t *testing.T) {
return vtso
}
ms, err := NewMetaSnapshot(etcdCli, rootPath, tsKey, 7)
ms, err := newMetaSnapshot(etcdCli, rootPath, tsKey, 7)
assert.Nil(t, err)
assert.NotNil(t, ms)
@@ -261,7 +250,7 @@ func TestLoad(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, "value-19", val)
ms, err = NewMetaSnapshot(etcdCli, rootPath, tsKey, 11)
ms, err = newMetaSnapshot(etcdCli, rootPath, tsKey, 11)
assert.Nil(t, err)
assert.NotNil(t, ms)
@@ -289,7 +278,7 @@ func TestMultiSave(t *testing.T) {
return vtso
}
ms, err := NewMetaSnapshot(etcdCli, rootPath, tsKey, 7)
ms, err := newMetaSnapshot(etcdCli, rootPath, tsKey, 7)
assert.Nil(t, err)
assert.NotNil(t, ms)
@@ -320,7 +309,7 @@ func TestMultiSave(t *testing.T) {
assert.Equal(t, vals[0], "v1-19")
assert.Equal(t, vals[1], "v2-19")
ms, err = NewMetaSnapshot(etcdCli, rootPath, tsKey, 11)
ms, err = newMetaSnapshot(etcdCli, rootPath, tsKey, 11)
assert.Nil(t, err)
assert.NotNil(t, ms)
@@ -354,7 +343,7 @@ func TestMultiSaveAndRemoveWithPrefix(t *testing.T) {
}
defer etcdCli.Close()
ms, err := NewMetaSnapshot(etcdCli, rootPath, tsKey, 7)
ms, err := newMetaSnapshot(etcdCli, rootPath, tsKey, 7)
assert.Nil(t, err)
assert.NotNil(t, ms)
@@ -392,7 +381,7 @@ func TestMultiSaveAndRemoveWithPrefix(t *testing.T) {
assert.Equal(t, 39-i, len(vals))
}
ms, err = NewMetaSnapshot(etcdCli, rootPath, tsKey, 11)
ms, err = newMetaSnapshot(etcdCli, rootPath, tsKey, 11)
assert.Nil(t, err)
assert.NotNil(t, ms)
@@ -426,7 +415,7 @@ func TestTsBackward(t *testing.T) {
assert.Nil(t, err)
defer etcdCli.Close()
kv, err := NewMetaSnapshot(etcdCli, rootPath, tsKey, 1024)
kv, err := newMetaSnapshot(etcdCli, rootPath, tsKey, 1024)
assert.Nil(t, err)
err = kv.loadTs()
@@ -436,7 +425,7 @@ func TestTsBackward(t *testing.T) {
kv.Save("a", "c", 99) // backward
kv.Save("a", "d", 200)
kv, err = NewMetaSnapshot(etcdCli, rootPath, tsKey, 1024)
kv, err = newMetaSnapshot(etcdCli, rootPath, tsKey, 1024)
assert.Error(t, err)
}
@@ -453,7 +442,7 @@ func TestFix7150(t *testing.T) {
assert.Nil(t, err)
defer etcdCli.Close()
kv, err := NewMetaSnapshot(etcdCli, rootPath, tsKey, 1024)
kv, err := newMetaSnapshot(etcdCli, rootPath, tsKey, 1024)
assert.Nil(t, err)
err = kv.loadTs()
@@ -463,7 +452,7 @@ func TestFix7150(t *testing.T) {
kv.Save("a", "c", 0) // bug introduced
kv.Save("a", "d", 200)
kv, err = NewMetaSnapshot(etcdCli, rootPath, tsKey, 1024)
kv, err = newMetaSnapshot(etcdCli, rootPath, tsKey, 1024)
assert.Nil(t, err)
err = kv.loadTs()
assert.Nil(t, err)

File diff suppressed because it is too large

File diff suppressed because it is too large


@@ -24,7 +24,6 @@ import (
"sync"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metastore/kv"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
@@ -75,7 +74,7 @@ func (p *proxyManager) DelSessionFunc(fns ...func(*sessionutil.Session)) {
// WatchProxy starts a goroutine to watch proxy session changes on etcd
func (p *proxyManager) WatchProxy() error {
ctx, cancel := context.WithTimeout(p.ctx, kv.RequestTimeout)
ctx, cancel := context.WithTimeout(p.ctx, RequestTimeout)
defer cancel()
sessions, rev, err := p.getSessionsOnEtcd(ctx)
@@ -198,7 +197,7 @@ func (p *proxyManager) Stop() {
// listProxyInEtcd helper function lists proxy in etcd
func listProxyInEtcd(ctx context.Context, cli *clientv3.Client) (map[int64]*sessionutil.Session, error) {
ctx2, cancel := context.WithTimeout(ctx, kv.RequestTimeout)
ctx2, cancel := context.WithTimeout(ctx, RequestTimeout)
defer cancel()
resp, err := cli.Get(
ctx2,


@@ -31,24 +31,26 @@ import (
"time"
"github.com/golang/protobuf/proto"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/log"
kvmetestore "github.com/milvus-io/milvus/internal/metastore/kv"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/metrics"
ms "github.com/milvus-io/milvus/internal/mq/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/proto/proxypb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/milvus-io/milvus/internal/tso"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util"
@@ -62,8 +64,6 @@ import (
"github.com/milvus-io/milvus/internal/util/trace"
"github.com/milvus-io/milvus/internal/util/tsoutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
)
// UniqueID is an alias of typeutil.UniqueID.
@@ -133,7 +133,7 @@ type Core struct {
CallGetRecoveryInfoService func(ctx context.Context, collID, partID UniqueID) ([]*datapb.SegmentBinlogs, error)
//call index builder's client to build index, return build id or get index state.
CallBuildIndexService func(ctx context.Context, segID UniqueID, binlog []string, field *model.Field, idxInfo *model.Index, numRows int64) (typeutil.UniqueID, error)
CallBuildIndexService func(ctx context.Context, segID UniqueID, binlog []string, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo, numRows int64) (typeutil.UniqueID, error)
CallDropIndexService func(ctx context.Context, indexID typeutil.UniqueID) error
CallGetIndexStatesService func(ctx context.Context, IndexBuildIDs []int64) ([]*indexpb.IndexInfo, error)
@@ -363,20 +363,20 @@ func (c *Core) checkFlushedSegments(ctx context.Context) {
if len(collMeta.FieldIndexes) == 0 {
continue
}
for _, part := range collMeta.Partitions {
for _, partID := range collMeta.PartitionIDs {
ctx2, cancel2 := context.WithTimeout(ctx, 3*time.Minute)
segBinlogs, err := c.CallGetRecoveryInfoService(ctx2, collMeta.CollectionID, part.PartitionID)
segBinlogs, err := c.CallGetRecoveryInfoService(ctx2, collMeta.ID, partID)
if err != nil {
log.Debug("failed to get flushed segments from dataCoord",
zap.Int64("collection ID", collMeta.CollectionID),
zap.Int64("partition ID", part.PartitionID),
zap.Int64("collection ID", collMeta.GetID()),
zap.Int64("partition ID", partID),
zap.Error(err))
cancel2()
continue
}
for _, segBinlog := range segBinlogs {
segID := segBinlog.SegmentID
var indexInfos []*model.Index
var indexInfos []*etcdpb.FieldIndexInfo
indexMeta, ok := segID2IndexMeta[segID]
if !ok {
indexInfos = append(indexInfos, collMeta.FieldIndexes...)
@@ -389,11 +389,11 @@ func (c *Core) checkFlushedSegments(ctx context.Context) {
}
for _, idxInfo := range indexInfos {
/* #nosec G601 */
field, err := GetFieldSchemaByID(&collMeta, idxInfo.FieldID)
field, err := GetFieldSchemaByID(&collMeta, idxInfo.FiledID)
if err != nil {
log.Debug("GetFieldSchemaByID",
zap.Any("collection_meta", collMeta),
zap.Int64("field id", idxInfo.FieldID))
zap.Int64("field id", idxInfo.FiledID))
continue
}
indexMeta, ok := indexID2Meta[idxInfo.IndexID]
@@ -401,26 +401,19 @@ func (c *Core) checkFlushedSegments(ctx context.Context) {
log.Debug("index meta does not exist", zap.Int64("index_id", idxInfo.IndexID))
continue
}
info := model.Index{
CollectionID: collMeta.CollectionID,
FieldID: idxInfo.FieldID,
info := etcdpb.SegmentIndexInfo{
CollectionID: collMeta.ID,
PartitionID: partID,
SegmentID: segID,
FieldID: idxInfo.FiledID,
IndexID: idxInfo.IndexID,
SegmentIndexes: map[int64]model.SegmentIndex{
segID: {
Segment: model.Segment{
SegmentID: segID,
PartitionID: part.PartitionID,
},
EnableIndex: false,
},
},
EnableIndex: false,
}
log.Debug("building index by background checker",
zap.Int64("segment_id", segID),
zap.Int64("index_id", indexMeta.IndexID),
zap.Int64("collection_id", collMeta.CollectionID))
segmentIndex := info.SegmentIndexes[segID]
segmentIndex.BuildID, err = c.BuildIndex(ctx2, segID, segBinlog.GetNumOfRows(), segBinlog.GetFieldBinlogs(), field, &indexMeta, false)
zap.Int64("collection_id", collMeta.ID))
info.BuildID, err = c.BuildIndex(ctx2, segID, segBinlog.GetNumOfRows(), segBinlog.GetFieldBinlogs(), field, &indexMeta, false)
if err != nil {
log.Debug("build index failed",
zap.Int64("segment_id", segID),
@@ -428,14 +421,14 @@ func (c *Core) checkFlushedSegments(ctx context.Context) {
zap.Int64("index_id", indexMeta.IndexID))
continue
}
if segmentIndex.BuildID != 0 {
segmentIndex.EnableIndex = true
if info.BuildID != 0 {
info.EnableIndex = true
}
if err := c.MetaTable.AlterIndex(&info); err != nil {
if err := c.MetaTable.AddIndex(&info); err != nil {
log.Debug("Add index into meta table failed",
zap.Int64("collection_id", collMeta.CollectionID),
zap.Int64("collection_id", collMeta.ID),
zap.Int64("index_id", info.IndexID),
zap.Int64("build_id", segmentIndex.BuildID),
zap.Int64("build_id", info.BuildID),
zap.Error(err))
}
}
@@ -452,16 +445,16 @@ func (c *Core) getSegments(ctx context.Context, collID typeutil.UniqueID) (map[U
}
segID2PartID := make(map[UniqueID]UniqueID)
segID2Binlog := make(map[UniqueID]*datapb.SegmentBinlogs)
for _, part := range collMeta.Partitions {
if segs, err := c.CallGetRecoveryInfoService(ctx, collID, part.PartitionID); err == nil {
for _, partID := range collMeta.PartitionIDs {
if segs, err := c.CallGetRecoveryInfoService(ctx, collID, partID); err == nil {
for _, s := range segs {
segID2PartID[s.SegmentID] = part.PartitionID
segID2PartID[s.SegmentID] = partID
segID2Binlog[s.SegmentID] = s
}
} else {
log.Error("failed to get flushed segments info from dataCoord",
zap.Int64("collection ID", collID),
zap.Int64("partition ID", part.PartitionID),
zap.Int64("partition ID", partID),
zap.Error(err))
return nil, nil, err
}
@@ -711,7 +704,7 @@ func (c *Core) SetIndexCoord(s types.IndexCoord) error {
}
}()
c.CallBuildIndexService = func(ctx context.Context, segID UniqueID, binlog []string, field *model.Field, idxInfo *model.Index, numRows int64) (retID typeutil.UniqueID, retErr error) {
c.CallBuildIndexService = func(ctx context.Context, segID UniqueID, binlog []string, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo, numRows int64) (retID typeutil.UniqueID, retErr error) {
defer func() {
if err := recover(); err != nil {
retErr = fmt.Errorf("build index panic, msg = %v", err)
@@ -725,7 +718,7 @@ func (c *Core) SetIndexCoord(s types.IndexCoord) error {
IndexID: idxInfo.IndexID,
IndexName: idxInfo.IndexName,
NumRows: numRows,
FieldSchema: model.ConvertToFieldSchemaPB(field),
FieldSchema: field,
SegmentID: segID,
})
if err != nil {
@@ -871,14 +864,14 @@ func (c *Core) SetQueryCoord(s types.QueryCoord) error {
}
// BuildIndex will check row num and call build index service
func (c *Core) BuildIndex(ctx context.Context, segID UniqueID, numRows int64, binlogs []*datapb.FieldBinlog, field *model.Field, idxInfo *model.Index, isFlush bool) (typeutil.UniqueID, error) {
func (c *Core) BuildIndex(ctx context.Context, segID UniqueID, numRows int64, binlogs []*datapb.FieldBinlog, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo, isFlush bool) (typeutil.UniqueID, error) {
log.Debug("start build index", zap.String("index name", idxInfo.IndexName),
zap.String("field name", field.Name), zap.Int64("segment id", segID))
sp, ctx := trace.StartSpanFromContext(ctx)
defer sp.Finish()
if c.MetaTable.IsSegmentIndexed(segID, field, idxInfo.IndexParams) {
info, err := c.MetaTable.GetSegmentIndexInfoByID(segID, field.FieldID, idxInfo.IndexName)
return info.SegmentIndexes[segID].BuildID, err
info, err := c.MetaTable.GetSegmentIndexInfoByID(segID, field.FieldID, idxInfo.GetIndexName())
return info.BuildID, err
}
var bldID UniqueID
var err error
@@ -887,7 +880,7 @@ func (c *Core) BuildIndex(ctx context.Context, segID UniqueID, numRows int64, bi
} else {
binLogs := make([]string, 0)
for _, fieldBinLog := range binlogs {
if fieldBinLog.GetFieldID() == field.FieldID {
if fieldBinLog.GetFieldID() == field.GetFieldID() {
for _, binLog := range fieldBinLog.GetBinlogs() {
binLogs = append(binLogs, binLog.LogPath)
}
@ -1021,13 +1014,12 @@ func (c *Core) Init() error {
log.Error("RootCoord failed to new EtcdKV", zap.Any("reason", initError))
return initError
}
var ss *kvmetestore.SuffixSnapshot
if ss, initError = kvmetestore.NewSuffixSnapshot(metaKV, "_ts", Params.EtcdCfg.MetaRootPath, "snapshots"); initError != nil {
var ss *suffixSnapshot
if ss, initError = newSuffixSnapshot(metaKV, "_ts", Params.EtcdCfg.MetaRootPath, "snapshots"); initError != nil {
log.Error("RootCoord failed to new suffixSnapshot", zap.Error(initError))
return initError
}
if c.MetaTable, initError = NewMetaTable(c.ctx, metaKV, ss); initError != nil {
if c.MetaTable, initError = NewMetaTable(metaKV, ss); initError != nil {
log.Error("RootCoord failed to new MetaTable", zap.Any("reason", initError))
return initError
}
@ -1205,7 +1197,7 @@ func (c *Core) reSendDdMsg(ctx context.Context, force bool) error {
if err != nil {
return err
}
if _, err = c.MetaTable.GetPartitionByName(collInfo.CollectionID, ddReq.PartitionName, 0); err != nil {
if _, err = c.MetaTable.GetPartitionByName(collInfo.ID, ddReq.PartitionName, 0); err != nil {
if err = c.SendDdCreatePartitionReq(ctx, &ddReq, collInfo.PhysicalChannelNames); err != nil {
return err
}
@ -1226,7 +1218,7 @@ func (c *Core) reSendDdMsg(ctx context.Context, force bool) error {
if err != nil {
return err
}
if _, err = c.MetaTable.GetPartitionByName(collInfo.CollectionID, ddReq.PartitionName, 0); err == nil {
if _, err = c.MetaTable.GetPartitionByName(collInfo.ID, ddReq.PartitionName, 0); err == nil {
if err = c.SendDdDropPartitionReq(ctx, &ddReq, collInfo.PhysicalChannelNames); err != nil {
return err
}
@ -2060,14 +2052,14 @@ func (c *Core) SegmentFlushCompleted(ctx context.Context, in *datapb.SegmentFlus
if len(coll.FieldIndexes) == 0 {
log.Debug("no index params on collection", zap.String("role", typeutil.RootCoordRole),
zap.String("collection_name", coll.Name), zap.Int64("msgID", in.Base.MsgID))
zap.String("collection_name", coll.Schema.Name), zap.Int64("msgID", in.Base.MsgID))
}
for _, f := range coll.FieldIndexes {
fieldSch, err := GetFieldSchemaByID(coll, f.FieldID)
fieldSch, err := GetFieldSchemaByID(coll, f.FiledID)
if err != nil {
log.Warn("field schema not found", zap.String("role", typeutil.RootCoordRole),
zap.String("collection_name", coll.Name), zap.Int64("field id", f.FieldID),
zap.String("collection_name", coll.Schema.Name), zap.Int64("field id", f.FiledID),
zap.Int64("msgID", in.Base.MsgID), zap.Error(err))
continue
}
@ -2075,41 +2067,33 @@ func (c *Core) SegmentFlushCompleted(ctx context.Context, in *datapb.SegmentFlus
idxInfo, err := c.MetaTable.GetIndexByID(f.IndexID)
if err != nil {
log.Warn("index not found", zap.String("role", typeutil.RootCoordRole),
zap.String("collection_name", coll.Name), zap.Int64("field id", f.FieldID),
zap.String("collection_name", coll.Schema.Name), zap.Int64("field id", f.FiledID),
zap.Int64("index id", f.IndexID), zap.Int64("msgID", in.Base.MsgID), zap.Error(err))
continue
}
info := model.Index{
info := etcdpb.SegmentIndexInfo{
CollectionID: in.Segment.CollectionID,
PartitionID: in.Segment.PartitionID,
SegmentID: segID,
FieldID: fieldSch.FieldID,
IndexID: idxInfo.IndexID,
SegmentIndexes: map[int64]model.SegmentIndex{
segID: {
Segment: model.Segment{
SegmentID: segID,
PartitionID: in.Segment.PartitionID,
},
EnableIndex: false,
},
},
EnableIndex: false,
}
segmentIndex := info.SegmentIndexes[segID]
segmentIndex.BuildID, err = c.BuildIndex(ctx, segID, in.Segment.GetNumOfRows(), in.Segment.GetBinlogs(), fieldSch, idxInfo, true)
if err == nil && segmentIndex.BuildID != 0 {
segmentIndex.EnableIndex = true
info.BuildID, err = c.BuildIndex(ctx, segID, in.Segment.GetNumOfRows(), in.Segment.GetBinlogs(), fieldSch, idxInfo, true)
if err == nil && info.BuildID != 0 {
info.EnableIndex = true
} else {
log.Error("BuildIndex failed", zap.String("role", typeutil.RootCoordRole),
zap.String("collection_name", coll.Name), zap.Int64("field id", f.FieldID),
zap.Int64("index id", f.IndexID), zap.Int64("build id", segmentIndex.BuildID),
zap.String("collection_name", coll.Schema.Name), zap.Int64("field id", f.FiledID),
zap.Int64("index id", f.IndexID), zap.Int64("build id", info.BuildID),
zap.Int64("msgID", in.Base.MsgID), zap.Error(err))
continue
}
err = c.MetaTable.AlterIndex(&info)
err = c.MetaTable.AddIndex(&info)
if err != nil {
log.Error("AlterIndex failed", zap.String("role", typeutil.RootCoordRole),
zap.String("collection_name", coll.Name), zap.Int64("field id", f.FieldID),
log.Error("AddIndex failed", zap.String("role", typeutil.RootCoordRole),
zap.String("collection_name", coll.Schema.Name), zap.Int64("field id", f.FiledID),
zap.Int64("index id", f.IndexID), zap.Int64("msgID", in.Base.MsgID), zap.Error(err))
continue
}
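
Taken together, the handler above follows one pattern per indexed field: construct the segment-index record, request a build, mark EnableIndex only when a non-zero build ID comes back, then persist. A condensed sketch under assumed local types (not the etcdpb/model structs):

package main

import "fmt"

// segmentIndexRecord is an illustrative stand-in for the meta entry above.
type segmentIndexRecord struct {
	SegmentID   int64
	IndexID     int64
	BuildID     int64
	EnableIndex bool
}

// onSegmentFlushed mirrors the handler: build, enable only on a non-zero
// build ID, then persist the record.
func onSegmentFlushed(segID, indexID int64,
	build func(segID int64) (int64, error),
	persist func(rec *segmentIndexRecord) error) error {
	rec := &segmentIndexRecord{SegmentID: segID, IndexID: indexID}
	buildID, err := build(segID)
	if err != nil {
		return err
	}
	rec.BuildID = buildID
	if buildID != 0 {
		rec.EnableIndex = true
	}
	return persist(rec)
}

func main() {
	_ = onSegmentFlushed(1000, 6001,
		func(int64) (int64, error) { return 10001, nil },
		func(rec *segmentIndexRecord) error { fmt.Printf("%+v\n", rec); return nil })
}
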
@ -2382,7 +2366,7 @@ func (c *Core) ReportImport(ctx context.Context, ir *rootcoordpb.ImportResult) (
// Look up collection name on collection ID.
var colName string
var colMeta *model.Collection
var colMeta *etcdpb.CollectionInfo
if colMeta, err = c.MetaTable.GetCollectionByID(ti.GetCollectionId(), 0); err != nil {
log.Error("failed to get collection name",
zap.Int64("collection ID", ti.GetCollectionId()),
@ -2392,7 +2376,7 @@ func (c *Core) ReportImport(ctx context.Context, ir *rootcoordpb.ImportResult) (
Reason: "failed to get collection name for collection ID" + strconv.FormatInt(ti.GetCollectionId(), 10),
}, nil
}
colName = colMeta.Name
colName = colMeta.GetSchema().GetName()
// When DataNode has done its thing, remove it from the busy node list. And send import task again
resendTaskFunc()
@ -2506,7 +2490,7 @@ func (c *Core) postImportPersistLoop(ctx context.Context, taskID int64, colID in
if colMeta, err := c.MetaTable.GetCollectionByID(colID, 0); err != nil {
log.Error("failed to find meta for collection",
zap.Int64("collection ID", colID))
} else if len(colMeta.FieldIndexes) != 0 {
} else if len(colMeta.GetFieldIndexes()) != 0 {
c.wg.Add(1)
c.checkCompleteIndexLoop(ctx, taskID, colID, colName, segIDs)
}


@ -28,16 +28,19 @@ import (
"time"
"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
clientv3 "go.etcd.io/etcd/client/v3"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
memkv "github.com/milvus-io/milvus/internal/kv/mem"
"github.com/milvus-io/milvus/internal/log"
kvmetestore "github.com/milvus-io/milvus/internal/metastore/kv"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/mq/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
@ -54,9 +57,6 @@ import (
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
clientv3 "go.etcd.io/etcd/client/v3"
)
const (
@ -450,7 +450,7 @@ func getNotTtMsg(ctx context.Context, n int, ch <-chan *msgstream.MsgPack) []msg
}
}
func createCollectionInMeta(dbName, collName string, core *Core, shardsNum int32, modifyFunc func(collection *model.Collection)) error {
func createCollectionInMeta(dbName, collName string, core *Core, shardsNum int32, modifyFunc func(*etcdpb.CollectionInfo)) error {
schema := schemapb.CollectionSchema{
Name: collName,
}
@ -511,29 +511,24 @@ func createCollectionInMeta(dbName, collName string, core *Core, shardsNum int32
chanNames[i] = funcutil.ToPhysicalChannel(vchanNames[i])
}
collInfo := model.Collection{
CollectionID: collID,
Name: schema.Name,
Description: schema.Description,
AutoID: schema.AutoID,
Fields: model.BatchConvertFieldPBToModel(schema.Fields),
FieldIndexes: make([]*model.Index, 0, 16),
VirtualChannelNames: vchanNames,
PhysicalChannelNames: chanNames,
ShardsNum: 0, // intend to set zero
Partitions: []*model.Partition{
{
PartitionID: partID,
PartitionName: Params.CommonCfg.DefaultPartitionName,
PartitionCreatedTimestamp: 0,
},
},
collInfo := etcdpb.CollectionInfo{
ID: collID,
Schema: &schema,
PartitionIDs: []typeutil.UniqueID{partID},
PartitionNames: []string{Params.CommonCfg.DefaultPartitionName},
FieldIndexes: make([]*etcdpb.FieldIndexInfo, 0, 16),
VirtualChannelNames: vchanNames,
PhysicalChannelNames: chanNames,
ShardsNum: 0, // intend to set zero
PartitionCreatedTimestamps: []uint64{0},
}
if modifyFunc != nil {
modifyFunc(&collInfo)
}
idxInfo := make([]*etcdpb.IndexInfo, 0, 16)
// schema is modified (RowIDField and TimestampField added),
// so it needs to be marshaled again
schemaBytes, err := proto.Marshal(&schema)
@ -578,7 +573,7 @@ func createCollectionInMeta(dbName, collName string, core *Core, shardsNum int32
// clear ddl timetick in all conditions
defer core.chanTimeTick.removeDdlTimeTick(ts, reason)
err = core.MetaTable.AddCollection(&collInfo, ts, ddOpStr)
err = core.MetaTable.AddCollection(&collInfo, ts, idxInfo, ddOpStr)
if err != nil {
return fmt.Errorf("meta table add collection failed,error = %w", err)
}
@ -755,21 +750,22 @@ func TestRootCoordInitData(t *testing.T) {
err = core.MetaTable.DeleteCredential(util.UserRoot)
assert.NoError(t, err)
snapshotKV, err := kvmetestore.NewMetaSnapshot(etcdCli, Params.EtcdCfg.MetaRootPath, TimestampPrefix, 7)
snapshotKV, err := newMetaSnapshot(etcdCli, Params.EtcdCfg.MetaRootPath, TimestampPrefix, 7)
assert.NotNil(t, snapshotKV)
assert.NoError(t, err)
txnKV := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath)
mt, err := NewMetaTable(context.TODO(), txnKV, snapshotKV)
mt, err := NewMetaTable(txnKV, snapshotKV)
assert.NoError(t, err)
mockTxnKV := &mockTestTxnKV{
TxnKV: mt.txn,
save: func(key, value string) error {
return fmt.Errorf("save error")
},
TxnKV: mt.txn,
save: func(key, value string) error { return txnKV.Save(key, value) },
remove: func(key string) error { return txnKV.Remove(key) },
}
//mt.txn = mockTxnKV
mt.catalog = &kvmetestore.Catalog{Txn: mockTxnKV, Snapshot: snapshotKV}
mt.txn = mockTxnKV
// mock save data error
mockTxnKV.save = func(key, value string) error {
return fmt.Errorf("save error")
}
core.MetaTable = mt
err = core.initData()
assert.Error(t, err)
@ -935,19 +931,20 @@ func TestRootCoord_Base(t *testing.T) {
dmlStream.Start()
pChanMap := core.MetaTable.ListCollectionPhysicalChannels()
assert.Greater(t, len(pChanMap[createMeta.CollectionID]), 0)
assert.Greater(t, len(pChanMap[createMeta.ID]), 0)
vChanMap := core.MetaTable.ListCollectionVirtualChannels()
assert.Greater(t, len(vChanMap[createMeta.CollectionID]), 0)
assert.Greater(t, len(vChanMap[createMeta.ID]), 0)
// get CreateCollectionMsg
msgs := getNotTtMsg(ctx, 1, dmlStream.Chan())
assert.Equal(t, 1, len(msgs))
createMsg, ok := (msgs[0]).(*msgstream.CreateCollectionMsg)
assert.True(t, ok)
assert.Equal(t, createMeta.CollectionID, createMsg.CollectionID)
assert.Equal(t, 1, len(createMeta.Partitions))
assert.Equal(t, createMeta.Partitions[0].PartitionID, createMsg.PartitionID)
assert.Equal(t, createMeta.Partitions[0].PartitionName, createMsg.PartitionName)
assert.Equal(t, createMeta.ID, createMsg.CollectionID)
assert.Equal(t, 1, len(createMeta.PartitionIDs))
assert.Equal(t, createMeta.PartitionIDs[0], createMsg.PartitionID)
assert.Equal(t, 1, len(createMeta.PartitionNames))
assert.Equal(t, createMeta.PartitionNames[0], createMsg.PartitionName)
assert.Equal(t, shardsNum, int32(len(createMeta.VirtualChannelNames)))
assert.Equal(t, shardsNum, int32(len(createMeta.PhysicalChannelNames)))
assert.Equal(t, shardsNum, createMeta.ShardsNum)
@ -987,8 +984,8 @@ func TestRootCoord_Base(t *testing.T) {
var ddCollReq = internalpb.CreateCollectionRequest{}
err = proto.Unmarshal(ddOp.Body, &ddCollReq)
assert.NoError(t, err)
assert.Equal(t, createMeta.CollectionID, ddCollReq.CollectionID)
assert.Equal(t, createMeta.Partitions[0].PartitionID, ddCollReq.PartitionID)
assert.Equal(t, createMeta.ID, ddCollReq.CollectionID)
assert.Equal(t, createMeta.PartitionIDs[0], ddCollReq.PartitionID)
// check invalid operation
req.Base.MsgID = 101
@ -1090,7 +1087,7 @@ func TestRootCoord_Base(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, rsp.Status.ErrorCode)
assert.Equal(t, collName, rsp.Schema.Name)
assert.Equal(t, collMeta.CollectionID, rsp.CollectionID)
assert.Equal(t, collMeta.ID, rsp.CollectionID)
assert.Equal(t, shardsNum, int32(len(rsp.VirtualChannelNames)))
assert.Equal(t, shardsNum, int32(len(rsp.PhysicalChannelNames)))
assert.Equal(t, shardsNum, rsp.ShardsNum)
@ -1136,8 +1133,8 @@ func TestRootCoord_Base(t *testing.T) {
assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
collMeta, err := core.MetaTable.GetCollectionByName(collName, 0)
assert.NoError(t, err)
assert.Equal(t, 2, len(collMeta.Partitions))
partNameIdx1, err := core.MetaTable.GetPartitionNameByID(collMeta.CollectionID, collMeta.Partitions[1].PartitionID, 0)
assert.Equal(t, 2, len(collMeta.PartitionIDs))
partNameIdx1, err := core.MetaTable.GetPartitionNameByID(collMeta.ID, collMeta.PartitionIDs[1], 0)
assert.NoError(t, err)
assert.Equal(t, partName, partNameIdx1)
@ -1145,11 +1142,11 @@ func TestRootCoord_Base(t *testing.T) {
assert.Equal(t, 1, len(msgs))
partMsg, ok := (msgs[0]).(*msgstream.CreatePartitionMsg)
assert.True(t, ok)
assert.Equal(t, collMeta.CollectionID, partMsg.CollectionID)
assert.Equal(t, collMeta.Partitions[1].PartitionID, partMsg.PartitionID)
assert.Equal(t, collMeta.ID, partMsg.CollectionID)
assert.Equal(t, collMeta.PartitionIDs[1], partMsg.PartitionID)
assert.Equal(t, 1, len(pnm.GetCollIDs()))
assert.Equal(t, collMeta.CollectionID, pnm.GetCollIDs()[0])
assert.Equal(t, collMeta.ID, pnm.GetCollIDs()[0])
// check DD operation info
flag, err := core.MetaTable.txn.Load(DDMsgSendPrefix)
@ -1165,8 +1162,8 @@ func TestRootCoord_Base(t *testing.T) {
var ddReq = internalpb.CreatePartitionRequest{}
err = proto.Unmarshal(ddOp.Body, &ddReq)
assert.NoError(t, err)
assert.Equal(t, collMeta.CollectionID, ddReq.CollectionID)
assert.Equal(t, collMeta.Partitions[1].PartitionID, ddReq.PartitionID)
assert.Equal(t, collMeta.ID, ddReq.CollectionID)
assert.Equal(t, collMeta.PartitionIDs[1], ddReq.PartitionID)
err = core.reSendDdMsg(core.ctx, true)
assert.NoError(t, err)
@ -1206,7 +1203,7 @@ func TestRootCoord_Base(t *testing.T) {
},
DbName: dbName,
CollectionName: collName,
CollectionID: coll.CollectionID,
CollectionID: coll.ID,
}
rsp, err := core.ShowPartitions(ctx, req)
assert.NoError(t, err)
@ -1220,7 +1217,7 @@ func TestRootCoord_Base(t *testing.T) {
defer wg.Done()
coll, err := core.MetaTable.GetCollectionByName(collName, 0)
assert.NoError(t, err)
partID := coll.Partitions[1].PartitionID
partID := coll.PartitionIDs[1]
dm.mu.Lock()
dm.segs = []typeutil.UniqueID{1000, 1001, 1002, 1003, 1004, 1005}
dm.mu.Unlock()
@ -1232,7 +1229,7 @@ func TestRootCoord_Base(t *testing.T) {
Timestamp: 170,
SourceID: 170,
},
CollectionID: coll.CollectionID,
CollectionID: coll.GetID(),
PartitionID: partID,
}
rsp, err := core.ShowSegments(ctx, req)
@ -1310,7 +1307,7 @@ func TestRootCoord_Base(t *testing.T) {
Timestamp: 190,
SourceID: 190,
},
CollectionID: coll.CollectionID,
CollectionID: coll.ID,
SegmentID: 1000,
}
rsp, err := core.DescribeSegment(ctx, req)
@ -1370,20 +1367,20 @@ func TestRootCoord_Base(t *testing.T) {
assert.NoError(t, err)
// Normal case.
count, err := core.CountCompleteIndex(context.WithValue(ctx, ctxKey{}, ""),
collName, coll.CollectionID, []UniqueID{1000, 1001, 1002})
collName, coll.ID, []UniqueID{1000, 1001, 1002})
assert.NoError(t, err)
assert.Equal(t, 3, count)
// Case with an empty result.
count, err = core.CountCompleteIndex(ctx, collName, coll.CollectionID, []UniqueID{})
count, err = core.CountCompleteIndex(ctx, collName, coll.ID, []UniqueID{})
assert.NoError(t, err)
assert.Equal(t, 0, count)
// Case where GetIndexStates failed with error.
_, err = core.CountCompleteIndex(context.WithValue(ctx, ctxKey{}, returnError),
collName, coll.CollectionID, []UniqueID{1000, 1001, 1002})
collName, coll.ID, []UniqueID{1000, 1001, 1002})
assert.Error(t, err)
// Case where GetIndexStates failed with bad status.
_, err = core.CountCompleteIndex(context.WithValue(ctx, ctxKey{}, returnUnsuccessfulStatus),
collName, coll.CollectionID, []UniqueID{1000, 1001, 1002})
collName, coll.ID, []UniqueID{1000, 1001, 1002})
assert.Error(t, err)
})
@ -1392,7 +1389,7 @@ func TestRootCoord_Base(t *testing.T) {
defer wg.Done()
coll, err := core.MetaTable.GetCollectionByName(collName, 0)
assert.NoError(t, err)
partID := coll.Partitions[1].PartitionID
partID := coll.PartitionIDs[1]
flushMsg := datapb.SegmentFlushCompletedMsg{
Base: &commonpb.MsgBase{
@ -1400,7 +1397,7 @@ func TestRootCoord_Base(t *testing.T) {
},
Segment: &datapb.SegmentInfo{
ID: segID,
CollectionID: coll.CollectionID,
CollectionID: coll.ID,
PartitionID: partID,
},
}
@ -1443,7 +1440,7 @@ func TestRootCoord_Base(t *testing.T) {
}
coll, err := core.MetaTable.GetCollectionByName(collName, 0)
assert.NoError(t, err)
core.MetaTable.collName2ID[collName] = coll.CollectionID
core.MetaTable.collName2ID[collName] = coll.GetID()
rsp, err := core.Import(ctx, req)
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, rsp.Status.ErrorCode)
@ -1499,7 +1496,7 @@ func TestRootCoord_Base(t *testing.T) {
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_DescribeSegment,
},
CollectionID: coll.CollectionID,
CollectionID: coll.ID,
SegmentID: segmentID,
}
segDesc, err := core.DescribeSegment(ctx, describeSegmentRequest)
@ -1531,15 +1528,10 @@ func TestRootCoord_Base(t *testing.T) {
return tID, 0, nil
}
core.MetaTable.collName2ID["new"+collName] = 123
core.MetaTable.collID2Meta[123] = model.Collection{
CollectionID: 123,
Partitions: []*model.Partition{
{
PartitionID: 456,
PartitionName: "testPartition",
},
},
}
core.MetaTable.collID2Meta[123] = etcdpb.CollectionInfo{
ID: 123,
PartitionIDs: []int64{456},
PartitionNames: []string{"testPartition"}}
req := &milvuspb.ImportRequest{
CollectionName: "new" + collName,
PartitionName: partName,
@ -1722,14 +1714,14 @@ func TestRootCoord_Base(t *testing.T) {
}
collMeta, err := core.MetaTable.GetCollectionByName(collName, 0)
assert.NoError(t, err)
dropPartID := collMeta.Partitions[1].PartitionID
dropPartID := collMeta.PartitionIDs[1]
status, err := core.DropPartition(ctx, req)
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
collMeta, err = core.MetaTable.GetCollectionByName(collName, 0)
assert.NoError(t, err)
assert.Equal(t, 1, len(collMeta.Partitions))
partName, err := core.MetaTable.GetPartitionNameByID(collMeta.CollectionID, collMeta.Partitions[0].PartitionID, 0)
assert.Equal(t, 1, len(collMeta.PartitionIDs))
partName, err := core.MetaTable.GetPartitionNameByID(collMeta.ID, collMeta.PartitionIDs[0], 0)
assert.NoError(t, err)
assert.Equal(t, Params.CommonCfg.DefaultPartitionName, partName)
@ -1737,11 +1729,11 @@ func TestRootCoord_Base(t *testing.T) {
assert.Equal(t, 1, len(msgs))
dmsg, ok := (msgs[0]).(*msgstream.DropPartitionMsg)
assert.True(t, ok)
assert.Equal(t, collMeta.CollectionID, dmsg.CollectionID)
assert.Equal(t, collMeta.ID, dmsg.CollectionID)
assert.Equal(t, dropPartID, dmsg.PartitionID)
assert.Equal(t, 2, len(pnm.GetCollIDs()))
assert.Equal(t, collMeta.CollectionID, pnm.GetCollIDs()[1])
assert.Equal(t, collMeta.ID, pnm.GetCollIDs()[1])
// check DD operation info
flag, err := core.MetaTable.txn.Load(DDMsgSendPrefix)
@ -1757,7 +1749,7 @@ func TestRootCoord_Base(t *testing.T) {
var ddReq = internalpb.DropPartitionRequest{}
err = proto.Unmarshal(ddOp.Body, &ddReq)
assert.NoError(t, err)
assert.Equal(t, collMeta.CollectionID, ddReq.CollectionID)
assert.Equal(t, collMeta.ID, ddReq.CollectionID)
assert.Equal(t, dropPartID, ddReq.PartitionID)
err = core.reSendDdMsg(core.ctx, true)
@ -1775,7 +1767,7 @@ func TestRootCoord_Base(t *testing.T) {
MsgType: commonpb.MsgType_RemoveQueryChannels,
SourceID: core.session.ServerID,
},
CollectionID: collMeta.CollectionID,
CollectionID: collMeta.ID,
}
status, err := core.ReleaseDQLMessageStream(core.ctx, req)
assert.NoError(t, err)
@ -1808,15 +1800,15 @@ func TestRootCoord_Base(t *testing.T) {
assert.Equal(t, 1, len(msgs))
dmsg, ok := (msgs[0]).(*msgstream.DropCollectionMsg)
assert.True(t, ok)
assert.Equal(t, collMeta.CollectionID, dmsg.CollectionID)
assert.Equal(t, collMeta.ID, dmsg.CollectionID)
collIDs := pnm.GetCollIDs()
assert.Equal(t, 3, len(collIDs))
assert.Equal(t, collMeta.CollectionID, collIDs[2])
assert.Equal(t, collMeta.ID, collIDs[2])
time.Sleep(100 * time.Millisecond)
qm.mutex.Lock()
assert.Equal(t, 1, len(qm.collID))
assert.Equal(t, collMeta.CollectionID, qm.collID[0])
assert.Equal(t, collMeta.ID, qm.collID[0])
qm.mutex.Unlock()
req = &milvuspb.DropCollectionRequest{
@ -1835,7 +1827,7 @@ func TestRootCoord_Base(t *testing.T) {
time.Sleep(100 * time.Millisecond)
collIDs = pnm.GetCollIDs()
assert.Equal(t, 3, len(collIDs))
assert.Equal(t, collMeta.CollectionID, collIDs[2])
assert.Equal(t, collMeta.ID, collIDs[2])
// check DD operation info
flag, err := core.MetaTable.txn.Load(DDMsgSendPrefix)
@ -1851,7 +1843,7 @@ func TestRootCoord_Base(t *testing.T) {
var ddReq = internalpb.DropCollectionRequest{}
err = proto.Unmarshal(ddOp.Body, &ddReq)
assert.NoError(t, err)
assert.Equal(t, collMeta.CollectionID, ddReq.CollectionID)
assert.Equal(t, collMeta.ID, ddReq.CollectionID)
err = core.reSendDdMsg(core.ctx, true)
assert.NoError(t, err)
@ -2239,7 +2231,7 @@ func TestRootCoord_Base(t *testing.T) {
p2 := sessionutil.Session{
ServerID: 101,
}
ctx2, cancel2 := context.WithTimeout(ctx, kvmetestore.RequestTimeout)
ctx2, cancel2 := context.WithTimeout(ctx, RequestTimeout)
defer cancel2()
s1, err := json.Marshal(&p1)
assert.NoError(t, err)
@ -2904,7 +2896,7 @@ func TestRootCoord2(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, rsp.Status.ErrorCode)
assert.Equal(t, collName, rsp.Schema.Name)
assert.Equal(t, collMeta.CollectionID, rsp.CollectionID)
assert.Equal(t, collMeta.ID, rsp.CollectionID)
assert.Equal(t, common.DefaultShardsNum, int32(len(rsp.VirtualChannelNames)))
assert.Equal(t, common.DefaultShardsNum, int32(len(rsp.PhysicalChannelNames)))
assert.Equal(t, common.DefaultShardsNum, rsp.ShardsNum)
@ -3002,7 +2994,7 @@ func TestCheckInit(t *testing.T) {
err = c.checkInit()
assert.Error(t, err)
c.CallBuildIndexService = func(ctx context.Context, segID UniqueID, binlog []string, field *model.Field, idxInfo *model.Index, numRows int64) (typeutil.UniqueID, error) {
c.CallBuildIndexService = func(ctx context.Context, segID UniqueID, binlog []string, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo, numRows int64) (typeutil.UniqueID, error) {
return 0, nil
}
err = c.checkInit()
@ -3126,27 +3118,25 @@ func TestCheckFlushedSegments(t *testing.T) {
var segID int64 = 1001
var fieldID int64 = 101
var indexID int64 = 6001
core.MetaTable.segID2IndexMeta[segID] = make(map[int64]model.Index)
core.MetaTable.segID2IndexMeta[segID] = make(map[int64]etcdpb.SegmentIndexInfo)
core.MetaTable.partID2SegID[partID] = make(map[int64]bool)
core.MetaTable.collID2Meta[collID] = model.Collection{CollectionID: collID}
core.MetaTable.collID2Meta[collID] = etcdpb.CollectionInfo{ID: collID}
// do nothing, since collection has 0 index
core.checkFlushedSegments(ctx)
// get field schema by id fail
core.MetaTable.collID2Meta[collID] = model.Collection{
CollectionID: collID,
Partitions: []*model.Partition{
core.MetaTable.collID2Meta[collID] = etcdpb.CollectionInfo{
ID: collID,
PartitionIDs: []int64{partID},
FieldIndexes: []*etcdpb.FieldIndexInfo{
{
PartitionID: partID,
},
},
FieldIndexes: []*model.Index{
{
FieldID: fieldID,
FiledID: fieldID,
IndexID: indexID,
},
},
Fields: []*model.Field{},
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{},
},
}
core.checkFlushedSegments(ctx)
@ -3162,26 +3152,23 @@ func TestCheckFlushedSegments(t *testing.T) {
core.checkFlushedSegments(core.ctx)
// missing index info
core.MetaTable.collID2Meta[collID] = model.Collection{
CollectionID: collID,
Fields: []*model.Field{
core.MetaTable.collID2Meta[collID] = etcdpb.CollectionInfo{
ID: collID,
PartitionIDs: []int64{partID},
FieldIndexes: []*etcdpb.FieldIndexInfo{
{
FieldID: fieldID,
},
},
FieldIndexes: []*model.Index{
{
FieldID: fieldID,
FiledID: fieldID,
IndexID: indexID,
},
},
Partitions: []*model.Partition{
{
PartitionID: partID,
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: fieldID,
},
},
},
}
core.checkFlushedSegments(ctx)
// existing segID, buildIndex failed
core.CallGetFlushedSegmentsService = func(_ context.Context, cid, pid int64) ([]int64, error) {
@ -3189,10 +3176,10 @@ func TestCheckFlushedSegments(t *testing.T) {
assert.Equal(t, partID, pid)
return []int64{segID}, nil
}
core.MetaTable.indexID2Meta[indexID] = model.Index{
core.MetaTable.indexID2Meta[indexID] = etcdpb.IndexInfo{
IndexID: indexID,
}
core.CallBuildIndexService = func(_ context.Context, segID UniqueID, binlog []string, field *model.Field, idx *model.Index, numRows int64) (int64, error) {
core.CallBuildIndexService = func(_ context.Context, segID UniqueID, binlog []string, field *schemapb.FieldSchema, idx *etcdpb.IndexInfo, numRows int64) (int64, error) {
assert.Equal(t, fieldID, field.FieldID)
assert.Equal(t, indexID, idx.IndexID)
return -1, errors.New("build index failed")
@ -3201,7 +3188,7 @@ func TestCheckFlushedSegments(t *testing.T) {
core.checkFlushedSegments(ctx)
var indexBuildID int64 = 10001
core.CallBuildIndexService = func(_ context.Context, segID UniqueID, binlog []string, field *model.Field, idx *model.Index, numRows int64) (int64, error) {
core.CallBuildIndexService = func(_ context.Context, segID UniqueID, binlog []string, field *schemapb.FieldSchema, idx *etcdpb.IndexInfo, numRows int64) (int64, error) {
return indexBuildID, nil
}
core.checkFlushedSegments(core.ctx)
@ -3277,7 +3264,7 @@ func TestRootCoord_CheckZeroShardsNum(t *testing.T) {
time.Sleep(100 * time.Millisecond)
modifyFunc := func(collInfo *model.Collection) {
modifyFunc := func(collInfo *etcdpb.CollectionInfo) {
collInfo.ShardsNum = 0
}
@ -3300,7 +3287,7 @@ func TestRootCoord_CheckZeroShardsNum(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, rsp.Status.ErrorCode)
assert.Equal(t, collName, rsp.Schema.Name)
assert.Equal(t, collMeta.CollectionID, rsp.CollectionID)
assert.Equal(t, collMeta.ID, rsp.CollectionID)
assert.Equal(t, shardsNum, int32(len(rsp.VirtualChannelNames)))
assert.Equal(t, shardsNum, int32(len(rsp.PhysicalChannelNames)))
assert.Equal(t, shardsNum, rsp.ShardsNum)
@ -3357,25 +3344,20 @@ func TestCore_DescribeSegments(t *testing.T) {
// success.
c.MetaTable = &MetaTable{
segID2IndexMeta: map[typeutil.UniqueID]map[typeutil.UniqueID]model.Index{
segID2IndexMeta: map[typeutil.UniqueID]map[typeutil.UniqueID]etcdpb.SegmentIndexInfo{
segID: {
indexID: {
CollectionID: collID,
PartitionID: partID,
SegmentID: segID,
FieldID: fieldID,
IndexID: indexID,
SegmentIndexes: map[int64]model.SegmentIndex{
segID: {
Segment: model.Segment{
PartitionID: partID,
SegmentID: segID,
},
BuildID: buildID,
EnableIndex: true},
},
BuildID: buildID,
EnableIndex: true,
},
},
},
indexID2Meta: map[typeutil.UniqueID]model.Index{
indexID2Meta: map[typeutil.UniqueID]etcdpb.IndexInfo{
indexID: {
IndexName: indexName,
IndexID: indexID,


@ -14,7 +14,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package kv
package rootcoord
import (
"bytes"
@ -35,15 +35,15 @@ import (
)
var (
// SuffixSnapshotTombstone special value for tombstone mark
SuffixSnapshotTombstone = []byte{0xE2, 0x9B, 0xBC}
// suffixSnapshotTombstone special value for tombstone mark
suffixSnapshotTombstone = []byte{0xE2, 0x9B, 0xBC}
)
// SuffixSnapshot implements SnapshotKV
// this is a simple replacement for MetaSnapshot, which is not available due to etcd compaction
// SuffixSnapshot records timestamps as a suffix of keys under the Snapshot prefix path
type SuffixSnapshot struct {
// internal kv which SuffixSnapshot is based on
// suffixSnapshot implements SnapshotKV
// this is a simple replacement for metaSnapshot, which is not available due to etcd compaction
// suffixSnapshot records timestamps as a suffix of keys under the snapshot prefix path
type suffixSnapshot struct {
// internal kv which suffixSnapshot is based on
kv.TxnKV
// rw mutex provides range lock
sync.RWMutex
@ -76,10 +76,10 @@ type tsv struct {
}
// type conversion to make sure the implementation satisfies the interface
var _ kv.SnapShotKV = (*SuffixSnapshot)(nil)
var _ kv.SnapShotKV = (*suffixSnapshot)(nil)
// NewSuffixSnapshot creates a SuffixSnapshot with the provided kv
func NewSuffixSnapshot(txnKV kv.TxnKV, sep, root, snapshot string) (*SuffixSnapshot, error) {
// newSuffixSnapshot creates a suffixSnapshot with the provided kv
func newSuffixSnapshot(txnKV kv.TxnKV, sep, root, snapshot string) (*suffixSnapshot, error) {
if txnKV == nil {
return nil, retry.Unrecoverable(errors.New("txnKV is nil"))
}
@ -92,7 +92,7 @@ func NewSuffixSnapshot(txnKV kv.TxnKV, sep, root, snapshot string) (*SuffixSnaps
tk = path.Join(root, "k")
rootLen := len(tk) - 1
return &SuffixSnapshot{
return &suffixSnapshot{
TxnKV: txnKV,
lastestTS: make(map[string]typeutil.Timestamp),
separator: sep,
@ -105,31 +105,31 @@ func NewSuffixSnapshot(txnKV kv.TxnKV, sep, root, snapshot string) (*SuffixSnaps
}
// isTombstone is a helper function to check whether a value is the tombstone mark
func (ss *SuffixSnapshot) isTombstone(value string) bool {
return bytes.Equal([]byte(value), SuffixSnapshotTombstone)
func (ss *suffixSnapshot) isTombstone(value string) bool {
return bytes.Equal([]byte(value), suffixSnapshotTombstone)
}
// hideRootPrefix is a helper function to hide the root prefix from a key
func (ss *SuffixSnapshot) hideRootPrefix(value string) string {
func (ss *suffixSnapshot) hideRootPrefix(value string) string {
return value[ss.rootLen:]
}
// composeSnapshotPrefix builds a prefix for loading snapshots
// formatted like [snapshotPrefix]/key[sep]
func (ss *SuffixSnapshot) composeSnapshotPrefix(key string) string {
func (ss *suffixSnapshot) composeSnapshotPrefix(key string) string {
return path.Join(ss.snapshotPrefix, key+ss.separator)
}
// composeTSKey unified tsKey composing method
// uses key, ts and separator to form a key
func (ss *SuffixSnapshot) composeTSKey(key string, ts typeutil.Timestamp) string {
func (ss *suffixSnapshot) composeTSKey(key string, ts typeutil.Timestamp) string {
// [key][sep][ts]
return path.Join(ss.snapshotPrefix, fmt.Sprintf("%s%s%d", key, ss.separator, ts))
}
// isTSKey checks whether a key is in ts-key format
// if true, also returns parsed ts value
func (ss *SuffixSnapshot) isTSKey(key string) (typeutil.Timestamp, bool) {
func (ss *suffixSnapshot) isTSKey(key string) (typeutil.Timestamp, bool) {
// not in snapshot path
if !strings.HasPrefix(key, ss.snapshotPrefix) {
return 0, false
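
A small runnable sketch of the ts-key layout these helpers implement: the timestamp is appended to the key with the separator, under the snapshot prefix. The helper names below are local stand-ins for illustration:

package main

import (
	"fmt"
	"path"
	"strconv"
	"strings"
)

const (
	snapshotPrefix = "snapshots"
	separator      = "_ts"
)

// composeTSKey mirrors the "[key][sep][ts]" layout under the snapshot prefix.
func composeTSKey(key string, ts uint64) string {
	return path.Join(snapshotPrefix, fmt.Sprintf("%s%s%d", key, separator, ts))
}

// parseTSKey is the inverse: it recovers the ts if the key follows the layout.
func parseTSKey(key string) (uint64, bool) {
	if !strings.HasPrefix(key, snapshotPrefix+"/") {
		return 0, false
	}
	idx := strings.LastIndex(key, separator)
	if idx < 0 {
		return 0, false
	}
	ts, err := strconv.ParseUint(key[idx+len(separator):], 10, 64)
	return ts, err == nil
}

func main() {
	k := composeTSKey("collection/1", 425430816632930305)
	fmt.Println(k)             // snapshots/collection/1_ts425430816632930305
	fmt.Println(parseTSKey(k)) // 425430816632930305 true
}
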
@ -146,7 +146,7 @@ func (ss *SuffixSnapshot) isTSKey(key string) (typeutil.Timestamp, bool) {
// isTSOfKey checks whether a key is in ts-key format for the provided group key
// if true, also returns the parsed ts value
func (ss *SuffixSnapshot) isTSOfKey(key string, groupKey string) (typeutil.Timestamp, bool) {
func (ss *suffixSnapshot) isTSOfKey(key string, groupKey string) (typeutil.Timestamp, bool) {
// not in snapshot path
if !strings.HasPrefix(key, ss.snapshotPrefix) {
return 0, false
@ -167,7 +167,7 @@ func (ss *SuffixSnapshot) isTSOfKey(key string, groupKey string) (typeutil.Times
// checkKeyTS checks whether the provided key's latest ts is before the provided ts
// lock is needed
func (ss *SuffixSnapshot) checkKeyTS(key string, ts typeutil.Timestamp) (bool, error) {
func (ss *suffixSnapshot) checkKeyTS(key string, ts typeutil.Timestamp) (bool, error) {
latest, has := ss.lastestTS[key]
if !has {
err := ss.loadLatestTS(key)
@ -180,11 +180,11 @@ func (ss *SuffixSnapshot) checkKeyTS(key string, ts typeutil.Timestamp) (bool, e
}
// loadLatestTS loads the latest ts for the specified key
func (ss *SuffixSnapshot) loadLatestTS(key string) error {
func (ss *suffixSnapshot) loadLatestTS(key string) error {
prefix := ss.composeSnapshotPrefix(key)
keys, _, err := ss.TxnKV.LoadWithPrefix(prefix)
if err != nil {
log.Warn("SuffixSnapshot txnkv LoadWithPrefix failed", zap.String("key", key),
log.Warn("suffixSnapshot txnkv LoadWithPrefix failed", zap.String("key", key),
zap.Error(err))
return err
}
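
loadLatestTS boils down to a prefix scan over the versioned keys followed by a max over the parsed timestamps; a minimal sketch of that reduction, assuming the "<key>[sep]<ts>" layout shown earlier:

package main

import (
	"fmt"
	"strconv"
	"strings"
)

// latestTS scans versioned keys of the form "<key>_ts<ts>" and keeps the
// maximum ts seen — the same idea loadLatestTS implements over a prefix scan.
func latestTS(keys []string, sep string) uint64 {
	var latest uint64
	for _, k := range keys {
		idx := strings.LastIndex(k, sep)
		if idx < 0 {
			continue
		}
		ts, err := strconv.ParseUint(k[idx+len(sep):], 10, 64)
		if err == nil && ts > latest {
			latest = ts
		}
	}
	return latest
}

func main() {
	keys := []string{"coll/1_ts100", "coll/1_ts300", "coll/1_ts200"}
	fmt.Println(latestTS(keys, "_ts")) // 300
}
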
@ -241,10 +241,10 @@ func binarySearchRecords(records []tsv, ts typeutil.Timestamp) (string, bool) {
}
// Save stores key-value pairs with timestamp
// if ts is 0, SuffixSnapshot works as a TxnKV
// otherwise, SuffixSnapshot will store a ts-key as "key[sep]ts"-value pair in snapshot path
// if ts is 0, suffixSnapshot works as a TxnKV
// otherwise, suffixSnapshot will store a ts-key as "key[sep]ts"-value pair in snapshot path
// and, for acceleration, stores the original key-value pair if ts is the latest
func (ss *SuffixSnapshot) Save(key string, value string, ts typeutil.Timestamp) error {
func (ss *suffixSnapshot) Save(key string, value string, ts typeutil.Timestamp) error {
// if ts == 0, act like TxnKV
// will not update lastestTS since the ts is not valid
if ts == 0 {
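
As a rough illustration of the Save contract described above — ts == 0 degrades to a plain TxnKV write, while a non-zero ts writes the versioned entry and, only when ts is the newest seen, refreshes the original key — here is an in-memory stand-in (not the real TxnKV-backed implementation):

package main

import "fmt"

// memSnapshot is an illustrative in-memory stand-in for the TxnKV-backed snapshot.
type memSnapshot struct {
	store    map[string]string
	latestTS map[string]uint64
}

func (m *memSnapshot) Save(key, value string, ts uint64) {
	if ts == 0 { // ts == 0: act like a plain TxnKV
		m.store[key] = value
		return
	}
	m.store[fmt.Sprintf("%s_ts%d", key, ts)] = value // versioned entry
	if ts >= m.latestTS[key] {                       // acceleration path
		m.store[key] = value
		m.latestTS[key] = ts
	}
}

func main() {
	m := &memSnapshot{store: map[string]string{}, latestTS: map[string]uint64{}}
	m.Save("coll/1", "v1", 100)
	m.Save("coll/1", "v2", 200)
	fmt.Println(m.store["coll/1"]) // v2 (latest), plus two ts-keyed entries
}
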
@ -278,7 +278,7 @@ func (ss *SuffixSnapshot) Save(key string, value string, ts typeutil.Timestamp)
return ss.TxnKV.Save(tsKey, value)
}
func (ss *SuffixSnapshot) Load(key string, ts typeutil.Timestamp) (string, error) {
func (ss *suffixSnapshot) Load(key string, ts typeutil.Timestamp) (string, error) {
// if ts == 0, load latest by definition
// and with acceleration logic, just do load key will do
if ts == 0 {
@ -351,7 +351,7 @@ func (ss *SuffixSnapshot) Load(key string, ts typeutil.Timestamp) (string, error
// MultiSave saves multiple kvs
// if ts == 0, act like TxnKV
// each key-value pair will be treated with the same logic as Save
func (ss *SuffixSnapshot) MultiSave(kvs map[string]string, ts typeutil.Timestamp) error {
func (ss *suffixSnapshot) MultiSave(kvs map[string]string, ts typeutil.Timestamp) error {
// if ts == 0, act like TxnKV
if ts == 0 {
return ss.TxnKV.MultiSave(kvs)
@ -378,7 +378,7 @@ func (ss *SuffixSnapshot) MultiSave(kvs map[string]string, ts typeutil.Timestamp
// generateSaveExecute examines whether each key's ts is after the corresponding latest ts
// returns the calculated execute map and the list of keys whose ts needs updating
func (ss *SuffixSnapshot) generateSaveExecute(kvs map[string]string, ts typeutil.Timestamp) (map[string]string, []string, error) {
func (ss *suffixSnapshot) generateSaveExecute(kvs map[string]string, ts typeutil.Timestamp) (map[string]string, []string, error) {
var after bool
var err error
execute := make(map[string]string)
@ -403,7 +403,7 @@ func (ss *SuffixSnapshot) generateSaveExecute(kvs map[string]string, ts typeutil
}
// LoadWithPrefix loads keys with the provided prefix and returns their values at the given ts
func (ss *SuffixSnapshot) LoadWithPrefix(key string, ts typeutil.Timestamp) ([]string, []string, error) {
func (ss *suffixSnapshot) LoadWithPrefix(key string, ts typeutil.Timestamp) ([]string, []string, error) {
// ts 0 case shall be treated as fetch latest/current value
if ts == 0 {
keys, values, err := ss.TxnKV.LoadWithPrefix(key)
@ -484,7 +484,7 @@ func (ss *SuffixSnapshot) LoadWithPrefix(key string, ts typeutil.Timestamp) ([]s
// MultiSaveAndRemoveWithPrefix saves multiple kvs and removes keys by prefix as well
// if ts == 0, act like TxnKV
// each key-value pair will be treated with the same logic as Save
func (ss *SuffixSnapshot) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
func (ss *suffixSnapshot) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
// if ts == 0, act like TxnKV
if ts == 0 {
return ss.TxnKV.MultiSaveAndRemoveWithPrefix(saves, removals)
@ -503,15 +503,15 @@ func (ss *SuffixSnapshot) MultiSaveAndRemoveWithPrefix(saves map[string]string,
for _, removal := range removals {
keys, _, err := ss.TxnKV.LoadWithPrefix(removal)
if err != nil {
log.Warn("SuffixSnapshot TxnKV LoadwithPrefix failed", zap.String("key", removal), zap.Error(err))
log.Warn("suffixSnapshot TxnKV LoadwithPrefix failed", zap.String("key", removal), zap.Error(err))
return err
}
// add tombstone to original key and add ts entry
for _, key := range keys {
key = ss.hideRootPrefix(key)
execute[key] = string(SuffixSnapshotTombstone)
execute[ss.composeTSKey(key, ts)] = string(SuffixSnapshotTombstone)
execute[key] = string(suffixSnapshotTombstone)
execute[ss.composeTSKey(key, ts)] = string(suffixSnapshotTombstone)
updateList = append(updateList, key)
}
}
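
Removal above is logical, not physical: the tombstone bytes (UTF-8 for U+26FC) are written over both the original key and a new ts entry, so loads at older timestamps still resolve to the pre-delete value. A compact sketch of the marking:

package main

import (
	"bytes"
	"fmt"
)

// the same tombstone bytes used above (UTF-8 encoding of U+26FC)
var tombstone = []byte{0xE2, 0x9B, 0xBC}

func isTombstone(value string) bool {
	return bytes.Equal([]byte(value), tombstone)
}

func main() {
	store := map[string]string{"coll/1": "v1"}
	// "removing" a key at ts writes tombstones instead of deleting,
	// so loads at older timestamps still see the previous value
	ts := uint64(300)
	store["coll/1"] = string(tombstone)
	store[fmt.Sprintf("coll/1_ts%d", ts)] = string(tombstone)
	fmt.Println(isTombstone(store["coll/1"])) // true: key reads as deleted
}
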


@ -14,7 +14,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package kv
package rootcoord
import (
"fmt"
@ -171,7 +171,7 @@ func Test_binarySearchRecords(t *testing.T) {
func Test_ComposeIsTsKey(t *testing.T) {
sep := "_ts"
ss, err := NewSuffixSnapshot((*etcdkv.EtcdKV)(nil), sep, "", snapshotPrefix)
ss, err := newSuffixSnapshot((*etcdkv.EtcdKV)(nil), sep, "", snapshotPrefix)
require.Nil(t, err)
type testcase struct {
key string
@ -209,7 +209,7 @@ func Test_ComposeIsTsKey(t *testing.T) {
func Test_SuffixSnaphotIsTSOfKey(t *testing.T) {
sep := "_ts"
ss, err := NewSuffixSnapshot((*etcdkv.EtcdKV)(nil), sep, "", snapshotPrefix)
ss, err := newSuffixSnapshot((*etcdkv.EtcdKV)(nil), sep, "", snapshotPrefix)
require.Nil(t, err)
type testcase struct {
key string
@ -274,7 +274,7 @@ func Test_SuffixSnapshotLoad(t *testing.T) {
return vtso
}
ss, err := NewSuffixSnapshot(etcdkv, sep, rootPath, snapshotPrefix)
ss, err := newSuffixSnapshot(etcdkv, sep, rootPath, snapshotPrefix)
assert.Nil(t, err)
assert.NotNil(t, ss)
@ -295,7 +295,7 @@ func Test_SuffixSnapshotLoad(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, "value-19", val)
ss, err = NewSuffixSnapshot(etcdkv, sep, rootPath, snapshotPrefix)
ss, err = newSuffixSnapshot(etcdkv, sep, rootPath, snapshotPrefix)
assert.Nil(t, err)
assert.NotNil(t, ss)
@ -326,7 +326,7 @@ func Test_SuffixSnapshotMultiSave(t *testing.T) {
return vtso
}
ss, err := NewSuffixSnapshot(etcdkv, sep, rootPath, snapshotPrefix)
ss, err := newSuffixSnapshot(etcdkv, sep, rootPath, snapshotPrefix)
assert.Nil(t, err)
assert.NotNil(t, ss)
@ -358,7 +358,7 @@ func Test_SuffixSnapshotMultiSave(t *testing.T) {
assert.Equal(t, vals[0], "v1-19")
assert.Equal(t, vals[1], "v2-19")
ss, err = NewSuffixSnapshot(etcdkv, sep, rootPath, snapshotPrefix)
ss, err = newSuffixSnapshot(etcdkv, sep, rootPath, snapshotPrefix)
assert.Nil(t, err)
assert.NotNil(t, ss)
for i := 0; i < 20; i++ {
@ -403,7 +403,7 @@ func Test_SuffixSnapshotMultiSaveAndRemoveWithPrefix(t *testing.T) {
return vtso
}
ss, err := NewSuffixSnapshot(etcdkv, sep, rootPath, snapshotPrefix)
ss, err := newSuffixSnapshot(etcdkv, sep, rootPath, snapshotPrefix)
assert.Nil(t, err)
assert.NotNil(t, ss)
@ -440,7 +440,7 @@ func Test_SuffixSnapshotMultiSaveAndRemoveWithPrefix(t *testing.T) {
assert.Equal(t, 39-i, len(vals))
}
ss, err = NewSuffixSnapshot(etcdkv, sep, rootPath, snapshotPrefix)
ss, err = newSuffixSnapshot(etcdkv, sep, rootPath, snapshotPrefix)
assert.Nil(t, err)
assert.NotNil(t, ss)


@ -24,7 +24,6 @@ import (
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/log"
model "github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
@ -156,6 +155,21 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error {
}
}
collInfo := etcdpb.CollectionInfo{
ID: collID,
Schema: &schema,
PartitionIDs: []typeutil.UniqueID{partID},
PartitionNames: []string{Params.CommonCfg.DefaultPartitionName},
FieldIndexes: make([]*etcdpb.FieldIndexInfo, 0, 16),
VirtualChannelNames: vchanNames,
PhysicalChannelNames: chanNames,
ShardsNum: t.Req.ShardsNum,
PartitionCreatedTimestamps: []uint64{0},
ConsistencyLevel: t.Req.ConsistencyLevel,
}
idxInfo := make([]*etcdpb.IndexInfo, 0, 16)
// schema is modified (RowIDField and TimestampField added),
// so it needs to be marshaled again
schemaBytes, err := proto.Marshal(&schema)
@ -190,27 +204,6 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error {
return fmt.Errorf("encodeDdOperation fail, error = %w", err)
}
collInfo := model.Collection{
CollectionID: collID,
Name: schema.Name,
Description: schema.Description,
AutoID: schema.AutoID,
Fields: model.BatchConvertFieldPBToModel(schema.Fields),
VirtualChannelNames: vchanNames,
PhysicalChannelNames: chanNames,
ShardsNum: t.Req.ShardsNum,
ConsistencyLevel: t.Req.ConsistencyLevel,
FieldIndexes: make([]*model.Index, 0, 16),
CreateTime: ts,
Partitions: []*model.Partition{
{
PartitionID: partID,
PartitionName: Params.CommonCfg.DefaultPartitionName,
PartitionCreatedTimestamp: ts,
},
},
}
// use lambda function here to guarantee all resources to be released
createCollectionFn := func() error {
// lock for ddl operation
@ -239,7 +232,7 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error {
}
// update meta table after send dd operation
if err = t.core.MetaTable.AddCollection(&collInfo, ts, ddOpStr); err != nil {
if err = t.core.MetaTable.AddCollection(&collInfo, ts, idxInfo, ddOpStr); err != nil {
t.core.chanTimeTick.removeDmlChannels(chanNames...)
t.core.chanTimeTick.removeDeltaChannels(deltaChanNames...)
// it's ok just to leave create collection message sent, datanode and querynode don't process CreateCollection logic
@ -297,17 +290,17 @@ func (t *DropCollectionReqTask) Execute(ctx context.Context) error {
DbName: t.Req.DbName,
CollectionName: t.Req.CollectionName,
DbID: 0, //not used
CollectionID: collMeta.CollectionID,
CollectionID: collMeta.ID,
}
reason := fmt.Sprintf("drop collection %d", collMeta.CollectionID)
reason := fmt.Sprintf("drop collection %d", collMeta.ID)
ts, err := t.core.TSOAllocator(1)
if err != nil {
return fmt.Errorf("TSO alloc fail, error = %w", err)
}
//notify query service to release collection
if err = t.core.CallReleaseCollectionService(t.core.ctx, ts, 0, collMeta.CollectionID); err != nil {
if err = t.core.CallReleaseCollectionService(t.core.ctx, ts, 0, collMeta.ID); err != nil {
log.Error("Failed to CallReleaseCollectionService", zap.Error(err))
return err
}
@ -346,7 +339,7 @@ func (t *DropCollectionReqTask) Execute(ctx context.Context) error {
}
// update meta table after send dd operation
if err = t.core.MetaTable.DeleteCollection(collMeta.CollectionID, ts, ddOpStr); err != nil {
if err = t.core.MetaTable.DeleteCollection(collMeta.ID, ts, ddOpStr); err != nil {
return err
}
@ -380,7 +373,7 @@ func (t *DropCollectionReqTask) Execute(ctx context.Context) error {
}
// invalidate all the collection meta cache with the specified collectionID
err = t.core.ExpireMetaCache(ctx, nil, collMeta.CollectionID, ts)
err = t.core.ExpireMetaCache(ctx, nil, collMeta.ID, ts)
if err != nil {
return err
}
@ -432,7 +425,7 @@ func (t *DescribeCollectionReqTask) Execute(ctx context.Context) error {
if t.Type() != commonpb.MsgType_DescribeCollection {
return fmt.Errorf("describe collection, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
}
var collInfo *model.Collection
var collInfo *etcdpb.CollectionInfo
var err error
if t.Req.CollectionName != "" {
@ -447,13 +440,8 @@ func (t *DescribeCollectionReqTask) Execute(ctx context.Context) error {
}
}
t.Rsp.Schema = &schemapb.CollectionSchema{
Name: collInfo.Name,
Description: collInfo.Description,
AutoID: collInfo.AutoID,
Fields: model.BatchConvertToFieldSchemaPB(collInfo.Fields),
}
t.Rsp.CollectionID = collInfo.CollectionID
t.Rsp.Schema = proto.Clone(collInfo.Schema).(*schemapb.CollectionSchema)
t.Rsp.CollectionID = collInfo.ID
t.Rsp.VirtualChannelNames = collInfo.VirtualChannelNames
t.Rsp.PhysicalChannelNames = collInfo.PhysicalChannelNames
if collInfo.ShardsNum == 0 {
@ -465,8 +453,8 @@ func (t *DescribeCollectionReqTask) Execute(ctx context.Context) error {
t.Rsp.CreatedTimestamp = collInfo.CreateTime
createdPhysicalTime, _ := tsoutil.ParseHybridTs(collInfo.CreateTime)
t.Rsp.CreatedUtcTimestamp = uint64(createdPhysicalTime)
t.Rsp.Aliases = t.core.MetaTable.ListAliases(collInfo.CollectionID)
t.Rsp.StartPositions = collInfo.StartPositions
t.Rsp.Aliases = t.core.MetaTable.ListAliases(collInfo.ID)
t.Rsp.StartPositions = collInfo.GetStartPositions()
t.Rsp.CollectionName = t.Rsp.Schema.Name
return nil
}
@ -494,7 +482,7 @@ func (t *ShowCollectionReqTask) Execute(ctx context.Context) error {
}
for name, meta := range coll {
t.Rsp.CollectionNames = append(t.Rsp.CollectionNames, name)
t.Rsp.CollectionIds = append(t.Rsp.CollectionIds, meta.CollectionID)
t.Rsp.CollectionIds = append(t.Rsp.CollectionIds, meta.ID)
t.Rsp.CreatedTimestamps = append(t.Rsp.CreatedTimestamps, meta.CreateTime)
physical, _ := tsoutil.ParseHybridTs(meta.CreateTime)
t.Rsp.CreatedUtcTimestamps = append(t.Rsp.CreatedUtcTimestamps, uint64(physical))
@ -533,7 +521,7 @@ func (t *CreatePartitionReqTask) Execute(ctx context.Context) error {
CollectionName: t.Req.CollectionName,
PartitionName: t.Req.PartitionName,
DbID: 0, // todo, not used
CollectionID: collMeta.CollectionID,
CollectionID: collMeta.ID,
PartitionID: partID,
}
@ -566,7 +554,7 @@ func (t *CreatePartitionReqTask) Execute(ctx context.Context) error {
}
// update meta table after send dd operation
if err = t.core.MetaTable.AddPartition(collMeta.CollectionID, t.Req.PartitionName, partID, ts, ddOpStr); err != nil {
if err = t.core.MetaTable.AddPartition(collMeta.ID, t.Req.PartitionName, partID, ts, ddOpStr); err != nil {
return err
}
@ -584,7 +572,7 @@ func (t *CreatePartitionReqTask) Execute(ctx context.Context) error {
}
// invalidate all the collection meta cache with the specified collectionID
err = t.core.ExpireMetaCache(ctx, nil, collMeta.CollectionID, ts)
err = t.core.ExpireMetaCache(ctx, nil, collMeta.ID, ts)
if err != nil {
return err
}
@ -613,7 +601,7 @@ func (t *DropPartitionReqTask) Execute(ctx context.Context) error {
if err != nil {
return err
}
partID, err := t.core.MetaTable.GetPartitionByName(collInfo.CollectionID, t.Req.PartitionName, 0)
partID, err := t.core.MetaTable.GetPartitionByName(collInfo.ID, t.Req.PartitionName, 0)
if err != nil {
return err
}
@ -624,7 +612,7 @@ func (t *DropPartitionReqTask) Execute(ctx context.Context) error {
CollectionName: t.Req.CollectionName,
PartitionName: t.Req.PartitionName,
DbID: 0, //todo,not used
CollectionID: collInfo.CollectionID,
CollectionID: collInfo.ID,
PartitionID: partID,
}
@ -657,7 +645,7 @@ func (t *DropPartitionReqTask) Execute(ctx context.Context) error {
}
// update meta table after send dd operation
if _, err = t.core.MetaTable.DeletePartition(collInfo.CollectionID, t.Req.PartitionName, ts, ddOpStr); err != nil {
if _, err = t.core.MetaTable.DeletePartition(collInfo.ID, t.Req.PartitionName, ts, ddOpStr); err != nil {
return err
}
@ -675,7 +663,7 @@ func (t *DropPartitionReqTask) Execute(ctx context.Context) error {
}
// invalidate all the collection meta cache with the specified collectionID
err = t.core.ExpireMetaCache(ctx, nil, collInfo.CollectionID, ts)
err = t.core.ExpireMetaCache(ctx, nil, collInfo.ID, ts)
if err != nil {
return err
}
@ -712,7 +700,7 @@ func (t *HasPartitionReqTask) Execute(ctx context.Context) error {
if err != nil {
return err
}
t.HasPartition = t.core.MetaTable.HasPartition(coll.CollectionID, t.Req.PartitionName, 0)
t.HasPartition = t.core.MetaTable.HasPartition(coll.ID, t.Req.PartitionName, 0)
return nil
}
@ -733,7 +721,7 @@ func (t *ShowPartitionReqTask) Execute(ctx context.Context) error {
if t.Type() != commonpb.MsgType_ShowPartitions {
return fmt.Errorf("show partition, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
}
var coll *model.Collection
var coll *etcdpb.CollectionInfo
var err error
if t.Req.CollectionName == "" {
coll, err = t.core.MetaTable.GetCollectionByID(t.Req.CollectionID, 0)
@ -743,13 +731,12 @@ func (t *ShowPartitionReqTask) Execute(ctx context.Context) error {
if err != nil {
return err
}
for _, part := range coll.Partitions {
t.Rsp.PartitionIDs = append(t.Rsp.PartitionIDs, part.PartitionID)
t.Rsp.PartitionNames = append(t.Rsp.PartitionNames, part.PartitionName)
t.Rsp.CreatedTimestamps = append(t.Rsp.CreatedTimestamps, part.PartitionCreatedTimestamp)
physical, _ := tsoutil.ParseHybridTs(part.PartitionCreatedTimestamp)
t.Rsp.PartitionIDs = coll.PartitionIDs
t.Rsp.PartitionNames = coll.PartitionNames
t.Rsp.CreatedTimestamps = coll.PartitionCreatedTimestamps
t.Rsp.CreatedUtcTimestamps = make([]uint64, 0, len(coll.PartitionCreatedTimestamps))
for _, ts := range coll.PartitionCreatedTimestamps {
physical, _ := tsoutil.ParseHybridTs(ts)
t.Rsp.CreatedUtcTimestamps = append(t.Rsp.CreatedUtcTimestamps, uint64(physical))
}
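
tsoutil.ParseHybridTs above splits a hybrid timestamp into its physical and logical components; a minimal sketch of that split, assuming the usual 18 logical bits (illustrative, not the tsoutil source):

package main

import "fmt"

const logicalBits = 18 // assumed layout: physical millis << 18 | logical counter

func parseHybridTs(ts uint64) (physical int64, logical int64) {
	return int64(ts >> logicalBits), int64(ts & ((1 << logicalBits) - 1))
}

func main() {
	physical, logical := parseHybridTs(425430816632930305)
	fmt.Println(physical, logical) // physical unix millis and logical counter
}
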
@ -780,7 +767,7 @@ func (t *DescribeSegmentReqTask) Execute(ctx context.Context) error {
segIDs, err := t.core.CallGetFlushedSegmentsService(ctx, t.Req.CollectionID, -1)
if err != nil {
log.Debug("Get flushed segment from data coord failed", zap.String("collection_name", coll.Name), zap.Error(err))
log.Debug("Get flushed segment from data coord failed", zap.String("collection_name", coll.Schema.Name), zap.Error(err))
return err
}
@ -796,16 +783,16 @@ func (t *DescribeSegmentReqTask) Execute(ctx context.Context) error {
return fmt.Errorf("segment id %d not belong to collection id %d", t.Req.SegmentID, t.Req.CollectionID)
}
//TODO, get field_id and index_name from request
index, err := t.core.MetaTable.GetSegmentIndexInfoByID(t.Req.SegmentID, -1, "")
segIdxInfo, err := t.core.MetaTable.GetSegmentIndexInfoByID(t.Req.SegmentID, -1, "")
log.Debug("RootCoord DescribeSegmentReqTask, MetaTable.GetSegmentIndexInfoByID", zap.Any("SegmentID", t.Req.SegmentID),
zap.Any("index", index), zap.Error(err))
zap.Any("segIdxInfo", segIdxInfo), zap.Error(err))
if err != nil {
return err
}
t.Rsp.IndexID = index.IndexID
t.Rsp.BuildID = index.SegmentIndexes[t.Req.SegmentID].BuildID
t.Rsp.EnableIndex = index.SegmentIndexes[t.Req.SegmentID].EnableIndex
t.Rsp.FieldID = index.FieldID
t.Rsp.IndexID = segIdxInfo.IndexID
t.Rsp.BuildID = segIdxInfo.BuildID
t.Rsp.EnableIndex = segIdxInfo.EnableIndex
t.Rsp.FieldID = segIdxInfo.FieldID
return nil
}
@ -831,8 +818,8 @@ func (t *ShowSegmentReqTask) Execute(ctx context.Context) error {
return err
}
exist := false
for _, partition := range coll.Partitions {
if partition.PartitionID == t.Req.PartitionID {
for _, partID := range coll.PartitionIDs {
if partID == t.Req.PartitionID {
exist = true
break
}
@ -842,7 +829,7 @@ func (t *ShowSegmentReqTask) Execute(ctx context.Context) error {
}
segIDs, err := t.core.CallGetFlushedSegmentsService(ctx, t.Req.CollectionID, t.Req.PartitionID)
if err != nil {
log.Debug("Get flushed segments from data coord failed", zap.String("collection name", coll.Name), zap.Int64("partition id", t.Req.PartitionID), zap.Error(err))
log.Debug("Get flushed segments from data coord failed", zap.String("collection name", coll.Schema.Name), zap.Int64("partition id", t.Req.PartitionID), zap.Error(err))
return err
}
@ -899,26 +886,23 @@ func (t *DescribeSegmentsReqTask) Execute(ctx context.Context) error {
}
}
index, err := t.core.MetaTable.GetSegmentIndexInfos(segID)
segmentInfo, err := t.core.MetaTable.GetSegmentIndexInfos(segID)
if err != nil {
continue
}
for indexID, indexInfo := range index {
for _, segmentIndex := range indexInfo.SegmentIndexes {
t.Rsp.SegmentInfos[segID].IndexInfos =
append(t.Rsp.SegmentInfos[segID].IndexInfos,
&etcdpb.SegmentIndexInfo{
CollectionID: indexInfo.CollectionID,
PartitionID: segmentIndex.Segment.PartitionID,
SegmentID: segmentIndex.Segment.SegmentID,
FieldID: indexInfo.FieldID,
IndexID: indexInfo.IndexID,
BuildID: segmentIndex.BuildID,
EnableIndex: segmentIndex.EnableIndex,
})
}
for indexID, indexInfo := range segmentInfo {
t.Rsp.SegmentInfos[segID].IndexInfos =
append(t.Rsp.SegmentInfos[segID].IndexInfos,
&etcdpb.SegmentIndexInfo{
CollectionID: indexInfo.CollectionID,
PartitionID: indexInfo.PartitionID,
SegmentID: indexInfo.SegmentID,
FieldID: indexInfo.FieldID,
IndexID: indexInfo.IndexID,
BuildID: indexInfo.BuildID,
EnableIndex: indexInfo.EnableIndex,
})
extraIndexInfo, err := t.core.MetaTable.GetIndexByID(indexID)
if err != nil {
log.Error("index not found in meta table",
@ -928,7 +912,7 @@ func (t *DescribeSegmentsReqTask) Execute(ctx context.Context) error {
zap.Int64("segment", segID))
return err
}
t.Rsp.SegmentInfos[segID].ExtraIndexInfos[indexID] = model.ConvertToIndexPB(extraIndexInfo)
t.Rsp.SegmentInfos[segID].ExtraIndexInfos[indexID] = extraIndexInfo
}
}
@ -960,7 +944,7 @@ func (t *CreateIndexReqTask) Execute(ctx context.Context) error {
if err != nil {
return err
}
idxInfo := &model.Index{
idxInfo := &etcdpb.IndexInfo{
IndexName: indexName,
IndexID: indexID,
IndexParams: t.Req.ExtraParams,
@ -975,54 +959,45 @@ func (t *CreateIndexReqTask) Execute(ctx context.Context) error {
if err != nil {
return err
}
segID2PartID, segID2Binlog, err := t.core.getSegments(ctx, collMeta.CollectionID)
segID2PartID, segID2Binlog, err := t.core.getSegments(ctx, collMeta.ID)
flushedSegs := make([]typeutil.UniqueID, 0, len(segID2PartID))
for k := range segID2PartID {
flushedSegs = append(flushedSegs, k)
}
if err != nil {
log.Debug("get flushed segments from data coord failed", zap.String("collection_name", collMeta.Name), zap.Error(err))
log.Debug("Get flushed segments from data coord failed", zap.String("collection_name", collMeta.Schema.Name), zap.Error(err))
return err
}
segIDs, field, err := t.core.MetaTable.GetNotIndexedSegments(t.Req.CollectionName, t.Req.FieldName, idxInfo, flushedSegs)
if err != nil {
log.Debug("get not indexed segments failed", zap.Int64("collection_id", collMeta.CollectionID), zap.Error(err))
log.Debug("RootCoord CreateIndexReqTask metaTable.GetNotIndexedSegments", zap.Error(err))
return err
}
if err := t.core.MetaTable.AddIndex(t.Req.CollectionName, t.Req.FieldName, idxInfo, segIDs); err != nil {
log.Debug("add index into metastore failed", zap.Int64("collection_id", collMeta.CollectionID), zap.Int64("index_id", idxInfo.IndexID), zap.Error(err))
return err
}
collectionID := collMeta.ID
cnt := 0
for _, segID := range segIDs {
segmentIndex := model.SegmentIndex{
Segment: model.Segment{
SegmentID: segID,
PartitionID: segID2PartID[segID],
},
EnableIndex: false,
info := etcdpb.SegmentIndexInfo{
CollectionID: collectionID,
PartitionID: segID2PartID[segID],
SegmentID: segID,
FieldID: field.FieldID,
IndexID: idxInfo.IndexID,
EnableIndex: false,
}
segmentIndex.BuildID, err = t.core.BuildIndex(ctx, segID, segID2Binlog[segID].GetNumOfRows(), segID2Binlog[segID].GetFieldBinlogs(), &field, idxInfo, false)
info.BuildID, err = t.core.BuildIndex(ctx, segID, segID2Binlog[segID].GetNumOfRows(), segID2Binlog[segID].GetFieldBinlogs(), &field, idxInfo, false)
if err != nil {
return err
}
if segmentIndex.BuildID != 0 {
segmentIndex.EnableIndex = true
if info.BuildID != 0 {
info.EnableIndex = true
}
index := &model.Index{
CollectionID: collMeta.CollectionID,
FieldID: field.FieldID,
IndexID: idxInfo.IndexID,
SegmentIndexes: map[int64]model.SegmentIndex{segID: segmentIndex},
}
if err := t.core.MetaTable.AlterIndex(index); err != nil {
log.Debug("alter index into meta table failed", zap.Int64("collection_id", collMeta.CollectionID), zap.Int64("index_id", index.IndexID), zap.Int64("build_id", segmentIndex.BuildID), zap.Error(err))
if err := t.core.MetaTable.AddIndex(&info); err != nil {
log.Debug("Add index into meta table failed", zap.Int64("collection_id", collMeta.ID), zap.Int64("index_id", info.IndexID), zap.Int64("build_id", info.BuildID), zap.Error(err))
}
cnt++
}
return nil


@ -5,11 +5,13 @@ import (
"errors"
"testing"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/stretchr/testify/assert"
)
func TestDescribeSegmentReqTask_Type(t *testing.T) {
@@ -64,28 +66,22 @@ func TestDescribeSegmentsReqTask_Execute(t *testing.T) {
 		return []typeutil.UniqueID{segID}, nil
 	}
 	c.MetaTable = &MetaTable{
-		segID2IndexMeta: map[typeutil.UniqueID]map[typeutil.UniqueID]model.Index{},
+		segID2IndexMeta: map[typeutil.UniqueID]map[typeutil.UniqueID]etcdpb.SegmentIndexInfo{},
 	}
 	assert.NoError(t, tsk.Execute(context.Background()))

 	// index not found in meta.
 	c.MetaTable = &MetaTable{
-		segID2IndexMeta: map[typeutil.UniqueID]map[typeutil.UniqueID]model.Index{
+		segID2IndexMeta: map[typeutil.UniqueID]map[typeutil.UniqueID]etcdpb.SegmentIndexInfo{
 			segID: {
 				indexID: {
 					CollectionID: collID,
+					PartitionID:  partID,
+					SegmentID:    segID,
 					FieldID:      fieldID,
 					IndexID:      indexID,
-					SegmentIndexes: map[int64]model.SegmentIndex{
-						segID: {
-							Segment: model.Segment{
-								SegmentID:   segID,
-								PartitionID: partID,
-							},
-							BuildID:     buildID,
-							EnableIndex: true,
-						},
-					},
+					BuildID:     buildID,
+					EnableIndex: true,
 				},
 			},
 		},
@@ -94,26 +90,20 @@ func TestDescribeSegmentsReqTask_Execute(t *testing.T) {
 	// success.
 	c.MetaTable = &MetaTable{
-		segID2IndexMeta: map[typeutil.UniqueID]map[typeutil.UniqueID]model.Index{
+		segID2IndexMeta: map[typeutil.UniqueID]map[typeutil.UniqueID]etcdpb.SegmentIndexInfo{
 			segID: {
 				indexID: {
 					CollectionID: collID,
+					PartitionID:  partID,
+					SegmentID:    segID,
 					FieldID:      fieldID,
 					IndexID:      indexID,
-					SegmentIndexes: map[int64]model.SegmentIndex{
-						segID: {
-							Segment: model.Segment{
-								SegmentID:   segID,
-								PartitionID: partID,
-							},
-							BuildID:     buildID,
-							EnableIndex: true,
-						},
-					},
+					BuildID:     buildID,
+					EnableIndex: true,
 				},
 			},
 		},
-		indexID2Meta: map[typeutil.UniqueID]model.Index{
+		indexID2Meta: map[typeutil.UniqueID]etcdpb.IndexInfo{
 			indexID: {
 				IndexName: indexName,
 				IndexID:   indexID,

View File

@@ -20,11 +20,11 @@ import (
 	"encoding/json"
 	"fmt"

-	"github.com/milvus-io/milvus/internal/metastore/model"
-
 	"github.com/golang/protobuf/proto"
 	"github.com/milvus-io/milvus/internal/mq/msgstream"
 	"github.com/milvus-io/milvus/internal/proto/commonpb"
+	"github.com/milvus-io/milvus/internal/proto/etcdpb"
+	"github.com/milvus-io/milvus/internal/proto/schemapb"
 	"github.com/milvus-io/milvus/internal/util/typeutil"
 )
@@ -50,8 +50,8 @@ func EqualKeyPairArray(p1 []*commonpb.KeyValuePair, p2 []*commonpb.KeyValuePair)
 }

 // GetFieldSchemaByID return field schema by id
-func GetFieldSchemaByID(coll *model.Collection, fieldID typeutil.UniqueID) (*model.Field, error) {
-	for _, f := range coll.Fields {
+func GetFieldSchemaByID(coll *etcdpb.CollectionInfo, fieldID typeutil.UniqueID) (*schemapb.FieldSchema, error) {
+	for _, f := range coll.Schema.Fields {
 		if f.FieldID == fieldID {
 			return f, nil
 		}
@@ -60,12 +60,12 @@ func GetFieldSchemaByID(coll *model.Collection, fieldID typeutil.UniqueID) (*mod
 }

 // GetFieldSchemaByIndexID return field schema by it's index id
-func GetFieldSchemaByIndexID(coll *model.Collection, idxID typeutil.UniqueID) (*model.Field, error) {
+func GetFieldSchemaByIndexID(coll *etcdpb.CollectionInfo, idxID typeutil.UniqueID) (*schemapb.FieldSchema, error) {
 	var fieldID typeutil.UniqueID
 	exist := false
 	for _, f := range coll.FieldIndexes {
 		if f.IndexID == idxID {
-			fieldID = f.FieldID
+			fieldID = f.FiledID
 			exist = true
 			break
 		}
@@ -98,6 +98,16 @@ func DecodeDdOperation(str string, ddOp *DdOperation) error {
 	return json.Unmarshal([]byte(str), ddOp)
 }

+// SegmentIndexInfoEqual return true if SegmentIndexInfos are identical
+func SegmentIndexInfoEqual(info1 *etcdpb.SegmentIndexInfo, info2 *etcdpb.SegmentIndexInfo) bool {
+	return info1.CollectionID == info2.CollectionID &&
+		info1.PartitionID == info2.PartitionID &&
+		info1.SegmentID == info2.SegmentID &&
+		info1.FieldID == info2.FieldID &&
+		info1.IndexID == info2.IndexID &&
+		info1.EnableIndex == info2.EnableIndex
+}
+
 // EncodeMsgPositions serialize []*MsgPosition into string
 func EncodeMsgPositions(msgPositions []*msgstream.MsgPosition) (string, error) {
 	if len(msgPositions) == 0 {

View File

@@ -19,10 +19,10 @@ package rootcoord
 import (
 	"testing"

-	"github.com/milvus-io/milvus/internal/metastore/model"
-
 	"github.com/milvus-io/milvus/internal/mq/msgstream"
 	"github.com/milvus-io/milvus/internal/proto/commonpb"
+	"github.com/milvus-io/milvus/internal/proto/etcdpb"
+	"github.com/milvus-io/milvus/internal/proto/schemapb"
 	"github.com/stretchr/testify/assert"
 )
@@ -60,10 +60,12 @@ func Test_EqualKeyPairArray(t *testing.T) {
 }

 func Test_GetFieldSchemaByID(t *testing.T) {
-	coll := &model.Collection{
-		Fields: []*model.Field{
-			{
-				FieldID: 1,
+	coll := &etcdpb.CollectionInfo{
+		Schema: &schemapb.CollectionSchema{
+			Fields: []*schemapb.FieldSchema{
+				{
+					FieldID: 1,
+				},
 			},
 		},
 	}
@@ -74,15 +76,17 @@ func Test_GetFieldSchemaByID(t *testing.T) {
 }

 func Test_GetFieldSchemaByIndexID(t *testing.T) {
-	coll := &model.Collection{
-		Fields: []*model.Field{
-			{
-				FieldID: 1,
+	coll := &etcdpb.CollectionInfo{
+		Schema: &schemapb.CollectionSchema{
+			Fields: []*schemapb.FieldSchema{
+				{
+					FieldID: 1,
+				},
 			},
 		},
-		FieldIndexes: []*model.Index{
+		FieldIndexes: []*etcdpb.FieldIndexInfo{
 			{
-				FieldID: 1,
+				FiledID: 1,
 				IndexID: 2,
 			},
 		},