Improve rootcoord codecov (#7518)

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>
This commit is contained in:
Cai Yudong 2021-09-07 11:16:37 +08:00 committed by GitHub
parent 877e2d7ba9
commit d15d3fa0ac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 138 additions and 138 deletions

View File

@ -313,6 +313,29 @@ func TestGrpcService(t *testing.T) {
assert.Equal(t, commonpb.ErrorCode_Success, rsp.Status.ErrorCode)
})
t.Run("update channel timetick", func(t *testing.T) {
req := &internalpb.ChannelTimeTickMsg{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_TimeTick,
},
}
status, err := svr.UpdateChannelTimeTick(ctx, req)
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
})
t.Run("release DQL msg stream", func(t *testing.T) {
req := &proxypb.ReleaseDQLMessageStreamRequest{}
assert.Panics(t, func() { svr.ReleaseDQLMessageStream(ctx, req) })
})
t.Run("get metrics", func(t *testing.T) {
req := &milvuspb.GetMetricsRequest{}
rsp, err := svr.GetMetrics(ctx, req)
assert.Nil(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, rsp.Status.ErrorCode)
})
t.Run("create collection", func(t *testing.T) {
schema := schemapb.CollectionSchema{
Name: collName,

View File

@ -29,17 +29,14 @@ type ParamTable struct {
Address string
Port int
PulsarAddress string
RocksmqPath string
RocksmqRetentionSizeInMinutes int64
RocksmqRetentionSizeInMB int64
EtcdEndpoints []string
MetaRootPath string
KvRootPath string
MsgChannelSubName string
TimeTickChannel string
StatisticsChannel string
DmlChannelName string
PulsarAddress string
EtcdEndpoints []string
MetaRootPath string
KvRootPath string
MsgChannelSubName string
TimeTickChannel string
StatisticsChannel string
DmlChannelName string
DmlChannelNum int64
MaxPartitionNum int64
@ -65,7 +62,6 @@ func (p *ParamTable) Init() {
}
p.initPulsarAddress()
p.initRocksmqPath()
p.initEtcdEndpoints()
p.initMetaRootPath()
p.initKvRootPath()
@ -97,18 +93,6 @@ func (p *ParamTable) initPulsarAddress() {
p.PulsarAddress = addr
}
func (p *ParamTable) initRocksmqPath() {
path, err := p.Load("_RocksmqPath")
if err != nil {
panic(err)
}
p.RocksmqPath = path
}
func (p *ParamTable) initRocksmqRetentionTimeInMinutes() {
p.RocksmqRetentionSizeInMinutes = p.ParseInt64("rootcoord.RocksmqRetentionSizeInMinutes")
}
func (p *ParamTable) initEtcdEndpoints() {
endpoints, err := p.Load("_EtcdEndpoints")
if err != nil {

View File

@ -585,16 +585,13 @@ func (c *Core) SetDataCoord(ctx context.Context, s types.DataCoord) error {
c.CallGetBinlogFilePathsService = func(ctx context.Context, segID typeutil.UniqueID, fieldID typeutil.UniqueID) (retFiles []string, retErr error) {
defer func() {
if err := recover(); err != nil {
retFiles = nil
retErr = fmt.Errorf("get bin log file paths panic, msg = %v", err)
}
}()
<-initCh //wait connect to data coord
ts, err := c.TSOAllocator(1)
if err != nil {
retFiles = nil
retErr = err
return
return nil, err
}
binlog, err := s.GetInsertBinlogPaths(ctx, &datapb.GetInsertBinlogPathsRequest{
Base: &commonpb.MsgBase{
@ -606,41 +603,29 @@ func (c *Core) SetDataCoord(ctx context.Context, s types.DataCoord) error {
SegmentID: segID,
})
if err != nil {
retFiles = nil
retErr = err
return
return nil, err
}
if binlog.Status.ErrorCode != commonpb.ErrorCode_Success {
retFiles = nil
retErr = fmt.Errorf("GetInsertBinlogPaths from data service failed, error = %s", binlog.Status.Reason)
return
return nil, fmt.Errorf("GetInsertBinlogPaths from data service failed, error = %s", binlog.Status.Reason)
}
for i := range binlog.FieldIDs {
if binlog.FieldIDs[i] == fieldID {
retFiles = binlog.Paths[i].Values
retErr = nil
return
return binlog.Paths[i].Values, nil
}
}
retFiles = nil
retErr = fmt.Errorf("binlog file not exist, segment id = %d, field id = %d", segID, fieldID)
return
return nil, fmt.Errorf("binlog file not exist, segment id = %d, field id = %d", segID, fieldID)
}
c.CallGetNumRowsService = func(ctx context.Context, segID typeutil.UniqueID, isFromFlushedChan bool) (retRows int64, retErr error) {
defer func() {
if err := recover(); err != nil {
retRows = 0
retErr = fmt.Errorf("get num rows panic, msg = %v", err)
return
}
}()
<-initCh
ts, err := c.TSOAllocator(1)
if err != nil {
retRows = 0
retErr = err
return
return retRows, err
}
segInfo, err := s.GetSegmentInfo(ctx, &datapb.GetSegmentInfoRequest{
Base: &commonpb.MsgBase{
@ -652,36 +637,26 @@ func (c *Core) SetDataCoord(ctx context.Context, s types.DataCoord) error {
SegmentIDs: []typeutil.UniqueID{segID},
})
if err != nil {
retRows = 0
retErr = err
return
return retRows, err
}
if segInfo.Status.ErrorCode != commonpb.ErrorCode_Success {
return 0, fmt.Errorf("GetSegmentInfo from data service failed, error = %s", segInfo.Status.Reason)
return retRows, fmt.Errorf("GetSegmentInfo from data service failed, error = %s", segInfo.Status.Reason)
}
if len(segInfo.Infos) != 1 {
log.Debug("get segment info empty")
retRows = 0
retErr = nil
return
return retRows, nil
}
if !isFromFlushedChan && segInfo.Infos[0].State != commonpb.SegmentState_Flushed {
log.Debug("segment id not flushed", zap.Int64("segment id", segID))
retRows = 0
retErr = nil
return
return retRows, nil
}
retRows = segInfo.Infos[0].NumOfRows
retErr = nil
return
return segInfo.Infos[0].NumOfRows, nil
}
c.CallGetFlushedSegmentsService = func(ctx context.Context, collID, partID typeutil.UniqueID) (retSegIDs []typeutil.UniqueID, retErr error) {
defer func() {
if err := recover(); err != nil {
retSegIDs = []typeutil.UniqueID{}
retErr = fmt.Errorf("get flushed segments from data coord panic, msg = %v", err)
return
}
}()
<-initCh
@ -697,18 +672,12 @@ func (c *Core) SetDataCoord(ctx context.Context, s types.DataCoord) error {
}
rsp, err := s.GetFlushedSegments(ctx, req)
if err != nil {
retSegIDs = []typeutil.UniqueID{}
retErr = err
return
return retSegIDs, err
}
if rsp.Status.ErrorCode != commonpb.ErrorCode_Success {
retSegIDs = []typeutil.UniqueID{}
retErr = fmt.Errorf("get flushed segments from data coord failed, reason = %s", rsp.Status.Reason)
return
return retSegIDs, fmt.Errorf("get flushed segments from data coord failed, reason = %s", rsp.Status.Reason)
}
retSegIDs = rsp.Segments
retErr = nil
return
return rsp.Segments, nil
}
return nil
@ -732,9 +701,7 @@ func (c *Core) SetIndexCoord(s types.IndexCoord) error {
c.CallBuildIndexService = func(ctx context.Context, binlog []string, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo) (retID typeutil.UniqueID, retErr error) {
defer func() {
if err := recover(); err != nil {
retID = 0
retErr = fmt.Errorf("build index panic, msg = %v", err)
return
}
}()
<-initCh
@ -746,25 +713,18 @@ func (c *Core) SetIndexCoord(s types.IndexCoord) error {
IndexName: idxInfo.IndexName,
})
if err != nil {
retID = 0
retErr = err
return
return retID, err
}
if rsp.Status.ErrorCode != commonpb.ErrorCode_Success {
retID = 0
retErr = fmt.Errorf("BuildIndex from index service failed, error = %s", rsp.Status.Reason)
return
return retID, fmt.Errorf("BuildIndex from index service failed, error = %s", rsp.Status.Reason)
}
retID = rsp.IndexBuildID
retErr = nil
return
return rsp.IndexBuildID, nil
}
c.CallDropIndexService = func(ctx context.Context, indexID typeutil.UniqueID) (retErr error) {
defer func() {
if err := recover(); err != nil {
retErr = fmt.Errorf("drop index from index service panic, msg = %v", err)
return
}
}()
<-initCh
@ -772,15 +732,12 @@ func (c *Core) SetIndexCoord(s types.IndexCoord) error {
IndexID: indexID,
})
if err != nil {
retErr = err
return
return err
}
if rsp.ErrorCode != commonpb.ErrorCode_Success {
retErr = fmt.Errorf(rsp.Reason)
return
return fmt.Errorf(rsp.Reason)
}
retErr = nil
return
return nil
}
return nil
@ -804,7 +761,6 @@ func (c *Core) SetQueryCoord(s types.QueryCoord) error {
defer func() {
if err := recover(); err != nil {
retErr = fmt.Errorf("release collection from query service panic, msg = %v", err)
return
}
}()
<-initCh
@ -820,15 +776,12 @@ func (c *Core) SetQueryCoord(s types.QueryCoord) error {
}
rsp, err := s.ReleaseCollection(ctx, req)
if err != nil {
retErr = err
return
return err
}
if rsp.ErrorCode != commonpb.ErrorCode_Success {
retErr = fmt.Errorf("ReleaseCollection from query service failed, error = %s", rsp.Reason)
return
return fmt.Errorf("ReleaseCollection from query service failed, error = %s", rsp.Reason)
}
retErr = nil
return
return nil
}
c.CallReleasePartitionService = func(ctx context.Context, ts typeutil.Timestamp, dbID, collectionID typeutil.UniqueID, partitionIDs []typeutil.UniqueID) (retErr error) {
defer func() {
@ -850,15 +803,12 @@ func (c *Core) SetQueryCoord(s types.QueryCoord) error {
}
rsp, err := s.ReleasePartitions(ctx, req)
if err != nil {
retErr = err
return
return err
}
if rsp.ErrorCode != commonpb.ErrorCode_Success {
retErr = fmt.Errorf("ReleasePartitions from query service failed, error = %s", rsp.Reason)
return
return fmt.Errorf("ReleasePartitions from query service failed, error = %s", rsp.Reason)
}
retErr = nil
return
return nil
}
return nil
}
@ -1012,11 +962,13 @@ func (c *Core) Init() error {
return initError
}
func (c *Core) reSendDdMsg(ctx context.Context) error {
flag, err := c.MetaTable.client.Load(DDMsgSendPrefix, 0)
if err != nil || flag == "true" {
log.Debug("No un-successful DdMsg")
return nil
func (c *Core) reSendDdMsg(ctx context.Context, force bool) error {
if !force {
flag, err := c.MetaTable.client.Load(DDMsgSendPrefix, 0)
if err != nil || flag == "true" {
log.Debug("No un-successful DdMsg")
return nil
}
}
ddOpStr, err := c.MetaTable.client.Load(DDOperationPrefix, 0)
@ -1029,6 +981,10 @@ func (c *Core) reSendDdMsg(ctx context.Context) error {
return err
}
var invalidateCache bool
var ts typeutil.Timestamp
var dbName, collName string
switch ddOp.Type {
case CreateCollectionDDType:
var ddReq = internalpb.CreateCollectionRequest{}
@ -1042,11 +998,14 @@ func (c *Core) reSendDdMsg(ctx context.Context) error {
if err = c.SendDdCreateCollectionReq(ctx, &ddReq, collInfo.PhysicalChannelNames); err != nil {
return err
}
invalidateCache = false
case DropCollectionDDType:
var ddReq = internalpb.DropCollectionRequest{}
if err = proto.UnmarshalText(ddOp.Body, &ddReq); err != nil {
return err
}
ts = ddReq.Base.Timestamp
dbName, collName = ddReq.DbName, ddReq.CollectionName
collInfo, err := c.MetaTable.GetCollectionByName(ddReq.CollectionName, 0)
if err != nil {
return err
@ -1054,66 +1013,59 @@ func (c *Core) reSendDdMsg(ctx context.Context) error {
if err = c.SendDdDropCollectionReq(ctx, &ddReq, collInfo.PhysicalChannelNames); err != nil {
return err
}
req := proxypb.InvalidateCollMetaCacheRequest{
Base: &commonpb.MsgBase{
MsgType: 0, //TODO, msg type
MsgID: 0, //TODO, msg id
Timestamp: ddReq.Base.Timestamp,
SourceID: c.session.ServerID,
},
DbName: ddReq.DbName,
CollectionName: ddReq.CollectionName,
}
c.proxyClientManager.InvalidateCollectionMetaCache(c.ctx, &req)
invalidateCache = true
case CreatePartitionDDType:
var ddReq = internalpb.CreatePartitionRequest{}
if err = proto.UnmarshalText(ddOp.Body, &ddReq); err != nil {
return err
}
ts = ddReq.Base.Timestamp
dbName, collName = ddReq.DbName, ddReq.CollectionName
collInfo, err := c.MetaTable.GetCollectionByName(ddReq.CollectionName, 0)
if err != nil {
return err
}
if _, err = c.MetaTable.GetPartitionByName(collInfo.ID, ddReq.PartitionName, 0); err == nil {
return fmt.Errorf("partition %s already created", ddReq.PartitionName)
}
if err = c.SendDdCreatePartitionReq(ctx, &ddReq, collInfo.PhysicalChannelNames); err != nil {
return err
}
req := proxypb.InvalidateCollMetaCacheRequest{
Base: &commonpb.MsgBase{
MsgType: 0, //TODO, msg type
MsgID: 0, //TODO, msg id
Timestamp: ddReq.Base.Timestamp,
SourceID: c.session.ServerID,
},
DbName: ddReq.DbName,
CollectionName: ddReq.CollectionName,
}
c.proxyClientManager.InvalidateCollectionMetaCache(c.ctx, &req)
invalidateCache = true
case DropPartitionDDType:
var ddReq = internalpb.DropPartitionRequest{}
if err = proto.UnmarshalText(ddOp.Body, &ddReq); err != nil {
return err
}
ts = ddReq.Base.Timestamp
dbName, collName = ddReq.DbName, ddReq.CollectionName
collInfo, err := c.MetaTable.GetCollectionByName(ddReq.CollectionName, 0)
if err != nil {
return err
}
if _, err = c.MetaTable.GetPartitionByName(collInfo.ID, ddReq.PartitionName, 0); err != nil {
return err
}
if err = c.SendDdDropPartitionReq(ctx, &ddReq, collInfo.PhysicalChannelNames); err != nil {
return err
}
invalidateCache = true
default:
return fmt.Errorf("Invalid DdOperation %s", ddOp.Type)
}
if invalidateCache {
req := proxypb.InvalidateCollMetaCacheRequest{
Base: &commonpb.MsgBase{
MsgType: 0, //TODO, msg type
MsgID: 0, //TODO, msg id
Timestamp: ddReq.Base.Timestamp,
Timestamp: ts,
SourceID: c.session.ServerID,
},
DbName: ddReq.DbName,
CollectionName: ddReq.CollectionName,
DbName: dbName,
CollectionName: collName,
}
c.proxyClientManager.InvalidateCollectionMetaCache(c.ctx, &req)
default:
return fmt.Errorf("Invalid DdOperation %s", ddOp.Type)
}
// Update DDOperation in etcd
@ -1134,7 +1086,7 @@ func (c *Core) Start() error {
log.Debug("RootCoord Start WatchProxy failed", zap.Error(err))
return
}
if err := c.reSendDdMsg(c.ctx); err != nil {
if err := c.reSendDdMsg(c.ctx, false); err != nil {
log.Debug("RootCoord Start reSendDdMsg failed", zap.Error(err))
return
}

View File

@ -68,6 +68,13 @@ func (p *proxyMock) GetCollArray() []string {
return ret
}
func (p *proxyMock) ReleaseDQLMessageStream(ctx context.Context, request *proxypb.ReleaseDQLMessageStreamRequest) (*commonpb.Status, error) {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
}, nil
}
type dataMock struct {
types.DataCoord
randVal int
@ -272,6 +279,7 @@ func TestRootCoord(t *testing.T) {
Params.MetaRootPath = fmt.Sprintf("/%d/%s", randVal, Params.MetaRootPath)
Params.KvRootPath = fmt.Sprintf("/%d/%s", randVal, Params.KvRootPath)
Params.MsgChannelSubName = fmt.Sprintf("subname-%d", randVal)
Params.DmlChannelName = fmt.Sprintf("dml-%d", randVal)
err = core.Register()
assert.Nil(t, err)
@ -418,7 +426,6 @@ func TestRootCoord(t *testing.T) {
dmlStream.AsConsumer([]string{pChan[0]}, Params.MsgChannelSubName)
dmlStream.Start()
// get CreateCollectionMsg
// get CreateCollectionMsg
msgs := getNotTtMsg(ctx, 1, dmlStream.Chan())
assert.Equal(t, 1, len(msgs))
@ -502,6 +509,9 @@ func TestRootCoord(t *testing.T) {
status, err = core.CreateCollection(ctx, req)
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
err = core.reSendDdMsg(core.ctx, true)
assert.Nil(t, err)
})
t.Run("has collection", func(t *testing.T) {
@ -641,6 +651,9 @@ func TestRootCoord(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, collMeta.ID, ddReq.CollectionID)
assert.Equal(t, collMeta.PartitionIDs[1], ddReq.PartitionID)
err = core.reSendDdMsg(core.ctx, true)
assert.NotNil(t, err)
})
t.Run("has partition", func(t *testing.T) {
@ -973,6 +986,25 @@ func TestRootCoord(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, collMeta.ID, ddReq.CollectionID)
assert.Equal(t, dropPartID, ddReq.PartitionID)
err = core.reSendDdMsg(core.ctx, true)
assert.NotNil(t, err)
})
t.Run("remove DQL msgstream", func(t *testing.T) {
collMeta, err := core.MetaTable.GetCollectionByName(collName, 0)
assert.Nil(t, err)
req := &proxypb.ReleaseDQLMessageStreamRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_RemoveQueryChannels,
SourceID: core.session.ServerID,
},
CollectionID: collMeta.ID,
}
status, err := core.ReleaseDQLMessageStream(core.ctx, req)
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
})
t.Run("drop collection", func(t *testing.T) {
@ -1043,6 +1075,9 @@ func TestRootCoord(t *testing.T) {
err = proto.UnmarshalText(ddOp.Body, &ddReq)
assert.Nil(t, err)
assert.Equal(t, collMeta.ID, ddReq.CollectionID)
err = core.reSendDdMsg(core.ctx, true)
assert.NotNil(t, err)
})
t.Run("context_cancel", func(t *testing.T) {
@ -1417,7 +1452,6 @@ func TestRootCoord(t *testing.T) {
p1 := sessionutil.Session{
ServerID: 100,
}
p2 := sessionutil.Session{
ServerID: 101,
}
@ -1428,9 +1462,11 @@ func TestRootCoord(t *testing.T) {
s2, err := json.Marshal(&p2)
assert.Nil(t, err)
_, err = core.etcdCli.Put(ctx2, path.Join(sessKey, typeutil.ProxyRole)+"-1", string(s1))
proxy1 := path.Join(sessKey, typeutil.ProxyRole) + "-1"
proxy2 := path.Join(sessKey, typeutil.ProxyRole) + "-2"
_, err = core.etcdCli.Put(ctx2, proxy1, string(s1))
assert.Nil(t, err)
_, err = core.etcdCli.Put(ctx2, path.Join(sessKey, typeutil.ProxyRole)+"-2", string(s2))
_, err = core.etcdCli.Put(ctx2, proxy2, string(s2))
assert.Nil(t, err)
time.Sleep(100 * time.Millisecond)
@ -1481,6 +1517,11 @@ func TestRootCoord(t *testing.T) {
// add 3 proxy channels
assert.Equal(t, 3, core.chanTimeTick.GetChanNum()-numChan)
_, err = core.etcdCli.Delete(ctx2, proxy1)
assert.Nil(t, err)
_, err = core.etcdCli.Delete(ctx2, proxy2)
assert.Nil(t, err)
})
t.Run("get metrics", func(t *testing.T) {