Reduce running unit test time for indexcoord (#7779)

Signed-off-by: cai.zhang <cai.zhang@zilliz.com>
This commit is contained in:
cai.zhang 2021-09-14 10:41:21 +08:00 committed by GitHub
parent 8a458b7a38
commit 63a83f027f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 60 additions and 23 deletions

View File

@ -44,13 +44,6 @@ import (
"github.com/milvus-io/milvus/internal/util/typeutil"
)
const (
reqTimeoutInterval = time.Second * 10
durationInterval = time.Second * 10
assignTaskInterval = time.Second * 3
taskLimit = 20
)
type IndexCoord struct {
stateCode atomic.Value
@ -76,6 +69,11 @@ type IndexCoord struct {
nodeLock sync.RWMutex
reqTimeoutInterval time.Duration
durationInterval time.Duration
assignTaskInterval time.Duration
taskLimit int
// Add callback functions at different stages
startCallbacks []func()
closeCallbacks []func()
@ -88,8 +86,12 @@ func NewIndexCoord(ctx context.Context) (*IndexCoord, error) {
rand.Seed(time.Now().UnixNano())
ctx1, cancel := context.WithCancel(ctx)
i := &IndexCoord{
loopCtx: ctx1,
loopCancel: cancel,
loopCtx: ctx1,
loopCancel: cancel,
reqTimeoutInterval: time.Second * 10,
durationInterval: time.Second * 10,
assignTaskInterval: time.Second * 3,
taskLimit: 20,
}
i.UpdateStateCode(internalpb.StateCode_Abnormal)
return i, nil
@ -325,7 +327,7 @@ func (i *IndexCoord) BuildIndex(ctx context.Context, req *indexpb.BuildIndexRequ
}
var cancel func()
t.ctx, cancel = context.WithTimeout(ctx, reqTimeoutInterval)
t.ctx, cancel = context.WithTimeout(ctx, i.reqTimeoutInterval)
defer cancel()
fn := func() error {
@ -550,7 +552,7 @@ func (i *IndexCoord) recycleUnusedIndexFiles() {
defer cancel()
defer i.loopWg.Done()
timeTicker := time.NewTicker(durationInterval)
timeTicker := time.NewTicker(i.durationInterval)
log.Debug("IndexCoord start recycleUnusedIndexFiles loop")
for {
@ -558,7 +560,7 @@ func (i *IndexCoord) recycleUnusedIndexFiles() {
case <-ctx.Done():
return
case <-timeTicker.C:
metas := i.metaTable.GetUnusedIndexFiles(taskLimit)
metas := i.metaTable.GetUnusedIndexFiles(i.taskLimit)
log.Debug("IndexCoord recycleUnusedIndexFiles", zap.Int("Need recycle tasks num", len(metas)))
for _, meta := range metas {
if meta.indexMeta.MarkDeleted {
@ -670,7 +672,7 @@ func (i *IndexCoord) watchMetaLoop() {
}
func (i *IndexCoord) assignTask(builderClient types.IndexNode, req *indexpb.CreateIndexRequest) bool {
ctx, cancel := context.WithTimeout(i.loopCtx, reqTimeoutInterval)
ctx, cancel := context.WithTimeout(i.loopCtx, i.reqTimeoutInterval)
defer cancel()
resp, err := builderClient.CreateIndex(ctx, req)
if err != nil {
@ -691,7 +693,7 @@ func (i *IndexCoord) assignTaskLoop() {
defer cancel()
defer i.loopWg.Done()
timeTicker := time.NewTicker(assignTaskInterval)
timeTicker := time.NewTicker(i.assignTaskInterval)
log.Debug("IndexCoord start assignTask loop")
for {
@ -750,7 +752,7 @@ func (i *IndexCoord) assignTaskLoop() {
log.Debug("This task has been assigned", zap.Int64("indexBuildID", indexBuildID),
zap.Int64("The IndexNode execute this task", nodeID))
i.nodeManager.pq.IncPriority(nodeID, 1)
if index > taskLimit {
if index > i.taskLimit {
break
}
}

View File

@ -45,6 +45,10 @@ func TestIndexCoord(t *testing.T) {
assert.Nil(t, err)
ic, err := NewIndexCoord(ctx)
assert.Nil(t, err)
ic.reqTimeoutInterval = time.Second * 10
ic.durationInterval = time.Second
ic.assignTaskInterval = time.Second
ic.taskLimit = 20
Params.Init()
err = ic.Register()
assert.Nil(t, err)
@ -122,8 +126,6 @@ func TestIndexCoord(t *testing.T) {
assert.Equal(t, "IndexFilePath-2", resp.FilePaths[0].IndexFilePaths[1])
})
time.Sleep(10 * time.Second)
t.Run("Drop Index", func(t *testing.T) {
req := &indexpb.DropIndexRequest{
IndexID: indexID,
@ -173,10 +175,16 @@ func TestIndexCoord(t *testing.T) {
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.Status.ErrorCode)
})
t.Run("Recycle IndexMeta", func(t *testing.T) {
indexMeta := ic.metaTable.GetIndexMetaByIndexBuildID(indexBuildID)
for indexMeta != nil {
indexMeta = ic.metaTable.GetIndexMetaByIndexBuildID(indexBuildID)
time.Sleep(time.Second)
}
})
err = in.Stop()
assert.Nil(t, err)
time.Sleep(11 * time.Second)
err = ic.Stop()
assert.Nil(t, err)
}

View File

@ -494,3 +494,16 @@ func (mt *metaTable) GetNodeTaskStats() map[UniqueID]int {
}
return nodePriority
}
func (mt *metaTable) GetIndexMetaByIndexBuildID(indexBuildID UniqueID) *indexpb.IndexMeta {
mt.lock.RLock()
defer mt.lock.RUnlock()
log.Debug("IndexCoord MetaTable GetIndexMeta", zap.Int64("IndexBuildID", indexBuildID))
meta, ok := mt.indexBuildID2Meta[indexBuildID]
if !ok {
log.Error("IndexCoord MetaTable GetIndexMeta not exist", zap.Int64("IndexBuildID", indexBuildID))
return nil
}
return meta.indexMeta
}

View File

@ -76,6 +76,14 @@ func TestMetaTable(t *testing.T) {
assert.NotNil(t, err)
})
t.Run("GetIndexMetaByIndexBuildID", func(t *testing.T) {
indexMeta := metaTable.GetIndexMetaByIndexBuildID(1)
assert.NotNil(t, indexMeta)
indexMeta2 := metaTable.GetIndexMetaByIndexBuildID(20)
assert.Nil(t, indexMeta2)
})
t.Run("BuildIndex", func(t *testing.T) {
err = metaTable.BuildIndex(UniqueID(4), 1)
assert.NotNil(t, err)

View File

@ -14,6 +14,7 @@ package indexnode
import (
"context"
"errors"
"fmt"
"sync"
"github.com/milvus-io/milvus/internal/log"
@ -79,10 +80,14 @@ func (inm *Mock) buildIndexTask() {
indexMeta.State = commonpb.IndexState_Failed
_ = inm.etcdKV.CompareVersionAndSwap(req.MetaPath, versions[0],
proto.MarshalTextString(&indexMeta))
indexMeta.Version = indexMeta.Version + 1
indexMeta.State = commonpb.IndexState_Finished
_ = inm.etcdKV.CompareVersionAndSwap(req.MetaPath, versions[0]+1,
proto.MarshalTextString(&indexMeta))
indexMeta2 := indexpb.IndexMeta{}
_, values2, versions2, _ := inm.etcdKV.LoadWithPrefix2(req.MetaPath)
_ = proto.UnmarshalText(values2[0], &indexMeta2)
indexMeta2.Version = indexMeta.Version + 1
indexMeta2.IndexFilePaths = []string{"IndexFilePath-1", "IndexFilePath-2"}
indexMeta2.State = commonpb.IndexState_Finished
_ = inm.etcdKV.CompareVersionAndSwap(req.MetaPath, versions2[0],
proto.MarshalTextString(&indexMeta2))
}
}
}
@ -91,6 +96,7 @@ func (inm *Mock) Start() error {
if inm.Err {
return errors.New("IndexNode start failed")
}
fmt.Println("haha")
inm.wg.Add(1)
go inm.buildIndexTask()
return nil