2023-06-25 17:20:43 +08:00
|
|
|
package proxy
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
2024-03-28 07:13:10 +08:00
|
|
|
"fmt"
|
|
|
|
"strings"
|
|
|
|
|
|
|
|
"google.golang.org/grpc/metadata"
|
2023-06-25 17:20:43 +08:00
|
|
|
|
|
|
|
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
|
|
|
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
|
|
|
|
"github.com/milvus-io/milvus/internal/types"
|
2023-10-20 14:26:09 +08:00
|
|
|
"github.com/milvus-io/milvus/pkg/mq/msgstream"
|
2024-03-28 07:13:10 +08:00
|
|
|
"github.com/milvus-io/milvus/pkg/util"
|
2023-06-25 17:20:43 +08:00
|
|
|
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
|
2024-03-28 07:13:10 +08:00
|
|
|
"github.com/milvus-io/milvus/pkg/util/crypto"
|
2023-06-25 17:20:43 +08:00
|
|
|
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
|
|
|
)
|
|
|
|
|
|
|
|
type createDatabaseTask struct {
|
2024-02-21 09:52:59 +08:00
|
|
|
baseTask
|
2023-06-25 17:20:43 +08:00
|
|
|
Condition
|
|
|
|
*milvuspb.CreateDatabaseRequest
|
|
|
|
ctx context.Context
|
2023-09-26 09:57:25 +08:00
|
|
|
rootCoord types.RootCoordClient
|
2023-06-25 17:20:43 +08:00
|
|
|
result *commonpb.Status
|
2023-10-20 14:26:09 +08:00
|
|
|
|
|
|
|
replicateMsgStream msgstream.MsgStream
|
2023-06-25 17:20:43 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
func (cdt *createDatabaseTask) TraceCtx() context.Context {
|
|
|
|
return cdt.ctx
|
|
|
|
}
|
|
|
|
|
|
|
|
func (cdt *createDatabaseTask) ID() UniqueID {
|
|
|
|
return cdt.Base.MsgID
|
|
|
|
}
|
|
|
|
|
|
|
|
func (cdt *createDatabaseTask) SetID(uid UniqueID) {
|
|
|
|
cdt.Base.MsgID = uid
|
|
|
|
}
|
|
|
|
|
|
|
|
func (cdt *createDatabaseTask) Name() string {
|
|
|
|
return CreateDatabaseTaskName
|
|
|
|
}
|
|
|
|
|
|
|
|
func (cdt *createDatabaseTask) Type() commonpb.MsgType {
|
|
|
|
return cdt.Base.MsgType
|
|
|
|
}
|
|
|
|
|
|
|
|
func (cdt *createDatabaseTask) BeginTs() Timestamp {
|
|
|
|
return cdt.Base.Timestamp
|
|
|
|
}
|
|
|
|
|
|
|
|
func (cdt *createDatabaseTask) EndTs() Timestamp {
|
|
|
|
return cdt.Base.Timestamp
|
|
|
|
}
|
|
|
|
|
|
|
|
func (cdt *createDatabaseTask) SetTs(ts Timestamp) {
|
|
|
|
cdt.Base.Timestamp = ts
|
|
|
|
}
|
|
|
|
|
|
|
|
func (cdt *createDatabaseTask) OnEnqueue() error {
|
2023-10-20 14:26:09 +08:00
|
|
|
if cdt.Base == nil {
|
|
|
|
cdt.Base = commonpbutil.NewMsgBase()
|
|
|
|
}
|
2023-06-25 17:20:43 +08:00
|
|
|
cdt.Base.MsgType = commonpb.MsgType_CreateDatabase
|
|
|
|
cdt.Base.SourceID = paramtable.GetNodeID()
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (cdt *createDatabaseTask) PreExecute(ctx context.Context) error {
|
|
|
|
return ValidateDatabaseName(cdt.GetDbName())
|
|
|
|
}
|
|
|
|
|
|
|
|
func (cdt *createDatabaseTask) Execute(ctx context.Context) error {
|
|
|
|
var err error
|
|
|
|
cdt.result, err = cdt.rootCoord.CreateDatabase(ctx, cdt.CreateDatabaseRequest)
|
2023-10-20 14:26:09 +08:00
|
|
|
if cdt.result != nil && cdt.result.ErrorCode == commonpb.ErrorCode_Success {
|
|
|
|
SendReplicateMessagePack(ctx, cdt.replicateMsgStream, cdt.CreateDatabaseRequest)
|
|
|
|
}
|
2023-06-25 17:20:43 +08:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
func (cdt *createDatabaseTask) PostExecute(ctx context.Context) error {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
type dropDatabaseTask struct {
|
2024-02-21 09:52:59 +08:00
|
|
|
baseTask
|
2023-06-25 17:20:43 +08:00
|
|
|
Condition
|
|
|
|
*milvuspb.DropDatabaseRequest
|
|
|
|
ctx context.Context
|
2023-09-26 09:57:25 +08:00
|
|
|
rootCoord types.RootCoordClient
|
2023-06-25 17:20:43 +08:00
|
|
|
result *commonpb.Status
|
2023-10-20 14:26:09 +08:00
|
|
|
|
|
|
|
replicateMsgStream msgstream.MsgStream
|
2023-06-25 17:20:43 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
func (ddt *dropDatabaseTask) TraceCtx() context.Context {
|
|
|
|
return ddt.ctx
|
|
|
|
}
|
|
|
|
|
|
|
|
func (ddt *dropDatabaseTask) ID() UniqueID {
|
|
|
|
return ddt.Base.MsgID
|
|
|
|
}
|
|
|
|
|
|
|
|
func (ddt *dropDatabaseTask) SetID(uid UniqueID) {
|
|
|
|
ddt.Base.MsgID = uid
|
|
|
|
}
|
|
|
|
|
|
|
|
func (ddt *dropDatabaseTask) Name() string {
|
|
|
|
return DropCollectionTaskName
|
|
|
|
}
|
|
|
|
|
|
|
|
func (ddt *dropDatabaseTask) Type() commonpb.MsgType {
|
|
|
|
return ddt.Base.MsgType
|
|
|
|
}
|
|
|
|
|
|
|
|
func (ddt *dropDatabaseTask) BeginTs() Timestamp {
|
|
|
|
return ddt.Base.Timestamp
|
|
|
|
}
|
|
|
|
|
|
|
|
func (ddt *dropDatabaseTask) EndTs() Timestamp {
|
|
|
|
return ddt.Base.Timestamp
|
|
|
|
}
|
|
|
|
|
|
|
|
func (ddt *dropDatabaseTask) SetTs(ts Timestamp) {
|
|
|
|
ddt.Base.Timestamp = ts
|
|
|
|
}
|
|
|
|
|
|
|
|
func (ddt *dropDatabaseTask) OnEnqueue() error {
|
2023-10-20 14:26:09 +08:00
|
|
|
if ddt.Base == nil {
|
|
|
|
ddt.Base = commonpbutil.NewMsgBase()
|
|
|
|
}
|
2023-06-25 17:20:43 +08:00
|
|
|
ddt.Base.MsgType = commonpb.MsgType_DropDatabase
|
|
|
|
ddt.Base.SourceID = paramtable.GetNodeID()
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (ddt *dropDatabaseTask) PreExecute(ctx context.Context) error {
|
|
|
|
return ValidateDatabaseName(ddt.GetDbName())
|
|
|
|
}
|
|
|
|
|
|
|
|
func (ddt *dropDatabaseTask) Execute(ctx context.Context) error {
|
|
|
|
var err error
|
|
|
|
ddt.result, err = ddt.rootCoord.DropDatabase(ctx, ddt.DropDatabaseRequest)
|
|
|
|
|
|
|
|
if ddt.result != nil && ddt.result.ErrorCode == commonpb.ErrorCode_Success {
|
|
|
|
globalMetaCache.RemoveDatabase(ctx, ddt.DbName)
|
2023-10-20 14:26:09 +08:00
|
|
|
SendReplicateMessagePack(ctx, ddt.replicateMsgStream, ddt.DropDatabaseRequest)
|
2023-06-25 17:20:43 +08:00
|
|
|
}
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
func (ddt *dropDatabaseTask) PostExecute(ctx context.Context) error {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
type listDatabaseTask struct {
|
2024-02-21 09:52:59 +08:00
|
|
|
baseTask
|
2023-06-25 17:20:43 +08:00
|
|
|
Condition
|
|
|
|
*milvuspb.ListDatabasesRequest
|
|
|
|
ctx context.Context
|
2023-09-26 09:57:25 +08:00
|
|
|
rootCoord types.RootCoordClient
|
2023-06-25 17:20:43 +08:00
|
|
|
result *milvuspb.ListDatabasesResponse
|
|
|
|
}
|
|
|
|
|
|
|
|
func (ldt *listDatabaseTask) TraceCtx() context.Context {
|
|
|
|
return ldt.ctx
|
|
|
|
}
|
|
|
|
|
|
|
|
func (ldt *listDatabaseTask) ID() UniqueID {
|
|
|
|
return ldt.Base.MsgID
|
|
|
|
}
|
|
|
|
|
|
|
|
func (ldt *listDatabaseTask) SetID(uid UniqueID) {
|
|
|
|
ldt.Base.MsgID = uid
|
|
|
|
}
|
|
|
|
|
|
|
|
func (ldt *listDatabaseTask) Name() string {
|
|
|
|
return ListDatabaseTaskName
|
|
|
|
}
|
|
|
|
|
|
|
|
func (ldt *listDatabaseTask) Type() commonpb.MsgType {
|
|
|
|
return ldt.Base.MsgType
|
|
|
|
}
|
|
|
|
|
|
|
|
func (ldt *listDatabaseTask) BeginTs() Timestamp {
|
|
|
|
return ldt.Base.Timestamp
|
|
|
|
}
|
|
|
|
|
|
|
|
func (ldt *listDatabaseTask) EndTs() Timestamp {
|
|
|
|
return ldt.Base.Timestamp
|
|
|
|
}
|
|
|
|
|
|
|
|
func (ldt *listDatabaseTask) SetTs(ts Timestamp) {
|
|
|
|
ldt.Base.Timestamp = ts
|
|
|
|
}
|
|
|
|
|
|
|
|
func (ldt *listDatabaseTask) OnEnqueue() error {
|
|
|
|
ldt.Base = commonpbutil.NewMsgBase()
|
|
|
|
ldt.Base.MsgType = commonpb.MsgType_ListDatabases
|
|
|
|
ldt.Base.SourceID = paramtable.GetNodeID()
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (ldt *listDatabaseTask) PreExecute(ctx context.Context) error {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (ldt *listDatabaseTask) Execute(ctx context.Context) error {
|
|
|
|
var err error
|
2024-03-28 07:13:10 +08:00
|
|
|
curUser, _ := GetCurUserFromContext(ldt.ctx)
|
|
|
|
if curUser != "" {
|
|
|
|
originValue := fmt.Sprintf("%s%s%s", curUser, util.CredentialSeperator, curUser)
|
|
|
|
authKey := strings.ToLower(util.HeaderAuthorize)
|
|
|
|
authValue := crypto.Base64Encode(originValue)
|
|
|
|
ldt.ctx = metadata.AppendToOutgoingContext(ldt.ctx, authKey, authValue)
|
|
|
|
}
|
|
|
|
ldt.result, err = ldt.rootCoord.ListDatabases(ldt.ctx, ldt.ListDatabasesRequest)
|
2023-06-25 17:20:43 +08:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
func (ldt *listDatabaseTask) PostExecute(ctx context.Context) error {
|
|
|
|
return nil
|
|
|
|
}
|