mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-11-30 02:48:45 +08:00
Remove unused KV functions (#25972)
Remove RevertAlterSegmentsAndAddNewSegment Remove AlterIndex Remove AlterSegmentsAndAddNewSegment Remove AlterSegmentIndex Remove IndexCoordCatalog Signed-off-by: Filip Haltmayer <filip.haltmayer@zilliz.com>
This commit is contained in:
parent
84253f255e
commit
7bd9eceb15
@ -112,8 +112,6 @@ type DataCoordCatalog interface {
|
||||
AddSegment(ctx context.Context, segment *datapb.SegmentInfo) error
|
||||
// TODO Remove this later, we should update flush segments info for each segment separately, so far we still need transaction
|
||||
AlterSegments(ctx context.Context, newSegments []*datapb.SegmentInfo, binlogs ...BinlogsIncrement) error
|
||||
// AlterSegmentsAndAddNewSegment for transaction
|
||||
AlterSegmentsAndAddNewSegment(ctx context.Context, segments []*datapb.SegmentInfo, newSegment *datapb.SegmentInfo) error
|
||||
SaveDroppedSegmentsInBatch(ctx context.Context, segments []*datapb.SegmentInfo) error
|
||||
DropSegment(ctx context.Context, segment *datapb.SegmentInfo) error
|
||||
|
||||
@ -129,33 +127,17 @@ type DataCoordCatalog interface {
|
||||
|
||||
CreateIndex(ctx context.Context, index *model.Index) error
|
||||
ListIndexes(ctx context.Context) ([]*model.Index, error)
|
||||
AlterIndex(ctx context.Context, newIndex *model.Index) error
|
||||
AlterIndexes(ctx context.Context, newIndexes []*model.Index) error
|
||||
DropIndex(ctx context.Context, collID, dropIdxID typeutil.UniqueID) error
|
||||
|
||||
CreateSegmentIndex(ctx context.Context, segIdx *model.SegmentIndex) error
|
||||
ListSegmentIndexes(ctx context.Context) ([]*model.SegmentIndex, error)
|
||||
AlterSegmentIndex(ctx context.Context, newSegIndex *model.SegmentIndex) error
|
||||
AlterSegmentIndexes(ctx context.Context, newSegIdxes []*model.SegmentIndex) error
|
||||
DropSegmentIndex(ctx context.Context, collID, partID, segID, buildID typeutil.UniqueID) error
|
||||
|
||||
GcConfirm(ctx context.Context, collectionID, partitionID typeutil.UniqueID) bool
|
||||
}
|
||||
|
||||
type IndexCoordCatalog interface {
|
||||
CreateIndex(ctx context.Context, index *model.Index) error
|
||||
ListIndexes(ctx context.Context) ([]*model.Index, error)
|
||||
AlterIndex(ctx context.Context, newIndex *model.Index) error
|
||||
AlterIndexes(ctx context.Context, newIndexes []*model.Index) error
|
||||
DropIndex(ctx context.Context, collID, dropIdxID typeutil.UniqueID) error
|
||||
|
||||
CreateSegmentIndex(ctx context.Context, segIdx *model.SegmentIndex) error
|
||||
ListSegmentIndexes(ctx context.Context) ([]*model.SegmentIndex, error)
|
||||
AlterSegmentIndex(ctx context.Context, newSegIndex *model.SegmentIndex) error
|
||||
AlterSegmentIndexes(ctx context.Context, newSegIdxes []*model.SegmentIndex) error
|
||||
DropSegmentIndex(ctx context.Context, collID, partID, segID, buildID typeutil.UniqueID) error
|
||||
}
|
||||
|
||||
type QueryCoordCatalog interface {
|
||||
SaveCollection(collection *querypb.CollectionLoadInfo, partitions ...*querypb.PartitionLoadInfo) error
|
||||
SavePartition(info ...*querypb.PartitionLoadInfo) error
|
||||
|
@ -406,84 +406,6 @@ func (kc *Catalog) hasBinlogPrefix(segment *datapb.SegmentInfo) (bool, error) {
|
||||
return hasBinlogPrefix || hasDeltaPrefix || hasStatsPrefix, nil
|
||||
}
|
||||
|
||||
func (kc *Catalog) AlterSegmentsAndAddNewSegment(ctx context.Context, segments []*datapb.SegmentInfo, newSegment *datapb.SegmentInfo) error {
|
||||
kvs := make(map[string]string)
|
||||
|
||||
for _, s := range segments {
|
||||
noBinlogsSegment, binlogs, deltalogs, statslogs := CloneSegmentWithExcludeBinlogs(s)
|
||||
// `s` is not mutated above. Also, `noBinlogsSegment` is a cloned version of `s`.
|
||||
segmentutil.ReCalcRowCount(s, noBinlogsSegment)
|
||||
// for compacted segments
|
||||
if noBinlogsSegment.State == commonpb.SegmentState_Dropped {
|
||||
hasBinlogkeys, err := kc.hasBinlogPrefix(s)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// In order to guarantee back compatibility, the old format segments need
|
||||
// convert to new format that include segment key and three binlog keys,
|
||||
// or GC can not find data path on the storage.
|
||||
if !hasBinlogkeys {
|
||||
binlogsKvs, err := buildBinlogKvsWithLogID(noBinlogsSegment.CollectionID, noBinlogsSegment.PartitionID, noBinlogsSegment.ID, binlogs, deltalogs, statslogs, true)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
maps.Copy(kvs, binlogsKvs)
|
||||
}
|
||||
}
|
||||
|
||||
k, v, err := buildSegmentKv(noBinlogsSegment)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
kvs[k] = v
|
||||
}
|
||||
|
||||
if newSegment != nil {
|
||||
segmentKvs, err := buildSegmentAndBinlogsKvs(newSegment)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
maps.Copy(kvs, segmentKvs)
|
||||
if newSegment.NumOfRows > 0 {
|
||||
kc.collectMetrics(newSegment)
|
||||
}
|
||||
}
|
||||
return kc.MetaKv.MultiSave(kvs)
|
||||
}
|
||||
|
||||
// RevertAlterSegmentsAndAddNewSegment reverts the metastore operation of AlterSegmentsAndAddNewSegment
|
||||
func (kc *Catalog) RevertAlterSegmentsAndAddNewSegment(ctx context.Context, oldSegments []*datapb.SegmentInfo, removeSegment *datapb.SegmentInfo) error {
|
||||
var (
|
||||
kvs = make(map[string]string)
|
||||
removals []string
|
||||
)
|
||||
|
||||
for _, s := range oldSegments {
|
||||
segmentKvs, err := buildSegmentAndBinlogsKvs(s)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
maps.Copy(kvs, segmentKvs)
|
||||
}
|
||||
|
||||
if removeSegment != nil {
|
||||
segKey := buildSegmentPath(removeSegment.GetCollectionID(), removeSegment.GetPartitionID(), removeSegment.GetID())
|
||||
removals = append(removals, segKey)
|
||||
binlogKeys := buildBinlogKeys(removeSegment)
|
||||
removals = append(removals, binlogKeys...)
|
||||
}
|
||||
|
||||
err := kc.MetaKv.MultiSaveAndRemove(kvs, removals)
|
||||
if err != nil {
|
||||
log.Warn("batch save and remove segments failed", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (kc *Catalog) SaveDroppedSegmentsInBatch(ctx context.Context, segments []*datapb.SegmentInfo) error {
|
||||
if len(segments) == 0 {
|
||||
return nil
|
||||
@ -685,10 +607,6 @@ func (kc *Catalog) ListIndexes(ctx context.Context) ([]*model.Index, error) {
|
||||
return indexes, nil
|
||||
}
|
||||
|
||||
func (kc *Catalog) AlterIndex(ctx context.Context, index *model.Index) error {
|
||||
return kc.CreateIndex(ctx, index)
|
||||
}
|
||||
|
||||
func (kc *Catalog) AlterIndexes(ctx context.Context, indexes []*model.Index) error {
|
||||
kvs := make(map[string]string)
|
||||
for _, index := range indexes {
|
||||
@ -755,10 +673,6 @@ func (kc *Catalog) ListSegmentIndexes(ctx context.Context) ([]*model.SegmentInde
|
||||
return segIndexes, nil
|
||||
}
|
||||
|
||||
func (kc *Catalog) AlterSegmentIndex(ctx context.Context, segIdx *model.SegmentIndex) error {
|
||||
return kc.CreateSegmentIndex(ctx, segIdx)
|
||||
}
|
||||
|
||||
func (kc *Catalog) AlterSegmentIndexes(ctx context.Context, segIdxes []*model.SegmentIndex) error {
|
||||
kvs := make(map[string]string)
|
||||
for _, segIdx := range segIdxes {
|
||||
|
@ -431,62 +431,6 @@ func Test_AlterSegments(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func Test_AlterSegmentsAndAddNewSegment(t *testing.T) {
|
||||
t.Run("save error", func(t *testing.T) {
|
||||
metakv := mocks.NewMetaKv(t)
|
||||
metakv.EXPECT().MultiSave(mock.Anything).Return(errors.New("error"))
|
||||
|
||||
catalog := NewCatalog(metakv, rootPath, "")
|
||||
err := catalog.AlterSegmentsAndAddNewSegment(context.TODO(), []*datapb.SegmentInfo{}, segment1)
|
||||
assert.Error(t, err)
|
||||
})
|
||||
|
||||
t.Run("get prefix fail", func(t *testing.T) {
|
||||
metakv := mocks.NewMetaKv(t)
|
||||
metakv.EXPECT().HasPrefix(mock.Anything).Return(false, errors.New("error"))
|
||||
|
||||
catalog := NewCatalog(metakv, rootPath, "")
|
||||
err := catalog.AlterSegmentsAndAddNewSegment(context.TODO(), []*datapb.SegmentInfo{droppedSegment}, nil)
|
||||
assert.Error(t, err)
|
||||
})
|
||||
|
||||
t.Run("save successfully", func(t *testing.T) {
|
||||
savedKvs := make(map[string]string, 0)
|
||||
metakv := mocks.NewMetaKv(t)
|
||||
metakv.EXPECT().MultiSave(mock.Anything).RunAndReturn(func(m map[string]string) error {
|
||||
maps.Copy(savedKvs, m)
|
||||
return nil
|
||||
})
|
||||
metakv.EXPECT().Load(mock.Anything).RunAndReturn(func(s string) (string, error) {
|
||||
if v, ok := savedKvs[s]; ok {
|
||||
return v, nil
|
||||
}
|
||||
return "", errors.New("key not found")
|
||||
})
|
||||
metakv.EXPECT().HasPrefix(mock.Anything).Return(false, nil)
|
||||
|
||||
catalog := NewCatalog(metakv, rootPath, "")
|
||||
err := catalog.AlterSegmentsAndAddNewSegment(context.TODO(), []*datapb.SegmentInfo{droppedSegment}, segment1)
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.Equal(t, 8, len(savedKvs))
|
||||
verifySavedKvsForDroppedSegment(t, savedKvs)
|
||||
verifySavedKvsForSegment(t, savedKvs)
|
||||
|
||||
adjustedSeg, err := catalog.LoadFromSegmentPath(droppedSegment.CollectionID, droppedSegment.PartitionID, droppedSegment.ID)
|
||||
assert.NoError(t, err)
|
||||
// Check that num of rows is corrected from 100 to 5.
|
||||
assert.Equal(t, int64(100), droppedSegment.GetNumOfRows())
|
||||
assert.Equal(t, int64(5), adjustedSeg.GetNumOfRows())
|
||||
|
||||
adjustedSeg, err = catalog.LoadFromSegmentPath(segment1.CollectionID, segment1.PartitionID, segment1.ID)
|
||||
assert.NoError(t, err)
|
||||
// Check that num of rows is corrected from 100 to 5.
|
||||
assert.Equal(t, int64(100), droppedSegment.GetNumOfRows())
|
||||
assert.Equal(t, int64(5), adjustedSeg.GetNumOfRows())
|
||||
})
|
||||
}
|
||||
|
||||
func Test_DropSegment(t *testing.T) {
|
||||
t.Run("remove failed", func(t *testing.T) {
|
||||
metakv := mocks.NewMetaKv(t)
|
||||
@ -595,25 +539,6 @@ func Test_SaveDroppedSegmentsInBatch_MultiSave(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestCatalog_RevertAlterSegmentsAndAddNewSegment(t *testing.T) {
|
||||
t.Run("save error", func(t *testing.T) {
|
||||
txn := mocks.NewMetaKv(t)
|
||||
txn.EXPECT().MultiSaveAndRemove(mock.Anything, mock.Anything).Return(errors.New("mock error"))
|
||||
|
||||
catalog := NewCatalog(txn, rootPath, "")
|
||||
err := catalog.RevertAlterSegmentsAndAddNewSegment(context.TODO(), []*datapb.SegmentInfo{segment1}, droppedSegment)
|
||||
assert.Error(t, err)
|
||||
})
|
||||
|
||||
t.Run("revert successfully", func(t *testing.T) {
|
||||
txn := mocks.NewMetaKv(t)
|
||||
txn.EXPECT().MultiSaveAndRemove(mock.Anything, mock.Anything).Return(nil)
|
||||
catalog := NewCatalog(txn, rootPath, "")
|
||||
err := catalog.RevertAlterSegmentsAndAddNewSegment(context.TODO(), []*datapb.SegmentInfo{segment1}, droppedSegment)
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
}
|
||||
|
||||
func TestChannelCP(t *testing.T) {
|
||||
mockVChannel := "fake-by-dev-rootcoord-dml-1-testchannelcp-v0"
|
||||
mockPChannel := "fake-by-dev-rootcoord-dml-1"
|
||||
@ -907,30 +832,6 @@ func TestCatalog_ListIndexes(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestCatalog_AlterIndex(t *testing.T) {
|
||||
i := &model.Index{
|
||||
CollectionID: 0,
|
||||
FieldID: 0,
|
||||
IndexID: 0,
|
||||
IndexName: "",
|
||||
IsDeleted: false,
|
||||
CreateTime: 0,
|
||||
TypeParams: nil,
|
||||
IndexParams: nil,
|
||||
}
|
||||
t.Run("add", func(t *testing.T) {
|
||||
txn := mocks.NewMetaKv(t)
|
||||
txn.EXPECT().Save(mock.Anything, mock.Anything).Return(nil)
|
||||
|
||||
catalog := &Catalog{
|
||||
MetaKv: txn,
|
||||
}
|
||||
|
||||
err := catalog.AlterIndex(context.Background(), i)
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
}
|
||||
|
||||
func TestCatalog_AlterIndexes(t *testing.T) {
|
||||
i := &model.Index{
|
||||
CollectionID: 0,
|
||||
@ -1073,36 +974,6 @@ func TestCatalog_ListSegmentIndexes(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestCatalog_AlterSegmentIndex(t *testing.T) {
|
||||
segIdx := &model.SegmentIndex{
|
||||
SegmentID: 0,
|
||||
CollectionID: 0,
|
||||
PartitionID: 0,
|
||||
NumRows: 0,
|
||||
IndexID: 0,
|
||||
BuildID: 0,
|
||||
NodeID: 0,
|
||||
IndexState: 0,
|
||||
FailReason: "",
|
||||
IndexVersion: 0,
|
||||
IsDeleted: false,
|
||||
CreateTime: 0,
|
||||
IndexFileKeys: nil,
|
||||
IndexSize: 0,
|
||||
}
|
||||
|
||||
t.Run("add", func(t *testing.T) {
|
||||
metakv := mocks.NewMetaKv(t)
|
||||
metakv.EXPECT().Save(mock.Anything, mock.Anything).Return(nil)
|
||||
catalog := &Catalog{
|
||||
MetaKv: metakv,
|
||||
}
|
||||
|
||||
err := catalog.AlterSegmentIndex(context.Background(), segIdx)
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
}
|
||||
|
||||
func TestCatalog_AlterSegmentIndexes(t *testing.T) {
|
||||
segIdx := &model.SegmentIndex{
|
||||
SegmentID: 0,
|
||||
|
Loading…
Reference in New Issue
Block a user