Delete the user-role mapping info when deleting the user (#25988)

Signed-off-by: SimFG <bang.fu@zilliz.com>
This commit is contained in:
SimFG 2023-08-04 18:37:08 +08:00 committed by GitHub
parent 775f03ed5d
commit d2649b63db
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 853 additions and 250 deletions

View File

@ -72,6 +72,8 @@ class MilvusConan(ConanFile):
"glog:shared": True,
"prometheus-cpp:with_pull": False,
"fmt:header_only": True,
"onnetbb:tbbmalloc": False,
"onetbb:tbbproxy": False,
}
def configure(self):

View File

@ -534,9 +534,24 @@ func (kc *Catalog) DropPartition(ctx context.Context, dbID int64, collectionID t
func (kc *Catalog) DropCredential(ctx context.Context, username string) error {
k := fmt.Sprintf("%s/%s", CredentialPrefix, username)
err := kc.Txn.Remove(k)
userResults, err := kc.ListUser(ctx, util.DefaultTenant, &milvuspb.UserEntity{Name: username}, true)
if err != nil && !common.IsKeyNotExistError(err) {
log.Warn("fail to list user", zap.String("key", k), zap.Error(err))
return err
}
deleteKeys := make([]string, 0, len(userResults)+1)
deleteKeys = append(deleteKeys, k)
for _, userResult := range userResults {
if userResult.User.Name == username {
for _, role := range userResult.Roles {
userRoleKey := funcutil.HandleTenantForEtcdKey(RoleMappingPrefix, util.DefaultTenant, fmt.Sprintf("%s/%s", username, role.Name))
deleteKeys = append(deleteKeys, userRoleKey)
}
}
}
err = kc.Txn.MultiRemove(deleteKeys)
if err != nil {
log.Error("drop credential update meta fail", zap.String("key", k), zap.Error(err))
log.Warn("fail to drop credential", zap.String("key", k), zap.Error(err))
return err
}
@ -729,9 +744,26 @@ func (kc *Catalog) CreateRole(ctx context.Context, tenant string, entity *milvus
func (kc *Catalog) DropRole(ctx context.Context, tenant string, roleName string) error {
k := funcutil.HandleTenantForEtcdKey(RolePrefix, tenant, roleName)
err := kc.Txn.Remove(k)
roleResults, err := kc.ListRole(ctx, tenant, &milvuspb.RoleEntity{Name: roleName}, true)
if err != nil && !common.IsKeyNotExistError(err) {
log.Warn("fail to list role", zap.String("key", k), zap.Error(err))
return err
}
deleteKeys := make([]string, 0, len(roleResults)+1)
deleteKeys = append(deleteKeys, k)
for _, roleResult := range roleResults {
if roleResult.Role.Name == roleName {
for _, userInfo := range roleResult.Users {
userRoleKey := funcutil.HandleTenantForEtcdKey(RoleMappingPrefix, tenant, fmt.Sprintf("%s/%s", userInfo.Name, roleName))
deleteKeys = append(deleteKeys, userRoleKey)
}
}
}
err = kc.Txn.MultiRemove(deleteKeys)
if err != nil {
log.Error("fail to drop role", zap.String("key", k), zap.Error(err))
log.Warn("fail to drop role", zap.String("key", k), zap.Error(err))
return err
}
return nil

View File

@ -10,11 +10,6 @@ import (
"github.com/cockroachdb/errors"
"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
@ -29,6 +24,10 @@ import (
"github.com/milvus-io/milvus/pkg/util/crypto"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
)
var (
@ -1216,6 +1215,12 @@ func TestCatalog_DropCollection(t *testing.T) {
})
}
func getUserInfoMetaString(username string) string {
validInfo := internalpb.CredentialInfo{Username: username, EncryptedPassword: "pwd" + username}
validBytes, _ := json.Marshal(validInfo)
return string(validBytes)
}
func TestRBAC_Credential(t *testing.T) {
ctx := context.TODO()
@ -1325,12 +1330,34 @@ func TestRBAC_Credential(t *testing.T) {
kvmock = mocks.NewTxnKV(t)
c = &Catalog{Txn: kvmock}
dropFailName = "drop-fail"
dropFailKey = fmt.Sprintf("%s/%s", CredentialPrefix, dropFailName)
validName = "user1"
validUserRoleKeyPrefix = funcutil.HandleTenantForEtcdKey(RoleMappingPrefix, util.DefaultTenant, validName)
dropFailName = "drop-fail"
dropUserRoleKeyPrefix = funcutil.HandleTenantForEtcdKey(RoleMappingPrefix, util.DefaultTenant, dropFailName)
getFailName = "get-fail"
)
kvmock.EXPECT().Remove(dropFailKey).Return(errors.New("Mock invalid remove"))
kvmock.EXPECT().Remove(mock.Anything).Return(nil)
kvmock.EXPECT().MultiRemove([]string{fmt.Sprintf("%s/%s", CredentialPrefix, dropFailName)}).Return(errors.New("Mock drop fail"))
kvmock.EXPECT().MultiRemove(
[]string{
fmt.Sprintf("%s/%s", CredentialPrefix, validName),
validUserRoleKeyPrefix + "/role1",
validUserRoleKeyPrefix + "/role2",
},
).Return(nil)
kvmock.EXPECT().MultiRemove(mock.Anything).Return(errors.New("Mock invalid multi remove"))
kvmock.EXPECT().Load(fmt.Sprintf("%s/%s", CredentialPrefix, getFailName)).Return("", errors.New("Mock invalid load"))
kvmock.EXPECT().Load(fmt.Sprintf("%s/%s", CredentialPrefix, validName)).Return(getUserInfoMetaString(validName), nil)
kvmock.EXPECT().Load(fmt.Sprintf("%s/%s", CredentialPrefix, dropFailName)).Return(getUserInfoMetaString(dropFailName), nil)
kvmock.EXPECT().LoadWithPrefix(validUserRoleKeyPrefix).Return(
[]string{validUserRoleKeyPrefix + "/role1", validUserRoleKeyPrefix + "/role2"},
[]string{"", ""},
nil,
)
kvmock.EXPECT().LoadWithPrefix(dropUserRoleKeyPrefix).Return([]string{}, []string{}, nil)
tests := []struct {
description string
@ -1338,8 +1365,8 @@ func TestRBAC_Credential(t *testing.T) {
user string
}{
{"valid user1", true, "user1"},
{"valid user2", true, "user2"},
{"valid user1", true, validName},
{"invalid user get-fail", false, getFailName},
{"invalid user drop-fail", false, dropFailName},
}
@ -1562,20 +1589,40 @@ func TestRBAC_Role(t *testing.T) {
kvmock = mocks.NewTxnKV(t)
c = &Catalog{Txn: kvmock}
errorName = "error"
errorPath = funcutil.HandleTenantForEtcdKey(RolePrefix, tenant, errorName)
validName = "role1"
errorName = "error"
getFailName = "get-fail"
)
kvmock.EXPECT().Remove(errorPath).Return(errors.New("mock remove error")).Once()
kvmock.EXPECT().Remove(mock.Anything).Return(nil).Once()
kvmock.EXPECT().MultiRemove([]string{funcutil.HandleTenantForEtcdKey(RolePrefix, tenant, errorName)}).Return(errors.New("remove error"))
kvmock.EXPECT().MultiRemove([]string{
funcutil.HandleTenantForEtcdKey(RolePrefix, tenant, validName),
funcutil.HandleTenantForEtcdKey(RoleMappingPrefix, tenant, fmt.Sprintf("%s/%s", "user1", validName)),
funcutil.HandleTenantForEtcdKey(RoleMappingPrefix, tenant, fmt.Sprintf("%s/%s", "user2", validName)),
}).Return(nil)
kvmock.EXPECT().MultiRemove(mock.Anything).Return(errors.New("mock multi remove error"))
getRoleMappingKey := func(username, rolename string) string {
return funcutil.HandleTenantForEtcdKey(RoleMappingPrefix, tenant, fmt.Sprintf("%s/%s", username, rolename))
}
kvmock.EXPECT().LoadWithPrefix(funcutil.HandleTenantForEtcdKey(RoleMappingPrefix, tenant, "")).Return(
[]string{getRoleMappingKey("user1", validName), getRoleMappingKey("user2", validName), getRoleMappingKey("user3", "role3")},
[]string{},
nil,
)
kvmock.EXPECT().Load(funcutil.HandleTenantForEtcdKey(RolePrefix, tenant, getFailName)).Return("", errors.New("mock load error"))
kvmock.EXPECT().Load(funcutil.HandleTenantForEtcdKey(RolePrefix, tenant, validName)).Return("", nil)
kvmock.EXPECT().Load(funcutil.HandleTenantForEtcdKey(RolePrefix, tenant, errorName)).Return("", nil)
tests := []struct {
description string
isValid bool
role string
role string
}{
{"valid role role1", true, "role1"},
{"valid role role1", true, validName},
{"fail to get role info", false, getFailName},
{"invalid role error", false, errorName},
}
@ -1649,6 +1696,7 @@ func TestRBAC_Role(t *testing.T) {
})
}
})
t.Run("test ListRole", func(t *testing.T) {
var (
loadWithPrefixReturn atomic.Bool

View File

@ -4276,8 +4276,7 @@ func (node *Proxy) CreateRole(ctx context.Context, req *milvuspb.CreateRoleReque
log := log.Ctx(ctx)
log.Debug("CreateRole",
zap.Any("req", req))
log.Debug("CreateRole", zap.Any("req", req))
if code, ok := node.checkHealthyAndReturnCode(); !ok {
return errorutil.UnhealthyStatus(code), nil
}
@ -4295,8 +4294,7 @@ func (node *Proxy) CreateRole(ctx context.Context, req *milvuspb.CreateRoleReque
result, err := node.rootCoord.CreateRole(ctx, req)
if err != nil {
log.Error("fail to create role",
zap.Error(err))
log.Warn("fail to create role", zap.Error(err))
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
@ -4331,7 +4329,7 @@ func (node *Proxy) DropRole(ctx context.Context, req *milvuspb.DropRoleRequest)
}
result, err := node.rootCoord.DropRole(ctx, req)
if err != nil {
log.Error("fail to drop role",
log.Warn("fail to drop role",
zap.String("role_name", req.RoleName),
zap.Error(err))
return &commonpb.Status{
@ -4348,8 +4346,7 @@ func (node *Proxy) OperateUserRole(ctx context.Context, req *milvuspb.OperateUse
log := log.Ctx(ctx)
log.Debug("OperateUserRole",
zap.Any("req", req))
log.Debug("OperateUserRole", zap.Any("req", req))
if code, ok := node.checkHealthyAndReturnCode(); !ok {
return errorutil.UnhealthyStatus(code), nil
}
@ -4368,8 +4365,7 @@ func (node *Proxy) OperateUserRole(ctx context.Context, req *milvuspb.OperateUse
result, err := node.rootCoord.OperateUserRole(ctx, req)
if err != nil {
logger.Error("fail to operate user role",
zap.Error(err))
log.Warn("fail to operate user role", zap.Error(err))
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
@ -4402,8 +4398,7 @@ func (node *Proxy) SelectRole(ctx context.Context, req *milvuspb.SelectRoleReque
result, err := node.rootCoord.SelectRole(ctx, req)
if err != nil {
log.Error("fail to select role",
zap.Error(err))
log.Warn("fail to select role", zap.Error(err))
return &milvuspb.SelectRoleResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
@ -4420,14 +4415,14 @@ func (node *Proxy) SelectUser(ctx context.Context, req *milvuspb.SelectUserReque
log := log.Ctx(ctx)
log.Debug("SelectUser",
zap.Any("req", req))
log.Debug("SelectUser", zap.Any("req", req))
if code, ok := node.checkHealthyAndReturnCode(); !ok {
return &milvuspb.SelectUserResponse{Status: errorutil.UnhealthyStatus(code)}, nil
}
if req.User != nil {
if err := ValidateUsername(req.User.Name); err != nil {
log.Warn("invalid username", zap.Error(err))
return &milvuspb.SelectUserResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_IllegalArgument,
@ -4439,8 +4434,7 @@ func (node *Proxy) SelectUser(ctx context.Context, req *milvuspb.SelectUserReque
result, err := node.rootCoord.SelectUser(ctx, req)
if err != nil {
log.Error("fail to select user",
zap.Error(err))
log.Warn("fail to select user", zap.Error(err))
return &milvuspb.SelectUserResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
@ -4502,16 +4496,16 @@ func (node *Proxy) OperatePrivilege(ctx context.Context, req *milvuspb.OperatePr
}
curUser, err := GetCurUserFromContext(ctx)
if err != nil {
log.Warn("fail to get current user", zap.Error(err))
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
Reason: "fail to get current user, please make sure the authorizationEnabled setting in the milvus.yaml is true",
}, nil
}
req.Entity.Grantor.User = &milvuspb.UserEntity{Name: curUser}
result, err := node.rootCoord.OperatePrivilege(ctx, req)
if err != nil {
log.Error("fail to operate privilege",
zap.Error(err))
log.Warn("fail to operate privilege", zap.Error(err))
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
@ -4569,8 +4563,7 @@ func (node *Proxy) SelectGrant(ctx context.Context, req *milvuspb.SelectGrantReq
result, err := node.rootCoord.SelectGrant(ctx, req)
if err != nil {
log.Error("fail to select grant",
zap.Error(err))
log.Warn("fail to select grant", zap.Error(err))
return &milvuspb.SelectGrantResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
@ -4590,7 +4583,7 @@ func (node *Proxy) RefreshPolicyInfoCache(ctx context.Context, req *proxypb.Refr
log.Debug("RefreshPrivilegeInfoCache",
zap.Any("req", req))
if code, ok := node.checkHealthyAndReturnCode(); !ok {
return errorutil.UnhealthyStatus(code), errorutil.UnhealthyError()
return merr.Status(merr.WrapErrServiceNotReady(code.String())), nil
}
if globalMetaCache != nil {

View File

@ -917,7 +917,10 @@ func (m *MetaCache) expireShardLeaderCache(ctx context.Context) {
func (m *MetaCache) InitPolicyInfo(info []string, userRoles []string) {
m.mu.Lock()
defer m.mu.Unlock()
m.unsafeInitPolicyInfo(info, userRoles)
}
func (m *MetaCache) unsafeInitPolicyInfo(info []string, userRoles []string) {
m.privilegeInfos = util.StringSet(info)
for _, userRole := range userRoles {
user, role, err := funcutil.DecodeUserRoleCache(userRole)
@ -975,6 +978,21 @@ func (m *MetaCache) RefreshPolicyInfo(op typeutil.CacheOp) error {
if m.userToRoles[user] != nil {
delete(m.userToRoles[user], role)
}
case typeutil.CacheDeleteUser:
delete(m.userToRoles, op.OpKey)
case typeutil.CacheDropRole:
for user := range m.userToRoles {
delete(m.userToRoles[user], op.OpKey)
}
case typeutil.CacheRefresh:
resp, err := m.rootCoord.ListPolicy(context.Background(), &internalpb.ListPolicyRequest{})
if err != nil {
log.Error("fail to init meta cache", zap.Error(err))
return err
}
m.userToRoles = make(map[string]map[string]struct{})
m.privilegeInfos = make(map[string]struct{})
m.unsafeInitPolicyInfo(resp.PolicyInfos, resp.UserRoles)
default:
return fmt.Errorf("invalid opType, op_type: %d, op_key: %s", int(op.OpType), op.OpKey)
}

View File

@ -19,6 +19,7 @@ package proxy
import (
"context"
"fmt"
"math/rand"
"net"
"os"
"strconv"
@ -2322,7 +2323,8 @@ func TestProxy(t *testing.T) {
getCredentialReq.Username = "("
getResp, err = rootCoordClient.GetCredential(ctx, getCredentialReq)
assert.Error(t, err)
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, getResp.Status.ErrorCode)
})
wg.Add(1)
@ -3677,6 +3679,72 @@ func testProxyRole(ctx context.Context, t *testing.T, proxy *Proxy) {
assert.Equal(t, commonpb.ErrorCode_Success, opResp.ErrorCode)
})
wg.Add(1)
t.Run("User Role mapping info", func(t *testing.T) {
defer wg.Done()
ctx := context.Background()
username := fmt.Sprintf("user%d", rand.Int31())
roleName := fmt.Sprintf("role%d", rand.Int31())
{
createCredentialResp, err := proxy.CreateCredential(ctx, &milvuspb.CreateCredentialRequest{Username: username, Password: crypto.Base64Encode("userpwd")})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, createCredentialResp.ErrorCode)
createRoleResp, err := proxy.CreateRole(ctx, &milvuspb.CreateRoleRequest{Entity: &milvuspb.RoleEntity{Name: roleName}})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, createRoleResp.ErrorCode)
}
{
resp, err := proxy.OperateUserRole(ctx, &milvuspb.OperateUserRoleRequest{Username: username, RoleName: roleName})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode)
}
{
resp, err := proxy.OperateUserRole(ctx, &milvuspb.OperateUserRoleRequest{Username: username, RoleName: "admin"})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode)
}
{
selectUserResp, err := proxy.SelectUser(ctx, &milvuspb.SelectUserRequest{User: &milvuspb.UserEntity{Name: username}, IncludeRoleInfo: true})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, selectUserResp.Status.ErrorCode)
assert.Equal(t, 2, len(selectUserResp.Results[0].Roles))
selectRoleResp, err := proxy.SelectRole(ctx, &milvuspb.SelectRoleRequest{Role: &milvuspb.RoleEntity{Name: roleName}, IncludeUserInfo: true})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, selectRoleResp.Status.ErrorCode)
assert.Equal(t, 1, len(selectRoleResp.Results[0].Users))
}
{
resp, err := proxy.DropRole(ctx, &milvuspb.DropRoleRequest{RoleName: roleName})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode)
}
{
selectUserResp, err := proxy.SelectUser(ctx, &milvuspb.SelectUserRequest{User: &milvuspb.UserEntity{Name: username}, IncludeRoleInfo: true})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, selectUserResp.Status.ErrorCode)
assert.Equal(t, 1, len(selectUserResp.Results[0].Roles))
selectRoleResp, err := proxy.SelectRole(ctx, &milvuspb.SelectRoleRequest{Role: &milvuspb.RoleEntity{Name: roleName}, IncludeUserInfo: true})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, selectRoleResp.Status.ErrorCode)
assert.Equal(t, 0, len(selectRoleResp.Results))
}
{
resp, err := proxy.DeleteCredential(ctx, &milvuspb.DeleteCredentialRequest{Username: username})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode)
}
{
selectUserResp, err := proxy.SelectUser(ctx, &milvuspb.SelectUserRequest{User: &milvuspb.UserEntity{Name: username}, IncludeRoleInfo: true})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, selectUserResp.Status.ErrorCode)
assert.Equal(t, 0, len(selectUserResp.Results))
}
})
wg.Wait()
}

View File

@ -1203,7 +1203,7 @@ func (mt *MetaTable) CreateRole(tenant string, entity *milvuspb.RoleEntity) erro
results, err := mt.catalog.ListRole(mt.ctx, tenant, nil, false)
if err != nil {
logger.Error("fail to list roles", zap.Error(err))
log.Error("fail to list roles", zap.Error(err))
return err
}
if len(results) >= Params.ProxyCfg.MaxRoleNum.GetAsInt() {

View File

@ -102,6 +102,19 @@ func TestRbacCreateRole(t *testing.T) {
assert.Error(t, err)
})
}
{
mockCata := mocks.NewRootCoordCatalog(t)
mockCata.On("ListRole",
mock.Anything,
mock.Anything,
mock.Anything,
mock.Anything,
).Return(nil, errors.New("error mock list role"))
mockMt := &MetaTable{catalog: mockCata}
err := mockMt.CreateRole(util.DefaultTenant, &milvuspb.RoleEntity{Name: "role1"})
assert.Error(t, err)
}
}
func TestRbacDropRole(t *testing.T) {

View File

@ -22,9 +22,6 @@ import (
"os"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/mock"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/allocator"
@ -33,6 +30,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
pb "github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/proxypb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/tso"
@ -45,6 +43,8 @@ import (
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/retry"
"github.com/milvus-io/milvus/pkg/util/typeutil"
"github.com/stretchr/testify/mock"
"go.uber.org/zap"
)
const (
@ -74,6 +74,21 @@ type mockMetaTable struct {
GetCollectionVirtualChannelsFunc func(colID int64) []string
AlterCollectionFunc func(ctx context.Context, oldColl *model.Collection, newColl *model.Collection, ts Timestamp) error
RenameCollectionFunc func(ctx context.Context, oldName string, newName string, ts Timestamp) error
AddCredentialFunc func(credInfo *internalpb.CredentialInfo) error
GetCredentialFunc func(username string) (*internalpb.CredentialInfo, error)
DeleteCredentialFunc func(username string) error
AlterCredentialFunc func(credInfo *internalpb.CredentialInfo) error
ListCredentialUsernamesFunc func() (*milvuspb.ListCredUsersResponse, error)
CreateRoleFunc func(tenant string, entity *milvuspb.RoleEntity) error
DropRoleFunc func(tenant string, roleName string) error
OperateUserRoleFunc func(tenant string, userEntity *milvuspb.UserEntity, roleEntity *milvuspb.RoleEntity, operateType milvuspb.OperateUserRoleType) error
SelectRoleFunc func(tenant string, entity *milvuspb.RoleEntity, includeUserInfo bool) ([]*milvuspb.RoleResult, error)
SelectUserFunc func(tenant string, entity *milvuspb.UserEntity, includeRoleInfo bool) ([]*milvuspb.UserResult, error)
OperatePrivilegeFunc func(tenant string, entity *milvuspb.GrantEntity, operateType milvuspb.OperatePrivilegeType) error
SelectGrantFunc func(tenant string, entity *milvuspb.GrantEntity) ([]*milvuspb.GrantEntity, error)
DropGrantFunc func(tenant string, role *milvuspb.RoleEntity) error
ListPolicyFunc func(tenant string) ([]string, error)
ListUserRoleFunc func(tenant string) ([]string, error)
}
func (m mockMetaTable) ListDatabases(ctx context.Context, ts typeutil.Timestamp) ([]*model.Database, error) {
@ -156,6 +171,66 @@ func (m mockMetaTable) GetCollectionVirtualChannels(colID int64) []string {
return m.GetCollectionVirtualChannelsFunc(colID)
}
func (m mockMetaTable) AddCredential(credInfo *internalpb.CredentialInfo) error {
return m.AddCredentialFunc(credInfo)
}
func (m mockMetaTable) GetCredential(username string) (*internalpb.CredentialInfo, error) {
return m.GetCredentialFunc(username)
}
func (m mockMetaTable) DeleteCredential(username string) error {
return m.DeleteCredentialFunc(username)
}
func (m mockMetaTable) AlterCredential(credInfo *internalpb.CredentialInfo) error {
return m.AlterCredentialFunc(credInfo)
}
func (m mockMetaTable) ListCredentialUsernames() (*milvuspb.ListCredUsersResponse, error) {
return m.ListCredentialUsernamesFunc()
}
func (m mockMetaTable) CreateRole(tenant string, entity *milvuspb.RoleEntity) error {
return m.CreateRoleFunc(tenant, entity)
}
func (m mockMetaTable) DropRole(tenant string, roleName string) error {
return m.DropRoleFunc(tenant, roleName)
}
func (m mockMetaTable) OperateUserRole(tenant string, userEntity *milvuspb.UserEntity, roleEntity *milvuspb.RoleEntity, operateType milvuspb.OperateUserRoleType) error {
return m.OperateUserRoleFunc(tenant, userEntity, roleEntity, operateType)
}
func (m mockMetaTable) SelectRole(tenant string, entity *milvuspb.RoleEntity, includeUserInfo bool) ([]*milvuspb.RoleResult, error) {
return m.SelectRoleFunc(tenant, entity, includeUserInfo)
}
func (m mockMetaTable) SelectUser(tenant string, entity *milvuspb.UserEntity, includeRoleInfo bool) ([]*milvuspb.UserResult, error) {
return m.SelectUserFunc(tenant, entity, includeRoleInfo)
}
func (m mockMetaTable) OperatePrivilege(tenant string, entity *milvuspb.GrantEntity, operateType milvuspb.OperatePrivilegeType) error {
return m.OperatePrivilegeFunc(tenant, entity, operateType)
}
func (m mockMetaTable) SelectGrant(tenant string, entity *milvuspb.GrantEntity) ([]*milvuspb.GrantEntity, error) {
return m.SelectGrantFunc(tenant, entity)
}
func (m mockMetaTable) DropGrant(tenant string, role *milvuspb.RoleEntity) error {
return m.DropGrantFunc(tenant, role)
}
func (m mockMetaTable) ListPolicy(tenant string) ([]string, error) {
return m.ListPolicyFunc(tenant)
}
func (m mockMetaTable) ListUserRole(tenant string) ([]string, error) {
return m.ListUserRoleFunc(tenant)
}
func newMockMetaTable() *mockMetaTable {
return &mockMetaTable{}
}
@ -411,6 +486,51 @@ func withInvalidMeta() Opt {
meta.DropAliasFunc = func(ctx context.Context, dbName string, alias string, ts Timestamp) error {
return errors.New("error mock DropAlias")
}
meta.AddCredentialFunc = func(credInfo *internalpb.CredentialInfo) error {
return errors.New("error mock AddCredential")
}
meta.GetCredentialFunc = func(username string) (*internalpb.CredentialInfo, error) {
return nil, errors.New("error mock GetCredential")
}
meta.DeleteCredentialFunc = func(username string) error {
return errors.New("error mock DeleteCredential")
}
meta.AlterCredentialFunc = func(credInfo *internalpb.CredentialInfo) error {
return errors.New("error mock AlterCredential")
}
meta.ListCredentialUsernamesFunc = func() (*milvuspb.ListCredUsersResponse, error) {
return nil, errors.New("error mock ListCredentialUsernames")
}
meta.CreateRoleFunc = func(tenant string, entity *milvuspb.RoleEntity) error {
return errors.New("error mock CreateRole")
}
meta.DropRoleFunc = func(tenant string, roleName string) error {
return errors.New("error mock DropRole")
}
meta.OperateUserRoleFunc = func(tenant string, userEntity *milvuspb.UserEntity, roleEntity *milvuspb.RoleEntity, operateType milvuspb.OperateUserRoleType) error {
return errors.New("error mock OperateUserRole")
}
meta.SelectUserFunc = func(tenant string, entity *milvuspb.UserEntity, includeRoleInfo bool) ([]*milvuspb.UserResult, error) {
return nil, errors.New("error mock SelectUser")
}
meta.SelectRoleFunc = func(tenant string, entity *milvuspb.RoleEntity, includeUserInfo bool) ([]*milvuspb.RoleResult, error) {
return nil, errors.New("error mock SelectRole")
}
meta.OperatePrivilegeFunc = func(tenant string, entity *milvuspb.GrantEntity, operateType milvuspb.OperatePrivilegeType) error {
return errors.New("error mock OperatePrivilege")
}
meta.SelectGrantFunc = func(tenant string, entity *milvuspb.GrantEntity) ([]*milvuspb.GrantEntity, error) {
return nil, errors.New("error mock SelectGrant")
}
meta.DropGrantFunc = func(tenant string, role *milvuspb.RoleEntity) error {
return errors.New("error mock DropGrant")
}
meta.ListPolicyFunc = func(tenant string) ([]string, error) {
return nil, errors.New("error mock ListPolicy")
}
meta.ListUserRoleFunc = func(tenant string) ([]string, error) {
return nil, errors.New("error mock ListUserRole")
}
return withMeta(meta)
}

View File

@ -639,6 +639,18 @@ func (c *Core) startInternal() error {
c.startServerLoop()
c.UpdateStateCode(commonpb.StateCode_Healthy)
// refresh rbac cache
if err := retry.Do(c.ctx, func() error {
if err := c.proxyClientManager.RefreshPolicyInfoCache(c.ctx, &proxypb.RefreshPolicyInfoCacheRequest{
OpType: int32(typeutil.CacheRefresh),
}); err != nil {
log.Warn("fail to refresh policy info cache", zap.Error(err))
return err
}
return nil
}, retry.Attempts(100), retry.Sleep(time.Second)); err != nil {
log.Panic("fail to refresh policy info cache", zap.Error(err))
}
logutil.Logger(c.ctx).Info("rootcoord startup successfully")
return nil
@ -2012,25 +2024,23 @@ func (c *Core) CreateCredential(ctx context.Context, credInfo *internalpb.Creden
method := "CreateCredential"
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
tr := timerecord.NewTimeRecorder(method)
log.Debug("CreateCredential", zap.String("role", typeutil.RootCoordRole),
zap.String("username", credInfo.Username))
ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), zap.String("username", credInfo.Username))
ctxLog.Debug(method)
if code, ok := c.checkHealthy(); !ok {
return merr.Status(merr.WrapErrServiceNotReady(code.String())), nil
}
// insert to db
err := c.meta.AddCredential(credInfo)
if err != nil {
log.Error("CreateCredential save credential failed", zap.String("role", typeutil.RootCoordRole),
zap.String("username", credInfo.Username), zap.Error(err))
ctxLog.Warn("CreateCredential save credential failed", zap.Error(err))
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
status := merr.Status(err)
status.ErrorCode = commonpb.ErrorCode_CreateCredentialFailure
return status, nil
return merr.StatusWithErrorCode(err, commonpb.ErrorCode_CreateCredentialFailure), nil
}
// update proxy's local cache
err = c.UpdateCredCache(ctx, credInfo)
if err != nil {
log.Warn("CreateCredential add cache failed", zap.String("role", typeutil.RootCoordRole),
zap.String("username", credInfo.Username), zap.Error(err))
ctxLog.Warn("CreateCredential add cache failed", zap.Error(err))
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
}
log.Debug("CreateCredential success", zap.String("role", typeutil.RootCoordRole),
@ -2047,23 +2057,21 @@ func (c *Core) GetCredential(ctx context.Context, in *rootcoordpb.GetCredentialR
method := "GetCredential"
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
tr := timerecord.NewTimeRecorder(method)
log.Debug("GetCredential", zap.String("role", typeutil.RootCoordRole),
zap.String("username", in.Username))
ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), zap.String("username", in.Username))
ctxLog.Debug(method)
if code, ok := c.checkHealthy(); !ok {
return &rootcoordpb.GetCredentialResponse{Status: merr.Status(merr.WrapErrServiceNotReady(code.String()))}, nil
}
credInfo, err := c.meta.GetCredential(in.Username)
if err != nil {
log.Error("GetCredential query credential failed", zap.String("role", typeutil.RootCoordRole),
zap.String("username", in.Username), zap.Error(err))
ctxLog.Warn("GetCredential query credential failed", zap.Error(err))
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
status := merr.Status(err)
status.ErrorCode = commonpb.ErrorCode_GetCredentialFailure
return &rootcoordpb.GetCredentialResponse{
Status: status,
}, err
Status: merr.StatusWithErrorCode(err, commonpb.ErrorCode_GetCredentialFailure),
}, nil
}
log.Debug("GetCredential success", zap.String("role", typeutil.RootCoordRole),
zap.String("username", in.Username))
ctxLog.Debug("GetCredential success")
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
@ -2079,32 +2087,26 @@ func (c *Core) UpdateCredential(ctx context.Context, credInfo *internalpb.Creden
method := "UpdateCredential"
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
tr := timerecord.NewTimeRecorder(method)
log.Debug("UpdateCredential", zap.String("role", typeutil.RootCoordRole),
zap.String("username", credInfo.Username))
ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), zap.String("username", credInfo.Username))
ctxLog.Debug(method)
if code, ok := c.checkHealthy(); !ok {
return merr.Status(merr.WrapErrServiceNotReady(code.String())), nil
}
// update data on storage
err := c.meta.AlterCredential(credInfo)
if err != nil {
log.Error("UpdateCredential save credential failed", zap.String("role", typeutil.RootCoordRole),
zap.String("username", credInfo.Username), zap.Error(err))
ctxLog.Warn("UpdateCredential save credential failed", zap.Error(err))
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
status := merr.Status(err)
status.ErrorCode = commonpb.ErrorCode_UpdateCredentialFailure
return status, nil
return merr.StatusWithErrorCode(err, commonpb.ErrorCode_UpdateCredentialFailure), nil
}
// update proxy's local cache
err = c.UpdateCredCache(ctx, credInfo)
if err != nil {
log.Error("UpdateCredential update cache failed", zap.String("role", typeutil.RootCoordRole),
zap.String("username", credInfo.Username), zap.Error(err))
ctxLog.Warn("UpdateCredential update cache failed", zap.Error(err))
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
status := merr.Status(err)
status.ErrorCode = commonpb.ErrorCode_UpdateCredentialFailure
return status, nil
return merr.StatusWithErrorCode(err, commonpb.ErrorCode_UpdateCredentialFailure), nil
}
log.Debug("UpdateCredential success", zap.String("role", typeutil.RootCoordRole),
zap.String("username", credInfo.Username))
log.Debug("UpdateCredential success")
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
@ -2116,36 +2118,58 @@ func (c *Core) DeleteCredential(ctx context.Context, in *milvuspb.DeleteCredenti
method := "DeleteCredential"
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
tr := timerecord.NewTimeRecorder(method)
ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), zap.String("username", in.Username))
ctxLog.Debug(method)
if code, ok := c.checkHealthy(); !ok {
return merr.Status(merr.WrapErrServiceNotReady(code.String())), nil
}
var status *commonpb.Status
defer func() {
if status.Code != 0 {
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
}
}()
// delete data on storage
err := c.meta.DeleteCredential(in.Username)
redoTask := newBaseRedoTask(c.stepExecutor)
redoTask.AddSyncStep(NewSimpleStep("delete credential meta data", func(ctx context.Context) ([]nestedStep, error) {
err := c.meta.DeleteCredential(in.Username)
if err != nil {
ctxLog.Warn("delete credential meta data failed", zap.Error(err))
}
return nil, err
}))
redoTask.AddAsyncStep(NewSimpleStep("delete credential cache", func(ctx context.Context) ([]nestedStep, error) {
err := c.ExpireCredCache(ctx, in.Username)
if err != nil {
ctxLog.Warn("delete credential cache failed", zap.Error(err))
}
return nil, err
}))
redoTask.AddAsyncStep(NewSimpleStep("delete user role cache for the user", func(ctx context.Context) ([]nestedStep, error) {
err := c.proxyClientManager.RefreshPolicyInfoCache(ctx, &proxypb.RefreshPolicyInfoCacheRequest{
OpType: int32(typeutil.CacheDeleteUser),
OpKey: in.Username,
})
if err != nil {
ctxLog.Warn("delete user role cache failed for the user", zap.Error(err))
}
return nil, err
}))
err := redoTask.Execute(ctx)
if err != nil {
log.Error("DeleteCredential remove credential failed", zap.String("role", typeutil.RootCoordRole),
zap.String("username", in.Username), zap.Error(err))
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
status := merr.Status(err)
status.ErrorCode = commonpb.ErrorCode_DeleteCredentialFailure
errMsg := "fail to execute task when deleting the user"
ctxLog.Warn(errMsg, zap.Error(err))
status = merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_DeleteCredentialFailure)
return status, nil
}
// invalidate proxy's local cache
err = c.ExpireCredCache(ctx, in.Username)
if err != nil {
log.Error("DeleteCredential expire credential cache failed", zap.String("role", typeutil.RootCoordRole),
zap.String("username", in.Username), zap.Error(err))
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
status := merr.Status(err)
status.ErrorCode = commonpb.ErrorCode_DeleteCredentialFailure
return status, nil
}
log.Debug("DeleteCredential success", zap.String("role", typeutil.RootCoordRole),
zap.String("username", in.Username))
ctxLog.Debug("DeleteCredential success")
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
metrics.RootCoordNumOfCredentials.Dec()
return merr.Status(nil), nil
status = merr.Status(nil)
return status, nil
}
// ListCredUsers list all usernames
@ -2153,19 +2177,22 @@ func (c *Core) ListCredUsers(ctx context.Context, in *milvuspb.ListCredUsersRequ
method := "ListCredUsers"
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
tr := timerecord.NewTimeRecorder(method)
ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole))
ctxLog.Debug(method)
if code, ok := c.checkHealthy(); !ok {
return &milvuspb.ListCredUsersResponse{Status: merr.Status(merr.WrapErrServiceNotReady(code.String()))}, nil
}
credInfo, err := c.meta.ListCredentialUsernames()
if err != nil {
log.Ctx(ctx).Error("ListCredUsers query usernames failed",
zap.String("role", typeutil.RootCoordRole),
zap.Error(err))
ctxLog.Warn("ListCredUsers query usernames failed", zap.Error(err))
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
status := merr.Status(err)
status.ErrorCode = commonpb.ErrorCode_ListCredUsersFailure
return &milvuspb.ListCredUsersResponse{Status: status}, nil
}
log.Debug("ListCredUsers success", zap.String("role", typeutil.RootCoordRole))
ctxLog.Debug("ListCredUsers success")
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
@ -2184,21 +2211,22 @@ func (c *Core) CreateRole(ctx context.Context, in *milvuspb.CreateRoleRequest) (
method := "CreateRole"
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
tr := timerecord.NewTimeRecorder(method)
logger.Debug(method, zap.Any("in", in))
ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), zap.Any("in", in))
ctxLog.Debug(method + " begin")
if code, ok := c.checkHealthy(); !ok {
return errorutil.UnhealthyStatus(code), errorutil.UnhealthyError()
return merr.Status(merr.WrapErrServiceNotReady(code.String())), nil
}
entity := in.Entity
err := c.meta.CreateRole(util.DefaultTenant, &milvuspb.RoleEntity{Name: entity.Name})
if err != nil {
errMsg := "fail to create role"
log.Error(errMsg, zap.Any("in", in), zap.Error(err))
return failStatus(commonpb.ErrorCode_CreateRoleFailure, fmt.Sprintf("%s, error: %s", errMsg, err.Error())), nil
ctxLog.Warn(errMsg, zap.Error(err))
return merr.StatusWithErrorCode(err, commonpb.ErrorCode_CreateRoleFailure), nil
}
logger.Debug(method+" success", zap.String("role_name", entity.Name))
ctxLog.Debug(method + " success")
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
metrics.RootCoordNumOfRoles.Inc()
@ -2217,15 +2245,16 @@ func (c *Core) DropRole(ctx context.Context, in *milvuspb.DropRoleRequest) (*com
method := "DropRole"
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
tr := timerecord.NewTimeRecorder(method)
logger.Debug(method, zap.Any("in", in))
ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), zap.String("role_name", in.RoleName))
ctxLog.Debug(method)
if code, ok := c.checkHealthy(); !ok {
return errorutil.UnhealthyStatus(code), errorutil.UnhealthyError()
return merr.Status(merr.WrapErrServiceNotReady(code.String())), nil
}
if _, err := c.meta.SelectRole(util.DefaultTenant, &milvuspb.RoleEntity{Name: in.RoleName}, false); err != nil {
errMsg := "not found the role, maybe the role isn't existed or internal system error"
log.Error(errMsg, zap.Any("in", in), zap.Error(err))
return failStatus(commonpb.ErrorCode_DropRoleFailure, errMsg), nil
ctxLog.Warn(errMsg, zap.Error(err))
return merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_DropRoleFailure), nil
}
grantEntities, err := c.meta.SelectGrant(util.DefaultTenant, &milvuspb.GrantEntity{
@ -2233,42 +2262,35 @@ func (c *Core) DropRole(ctx context.Context, in *milvuspb.DropRoleRequest) (*com
})
if len(grantEntities) != 0 {
errMsg := "fail to drop the role that it has privileges. Use REVOKE API to revoke privileges"
log.Error(errMsg, zap.Any("in", in), zap.Error(err))
return failStatus(commonpb.ErrorCode_DropRoleFailure, errMsg), nil
ctxLog.Warn(errMsg, zap.Error(err))
return merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_DropRoleFailure), nil
}
roleResults, err := c.meta.SelectRole(util.DefaultTenant, &milvuspb.RoleEntity{Name: in.RoleName}, true)
if err != nil {
errMsg := "fail to find the role by role name, maybe the role isn't existed or internal system error"
log.Error(errMsg, zap.Any("in", in), zap.Error(err))
return failStatus(commonpb.ErrorCode_DropRoleFailure, errMsg), nil
}
logger.Debug("role to user info", zap.Int("counter", len(roleResults)))
for _, roleResult := range roleResults {
for index, userEntity := range roleResult.Users {
if err = c.meta.OperateUserRole(util.DefaultTenant,
&milvuspb.UserEntity{Name: userEntity.Name},
&milvuspb.RoleEntity{Name: roleResult.Role.Name}, milvuspb.OperateUserRoleType_RemoveUserFromRole); err != nil {
if common.IsIgnorableError(err) {
continue
}
errMsg := "fail to remove user from role"
log.Error(errMsg, zap.Any("in", in), zap.String("role_name", roleResult.Role.Name), zap.String("username", userEntity.Name), zap.Int("current_index", index), zap.Error(err))
return failStatus(commonpb.ErrorCode_OperateUserRoleFailure, errMsg), nil
}
redoTask := newBaseRedoTask(c.stepExecutor)
redoTask.AddSyncStep(NewSimpleStep("drop role meta data", func(ctx context.Context) ([]nestedStep, error) {
err := c.meta.DropRole(util.DefaultTenant, in.RoleName)
if err != nil {
ctxLog.Warn("drop role mata data failed", zap.Error(err))
}
}
if err = c.meta.DropGrant(util.DefaultTenant, &milvuspb.RoleEntity{Name: in.RoleName}); err != nil {
errMsg := "fail to drop the grant"
log.Error(errMsg, zap.Any("in", in), zap.Error(err))
return failStatus(commonpb.ErrorCode_DropRoleFailure, errMsg), nil
}
if err = c.meta.DropRole(util.DefaultTenant, in.RoleName); err != nil {
errMsg := "fail to drop the role"
log.Error(errMsg, zap.Any("in", in), zap.Error(err))
return failStatus(commonpb.ErrorCode_DropRoleFailure, errMsg), nil
return nil, err
}))
redoTask.AddAsyncStep(NewSimpleStep("drop role cache", func(ctx context.Context) ([]nestedStep, error) {
err := c.proxyClientManager.RefreshPolicyInfoCache(ctx, &proxypb.RefreshPolicyInfoCacheRequest{
OpType: int32(typeutil.CacheDropRole),
OpKey: in.RoleName,
})
if err != nil {
ctxLog.Warn("delete user role cache failed for the role", zap.Error(err))
}
return nil, err
}))
err = redoTask.Execute(ctx)
if err != nil {
errMsg := "fail to execute task when dropping the role"
ctxLog.Warn(errMsg, zap.Error(err))
return merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_DropRoleFailure), nil
}
logger.Debug(method+" success", zap.String("role_name", in.RoleName))
ctxLog.Debug(method+" success", zap.String("role_name", in.RoleName))
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
metrics.RootCoordNumOfRoles.Dec()
@ -2285,36 +2307,36 @@ func (c *Core) OperateUserRole(ctx context.Context, in *milvuspb.OperateUserRole
method := "OperateUserRole-" + in.Type.String()
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
tr := timerecord.NewTimeRecorder(method)
logger.Debug(method, zap.Any("in", in))
ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), zap.Any("in", in))
ctxLog.Debug(method)
if code, ok := c.checkHealthy(); !ok {
return errorutil.UnhealthyStatus(code), errorutil.UnhealthyError()
return merr.Status(merr.WrapErrServiceNotReady(code.String())), nil
}
if _, err := c.meta.SelectRole(util.DefaultTenant, &milvuspb.RoleEntity{Name: in.RoleName}, false); err != nil {
errMsg := "not found the role, maybe the role isn't existed or internal system error"
log.Error(errMsg, zap.Any("in", in), zap.Error(err))
return failStatus(commonpb.ErrorCode_OperateUserRoleFailure, errMsg), nil
ctxLog.Warn(errMsg, zap.Error(err))
return merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_OperateUserRoleFailure), nil
}
if in.Type != milvuspb.OperateUserRoleType_RemoveUserFromRole {
if _, err := c.meta.SelectUser(util.DefaultTenant, &milvuspb.UserEntity{Name: in.Username}, false); err != nil {
errMsg := "not found the user, maybe the user isn't existed or internal system error"
log.Error(errMsg, zap.Any("in", in), zap.Error(err))
return failStatus(commonpb.ErrorCode_OperateUserRoleFailure, errMsg), nil
ctxLog.Warn(errMsg, zap.Error(err))
return merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_OperateUserRoleFailure), nil
}
}
updateCache := true
if err := c.meta.OperateUserRole(util.DefaultTenant, &milvuspb.UserEntity{Name: in.Username}, &milvuspb.RoleEntity{Name: in.RoleName}, in.Type); err != nil {
if !common.IsIgnorableError(err) {
errMsg := "fail to operate user to role"
log.Error(errMsg, zap.Any("in", in), zap.Error(err))
return failStatus(commonpb.ErrorCode_OperateUserRoleFailure, errMsg), nil
redoTask := newBaseRedoTask(c.stepExecutor)
redoTask.AddSyncStep(NewSimpleStep("operate user role meta data", func(ctx context.Context) ([]nestedStep, error) {
err := c.meta.OperateUserRole(util.DefaultTenant, &milvuspb.UserEntity{Name: in.Username}, &milvuspb.RoleEntity{Name: in.RoleName}, in.Type)
if err != nil && !common.IsIgnorableError(err) {
log.Warn("operate user role mata data failed", zap.Error(err))
return nil, err
}
updateCache = false
}
if updateCache {
return nil, nil
}))
redoTask.AddAsyncStep(NewSimpleStep("operate user role cache", func(ctx context.Context) ([]nestedStep, error) {
var opType int32
switch in.Type {
case milvuspb.OperateUserRoleType_AddUserToRole:
@ -2323,20 +2345,26 @@ func (c *Core) OperateUserRole(ctx context.Context, in *milvuspb.OperateUserRole
opType = int32(typeutil.CacheRemoveUserFromRole)
default:
errMsg := "invalid operate type for the OperateUserRole api"
log.Error(errMsg, zap.Any("in", in))
return failStatus(commonpb.ErrorCode_OperateUserRoleFailure, errMsg), nil
log.Warn(errMsg, zap.Any("in", in))
return nil, nil
}
if err := c.proxyClientManager.RefreshPolicyInfoCache(ctx, &proxypb.RefreshPolicyInfoCacheRequest{
OpType: opType,
OpKey: funcutil.EncodeUserRoleCache(in.Username, in.RoleName),
}); err != nil {
errMsg := "fail to refresh policy info cache"
log.Error(errMsg, zap.Any("in", in), zap.Error(err))
return failStatus(commonpb.ErrorCode_OperateUserRoleFailure, errMsg), nil
log.Warn("fail to refresh policy info cache", zap.Any("in", in), zap.Error(err))
return nil, err
}
return nil, nil
}))
err := redoTask.Execute(ctx)
if err != nil {
errMsg := "fail to execute task when operate the user and role"
log.Warn(errMsg, zap.Error(err))
return merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_OperateUserRoleFailure), nil
}
logger.Debug(method + " success")
ctxLog.Debug(method + " success")
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
return merr.Status(nil), nil
@ -2350,10 +2378,11 @@ func (c *Core) SelectRole(ctx context.Context, in *milvuspb.SelectRoleRequest) (
method := "SelectRole"
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
tr := timerecord.NewTimeRecorder(method)
logger.Debug(method, zap.Any("in", in))
ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), zap.Any("in", in))
ctxLog.Debug(method)
if code, ok := c.checkHealthy(); !ok {
return &milvuspb.SelectRoleResponse{Status: errorutil.UnhealthyStatus(code)}, errorutil.UnhealthyError()
return &milvuspb.SelectRoleResponse{Status: merr.Status(merr.WrapErrServiceNotReady(code.String()))}, nil
}
if in.Role != nil {
@ -2364,22 +2393,22 @@ func (c *Core) SelectRole(ctx context.Context, in *milvuspb.SelectRoleRequest) (
}, nil
}
errMsg := "fail to select the role to check the role name"
log.Error(errMsg, zap.Any("in", in), zap.Error(err))
ctxLog.Warn(errMsg, zap.Error(err))
return &milvuspb.SelectRoleResponse{
Status: failStatus(commonpb.ErrorCode_SelectRoleFailure, errMsg),
Status: merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_SelectRoleFailure),
}, nil
}
}
roleResults, err := c.meta.SelectRole(util.DefaultTenant, in.Role, in.IncludeUserInfo)
if err != nil {
errMsg := "fail to select the role"
log.Error(errMsg, zap.Any("in", in), zap.Error(err))
ctxLog.Warn(errMsg, zap.Error(err))
return &milvuspb.SelectRoleResponse{
Status: failStatus(commonpb.ErrorCode_SelectRoleFailure, errMsg),
Status: merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_SelectRoleFailure),
}, nil
}
logger.Debug(method + " success")
ctxLog.Debug(method + " success")
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
return &milvuspb.SelectRoleResponse{
@ -2396,10 +2425,11 @@ func (c *Core) SelectUser(ctx context.Context, in *milvuspb.SelectUserRequest) (
method := "SelectUser"
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
tr := timerecord.NewTimeRecorder(method)
logger.Debug(method, zap.Any("in", in))
ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), zap.Any("in", in))
ctxLog.Debug(method)
if code, ok := c.checkHealthy(); !ok {
return &milvuspb.SelectUserResponse{Status: errorutil.UnhealthyStatus(code)}, errorutil.UnhealthyError()
return &milvuspb.SelectUserResponse{Status: merr.Status(merr.WrapErrServiceNotReady(code.String()))}, nil
}
if in.User != nil {
@ -2410,22 +2440,22 @@ func (c *Core) SelectUser(ctx context.Context, in *milvuspb.SelectUserRequest) (
}, nil
}
errMsg := "fail to select the user to check the username"
log.Error(errMsg, zap.Any("in", in), zap.Error(err))
ctxLog.Warn(errMsg, zap.Any("in", in), zap.Error(err))
return &milvuspb.SelectUserResponse{
Status: failStatus(commonpb.ErrorCode_SelectUserFailure, errMsg),
Status: merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_SelectUserFailure),
}, nil
}
}
userResults, err := c.meta.SelectUser(util.DefaultTenant, in.User, in.IncludeRoleInfo)
if err != nil {
errMsg := "fail to select the user"
log.Error(errMsg, zap.Any("in", in), zap.Error(err))
ctxLog.Warn(errMsg, zap.Error(err))
return &milvuspb.SelectUserResponse{
Status: failStatus(commonpb.ErrorCode_SelectUserFailure, errMsg),
Status: merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_SelectUserFailure),
}, nil
}
logger.Debug(method + " success")
ctxLog.Debug(method + " success")
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
return &milvuspb.SelectUserResponse{
@ -2504,53 +2534,54 @@ func (c *Core) OperatePrivilege(ctx context.Context, in *milvuspb.OperatePrivile
method := "OperatePrivilege"
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
tr := timerecord.NewTimeRecorder(method)
logger.Debug(method, zap.Any("in", in))
ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), zap.Any("in", in))
ctxLog.Debug(method)
if code, ok := c.checkHealthy(); !ok {
return errorutil.UnhealthyStatus(code), errorutil.UnhealthyError()
return merr.Status(merr.WrapErrServiceNotReady(code.String())), nil
}
if in.Type != milvuspb.OperatePrivilegeType_Grant && in.Type != milvuspb.OperatePrivilegeType_Revoke {
errMsg := fmt.Sprintf("invalid operate privilege type, current type: %s, valid value: [%s, %s]", in.Type, milvuspb.OperatePrivilegeType_Grant, milvuspb.OperatePrivilegeType_Revoke)
log.Error(errMsg, zap.Any("in", in))
return failStatus(commonpb.ErrorCode_OperatePrivilegeFailure, errMsg), nil
ctxLog.Warn(errMsg)
return merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_OperatePrivilegeFailure), nil
}
if in.Entity == nil {
errMsg := "the grant entity in the request is nil"
log.Error(errMsg, zap.Any("in", in))
return failStatus(commonpb.ErrorCode_OperatePrivilegeFailure, errMsg), nil
ctxLog.Error(errMsg)
return merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_OperatePrivilegeFailure), nil
}
if err := c.isValidObject(in.Entity.Object); err != nil {
log.Error("", zap.Error(err))
return failStatus(commonpb.ErrorCode_OperatePrivilegeFailure, err.Error()), nil
ctxLog.Warn("", zap.Error(err))
return merr.StatusWithErrorCode(err, commonpb.ErrorCode_OperatePrivilegeFailure), nil
}
if err := c.isValidRole(in.Entity.Role); err != nil {
log.Error("", zap.Error(err))
return failStatus(commonpb.ErrorCode_OperatePrivilegeFailure, err.Error()), nil
ctxLog.Warn("", zap.Error(err))
return merr.StatusWithErrorCode(err, commonpb.ErrorCode_OperatePrivilegeFailure), nil
}
if err := c.isValidGrantor(in.Entity.Grantor, in.Entity.Object.Name); err != nil {
log.Error("", zap.Error(err))
return failStatus(commonpb.ErrorCode_OperatePrivilegeFailure, err.Error()), nil
ctxLog.Error("", zap.Error(err))
return merr.StatusWithErrorCode(err, commonpb.ErrorCode_OperatePrivilegeFailure), nil
}
logger.Debug("before PrivilegeNameForMetastore", zap.String("privilege", in.Entity.Grantor.Privilege.Name))
ctxLog.Debug("before PrivilegeNameForMetastore", zap.String("privilege", in.Entity.Grantor.Privilege.Name))
if !util.IsAnyWord(in.Entity.Grantor.Privilege.Name) {
in.Entity.Grantor.Privilege.Name = util.PrivilegeNameForMetastore(in.Entity.Grantor.Privilege.Name)
}
logger.Debug("after PrivilegeNameForMetastore", zap.String("privilege", in.Entity.Grantor.Privilege.Name))
ctxLog.Debug("after PrivilegeNameForMetastore", zap.String("privilege", in.Entity.Grantor.Privilege.Name))
if in.Entity.Object.Name == commonpb.ObjectType_Global.String() {
in.Entity.ObjectName = util.AnyWord
}
updateCache := true
if err := c.meta.OperatePrivilege(util.DefaultTenant, in.Entity, in.Type); err != nil {
if !common.IsIgnorableError(err) {
errMsg := "fail to operate the privilege"
log.Error(errMsg, zap.Any("in", in), zap.Error(err))
return failStatus(commonpb.ErrorCode_OperatePrivilegeFailure, errMsg), nil
}
updateCache = false
}
if updateCache {
redoTask := newBaseRedoTask(c.stepExecutor)
redoTask.AddSyncStep(NewSimpleStep("operate privilege meta data", func(ctx context.Context) ([]nestedStep, error) {
err := c.meta.OperatePrivilege(util.DefaultTenant, in.Entity, in.Type)
if err != nil && !common.IsIgnorableError(err) {
log.Warn("fail to operate the privilege", zap.Any("in", in), zap.Error(err))
return nil, err
}
return nil, nil
}))
redoTask.AddAsyncStep(NewSimpleStep("operate privilege cache", func(ctx context.Context) ([]nestedStep, error) {
var opType int32
switch in.Type {
case milvuspb.OperatePrivilegeType_Grant:
@ -2558,21 +2589,27 @@ func (c *Core) OperatePrivilege(ctx context.Context, in *milvuspb.OperatePrivile
case milvuspb.OperatePrivilegeType_Revoke:
opType = int32(typeutil.CacheRevokePrivilege)
default:
errMsg := "invalid operate type for the OperatePrivilege api"
log.Error(errMsg, zap.Any("in", in))
return failStatus(commonpb.ErrorCode_OperatePrivilegeFailure, errMsg), nil
log.Warn("invalid operate type for the OperatePrivilege api", zap.Any("in", in))
return nil, nil
}
if err := c.proxyClientManager.RefreshPolicyInfoCache(ctx, &proxypb.RefreshPolicyInfoCacheRequest{
OpType: opType,
OpKey: funcutil.PolicyForPrivilege(in.Entity.Role.Name, in.Entity.Object.Name, in.Entity.ObjectName, in.Entity.Grantor.Privilege.Name, in.Entity.DbName),
}); err != nil {
errMsg := "fail to refresh policy info cache"
log.Error(errMsg, zap.Any("in", in), zap.Error(err))
return failStatus(commonpb.ErrorCode_OperatePrivilegeFailure, errMsg), nil
log.Warn("fail to refresh policy info cache", zap.Any("in", in), zap.Error(err))
return nil, err
}
return nil, nil
}))
err := redoTask.Execute(ctx)
if err != nil {
errMsg := "fail to execute task when operating the privilege"
log.Warn(errMsg, zap.Error(err))
return merr.StatusWithErrorCode(err, commonpb.ErrorCode_OperatePrivilegeFailure), nil
}
logger.Debug(method + " success")
ctxLog.Debug(method + " success")
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
return merr.Status(nil), nil
@ -2587,31 +2624,32 @@ func (c *Core) SelectGrant(ctx context.Context, in *milvuspb.SelectGrantRequest)
method := "SelectGrant"
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
tr := timerecord.NewTimeRecorder(method)
logger.Debug(method, zap.Any("in", in))
ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), zap.Any("in", in))
ctxLog.Debug(method)
if code, ok := c.checkHealthy(); !ok {
return &milvuspb.SelectGrantResponse{
Status: errorutil.UnhealthyStatus(code),
}, errorutil.UnhealthyError()
Status: merr.Status(merr.WrapErrServiceNotReady(code.String())),
}, nil
}
if in.Entity == nil {
errMsg := "the grant entity in the request is nil"
log.Error(errMsg, zap.Any("in", in))
ctxLog.Warn(errMsg)
return &milvuspb.SelectGrantResponse{
Status: failStatus(commonpb.ErrorCode_SelectGrantFailure, errMsg),
Status: merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_SelectGrantFailure),
}, nil
}
if err := c.isValidRole(in.Entity.Role); err != nil {
log.Error("", zap.Any("in", in), zap.Error(err))
ctxLog.Warn("", zap.Error(err))
return &milvuspb.SelectGrantResponse{
Status: failStatus(commonpb.ErrorCode_SelectGrantFailure, err.Error()),
Status: merr.StatusWithErrorCode(err, commonpb.ErrorCode_SelectGrantFailure),
}, nil
}
if in.Entity.Object != nil {
if err := c.isValidObject(in.Entity.Object); err != nil {
log.Error("", zap.Any("in", in), zap.Error(err))
ctxLog.Warn("", zap.Error(err))
return &milvuspb.SelectGrantResponse{
Status: failStatus(commonpb.ErrorCode_SelectGrantFailure, err.Error()),
Status: merr.StatusWithErrorCode(err, commonpb.ErrorCode_SelectGrantFailure),
}, nil
}
}
@ -2624,13 +2662,13 @@ func (c *Core) SelectGrant(ctx context.Context, in *milvuspb.SelectGrantRequest)
}
if err != nil {
errMsg := "fail to select the grant"
log.Error(errMsg, zap.Any("in", in), zap.Error(err))
ctxLog.Warn(errMsg, zap.Error(err))
return &milvuspb.SelectGrantResponse{
Status: failStatus(commonpb.ErrorCode_SelectGrantFailure, errMsg),
Status: merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_SelectGrantFailure),
}, nil
}
logger.Debug(method + " success")
ctxLog.Debug(method + " success")
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
return &milvuspb.SelectGrantResponse{
@ -2643,32 +2681,33 @@ func (c *Core) ListPolicy(ctx context.Context, in *internalpb.ListPolicyRequest)
method := "PolicyList"
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
tr := timerecord.NewTimeRecorder(method)
logger.Debug(method, zap.Any("in", in))
ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), zap.Any("in", in))
ctxLog.Debug(method)
if code, ok := c.checkHealthy(); !ok {
return &internalpb.ListPolicyResponse{
Status: errorutil.UnhealthyStatus(code),
}, errorutil.UnhealthyError()
Status: merr.Status(merr.WrapErrServiceNotReady(code.String())),
}, nil
}
policies, err := c.meta.ListPolicy(util.DefaultTenant)
if err != nil {
errMsg := "fail to list policy"
log.Error(errMsg, zap.Any("in", in), zap.Error(err))
ctxLog.Warn(errMsg, zap.Error(err))
return &internalpb.ListPolicyResponse{
Status: failStatus(commonpb.ErrorCode_ListPolicyFailure, errMsg),
Status: merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_ListPolicyFailure),
}, nil
}
userRoles, err := c.meta.ListUserRole(util.DefaultTenant)
if err != nil {
errMsg := "fail to list user-role"
log.Error(errMsg, zap.Any("in", in), zap.Error(err))
ctxLog.Warn(errMsg, zap.Any("in", in), zap.Error(err))
return &internalpb.ListPolicyResponse{
Status: failStatus(commonpb.ErrorCode_ListPolicyFailure, "fail to list user-role"),
Status: merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_ListPolicyFailure),
}, nil
}
logger.Debug(method + " success")
ctxLog.Debug(method + " success")
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
return &internalpb.ListPolicyResponse{

View File

@ -1455,51 +1455,81 @@ func TestCore_Rbac(t *testing.T) {
// not healthy.
c.stateCode.Store(commonpb.StateCode_Abnormal)
{
resp, err := c.CreateCredential(ctx, &internalpb.CredentialInfo{})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_NotReadyServe, resp.ErrorCode)
}
{
resp, err := c.DeleteCredential(ctx, &milvuspb.DeleteCredentialRequest{})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_NotReadyServe, resp.ErrorCode)
}
{
resp, err := c.UpdateCredential(ctx, &internalpb.CredentialInfo{})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_NotReadyServe, resp.ErrorCode)
}
{
resp, err := c.GetCredential(ctx, &rootcoordpb.GetCredentialRequest{})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_NotReadyServe, resp.Status.ErrorCode)
}
{
resp, err := c.ListCredUsers(ctx, &milvuspb.ListCredUsersRequest{})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_NotReadyServe, resp.Status.ErrorCode)
}
{
resp, err := c.CreateRole(ctx, &milvuspb.CreateRoleRequest{})
assert.Error(t, err)
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode)
}
{
resp, err := c.DropRole(ctx, &milvuspb.DropRoleRequest{})
assert.Error(t, err)
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode)
}
{
resp, err := c.OperateUserRole(ctx, &milvuspb.OperateUserRoleRequest{})
assert.Error(t, err)
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode)
}
{
resp, err := c.SelectRole(ctx, &milvuspb.SelectRoleRequest{})
assert.Error(t, err)
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
}
{
resp, err := c.SelectUser(ctx, &milvuspb.SelectUserRequest{})
assert.Error(t, err)
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
}
{
resp, err := c.OperatePrivilege(ctx, &milvuspb.OperatePrivilegeRequest{})
assert.Error(t, err)
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode)
}
{
resp, err := c.SelectGrant(ctx, &milvuspb.SelectGrantRequest{})
assert.Error(t, err)
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
}
{
resp, err := c.ListPolicy(ctx, &internalpb.ListPolicyRequest{})
assert.Error(t, err)
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
}
}
@ -1737,6 +1767,196 @@ func TestRootCoord_CheckHealth(t *testing.T) {
})
}
func TestRootCoord_RBACError(t *testing.T) {
ctx := context.Background()
c := newTestCore(withHealthyCode(), withInvalidMeta())
t.Run("create credential failed", func(t *testing.T) {
resp, err := c.CreateCredential(ctx, &internalpb.CredentialInfo{Username: "foo", EncryptedPassword: "bar"})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode)
})
t.Run("get credential failed", func(t *testing.T) {
resp, err := c.GetCredential(ctx, &rootcoordpb.GetCredentialRequest{Username: "foo"})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
})
t.Run("update credential failed", func(t *testing.T) {
resp, err := c.UpdateCredential(ctx, &internalpb.CredentialInfo{})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode)
})
t.Run("delete credential failed", func(t *testing.T) {
resp, err := c.DeleteCredential(ctx, &milvuspb.DeleteCredentialRequest{Username: "foo"})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode)
})
t.Run("list credential failed", func(t *testing.T) {
resp, err := c.ListCredUsers(ctx, &milvuspb.ListCredUsersRequest{})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
})
t.Run("create role failed", func(t *testing.T) {
resp, err := c.CreateRole(ctx, &milvuspb.CreateRoleRequest{Entity: &milvuspb.RoleEntity{Name: "foo"}})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode)
})
t.Run("drop role failed", func(t *testing.T) {
resp, err := c.DropRole(ctx, &milvuspb.DropRoleRequest{RoleName: "foo"})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode)
})
t.Run("operate user role failed", func(t *testing.T) {
mockMeta := c.meta.(*mockMetaTable)
mockMeta.SelectRoleFunc = func(tenant string, entity *milvuspb.RoleEntity, includeUserInfo bool) ([]*milvuspb.RoleResult, error) {
return nil, nil
}
mockMeta.SelectUserFunc = func(tenant string, entity *milvuspb.UserEntity, includeRoleInfo bool) ([]*milvuspb.UserResult, error) {
return nil, nil
}
resp, err := c.OperateUserRole(ctx, &milvuspb.OperateUserRoleRequest{RoleName: "foo", Username: "bar", Type: milvuspb.OperateUserRoleType_AddUserToRole})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode)
mockMeta.SelectRoleFunc = func(tenant string, entity *milvuspb.RoleEntity, includeUserInfo bool) ([]*milvuspb.RoleResult, error) {
return nil, errors.New("mock error")
}
mockMeta.SelectUserFunc = func(tenant string, entity *milvuspb.UserEntity, includeRoleInfo bool) ([]*milvuspb.UserResult, error) {
return nil, errors.New("mock error")
}
})
t.Run("select role failed", func(t *testing.T) {
{
resp, err := c.SelectRole(ctx, &milvuspb.SelectRoleRequest{Role: &milvuspb.RoleEntity{Name: "foo"}})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
}
{
resp, err := c.SelectRole(ctx, &milvuspb.SelectRoleRequest{})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
}
})
t.Run("select user failed", func(t *testing.T) {
{
resp, err := c.SelectUser(ctx, &milvuspb.SelectUserRequest{User: &milvuspb.UserEntity{Name: "foo"}})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
}
{
resp, err := c.SelectUser(ctx, &milvuspb.SelectUserRequest{})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
}
})
t.Run("operate privilege failed", func(t *testing.T) {
{
resp, err := c.OperatePrivilege(ctx, &milvuspb.OperatePrivilegeRequest{Type: milvuspb.OperatePrivilegeType(100)})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode)
}
{
resp, err := c.OperatePrivilege(ctx, &milvuspb.OperatePrivilegeRequest{Type: milvuspb.OperatePrivilegeType_Grant})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode)
}
{
resp, err := c.OperatePrivilege(ctx, &milvuspb.OperatePrivilegeRequest{Entity: &milvuspb.GrantEntity{Object: &milvuspb.ObjectEntity{Name: "CollectionErr"}}})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode)
}
{
resp, err := c.OperatePrivilege(ctx, &milvuspb.OperatePrivilegeRequest{Entity: &milvuspb.GrantEntity{Object: &milvuspb.ObjectEntity{Name: "Collection"}, Role: &milvuspb.RoleEntity{Name: "foo"}}})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode)
}
mockMeta := c.meta.(*mockMetaTable)
mockMeta.SelectRoleFunc = func(tenant string, entity *milvuspb.RoleEntity, includeUserInfo bool) ([]*milvuspb.RoleResult, error) {
return nil, nil
}
{
resp, err := c.OperatePrivilege(ctx, &milvuspb.OperatePrivilegeRequest{Entity: &milvuspb.GrantEntity{
Role: &milvuspb.RoleEntity{Name: "foo"},
Object: &milvuspb.ObjectEntity{Name: "Collection"},
ObjectName: "col1",
Grantor: &milvuspb.GrantorEntity{
User: &milvuspb.UserEntity{Name: "root"},
Privilege: &milvuspb.PrivilegeEntity{Name: "Insert"},
},
}, Type: milvuspb.OperatePrivilegeType_Grant})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode)
}
mockMeta.SelectUserFunc = func(tenant string, entity *milvuspb.UserEntity, includeRoleInfo bool) ([]*milvuspb.UserResult, error) {
return nil, nil
}
resp, err := c.OperatePrivilege(ctx, &milvuspb.OperatePrivilegeRequest{Entity: &milvuspb.GrantEntity{
Role: &milvuspb.RoleEntity{Name: "foo"},
Object: &milvuspb.ObjectEntity{Name: "Collection"},
ObjectName: "col1",
Grantor: &milvuspb.GrantorEntity{
User: &milvuspb.UserEntity{Name: "root"},
Privilege: &milvuspb.PrivilegeEntity{Name: "Insert"},
},
}, Type: milvuspb.OperatePrivilegeType_Grant})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode)
mockMeta.SelectRoleFunc = func(tenant string, entity *milvuspb.RoleEntity, includeUserInfo bool) ([]*milvuspb.RoleResult, error) {
return nil, errors.New("mock error")
}
mockMeta.SelectUserFunc = func(tenant string, entity *milvuspb.UserEntity, includeRoleInfo bool) ([]*milvuspb.UserResult, error) {
return nil, errors.New("mock error")
}
})
t.Run("select grant failed", func(t *testing.T) {
{
resp, err := c.SelectGrant(ctx, &milvuspb.SelectGrantRequest{})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
}
{
resp, err := c.SelectGrant(ctx, &milvuspb.SelectGrantRequest{Entity: &milvuspb.GrantEntity{Role: &milvuspb.RoleEntity{Name: "foo"}}})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
}
mockMeta := c.meta.(*mockMetaTable)
mockMeta.SelectRoleFunc = func(tenant string, entity *milvuspb.RoleEntity, includeUserInfo bool) ([]*milvuspb.RoleResult, error) {
return nil, nil
}
{
resp, err := c.SelectGrant(ctx, &milvuspb.SelectGrantRequest{Entity: &milvuspb.GrantEntity{Role: &milvuspb.RoleEntity{Name: "foo"}, Object: &milvuspb.ObjectEntity{Name: "CollectionFoo"}}})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
}
{
resp, err := c.SelectGrant(ctx, &milvuspb.SelectGrantRequest{Entity: &milvuspb.GrantEntity{Role: &milvuspb.RoleEntity{Name: "foo"}, Object: &milvuspb.ObjectEntity{Name: "Collection"}}})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
}
mockMeta.SelectRoleFunc = func(tenant string, entity *milvuspb.RoleEntity, includeUserInfo bool) ([]*milvuspb.RoleResult, error) {
return nil, errors.New("mock error")
}
})
t.Run("list policy failed", func(t *testing.T) {
resp, err := c.ListPolicy(ctx, &internalpb.ListPolicyRequest{})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
mockMeta := c.meta.(*mockMetaTable)
mockMeta.ListPolicyFunc = func(tenant string) ([]string, error) {
return []string{}, nil
}
resp, err = c.ListPolicy(ctx, &internalpb.ListPolicyRequest{})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
mockMeta.ListPolicyFunc = func(tenant string) ([]string, error) {
return []string{}, errors.New("mock error")
}
})
}
func TestCore_Stop(t *testing.T) {
t.Run("abnormal stop before component is ready", func(t *testing.T) {
c := &Core{}

View File

@ -476,3 +476,29 @@ func (b *confirmGCStep) Desc() string {
func (b *confirmGCStep) Weight() stepPriority {
return stepPriorityLow
}
type simpleStep struct {
desc string
weight stepPriority
executeFunc func(ctx context.Context) ([]nestedStep, error)
}
func NewSimpleStep(desc string, executeFunc func(ctx context.Context) ([]nestedStep, error)) nestedStep {
return &simpleStep{
desc: desc,
weight: stepPriorityNormal,
executeFunc: executeFunc,
}
}
func (s *simpleStep) Execute(ctx context.Context) ([]nestedStep, error) {
return s.executeFunc(ctx)
}
func (s *simpleStep) Desc() string {
return s.desc
}
func (s *simpleStep) Weight() stepPriority {
return s.weight
}

View File

@ -31,8 +31,6 @@ import (
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
var logger = log.L().WithOptions(zap.Fields(zap.String("role", typeutil.RootCoordRole)))
// EqualKeyPairArray check whether 2 KeyValuePairs are equal
func EqualKeyPairArray(p1 []*commonpb.KeyValuePair, p2 []*commonpb.KeyValuePair) bool {
if len(p1) != len(p2) {
@ -107,6 +105,7 @@ func CheckMsgType(got, expect commonpb.MsgType) error {
return nil
}
// Deprecated: use merr.StatusWithErrorCode or merr.Status instead
func failStatus(code commonpb.ErrorCode, reason string) *commonpb.Status {
return &commonpb.Status{
ErrorCode: code,

View File

@ -58,6 +58,16 @@ func (s *ErrSuite) TestStatus() {
s.Nil(Error(&commonpb.Status{}))
}
func (s *ErrSuite) TestStatusWithCode() {
err := WrapErrCollectionNotFound(1)
status := StatusWithErrorCode(err, commonpb.ErrorCode_CollectionNotExists)
restoredErr := Error(status)
s.ErrorIs(err, restoredErr)
s.Equal(commonpb.ErrorCode_CollectionNotExists, status.ErrorCode)
s.Equal(int32(0), StatusWithErrorCode(nil, commonpb.ErrorCode_CollectionNotExists).Code)
}
func (s *ErrSuite) TestWrap() {
// Service related
s.ErrorIs(WrapErrServiceNotReady("init", "test init..."), ErrServiceNotReady)

View File

@ -76,6 +76,18 @@ func Status(err error) *commonpb.Status {
}
}
func StatusWithErrorCode(err error, code commonpb.ErrorCode) *commonpb.Status {
if err == nil {
return &commonpb.Status{}
}
return &commonpb.Status{
Code: Code(err),
Reason: err.Error(),
ErrorCode: code,
}
}
func oldCode(code int32) commonpb.ErrorCode {
switch code {
case ErrServiceNotReady.code():

View File

@ -328,7 +328,7 @@ func TestComponentParam(t *testing.T) {
assert.Equal(t, int32(10240), Params.MaxReceiveChanSize.GetAsInt32())
assert.Equal(t, int32(10240), Params.MaxUnsolvedQueueSize.GetAsInt32())
assert.Equal(t, 10.0, Params.CPURatio.GetAsFloat())
assert.Equal(t, uint32(runtime.GOMAXPROCS(0)*4), Params.KnowhereThreadPoolSize.GetAsUint32())
assert.Equal(t, uint32(runtime.GOMAXPROCS(0)), Params.KnowhereThreadPoolSize.GetAsUint32())
// test small indexNlist/NProbe default
params.Remove("queryNode.segcore.smallIndex.nlist")

View File

@ -7,6 +7,9 @@ const (
CacheRemoveUserFromRole
CacheGrantPrivilege
CacheRevokePrivilege
CacheDeleteUser
CacheDropRole
CacheRefresh
)
type CacheOp struct {