mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-04 04:49:08 +08:00
Add proxy metacache
Signed-off-by: godchen <qingxiang.chen@zilliz.com>
This commit is contained in:
parent
c2914dd113
commit
abcc565311
@ -47,6 +47,7 @@ func (c *GrpcClient) Init() error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
c.grpcClient = masterpb.NewMasterServiceClient(c.conn)
|
c.grpcClient = masterpb.NewMasterServiceClient(c.conn)
|
||||||
|
cms.Params.Init()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
BIN
internal/proxynode/debug.test
Executable file
BIN
internal/proxynode/debug.test
Executable file
Binary file not shown.
@ -26,7 +26,7 @@ func (node *NodeImpl) UpdateStateCode(code internalpb2.StateCode) {
|
|||||||
|
|
||||||
func (node *NodeImpl) InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
|
func (node *NodeImpl) InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
|
||||||
collectionName := request.CollectionName
|
collectionName := request.CollectionName
|
||||||
_ = globalMetaCache.Remove(collectionName) // no need to return error, though collection may be not cached
|
globalMetaCache.RemoveCollection(collectionName) // no need to return error, though collection may be not cached
|
||||||
return &commonpb.Status{
|
return &commonpb.Status{
|
||||||
ErrorCode: commonpb.ErrorCode_SUCCESS,
|
ErrorCode: commonpb.ErrorCode_SUCCESS,
|
||||||
Reason: "",
|
Reason: "",
|
||||||
|
@ -1,100 +1,213 @@
|
|||||||
package proxynode
|
package proxynode
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/zilliztech/milvus-distributed/internal/errors"
|
"github.com/zilliztech/milvus-distributed/internal/errors"
|
||||||
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
|
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
|
||||||
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
|
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
|
||||||
|
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
|
||||||
|
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type MasterClientInterface interface {
|
||||||
|
DescribeCollection(in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error)
|
||||||
|
ShowPartitions(in *milvuspb.ShowPartitionRequest) (*milvuspb.ShowPartitionResponse, error)
|
||||||
|
}
|
||||||
|
|
||||||
type Cache interface {
|
type Cache interface {
|
||||||
Hit(collectionName string) bool
|
GetCollectionID(collectionName string) (typeutil.UniqueID, error)
|
||||||
Get(collectionName string) (*milvuspb.DescribeCollectionResponse, error)
|
GetPartitionID(collectionName string, partitionName string) (typeutil.UniqueID, error)
|
||||||
Sync(collectionName string) error
|
GetCollectionSchema(collectionName string) (*schemapb.CollectionSchema, error)
|
||||||
Update(collectionName string, desc *milvuspb.DescribeCollectionResponse) error
|
RemoveCollection(collectionName string)
|
||||||
Remove(collectionName string) error
|
RemovePartition(partitionName string)
|
||||||
|
}
|
||||||
|
|
||||||
|
type collectionInfo struct {
|
||||||
|
collID typeutil.UniqueID
|
||||||
|
schema *schemapb.CollectionSchema
|
||||||
|
}
|
||||||
|
|
||||||
|
type MetaCache struct {
|
||||||
|
client MasterClientInterface
|
||||||
|
|
||||||
|
collInfo map[string]*collectionInfo
|
||||||
|
partInfo map[string]typeutil.UniqueID
|
||||||
|
col2par map[string][]string
|
||||||
|
mu sync.RWMutex
|
||||||
}
|
}
|
||||||
|
|
||||||
var globalMetaCache Cache
|
var globalMetaCache Cache
|
||||||
|
|
||||||
type SimpleMetaCache struct {
|
func InitMetaCache(client MasterClientInterface) error {
|
||||||
mu sync.RWMutex
|
var err error
|
||||||
metas map[string]*milvuspb.DescribeCollectionResponse // collection name to schema
|
globalMetaCache, err = NewMetaCache(client)
|
||||||
ctx context.Context
|
if err != nil {
|
||||||
proxyInstance *NodeImpl
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (metaCache *SimpleMetaCache) Hit(collectionName string) bool {
|
func NewMetaCache(client MasterClientInterface) (*MetaCache, error) {
|
||||||
metaCache.mu.RLock()
|
return &MetaCache{
|
||||||
defer metaCache.mu.RUnlock()
|
client: client,
|
||||||
_, ok := metaCache.metas[collectionName]
|
collInfo: map[string]*collectionInfo{},
|
||||||
return ok
|
partInfo: map[string]typeutil.UniqueID{},
|
||||||
|
col2par: map[string][]string{},
|
||||||
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (metaCache *SimpleMetaCache) Get(collectionName string) (*milvuspb.DescribeCollectionResponse, error) {
|
func (m *MetaCache) readCollectionID(collectionName string) (typeutil.UniqueID, error) {
|
||||||
metaCache.mu.RLock()
|
m.mu.RLock()
|
||||||
defer metaCache.mu.RUnlock()
|
defer m.mu.RUnlock()
|
||||||
schema, ok := metaCache.metas[collectionName]
|
|
||||||
|
collInfo, ok := m.collInfo[collectionName]
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, errors.New("collection meta miss")
|
return 0, errors.Errorf("can't find collection name:%s", collectionName)
|
||||||
}
|
}
|
||||||
return schema, nil
|
return collInfo.collID, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (metaCache *SimpleMetaCache) Sync(collectionName string) error {
|
func (m *MetaCache) readCollectionSchema(collectionName string) (*schemapb.CollectionSchema, error) {
|
||||||
dct := &DescribeCollectionTask{
|
m.mu.RLock()
|
||||||
Condition: NewTaskCondition(metaCache.ctx),
|
defer m.mu.RUnlock()
|
||||||
DescribeCollectionRequest: &milvuspb.DescribeCollectionRequest{
|
|
||||||
|
collInfo, ok := m.collInfo[collectionName]
|
||||||
|
if !ok {
|
||||||
|
return nil, errors.Errorf("can't find collection name:%s", collectionName)
|
||||||
|
}
|
||||||
|
return collInfo.schema, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MetaCache) readPartitionID(partitionName string) (typeutil.UniqueID, error) {
|
||||||
|
m.mu.RLock()
|
||||||
|
defer m.mu.RUnlock()
|
||||||
|
|
||||||
|
partitionID, ok := m.partInfo[partitionName]
|
||||||
|
if !ok {
|
||||||
|
return 0, errors.Errorf("can't find partition name:%s", partitionName)
|
||||||
|
}
|
||||||
|
return partitionID, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MetaCache) GetCollectionID(collectionName string) (typeutil.UniqueID, error) {
|
||||||
|
collID, err := m.readCollectionID(collectionName)
|
||||||
|
if err == nil {
|
||||||
|
return collID, nil
|
||||||
|
}
|
||||||
|
m.mu.Lock()
|
||||||
|
defer m.mu.Unlock()
|
||||||
|
|
||||||
|
req := &milvuspb.DescribeCollectionRequest{
|
||||||
Base: &commonpb.MsgBase{
|
Base: &commonpb.MsgBase{
|
||||||
MsgType: commonpb.MsgType_kDescribeCollection,
|
MsgType: commonpb.MsgType_kDescribeCollection,
|
||||||
},
|
},
|
||||||
CollectionName: collectionName,
|
CollectionName: collectionName,
|
||||||
},
|
|
||||||
masterClient: metaCache.proxyInstance.masterClient,
|
|
||||||
}
|
}
|
||||||
var cancel func()
|
coll, err := m.client.DescribeCollection(req)
|
||||||
dct.ctx, cancel = context.WithTimeout(metaCache.ctx, reqTimeoutInterval)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
err := metaCache.proxyInstance.sched.DdQueue.Enqueue(dct)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return 0, err
|
||||||
|
}
|
||||||
|
if coll.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
|
||||||
|
return 0, errors.Errorf("%s", coll.Status.Reason)
|
||||||
}
|
}
|
||||||
|
|
||||||
return dct.WaitToFinish()
|
collInfo := &collectionInfo{
|
||||||
|
collID: coll.CollectionID,
|
||||||
|
schema: coll.Schema,
|
||||||
}
|
}
|
||||||
|
_, ok := m.collInfo[collectionName]
|
||||||
func (metaCache *SimpleMetaCache) Update(collectionName string, desc *milvuspb.DescribeCollectionResponse) error {
|
|
||||||
metaCache.mu.Lock()
|
|
||||||
defer metaCache.mu.Unlock()
|
|
||||||
|
|
||||||
metaCache.metas[collectionName] = desc
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (metaCache *SimpleMetaCache) Remove(collectionName string) error {
|
|
||||||
metaCache.mu.Lock()
|
|
||||||
defer metaCache.mu.Unlock()
|
|
||||||
|
|
||||||
_, ok := metaCache.metas[collectionName]
|
|
||||||
if !ok {
|
if !ok {
|
||||||
return errors.New("cannot find collection: " + collectionName)
|
m.collInfo[collectionName] = collInfo
|
||||||
}
|
}
|
||||||
delete(metaCache.metas, collectionName)
|
return collInfo.collID, nil
|
||||||
|
}
|
||||||
|
func (m *MetaCache) GetCollectionSchema(collectionName string) (*schemapb.CollectionSchema, error) {
|
||||||
|
collSchema, err := m.readCollectionSchema(collectionName)
|
||||||
|
if err == nil {
|
||||||
|
return collSchema, nil
|
||||||
|
}
|
||||||
|
m.mu.Lock()
|
||||||
|
defer m.mu.Unlock()
|
||||||
|
|
||||||
return nil
|
req := &milvuspb.DescribeCollectionRequest{
|
||||||
|
Base: &commonpb.MsgBase{
|
||||||
|
MsgType: commonpb.MsgType_kDescribeCollection,
|
||||||
|
},
|
||||||
|
CollectionName: collectionName,
|
||||||
|
}
|
||||||
|
coll, err := m.client.DescribeCollection(req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if coll.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
|
||||||
|
return nil, errors.Errorf("%s", coll.Status.Reason)
|
||||||
}
|
}
|
||||||
|
|
||||||
func newSimpleMetaCache(ctx context.Context, proxyInstance *NodeImpl) *SimpleMetaCache {
|
collInfo := &collectionInfo{
|
||||||
return &SimpleMetaCache{
|
collID: coll.CollectionID,
|
||||||
metas: make(map[string]*milvuspb.DescribeCollectionResponse),
|
schema: coll.Schema,
|
||||||
proxyInstance: proxyInstance,
|
|
||||||
ctx: ctx,
|
|
||||||
}
|
}
|
||||||
|
_, ok := m.collInfo[collectionName]
|
||||||
|
if !ok {
|
||||||
|
m.collInfo[collectionName] = collInfo
|
||||||
|
}
|
||||||
|
return collInfo.schema, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func initGlobalMetaCache(ctx context.Context, proxyInstance *NodeImpl) {
|
func (m *MetaCache) GetPartitionID(collectionName string, partitionName string) (typeutil.UniqueID, error) {
|
||||||
globalMetaCache = newSimpleMetaCache(ctx, proxyInstance)
|
partitionID, err := m.readPartitionID(partitionName)
|
||||||
|
if err == nil {
|
||||||
|
return partitionID, nil
|
||||||
|
}
|
||||||
|
m.mu.Lock()
|
||||||
|
defer m.mu.Unlock()
|
||||||
|
|
||||||
|
req := &milvuspb.ShowPartitionRequest{
|
||||||
|
Base: &commonpb.MsgBase{
|
||||||
|
MsgType: commonpb.MsgType_kShowPartitions,
|
||||||
|
},
|
||||||
|
CollectionName: collectionName,
|
||||||
|
}
|
||||||
|
partitions, err := m.client.ShowPartitions(req)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
if partitions.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
|
||||||
|
return 0, errors.Errorf("%s", partitions.Status.Reason)
|
||||||
|
}
|
||||||
|
if len(partitions.PartitionIDs) != len(partitions.PartitionNames) {
|
||||||
|
return 0, errors.Errorf("partition ids len: %d doesn't equal Partition name len %d",
|
||||||
|
len(partitions.PartitionIDs), len(partitions.PartitionNames))
|
||||||
|
}
|
||||||
|
m.col2par[collectionName] = partitions.PartitionNames
|
||||||
|
|
||||||
|
for i := 0; i < len(partitions.PartitionIDs); i++ {
|
||||||
|
_, ok := m.partInfo[partitions.PartitionNames[i]]
|
||||||
|
if !ok {
|
||||||
|
m.partInfo[partitions.PartitionNames[i]] = partitions.PartitionIDs[i]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_, ok := m.partInfo[partitionName]
|
||||||
|
if !ok {
|
||||||
|
return 0, errors.Errorf("partitionID of partitionName:%s can not be find", partitionName)
|
||||||
|
}
|
||||||
|
return m.partInfo[partitionName], nil
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MetaCache) RemoveCollection(collectionName string) {
|
||||||
|
m.mu.Lock()
|
||||||
|
defer m.mu.Unlock()
|
||||||
|
delete(m.collInfo, collectionName)
|
||||||
|
for _, partitionName := range m.col2par[collectionName] {
|
||||||
|
delete(m.partInfo, partitionName)
|
||||||
|
}
|
||||||
|
delete(m.col2par, collectionName)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MetaCache) RemovePartition(partitionName string) {
|
||||||
|
m.mu.Lock()
|
||||||
|
defer m.mu.Unlock()
|
||||||
|
delete(m.partInfo, partitionName)
|
||||||
}
|
}
|
||||||
|
97
internal/proxynode/meta_cache_test.go
Normal file
97
internal/proxynode/meta_cache_test.go
Normal file
@ -0,0 +1,97 @@
|
|||||||
|
package proxynode
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
|
||||||
|
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
|
||||||
|
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
|
||||||
|
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
|
||||||
|
)
|
||||||
|
|
||||||
|
type MockMasterClientInterface struct {
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MockMasterClientInterface) ShowPartitions(in *milvuspb.ShowPartitionRequest) (*milvuspb.ShowPartitionResponse, error) {
|
||||||
|
if in.CollectionName == "collection1" {
|
||||||
|
return &milvuspb.ShowPartitionResponse{
|
||||||
|
Status: &commonpb.Status{
|
||||||
|
ErrorCode: commonpb.ErrorCode_SUCCESS,
|
||||||
|
},
|
||||||
|
PartitionIDs: []typeutil.UniqueID{1, 2},
|
||||||
|
PartitionNames: []string{"par1", "par2"},
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
return &milvuspb.ShowPartitionResponse{
|
||||||
|
Status: &commonpb.Status{
|
||||||
|
ErrorCode: commonpb.ErrorCode_SUCCESS,
|
||||||
|
},
|
||||||
|
PartitionIDs: []typeutil.UniqueID{},
|
||||||
|
PartitionNames: []string{},
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MockMasterClientInterface) DescribeCollection(in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
|
||||||
|
if in.CollectionName == "collection1" {
|
||||||
|
return &milvuspb.DescribeCollectionResponse{
|
||||||
|
Status: &commonpb.Status{
|
||||||
|
ErrorCode: commonpb.ErrorCode_SUCCESS,
|
||||||
|
},
|
||||||
|
CollectionID: typeutil.UniqueID(1),
|
||||||
|
Schema: &schemapb.CollectionSchema{
|
||||||
|
AutoID: true,
|
||||||
|
},
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
return &milvuspb.DescribeCollectionResponse{
|
||||||
|
Status: &commonpb.Status{
|
||||||
|
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
|
||||||
|
},
|
||||||
|
CollectionID: typeutil.UniqueID(0),
|
||||||
|
Schema: nil,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMetaCache_GetCollection(t *testing.T) {
|
||||||
|
client := &MockMasterClientInterface{}
|
||||||
|
err := InitMetaCache(client)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
|
||||||
|
id, err := globalMetaCache.GetCollectionID("collection1")
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.Equal(t, id, typeutil.UniqueID(1))
|
||||||
|
schema, err := globalMetaCache.GetCollectionSchema("collection1")
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.Equal(t, schema, &schemapb.CollectionSchema{
|
||||||
|
AutoID: true,
|
||||||
|
})
|
||||||
|
id, err = globalMetaCache.GetCollectionID("collection2")
|
||||||
|
assert.NotNil(t, err)
|
||||||
|
assert.Equal(t, id, typeutil.UniqueID(0))
|
||||||
|
schema, err = globalMetaCache.GetCollectionSchema("collection2")
|
||||||
|
assert.NotNil(t, err)
|
||||||
|
assert.Nil(t, schema)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMetaCache_GetPartitionID(t *testing.T) {
|
||||||
|
client := &MockMasterClientInterface{}
|
||||||
|
err := InitMetaCache(client)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
|
||||||
|
id, err := globalMetaCache.GetPartitionID("collection1", "par1")
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.Equal(t, id, typeutil.UniqueID(1))
|
||||||
|
id, err = globalMetaCache.GetPartitionID("collection1", "par2")
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.Equal(t, id, typeutil.UniqueID(2))
|
||||||
|
id, err = globalMetaCache.GetPartitionID("collection1", "par3")
|
||||||
|
assert.NotNil(t, err)
|
||||||
|
assert.Equal(t, id, typeutil.UniqueID(0))
|
||||||
|
id, err = globalMetaCache.GetPartitionID("collection2", "par3")
|
||||||
|
assert.NotNil(t, err)
|
||||||
|
assert.Equal(t, id, typeutil.UniqueID(0))
|
||||||
|
id, err = globalMetaCache.GetPartitionID("collection2", "par4")
|
||||||
|
assert.NotNil(t, err)
|
||||||
|
assert.Equal(t, id, typeutil.UniqueID(0))
|
||||||
|
}
|
@ -244,12 +244,15 @@ func (node *NodeImpl) Init() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (node *NodeImpl) Start() error {
|
func (node *NodeImpl) Start() error {
|
||||||
|
err := InitMetaCache(node.masterClient)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
log.Println("init global meta cache ...")
|
||||||
|
|
||||||
initGlobalInsertChannelsMap(node)
|
initGlobalInsertChannelsMap(node)
|
||||||
log.Println("init global insert channels map ...")
|
log.Println("init global insert channels map ...")
|
||||||
|
|
||||||
initGlobalMetaCache(node.ctx, node)
|
|
||||||
log.Println("init global meta cache ...")
|
|
||||||
|
|
||||||
node.manipulationMsgStream.Start()
|
node.manipulationMsgStream.Start()
|
||||||
log.Println("start manipulation message stream ...")
|
log.Println("start manipulation message stream ...")
|
||||||
|
|
||||||
|
@ -116,21 +116,21 @@ func (it *InsertTask) Execute() error {
|
|||||||
span.SetTag("start time", it.BeginTs())
|
span.SetTag("start time", it.BeginTs())
|
||||||
collectionName := it.BaseInsertTask.CollectionName
|
collectionName := it.BaseInsertTask.CollectionName
|
||||||
span.LogFields(oplog.String("collection_name", collectionName))
|
span.LogFields(oplog.String("collection_name", collectionName))
|
||||||
if !globalMetaCache.Hit(collectionName) {
|
collSchema, err := globalMetaCache.GetCollectionSchema(collectionName)
|
||||||
err := globalMetaCache.Sync(collectionName)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
span.LogFields(oplog.Error(err))
|
|
||||||
span.Finish()
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
autoID := collSchema.AutoID
|
||||||
description, err := globalMetaCache.Get(collectionName)
|
collID, err := globalMetaCache.GetCollectionID(collectionName)
|
||||||
if err != nil || description == nil {
|
if err != nil {
|
||||||
span.LogFields(oplog.Error(err))
|
|
||||||
span.Finish()
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
autoID := description.Schema.AutoID
|
it.CollectionID = collID
|
||||||
|
partitionID, err := globalMetaCache.GetPartitionID(collectionName, it.PartitionName)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
it.PartitionID = partitionID
|
||||||
span.LogFields(oplog.Bool("auto_id", autoID))
|
span.LogFields(oplog.Bool("auto_id", autoID))
|
||||||
var rowIDBegin UniqueID
|
var rowIDBegin UniqueID
|
||||||
var rowIDEnd UniqueID
|
var rowIDEnd UniqueID
|
||||||
@ -174,7 +174,7 @@ func (it *InsertTask) Execute() error {
|
|||||||
|
|
||||||
msgPack.Msgs[0] = tsMsg
|
msgPack.Msgs[0] = tsMsg
|
||||||
|
|
||||||
stream, err := globalInsertChannelsMap.getInsertMsgStream(description.CollectionID)
|
stream, err := globalInsertChannelsMap.getInsertMsgStream(collID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
collectionInsertChannels, err := it.dataServiceClient.GetInsertChannels(&datapb.InsertChannelRequest{
|
collectionInsertChannels, err := it.dataServiceClient.GetInsertChannels(&datapb.InsertChannelRequest{
|
||||||
Base: &commonpb.MsgBase{
|
Base: &commonpb.MsgBase{
|
||||||
@ -184,17 +184,17 @@ func (it *InsertTask) Execute() error {
|
|||||||
SourceID: Params.ProxyID,
|
SourceID: Params.ProxyID,
|
||||||
},
|
},
|
||||||
DbID: 0, // todo
|
DbID: 0, // todo
|
||||||
CollectionID: description.CollectionID,
|
CollectionID: collID,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
err = globalInsertChannelsMap.createInsertMsgStream(description.CollectionID, collectionInsertChannels)
|
err = globalInsertChannelsMap.createInsertMsgStream(collID, collectionInsertChannels)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
stream, err = globalInsertChannelsMap.getInsertMsgStream(description.CollectionID)
|
stream, err = globalInsertChannelsMap.getInsertMsgStream(collID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
it.result.Status.ErrorCode = commonpb.ErrorCode_UNEXPECTED_ERROR
|
it.result.Status.ErrorCode = commonpb.ErrorCode_UNEXPECTED_ERROR
|
||||||
it.result.Status.Reason = err.Error()
|
it.result.Status.Reason = err.Error()
|
||||||
@ -332,11 +332,7 @@ func (cct *CreateCollectionTask) Execute() error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if cct.result.ErrorCode == commonpb.ErrorCode_SUCCESS {
|
if cct.result.ErrorCode == commonpb.ErrorCode_SUCCESS {
|
||||||
err = globalMetaCache.Sync(cct.CollectionName)
|
collID, err := globalMetaCache.GetCollectionID(cct.CollectionName)
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
desc, err := globalMetaCache.Get(cct.CollectionName)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -348,12 +344,12 @@ func (cct *CreateCollectionTask) Execute() error {
|
|||||||
SourceID: Params.ProxyID,
|
SourceID: Params.ProxyID,
|
||||||
},
|
},
|
||||||
DbID: 0, // todo
|
DbID: 0, // todo
|
||||||
CollectionID: desc.CollectionID,
|
CollectionID: collID,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
err = globalInsertChannelsMap.createInsertMsgStream(desc.CollectionID, collectionInsertChannels)
|
err = globalInsertChannelsMap.createInsertMsgStream(collID, collectionInsertChannels)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -417,17 +413,20 @@ func (dct *DropCollectionTask) Execute() error {
|
|||||||
var err error
|
var err error
|
||||||
dct.result, err = dct.masterClient.DropCollection(dct.DropCollectionRequest)
|
dct.result, err = dct.masterClient.DropCollection(dct.DropCollectionRequest)
|
||||||
if dct.result.ErrorCode == commonpb.ErrorCode_SUCCESS {
|
if dct.result.ErrorCode == commonpb.ErrorCode_SUCCESS {
|
||||||
_ = globalMetaCache.Sync(dct.CollectionName)
|
collID, err := globalMetaCache.GetCollectionID(dct.CollectionName)
|
||||||
desc, _ := globalMetaCache.Get(dct.CollectionName)
|
if err != nil {
|
||||||
_ = globalInsertChannelsMap.closeInsertMsgStream(desc.CollectionID)
|
return err
|
||||||
|
}
|
||||||
|
err = globalInsertChannelsMap.closeInsertMsgStream(collID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dct *DropCollectionTask) PostExecute() error {
|
func (dct *DropCollectionTask) PostExecute() error {
|
||||||
if globalMetaCache.Hit(dct.CollectionName) {
|
globalMetaCache.RemoveCollection(dct.CollectionName)
|
||||||
return globalMetaCache.Remove(dct.CollectionName)
|
|
||||||
}
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -480,15 +479,7 @@ func (st *SearchTask) PreExecute() error {
|
|||||||
span.SetTag("start time", st.BeginTs())
|
span.SetTag("start time", st.BeginTs())
|
||||||
|
|
||||||
collectionName := st.query.CollectionName
|
collectionName := st.query.CollectionName
|
||||||
if !globalMetaCache.Hit(collectionName) {
|
_, err := globalMetaCache.GetCollectionID(collectionName)
|
||||||
err := globalMetaCache.Sync(collectionName)
|
|
||||||
if err != nil {
|
|
||||||
span.LogFields(oplog.Error(err))
|
|
||||||
span.Finish()
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
_, err := globalMetaCache.Get(collectionName)
|
|
||||||
if err != nil { // err is not nil if collection not exists
|
if err != nil { // err is not nil if collection not exists
|
||||||
span.LogFields(oplog.Error(err))
|
span.LogFields(oplog.Error(err))
|
||||||
span.Finish()
|
span.Finish()
|
||||||
@ -823,8 +814,7 @@ func (dct *DescribeCollectionTask) Execute() error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
err = globalMetaCache.Update(dct.CollectionName, dct.result)
|
return nil
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dct *DescribeCollectionTask) PostExecute() error {
|
func (dct *DescribeCollectionTask) PostExecute() error {
|
||||||
@ -1177,8 +1167,11 @@ func (spt *ShowPartitionsTask) PreExecute() error {
|
|||||||
func (spt *ShowPartitionsTask) Execute() error {
|
func (spt *ShowPartitionsTask) Execute() error {
|
||||||
var err error
|
var err error
|
||||||
spt.result, err = spt.masterClient.ShowPartitions(spt.ShowPartitionRequest)
|
spt.result, err = spt.masterClient.ShowPartitions(spt.ShowPartitionRequest)
|
||||||
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (spt *ShowPartitionsTask) PostExecute() error {
|
func (spt *ShowPartitionsTask) PostExecute() error {
|
||||||
return nil
|
return nil
|
||||||
|
@ -266,6 +266,7 @@ func (s *ServiceImpl) InvalidateCollectionMetaCache(request *proxypb.InvalidateC
|
|||||||
t := &InvalidateCollectionMetaCacheTask{
|
t := &InvalidateCollectionMetaCacheTask{
|
||||||
request: request,
|
request: request,
|
||||||
Condition: NewTaskCondition(ctx),
|
Condition: NewTaskCondition(ctx),
|
||||||
|
nodeInfos: s.nodeInfos,
|
||||||
}
|
}
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
|
Loading…
Reference in New Issue
Block a user