diff --git a/internal/querynode/query_collection.go b/internal/querynode/query_collection.go index 7bc85edff0..ce95213fa9 100644 --- a/internal/querynode/query_collection.go +++ b/internal/querynode/query_collection.go @@ -27,6 +27,7 @@ import ( "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/msgstream" "github.com/milvus-io/milvus/internal/proto/commonpb" + "github.com/milvus-io/milvus/internal/proto/etcdpb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/milvuspb" "github.com/milvus-io/milvus/internal/proto/schemapb" @@ -65,7 +66,10 @@ type queryCollection struct { queryMsgStream msgstream.MsgStream queryResultMsgStream msgstream.MsgStream - vcm storage.ChunkManager + localChunkManager storage.ChunkManager + remoteChunkManager storage.ChunkManager + vectorChunkManager storage.ChunkManager + localCacheEnabled bool } type ResultEntityIds []UniqueID @@ -76,7 +80,9 @@ func newQueryCollection(releaseCtx context.Context, historical *historical, streaming *streaming, factory msgstream.Factory, - vcm storage.ChunkManager, + localChunkManager storage.ChunkManager, + remoteChunkManager storage.ChunkManager, + localCacheEnabled bool, ) *queryCollection { unsolvedMsg := make([]queryMsg, 0) @@ -102,7 +108,9 @@ func newQueryCollection(releaseCtx context.Context, queryMsgStream: queryStream, queryResultMsgStream: queryResultStream, - vcm: vcm, + localChunkManager: localChunkManager, + remoteChunkManager: remoteChunkManager, + localCacheEnabled: localCacheEnabled, } qc.register() @@ -1059,8 +1067,21 @@ func (q *queryCollection) retrieve(msg queryMsg) error { var mergeList []*segcorepb.RetrieveResults + if q.vectorChunkManager == nil { + if q.localChunkManager == nil { + return fmt.Errorf("can not create vector chunk manager for local chunk manager is nil") + } + if q.remoteChunkManager == nil { + return fmt.Errorf("can not create vector chunk manager for remote chunk manager is nil") + } + q.vectorChunkManager = storage.NewVectorChunkManager(q.localChunkManager, q.remoteChunkManager, + &etcdpb.CollectionMeta{ + ID: collection.id, + Schema: collection.schema, + }, q.localCacheEnabled) + } // historical retrieve - hisRetrieveResults, sealedSegmentRetrieved, err1 := q.historical.retrieve(collectionID, retrieveMsg.PartitionIDs, q.vcm, plan) + hisRetrieveResults, sealedSegmentRetrieved, err1 := q.historical.retrieve(collectionID, retrieveMsg.PartitionIDs, q.vectorChunkManager, plan) if err1 != nil { log.Warn(err1.Error()) return err1 diff --git a/internal/querynode/query_collection_test.go b/internal/querynode/query_collection_test.go index f8ce608eb6..699aaac6a7 100644 --- a/internal/querynode/query_collection_test.go +++ b/internal/querynode/query_collection_test.go @@ -62,7 +62,7 @@ func TestQueryCollection_withoutVChannel(t *testing.T) { assert.Nil(t, err) ctx, cancel := context.WithCancel(context.Background()) - queryCollection := newQueryCollection(ctx, cancel, 0, historical, streaming, factory, nil) + queryCollection := newQueryCollection(ctx, cancel, 0, historical, streaming, factory, nil, nil, false) producerChannels := []string{"testResultChannel"} queryCollection.queryResultMsgStream.AsProducer(producerChannels) diff --git a/internal/querynode/query_service.go b/internal/querynode/query_service.go index 7e1d1e7c5d..0112aa7dae 100644 --- a/internal/querynode/query_service.go +++ b/internal/querynode/query_service.go @@ -21,7 +21,6 @@ import ( miniokv "github.com/milvus-io/milvus/internal/kv/minio" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/msgstream" - "github.com/milvus-io/milvus/internal/proto/etcdpb" "github.com/milvus-io/milvus/internal/storage" ) @@ -36,9 +35,9 @@ type queryService struct { factory msgstream.Factory - lcm storage.ChunkManager - rcm storage.ChunkManager - localCacheEnabled bool + localChunkManager storage.ChunkManager + remoteChunkManager storage.ChunkManager + localCacheEnabled bool } func newQueryService(ctx context.Context, @@ -56,7 +55,7 @@ func newQueryService(ctx context.Context, enabled, _ := Params.Load("localStorage.enabled") localCacheEnabled, _ := strconv.ParseBool(enabled) - lcm := storage.NewLocalChunkManager(path) + localChunkManager := storage.NewLocalChunkManager(path) option := &miniokv.Option{ Address: Params.MinioEndPoint, @@ -71,7 +70,7 @@ func newQueryService(ctx context.Context, if err != nil { panic(err) } - rcm := storage.NewMinioChunkManager(client) + remoteChunkManager := storage.NewMinioChunkManager(client) return &queryService{ ctx: queryServiceCtx, @@ -84,9 +83,9 @@ func newQueryService(ctx context.Context, factory: factory, - lcm: lcm, - rcm: rcm, - localCacheEnabled: localCacheEnabled, + localChunkManager: localChunkManager, + remoteChunkManager: remoteChunkManager, + localCacheEnabled: localCacheEnabled, } } @@ -104,14 +103,6 @@ func (q *queryService) addQueryCollection(collectionID UniqueID) { log.Warn("query collection already exists", zap.Any("collectionID", collectionID)) return } - collection, _ := q.historical.replica.getCollectionByID(collectionID) - - vcm := storage.NewVectorChunkManager(q.lcm, q.rcm, - &etcdpb.CollectionMeta{ - ID: collection.id, - Schema: collection.schema, - }, q.localCacheEnabled) - ctx1, cancel := context.WithCancel(q.ctx) qc := newQueryCollection(ctx1, cancel, @@ -119,7 +110,9 @@ func (q *queryService) addQueryCollection(collectionID UniqueID) { q.historical, q.streaming, q.factory, - vcm, + q.localChunkManager, + q.remoteChunkManager, + q.localCacheEnabled, ) q.queryCollections[collectionID] = qc }