From 8b706122a86fb32f54c6c4596046c870d50e5197 Mon Sep 17 00:00:00 2001 From: SimFG Date: Sat, 31 Aug 2024 21:57:02 +0800 Subject: [PATCH] enhance: [2.4] support to drop the role which is related the privilege list (#35863) - issue: #35545 - pr: #35727 Signed-off-by: SimFG --- .gitignore | 1 + go.mod | 2 +- internal/datanode/compaction/mix_compactor.go | 7 +- .../datanode/compaction/segment_writer.go | 2 +- internal/metastore/kv/rootcoord/kv_catalog.go | 20 +- .../metastore/kv/rootcoord/kv_catalog_test.go | 15 +- internal/proxy/meta_cache.go | 6 + internal/proxy/meta_cache_test.go | 10 +- internal/rootcoord/root_coord.go | 28 ++- .../impls/pulsar/message_id_data.pb.go | 177 ------------------ .../impls/pulsar/message_id_data.proto | 13 -- pkg/util/funcutil/policy.go | 4 + pkg/util/funcutil/policy_test.go | 7 + 13 files changed, 79 insertions(+), 213 deletions(-) delete mode 100644 pkg/streaming/walimpls/impls/pulsar/message_id_data.pb.go delete mode 100644 pkg/streaming/walimpls/impls/pulsar/message_id_data.proto diff --git a/.gitignore b/.gitignore index b6adfcbdb4..e96dea2775 100644 --- a/.gitignore +++ b/.gitignore @@ -104,3 +104,4 @@ internal/proto/**/*.pb.go internal/core/src/pb/*.pb.h internal/core/src/pb/*.pb.cc **/legacypb/*.pb.go +*.pb.go diff --git a/go.mod b/go.mod index 6bf19d3cea..883a1429c0 100644 --- a/go.mod +++ b/go.mod @@ -21,7 +21,7 @@ require ( github.com/go-playground/validator/v10 v10.14.0 github.com/gofrs/flock v0.8.1 github.com/gogo/protobuf v1.3.2 // indirect - github.com/golang/protobuf v1.5.4 // indirect + github.com/golang/protobuf v1.5.4 github.com/google/btree v1.1.2 github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 github.com/klauspost/compress v1.17.7 diff --git a/internal/datanode/compaction/mix_compactor.go b/internal/datanode/compaction/mix_compactor.go index cdb600877d..339b0a6b08 100644 --- a/internal/datanode/compaction/mix_compactor.go +++ b/internal/datanode/compaction/mix_compactor.go @@ -24,6 +24,10 @@ import ( "time" "github.com/cockroachdb/errors" + "github.com/samber/lo" + "go.opentelemetry.io/otel" + "go.uber.org/zap" + "github.com/milvus-io/milvus/internal/datanode/allocator" "github.com/milvus-io/milvus/internal/datanode/io" "github.com/milvus-io/milvus/internal/proto/datapb" @@ -35,9 +39,6 @@ import ( "github.com/milvus-io/milvus/pkg/util/timerecord" "github.com/milvus-io/milvus/pkg/util/tsoutil" "github.com/milvus-io/milvus/pkg/util/typeutil" - "github.com/samber/lo" - "go.opentelemetry.io/otel" - "go.uber.org/zap" ) type mixCompactionTask struct { diff --git a/internal/datanode/compaction/segment_writer.go b/internal/datanode/compaction/segment_writer.go index 81d8789746..414220a086 100644 --- a/internal/datanode/compaction/segment_writer.go +++ b/internal/datanode/compaction/segment_writer.go @@ -8,6 +8,7 @@ import ( "context" "math" + "github.com/samber/lo" "go.uber.org/atomic" "go.uber.org/zap" @@ -21,7 +22,6 @@ import ( "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" - "github.com/samber/lo" ) // Not concurrent safe. diff --git a/internal/metastore/kv/rootcoord/kv_catalog.go b/internal/metastore/kv/rootcoord/kv_catalog.go index e99b461d11..28705ff896 100644 --- a/internal/metastore/kv/rootcoord/kv_catalog.go +++ b/internal/metastore/kv/rootcoord/kv_catalog.go @@ -1167,11 +1167,25 @@ func (kc *Catalog) ListGrant(ctx context.Context, tenant string, entity *milvusp func (kc *Catalog) DeleteGrant(ctx context.Context, tenant string, role *milvuspb.RoleEntity) error { var ( - k = funcutil.HandleTenantForEtcdKey(GranteePrefix, tenant, role.Name+"/") - err error + k = funcutil.HandleTenantForEtcdKey(GranteePrefix, tenant, role.Name+"/") + err error + removeKeys []string ) - if err = kc.Txn.RemoveWithPrefix(k); err != nil { + removeKeys = append(removeKeys, k) + + // the values are the grantee id list + _, values, err := kc.Txn.LoadWithPrefix(k) + if err != nil { + log.Warn("fail to load grant privilege entities", zap.String("key", k), zap.Error(err)) + return err + } + for _, v := range values { + granteeIDKey := funcutil.HandleTenantForEtcdKey(GranteeIDPrefix, tenant, v+"/") + removeKeys = append(removeKeys, granteeIDKey) + } + + if err = kc.Txn.MultiSaveAndRemoveWithPrefix(nil, removeKeys); err != nil { log.Error("fail to remove with the prefix", zap.String("key", k), zap.Error(err)) } return err diff --git a/internal/metastore/kv/rootcoord/kv_catalog_test.go b/internal/metastore/kv/rootcoord/kv_catalog_test.go index b56a5b7801..71817ae109 100644 --- a/internal/metastore/kv/rootcoord/kv_catalog_test.go +++ b/internal/metastore/kv/rootcoord/kv_catalog_test.go @@ -2311,12 +2311,18 @@ func TestRBAC_Grant(t *testing.T) { kvmock = mocks.NewTxnKV(t) c = &Catalog{Txn: kvmock} - errorRole = "error-role" - errorRolePrefix = funcutil.HandleTenantForEtcdKey(GranteePrefix, tenant, errorRole+"/") + errorRole = "error-role" + errorRolePrefix = funcutil.HandleTenantForEtcdKey(GranteePrefix, tenant, errorRole+"/") + loadErrorRole = "load-error-role" + loadErrorRolePrefix = funcutil.HandleTenantForEtcdKey(GranteePrefix, tenant, loadErrorRole+"/") + granteeID = "123456" + granteePrefix = funcutil.HandleTenantForEtcdKey(GranteeIDPrefix, tenant, granteeID+"/") ) - kvmock.EXPECT().RemoveWithPrefix(errorRolePrefix).Call.Return(errors.New("mock removeWithPrefix error")) - kvmock.EXPECT().RemoveWithPrefix(mock.Anything).Call.Return(nil) + kvmock.EXPECT().LoadWithPrefix(loadErrorRolePrefix).Call.Return(nil, nil, errors.New("mock loadWithPrefix error")) + kvmock.EXPECT().LoadWithPrefix(mock.Anything).Call.Return(nil, []string{granteeID}, nil) + kvmock.EXPECT().MultiSaveAndRemoveWithPrefix(mock.Anything, []string{errorRolePrefix, granteePrefix}, mock.Anything).Call.Return(errors.New("mock removeWithPrefix error")) + kvmock.EXPECT().MultiSaveAndRemoveWithPrefix(mock.Anything, mock.Anything, mock.Anything).Call.Return(nil) tests := []struct { isValid bool @@ -2326,6 +2332,7 @@ func TestRBAC_Grant(t *testing.T) { }{ {true, "role1", "valid role1"}, {false, errorRole, "invalid errorRole"}, + {false, loadErrorRole, "invalid load errorRole"}, } for _, test := range tests { diff --git a/internal/proxy/meta_cache.go b/internal/proxy/meta_cache.go index 0e89271a75..4982ccd3a1 100644 --- a/internal/proxy/meta_cache.go +++ b/internal/proxy/meta_cache.go @@ -1221,6 +1221,12 @@ func (m *MetaCache) RefreshPolicyInfo(op typeutil.CacheOp) (err error) { for user := range m.userToRoles { delete(m.userToRoles[user], op.OpKey) } + + for policy := range m.privilegeInfos { + if funcutil.PolicyCheckerWithRole(policy, op.OpKey) { + delete(m.privilegeInfos, policy) + } + } case typeutil.CacheRefresh: resp, err := m.rootCoord.ListPolicy(context.Background(), &internalpb.ListPolicyRequest{}) if err != nil { diff --git a/internal/proxy/meta_cache_test.go b/internal/proxy/meta_cache_test.go index ec8fe63c37..9ea940e920 100644 --- a/internal/proxy/meta_cache_test.go +++ b/internal/proxy/meta_cache_test.go @@ -691,9 +691,13 @@ func TestMetaCache_PolicyInfo(t *testing.T) { t.Run("Delete user or drop role", func(t *testing.T) { client.listPolicy = func(ctx context.Context, in *internalpb.ListPolicyRequest) (*internalpb.ListPolicyResponse, error) { return &internalpb.ListPolicyResponse{ - Status: merr.Success(), - PolicyInfos: []string{"policy1", "policy2", "policy3"}, - UserRoles: []string{funcutil.EncodeUserRoleCache("foo", "role1"), funcutil.EncodeUserRoleCache("foo", "role2"), funcutil.EncodeUserRoleCache("foo2", "role2"), funcutil.EncodeUserRoleCache("foo2", "role3")}, + Status: merr.Success(), + PolicyInfos: []string{ + funcutil.PolicyForPrivilege("role2", "Collection", "collection1", "read", "default"), + "policy2", + "policy3", + }, + UserRoles: []string{funcutil.EncodeUserRoleCache("foo", "role1"), funcutil.EncodeUserRoleCache("foo", "role2"), funcutil.EncodeUserRoleCache("foo2", "role2"), funcutil.EncodeUserRoleCache("foo2", "role3")}, }, nil } err := InitMetaCache(context.Background(), client, qc, mgr) diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index 5b393eaeb4..9ab65d53c2 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -2235,13 +2235,15 @@ func (c *Core) DropRole(ctx context.Context, in *milvuspb.DropRoleRequest) (*com return merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_DropRoleFailure), nil } - grantEntities, err := c.meta.SelectGrant(util.DefaultTenant, &milvuspb.GrantEntity{ - Role: &milvuspb.RoleEntity{Name: in.RoleName}, - }) - if len(grantEntities) != 0 { - errMsg := "fail to drop the role that it has privileges. Use REVOKE API to revoke privileges" - ctxLog.Warn(errMsg, zap.Any("grants", grantEntities), zap.Error(err)) - return merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_DropRoleFailure), nil + if !in.ForceDrop { + grantEntities, err := c.meta.SelectGrant(util.DefaultTenant, &milvuspb.GrantEntity{ + Role: &milvuspb.RoleEntity{Name: in.RoleName}, + }) + if len(grantEntities) != 0 { + errMsg := "fail to drop the role that it has privileges. Use REVOKE API to revoke privileges" + ctxLog.Warn(errMsg, zap.Any("grants", grantEntities), zap.Error(err)) + return merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_DropRoleFailure), nil + } } redoTask := newBaseRedoTask(c.stepExecutor) redoTask.AddSyncStep(NewSimpleStep("drop role meta data", func(ctx context.Context) ([]nestedStep, error) { @@ -2251,6 +2253,16 @@ func (c *Core) DropRole(ctx context.Context, in *milvuspb.DropRoleRequest) (*com } return nil, err })) + redoTask.AddAsyncStep(NewSimpleStep("drop the privilege list of this role", func(ctx context.Context) ([]nestedStep, error) { + if !in.ForceDrop { + return nil, nil + } + err := c.meta.DropGrant(util.DefaultTenant, &milvuspb.RoleEntity{Name: in.RoleName}) + if err != nil { + ctxLog.Warn("drop the privilege list failed for the role", zap.Error(err)) + } + return nil, err + })) redoTask.AddAsyncStep(NewSimpleStep("drop role cache", func(ctx context.Context) ([]nestedStep, error) { err := c.proxyClientManager.RefreshPolicyInfoCache(ctx, &proxypb.RefreshPolicyInfoCacheRequest{ OpType: int32(typeutil.CacheDropRole), @@ -2261,7 +2273,7 @@ func (c *Core) DropRole(ctx context.Context, in *milvuspb.DropRoleRequest) (*com } return nil, err })) - err = redoTask.Execute(ctx) + err := redoTask.Execute(ctx) if err != nil { errMsg := "fail to execute task when dropping the role" ctxLog.Warn(errMsg, zap.Error(err)) diff --git a/pkg/streaming/walimpls/impls/pulsar/message_id_data.pb.go b/pkg/streaming/walimpls/impls/pulsar/message_id_data.pb.go deleted file mode 100644 index b3e2df6cd8..0000000000 --- a/pkg/streaming/walimpls/impls/pulsar/message_id_data.pb.go +++ /dev/null @@ -1,177 +0,0 @@ -// Code generated by protoc-gen-go. DO NOT EDIT. -// versions: -// protoc-gen-go v1.33.0 -// protoc v3.6.1 -// source: message_id_data.proto - -package pulsar - -import ( - protoreflect "google.golang.org/protobuf/reflect/protoreflect" - protoimpl "google.golang.org/protobuf/runtime/protoimpl" - reflect "reflect" - sync "sync" -) - -const ( - // Verify that this generated code is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) - // Verify that runtime/protoimpl is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) -) - -type MessageIdData struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - LedgerId *uint64 `protobuf:"varint,1,req,name=ledgerId" json:"ledgerId,omitempty"` - EntryId *uint64 `protobuf:"varint,2,req,name=entryId" json:"entryId,omitempty"` - Partition *int32 `protobuf:"varint,3,opt,name=partition" json:"partition,omitempty"` - BatchIndex *int32 `protobuf:"varint,4,opt,name=batch_index,json=batchIndex" json:"batch_index,omitempty"` -} - -func (x *MessageIdData) Reset() { - *x = MessageIdData{} - if protoimpl.UnsafeEnabled { - mi := &file_message_id_data_proto_msgTypes[0] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *MessageIdData) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*MessageIdData) ProtoMessage() {} - -func (x *MessageIdData) ProtoReflect() protoreflect.Message { - mi := &file_message_id_data_proto_msgTypes[0] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use MessageIdData.ProtoReflect.Descriptor instead. -func (*MessageIdData) Descriptor() ([]byte, []int) { - return file_message_id_data_proto_rawDescGZIP(), []int{0} -} - -func (x *MessageIdData) GetLedgerId() uint64 { - if x != nil && x.LedgerId != nil { - return *x.LedgerId - } - return 0 -} - -func (x *MessageIdData) GetEntryId() uint64 { - if x != nil && x.EntryId != nil { - return *x.EntryId - } - return 0 -} - -func (x *MessageIdData) GetPartition() int32 { - if x != nil && x.Partition != nil { - return *x.Partition - } - return 0 -} - -func (x *MessageIdData) GetBatchIndex() int32 { - if x != nil && x.BatchIndex != nil { - return *x.BatchIndex - } - return 0 -} - -var File_message_id_data_proto protoreflect.FileDescriptor - -var file_message_id_data_proto_rawDesc = []byte{ - 0x0a, 0x15, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x5f, 0x69, 0x64, 0x5f, 0x64, 0x61, 0x74, - 0x61, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x21, 0x6d, 0x69, 0x6c, 0x76, 0x75, 0x73, 0x2e, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x2e, - 0x77, 0x61, 0x6c, 0x2e, 0x70, 0x75, 0x6c, 0x73, 0x61, 0x72, 0x22, 0x84, 0x01, 0x0a, 0x0d, 0x4d, - 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x49, 0x64, 0x44, 0x61, 0x74, 0x61, 0x12, 0x1a, 0x0a, 0x08, - 0x6c, 0x65, 0x64, 0x67, 0x65, 0x72, 0x49, 0x64, 0x18, 0x01, 0x20, 0x02, 0x28, 0x04, 0x52, 0x08, - 0x6c, 0x65, 0x64, 0x67, 0x65, 0x72, 0x49, 0x64, 0x12, 0x18, 0x0a, 0x07, 0x65, 0x6e, 0x74, 0x72, - 0x79, 0x49, 0x64, 0x18, 0x02, 0x20, 0x02, 0x28, 0x04, 0x52, 0x07, 0x65, 0x6e, 0x74, 0x72, 0x79, - 0x49, 0x64, 0x12, 0x1c, 0x0a, 0x09, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x18, - 0x03, 0x20, 0x01, 0x28, 0x05, 0x52, 0x09, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, - 0x12, 0x1f, 0x0a, 0x0b, 0x62, 0x61, 0x74, 0x63, 0x68, 0x5f, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x18, - 0x04, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0a, 0x62, 0x61, 0x74, 0x63, 0x68, 0x49, 0x6e, 0x64, 0x65, - 0x78, 0x42, 0x41, 0x5a, 0x3f, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, - 0x6d, 0x69, 0x6c, 0x76, 0x75, 0x73, 0x2d, 0x69, 0x6f, 0x2f, 0x6d, 0x69, 0x6c, 0x76, 0x75, 0x73, - 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x2f, 0x77, - 0x61, 0x6c, 0x69, 0x6d, 0x70, 0x6c, 0x73, 0x2f, 0x69, 0x6d, 0x70, 0x6c, 0x73, 0x2f, 0x70, 0x75, - 0x6c, 0x73, 0x61, 0x72, -} - -var ( - file_message_id_data_proto_rawDescOnce sync.Once - file_message_id_data_proto_rawDescData = file_message_id_data_proto_rawDesc -) - -func file_message_id_data_proto_rawDescGZIP() []byte { - file_message_id_data_proto_rawDescOnce.Do(func() { - file_message_id_data_proto_rawDescData = protoimpl.X.CompressGZIP(file_message_id_data_proto_rawDescData) - }) - return file_message_id_data_proto_rawDescData -} - -var file_message_id_data_proto_msgTypes = make([]protoimpl.MessageInfo, 1) -var file_message_id_data_proto_goTypes = []interface{}{ - (*MessageIdData)(nil), // 0: milvus.proto.streaming.wal.pulsar.MessageIdData -} -var file_message_id_data_proto_depIdxs = []int32{ - 0, // [0:0] is the sub-list for method output_type - 0, // [0:0] is the sub-list for method input_type - 0, // [0:0] is the sub-list for extension type_name - 0, // [0:0] is the sub-list for extension extendee - 0, // [0:0] is the sub-list for field type_name -} - -func init() { file_message_id_data_proto_init() } -func file_message_id_data_proto_init() { - if File_message_id_data_proto != nil { - return - } - if !protoimpl.UnsafeEnabled { - file_message_id_data_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*MessageIdData); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - } - type x struct{} - out := protoimpl.TypeBuilder{ - File: protoimpl.DescBuilder{ - GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: file_message_id_data_proto_rawDesc, - NumEnums: 0, - NumMessages: 1, - NumExtensions: 0, - NumServices: 0, - }, - GoTypes: file_message_id_data_proto_goTypes, - DependencyIndexes: file_message_id_data_proto_depIdxs, - MessageInfos: file_message_id_data_proto_msgTypes, - }.Build() - File_message_id_data_proto = out.File - file_message_id_data_proto_rawDesc = nil - file_message_id_data_proto_goTypes = nil - file_message_id_data_proto_depIdxs = nil -} diff --git a/pkg/streaming/walimpls/impls/pulsar/message_id_data.proto b/pkg/streaming/walimpls/impls/pulsar/message_id_data.proto deleted file mode 100644 index 44c45c5410..0000000000 --- a/pkg/streaming/walimpls/impls/pulsar/message_id_data.proto +++ /dev/null @@ -1,13 +0,0 @@ -syntax = "proto2"; - -package milvus.proto.streaming.wal.pulsar; - -option go_package = "github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/pulsar"; - - -message MessageIdData { - required uint64 ledgerId = 1; - required uint64 entryId = 2; - optional int32 partition = 3; - optional int32 batch_index = 4; -} \ No newline at end of file diff --git a/pkg/util/funcutil/policy.go b/pkg/util/funcutil/policy.go index fc0482b35c..384d3a436f 100644 --- a/pkg/util/funcutil/policy.go +++ b/pkg/util/funcutil/policy.go @@ -132,3 +132,7 @@ func SplitObjectName(objectName string) (string, string) { names := strings.Split(objectName, ".") return names[0], names[1] } + +func PolicyCheckerWithRole(policy, roleName string) bool { + return strings.Contains(policy, fmt.Sprintf(`"V0":"%s"`, roleName)) +} diff --git a/pkg/util/funcutil/policy_test.go b/pkg/util/funcutil/policy_test.go index 8659b82205..006e465442 100644 --- a/pkg/util/funcutil/policy_test.go +++ b/pkg/util/funcutil/policy_test.go @@ -72,3 +72,10 @@ func Test_PolicyForResource(t *testing.T) { `COLLECTION-db.col1`, PolicyForResource("db", "COLLECTION", "col1")) } + +func Test_PolicyCheckerWithRole(t *testing.T) { + a := PolicyForPrivilege("admin", "COLLECTION", "col1", "ALL", "default") + b := PolicyForPrivilege("foo", "COLLECTION", "col1", "ALL", "default") + assert.True(t, PolicyCheckerWithRole(a, "admin")) + assert.False(t, PolicyCheckerWithRole(b, "admin")) +}