mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-02 11:59:00 +08:00
Make CollectionObserver trigger checker more frequently during load procedure (#23928)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent
a86332b2d7
commit
ed81eaa963
@ -70,7 +70,7 @@ func NewCheckerController(
|
|||||||
|
|
||||||
return &CheckerController{
|
return &CheckerController{
|
||||||
stopCh: make(chan struct{}),
|
stopCh: make(chan struct{}),
|
||||||
checkCh: make(chan struct{}),
|
checkCh: make(chan struct{}, 1),
|
||||||
meta: meta,
|
meta: meta,
|
||||||
dist: dist,
|
dist: dist,
|
||||||
targetMgr: targetMgr,
|
targetMgr: targetMgr,
|
||||||
@ -112,7 +112,10 @@ func (controller *CheckerController) Stop() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (controller *CheckerController) Check() {
|
func (controller *CheckerController) Check() {
|
||||||
controller.checkCh <- struct{}{}
|
select {
|
||||||
|
case controller.checkCh <- struct{}{}:
|
||||||
|
default:
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// check is the real implementation of Check
|
// check is the real implementation of Check
|
||||||
|
@ -24,6 +24,7 @@ import (
|
|||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
|
||||||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||||
|
"github.com/milvus-io/milvus/internal/querycoordv2/checkers"
|
||||||
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
|
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
|
||||||
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
|
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
|
||||||
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
|
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
|
||||||
@ -37,6 +38,7 @@ type CollectionObserver struct {
|
|||||||
meta *meta.Meta
|
meta *meta.Meta
|
||||||
targetMgr *meta.TargetManager
|
targetMgr *meta.TargetManager
|
||||||
targetObserver *TargetObserver
|
targetObserver *TargetObserver
|
||||||
|
checkerController *checkers.CheckerController
|
||||||
partitionLoadedCount map[int64]int
|
partitionLoadedCount map[int64]int
|
||||||
|
|
||||||
stopOnce sync.Once
|
stopOnce sync.Once
|
||||||
@ -47,6 +49,7 @@ func NewCollectionObserver(
|
|||||||
meta *meta.Meta,
|
meta *meta.Meta,
|
||||||
targetMgr *meta.TargetManager,
|
targetMgr *meta.TargetManager,
|
||||||
targetObserver *TargetObserver,
|
targetObserver *TargetObserver,
|
||||||
|
checherController *checkers.CheckerController,
|
||||||
) *CollectionObserver {
|
) *CollectionObserver {
|
||||||
return &CollectionObserver{
|
return &CollectionObserver{
|
||||||
stopCh: make(chan struct{}),
|
stopCh: make(chan struct{}),
|
||||||
@ -54,6 +57,7 @@ func NewCollectionObserver(
|
|||||||
meta: meta,
|
meta: meta,
|
||||||
targetMgr: targetMgr,
|
targetMgr: targetMgr,
|
||||||
targetObserver: targetObserver,
|
targetObserver: targetObserver,
|
||||||
|
checkerController: checherController,
|
||||||
partitionLoadedCount: make(map[int64]int),
|
partitionLoadedCount: make(map[int64]int),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -135,12 +139,18 @@ func (ob *CollectionObserver) observeLoadStatus() {
|
|||||||
if len(partitions) > 0 {
|
if len(partitions) > 0 {
|
||||||
log.Info("observe partitions status", zap.Int("partitionNum", len(partitions)))
|
log.Info("observe partitions status", zap.Int("partitionNum", len(partitions)))
|
||||||
}
|
}
|
||||||
|
loading := false
|
||||||
for _, partition := range partitions {
|
for _, partition := range partitions {
|
||||||
if partition.LoadPercentage == 100 {
|
if partition.LoadPercentage == 100 {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
replicaNum := ob.meta.GetReplicaNumber(partition.GetCollectionID())
|
replicaNum := ob.meta.GetReplicaNumber(partition.GetCollectionID())
|
||||||
ob.observePartitionLoadStatus(partition, replicaNum)
|
ob.observePartitionLoadStatus(partition, replicaNum)
|
||||||
|
loading = true
|
||||||
|
}
|
||||||
|
// trigger check logic when loading collections/partitions
|
||||||
|
if loading {
|
||||||
|
ob.checkerController.Check()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -30,6 +30,7 @@ import (
|
|||||||
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
|
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
|
||||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||||
|
"github.com/milvus-io/milvus/internal/querycoordv2/checkers"
|
||||||
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
|
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
|
||||||
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
|
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
|
||||||
"github.com/milvus-io/milvus/internal/querycoordv2/session"
|
"github.com/milvus-io/milvus/internal/querycoordv2/session"
|
||||||
@ -58,10 +59,11 @@ type CollectionObserverSuite struct {
|
|||||||
broker *meta.MockBroker
|
broker *meta.MockBroker
|
||||||
|
|
||||||
// Dependencies
|
// Dependencies
|
||||||
dist *meta.DistributionManager
|
dist *meta.DistributionManager
|
||||||
meta *meta.Meta
|
meta *meta.Meta
|
||||||
targetMgr *meta.TargetManager
|
targetMgr *meta.TargetManager
|
||||||
targetObserver *TargetObserver
|
targetObserver *TargetObserver
|
||||||
|
checkerController *checkers.CheckerController
|
||||||
|
|
||||||
// Test object
|
// Test object
|
||||||
ob *CollectionObserver
|
ob *CollectionObserver
|
||||||
@ -188,6 +190,7 @@ func (suite *CollectionObserverSuite) SetupTest() {
|
|||||||
suite.dist,
|
suite.dist,
|
||||||
suite.broker,
|
suite.broker,
|
||||||
)
|
)
|
||||||
|
suite.checkerController = &checkers.CheckerController{}
|
||||||
|
|
||||||
// Test object
|
// Test object
|
||||||
suite.ob = NewCollectionObserver(
|
suite.ob = NewCollectionObserver(
|
||||||
@ -195,6 +198,7 @@ func (suite *CollectionObserverSuite) SetupTest() {
|
|||||||
suite.meta,
|
suite.meta,
|
||||||
suite.targetMgr,
|
suite.targetMgr,
|
||||||
suite.targetObserver,
|
suite.targetObserver,
|
||||||
|
suite.checkerController,
|
||||||
)
|
)
|
||||||
|
|
||||||
for _, collection := range suite.collections {
|
for _, collection := range suite.collections {
|
||||||
|
@ -356,6 +356,7 @@ func (s *Server) initObserver() {
|
|||||||
s.meta,
|
s.meta,
|
||||||
s.targetMgr,
|
s.targetMgr,
|
||||||
s.targetObserver,
|
s.targetObserver,
|
||||||
|
s.checkerController,
|
||||||
)
|
)
|
||||||
|
|
||||||
s.replicaObserver = observers.NewReplicaObserver(
|
s.replicaObserver = observers.NewReplicaObserver(
|
||||||
|
@ -518,6 +518,7 @@ func (suite *ServerSuite) hackServer() {
|
|||||||
suite.server.meta,
|
suite.server.meta,
|
||||||
suite.server.targetMgr,
|
suite.server.targetMgr,
|
||||||
suite.server.targetObserver,
|
suite.server.targetObserver,
|
||||||
|
suite.server.checkerController,
|
||||||
)
|
)
|
||||||
|
|
||||||
suite.broker.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything).Return(&schemapb.CollectionSchema{}, nil).Maybe()
|
suite.broker.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything).Return(&schemapb.CollectionSchema{}, nil).Maybe()
|
||||||
|
@ -35,6 +35,7 @@ import (
|
|||||||
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
||||||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||||
"github.com/milvus-io/milvus/internal/querycoordv2/balance"
|
"github.com/milvus-io/milvus/internal/querycoordv2/balance"
|
||||||
|
"github.com/milvus-io/milvus/internal/querycoordv2/checkers"
|
||||||
"github.com/milvus-io/milvus/internal/querycoordv2/dist"
|
"github.com/milvus-io/milvus/internal/querycoordv2/dist"
|
||||||
"github.com/milvus-io/milvus/internal/querycoordv2/job"
|
"github.com/milvus-io/milvus/internal/querycoordv2/job"
|
||||||
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
|
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
|
||||||
@ -188,6 +189,7 @@ func (suite *ServiceSuite) SetupTest() {
|
|||||||
suite.server.meta,
|
suite.server.meta,
|
||||||
suite.server.targetMgr,
|
suite.server.targetMgr,
|
||||||
suite.targetObserver,
|
suite.targetObserver,
|
||||||
|
&checkers.CheckerController{},
|
||||||
)
|
)
|
||||||
|
|
||||||
suite.server.UpdateStateCode(commonpb.StateCode_Healthy)
|
suite.server.UpdateStateCode(commonpb.StateCode_Healthy)
|
||||||
|
Loading…
Reference in New Issue
Block a user