mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-11-30 02:48:45 +08:00
Move locks back to meta table methods (#16848)
/kind improvement Signed-off-by: Yuchen Gao <yuchen.gao@zilliz.com>
This commit is contained in:
parent
76bfd41952
commit
5afdf3d49f
@ -408,6 +408,19 @@ func (mt *MetaTable) HasCollection(collID typeutil.UniqueID, ts typeutil.Timesta
|
||||
return err == nil
|
||||
}
|
||||
|
||||
// GetCollectionIDByName returns the collection ID according to its name.
|
||||
// Returns an error if no matching ID is found.
|
||||
func (mt *MetaTable) GetCollectionIDByName(cName string) (typeutil.UniqueID, error) {
|
||||
mt.ddLock.RLock()
|
||||
defer mt.ddLock.RUnlock()
|
||||
var cID UniqueID
|
||||
var ok bool
|
||||
if cID, ok = mt.collName2ID[cName]; !ok {
|
||||
return 0, fmt.Errorf("collection ID not found for collection name %s", cName)
|
||||
}
|
||||
return cID, nil
|
||||
}
|
||||
|
||||
// GetCollectionByID return collection meta by collection id
|
||||
func (mt *MetaTable) GetCollectionByID(collectionID typeutil.UniqueID, ts typeutil.Timestamp) (*pb.CollectionInfo, error) {
|
||||
mt.ddLock.RLock()
|
||||
|
@ -1312,6 +1312,14 @@ func TestMetaWithTimestamp(t *testing.T) {
|
||||
assert.NotNil(t, err)
|
||||
_, err = mt.GetPartitionByName(2, partName2, tsoStart)
|
||||
assert.NotNil(t, err)
|
||||
|
||||
var cID UniqueID
|
||||
cID, err = mt.GetCollectionIDByName(collName1)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, collID1, cID)
|
||||
|
||||
_, err = mt.GetCollectionIDByName("badname")
|
||||
assert.Error(t, err)
|
||||
}
|
||||
|
||||
func TestFixIssue10540(t *testing.T) {
|
||||
|
@ -2277,18 +2277,19 @@ func (c *Core) Import(ctx context.Context, req *milvuspb.ImportRequest) (*milvus
|
||||
}
|
||||
|
||||
// Get collection/partition ID from collection/partition name.
|
||||
var cID int64
|
||||
var ok bool
|
||||
c.MetaTable.ddLock.RLock()
|
||||
defer c.MetaTable.ddLock.RUnlock()
|
||||
if cID, ok = c.MetaTable.collName2ID[req.GetCollectionName()]; !ok {
|
||||
log.Error("failed to find collection ID for collection name",
|
||||
zap.String("collection name", req.GetCollectionName()))
|
||||
return nil, fmt.Errorf("collection ID not found for collection name %s", req.GetCollectionName())
|
||||
}
|
||||
var pID int64
|
||||
var cID UniqueID
|
||||
var err error
|
||||
if cID, err = c.MetaTable.GetCollectionIDByName(req.GetCollectionName()); err != nil {
|
||||
log.Error("failed to find collection ID from its name",
|
||||
zap.String("collection name", req.GetCollectionName()),
|
||||
zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
var pID UniqueID
|
||||
if pID, err = c.MetaTable.getPartitionByName(cID, req.GetPartitionName(), 0); err != nil {
|
||||
log.Error("failed to get partition ID from its name",
|
||||
zap.String("partition name", req.GetPartitionName()),
|
||||
zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
log.Info("receive import request",
|
||||
@ -2357,22 +2358,19 @@ func (c *Core) ReportImport(ctx context.Context, ir *rootcoordpb.ImportResult) (
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Reverse look up collection name on collection ID.
|
||||
// Look up collection name on collection ID.
|
||||
var colName string
|
||||
c.MetaTable.ddLock.RLock()
|
||||
defer c.MetaTable.ddLock.RUnlock()
|
||||
for k, v := range c.MetaTable.collName2ID {
|
||||
if v == ti.GetCollectionId() {
|
||||
colName = k
|
||||
}
|
||||
}
|
||||
if colName == "" {
|
||||
log.Error("Collection name not found for collection ID", zap.Int64("collection ID", ti.GetCollectionId()))
|
||||
var colMeta *etcdpb.CollectionInfo
|
||||
if colMeta, err = c.MetaTable.GetCollectionByID(ti.GetCollectionId(), 0); err != nil {
|
||||
log.Error("failed to get collection name",
|
||||
zap.Int64("collection ID", ti.GetCollectionId()),
|
||||
zap.Error(err))
|
||||
return &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_CollectionNameNotFound,
|
||||
Reason: "Collection name not found for collection ID" + strconv.FormatInt(ti.GetCollectionId(), 10),
|
||||
Reason: "failed to get collection name for collection ID" + strconv.FormatInt(ti.GetCollectionId(), 10),
|
||||
}, nil
|
||||
}
|
||||
colName = colMeta.GetSchema().GetName()
|
||||
|
||||
// When DataNode has done its thing, remove it from the busy node list.
|
||||
func() {
|
||||
@ -2490,10 +2488,10 @@ func (c *Core) postImportPersistLoop(ctx context.Context, taskID int64, colID in
|
||||
c.wg.Add(1)
|
||||
c.checkSegmentLoadedLoop(ctx, taskID, colID, segIDs)
|
||||
// Check if collection has any indexed fields. If so, start a loop to check segments' index states.
|
||||
c.MetaTable.ddLock.RLock()
|
||||
defer c.MetaTable.ddLock.RUnlock()
|
||||
colMeta := c.MetaTable.collID2Meta[colID]
|
||||
if len(colMeta.GetFieldIndexes()) != 0 {
|
||||
if colMeta, err := c.MetaTable.GetCollectionByID(colID, 0); err != nil {
|
||||
log.Error("failed to find meta for collection",
|
||||
zap.Int64("collection ID", colID))
|
||||
} else if len(colMeta.GetFieldIndexes()) != 0 {
|
||||
c.wg.Add(1)
|
||||
c.checkCompleteIndexLoop(ctx, taskID, colID, colName, segIDs)
|
||||
}
|
||||
|
@ -1362,7 +1362,7 @@ func TestRootCoord_Base(t *testing.T) {
|
||||
})
|
||||
|
||||
wg.Add(1)
|
||||
t.Run("import w/ collection ID not found", func(t *testing.T) {
|
||||
t.Run("import with collection ID not found", func(t *testing.T) {
|
||||
defer wg.Done()
|
||||
req := &milvuspb.ImportRequest{
|
||||
CollectionName: "bad name",
|
||||
|
Loading…
Reference in New Issue
Block a user