milvus/internal/rootcoord/meta_table.go
Jiquan Long 4e521438c3
Optimize read ddl (#19352)
Signed-off-by: longjiquan <jiquan.long@zilliz.com>

Signed-off-by: longjiquan <jiquan.long@zilliz.com>
2022-09-24 12:42:51 +08:00

866 lines
30 KiB
Go

// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rootcoord
import (
"context"
"errors"
"fmt"
"sync"
"github.com/milvus-io/milvus/internal/common"
pb "github.com/milvus-io/milvus/internal/proto/etcdpb"
"go.uber.org/zap"
"github.com/milvus-io/milvus/api/milvuspb"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metastore"
"github.com/milvus-io/milvus/internal/metastore/kv/rootcoord"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/util/contextutil"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
const (
// TimestampPrefix prefix for timestamp
TimestampPrefix = rootcoord.ComponentPrefix + "/timestamp"
// DDOperationPrefix prefix for DD operation
DDOperationPrefix = rootcoord.ComponentPrefix + "/dd-operation"
// DDMsgSendPrefix prefix to indicate whether DD msg has been send
DDMsgSendPrefix = rootcoord.ComponentPrefix + "/dd-msg-send"
// CreateCollectionDDType name of DD type for create collection
CreateCollectionDDType = "CreateCollection"
// DropCollectionDDType name of DD type for drop collection
DropCollectionDDType = "DropCollection"
// CreatePartitionDDType name of DD type for create partition
CreatePartitionDDType = "CreatePartition"
// DropPartitionDDType name of DD type for drop partition
DropPartitionDDType = "DropPartition"
// DefaultIndexType name of default index type for scalar field
DefaultIndexType = "STL_SORT"
// DefaultStringIndexType name of default index type for varChar/string field
DefaultStringIndexType = "Trie"
)
//go:generate mockery --name=IMetaTable --outpkg=mockrootcoord
type IMetaTable interface {
AddCollection(ctx context.Context, coll *model.Collection) error
ChangeCollectionState(ctx context.Context, collectionID UniqueID, state pb.CollectionState, ts Timestamp) error
RemoveCollection(ctx context.Context, collectionID UniqueID, ts Timestamp) error
GetCollectionByName(ctx context.Context, collectionName string, ts Timestamp) (*model.Collection, error)
GetCollectionByID(ctx context.Context, collectionID UniqueID, ts Timestamp) (*model.Collection, error)
ListCollections(ctx context.Context, ts Timestamp) ([]*model.Collection, error)
ListAbnormalCollections(ctx context.Context, ts Timestamp) ([]*model.Collection, error)
ListCollectionPhysicalChannels() map[typeutil.UniqueID][]string
AddPartition(ctx context.Context, partition *model.Partition) error
ChangePartitionState(ctx context.Context, collectionID UniqueID, partitionID UniqueID, state pb.PartitionState, ts Timestamp) error
RemovePartition(ctx context.Context, collectionID UniqueID, partitionID UniqueID, ts Timestamp) error
CreateAlias(ctx context.Context, alias string, collectionName string, ts Timestamp) error
DropAlias(ctx context.Context, alias string, ts Timestamp) error
AlterAlias(ctx context.Context, alias string, collectionName string, ts Timestamp) error
// TODO: it'll be a big cost if we handle the time travel logic, since we should always list all aliases in catalog.
IsAlias(name string) bool
ListAliasesByID(collID UniqueID) []string
// TODO: better to accept ctx.
// TODO: should GetCollectionNameByID & GetCollectionIDByName also accept ts?
GetCollectionNameByID(collID UniqueID) (string, error) // serve for bulk load.
GetPartitionNameByID(collID UniqueID, partitionID UniqueID, ts Timestamp) (string, error) // serve for bulk load.
GetCollectionIDByName(name string) (UniqueID, error) // serve for bulk load.
GetPartitionByName(collID UniqueID, partitionName string, ts Timestamp) (UniqueID, error) // serve for bulk load.
// TODO: better to accept ctx.
AddCredential(credInfo *internalpb.CredentialInfo) error
GetCredential(username string) (*internalpb.CredentialInfo, error)
DeleteCredential(username string) error
AlterCredential(credInfo *internalpb.CredentialInfo) error
ListCredentialUsernames() (*milvuspb.ListCredUsersResponse, error)
// TODO: better to accept ctx.
CreateRole(tenant string, entity *milvuspb.RoleEntity) error
DropRole(tenant string, roleName string) error
OperateUserRole(tenant string, userEntity *milvuspb.UserEntity, roleEntity *milvuspb.RoleEntity, operateType milvuspb.OperateUserRoleType) error
SelectRole(tenant string, entity *milvuspb.RoleEntity, includeUserInfo bool) ([]*milvuspb.RoleResult, error)
SelectUser(tenant string, entity *milvuspb.UserEntity, includeRoleInfo bool) ([]*milvuspb.UserResult, error)
OperatePrivilege(tenant string, entity *milvuspb.GrantEntity, operateType milvuspb.OperatePrivilegeType) error
SelectGrant(tenant string, entity *milvuspb.GrantEntity) ([]*milvuspb.GrantEntity, error)
DropGrant(tenant string, role *milvuspb.RoleEntity) error
ListPolicy(tenant string) ([]string, error)
ListUserRole(tenant string) ([]string, error)
}
type MetaTable struct {
ctx context.Context
catalog metastore.RootCoordCatalog
collID2Meta map[typeutil.UniqueID]*model.Collection // collection id -> collection meta
collName2ID map[string]typeutil.UniqueID // collection name to collection id
collAlias2ID map[string]typeutil.UniqueID // collection alias to collection id
ddLock sync.RWMutex
permissionLock sync.RWMutex
}
func NewMetaTable(ctx context.Context, catalog metastore.RootCoordCatalog) (*MetaTable, error) {
mt := &MetaTable{
ctx: contextutil.WithTenantID(ctx, Params.CommonCfg.ClusterName),
catalog: catalog,
}
if err := mt.reload(); err != nil {
return nil, err
}
return mt, nil
}
func (mt *MetaTable) reload() error {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
mt.collID2Meta = make(map[UniqueID]*model.Collection)
mt.collName2ID = make(map[string]UniqueID)
mt.collAlias2ID = make(map[string]UniqueID)
// max ts means listing latest resources, meta table should always cache the latest version of catalog.
collections, err := mt.catalog.ListCollections(mt.ctx, typeutil.MaxTimestamp)
if err != nil {
return err
}
for name, collection := range collections {
mt.collID2Meta[collection.CollectionID] = collection
mt.collName2ID[name] = collection.CollectionID
}
// max ts means listing latest resources, meta table should always cache the latest version of catalog.
aliases, err := mt.catalog.ListAliases(mt.ctx, typeutil.MaxTimestamp)
if err != nil {
return err
}
for _, alias := range aliases {
mt.collAlias2ID[alias.Name] = alias.CollectionID
}
return nil
}
func (mt *MetaTable) AddCollection(ctx context.Context, coll *model.Collection) error {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
// Note:
// 1, idempotency check was already done outside;
// 2, no need to check time travel logic, since ts should always be the latest;
if coll.State != pb.CollectionState_CollectionCreating {
return fmt.Errorf("collection state should be creating, collection name: %s, collection id: %d, state: %s", coll.Name, coll.CollectionID, coll.State)
}
ctx1 := contextutil.WithTenantID(ctx, Params.CommonCfg.ClusterName)
if err := mt.catalog.CreateCollection(ctx1, coll, coll.CreateTime); err != nil {
return err
}
mt.collName2ID[coll.Name] = coll.CollectionID
mt.collID2Meta[coll.CollectionID] = coll.Clone()
log.Info("add collection to meta table", zap.String("collection", coll.Name),
zap.Int64("id", coll.CollectionID), zap.Uint64("ts", coll.CreateTime))
return nil
}
func (mt *MetaTable) ChangeCollectionState(ctx context.Context, collectionID UniqueID, state pb.CollectionState, ts Timestamp) error {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
coll, ok := mt.collID2Meta[collectionID]
if !ok {
return nil
}
clone := coll.Clone()
clone.State = state
ctx1 := contextutil.WithTenantID(ctx, Params.CommonCfg.ClusterName)
if err := mt.catalog.AlterCollection(ctx1, coll, clone, metastore.MODIFY, ts); err != nil {
return err
}
mt.collID2Meta[collectionID] = clone
log.Info("change collection state", zap.Int64("collection", collectionID),
zap.String("state", state.String()), zap.Uint64("ts", ts))
return nil
}
func (mt *MetaTable) RemoveCollection(ctx context.Context, collectionID UniqueID, ts Timestamp) error {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
// Note: we cannot handle case that dropping collection with `ts1` but a collection exists in catalog with newer ts
// which is bigger than `ts1`. So we assume that ts should always be the latest.
ctx1 := contextutil.WithTenantID(ctx, Params.CommonCfg.ClusterName)
aliases := mt.listAliasesByID(collectionID)
if err := mt.catalog.DropCollection(ctx1, &model.Collection{CollectionID: collectionID, Aliases: aliases}, ts); err != nil {
return err
}
var name string
coll, ok := mt.collID2Meta[collectionID]
if ok && coll != nil {
name = coll.Name
delete(mt.collName2ID, name)
}
for _, alias := range aliases {
delete(mt.collAlias2ID, alias)
}
delete(mt.collID2Meta, collectionID)
log.Info("remove collection", zap.String("name", name), zap.Int64("id", collectionID), zap.Strings("aliases", aliases))
return nil
}
// getCollectionByIDInternal get collection by collection id without lock.
func (mt *MetaTable) getCollectionByIDInternal(ctx context.Context, collectionID UniqueID, ts Timestamp) (*model.Collection, error) {
var coll *model.Collection
var err error
coll, ok := mt.collID2Meta[collectionID]
if !ok || !coll.Available() || coll.CreateTime > ts {
// travel meta information from catalog.
ctx1 := contextutil.WithTenantID(ctx, Params.CommonCfg.ClusterName)
coll, err = mt.catalog.GetCollectionByID(ctx1, collectionID, ts)
if err != nil {
return nil, err
}
}
if !coll.Available() {
// use coll.Name to match error message of regression. TODO: remove this after error code is ready.
return nil, fmt.Errorf("can't find collection: %s", coll.Name)
}
clone := coll.Clone()
// pick available partitions.
clone.Partitions = nil
for _, partition := range coll.Partitions {
if partition.Available() {
clone.Partitions = append(clone.Partitions, partition.Clone())
}
}
return clone, nil
}
func (mt *MetaTable) GetCollectionByName(ctx context.Context, collectionName string, ts Timestamp) (*model.Collection, error) {
mt.ddLock.RLock()
defer mt.ddLock.RUnlock()
var collectionID UniqueID
collectionID, ok := mt.collAlias2ID[collectionName]
if ok {
return mt.getCollectionByIDInternal(ctx, collectionID, ts)
}
collectionID, ok = mt.collName2ID[collectionName]
if ok {
return mt.getCollectionByIDInternal(ctx, collectionID, ts)
}
// travel meta information from catalog. No need to check time travel logic again, since catalog already did.
ctx1 := contextutil.WithTenantID(ctx, Params.CommonCfg.ClusterName)
coll, err := mt.catalog.GetCollectionByName(ctx1, collectionName, ts)
if err != nil {
return nil, err
}
if !coll.Available() {
return nil, fmt.Errorf("can't find collection: %s", collectionName)
}
return coll, nil
}
func (mt *MetaTable) GetCollectionByID(ctx context.Context, collectionID UniqueID, ts Timestamp) (*model.Collection, error) {
mt.ddLock.RLock()
defer mt.ddLock.RUnlock()
return mt.getCollectionByIDInternal(ctx, collectionID, ts)
}
func (mt *MetaTable) ListCollections(ctx context.Context, ts Timestamp) ([]*model.Collection, error) {
mt.ddLock.RLock()
defer mt.ddLock.RUnlock()
// list collections should always be loaded from catalog.
ctx1 := contextutil.WithTenantID(ctx, Params.CommonCfg.ClusterName)
colls, err := mt.catalog.ListCollections(ctx1, ts)
if err != nil {
return nil, err
}
onlineCollections := make([]*model.Collection, 0, len(colls))
for _, coll := range colls {
if coll.Available() {
onlineCollections = append(onlineCollections, coll)
}
}
return onlineCollections, nil
}
func (mt *MetaTable) ListAbnormalCollections(ctx context.Context, ts Timestamp) ([]*model.Collection, error) {
mt.ddLock.RLock()
defer mt.ddLock.RUnlock()
// list collections should always be loaded from catalog.
ctx1 := contextutil.WithTenantID(ctx, Params.CommonCfg.ClusterName)
colls, err := mt.catalog.ListCollections(ctx1, ts)
if err != nil {
return nil, err
}
abnormalCollections := make([]*model.Collection, 0, len(colls))
for _, coll := range colls {
if !coll.Available() {
abnormalCollections = append(abnormalCollections, coll)
}
}
return abnormalCollections, nil
}
// ListCollectionPhysicalChannels list physical channels of all collections.
func (mt *MetaTable) ListCollectionPhysicalChannels() map[typeutil.UniqueID][]string {
mt.ddLock.RLock()
defer mt.ddLock.RUnlock()
chanMap := make(map[UniqueID][]string)
for id, collInfo := range mt.collID2Meta {
chanMap[id] = common.CloneStringList(collInfo.PhysicalChannelNames)
}
return chanMap
}
func (mt *MetaTable) AddPartition(ctx context.Context, partition *model.Partition) error {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
coll, ok := mt.collID2Meta[partition.CollectionID]
if !ok || !coll.Available() {
return fmt.Errorf("collection not exists: %d", partition.CollectionID)
}
if partition.State != pb.PartitionState_PartitionCreated {
return fmt.Errorf("partition state is not created, collection: %d, partition: %d, state: %s", partition.CollectionID, partition.PartitionID, partition.State)
}
if err := mt.catalog.CreatePartition(ctx, partition, partition.PartitionCreatedTimestamp); err != nil {
return err
}
mt.collID2Meta[partition.CollectionID].Partitions = append(mt.collID2Meta[partition.CollectionID].Partitions, partition.Clone())
log.Info("add partition to meta table",
zap.Int64("collection", partition.CollectionID), zap.String("partition", partition.PartitionName),
zap.Int64("partitionid", partition.PartitionID), zap.Uint64("ts", partition.PartitionCreatedTimestamp))
return nil
}
func (mt *MetaTable) ChangePartitionState(ctx context.Context, collectionID UniqueID, partitionID UniqueID, state pb.PartitionState, ts Timestamp) error {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
coll, ok := mt.collID2Meta[collectionID]
if !ok {
return nil
}
for idx, part := range coll.Partitions {
if part.PartitionID == partitionID {
clone := part.Clone()
clone.State = state
ctx1 := contextutil.WithTenantID(ctx, Params.CommonCfg.ClusterName)
if err := mt.catalog.AlterPartition(ctx1, part, clone, metastore.MODIFY, ts); err != nil {
return err
}
mt.collID2Meta[collectionID].Partitions[idx] = clone
log.Info("change partition state", zap.Int64("collection", collectionID),
zap.Int64("partition", partitionID), zap.String("state", state.String()),
zap.Uint64("ts", ts))
return nil
}
}
return fmt.Errorf("partition not exist, collection: %d, partition: %d", collectionID, partitionID)
}
func (mt *MetaTable) RemovePartition(ctx context.Context, collectionID UniqueID, partitionID UniqueID, ts Timestamp) error {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
ctx1 := contextutil.WithTenantID(ctx, Params.CommonCfg.ClusterName)
if err := mt.catalog.DropPartition(ctx1, collectionID, partitionID, ts); err != nil {
return err
}
coll, ok := mt.collID2Meta[collectionID]
if !ok {
return nil
}
var loc = -1
for idx, part := range coll.Partitions {
if part.PartitionID == partitionID {
loc = idx
break
}
}
if loc != -1 {
coll.Partitions = append(coll.Partitions[:loc], coll.Partitions[loc+1:]...)
}
log.Info("remove partition", zap.Int64("collection", collectionID), zap.Int64("partition", partitionID), zap.Uint64("ts", ts))
return nil
}
func (mt *MetaTable) CreateAlias(ctx context.Context, alias string, collectionName string, ts Timestamp) error {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
// It's ok that we don't read from catalog when cache missed.
// Since cache always keep the latest version, and the ts should always be the latest.
if _, ok := mt.collName2ID[alias]; ok {
return fmt.Errorf("cannot create alias, collection already exists with same name: %s", alias)
}
collectionID, ok := mt.collName2ID[collectionName]
if !ok {
// you cannot alias to a non-existent collection.
return fmt.Errorf("collection not exists: %s", collectionName)
}
// check if alias exists.
aliasedCollectionID, ok := mt.collAlias2ID[alias]
if ok && aliasedCollectionID == collectionID {
log.Warn("add duplicate alias", zap.String("alias", alias), zap.String("collection", collectionName), zap.Uint64("ts", ts))
return nil
} else if ok {
// TODO: better to check if aliasedCollectionID exist or is available, though not very possible.
aliasedColl := mt.collID2Meta[aliasedCollectionID]
return fmt.Errorf("alias exists and already aliased to another collection, alias: %s, collection: %s, other collection: %s", alias, collectionName, aliasedColl.Name)
}
// alias didn't exist.
coll, ok := mt.collID2Meta[collectionID]
if !ok || !coll.Available() {
// you cannot alias to a non-existent collection.
return fmt.Errorf("collection not exists: %s", collectionName)
}
ctx1 := contextutil.WithTenantID(ctx, Params.CommonCfg.ClusterName)
if err := mt.catalog.CreateAlias(ctx1, &model.Alias{
Name: alias,
CollectionID: collectionID,
CreatedTime: ts,
State: pb.AliasState_AliasCreated,
}, ts); err != nil {
return err
}
mt.collAlias2ID[alias] = collectionID
log.Info("create alias", zap.String("alias", alias), zap.String("collection", collectionName), zap.Uint64("ts", ts))
return nil
}
func (mt *MetaTable) DropAlias(ctx context.Context, alias string, ts Timestamp) error {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
ctx1 := contextutil.WithTenantID(ctx, Params.CommonCfg.ClusterName)
if err := mt.catalog.DropAlias(ctx1, alias, ts); err != nil {
return err
}
delete(mt.collAlias2ID, alias)
log.Info("drop alias", zap.String("alias", alias), zap.Uint64("ts", ts))
return nil
}
func (mt *MetaTable) AlterAlias(ctx context.Context, alias string, collectionName string, ts Timestamp) error {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
// It's ok that we don't read from catalog when cache missed.
// Since cache always keep the latest version, and the ts should always be the latest.
if _, ok := mt.collName2ID[alias]; ok {
return fmt.Errorf("cannot alter alias, collection already exists with same name: %s", alias)
}
collectionID, ok := mt.collName2ID[collectionName]
if !ok {
// you cannot alias to a non-existent collection.
return fmt.Errorf("collection not exists: %s", collectionName)
}
coll, ok := mt.collID2Meta[collectionID]
if !ok || !coll.Available() {
// you cannot alias to a non-existent collection.
return fmt.Errorf("collection not exists: %s", collectionName)
}
// check if alias exists.
_, ok = mt.collAlias2ID[alias]
if !ok {
//
return fmt.Errorf("failed to alter alias, alias does not exist: %s", alias)
}
ctx1 := contextutil.WithTenantID(ctx, Params.CommonCfg.ClusterName)
if err := mt.catalog.AlterAlias(ctx1, &model.Alias{
Name: alias,
CollectionID: collectionID,
CreatedTime: ts,
State: pb.AliasState_AliasCreated,
}, ts); err != nil {
return err
}
// alias switch to another collection anyway.
mt.collAlias2ID[alias] = collectionID
log.Info("alter alias", zap.String("alias", alias), zap.String("collection", collectionName), zap.Uint64("ts", ts))
return nil
}
func (mt *MetaTable) IsAlias(name string) bool {
mt.ddLock.RLock()
defer mt.ddLock.RUnlock()
_, ok := mt.collAlias2ID[name]
return ok
}
func (mt *MetaTable) listAliasesByID(collID UniqueID) []string {
ret := make([]string, 0)
for alias, id := range mt.collAlias2ID {
if id == collID {
ret = append(ret, alias)
}
}
return ret
}
func (mt *MetaTable) ListAliasesByID(collID UniqueID) []string {
mt.ddLock.RLock()
defer mt.ddLock.RUnlock()
return mt.listAliasesByID(collID)
}
// GetCollectionNameByID serve for bulk load. TODO: why this didn't accept ts?
func (mt *MetaTable) GetCollectionNameByID(collID UniqueID) (string, error) {
mt.ddLock.RUnlock()
defer mt.ddLock.RUnlock()
coll, ok := mt.collID2Meta[collID]
if !ok || !coll.Available() {
return "", fmt.Errorf("collection not exist: %d", collID)
}
return coll.Name, nil
}
// GetPartitionNameByID serve for bulk load.
func (mt *MetaTable) GetPartitionNameByID(collID UniqueID, partitionID UniqueID, ts Timestamp) (string, error) {
mt.ddLock.RUnlock()
defer mt.ddLock.RUnlock()
coll, ok := mt.collID2Meta[collID]
if ok && coll.Available() && coll.CreateTime <= ts {
// cache hit.
for _, partition := range coll.Partitions {
if partition.Available() && partition.PartitionID == partitionID && partition.PartitionCreatedTimestamp <= ts {
// cache hit.
return partition.PartitionName, nil
}
}
}
// cache miss, get from catalog anyway.
coll, err := mt.catalog.GetCollectionByID(mt.ctx, collID, ts)
if err != nil {
return "", err
}
if !coll.Available() {
return "", fmt.Errorf("collection not exist: %d", collID)
}
for _, partition := range coll.Partitions {
// no need to check time travel logic again, since catalog already did.
if partition.Available() && partition.PartitionID == partitionID {
return partition.PartitionName, nil
}
}
return "", fmt.Errorf("partition not exist: %d", partitionID)
}
// GetCollectionIDByName serve for bulk load. TODO: why this didn't accept ts?
func (mt *MetaTable) GetCollectionIDByName(name string) (UniqueID, error) {
mt.ddLock.RLock()
defer mt.ddLock.RUnlock()
id, ok := mt.collName2ID[name]
if !ok {
return InvalidCollectionID, fmt.Errorf("collection not exists: %s", name)
}
return id, nil
}
// GetPartitionByName serve for bulk load.
func (mt *MetaTable) GetPartitionByName(collID UniqueID, partitionName string, ts Timestamp) (UniqueID, error) {
mt.ddLock.RLock()
defer mt.ddLock.RUnlock()
coll, ok := mt.collID2Meta[collID]
if ok && coll.Available() && coll.CreateTime <= ts {
// cache hit.
for _, partition := range coll.Partitions {
if partition.Available() && partition.PartitionName == partitionName && partition.PartitionCreatedTimestamp <= ts {
// cache hit.
return partition.PartitionID, nil
}
}
}
// cache miss, get from catalog anyway.
coll, err := mt.catalog.GetCollectionByID(mt.ctx, collID, ts)
if err != nil {
return common.InvalidPartitionID, err
}
if !coll.Available() {
return common.InvalidPartitionID, fmt.Errorf("collection not exist: %d", collID)
}
for _, partition := range coll.Partitions {
// no need to check time travel logic again, since catalog already did.
if partition.Available() && partition.PartitionName == partitionName {
return partition.PartitionID, nil
}
}
return common.InvalidPartitionID, fmt.Errorf("partition ")
}
// AddCredential add credential
func (mt *MetaTable) AddCredential(credInfo *internalpb.CredentialInfo) error {
if credInfo.Username == "" {
return fmt.Errorf("username is empty")
}
mt.permissionLock.Lock()
defer mt.permissionLock.Unlock()
usernames, err := mt.catalog.ListCredentials(mt.ctx)
if err != nil {
return err
}
if len(usernames) >= Params.ProxyCfg.MaxUserNum {
errMsg := "unable to add user because the number of users has reached the limit"
log.Error(errMsg, zap.Int("max_user_num", Params.ProxyCfg.MaxUserNum))
return errors.New(errMsg)
}
if origin, _ := mt.catalog.GetCredential(mt.ctx, credInfo.Username); origin != nil {
return fmt.Errorf("user already exists: %s", credInfo.Username)
}
credential := &model.Credential{
Username: credInfo.Username,
EncryptedPassword: credInfo.EncryptedPassword,
}
return mt.catalog.CreateCredential(mt.ctx, credential)
}
// AlterCredential update credential
func (mt *MetaTable) AlterCredential(credInfo *internalpb.CredentialInfo) error {
if credInfo.Username == "" {
return fmt.Errorf("username is empty")
}
mt.permissionLock.Lock()
defer mt.permissionLock.Unlock()
credential := &model.Credential{
Username: credInfo.Username,
EncryptedPassword: credInfo.EncryptedPassword,
}
return mt.catalog.AlterCredential(mt.ctx, credential)
}
// GetCredential get credential by username
func (mt *MetaTable) GetCredential(username string) (*internalpb.CredentialInfo, error) {
mt.permissionLock.RLock()
defer mt.permissionLock.RUnlock()
credential, err := mt.catalog.GetCredential(mt.ctx, username)
return model.MarshalCredentialModel(credential), err
}
// DeleteCredential delete credential
func (mt *MetaTable) DeleteCredential(username string) error {
mt.permissionLock.Lock()
defer mt.permissionLock.Unlock()
return mt.catalog.DropCredential(mt.ctx, username)
}
// ListCredentialUsernames list credential usernames
func (mt *MetaTable) ListCredentialUsernames() (*milvuspb.ListCredUsersResponse, error) {
mt.permissionLock.RLock()
defer mt.permissionLock.RUnlock()
usernames, err := mt.catalog.ListCredentials(mt.ctx)
if err != nil {
return nil, fmt.Errorf("list credential usernames err:%w", err)
}
return &milvuspb.ListCredUsersResponse{Usernames: usernames}, nil
}
// CreateRole create role
func (mt *MetaTable) CreateRole(tenant string, entity *milvuspb.RoleEntity) error {
if funcutil.IsEmptyString(entity.Name) {
return fmt.Errorf("the role name in the role info is empty")
}
mt.permissionLock.Lock()
defer mt.permissionLock.Unlock()
results, err := mt.catalog.ListRole(mt.ctx, tenant, nil, false)
if err != nil {
logger.Error("fail to list roles", zap.Error(err))
return err
}
if len(results) >= Params.ProxyCfg.MaxRoleNum {
errMsg := "unable to add role because the number of roles has reached the limit"
log.Error(errMsg, zap.Int("max_role_num", Params.ProxyCfg.MaxRoleNum))
return errors.New(errMsg)
}
return mt.catalog.CreateRole(mt.ctx, tenant, entity)
}
// DropRole drop role info
func (mt *MetaTable) DropRole(tenant string, roleName string) error {
mt.permissionLock.Lock()
defer mt.permissionLock.Unlock()
return mt.catalog.DropRole(mt.ctx, tenant, roleName)
}
// OperateUserRole operate the relationship between a user and a role, including adding a user to a role and removing a user from a role
func (mt *MetaTable) OperateUserRole(tenant string, userEntity *milvuspb.UserEntity, roleEntity *milvuspb.RoleEntity, operateType milvuspb.OperateUserRoleType) error {
if funcutil.IsEmptyString(userEntity.Name) {
return fmt.Errorf("username in the user entity is empty")
}
if funcutil.IsEmptyString(roleEntity.Name) {
return fmt.Errorf("role name in the role entity is empty")
}
mt.permissionLock.Lock()
defer mt.permissionLock.Unlock()
return mt.catalog.AlterUserRole(mt.ctx, tenant, userEntity, roleEntity, operateType)
}
// SelectRole select role.
// Enter the role condition by the entity param. And this param is nil, which means selecting all roles.
// Get all users that are added to the role by setting the includeUserInfo param to true.
func (mt *MetaTable) SelectRole(tenant string, entity *milvuspb.RoleEntity, includeUserInfo bool) ([]*milvuspb.RoleResult, error) {
mt.permissionLock.RLock()
defer mt.permissionLock.RUnlock()
return mt.catalog.ListRole(mt.ctx, tenant, entity, includeUserInfo)
}
// SelectUser select user.
// Enter the user condition by the entity param. And this param is nil, which means selecting all users.
// Get all roles that are added the user to by setting the includeRoleInfo param to true.
func (mt *MetaTable) SelectUser(tenant string, entity *milvuspb.UserEntity, includeRoleInfo bool) ([]*milvuspb.UserResult, error) {
mt.permissionLock.RLock()
defer mt.permissionLock.RUnlock()
return mt.catalog.ListUser(mt.ctx, tenant, entity, includeRoleInfo)
}
// OperatePrivilege grant or revoke privilege by setting the operateType param
func (mt *MetaTable) OperatePrivilege(tenant string, entity *milvuspb.GrantEntity, operateType milvuspb.OperatePrivilegeType) error {
if funcutil.IsEmptyString(entity.ObjectName) {
return fmt.Errorf("the object name in the grant entity is empty")
}
if entity.Object == nil || funcutil.IsEmptyString(entity.Object.Name) {
return fmt.Errorf("the object entity in the grant entity is invalid")
}
if entity.Role == nil || funcutil.IsEmptyString(entity.Role.Name) {
return fmt.Errorf("the role entity in the grant entity is invalid")
}
if entity.Grantor == nil {
return fmt.Errorf("the grantor in the grant entity is empty")
}
if entity.Grantor.Privilege == nil || funcutil.IsEmptyString(entity.Grantor.Privilege.Name) {
return fmt.Errorf("the privilege name in the grant entity is empty")
}
if entity.Grantor.User == nil || funcutil.IsEmptyString(entity.Grantor.User.Name) {
return fmt.Errorf("the grantor name in the grant entity is empty")
}
if !funcutil.IsRevoke(operateType) && !funcutil.IsGrant(operateType) {
return fmt.Errorf("the operate type in the grant entity is invalid")
}
mt.permissionLock.Lock()
defer mt.permissionLock.Unlock()
return mt.catalog.AlterGrant(mt.ctx, tenant, entity, operateType)
}
// SelectGrant select grant
// The principal entity MUST be not empty in the grant entity
// The resource entity and the resource name are optional, and the two params should be not empty together when you select some grants about the resource kind.
func (mt *MetaTable) SelectGrant(tenant string, entity *milvuspb.GrantEntity) ([]*milvuspb.GrantEntity, error) {
var entities []*milvuspb.GrantEntity
if entity.Role == nil || funcutil.IsEmptyString(entity.Role.Name) {
return entities, fmt.Errorf("the role entity in the grant entity is invalid")
}
mt.permissionLock.RLock()
defer mt.permissionLock.RUnlock()
return mt.catalog.ListGrant(mt.ctx, tenant, entity)
}
func (mt *MetaTable) DropGrant(tenant string, role *milvuspb.RoleEntity) error {
if role == nil || funcutil.IsEmptyString(role.Name) {
return fmt.Errorf("the role entity is invalid when dropping the grant")
}
mt.permissionLock.Lock()
defer mt.permissionLock.Unlock()
return mt.catalog.DeleteGrant(mt.ctx, tenant, role)
}
func (mt *MetaTable) ListPolicy(tenant string) ([]string, error) {
mt.permissionLock.RLock()
defer mt.permissionLock.RUnlock()
return mt.catalog.ListPolicy(mt.ctx, tenant)
}
func (mt *MetaTable) ListUserRole(tenant string) ([]string, error) {
mt.permissionLock.RLock()
defer mt.permissionLock.RUnlock()
return mt.catalog.ListUserRole(mt.ctx, tenant)
}