2021-12-16 10:07:10 +08:00
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
2021-04-19 11:12:56 +08:00
// with the License. You may obtain a copy of the License at
//
2021-12-16 10:07:10 +08:00
// http://www.apache.org/licenses/LICENSE-2.0
2021-04-19 11:12:56 +08:00
//
2021-12-16 10:07:10 +08:00
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2021-04-19 11:12:56 +08:00
2021-06-18 21:30:08 +08:00
package rootcoord
2021-01-19 14:44:03 +08:00
import (
2022-07-22 10:20:29 +08:00
"context"
2022-08-24 10:02:52 +08:00
"errors"
2021-03-05 10:15:27 +08:00
"fmt"
2021-01-19 14:44:03 +08:00
"sync"
2022-08-25 15:48:54 +08:00
"go.uber.org/zap"
2022-10-16 20:49:27 +08:00
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
2023-01-19 14:13:43 +08:00
"github.com/milvus-io/milvus/internal/common"
2021-04-22 14:45:57 +08:00
"github.com/milvus-io/milvus/internal/log"
2022-07-22 10:20:29 +08:00
"github.com/milvus-io/milvus/internal/metastore"
"github.com/milvus-io/milvus/internal/metastore/model"
2023-01-19 14:13:43 +08:00
"github.com/milvus-io/milvus/internal/metrics"
pb "github.com/milvus-io/milvus/internal/proto/etcdpb"
2022-04-11 19:49:34 +08:00
"github.com/milvus-io/milvus/internal/proto/internalpb"
2022-08-11 12:12:38 +08:00
"github.com/milvus-io/milvus/internal/util/contextutil"
"github.com/milvus-io/milvus/internal/util/funcutil"
2023-01-19 14:13:43 +08:00
"github.com/milvus-io/milvus/internal/util/timerecord"
2021-05-14 21:26:06 +08:00
"github.com/milvus-io/milvus/internal/util/typeutil"
2021-01-19 14:44:03 +08:00
)
2023-01-19 14:13:43 +08:00
//go:generate mockery --name=IMetaTable --outpkg=mockrootcoord --filename=meta_table.go --with-expecter
2022-09-05 13:29:11 +08:00
type IMetaTable interface {
AddCollection ( ctx context . Context , coll * model . Collection ) error
ChangeCollectionState ( ctx context . Context , collectionID UniqueID , state pb . CollectionState , ts Timestamp ) error
RemoveCollection ( ctx context . Context , collectionID UniqueID , ts Timestamp ) error
GetCollectionByName ( ctx context . Context , collectionName string , ts Timestamp ) ( * model . Collection , error )
2023-01-04 16:37:35 +08:00
GetCollectionByID ( ctx context . Context , collectionID UniqueID , ts Timestamp , allowUnavailable bool ) ( * model . Collection , error )
2022-09-05 13:29:11 +08:00
ListCollections ( ctx context . Context , ts Timestamp ) ( [ ] * model . Collection , error )
ListAbnormalCollections ( ctx context . Context , ts Timestamp ) ( [ ] * model . Collection , error )
ListCollectionPhysicalChannels ( ) map [ typeutil . UniqueID ] [ ] string
2022-09-26 18:06:54 +08:00
GetCollectionVirtualChannels ( colID int64 ) [ ] string
2022-09-05 13:29:11 +08:00
AddPartition ( ctx context . Context , partition * model . Partition ) error
ChangePartitionState ( ctx context . Context , collectionID UniqueID , partitionID UniqueID , state pb . PartitionState , ts Timestamp ) error
RemovePartition ( ctx context . Context , collectionID UniqueID , partitionID UniqueID , ts Timestamp ) error
CreateAlias ( ctx context . Context , alias string , collectionName string , ts Timestamp ) error
DropAlias ( ctx context . Context , alias string , ts Timestamp ) error
AlterAlias ( ctx context . Context , alias string , collectionName string , ts Timestamp ) error
2022-10-10 20:31:22 +08:00
AlterCollection ( ctx context . Context , oldColl * model . Collection , newColl * model . Collection , ts Timestamp ) error
2023-01-19 14:13:43 +08:00
RenameCollection ( ctx context . Context , oldName string , newName string , ts Timestamp ) error
2022-09-05 13:29:11 +08:00
// TODO: it'll be a big cost if we handle the time travel logic, since we should always list all aliases in catalog.
IsAlias ( name string ) bool
ListAliasesByID ( collID UniqueID ) [ ] string
// TODO: better to accept ctx.
2022-10-27 16:21:34 +08:00
GetPartitionNameByID ( collID UniqueID , partitionID UniqueID , ts Timestamp ) ( string , error ) // serve for bulk insert.
GetPartitionByName ( collID UniqueID , partitionName string , ts Timestamp ) ( UniqueID , error ) // serve for bulk insert.
2022-09-05 13:29:11 +08:00
// TODO: better to accept ctx.
AddCredential ( credInfo * internalpb . CredentialInfo ) error
GetCredential ( username string ) ( * internalpb . CredentialInfo , error )
DeleteCredential ( username string ) error
AlterCredential ( credInfo * internalpb . CredentialInfo ) error
ListCredentialUsernames ( ) ( * milvuspb . ListCredUsersResponse , error )
// TODO: better to accept ctx.
CreateRole ( tenant string , entity * milvuspb . RoleEntity ) error
DropRole ( tenant string , roleName string ) error
OperateUserRole ( tenant string , userEntity * milvuspb . UserEntity , roleEntity * milvuspb . RoleEntity , operateType milvuspb . OperateUserRoleType ) error
SelectRole ( tenant string , entity * milvuspb . RoleEntity , includeUserInfo bool ) ( [ ] * milvuspb . RoleResult , error )
SelectUser ( tenant string , entity * milvuspb . UserEntity , includeRoleInfo bool ) ( [ ] * milvuspb . UserResult , error )
OperatePrivilege ( tenant string , entity * milvuspb . GrantEntity , operateType milvuspb . OperatePrivilegeType ) error
SelectGrant ( tenant string , entity * milvuspb . GrantEntity ) ( [ ] * milvuspb . GrantEntity , error )
DropGrant ( tenant string , role * milvuspb . RoleEntity ) error
ListPolicy ( tenant string ) ( [ ] string , error )
ListUserRole ( tenant string ) ( [ ] string , error )
}
2021-09-23 15:10:00 +08:00
type MetaTable struct {
2022-08-11 12:12:38 +08:00
ctx context . Context
2022-08-20 10:24:51 +08:00
catalog metastore . RootCoordCatalog
2022-07-22 10:20:29 +08:00
2022-09-05 13:29:11 +08:00
collID2Meta map [ typeutil . UniqueID ] * model . Collection // collection id -> collection meta
collName2ID map [ string ] typeutil . UniqueID // collection name to collection id
collAlias2ID map [ string ] typeutil . UniqueID // collection alias to collection id
2021-01-19 14:44:03 +08:00
2022-08-24 10:02:52 +08:00
ddLock sync . RWMutex
permissionLock sync . RWMutex
2021-01-19 14:44:03 +08:00
}
2022-08-20 10:24:51 +08:00
func NewMetaTable ( ctx context . Context , catalog metastore . RootCoordCatalog ) ( * MetaTable , error ) {
2021-09-23 15:10:00 +08:00
mt := & MetaTable {
2022-12-07 18:01:19 +08:00
ctx : contextutil . WithTenantID ( ctx , Params . CommonCfg . ClusterName . GetValue ( ) ) ,
2022-08-11 12:12:38 +08:00
catalog : catalog ,
2021-01-19 14:44:03 +08:00
}
2022-09-05 13:29:11 +08:00
if err := mt . reload ( ) ; err != nil {
2021-01-19 14:44:03 +08:00
return nil , err
}
return mt , nil
}
2022-09-05 13:29:11 +08:00
func ( mt * MetaTable ) reload ( ) error {
mt . ddLock . Lock ( )
defer mt . ddLock . Unlock ( )
2022-11-08 20:13:03 +08:00
record := timerecord . NewTimeRecorder ( "rootcoord" )
2022-09-05 13:29:11 +08:00
mt . collID2Meta = make ( map [ UniqueID ] * model . Collection )
mt . collName2ID = make ( map [ string ] UniqueID )
mt . collAlias2ID = make ( map [ string ] UniqueID )
2021-01-19 14:44:03 +08:00
2022-11-09 17:49:04 +08:00
collectionNum := int64 ( 0 )
partitionNum := int64 ( 0 )
2022-09-05 13:29:11 +08:00
// max ts means listing latest resources, meta table should always cache the latest version of catalog.
collections , err := mt . catalog . ListCollections ( mt . ctx , typeutil . MaxTimestamp )
2022-06-15 19:14:10 +08:00
if err != nil {
return err
}
2022-09-05 13:29:11 +08:00
for name , collection := range collections {
mt . collID2Meta [ collection . CollectionID ] = collection
mt . collName2ID [ name ] = collection . CollectionID
2022-11-09 17:49:04 +08:00
if collection . Available ( ) {
collectionNum ++
partitionNum += int64 ( collection . GetPartitionNum ( true ) )
}
2022-06-15 19:14:10 +08:00
}
2022-09-05 13:29:11 +08:00
// max ts means listing latest resources, meta table should always cache the latest version of catalog.
aliases , err := mt . catalog . ListAliases ( mt . ctx , typeutil . MaxTimestamp )
2021-01-19 14:44:03 +08:00
if err != nil {
return err
}
2022-09-05 13:29:11 +08:00
for _ , alias := range aliases {
mt . collAlias2ID [ alias . Name ] = alias . CollectionID
2022-06-10 13:10:08 +08:00
}
2022-11-09 17:49:04 +08:00
metrics . RootCoordNumOfCollections . Set ( float64 ( collectionNum ) )
metrics . RootCoordNumOfPartitions . WithLabelValues ( ) . Set ( float64 ( partitionNum ) )
2022-11-08 20:13:03 +08:00
record . Record ( "MetaTable reload" )
2021-01-20 09:36:50 +08:00
return nil
2021-01-19 14:44:03 +08:00
}
2022-09-05 13:29:11 +08:00
func ( mt * MetaTable ) AddCollection ( ctx context . Context , coll * model . Collection ) error {
2021-01-19 14:44:03 +08:00
mt . ddLock . Lock ( )
defer mt . ddLock . Unlock ( )
2021-01-20 09:36:50 +08:00
2022-09-05 13:29:11 +08:00
// Note:
// 1, idempotency check was already done outside;
// 2, no need to check time travel logic, since ts should always be the latest;
2021-01-20 09:36:50 +08:00
2022-09-05 13:29:11 +08:00
if coll . State != pb . CollectionState_CollectionCreating {
return fmt . Errorf ( "collection state should be creating, collection name: %s, collection id: %d, state: %s" , coll . Name , coll . CollectionID , coll . State )
2022-06-10 13:10:08 +08:00
}
2022-12-07 18:01:19 +08:00
ctx1 := contextutil . WithTenantID ( ctx , Params . CommonCfg . ClusterName . GetValue ( ) )
2022-09-05 13:29:11 +08:00
if err := mt . catalog . CreateCollection ( ctx1 , coll , coll . CreateTime ) ; err != nil {
2022-07-22 10:20:29 +08:00
return err
2022-06-10 13:10:08 +08:00
}
2022-07-22 10:20:29 +08:00
mt . collName2ID [ coll . Name ] = coll . CollectionID
2022-09-05 13:29:11 +08:00
mt . collID2Meta [ coll . CollectionID ] = coll . Clone ( )
log . Info ( "add collection to meta table" , zap . String ( "collection" , coll . Name ) ,
zap . Int64 ( "id" , coll . CollectionID ) , zap . Uint64 ( "ts" , coll . CreateTime ) )
2022-06-10 13:10:08 +08:00
return nil
2021-01-19 14:44:03 +08:00
}
2022-09-05 13:29:11 +08:00
func ( mt * MetaTable ) ChangeCollectionState ( ctx context . Context , collectionID UniqueID , state pb . CollectionState , ts Timestamp ) error {
2021-01-19 14:44:03 +08:00
mt . ddLock . Lock ( )
defer mt . ddLock . Unlock ( )
2022-09-05 13:29:11 +08:00
coll , ok := mt . collID2Meta [ collectionID ]
2021-01-19 14:44:03 +08:00
if ! ok {
2022-09-05 13:29:11 +08:00
return nil
2021-01-19 14:44:03 +08:00
}
2022-09-05 13:29:11 +08:00
clone := coll . Clone ( )
clone . State = state
2022-12-07 18:01:19 +08:00
ctx1 := contextutil . WithTenantID ( ctx , Params . CommonCfg . ClusterName . GetValue ( ) )
2022-09-05 13:29:11 +08:00
if err := mt . catalog . AlterCollection ( ctx1 , coll , clone , metastore . MODIFY , ts ) ; err != nil {
return err
2021-09-18 11:13:51 +08:00
}
2022-09-05 13:29:11 +08:00
mt . collID2Meta [ collectionID ] = clone
2022-11-09 17:49:04 +08:00
switch state {
case pb . CollectionState_CollectionCreated :
metrics . RootCoordNumOfCollections . Inc ( )
metrics . RootCoordNumOfPartitions . WithLabelValues ( ) . Add ( float64 ( coll . GetPartitionNum ( true ) ) )
default :
metrics . RootCoordNumOfCollections . Dec ( )
metrics . RootCoordNumOfPartitions . WithLabelValues ( ) . Sub ( float64 ( coll . GetPartitionNum ( true ) ) )
}
2022-09-05 13:29:11 +08:00
log . Info ( "change collection state" , zap . Int64 ( "collection" , collectionID ) ,
zap . String ( "state" , state . String ( ) ) , zap . Uint64 ( "ts" , ts ) )
2021-05-17 19:15:01 +08:00
2022-09-05 13:29:11 +08:00
return nil
}
2022-06-10 13:10:08 +08:00
2022-10-24 15:57:29 +08:00
func ( mt * MetaTable ) removeIfNameMatchedInternal ( collectionID UniqueID , name string ) {
id , ok := mt . collName2ID [ name ]
if ok && id == collectionID {
delete ( mt . collName2ID , name )
}
}
func ( mt * MetaTable ) removeIfAliasMatchedInternal ( collectionID UniqueID , alias string ) {
id , ok := mt . collAlias2ID [ alias ]
if ok && id == collectionID {
delete ( mt . collAlias2ID , alias )
}
}
func ( mt * MetaTable ) removeIfMatchedInternal ( collectionID UniqueID , name string ) {
mt . removeIfNameMatchedInternal ( collectionID , name )
mt . removeIfAliasMatchedInternal ( collectionID , name )
}
func ( mt * MetaTable ) removeAllNamesIfMatchedInternal ( collectionID UniqueID , names [ ] string ) {
for _ , name := range names {
mt . removeIfMatchedInternal ( collectionID , name )
}
}
func ( mt * MetaTable ) removeCollectionByIDInternal ( collectionID UniqueID ) {
delete ( mt . collID2Meta , collectionID )
}
2022-09-05 13:29:11 +08:00
func ( mt * MetaTable ) RemoveCollection ( ctx context . Context , collectionID UniqueID , ts Timestamp ) error {
mt . ddLock . Lock ( )
defer mt . ddLock . Unlock ( )
// Note: we cannot handle case that dropping collection with `ts1` but a collection exists in catalog with newer ts
// which is bigger than `ts1`. So we assume that ts should always be the latest.
2022-12-07 18:01:19 +08:00
ctx1 := contextutil . WithTenantID ( ctx , Params . CommonCfg . ClusterName . GetValue ( ) )
2022-09-05 13:29:11 +08:00
aliases := mt . listAliasesByID ( collectionID )
if err := mt . catalog . DropCollection ( ctx1 , & model . Collection { CollectionID : collectionID , Aliases : aliases } , ts ) ; err != nil {
2022-07-22 10:20:29 +08:00
return err
2022-06-10 13:10:08 +08:00
}
2022-09-05 13:29:11 +08:00
2022-10-24 15:57:29 +08:00
allNames := common . CloneStringList ( aliases )
2022-09-05 13:29:11 +08:00
var name string
coll , ok := mt . collID2Meta [ collectionID ]
if ok && coll != nil {
name = coll . Name
2022-10-24 15:57:29 +08:00
allNames = append ( allNames , name )
2022-09-05 13:29:11 +08:00
}
2022-06-10 13:10:08 +08:00
2022-10-24 15:57:29 +08:00
// We cannot delete the name directly, since newly collection with same name may be created.
mt . removeAllNamesIfMatchedInternal ( collectionID , allNames )
mt . removeCollectionByIDInternal ( collectionID )
2022-09-22 17:36:52 +08:00
2022-09-05 13:29:11 +08:00
log . Info ( "remove collection" , zap . String ( "name" , name ) , zap . Int64 ( "id" , collectionID ) , zap . Strings ( "aliases" , aliases ) )
2022-06-10 13:10:08 +08:00
return nil
2021-01-19 14:44:03 +08:00
}
2022-10-21 16:37:29 +08:00
func filterUnavailable ( coll * model . Collection ) * model . Collection {
clone := coll . Clone ( )
// pick available partitions.
clone . Partitions = nil
for _ , partition := range coll . Partitions {
if partition . Available ( ) {
clone . Partitions = append ( clone . Partitions , partition . Clone ( ) )
}
}
return clone
}
// getLatestCollectionByIDInternal should be called with ts = typeutil.MaxTimestamp
2023-01-04 16:37:35 +08:00
func ( mt * MetaTable ) getLatestCollectionByIDInternal ( ctx context . Context , collectionID UniqueID , allowAvailable bool ) ( * model . Collection , error ) {
2022-10-21 16:37:29 +08:00
coll , ok := mt . collID2Meta [ collectionID ]
2023-01-04 16:37:35 +08:00
if ! ok || coll == nil {
return nil , common . NewCollectionNotExistError ( fmt . Sprintf ( "can't find collection: %d" , collectionID ) )
}
if allowAvailable {
return coll . Clone ( ) , nil
}
if ! coll . Available ( ) {
2022-10-21 16:37:29 +08:00
return nil , common . NewCollectionNotExistError ( fmt . Sprintf ( "can't find collection: %d" , collectionID ) )
}
return filterUnavailable ( coll ) , nil
}
2022-09-05 13:29:11 +08:00
// getCollectionByIDInternal get collection by collection id without lock.
2023-01-04 16:37:35 +08:00
func ( mt * MetaTable ) getCollectionByIDInternal ( ctx context . Context , collectionID UniqueID , ts Timestamp , allowUnavailable bool ) ( * model . Collection , error ) {
2022-10-21 16:37:29 +08:00
if isMaxTs ( ts ) {
2023-01-04 16:37:35 +08:00
return mt . getLatestCollectionByIDInternal ( ctx , collectionID , allowUnavailable )
2022-10-21 16:37:29 +08:00
}
2022-09-05 13:29:11 +08:00
var coll * model . Collection
var err error
coll , ok := mt . collID2Meta [ collectionID ]
2022-09-27 19:18:54 +08:00
if ! ok || coll == nil || ! coll . Available ( ) || coll . CreateTime > ts {
2022-09-05 13:29:11 +08:00
// travel meta information from catalog.
2022-12-07 18:01:19 +08:00
ctx1 := contextutil . WithTenantID ( ctx , Params . CommonCfg . ClusterName . GetValue ( ) )
2022-09-05 13:29:11 +08:00
coll , err = mt . catalog . GetCollectionByID ( ctx1 , collectionID , ts )
if err != nil {
return nil , err
}
2021-05-18 14:18:02 +08:00
}
2022-07-22 10:20:29 +08:00
2023-01-04 16:37:35 +08:00
if coll == nil {
// use coll.Name to match error message of regression. TODO: remove this after error code is ready.
return nil , common . NewCollectionNotExistError ( fmt . Sprintf ( "can't find collection: %s" , coll . Name ) )
}
if allowUnavailable {
return coll . Clone ( ) , nil
}
if ! coll . Available ( ) {
2022-09-05 13:29:11 +08:00
// use coll.Name to match error message of regression. TODO: remove this after error code is ready.
2022-09-27 19:18:54 +08:00
return nil , common . NewCollectionNotExistError ( fmt . Sprintf ( "can't find collection: %s" , coll . Name ) )
2022-05-09 20:47:52 +08:00
}
2022-10-21 16:37:29 +08:00
return filterUnavailable ( coll ) , nil
2021-01-21 10:01:29 +08:00
}
2022-09-05 13:29:11 +08:00
func ( mt * MetaTable ) GetCollectionByName ( ctx context . Context , collectionName string , ts Timestamp ) ( * model . Collection , error ) {
2021-01-19 14:44:03 +08:00
mt . ddLock . RLock ( )
defer mt . ddLock . RUnlock ( )
2022-09-05 13:29:11 +08:00
var collectionID UniqueID
collectionID , ok := mt . collAlias2ID [ collectionName ]
if ok {
2023-01-04 16:37:35 +08:00
return mt . getCollectionByIDInternal ( ctx , collectionID , ts , false )
2022-09-05 13:29:11 +08:00
}
2022-07-22 10:20:29 +08:00
2022-09-05 13:29:11 +08:00
collectionID , ok = mt . collName2ID [ collectionName ]
if ok {
2023-01-04 16:37:35 +08:00
return mt . getCollectionByIDInternal ( ctx , collectionID , ts , false )
2022-06-10 13:10:08 +08:00
}
2022-07-22 10:20:29 +08:00
2022-10-21 16:37:29 +08:00
if isMaxTs ( ts ) {
return nil , common . NewCollectionNotExistError ( fmt . Sprintf ( "can't find collection: %s" , collectionName ) )
}
2022-09-05 13:29:11 +08:00
// travel meta information from catalog. No need to check time travel logic again, since catalog already did.
2022-12-07 18:01:19 +08:00
ctx1 := contextutil . WithTenantID ( ctx , Params . CommonCfg . ClusterName . GetValue ( ) )
2022-09-05 13:29:11 +08:00
coll , err := mt . catalog . GetCollectionByName ( ctx1 , collectionName , ts )
if err != nil {
return nil , err
}
2023-01-19 14:13:43 +08:00
2022-10-21 16:37:29 +08:00
if coll == nil || ! coll . Available ( ) {
2022-09-27 19:18:54 +08:00
return nil , common . NewCollectionNotExistError ( fmt . Sprintf ( "can't find collection: %s" , collectionName ) )
}
2022-10-21 16:37:29 +08:00
return filterUnavailable ( coll ) , nil
2021-02-02 10:09:10 +08:00
}
2023-01-04 16:37:35 +08:00
func ( mt * MetaTable ) GetCollectionByID ( ctx context . Context , collectionID UniqueID , ts Timestamp , allowUnavailable bool ) ( * model . Collection , error ) {
2021-01-19 14:44:03 +08:00
mt . ddLock . RLock ( )
defer mt . ddLock . RUnlock ( )
2022-07-22 10:20:29 +08:00
2023-01-04 16:37:35 +08:00
return mt . getCollectionByIDInternal ( ctx , collectionID , ts , allowUnavailable )
2021-01-19 14:44:03 +08:00
}
2022-09-05 13:29:11 +08:00
func ( mt * MetaTable ) ListCollections ( ctx context . Context , ts Timestamp ) ( [ ] * model . Collection , error ) {
2021-09-22 16:20:48 +08:00
mt . ddLock . RLock ( )
defer mt . ddLock . RUnlock ( )
2022-09-05 13:29:11 +08:00
2022-10-31 13:25:35 +08:00
if isMaxTs ( ts ) {
return mt . listCollectionFromCache ( )
}
2022-09-05 13:29:11 +08:00
// list collections should always be loaded from catalog.
2022-12-07 18:01:19 +08:00
ctx1 := contextutil . WithTenantID ( ctx , Params . CommonCfg . ClusterName . GetValue ( ) )
2022-09-05 13:29:11 +08:00
colls , err := mt . catalog . ListCollections ( ctx1 , ts )
if err != nil {
return nil , err
}
onlineCollections := make ( [ ] * model . Collection , 0 , len ( colls ) )
for _ , coll := range colls {
if coll . Available ( ) {
onlineCollections = append ( onlineCollections , coll )
2021-09-22 16:20:48 +08:00
}
}
2022-09-05 13:29:11 +08:00
return onlineCollections , nil
2021-09-22 16:20:48 +08:00
}
2022-10-31 13:25:35 +08:00
func ( mt * MetaTable ) listCollectionFromCache ( ) ( [ ] * model . Collection , error ) {
collectionFromCache := make ( [ ] * model . Collection , 0 )
for _ , meta := range mt . collID2Meta {
if meta . Available ( ) {
collectionFromCache = append ( collectionFromCache , meta )
}
}
return collectionFromCache , nil
}
2022-09-05 13:29:11 +08:00
func ( mt * MetaTable ) ListAbnormalCollections ( ctx context . Context , ts Timestamp ) ( [ ] * model . Collection , error ) {
2021-06-04 15:00:34 +08:00
mt . ddLock . RLock ( )
defer mt . ddLock . RUnlock ( )
2022-09-05 13:29:11 +08:00
// list collections should always be loaded from catalog.
2022-12-07 18:01:19 +08:00
ctx1 := contextutil . WithTenantID ( ctx , Params . CommonCfg . ClusterName . GetValue ( ) )
2022-09-05 13:29:11 +08:00
colls , err := mt . catalog . ListCollections ( ctx1 , ts )
if err != nil {
return nil , err
2021-06-04 15:00:34 +08:00
}
2022-09-05 13:29:11 +08:00
abnormalCollections := make ( [ ] * model . Collection , 0 , len ( colls ) )
for _ , coll := range colls {
if ! coll . Available ( ) {
abnormalCollections = append ( abnormalCollections , coll )
}
}
return abnormalCollections , nil
2021-06-04 15:00:34 +08:00
}
2022-09-05 13:29:11 +08:00
// ListCollectionPhysicalChannels list physical channels of all collections.
2021-11-02 15:50:30 +08:00
func ( mt * MetaTable ) ListCollectionPhysicalChannels ( ) map [ typeutil . UniqueID ] [ ] string {
2021-06-04 15:00:34 +08:00
mt . ddLock . RLock ( )
defer mt . ddLock . RUnlock ( )
2022-09-05 13:29:11 +08:00
chanMap := make ( map [ UniqueID ] [ ] string )
2021-06-04 15:00:34 +08:00
2021-11-02 15:50:30 +08:00
for id , collInfo := range mt . collID2Meta {
2022-09-05 13:29:11 +08:00
chanMap [ id ] = common . CloneStringList ( collInfo . PhysicalChannelNames )
2021-06-04 15:00:34 +08:00
}
2022-09-05 13:29:11 +08:00
2021-11-02 15:50:30 +08:00
return chanMap
2021-06-04 15:00:34 +08:00
}
2022-10-10 20:31:22 +08:00
func ( mt * MetaTable ) AlterCollection ( ctx context . Context , oldColl * model . Collection , newColl * model . Collection , ts Timestamp ) error {
mt . ddLock . Lock ( )
defer mt . ddLock . Unlock ( )
2022-12-07 18:01:19 +08:00
ctx1 := contextutil . WithTenantID ( ctx , Params . CommonCfg . ClusterName . GetValue ( ) )
2022-10-10 20:31:22 +08:00
if err := mt . catalog . AlterCollection ( ctx1 , oldColl , newColl , metastore . MODIFY , ts ) ; err != nil {
return err
}
mt . collID2Meta [ oldColl . CollectionID ] = newColl
log . Info ( "alter collection finished" , zap . Int64 ( "collectionID" , oldColl . CollectionID ) , zap . Uint64 ( "ts" , ts ) )
return nil
}
2023-01-19 14:13:43 +08:00
func ( mt * MetaTable ) RenameCollection ( ctx context . Context , oldName string , newName string , ts Timestamp ) error {
mt . ddLock . RLock ( )
defer mt . ddLock . RUnlock ( )
ctx = contextutil . WithTenantID ( ctx , Params . CommonCfg . ClusterName . GetValue ( ) )
log := log . Ctx ( ctx ) . With ( zap . String ( "oldName" , oldName ) , zap . String ( "newName" , newName ) )
//old collection should not be an alias
_ , ok := mt . collAlias2ID [ oldName ]
if ok {
log . Warn ( "unsupported use a alias to rename collection" )
return fmt . Errorf ( "unsupported use an alias to rename collection, alias:%s" , oldName )
}
// check new collection already exists
newColl , err := mt . GetCollectionByName ( ctx , newName , ts )
if newColl != nil {
return fmt . Errorf ( "duplicated new collection name :%s with other collection name or alias" , newName )
}
if err != nil && ! common . IsCollectionNotExistErrorV2 ( err ) {
log . Warn ( "check new collection name fail" )
return err
}
// get old collection meta
oldColl , err := mt . GetCollectionByName ( ctx , oldName , ts )
if err != nil {
return err
}
newColl = oldColl . Clone ( )
newColl . Name = newName
if err := mt . catalog . AlterCollection ( ctx , oldColl , newColl , metastore . MODIFY , ts ) ; err != nil {
return err
}
mt . collName2ID [ newName ] = oldColl . CollectionID
delete ( mt . collName2ID , oldName )
mt . collID2Meta [ oldColl . CollectionID ] = newColl
log . Info ( "rename collection finished" )
return nil
}
2022-09-26 18:06:54 +08:00
// GetCollectionVirtualChannels returns virtual channels of a given collection.
func ( mt * MetaTable ) GetCollectionVirtualChannels ( colID int64 ) [ ] string {
mt . ddLock . RLock ( )
defer mt . ddLock . RUnlock ( )
for id , collInfo := range mt . collID2Meta {
if id == colID {
return common . CloneStringList ( collInfo . VirtualChannelNames )
}
}
return nil
}
2022-09-05 13:29:11 +08:00
func ( mt * MetaTable ) AddPartition ( ctx context . Context , partition * model . Partition ) error {
2021-01-19 14:44:03 +08:00
mt . ddLock . Lock ( )
defer mt . ddLock . Unlock ( )
2021-07-03 14:36:18 +08:00
2022-09-05 13:29:11 +08:00
coll , ok := mt . collID2Meta [ partition . CollectionID ]
if ! ok || ! coll . Available ( ) {
return fmt . Errorf ( "collection not exists: %d" , partition . CollectionID )
2021-01-19 14:44:03 +08:00
}
2022-09-05 13:29:11 +08:00
if partition . State != pb . PartitionState_PartitionCreated {
return fmt . Errorf ( "partition state is not created, collection: %d, partition: %d, state: %s" , partition . CollectionID , partition . PartitionID , partition . State )
2022-08-10 10:22:38 +08:00
}
2022-09-05 13:29:11 +08:00
if err := mt . catalog . CreatePartition ( ctx , partition , partition . PartitionCreatedTimestamp ) ; err != nil {
2022-07-22 10:20:29 +08:00
return err
2022-06-10 13:10:08 +08:00
}
2022-09-05 13:29:11 +08:00
mt . collID2Meta [ partition . CollectionID ] . Partitions = append ( mt . collID2Meta [ partition . CollectionID ] . Partitions , partition . Clone ( ) )
2022-11-09 17:49:04 +08:00
metrics . RootCoordNumOfPartitions . WithLabelValues ( ) . Inc ( )
2022-09-05 13:29:11 +08:00
log . Info ( "add partition to meta table" ,
zap . Int64 ( "collection" , partition . CollectionID ) , zap . String ( "partition" , partition . PartitionName ) ,
zap . Int64 ( "partitionid" , partition . PartitionID ) , zap . Uint64 ( "ts" , partition . PartitionCreatedTimestamp ) )
2022-11-09 17:49:04 +08:00
2022-06-10 13:10:08 +08:00
return nil
2021-01-19 14:44:03 +08:00
}
2022-09-05 13:29:11 +08:00
func ( mt * MetaTable ) ChangePartitionState ( ctx context . Context , collectionID UniqueID , partitionID UniqueID , state pb . PartitionState , ts Timestamp ) error {
mt . ddLock . Lock ( )
defer mt . ddLock . Unlock ( )
coll , ok := mt . collID2Meta [ collectionID ]
if ! ok {
return nil
}
for idx , part := range coll . Partitions {
if part . PartitionID == partitionID {
clone := part . Clone ( )
clone . State = state
2022-12-07 18:01:19 +08:00
ctx1 := contextutil . WithTenantID ( ctx , Params . CommonCfg . ClusterName . GetValue ( ) )
2022-09-05 13:29:11 +08:00
if err := mt . catalog . AlterPartition ( ctx1 , part , clone , metastore . MODIFY , ts ) ; err != nil {
return err
2021-05-18 14:18:02 +08:00
}
2022-09-05 13:29:11 +08:00
mt . collID2Meta [ collectionID ] . Partitions [ idx ] = clone
2022-11-09 17:49:04 +08:00
switch state {
case pb . PartitionState_PartitionCreated :
log . Warn ( "[should not happen] change partition to created" ,
zap . String ( "collection" , coll . Name ) , zap . Int64 ( "collection id" , coll . CollectionID ) ,
zap . String ( "partition" , clone . PartitionName ) , zap . Int64 ( "partition id" , clone . PartitionID ) )
default :
metrics . RootCoordNumOfPartitions . WithLabelValues ( ) . Dec ( )
}
2022-09-05 13:29:11 +08:00
log . Info ( "change partition state" , zap . Int64 ( "collection" , collectionID ) ,
zap . Int64 ( "partition" , partitionID ) , zap . String ( "state" , state . String ( ) ) ,
zap . Uint64 ( "ts" , ts ) )
2022-11-09 17:49:04 +08:00
2022-09-05 13:29:11 +08:00
return nil
2021-05-18 14:18:02 +08:00
}
}
2022-09-05 13:29:11 +08:00
return fmt . Errorf ( "partition not exist, collection: %d, partition: %d" , collectionID , partitionID )
}
2022-07-22 10:20:29 +08:00
2022-09-05 13:29:11 +08:00
func ( mt * MetaTable ) RemovePartition ( ctx context . Context , collectionID UniqueID , partitionID UniqueID , ts Timestamp ) error {
mt . ddLock . Lock ( )
defer mt . ddLock . Unlock ( )
2022-12-07 18:01:19 +08:00
ctx1 := contextutil . WithTenantID ( ctx , Params . CommonCfg . ClusterName . GetValue ( ) )
2022-09-05 13:29:11 +08:00
if err := mt . catalog . DropPartition ( ctx1 , collectionID , partitionID , ts ) ; err != nil {
return err
2022-06-10 13:10:08 +08:00
}
2022-09-05 13:29:11 +08:00
coll , ok := mt . collID2Meta [ collectionID ]
if ! ok {
return nil
}
var loc = - 1
for idx , part := range coll . Partitions {
if part . PartitionID == partitionID {
loc = idx
break
2021-05-18 14:18:02 +08:00
}
2021-07-03 14:36:18 +08:00
}
2022-09-05 13:29:11 +08:00
if loc != - 1 {
coll . Partitions = append ( coll . Partitions [ : loc ] , coll . Partitions [ loc + 1 : ] ... )
}
log . Info ( "remove partition" , zap . Int64 ( "collection" , collectionID ) , zap . Int64 ( "partition" , partitionID ) , zap . Uint64 ( "ts" , ts ) )
return nil
2021-07-03 14:36:18 +08:00
}
2022-09-05 13:29:11 +08:00
func ( mt * MetaTable ) CreateAlias ( ctx context . Context , alias string , collectionName string , ts Timestamp ) error {
mt . ddLock . Lock ( )
defer mt . ddLock . Unlock ( )
// It's ok that we don't read from catalog when cache missed.
// Since cache always keep the latest version, and the ts should always be the latest.
if _ , ok := mt . collName2ID [ alias ] ; ok {
return fmt . Errorf ( "cannot create alias, collection already exists with same name: %s" , alias )
2021-07-03 14:36:18 +08:00
}
2022-07-22 10:20:29 +08:00
2022-09-05 13:29:11 +08:00
collectionID , ok := mt . collName2ID [ collectionName ]
if ! ok {
// you cannot alias to a non-existent collection.
return fmt . Errorf ( "collection not exists: %s" , collectionName )
2022-06-10 13:10:08 +08:00
}
2022-09-05 13:29:11 +08:00
// check if alias exists.
aliasedCollectionID , ok := mt . collAlias2ID [ alias ]
if ok && aliasedCollectionID == collectionID {
log . Warn ( "add duplicate alias" , zap . String ( "alias" , alias ) , zap . String ( "collection" , collectionName ) , zap . Uint64 ( "ts" , ts ) )
return nil
} else if ok {
// TODO: better to check if aliasedCollectionID exist or is available, though not very possible.
aliasedColl := mt . collID2Meta [ aliasedCollectionID ]
return fmt . Errorf ( "alias exists and already aliased to another collection, alias: %s, collection: %s, other collection: %s" , alias , collectionName , aliasedColl . Name )
2021-01-19 14:44:03 +08:00
}
2022-09-05 13:29:11 +08:00
// alias didn't exist.
2021-05-14 21:26:06 +08:00
2022-09-05 13:29:11 +08:00
coll , ok := mt . collID2Meta [ collectionID ]
if ! ok || ! coll . Available ( ) {
// you cannot alias to a non-existent collection.
return fmt . Errorf ( "collection not exists: %s" , collectionName )
}
2022-12-07 18:01:19 +08:00
ctx1 := contextutil . WithTenantID ( ctx , Params . CommonCfg . ClusterName . GetValue ( ) )
2022-09-05 13:29:11 +08:00
if err := mt . catalog . CreateAlias ( ctx1 , & model . Alias {
Name : alias ,
CollectionID : collectionID ,
CreatedTime : ts ,
State : pb . AliasState_AliasCreated ,
} , ts ) ; err != nil {
return err
}
mt . collAlias2ID [ alias ] = collectionID
log . Info ( "create alias" , zap . String ( "alias" , alias ) , zap . String ( "collection" , collectionName ) , zap . Uint64 ( "ts" , ts ) )
return nil
2021-01-19 14:44:03 +08:00
}
2022-09-05 13:29:11 +08:00
func ( mt * MetaTable ) DropAlias ( ctx context . Context , alias string , ts Timestamp ) error {
mt . ddLock . Lock ( )
defer mt . ddLock . Unlock ( )
2022-12-07 18:01:19 +08:00
ctx1 := contextutil . WithTenantID ( ctx , Params . CommonCfg . ClusterName . GetValue ( ) )
2022-09-05 13:29:11 +08:00
if err := mt . catalog . DropAlias ( ctx1 , alias , ts ) ; err != nil {
return err
}
delete ( mt . collAlias2ID , alias )
log . Info ( "drop alias" , zap . String ( "alias" , alias ) , zap . Uint64 ( "ts" , ts ) )
return nil
2021-05-14 21:26:06 +08:00
}
2022-09-05 13:29:11 +08:00
func ( mt * MetaTable ) AlterAlias ( ctx context . Context , alias string , collectionName string , ts Timestamp ) error {
2021-01-19 14:44:03 +08:00
mt . ddLock . Lock ( )
defer mt . ddLock . Unlock ( )
2022-09-05 13:29:11 +08:00
// It's ok that we don't read from catalog when cache missed.
// Since cache always keep the latest version, and the ts should always be the latest.
if _ , ok := mt . collName2ID [ alias ] ; ok {
return fmt . Errorf ( "cannot alter alias, collection already exists with same name: %s" , alias )
2021-01-19 14:44:03 +08:00
}
2022-09-05 13:29:11 +08:00
collectionID , ok := mt . collName2ID [ collectionName ]
2021-01-19 14:44:03 +08:00
if ! ok {
2022-09-05 13:29:11 +08:00
// you cannot alias to a non-existent collection.
return fmt . Errorf ( "collection not exists: %s" , collectionName )
2021-01-19 14:44:03 +08:00
}
2022-09-05 13:29:11 +08:00
coll , ok := mt . collID2Meta [ collectionID ]
if ! ok || ! coll . Available ( ) {
// you cannot alias to a non-existent collection.
return fmt . Errorf ( "collection not exists: %s" , collectionName )
}
2021-01-19 14:44:03 +08:00
2022-09-05 13:29:11 +08:00
// check if alias exists.
_ , ok = mt . collAlias2ID [ alias ]
if ! ok {
//
return fmt . Errorf ( "failed to alter alias, alias does not exist: %s" , alias )
}
2022-07-22 10:20:29 +08:00
2022-12-07 18:01:19 +08:00
ctx1 := contextutil . WithTenantID ( ctx , Params . CommonCfg . ClusterName . GetValue ( ) )
2022-09-05 13:29:11 +08:00
if err := mt . catalog . AlterAlias ( ctx1 , & model . Alias {
Name : alias ,
CollectionID : collectionID ,
CreatedTime : ts ,
State : pb . AliasState_AliasCreated ,
} , ts ) ; err != nil {
return err
2021-01-19 14:44:03 +08:00
}
2022-09-05 13:29:11 +08:00
// alias switch to another collection anyway.
mt . collAlias2ID [ alias ] = collectionID
log . Info ( "alter alias" , zap . String ( "alias" , alias ) , zap . String ( "collection" , collectionName ) , zap . Uint64 ( "ts" , ts ) )
return nil
2022-07-22 10:20:29 +08:00
}
2022-06-10 13:10:08 +08:00
2022-09-05 13:29:11 +08:00
func ( mt * MetaTable ) IsAlias ( name string ) bool {
2021-01-21 10:01:29 +08:00
mt . ddLock . RLock ( )
defer mt . ddLock . RUnlock ( )
2022-09-05 13:29:11 +08:00
_ , ok := mt . collAlias2ID [ name ]
return ok
2021-02-02 10:09:10 +08:00
}
2022-09-05 13:29:11 +08:00
func ( mt * MetaTable ) listAliasesByID ( collID UniqueID ) [ ] string {
ret := make ( [ ] string , 0 )
for alias , id := range mt . collAlias2ID {
if id == collID {
ret = append ( ret , alias )
2021-09-28 21:52:20 +08:00
}
2021-01-21 10:01:29 +08:00
}
2022-09-05 13:29:11 +08:00
return ret
2021-01-21 10:01:29 +08:00
}
2022-09-05 13:29:11 +08:00
func ( mt * MetaTable ) ListAliasesByID ( collID UniqueID ) [ ] string {
mt . ddLock . RLock ( )
defer mt . ddLock . RUnlock ( )
2021-01-21 10:01:29 +08:00
2022-09-05 13:29:11 +08:00
return mt . listAliasesByID ( collID )
2022-05-10 21:07:53 +08:00
}
2022-10-27 16:21:34 +08:00
// GetCollectionNameByID serve for bulk insert. TODO: why this didn't accept ts?
2022-09-27 10:44:53 +08:00
// [Deprecated]
2022-09-05 13:29:11 +08:00
func ( mt * MetaTable ) GetCollectionNameByID ( collID UniqueID ) ( string , error ) {
2022-09-26 18:06:54 +08:00
mt . ddLock . RLock ( )
2022-06-17 18:08:12 +08:00
defer mt . ddLock . RUnlock ( )
2022-09-05 13:29:11 +08:00
coll , ok := mt . collID2Meta [ collID ]
if ! ok || ! coll . Available ( ) {
return "" , fmt . Errorf ( "collection not exist: %d" , collID )
2021-09-18 11:13:51 +08:00
}
2022-09-05 13:29:11 +08:00
return coll . Name , nil
}
2021-09-18 11:13:51 +08:00
2022-10-27 16:21:34 +08:00
// GetPartitionNameByID serve for bulk insert.
2022-09-05 13:29:11 +08:00
func ( mt * MetaTable ) GetPartitionNameByID ( collID UniqueID , partitionID UniqueID , ts Timestamp ) ( string , error ) {
2022-09-26 18:06:54 +08:00
mt . ddLock . RLock ( )
2022-09-05 13:29:11 +08:00
defer mt . ddLock . RUnlock ( )
2021-09-18 11:13:51 +08:00
2022-09-05 13:29:11 +08:00
coll , ok := mt . collID2Meta [ collID ]
if ok && coll . Available ( ) && coll . CreateTime <= ts {
// cache hit.
for _ , partition := range coll . Partitions {
if partition . Available ( ) && partition . PartitionID == partitionID && partition . PartitionCreatedTimestamp <= ts {
// cache hit.
return partition . PartitionName , nil
}
}
2022-06-10 13:10:08 +08:00
}
2022-09-05 13:29:11 +08:00
// cache miss, get from catalog anyway.
coll , err := mt . catalog . GetCollectionByID ( mt . ctx , collID , ts )
if err != nil {
return "" , err
2021-09-18 11:13:51 +08:00
}
2022-09-05 13:29:11 +08:00
if ! coll . Available ( ) {
return "" , fmt . Errorf ( "collection not exist: %d" , collID )
2021-09-18 11:13:51 +08:00
}
2022-09-05 13:29:11 +08:00
for _ , partition := range coll . Partitions {
// no need to check time travel logic again, since catalog already did.
if partition . Available ( ) && partition . PartitionID == partitionID {
return partition . PartitionName , nil
}
2022-06-10 13:10:08 +08:00
}
2022-09-05 13:29:11 +08:00
return "" , fmt . Errorf ( "partition not exist: %d" , partitionID )
2021-09-18 11:13:51 +08:00
}
2022-10-27 16:21:34 +08:00
// GetCollectionIDByName serve for bulk insert. TODO: why this didn't accept ts?
2022-09-27 10:44:53 +08:00
// [Deprecated]
2022-09-05 13:29:11 +08:00
func ( mt * MetaTable ) GetCollectionIDByName ( name string ) ( UniqueID , error ) {
mt . ddLock . RLock ( )
defer mt . ddLock . RUnlock ( )
2021-09-18 11:13:51 +08:00
2022-09-05 13:29:11 +08:00
id , ok := mt . collName2ID [ name ]
2021-09-18 11:13:51 +08:00
if ! ok {
2022-09-05 13:29:11 +08:00
return InvalidCollectionID , fmt . Errorf ( "collection not exists: %s" , name )
2021-09-18 11:13:51 +08:00
}
2022-09-05 13:29:11 +08:00
return id , nil
2021-09-18 11:13:51 +08:00
}
2021-10-08 17:37:53 +08:00
2022-10-27 16:21:34 +08:00
// GetPartitionByName serve for bulk insert.
2022-09-05 13:29:11 +08:00
func ( mt * MetaTable ) GetPartitionByName ( collID UniqueID , partitionName string , ts Timestamp ) ( UniqueID , error ) {
2021-10-08 17:37:53 +08:00
mt . ddLock . RLock ( )
defer mt . ddLock . RUnlock ( )
2022-09-05 13:29:11 +08:00
coll , ok := mt . collID2Meta [ collID ]
if ok && coll . Available ( ) && coll . CreateTime <= ts {
// cache hit.
for _ , partition := range coll . Partitions {
if partition . Available ( ) && partition . PartitionName == partitionName && partition . PartitionCreatedTimestamp <= ts {
// cache hit.
return partition . PartitionID , nil
}
}
}
// cache miss, get from catalog anyway.
coll , err := mt . catalog . GetCollectionByID ( mt . ctx , collID , ts )
if err != nil {
return common . InvalidPartitionID , err
}
if ! coll . Available ( ) {
return common . InvalidPartitionID , fmt . Errorf ( "collection not exist: %d" , collID )
}
for _ , partition := range coll . Partitions {
// no need to check time travel logic again, since catalog already did.
if partition . Available ( ) && partition . PartitionName == partitionName {
return partition . PartitionID , nil
}
}
2022-11-09 14:49:03 +08:00
log . Error ( "partition ID not found for partition name" , zap . String ( "partitionName" , partitionName ) ,
zap . Int64 ( "collectionID" , collID ) , zap . String ( "collectionName" , coll . Name ) )
return common . InvalidPartitionID , fmt . Errorf ( "partition ID not found for partition name '%s' in collection '%s'" ,
partitionName , coll . Name )
2021-10-08 17:37:53 +08:00
}
2022-04-11 19:49:34 +08:00
// AddCredential add credential
func ( mt * MetaTable ) AddCredential ( credInfo * internalpb . CredentialInfo ) error {
if credInfo . Username == "" {
return fmt . Errorf ( "username is empty" )
}
2022-08-24 10:02:52 +08:00
mt . permissionLock . Lock ( )
defer mt . permissionLock . Unlock ( )
usernames , err := mt . catalog . ListCredentials ( mt . ctx )
if err != nil {
return err
}
2022-12-07 18:01:19 +08:00
if len ( usernames ) >= Params . ProxyCfg . MaxUserNum . GetAsInt ( ) {
2022-08-26 19:22:56 +08:00
errMsg := "unable to add user because the number of users has reached the limit"
2022-12-07 18:01:19 +08:00
log . Error ( errMsg , zap . Int ( "max_user_num" , Params . ProxyCfg . MaxUserNum . GetAsInt ( ) ) )
2022-08-26 19:22:56 +08:00
return errors . New ( errMsg )
2022-08-24 10:02:52 +08:00
}
if origin , _ := mt . catalog . GetCredential ( mt . ctx , credInfo . Username ) ; origin != nil {
return fmt . Errorf ( "user already exists: %s" , credInfo . Username )
}
credential := & model . Credential {
Username : credInfo . Username ,
EncryptedPassword : credInfo . EncryptedPassword ,
}
return mt . catalog . CreateCredential ( mt . ctx , credential )
}
2022-08-24 14:32:56 +08:00
// AlterCredential update credential
func ( mt * MetaTable ) AlterCredential ( credInfo * internalpb . CredentialInfo ) error {
if credInfo . Username == "" {
return fmt . Errorf ( "username is empty" )
}
2022-07-22 10:20:29 +08:00
2022-08-26 19:22:56 +08:00
mt . permissionLock . Lock ( )
defer mt . permissionLock . Unlock ( )
2022-07-22 10:20:29 +08:00
credential := & model . Credential {
Username : credInfo . Username ,
EncryptedPassword : credInfo . EncryptedPassword ,
2022-06-10 13:10:08 +08:00
}
2022-08-24 14:32:56 +08:00
return mt . catalog . AlterCredential ( mt . ctx , credential )
2022-04-11 19:49:34 +08:00
}
// GetCredential get credential by username
2022-08-24 14:32:56 +08:00
func ( mt * MetaTable ) GetCredential ( username string ) ( * internalpb . CredentialInfo , error ) {
2022-08-26 19:22:56 +08:00
mt . permissionLock . RLock ( )
defer mt . permissionLock . RUnlock ( )
2022-07-22 10:20:29 +08:00
credential , err := mt . catalog . GetCredential ( mt . ctx , username )
return model . MarshalCredentialModel ( credential ) , err
2022-04-11 19:49:34 +08:00
}
// DeleteCredential delete credential
func ( mt * MetaTable ) DeleteCredential ( username string ) error {
2022-08-24 10:02:52 +08:00
mt . permissionLock . Lock ( )
defer mt . permissionLock . Unlock ( )
2022-07-22 10:20:29 +08:00
return mt . catalog . DropCredential ( mt . ctx , username )
}
2022-06-10 13:10:08 +08:00
2022-07-22 10:20:29 +08:00
// ListCredentialUsernames list credential usernames
func ( mt * MetaTable ) ListCredentialUsernames ( ) ( * milvuspb . ListCredUsersResponse , error ) {
2022-08-24 10:02:52 +08:00
mt . permissionLock . RLock ( )
defer mt . permissionLock . RUnlock ( )
2022-07-22 10:20:29 +08:00
usernames , err := mt . catalog . ListCredentials ( mt . ctx )
2022-06-10 13:10:08 +08:00
if err != nil {
2022-07-22 10:20:29 +08:00
return nil , fmt . Errorf ( "list credential usernames err:%w" , err )
2022-06-10 13:10:08 +08:00
}
2022-07-22 10:20:29 +08:00
return & milvuspb . ListCredUsersResponse { Usernames : usernames } , nil
2022-04-11 19:49:34 +08:00
}
2022-08-04 11:04:34 +08:00
// CreateRole create role
func ( mt * MetaTable ) CreateRole ( tenant string , entity * milvuspb . RoleEntity ) error {
if funcutil . IsEmptyString ( entity . Name ) {
return fmt . Errorf ( "the role name in the role info is empty" )
}
2022-08-24 10:02:52 +08:00
mt . permissionLock . Lock ( )
defer mt . permissionLock . Unlock ( )
results , err := mt . catalog . ListRole ( mt . ctx , tenant , nil , false )
if err != nil {
logger . Error ( "fail to list roles" , zap . Error ( err ) )
return err
}
2022-12-07 18:01:19 +08:00
if len ( results ) >= Params . ProxyCfg . MaxRoleNum . GetAsInt ( ) {
2023-01-12 13:55:42 +08:00
errMsg := "unable to create role because the number of roles has reached the limit"
2022-12-07 18:01:19 +08:00
log . Error ( errMsg , zap . Int ( "max_role_num" , Params . ProxyCfg . MaxRoleNum . GetAsInt ( ) ) )
2022-08-26 19:22:56 +08:00
return errors . New ( errMsg )
2022-08-24 10:02:52 +08:00
}
2022-08-04 11:04:34 +08:00
return mt . catalog . CreateRole ( mt . ctx , tenant , entity )
}
// DropRole drop role info
func ( mt * MetaTable ) DropRole ( tenant string , roleName string ) error {
2022-08-24 10:02:52 +08:00
mt . permissionLock . Lock ( )
defer mt . permissionLock . Unlock ( )
2022-08-04 11:04:34 +08:00
return mt . catalog . DropRole ( mt . ctx , tenant , roleName )
}
// OperateUserRole operate the relationship between a user and a role, including adding a user to a role and removing a user from a role
func ( mt * MetaTable ) OperateUserRole ( tenant string , userEntity * milvuspb . UserEntity , roleEntity * milvuspb . RoleEntity , operateType milvuspb . OperateUserRoleType ) error {
if funcutil . IsEmptyString ( userEntity . Name ) {
return fmt . Errorf ( "username in the user entity is empty" )
}
if funcutil . IsEmptyString ( roleEntity . Name ) {
return fmt . Errorf ( "role name in the role entity is empty" )
}
2022-08-24 10:02:52 +08:00
mt . permissionLock . Lock ( )
defer mt . permissionLock . Unlock ( )
2022-08-23 10:26:53 +08:00
return mt . catalog . AlterUserRole ( mt . ctx , tenant , userEntity , roleEntity , operateType )
2022-08-04 11:04:34 +08:00
}
// SelectRole select role.
// Enter the role condition by the entity param. And this param is nil, which means selecting all roles.
// Get all users that are added to the role by setting the includeUserInfo param to true.
func ( mt * MetaTable ) SelectRole ( tenant string , entity * milvuspb . RoleEntity , includeUserInfo bool ) ( [ ] * milvuspb . RoleResult , error ) {
2022-08-24 10:02:52 +08:00
mt . permissionLock . RLock ( )
defer mt . permissionLock . RUnlock ( )
2022-08-23 10:26:53 +08:00
return mt . catalog . ListRole ( mt . ctx , tenant , entity , includeUserInfo )
2022-08-04 11:04:34 +08:00
}
// SelectUser select user.
// Enter the user condition by the entity param. And this param is nil, which means selecting all users.
// Get all roles that are added the user to by setting the includeRoleInfo param to true.
func ( mt * MetaTable ) SelectUser ( tenant string , entity * milvuspb . UserEntity , includeRoleInfo bool ) ( [ ] * milvuspb . UserResult , error ) {
2022-08-24 10:02:52 +08:00
mt . permissionLock . RLock ( )
defer mt . permissionLock . RUnlock ( )
2022-08-23 10:26:53 +08:00
return mt . catalog . ListUser ( mt . ctx , tenant , entity , includeRoleInfo )
2022-08-04 11:04:34 +08:00
}
// OperatePrivilege grant or revoke privilege by setting the operateType param
func ( mt * MetaTable ) OperatePrivilege ( tenant string , entity * milvuspb . GrantEntity , operateType milvuspb . OperatePrivilegeType ) error {
if funcutil . IsEmptyString ( entity . ObjectName ) {
return fmt . Errorf ( "the object name in the grant entity is empty" )
}
if entity . Object == nil || funcutil . IsEmptyString ( entity . Object . Name ) {
return fmt . Errorf ( "the object entity in the grant entity is invalid" )
}
if entity . Role == nil || funcutil . IsEmptyString ( entity . Role . Name ) {
return fmt . Errorf ( "the role entity in the grant entity is invalid" )
}
if entity . Grantor == nil {
return fmt . Errorf ( "the grantor in the grant entity is empty" )
}
if entity . Grantor . Privilege == nil || funcutil . IsEmptyString ( entity . Grantor . Privilege . Name ) {
return fmt . Errorf ( "the privilege name in the grant entity is empty" )
}
if entity . Grantor . User == nil || funcutil . IsEmptyString ( entity . Grantor . User . Name ) {
return fmt . Errorf ( "the grantor name in the grant entity is empty" )
}
if ! funcutil . IsRevoke ( operateType ) && ! funcutil . IsGrant ( operateType ) {
return fmt . Errorf ( "the operate type in the grant entity is invalid" )
}
2022-08-24 10:02:52 +08:00
mt . permissionLock . Lock ( )
defer mt . permissionLock . Unlock ( )
2022-08-23 10:26:53 +08:00
return mt . catalog . AlterGrant ( mt . ctx , tenant , entity , operateType )
2022-08-04 11:04:34 +08:00
}
// SelectGrant select grant
// The principal entity MUST be not empty in the grant entity
// The resource entity and the resource name are optional, and the two params should be not empty together when you select some grants about the resource kind.
func ( mt * MetaTable ) SelectGrant ( tenant string , entity * milvuspb . GrantEntity ) ( [ ] * milvuspb . GrantEntity , error ) {
var entities [ ] * milvuspb . GrantEntity
2022-11-25 11:07:12 +08:00
if entity == nil {
return entities , fmt . Errorf ( "the grant entity is nil" )
}
2022-08-04 11:04:34 +08:00
if entity . Role == nil || funcutil . IsEmptyString ( entity . Role . Name ) {
return entities , fmt . Errorf ( "the role entity in the grant entity is invalid" )
}
2022-08-24 10:02:52 +08:00
mt . permissionLock . RLock ( )
defer mt . permissionLock . RUnlock ( )
2022-08-23 10:26:53 +08:00
return mt . catalog . ListGrant ( mt . ctx , tenant , entity )
2022-08-04 11:04:34 +08:00
}
2022-08-26 19:22:56 +08:00
func ( mt * MetaTable ) DropGrant ( tenant string , role * milvuspb . RoleEntity ) error {
if role == nil || funcutil . IsEmptyString ( role . Name ) {
return fmt . Errorf ( "the role entity is invalid when dropping the grant" )
}
mt . permissionLock . Lock ( )
defer mt . permissionLock . Unlock ( )
return mt . catalog . DeleteGrant ( mt . ctx , tenant , role )
}
2022-08-04 11:04:34 +08:00
func ( mt * MetaTable ) ListPolicy ( tenant string ) ( [ ] string , error ) {
2022-08-24 10:02:52 +08:00
mt . permissionLock . RLock ( )
defer mt . permissionLock . RUnlock ( )
2022-08-04 11:04:34 +08:00
return mt . catalog . ListPolicy ( mt . ctx , tenant )
}
func ( mt * MetaTable ) ListUserRole ( tenant string ) ( [ ] string , error ) {
2022-08-24 10:02:52 +08:00
mt . permissionLock . RLock ( )
defer mt . permissionLock . RUnlock ( )
2022-08-04 11:04:34 +08:00
return mt . catalog . ListUserRole ( mt . ctx , tenant )
}