Remove load cache (#23287)

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
yihao.dai 2023-05-09 10:36:41 +08:00 committed by GitHub
parent 426ed30d2d
commit 3827ac30bc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 22 additions and 331 deletions

View File

@ -94,7 +94,6 @@ type collectionInfo struct {
shardLeaders *shardLeaders
createdTimestamp uint64
createdUtcTimestamp uint64
isLoaded bool
}
func (info *collectionInfo) isCollectionCached() bool {
@ -294,40 +293,6 @@ func (m *MetaCache) GetCollectionInfo(ctx context.Context, collectionName string
metrics.ProxyUpdateCacheLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Observe(float64(tr.ElapseSpan().Milliseconds()))
}
if !collInfo.isLoaded {
// check if collection was loaded
showResp, err := m.queryCoord.ShowCollections(ctx, &querypb.ShowCollectionsRequest{
Base: commonpbutil.NewMsgBase(
commonpbutil.WithMsgType(commonpb.MsgType_ShowCollections),
commonpbutil.WithSourceID(paramtable.GetNodeID()),
),
CollectionIDs: []int64{collInfo.collID},
})
if err != nil {
return nil, err
}
if showResp.Status.ErrorCode != commonpb.ErrorCode_Success {
return nil, errors.New(showResp.Status.Reason)
}
log.Debug("QueryCoord show collections",
zap.Int64("collID", collInfo.collID),
zap.Int64s("collections", showResp.GetCollectionIDs()),
zap.Int64s("collectionsInMemoryPercentages", showResp.GetInMemoryPercentages()),
)
loaded := false
for index, collID := range showResp.CollectionIDs {
if collID == collInfo.collID && showResp.GetInMemoryPercentages()[index] >= int64(100) {
loaded = true
break
}
}
if loaded {
m.mu.Lock()
m.collInfo[collectionName].isLoaded = true
m.mu.Unlock()
}
}
metrics.ProxyCacheStatsCounter.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), "GetCollectionInfo", metrics.CacheHitLabel).Inc()
return collInfo, nil
}

View File

@ -685,60 +685,6 @@ func TestMetaCache_PolicyInfo(t *testing.T) {
})
}
// TestMetaCache_LoadCache verifies how the meta cache tracks the load state of
// a collection: the first lookup of an unloaded collection asks QueryCoord via
// ShowCollections, a fully loaded result (100%) is cached so later lookups do
// not ask again, and removing the collection from the cache forces a new
// ShowCollections call.
func TestMetaCache_LoadCache(t *testing.T) {
	ctx := context.Background()
	rootCoord := &MockRootCoordClientInterface{}
	queryCoord := &types.MockQueryCoord{}
	mgr := newShardClientMgr()
	err := InitMetaCache(ctx, rootCoord, queryCoord, mgr)
	assert.NoError(t, err)

	// qcCounter counts how many times ShowCollections is invoked on the mock.
	qcCounter := 0
	// Collection 1 is reported as fully loaded, collection 2 as half loaded.
	// NOTE(review): assumes the RootCoord mock maps "collection1"/"collection2"
	// to IDs 1/2 - confirm against MockRootCoordClientInterface.
	queryCoord.EXPECT().ShowCollections(mock.Anything, mock.Anything).Return(&querypb.ShowCollectionsResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		},
		CollectionIDs:       []UniqueID{1, 2},
		InMemoryPercentages: []int64{100, 50},
	}, nil).Run(func(ctx context.Context, req *querypb.ShowCollectionsRequest) {
		qcCounter++
	})

	t.Run("test IsCollectionLoaded", func(t *testing.T) {
		info, err := globalMetaCache.GetCollectionInfo(ctx, "collection1")
		assert.NoError(t, err)
		assert.True(t, info.isLoaded)
		// no collectionInfo of collection1, should access RootCoord
		assert.Equal(t, 1, rootCoord.GetAccessCount())
		// not loaded, should access QueryCoord
		assert.Equal(t, 1, qcCounter)

		info, err = globalMetaCache.GetCollectionInfo(ctx, "collection1")
		assert.NoError(t, err)
		assert.True(t, info.isLoaded)
		// shouldn't access QueryCoord or RootCoord again
		assert.Equal(t, 1, rootCoord.GetAccessCount())
		assert.Equal(t, 1, qcCounter)

		// test collection2 not fully loaded
		info, err = globalMetaCache.GetCollectionInfo(ctx, "collection2")
		assert.NoError(t, err)
		assert.False(t, info.isLoaded)
		// no collectionInfo of collection2, should access RootCoord
		assert.Equal(t, 2, rootCoord.GetAccessCount())
		// not loaded, should access QueryCoord
		assert.Equal(t, 2, qcCounter)
	})

	t.Run("test RemoveCollectionLoadCache", func(t *testing.T) {
		globalMetaCache.RemoveCollection(ctx, "collection1")
		info, err := globalMetaCache.GetCollectionInfo(ctx, "collection1")
		assert.NoError(t, err)
		assert.True(t, info.isLoaded)
		// the cached entry was dropped, so QueryCoord is asked again
		assert.Equal(t, 3, qcCounter)
	})
}
func TestMetaCache_RemoveCollection(t *testing.T) {
ctx := context.Background()
rootCoord := &MockRootCoordClientInterface{}
@ -755,31 +701,27 @@ func TestMetaCache_RemoveCollection(t *testing.T) {
InMemoryPercentages: []int64{100, 50},
}, nil)
info, err := globalMetaCache.GetCollectionInfo(ctx, "collection1")
_, err = globalMetaCache.GetCollectionInfo(ctx, "collection1")
assert.NoError(t, err)
assert.True(t, info.isLoaded)
// no collectionInfo of collection1, should access RootCoord
assert.Equal(t, rootCoord.GetAccessCount(), 1)
info, err = globalMetaCache.GetCollectionInfo(ctx, "collection1")
_, err = globalMetaCache.GetCollectionInfo(ctx, "collection1")
assert.NoError(t, err)
assert.True(t, info.isLoaded)
// shouldn't access RootCoord again
assert.Equal(t, rootCoord.GetAccessCount(), 1)
globalMetaCache.RemoveCollection(ctx, "collection1")
// no collectionInfo of collection2, should access RootCoord
info, err = globalMetaCache.GetCollectionInfo(ctx, "collection1")
_, err = globalMetaCache.GetCollectionInfo(ctx, "collection1")
assert.NoError(t, err)
assert.True(t, info.isLoaded)
// shouldn't access RootCoord again
assert.Equal(t, rootCoord.GetAccessCount(), 2)
globalMetaCache.RemoveCollectionsByID(ctx, UniqueID(1))
// no collectionInfo of collection2, should access RootCoord
info, err = globalMetaCache.GetCollectionInfo(ctx, "collection1")
_, err = globalMetaCache.GetCollectionInfo(ctx, "collection1")
assert.NoError(t, err)
assert.True(t, info.isLoaded)
// shouldn't access RootCoord again
assert.Equal(t, rootCoord.GetAccessCount(), 3)
}

View File

@ -308,14 +308,6 @@ func (t *queryTask) PreExecute(ctx context.Context) error {
t.queryParams = queryParams
t.RetrieveRequest.Limit = queryParams.limit + queryParams.offset
loaded, err := checkIfLoaded(ctx, t.qc, collectionName, t.RetrieveRequest.GetPartitionIDs())
if err != nil {
return fmt.Errorf("checkIfLoaded failed when query, collection:%v, partitions:%v, err = %s", collectionName, t.request.GetPartitionNames(), err)
}
if !loaded {
return fmt.Errorf("collection:%v or partition:%v not loaded into memory when query", collectionName, t.request.GetPartitionNames())
}
schema, _ := globalMetaCache.GetCollectionSchema(ctx, collectionName)
t.schema = schema

View File

@ -111,12 +111,6 @@ func TestQueryTask_all(t *testing.T) {
collectionID, err := globalMetaCache.GetCollectionID(ctx, collectionName)
assert.NoError(t, err)
qc.EXPECT().ShowCollections(mock.Anything, mock.Anything).Return(&querypb.ShowCollectionsResponse{
Status: &successStatus,
CollectionIDs: []int64{collectionID},
InMemoryPercentages: []int64{100},
}, nil)
status, err := qc.LoadCollection(ctx, &querypb.LoadCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_LoadCollection,

View File

@ -234,15 +234,6 @@ func (t *searchTask) PreExecute(ctx context.Context) error {
return err
}
// check if collection/partitions are loaded into query node
loaded, err := checkIfLoaded(ctx, t.qc, collectionName, t.SearchRequest.GetPartitionIDs())
if err != nil {
return fmt.Errorf("checkIfLoaded failed when search, collection:%v, partitions:%v, err = %s", collectionName, t.request.GetPartitionNames(), err)
}
if !loaded {
return fmt.Errorf("collection:%v or partition:%v not loaded into memory when search", collectionName, t.request.GetPartitionNames())
}
t.request.OutputFields, err = translateOutputFields(t.request.OutputFields, t.schema, false)
if err != nil {
return err
@ -639,43 +630,6 @@ func (t *searchTask) collectSearchResults(ctx context.Context) error {
return nil
}
// checkIfLoaded reports whether the target of a search/query request is fully
// loaded into QueryNodes.
//
// The cached collection info is consulted first: when the collection is marked
// as loaded the answer is true without contacting QueryCoord. Otherwise, if the
// request names no partitions, the answer is false. If partitions are named,
// QueryCoord is asked via ShowPartitions and the result is true only when no
// returned in-memory percentage is below 100.
//
// A non-nil error is returned when the collection info cannot be fetched, when
// the ShowPartitions call fails, or when it returns a non-success status. The
// underlying error is wrapped with %w so callers can inspect it with
// errors.Is / errors.As.
func checkIfLoaded(ctx context.Context, qc types.QueryCoord, collectionName string, searchPartitionIDs []UniqueID) (bool, error) {
	info, err := globalMetaCache.GetCollectionInfo(ctx, collectionName)
	if err != nil {
		return false, fmt.Errorf("GetCollectionInfo failed, collection = %s, err = %w", collectionName, err)
	}
	if info.isLoaded {
		return true, nil
	}
	// The collection is not (known to be) fully loaded and no partitions were
	// requested, so there is nothing more specific to check.
	if len(searchPartitionIDs) == 0 {
		return false, nil
	}

	// The request targets specific partitions: ask QueryCoord about them.
	resp, err := qc.ShowPartitions(ctx, &querypb.ShowPartitionsRequest{
		Base: commonpbutil.NewMsgBase(
			commonpbutil.WithMsgType(commonpb.MsgType_ShowPartitions),
			commonpbutil.WithSourceID(paramtable.GetNodeID()),
		),
		CollectionID: info.collID,
		PartitionIDs: searchPartitionIDs,
	})
	if err != nil {
		return false, fmt.Errorf("showPartitions failed, collection = %s, partitionIDs = %v, err = %w", collectionName, searchPartitionIDs, err)
	}
	// Use the nil-safe generated getters so a response without a Status does
	// not cause a nil-pointer dereference.
	if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
		return false, fmt.Errorf("showPartitions failed, collection = %s, partitionIDs = %v, reason = %s", collectionName, searchPartitionIDs, resp.GetStatus().GetReason())
	}

	// NOTE(review): an empty InMemoryPercentages list yields true here; confirm
	// QueryCoord always returns one entry per requested partition.
	for _, percent := range resp.GetInMemoryPercentages() {
		if percent < 100 {
			return false, nil
		}
	}
	return true, nil
}
func decodeSearchResults(ctx context.Context, searchResults []*internalpb.SearchResults) ([]*schemapb.SearchResultData, error) {
tr := timerecord.NewTimeRecorder("decodeSearchResults")
results := make([]*schemapb.SearchResultData, 0)

View File

@ -8,7 +8,6 @@ import (
"testing"
"time"
"github.com/cockroachdb/errors"
"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
@ -128,7 +127,6 @@ func TestSearchTask_PreExecute(t *testing.T) {
qc = types.NewMockQueryCoord(t)
ctx = context.TODO()
)
successStatus := commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}
err = rc.Start()
defer rc.Stop()
@ -169,30 +167,7 @@ func TestSearchTask_PreExecute(t *testing.T) {
return task
}
mockShowCollectionSuccess := func() *mock.Call {
return qc.On("ShowCollections", mock.Anything, mock.Anything).Return(
func(ctx context.Context, req *querypb.ShowCollectionsRequest) *querypb.ShowCollectionsResponse {
return &querypb.ShowCollectionsResponse{
Status: &successStatus,
CollectionIDs: req.CollectionIDs,
InMemoryPercentages: []int64{100},
}
}, nil)
}
mockShowCollectionFail := func() *mock.Call {
return qc.On("ShowCollections", mock.Anything, mock.Anything).Return(&querypb.ShowCollectionsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: "mock",
},
}, nil)
}
t.Run("bad nq 0", func(t *testing.T) {
call := mockShowCollectionSuccess()
defer call.Unset()
collName := "test_bad_nq0_error" + funcutil.GenRandomStr()
createColl(t, collName, rc)
// Nq must be in range [1, 16384].
@ -202,8 +177,6 @@ func TestSearchTask_PreExecute(t *testing.T) {
})
t.Run("bad nq 16385", func(t *testing.T) {
call := mockShowCollectionSuccess()
defer call.Unset()
collName := "test_bad_nq16385_error" + funcutil.GenRandomStr()
createColl(t, collName, rc)
@ -221,8 +194,6 @@ func TestSearchTask_PreExecute(t *testing.T) {
})
t.Run("invalid IgnoreGrowing param", func(t *testing.T) {
call := mockShowCollectionSuccess()
defer call.Unset()
collName := "test_invalid_param" + funcutil.GenRandomStr()
createColl(t, collName, rc)
@ -232,21 +203,7 @@ func TestSearchTask_PreExecute(t *testing.T) {
assert.Error(t, err)
})
t.Run("test checkIfLoaded error", func(t *testing.T) {
collName := "test_checkIfLoaded_error" + funcutil.GenRandomStr()
createColl(t, collName, rc)
task := getSearchTask(t, collName)
t.Run("show collection status unexpected error", func(t *testing.T) {
call := mockShowCollectionFail()
defer call.Unset()
assert.Error(t, task.PreExecute(ctx))
})
})
t.Run("search with timeout", func(t *testing.T) {
call := mockShowCollectionSuccess()
defer call.Unset()
collName := "search_with_timeout" + funcutil.GenRandomStr()
createColl(t, collName, rc)
@ -1551,135 +1508,6 @@ func TestTaskSearch_reduceSearchResultData(t *testing.T) {
})
}
// Test_checkIfLoaded exercises checkIfLoaded with a mocked meta cache and a
// mocked QueryCoord. Each subtest replaces the package-level globalMetaCache
// with its own mock and does not restore the previous value afterwards.
func Test_checkIfLoaded(t *testing.T) {
// GetCollectionInfo fails, so checkIfLoaded must return an error; qc is left
// as a nil interface value in this subtest.
t.Run("failed to get collection info", func(t *testing.T) {
cache := newMockCache()
cache.setGetInfoFunc(func(ctx context.Context, collectionName string) (*collectionInfo, error) {
return nil, errors.New("mock")
})
globalMetaCache = cache
var qc types.QueryCoord
_, err := checkIfLoaded(context.Background(), qc, "test", []UniqueID{})
assert.Error(t, err)
})
// The cached info says the collection is loaded; qc is again a nil interface
// value and the result is expected to be true.
t.Run("collection loaded", func(t *testing.T) {
cache := newMockCache()
cache.setGetInfoFunc(func(ctx context.Context, collectionName string) (*collectionInfo, error) {
return &collectionInfo{isLoaded: true}, nil
})
globalMetaCache = cache
var qc types.QueryCoord
loaded, err := checkIfLoaded(context.Background(), qc, "test", []UniqueID{})
assert.NoError(t, err)
assert.True(t, loaded)
})
// Collection not loaded, partitions requested, ShowPartitions returns an error.
t.Run("show partitions failed", func(t *testing.T) {
cache := newMockCache()
cache.setGetInfoFunc(func(ctx context.Context, collectionName string) (*collectionInfo, error) {
return &collectionInfo{isLoaded: false}, nil
})
globalMetaCache = cache
qc := getQueryCoord()
qc.EXPECT().ShowPartitions(mock.Anything, mock.Anything).Return(nil, errors.New("mock")).Times(1)
_, err := checkIfLoaded(context.Background(), qc, "test", []UniqueID{1, 2})
assert.Error(t, err)
})
// Collection not loaded, partitions requested, ShowPartitions returns a
// non-success status.
t.Run("show partitions but didn't success", func(t *testing.T) {
cache := newMockCache()
cache.setGetInfoFunc(func(ctx context.Context, collectionName string) (*collectionInfo, error) {
return &collectionInfo{isLoaded: false}, nil
})
globalMetaCache = cache
qc := getQueryCoord()
qc.EXPECT().ShowPartitions(mock.Anything, mock.Anything).Return(&querypb.ShowPartitionsResponse{Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_CollectionNotExists}}, nil).Times(1)
_, err := checkIfLoaded(context.Background(), qc, "test", []UniqueID{1, 2})
assert.Error(t, err)
})
// Both requested partitions are reported at 100%, so the result is true.
t.Run("partitions loaded", func(t *testing.T) {
cache := newMockCache()
cache.setGetInfoFunc(func(ctx context.Context, collectionName string) (*collectionInfo, error) {
return &collectionInfo{isLoaded: false}, nil
})
globalMetaCache = cache
qc := getQueryCoord()
qc.EXPECT().ShowPartitions(mock.Anything, mock.Anything).Return(
&querypb.ShowPartitionsResponse{Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, InMemoryPercentages: []int64{100, 100}}, nil).Times(1)
loaded, err := checkIfLoaded(context.Background(), qc, "test", []UniqueID{1, 2})
assert.NoError(t, err)
assert.True(t, loaded)
})
// One partition is reported at 50%, so the result is false.
t.Run("partitions loaded, some patitions not fully loaded", func(t *testing.T) {
cache := newMockCache()
cache.setGetInfoFunc(func(ctx context.Context, collectionName string) (*collectionInfo, error) {
return &collectionInfo{isLoaded: false}, nil
})
globalMetaCache = cache
qc := getQueryCoord()
qc.EXPECT().ShowPartitions(mock.Anything, mock.Anything).Return(
&querypb.ShowPartitionsResponse{Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, InMemoryPercentages: []int64{100, 50}}, nil).Times(1)
loaded, err := checkIfLoaded(context.Background(), qc, "test", []UniqueID{1, 2})
assert.NoError(t, err)
assert.False(t, loaded)
})
// NOTE(review): despite the name, this subtest passes partition IDs {1, 2},
// making it identical in inputs to "show partitions failed" above.
t.Run("no specified partitions, show partitions failed", func(t *testing.T) {
cache := newMockCache()
cache.setGetInfoFunc(func(ctx context.Context, collectionName string) (*collectionInfo, error) {
return &collectionInfo{isLoaded: false}, nil
})
globalMetaCache = cache
qc := getQueryCoord()
qc.EXPECT().ShowPartitions(mock.Anything, mock.Anything).Return(nil, errors.New("mock")).Times(1)
_, err := checkIfLoaded(context.Background(), qc, "test", []UniqueID{1, 2})
assert.Error(t, err)
})
// NOTE(review): despite the name, this subtest also passes partition IDs
// {1, 2}, making it identical in inputs to "show partitions but didn't success".
t.Run("no specified partitions, show partitions but didn't succeed", func(t *testing.T) {
cache := newMockCache()
cache.setGetInfoFunc(func(ctx context.Context, collectionName string) (*collectionInfo, error) {
return &collectionInfo{isLoaded: false}, nil
})
globalMetaCache = cache
qc := getQueryCoord()
qc.EXPECT().ShowPartitions(mock.Anything, mock.Anything).Return(&querypb.ShowPartitionsResponse{Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_CollectionNotExists}}, nil).Times(1)
_, err := checkIfLoaded(context.Background(), qc, "test", []UniqueID{1, 2})
assert.Error(t, err)
})
// NOTE(review): the partition list is empty here, and checkIfLoaded returns
// (false, nil) for an unloaded collection with no partitions before calling
// ShowPartitions, so the Times(1) expectation below is never reached. Whether
// that fails the test depends on how getQueryCoord() asserts expectations -
// confirm.
t.Run("not fully loaded", func(t *testing.T) {
cache := newMockCache()
cache.setGetInfoFunc(func(ctx context.Context, collectionName string) (*collectionInfo, error) {
return &collectionInfo{isLoaded: false}, nil
})
globalMetaCache = cache
qc := getQueryCoord()
qc.EXPECT().ShowPartitions(mock.Anything, mock.Anything).Return(
&querypb.ShowPartitionsResponse{Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, PartitionIDs: []UniqueID{1, 2}}, nil).Times(1)
loaded, err := checkIfLoaded(context.Background(), qc, "test", []UniqueID{})
assert.NoError(t, err)
assert.False(t, loaded)
})
// NOTE(review): same situation as "not fully loaded": with an empty partition
// list the ShowPartitions expectation is not exercised by checkIfLoaded.
t.Run("not loaded", func(t *testing.T) {
cache := newMockCache()
cache.setGetInfoFunc(func(ctx context.Context, collectionName string) (*collectionInfo, error) {
return &collectionInfo{isLoaded: false}, nil
})
globalMetaCache = cache
qc := getQueryCoord()
qc.EXPECT().ShowPartitions(mock.Anything, mock.Anything).Return(
&querypb.ShowPartitionsResponse{Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, PartitionIDs: []UniqueID{}}, nil).Times(1)
loaded, err := checkIfLoaded(context.Background(), qc, "test", []UniqueID{})
assert.NoError(t, err)
assert.False(t, loaded)
})
}
func TestSearchTask_ErrExecute(t *testing.T) {
var (

View File

@ -839,7 +839,14 @@ func (s *Server) GetShardLeaders(ctx context.Context, req *querypb.GetShardLeade
Status: merr.Status(nil),
}
if s.meta.CollectionManager.CalculateLoadPercentage(req.GetCollectionID()) < 100 {
percentage := s.meta.CollectionManager.CalculateLoadPercentage(req.GetCollectionID())
if percentage < 0 {
err := merr.WrapErrCollectionNotLoaded(req.GetCollectionID())
log.Warn("failed to GetShardLeaders", zap.Error(err))
resp.Status = merr.Status(err)
return resp, nil
}
if percentage < 100 {
msg := fmt.Sprintf("collection %v is not fully loaded", req.GetCollectionID())
log.Warn(msg)
resp.Status = utils.WrapStatus(commonpb.ErrorCode_NoReplicaAvailable, msg)

View File

@ -1516,6 +1516,15 @@ func (suite *ServiceSuite) TestGetShardLeadersFailed() {
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_NoReplicaAvailable, resp.GetStatus().GetErrorCode())
}
// collection not loaded
req := &querypb.GetShardLeadersRequest{
CollectionID: -1,
}
resp, err := server.GetShardLeaders(ctx, req)
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode())
suite.True(errors.Is(merr.Error(resp.GetStatus()), merr.ErrCollectionNotLoaded))
}
func (suite *ServiceSuite) TestHandleNodeUp() {

View File

@ -894,7 +894,7 @@ class TestPartitionOperations(TestcaseBase):
params={"nprobe": 32}, limit=1,
check_task=ct.CheckTasks.err_res,
check_items={ct.err_code: 0,
ct.err_msg: "not been loaded"})
ct.err_msg: "not loaded"})
# release partition
partition_w.release()