2021-11-10 23:55:48 +08:00
|
|
|
// Licensed to the LF AI & Data foundation under one
|
|
|
|
// or more contributor license agreements. See the NOTICE file
|
|
|
|
// distributed with this work for additional information
|
|
|
|
// regarding copyright ownership. The ASF licenses this file
|
|
|
|
// to you under the Apache License, Version 2.0 (the
|
|
|
|
// "License"); you may not use this file except in compliance
|
2021-04-19 10:09:43 +08:00
|
|
|
// with the License. You may obtain a copy of the License at
|
|
|
|
//
|
2021-11-10 23:55:48 +08:00
|
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
2021-04-19 10:09:43 +08:00
|
|
|
//
|
2021-11-10 23:55:48 +08:00
|
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
// See the License for the specific language governing permissions and
|
|
|
|
// limitations under the License.
|
2021-04-19 10:09:43 +08:00
|
|
|
|
2021-06-22 14:40:07 +08:00
|
|
|
package proxy
|
2020-11-03 14:53:36 +08:00
|
|
|
|
|
|
|
import (
|
2021-02-04 19:34:35 +08:00
|
|
|
"context"
|
2021-03-08 19:39:36 +08:00
|
|
|
"errors"
|
2021-02-08 14:20:29 +08:00
|
|
|
"fmt"
|
2020-11-26 16:01:31 +08:00
|
|
|
"math"
|
2022-04-29 13:35:49 +08:00
|
|
|
|
2021-03-08 19:39:36 +08:00
|
|
|
"go.uber.org/zap"
|
2021-03-05 10:15:27 +08:00
|
|
|
|
2022-10-18 19:17:27 +08:00
|
|
|
"github.com/golang/protobuf/proto"
|
2022-10-17 18:01:25 +08:00
|
|
|
"github.com/milvus-io/milvus-proto/go-api/commonpb"
|
|
|
|
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
|
|
|
|
"github.com/milvus-io/milvus-proto/go-api/schemapb"
|
2021-10-20 17:56:38 +08:00
|
|
|
"github.com/milvus-io/milvus/internal/common"
|
2021-04-22 14:45:57 +08:00
|
|
|
"github.com/milvus-io/milvus/internal/log"
|
2022-03-03 21:57:56 +08:00
|
|
|
"github.com/milvus-io/milvus/internal/mq/msgstream"
|
2021-04-22 14:45:57 +08:00
|
|
|
"github.com/milvus-io/milvus/internal/proto/datapb"
|
2022-10-17 18:01:25 +08:00
|
|
|
"github.com/milvus-io/milvus/internal/proto/indexpb"
|
2021-04-22 14:45:57 +08:00
|
|
|
"github.com/milvus-io/milvus/internal/proto/querypb"
|
2022-10-17 18:01:25 +08:00
|
|
|
"github.com/milvus-io/milvus/internal/types"
|
2022-10-18 19:17:27 +08:00
|
|
|
|
2021-04-22 14:45:57 +08:00
|
|
|
"github.com/milvus-io/milvus/internal/util/typeutil"
|
2020-11-03 14:53:36 +08:00
|
|
|
)
|
|
|
|
|
2021-02-23 09:58:06 +08:00
|
|
|
const (
	// Parameter key strings.
	AnnsFieldKey    = "anns_field"
	TopKKey         = "topk"
	NQKey           = "nq"
	MetricTypeKey   = "metric_type"
	SearchParamsKey = "params"
	RoundDecimalKey = "round_decimal"
	OffsetKey       = "offset"
	LimitKey        = "limit"

	// Task name strings. The collection task names below are what the Name
	// method of the corresponding task type returns.
	InsertTaskName             = "InsertTask"
	CreateCollectionTaskName   = "CreateCollectionTask"
	DropCollectionTaskName     = "DropCollectionTask"
	HasCollectionTaskName      = "HasCollectionTask"
	DescribeCollectionTaskName = "DescribeCollectionTask"
	ShowCollectionTaskName     = "ShowCollectionTask"
	CreatePartitionTaskName    = "CreatePartitionTask"
	DropPartitionTaskName      = "DropPartitionTask"
	HasPartitionTaskName       = "HasPartitionTask"
	ShowPartitionTaskName      = "ShowPartitionTask"
	FlushTaskName              = "FlushTask"
	LoadCollectionTaskName     = "LoadCollectionTask"
	ReleaseCollectionTaskName  = "ReleaseCollectionTask"
	LoadPartitionTaskName      = "LoadPartitionsTask"
	ReleasePartitionTaskName   = "ReleasePartitionsTask"
	deleteTaskName             = "DeleteTask"
	CreateAliasTaskName        = "CreateAliasTask"
	DropAliasTaskName          = "DropAliasTask"
	AlterAliasTaskName         = "AlterAliasTask"
	AlterCollectionTaskName    = "AlterCollectionTask"

	// minFloat32 is the most negative finite float32 value, i.e. the negation
	// of math.MaxFloat32.
	minFloat32 = -float32(math.MaxFloat32)
)
|
|
|
|
|
2020-11-03 14:53:36 +08:00
|
|
|
type task interface {
|
2021-03-25 14:41:46 +08:00
|
|
|
TraceCtx() context.Context
|
2020-11-23 16:52:17 +08:00
|
|
|
ID() UniqueID // return ReqID
|
|
|
|
SetID(uid UniqueID) // set ReqID
|
2021-02-23 09:58:06 +08:00
|
|
|
Name() string
|
2021-01-16 15:06:19 +08:00
|
|
|
Type() commonpb.MsgType
|
2020-11-05 18:01:33 +08:00
|
|
|
BeginTs() Timestamp
|
|
|
|
EndTs() Timestamp
|
2020-11-04 17:58:43 +08:00
|
|
|
SetTs(ts Timestamp)
|
2021-01-22 09:36:18 +08:00
|
|
|
OnEnqueue() error
|
2021-02-23 09:58:06 +08:00
|
|
|
PreExecute(ctx context.Context) error
|
|
|
|
Execute(ctx context.Context) error
|
|
|
|
PostExecute(ctx context.Context) error
|
2020-11-03 14:53:36 +08:00
|
|
|
WaitToFinish() error
|
2020-11-05 18:01:33 +08:00
|
|
|
Notify(err error)
|
2020-11-03 14:53:36 +08:00
|
|
|
}
|
|
|
|
|
2021-05-31 11:40:31 +08:00
|
|
|
type dmlTask interface {
|
|
|
|
task
|
2022-06-02 15:34:04 +08:00
|
|
|
getChannels() ([]pChan, error)
|
2021-06-15 10:19:38 +08:00
|
|
|
getPChanStats() (map[pChan]pChanStatistics, error)
|
2021-06-02 10:17:32 +08:00
|
|
|
}
|
|
|
|
|
2020-11-07 16:18:23 +08:00
|
|
|
// BaseInsertTask is an alias for msgstream.InsertMsg.
type BaseInsertTask = msgstream.InsertMsg
|
2020-11-05 18:01:33 +08:00
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
type createCollectionTask struct {
|
2020-11-17 20:00:23 +08:00
|
|
|
Condition
|
2021-01-22 09:36:18 +08:00
|
|
|
*milvuspb.CreateCollectionRequest
|
2021-09-11 11:36:22 +08:00
|
|
|
ctx context.Context
|
|
|
|
rootCoord types.RootCoord
|
|
|
|
result *commonpb.Status
|
|
|
|
schema *schemapb.CollectionSchema
|
2020-11-05 18:01:33 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (cct *createCollectionTask) TraceCtx() context.Context {
|
2021-02-23 09:58:06 +08:00
|
|
|
return cct.ctx
|
2021-01-22 09:36:18 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (cct *createCollectionTask) ID() UniqueID {
|
2021-01-18 19:32:08 +08:00
|
|
|
return cct.Base.MsgID
|
2020-11-05 18:01:33 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (cct *createCollectionTask) SetID(uid UniqueID) {
|
2021-01-18 19:32:08 +08:00
|
|
|
cct.Base.MsgID = uid
|
2020-11-23 16:52:17 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (cct *createCollectionTask) Name() string {
|
2021-02-23 09:58:06 +08:00
|
|
|
return CreateCollectionTaskName
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (cct *createCollectionTask) Type() commonpb.MsgType {
|
2021-01-18 19:32:08 +08:00
|
|
|
return cct.Base.MsgType
|
2020-11-05 18:01:33 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (cct *createCollectionTask) BeginTs() Timestamp {
|
2021-01-18 19:32:08 +08:00
|
|
|
return cct.Base.Timestamp
|
2020-11-05 18:01:33 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (cct *createCollectionTask) EndTs() Timestamp {
|
2021-01-18 19:32:08 +08:00
|
|
|
return cct.Base.Timestamp
|
2020-11-05 18:01:33 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (cct *createCollectionTask) SetTs(ts Timestamp) {
|
2021-01-18 19:32:08 +08:00
|
|
|
cct.Base.Timestamp = ts
|
2020-11-05 18:01:33 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (cct *createCollectionTask) OnEnqueue() error {
|
2021-02-23 09:58:06 +08:00
|
|
|
cct.Base = &commonpb.MsgBase{}
|
2021-09-11 18:06:02 +08:00
|
|
|
cct.Base.MsgType = commonpb.MsgType_CreateCollection
|
2022-04-24 22:03:44 +08:00
|
|
|
cct.Base.SourceID = Params.ProxyCfg.GetNodeID()
|
2021-02-23 09:58:06 +08:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (cct *createCollectionTask) PreExecute(ctx context.Context) error {
|
2021-03-10 14:45:35 +08:00
|
|
|
cct.Base.MsgType = commonpb.MsgType_CreateCollection
|
2022-04-24 22:03:44 +08:00
|
|
|
cct.Base.SourceID = Params.ProxyCfg.GetNodeID()
|
2021-01-22 09:36:18 +08:00
|
|
|
|
|
|
|
cct.schema = &schemapb.CollectionSchema{}
|
|
|
|
err := proto.Unmarshal(cct.Schema, cct.schema)
|
2021-09-11 18:06:02 +08:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2021-06-21 11:42:18 +08:00
|
|
|
cct.schema.AutoID = false
|
2021-01-22 09:36:18 +08:00
|
|
|
|
2021-12-23 18:39:11 +08:00
|
|
|
if cct.ShardsNum > Params.ProxyCfg.MaxShardNum {
|
|
|
|
return fmt.Errorf("maximum shards's number should be limited to %d", Params.ProxyCfg.MaxShardNum)
|
2021-09-08 15:00:00 +08:00
|
|
|
}
|
|
|
|
|
2021-12-23 18:39:11 +08:00
|
|
|
if int64(len(cct.schema.Fields)) > Params.ProxyCfg.MaxFieldNum {
|
|
|
|
return fmt.Errorf("maximum field's number should be limited to %d", Params.ProxyCfg.MaxFieldNum)
|
2020-11-26 16:01:31 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
// validate collection name
|
2021-10-23 10:53:12 +08:00
|
|
|
if err := validateCollectionName(cct.schema.Name); err != nil {
|
2020-11-26 16:01:31 +08:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2022-03-03 16:57:56 +08:00
|
|
|
// validate whether field names duplicates
|
2021-10-26 10:38:41 +08:00
|
|
|
if err := validateDuplicatedFieldName(cct.schema.Fields); err != nil {
|
2020-11-30 19:38:23 +08:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2022-03-03 16:57:56 +08:00
|
|
|
// validate primary key definition
|
2021-10-25 23:42:29 +08:00
|
|
|
if err := validatePrimaryKey(cct.schema); err != nil {
|
2020-11-30 19:38:23 +08:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2022-03-03 16:57:56 +08:00
|
|
|
// validate auto id definition
|
2021-06-21 11:42:18 +08:00
|
|
|
if err := ValidateFieldAutoID(cct.schema); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2022-03-03 16:57:56 +08:00
|
|
|
// validate field type definition
|
|
|
|
if err := validateFieldType(cct.schema); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2020-11-26 16:01:31 +08:00
|
|
|
for _, field := range cct.schema.Fields {
|
2022-03-03 16:57:56 +08:00
|
|
|
// validate field name
|
2021-10-23 18:25:37 +08:00
|
|
|
if err := validateFieldName(field.Name); err != nil {
|
2020-11-26 16:01:31 +08:00
|
|
|
return err
|
|
|
|
}
|
2022-03-03 16:57:56 +08:00
|
|
|
// validate vector field type parameters
|
2021-03-12 14:22:09 +08:00
|
|
|
if field.DataType == schemapb.DataType_FloatVector || field.DataType == schemapb.DataType_BinaryVector {
|
2022-04-29 13:35:49 +08:00
|
|
|
err = validateDimension(field)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
2020-11-26 16:01:31 +08:00
|
|
|
}
|
2022-04-29 13:35:49 +08:00
|
|
|
}
|
|
|
|
// valid max length per row parameters
|
2022-06-07 15:58:06 +08:00
|
|
|
// if max_length not specified, return error
|
2022-04-29 13:35:49 +08:00
|
|
|
if field.DataType == schemapb.DataType_VarChar {
|
|
|
|
err = validateMaxLengthPerRow(cct.schema.Name, field)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
2020-11-26 16:01:31 +08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-12-06 10:03:34 +08:00
|
|
|
if err := validateMultipleVectorFields(cct.schema); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2022-04-29 13:35:49 +08:00
|
|
|
cct.CreateCollectionRequest.Schema, err = proto.Marshal(cct.schema)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2020-11-05 18:01:33 +08:00
|
|
|
return nil
|
2020-11-03 14:53:36 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (cct *createCollectionTask) Execute(ctx context.Context) error {
|
2021-01-22 09:36:18 +08:00
|
|
|
var err error
|
2021-06-21 17:28:03 +08:00
|
|
|
cct.result, err = cct.rootCoord.CreateCollection(ctx, cct.CreateCollectionRequest)
|
2021-05-27 17:09:50 +08:00
|
|
|
return err
|
2020-11-03 14:53:36 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (cct *createCollectionTask) PostExecute(ctx context.Context) error {
|
2020-11-05 18:01:33 +08:00
|
|
|
return nil
|
2020-11-03 14:53:36 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
type dropCollectionTask struct {
|
2020-11-17 20:00:23 +08:00
|
|
|
Condition
|
2021-01-22 09:36:18 +08:00
|
|
|
*milvuspb.DropCollectionRequest
|
2021-06-21 17:28:03 +08:00
|
|
|
ctx context.Context
|
|
|
|
rootCoord types.RootCoord
|
|
|
|
result *commonpb.Status
|
|
|
|
chMgr channelsMgr
|
|
|
|
chTicker channelsTimeTicker
|
2020-11-09 17:25:53 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (dct *dropCollectionTask) TraceCtx() context.Context {
|
2021-02-23 09:58:06 +08:00
|
|
|
return dct.ctx
|
2021-01-22 09:36:18 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (dct *dropCollectionTask) ID() UniqueID {
|
2021-01-18 19:32:08 +08:00
|
|
|
return dct.Base.MsgID
|
2020-11-09 17:25:53 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (dct *dropCollectionTask) SetID(uid UniqueID) {
|
2021-01-18 19:32:08 +08:00
|
|
|
dct.Base.MsgID = uid
|
2020-11-23 16:52:17 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (dct *dropCollectionTask) Name() string {
|
2021-02-23 09:58:06 +08:00
|
|
|
return DropCollectionTaskName
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (dct *dropCollectionTask) Type() commonpb.MsgType {
|
2021-01-18 19:32:08 +08:00
|
|
|
return dct.Base.MsgType
|
2020-11-09 17:25:53 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (dct *dropCollectionTask) BeginTs() Timestamp {
|
2021-01-18 19:32:08 +08:00
|
|
|
return dct.Base.Timestamp
|
2020-11-09 17:25:53 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (dct *dropCollectionTask) EndTs() Timestamp {
|
2021-01-18 19:32:08 +08:00
|
|
|
return dct.Base.Timestamp
|
2020-11-09 17:25:53 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (dct *dropCollectionTask) SetTs(ts Timestamp) {
|
2021-01-18 19:32:08 +08:00
|
|
|
dct.Base.Timestamp = ts
|
2020-11-09 17:25:53 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (dct *dropCollectionTask) OnEnqueue() error {
|
2021-02-23 09:58:06 +08:00
|
|
|
dct.Base = &commonpb.MsgBase{}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (dct *dropCollectionTask) PreExecute(ctx context.Context) error {
|
2021-03-10 14:45:35 +08:00
|
|
|
dct.Base.MsgType = commonpb.MsgType_DropCollection
|
2022-04-24 22:03:44 +08:00
|
|
|
dct.Base.SourceID = Params.ProxyCfg.GetNodeID()
|
2021-01-22 09:36:18 +08:00
|
|
|
|
2021-10-23 10:53:12 +08:00
|
|
|
if err := validateCollectionName(dct.CollectionName); err != nil {
|
2020-11-26 16:01:31 +08:00
|
|
|
return err
|
|
|
|
}
|
2020-11-09 17:25:53 +08:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (dct *dropCollectionTask) Execute(ctx context.Context) error {
|
2022-09-27 19:18:54 +08:00
|
|
|
var err error
|
|
|
|
dct.result, err = dct.rootCoord.DropCollection(ctx, dct.DropCollectionRequest)
|
|
|
|
if common.IsCollectionNotExistError(err) {
|
2022-09-05 13:29:11 +08:00
|
|
|
// make dropping collection idempotent.
|
|
|
|
dct.result = &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}
|
|
|
|
return nil
|
2021-02-01 10:53:13 +08:00
|
|
|
}
|
2022-09-27 19:18:54 +08:00
|
|
|
return err
|
2020-11-09 17:25:53 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (dct *dropCollectionTask) PostExecute(ctx context.Context) error {
|
2020-11-30 22:14:19 +08:00
|
|
|
return nil
|
2020-11-09 17:25:53 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
type hasCollectionTask struct {
|
2020-11-17 20:00:23 +08:00
|
|
|
Condition
|
2021-01-22 09:36:18 +08:00
|
|
|
*milvuspb.HasCollectionRequest
|
2021-06-21 17:28:03 +08:00
|
|
|
ctx context.Context
|
|
|
|
rootCoord types.RootCoord
|
|
|
|
result *milvuspb.BoolResponse
|
2020-11-09 17:25:53 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (hct *hasCollectionTask) TraceCtx() context.Context {
|
2021-02-23 09:58:06 +08:00
|
|
|
return hct.ctx
|
2021-01-22 09:36:18 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (hct *hasCollectionTask) ID() UniqueID {
|
2021-01-18 19:32:08 +08:00
|
|
|
return hct.Base.MsgID
|
2020-11-09 17:25:53 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (hct *hasCollectionTask) SetID(uid UniqueID) {
|
2021-01-18 19:32:08 +08:00
|
|
|
hct.Base.MsgID = uid
|
2020-11-23 16:52:17 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (hct *hasCollectionTask) Name() string {
|
2021-02-23 09:58:06 +08:00
|
|
|
return HasCollectionTaskName
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (hct *hasCollectionTask) Type() commonpb.MsgType {
|
2021-01-18 19:32:08 +08:00
|
|
|
return hct.Base.MsgType
|
2020-11-09 17:25:53 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (hct *hasCollectionTask) BeginTs() Timestamp {
|
2021-01-18 19:32:08 +08:00
|
|
|
return hct.Base.Timestamp
|
2020-11-09 17:25:53 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (hct *hasCollectionTask) EndTs() Timestamp {
|
2021-01-18 19:32:08 +08:00
|
|
|
return hct.Base.Timestamp
|
2020-11-09 17:25:53 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (hct *hasCollectionTask) SetTs(ts Timestamp) {
|
2021-01-18 19:32:08 +08:00
|
|
|
hct.Base.Timestamp = ts
|
2020-11-09 17:25:53 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (hct *hasCollectionTask) OnEnqueue() error {
|
2021-02-23 09:58:06 +08:00
|
|
|
hct.Base = &commonpb.MsgBase{}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (hct *hasCollectionTask) PreExecute(ctx context.Context) error {
|
2021-03-10 14:45:35 +08:00
|
|
|
hct.Base.MsgType = commonpb.MsgType_HasCollection
|
2022-04-24 22:03:44 +08:00
|
|
|
hct.Base.SourceID = Params.ProxyCfg.GetNodeID()
|
2021-01-22 09:36:18 +08:00
|
|
|
|
2021-10-23 10:53:12 +08:00
|
|
|
if err := validateCollectionName(hct.CollectionName); err != nil {
|
2020-11-26 16:01:31 +08:00
|
|
|
return err
|
|
|
|
}
|
2020-11-09 17:25:53 +08:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (hct *hasCollectionTask) Execute(ctx context.Context) error {
|
2021-01-22 09:36:18 +08:00
|
|
|
var err error
|
2021-06-21 17:28:03 +08:00
|
|
|
hct.result, err = hct.rootCoord.HasCollection(ctx, hct.HasCollectionRequest)
|
2021-12-23 21:46:10 +08:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2021-02-04 19:34:35 +08:00
|
|
|
if hct.result == nil {
|
|
|
|
return errors.New("has collection resp is nil")
|
|
|
|
}
|
2021-03-10 22:06:22 +08:00
|
|
|
if hct.result.Status.ErrorCode != commonpb.ErrorCode_Success {
|
2021-02-04 19:34:35 +08:00
|
|
|
return errors.New(hct.result.Status.Reason)
|
|
|
|
}
|
2021-12-23 21:46:10 +08:00
|
|
|
return nil
|
2020-11-09 17:25:53 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (hct *hasCollectionTask) PostExecute(ctx context.Context) error {
|
2020-11-09 17:25:53 +08:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
type describeCollectionTask struct {
|
2020-11-17 20:00:23 +08:00
|
|
|
Condition
|
2021-01-22 09:36:18 +08:00
|
|
|
*milvuspb.DescribeCollectionRequest
|
2021-06-21 17:28:03 +08:00
|
|
|
ctx context.Context
|
|
|
|
rootCoord types.RootCoord
|
|
|
|
result *milvuspb.DescribeCollectionResponse
|
2020-11-09 17:25:53 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (dct *describeCollectionTask) TraceCtx() context.Context {
|
2021-02-23 09:58:06 +08:00
|
|
|
return dct.ctx
|
2021-01-22 09:36:18 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (dct *describeCollectionTask) ID() UniqueID {
|
2021-01-18 19:32:08 +08:00
|
|
|
return dct.Base.MsgID
|
2020-11-09 17:25:53 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (dct *describeCollectionTask) SetID(uid UniqueID) {
|
2021-01-18 19:32:08 +08:00
|
|
|
dct.Base.MsgID = uid
|
2020-11-23 16:52:17 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (dct *describeCollectionTask) Name() string {
|
2021-02-23 09:58:06 +08:00
|
|
|
return DescribeCollectionTaskName
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (dct *describeCollectionTask) Type() commonpb.MsgType {
|
2021-01-18 19:32:08 +08:00
|
|
|
return dct.Base.MsgType
|
2020-11-09 17:25:53 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (dct *describeCollectionTask) BeginTs() Timestamp {
|
2021-01-18 19:32:08 +08:00
|
|
|
return dct.Base.Timestamp
|
2020-11-09 17:25:53 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (dct *describeCollectionTask) EndTs() Timestamp {
|
2021-01-18 19:32:08 +08:00
|
|
|
return dct.Base.Timestamp
|
2020-11-09 17:25:53 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (dct *describeCollectionTask) SetTs(ts Timestamp) {
|
2021-01-18 19:32:08 +08:00
|
|
|
dct.Base.Timestamp = ts
|
2020-11-09 17:25:53 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (dct *describeCollectionTask) OnEnqueue() error {
|
2021-02-23 09:58:06 +08:00
|
|
|
dct.Base = &commonpb.MsgBase{}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (dct *describeCollectionTask) PreExecute(ctx context.Context) error {
|
2021-03-10 14:45:35 +08:00
|
|
|
dct.Base.MsgType = commonpb.MsgType_DescribeCollection
|
2022-04-24 22:03:44 +08:00
|
|
|
dct.Base.SourceID = Params.ProxyCfg.GetNodeID()
|
2021-01-22 09:36:18 +08:00
|
|
|
|
2021-10-09 16:10:56 +08:00
|
|
|
if dct.CollectionID != 0 && len(dct.CollectionName) == 0 {
|
|
|
|
return nil
|
2020-11-26 16:01:31 +08:00
|
|
|
}
|
2021-10-09 16:10:56 +08:00
|
|
|
|
2021-10-23 10:53:12 +08:00
|
|
|
return validateCollectionName(dct.CollectionName)
|
2020-11-09 17:25:53 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (dct *describeCollectionTask) Execute(ctx context.Context) error {
|
2021-01-22 09:36:18 +08:00
|
|
|
var err error
|
2021-06-08 19:25:37 +08:00
|
|
|
dct.result = &milvuspb.DescribeCollectionResponse{
|
|
|
|
Status: &commonpb.Status{
|
|
|
|
ErrorCode: commonpb.ErrorCode_Success,
|
|
|
|
},
|
|
|
|
Schema: &schemapb.CollectionSchema{
|
|
|
|
Name: "",
|
|
|
|
Description: "",
|
|
|
|
AutoID: false,
|
|
|
|
Fields: make([]*schemapb.FieldSchema, 0),
|
|
|
|
},
|
|
|
|
CollectionID: 0,
|
|
|
|
VirtualChannelNames: nil,
|
|
|
|
PhysicalChannelNames: nil,
|
2022-09-02 19:20:59 +08:00
|
|
|
CollectionName: dct.GetCollectionName(),
|
2021-06-08 19:25:37 +08:00
|
|
|
}
|
|
|
|
|
2021-06-21 17:28:03 +08:00
|
|
|
result, err := dct.rootCoord.DescribeCollection(ctx, dct.DescribeCollectionRequest)
|
2021-06-08 19:25:37 +08:00
|
|
|
|
2021-06-11 15:33:18 +08:00
|
|
|
if err != nil {
|
|
|
|
return err
|
2021-02-04 19:34:35 +08:00
|
|
|
}
|
2021-06-08 19:25:37 +08:00
|
|
|
|
2021-06-11 15:33:18 +08:00
|
|
|
if result.Status.ErrorCode != commonpb.ErrorCode_Success {
|
|
|
|
dct.result.Status = result.Status
|
|
|
|
} else {
|
|
|
|
dct.result.Schema.Name = result.Schema.Name
|
|
|
|
dct.result.Schema.Description = result.Schema.Description
|
|
|
|
dct.result.Schema.AutoID = result.Schema.AutoID
|
|
|
|
dct.result.CollectionID = result.CollectionID
|
|
|
|
dct.result.VirtualChannelNames = result.VirtualChannelNames
|
|
|
|
dct.result.PhysicalChannelNames = result.PhysicalChannelNames
|
2021-07-21 18:00:14 +08:00
|
|
|
dct.result.CreatedTimestamp = result.CreatedTimestamp
|
|
|
|
dct.result.CreatedUtcTimestamp = result.CreatedUtcTimestamp
|
2021-09-14 11:59:47 +08:00
|
|
|
dct.result.ShardsNum = result.ShardsNum
|
2021-12-21 19:49:02 +08:00
|
|
|
dct.result.ConsistencyLevel = result.ConsistencyLevel
|
2022-06-13 19:22:10 +08:00
|
|
|
dct.result.Aliases = result.Aliases
|
2022-10-10 20:31:22 +08:00
|
|
|
dct.result.Properties = result.Properties
|
2021-06-11 15:33:18 +08:00
|
|
|
for _, field := range result.Schema.Fields {
|
2021-09-13 17:12:19 +08:00
|
|
|
if field.FieldID >= common.StartOfUserFieldID {
|
2021-06-11 15:33:18 +08:00
|
|
|
dct.result.Schema.Fields = append(dct.result.Schema.Fields, &schemapb.FieldSchema{
|
|
|
|
FieldID: field.FieldID,
|
|
|
|
Name: field.Name,
|
|
|
|
IsPrimaryKey: field.IsPrimaryKey,
|
2021-06-21 11:42:18 +08:00
|
|
|
AutoID: field.AutoID,
|
2021-06-11 15:33:18 +08:00
|
|
|
Description: field.Description,
|
|
|
|
DataType: field.DataType,
|
|
|
|
TypeParams: field.TypeParams,
|
|
|
|
IndexParams: field.IndexParams,
|
|
|
|
})
|
|
|
|
}
|
2021-06-08 19:25:37 +08:00
|
|
|
}
|
|
|
|
}
|
2021-06-11 15:33:18 +08:00
|
|
|
return nil
|
2020-11-09 17:25:53 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (dct *describeCollectionTask) PostExecute(ctx context.Context) error {
|
2021-02-02 19:54:31 +08:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
type showCollectionsTask struct {
|
2020-11-17 20:00:23 +08:00
|
|
|
Condition
|
2021-03-12 14:22:09 +08:00
|
|
|
*milvuspb.ShowCollectionsRequest
|
2021-06-22 16:44:09 +08:00
|
|
|
ctx context.Context
|
|
|
|
rootCoord types.RootCoord
|
|
|
|
queryCoord types.QueryCoord
|
|
|
|
result *milvuspb.ShowCollectionsResponse
|
2020-11-09 17:25:53 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (sct *showCollectionsTask) TraceCtx() context.Context {
|
2021-02-23 09:58:06 +08:00
|
|
|
return sct.ctx
|
2021-01-22 09:36:18 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (sct *showCollectionsTask) ID() UniqueID {
|
2021-01-18 19:32:08 +08:00
|
|
|
return sct.Base.MsgID
|
2020-11-09 17:25:53 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (sct *showCollectionsTask) SetID(uid UniqueID) {
|
2021-01-18 19:32:08 +08:00
|
|
|
sct.Base.MsgID = uid
|
2020-11-23 16:52:17 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (sct *showCollectionsTask) Name() string {
|
2021-02-23 09:58:06 +08:00
|
|
|
return ShowCollectionTaskName
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (sct *showCollectionsTask) Type() commonpb.MsgType {
|
2021-01-18 19:32:08 +08:00
|
|
|
return sct.Base.MsgType
|
2020-11-09 17:25:53 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (sct *showCollectionsTask) BeginTs() Timestamp {
|
2021-01-18 19:32:08 +08:00
|
|
|
return sct.Base.Timestamp
|
2020-11-09 17:25:53 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (sct *showCollectionsTask) EndTs() Timestamp {
|
2021-01-18 19:32:08 +08:00
|
|
|
return sct.Base.Timestamp
|
2020-11-09 17:25:53 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (sct *showCollectionsTask) SetTs(ts Timestamp) {
|
2021-01-18 19:32:08 +08:00
|
|
|
sct.Base.Timestamp = ts
|
2020-11-09 17:25:53 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (sct *showCollectionsTask) OnEnqueue() error {
|
2021-02-23 09:58:06 +08:00
|
|
|
sct.Base = &commonpb.MsgBase{}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (sct *showCollectionsTask) PreExecute(ctx context.Context) error {
|
2021-03-10 14:45:35 +08:00
|
|
|
sct.Base.MsgType = commonpb.MsgType_ShowCollections
|
2022-04-24 22:03:44 +08:00
|
|
|
sct.Base.SourceID = Params.ProxyCfg.GetNodeID()
|
2021-08-02 22:39:25 +08:00
|
|
|
if sct.GetType() == milvuspb.ShowType_InMemory {
|
|
|
|
for _, collectionName := range sct.CollectionNames {
|
2021-10-23 10:53:12 +08:00
|
|
|
if err := validateCollectionName(collectionName); err != nil {
|
2021-08-02 22:39:25 +08:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2021-01-22 09:36:18 +08:00
|
|
|
|
2020-11-09 17:25:53 +08:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (sct *showCollectionsTask) Execute(ctx context.Context) error {
|
2021-06-21 17:28:03 +08:00
|
|
|
respFromRootCoord, err := sct.rootCoord.ShowCollections(ctx, sct.ShowCollectionsRequest)
|
2021-06-03 19:09:33 +08:00
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
return err
|
2021-02-04 19:34:35 +08:00
|
|
|
}
|
2021-06-03 19:09:33 +08:00
|
|
|
|
2021-06-21 17:28:03 +08:00
|
|
|
if respFromRootCoord == nil {
|
2021-06-03 19:09:33 +08:00
|
|
|
return errors.New("failed to show collections")
|
2021-02-04 19:34:35 +08:00
|
|
|
}
|
2021-06-03 19:09:33 +08:00
|
|
|
|
2021-06-21 17:28:03 +08:00
|
|
|
if respFromRootCoord.Status.ErrorCode != commonpb.ErrorCode_Success {
|
|
|
|
return errors.New(respFromRootCoord.Status.Reason)
|
2021-06-03 19:09:33 +08:00
|
|
|
}
|
|
|
|
|
2021-08-02 22:39:25 +08:00
|
|
|
if sct.GetType() == milvuspb.ShowType_InMemory {
|
|
|
|
IDs2Names := make(map[UniqueID]string)
|
|
|
|
for offset, collectionName := range respFromRootCoord.CollectionNames {
|
|
|
|
collectionID := respFromRootCoord.CollectionIds[offset]
|
|
|
|
IDs2Names[collectionID] = collectionName
|
|
|
|
}
|
|
|
|
collectionIDs := make([]UniqueID, 0)
|
|
|
|
for _, collectionName := range sct.CollectionNames {
|
|
|
|
collectionID, err := globalMetaCache.GetCollectionID(ctx, collectionName)
|
|
|
|
if err != nil {
|
|
|
|
log.Debug("Failed to get collection id.", zap.Any("collectionName", collectionName),
|
|
|
|
zap.Any("requestID", sct.Base.MsgID), zap.Any("requestType", "showCollections"))
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
collectionIDs = append(collectionIDs, collectionID)
|
|
|
|
IDs2Names[collectionID] = collectionName
|
|
|
|
}
|
|
|
|
|
2021-06-22 16:44:09 +08:00
|
|
|
resp, err := sct.queryCoord.ShowCollections(ctx, &querypb.ShowCollectionsRequest{
|
2021-06-03 19:09:33 +08:00
|
|
|
Base: &commonpb.MsgBase{
|
|
|
|
MsgType: commonpb.MsgType_ShowCollections,
|
2021-08-02 22:39:25 +08:00
|
|
|
MsgID: sct.Base.MsgID,
|
|
|
|
Timestamp: sct.Base.Timestamp,
|
|
|
|
SourceID: sct.Base.SourceID,
|
2021-06-03 19:09:33 +08:00
|
|
|
},
|
|
|
|
//DbID: sct.ShowCollectionsRequest.DbName,
|
2021-08-02 22:39:25 +08:00
|
|
|
CollectionIDs: collectionIDs,
|
2021-06-03 19:09:33 +08:00
|
|
|
})
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
if resp == nil {
|
|
|
|
return errors.New("failed to show collections")
|
|
|
|
}
|
|
|
|
|
|
|
|
if resp.Status.ErrorCode != commonpb.ErrorCode_Success {
|
2022-05-11 09:47:53 +08:00
|
|
|
// update collectionID to collection name, and return new error info to sdk
|
|
|
|
newErrorReason := resp.Status.Reason
|
|
|
|
for _, collectionID := range collectionIDs {
|
|
|
|
newErrorReason = ReplaceID2Name(newErrorReason, collectionID, IDs2Names[collectionID])
|
|
|
|
}
|
|
|
|
return errors.New(newErrorReason)
|
2021-06-03 19:09:33 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
sct.result = &milvuspb.ShowCollectionsResponse{
|
2022-06-09 18:20:07 +08:00
|
|
|
Status: resp.Status,
|
|
|
|
CollectionNames: make([]string, 0, len(resp.CollectionIDs)),
|
|
|
|
CollectionIds: make([]int64, 0, len(resp.CollectionIDs)),
|
|
|
|
CreatedTimestamps: make([]uint64, 0, len(resp.CollectionIDs)),
|
|
|
|
CreatedUtcTimestamps: make([]uint64, 0, len(resp.CollectionIDs)),
|
|
|
|
InMemoryPercentages: make([]int64, 0, len(resp.CollectionIDs)),
|
|
|
|
QueryServiceAvailable: make([]bool, 0, len(resp.CollectionIDs)),
|
2021-06-03 19:09:33 +08:00
|
|
|
}
|
|
|
|
|
2021-08-02 22:39:25 +08:00
|
|
|
for offset, id := range resp.CollectionIDs {
|
|
|
|
collectionName, ok := IDs2Names[id]
|
|
|
|
if !ok {
|
|
|
|
log.Debug("Failed to get collection info.", zap.Any("collectionName", collectionName),
|
|
|
|
zap.Any("requestID", sct.Base.MsgID), zap.Any("requestType", "showCollections"))
|
|
|
|
return errors.New("failed to show collections")
|
|
|
|
}
|
|
|
|
collectionInfo, err := globalMetaCache.GetCollectionInfo(ctx, collectionName)
|
|
|
|
if err != nil {
|
|
|
|
log.Debug("Failed to get collection info.", zap.Any("collectionName", collectionName),
|
|
|
|
zap.Any("requestID", sct.Base.MsgID), zap.Any("requestType", "showCollections"))
|
|
|
|
return err
|
|
|
|
}
|
2021-06-03 19:09:33 +08:00
|
|
|
sct.result.CollectionIds = append(sct.result.CollectionIds, id)
|
2021-08-02 22:39:25 +08:00
|
|
|
sct.result.CollectionNames = append(sct.result.CollectionNames, collectionName)
|
|
|
|
sct.result.CreatedTimestamps = append(sct.result.CreatedTimestamps, collectionInfo.createdTimestamp)
|
|
|
|
sct.result.CreatedUtcTimestamps = append(sct.result.CreatedUtcTimestamps, collectionInfo.createdUtcTimestamp)
|
|
|
|
sct.result.InMemoryPercentages = append(sct.result.InMemoryPercentages, resp.InMemoryPercentages[offset])
|
2022-06-09 18:20:07 +08:00
|
|
|
sct.result.QueryServiceAvailable = append(sct.result.QueryServiceAvailable, resp.QueryServiceAvailable[offset])
|
2021-06-03 19:09:33 +08:00
|
|
|
}
|
2021-06-27 12:10:08 +08:00
|
|
|
} else {
|
|
|
|
sct.result = respFromRootCoord
|
2021-06-03 19:09:33 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
2020-11-09 17:25:53 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (sct *showCollectionsTask) PostExecute(ctx context.Context) error {
|
2020-11-09 17:25:53 +08:00
|
|
|
return nil
|
|
|
|
}
|
2020-11-19 17:09:22 +08:00
|
|
|
|
2022-10-10 20:31:22 +08:00
|
|
|
type alterCollectionTask struct {
|
|
|
|
Condition
|
|
|
|
*milvuspb.AlterCollectionRequest
|
|
|
|
ctx context.Context
|
|
|
|
rootCoord types.RootCoord
|
|
|
|
result *commonpb.Status
|
|
|
|
}
|
|
|
|
|
|
|
|
func (act *alterCollectionTask) TraceCtx() context.Context {
|
|
|
|
return act.ctx
|
|
|
|
}
|
|
|
|
|
|
|
|
func (act *alterCollectionTask) ID() UniqueID {
|
|
|
|
return act.Base.MsgID
|
|
|
|
}
|
|
|
|
|
|
|
|
func (act *alterCollectionTask) SetID(uid UniqueID) {
|
|
|
|
act.Base.MsgID = uid
|
|
|
|
}
|
|
|
|
|
|
|
|
func (act *alterCollectionTask) Name() string {
|
|
|
|
return AlterCollectionTaskName
|
|
|
|
}
|
|
|
|
|
|
|
|
func (act *alterCollectionTask) Type() commonpb.MsgType {
|
|
|
|
return act.Base.MsgType
|
|
|
|
}
|
|
|
|
|
|
|
|
func (act *alterCollectionTask) BeginTs() Timestamp {
|
|
|
|
return act.Base.Timestamp
|
|
|
|
}
|
|
|
|
|
|
|
|
func (act *alterCollectionTask) EndTs() Timestamp {
|
|
|
|
return act.Base.Timestamp
|
|
|
|
}
|
|
|
|
|
|
|
|
func (act *alterCollectionTask) SetTs(ts Timestamp) {
|
|
|
|
act.Base.Timestamp = ts
|
|
|
|
}
|
|
|
|
|
|
|
|
func (act *alterCollectionTask) OnEnqueue() error {
|
|
|
|
act.Base = &commonpb.MsgBase{}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (act *alterCollectionTask) PreExecute(ctx context.Context) error {
|
|
|
|
act.Base.MsgType = commonpb.MsgType_AlterCollection
|
|
|
|
act.Base.SourceID = Params.ProxyCfg.GetNodeID()
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (act *alterCollectionTask) Execute(ctx context.Context) error {
|
|
|
|
var err error
|
|
|
|
act.result, err = act.rootCoord.AlterCollection(ctx, act.AlterCollectionRequest)
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
func (act *alterCollectionTask) PostExecute(ctx context.Context) error {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
type createPartitionTask struct {
|
2020-11-19 17:09:22 +08:00
|
|
|
Condition
|
2021-01-22 09:36:18 +08:00
|
|
|
*milvuspb.CreatePartitionRequest
|
2021-06-21 17:28:03 +08:00
|
|
|
ctx context.Context
|
|
|
|
rootCoord types.RootCoord
|
|
|
|
result *commonpb.Status
|
2020-11-19 17:09:22 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (cpt *createPartitionTask) TraceCtx() context.Context {
|
2021-02-23 09:58:06 +08:00
|
|
|
return cpt.ctx
|
2021-01-22 09:36:18 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (cpt *createPartitionTask) ID() UniqueID {
|
2021-01-18 19:32:08 +08:00
|
|
|
return cpt.Base.MsgID
|
2020-11-19 17:09:22 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (cpt *createPartitionTask) SetID(uid UniqueID) {
|
2021-01-18 19:32:08 +08:00
|
|
|
cpt.Base.MsgID = uid
|
2020-11-23 16:52:17 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (cpt *createPartitionTask) Name() string {
|
2021-02-23 09:58:06 +08:00
|
|
|
return CreatePartitionTaskName
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (cpt *createPartitionTask) Type() commonpb.MsgType {
|
2021-01-18 19:32:08 +08:00
|
|
|
return cpt.Base.MsgType
|
2020-11-19 17:09:22 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (cpt *createPartitionTask) BeginTs() Timestamp {
|
2021-01-18 19:32:08 +08:00
|
|
|
return cpt.Base.Timestamp
|
2020-11-19 17:09:22 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (cpt *createPartitionTask) EndTs() Timestamp {
|
2021-01-18 19:32:08 +08:00
|
|
|
return cpt.Base.Timestamp
|
2020-11-19 17:09:22 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (cpt *createPartitionTask) SetTs(ts Timestamp) {
|
2021-01-18 19:32:08 +08:00
|
|
|
cpt.Base.Timestamp = ts
|
2020-11-19 17:09:22 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (cpt *createPartitionTask) OnEnqueue() error {
|
2021-02-23 09:58:06 +08:00
|
|
|
cpt.Base = &commonpb.MsgBase{}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (cpt *createPartitionTask) PreExecute(ctx context.Context) error {
|
2021-03-10 14:45:35 +08:00
|
|
|
cpt.Base.MsgType = commonpb.MsgType_CreatePartition
|
2022-04-24 22:03:44 +08:00
|
|
|
cpt.Base.SourceID = Params.ProxyCfg.GetNodeID()
|
2021-01-22 09:36:18 +08:00
|
|
|
|
2021-01-18 19:32:08 +08:00
|
|
|
collName, partitionTag := cpt.CollectionName, cpt.PartitionName
|
2020-11-26 16:01:31 +08:00
|
|
|
|
2021-10-23 10:53:12 +08:00
|
|
|
if err := validateCollectionName(collName); err != nil {
|
2020-11-26 16:01:31 +08:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2021-10-23 18:23:44 +08:00
|
|
|
if err := validatePartitionTag(partitionTag, true); err != nil {
|
2020-11-26 16:01:31 +08:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2020-11-19 17:09:22 +08:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (cpt *createPartitionTask) Execute(ctx context.Context) (err error) {
|
2021-06-21 17:28:03 +08:00
|
|
|
cpt.result, err = cpt.rootCoord.CreatePartition(ctx, cpt.CreatePartitionRequest)
|
2022-09-16 11:12:47 +08:00
|
|
|
if err != nil {
|
|
|
|
return err
|
2021-02-04 19:34:35 +08:00
|
|
|
}
|
2021-03-10 22:06:22 +08:00
|
|
|
if cpt.result.ErrorCode != commonpb.ErrorCode_Success {
|
2021-02-04 19:34:35 +08:00
|
|
|
return errors.New(cpt.result.Reason)
|
|
|
|
}
|
2020-11-19 17:09:22 +08:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (cpt *createPartitionTask) PostExecute(ctx context.Context) error {
|
2020-11-19 17:09:22 +08:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
type dropPartitionTask struct {
|
2020-11-19 17:09:22 +08:00
|
|
|
Condition
|
2021-01-22 09:36:18 +08:00
|
|
|
*milvuspb.DropPartitionRequest
|
2021-06-21 17:28:03 +08:00
|
|
|
ctx context.Context
|
|
|
|
rootCoord types.RootCoord
|
|
|
|
result *commonpb.Status
|
2020-11-19 17:09:22 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (dpt *dropPartitionTask) TraceCtx() context.Context {
|
2021-02-23 09:58:06 +08:00
|
|
|
return dpt.ctx
|
2021-01-22 09:36:18 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (dpt *dropPartitionTask) ID() UniqueID {
|
2021-01-18 19:32:08 +08:00
|
|
|
return dpt.Base.MsgID
|
2020-11-19 17:09:22 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (dpt *dropPartitionTask) SetID(uid UniqueID) {
|
2021-01-18 19:32:08 +08:00
|
|
|
dpt.Base.MsgID = uid
|
2020-11-23 16:52:17 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (dpt *dropPartitionTask) Name() string {
|
2021-02-23 09:58:06 +08:00
|
|
|
return DropPartitionTaskName
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (dpt *dropPartitionTask) Type() commonpb.MsgType {
|
2021-01-18 19:32:08 +08:00
|
|
|
return dpt.Base.MsgType
|
2020-11-19 17:09:22 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (dpt *dropPartitionTask) BeginTs() Timestamp {
|
2021-01-18 19:32:08 +08:00
|
|
|
return dpt.Base.Timestamp
|
2020-11-19 17:09:22 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (dpt *dropPartitionTask) EndTs() Timestamp {
|
2021-01-18 19:32:08 +08:00
|
|
|
return dpt.Base.Timestamp
|
2020-11-19 17:09:22 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (dpt *dropPartitionTask) SetTs(ts Timestamp) {
|
2021-01-18 19:32:08 +08:00
|
|
|
dpt.Base.Timestamp = ts
|
2020-11-19 17:09:22 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (dpt *dropPartitionTask) OnEnqueue() error {
|
2021-02-23 09:58:06 +08:00
|
|
|
dpt.Base = &commonpb.MsgBase{}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (dpt *dropPartitionTask) PreExecute(ctx context.Context) error {
|
2021-03-10 14:45:35 +08:00
|
|
|
dpt.Base.MsgType = commonpb.MsgType_DropPartition
|
2022-04-24 22:03:44 +08:00
|
|
|
dpt.Base.SourceID = Params.ProxyCfg.GetNodeID()
|
2021-01-22 09:36:18 +08:00
|
|
|
|
2021-01-18 19:32:08 +08:00
|
|
|
collName, partitionTag := dpt.CollectionName, dpt.PartitionName
|
2020-11-26 16:01:31 +08:00
|
|
|
|
2021-10-23 10:53:12 +08:00
|
|
|
if err := validateCollectionName(collName); err != nil {
|
2020-11-26 16:01:31 +08:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2021-10-23 18:23:44 +08:00
|
|
|
if err := validatePartitionTag(partitionTag, true); err != nil {
|
2020-11-26 16:01:31 +08:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2020-11-19 17:09:22 +08:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (dpt *dropPartitionTask) Execute(ctx context.Context) (err error) {
|
2021-06-21 17:28:03 +08:00
|
|
|
dpt.result, err = dpt.rootCoord.DropPartition(ctx, dpt.DropPartitionRequest)
|
2022-09-16 11:12:47 +08:00
|
|
|
if err != nil {
|
|
|
|
return err
|
2021-02-04 19:34:35 +08:00
|
|
|
}
|
2021-03-10 22:06:22 +08:00
|
|
|
if dpt.result.ErrorCode != commonpb.ErrorCode_Success {
|
2021-02-04 19:34:35 +08:00
|
|
|
return errors.New(dpt.result.Reason)
|
|
|
|
}
|
2020-11-19 17:09:22 +08:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (dpt *dropPartitionTask) PostExecute(ctx context.Context) error {
|
2020-11-19 17:09:22 +08:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
type hasPartitionTask struct {
|
2020-11-19 17:09:22 +08:00
|
|
|
Condition
|
2021-01-22 09:36:18 +08:00
|
|
|
*milvuspb.HasPartitionRequest
|
2021-06-21 17:28:03 +08:00
|
|
|
ctx context.Context
|
|
|
|
rootCoord types.RootCoord
|
|
|
|
result *milvuspb.BoolResponse
|
2020-11-19 17:09:22 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (hpt *hasPartitionTask) TraceCtx() context.Context {
|
2021-02-23 09:58:06 +08:00
|
|
|
return hpt.ctx
|
2021-01-22 09:36:18 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (hpt *hasPartitionTask) ID() UniqueID {
|
2021-01-18 19:32:08 +08:00
|
|
|
return hpt.Base.MsgID
|
2020-11-19 17:09:22 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (hpt *hasPartitionTask) SetID(uid UniqueID) {
|
2021-01-18 19:32:08 +08:00
|
|
|
hpt.Base.MsgID = uid
|
2020-11-23 16:52:17 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (hpt *hasPartitionTask) Name() string {
|
2021-02-23 09:58:06 +08:00
|
|
|
return HasPartitionTaskName
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (hpt *hasPartitionTask) Type() commonpb.MsgType {
|
2021-01-18 19:32:08 +08:00
|
|
|
return hpt.Base.MsgType
|
2020-11-19 17:09:22 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (hpt *hasPartitionTask) BeginTs() Timestamp {
|
2021-01-18 19:32:08 +08:00
|
|
|
return hpt.Base.Timestamp
|
2020-11-19 17:09:22 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (hpt *hasPartitionTask) EndTs() Timestamp {
|
2021-01-18 19:32:08 +08:00
|
|
|
return hpt.Base.Timestamp
|
2020-11-19 17:09:22 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (hpt *hasPartitionTask) SetTs(ts Timestamp) {
|
2021-01-18 19:32:08 +08:00
|
|
|
hpt.Base.Timestamp = ts
|
2020-11-19 17:09:22 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (hpt *hasPartitionTask) OnEnqueue() error {
|
2021-02-23 09:58:06 +08:00
|
|
|
hpt.Base = &commonpb.MsgBase{}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (hpt *hasPartitionTask) PreExecute(ctx context.Context) error {
|
2021-03-10 14:45:35 +08:00
|
|
|
hpt.Base.MsgType = commonpb.MsgType_HasPartition
|
2022-04-24 22:03:44 +08:00
|
|
|
hpt.Base.SourceID = Params.ProxyCfg.GetNodeID()
|
2021-01-22 09:36:18 +08:00
|
|
|
|
2021-01-18 19:32:08 +08:00
|
|
|
collName, partitionTag := hpt.CollectionName, hpt.PartitionName
|
2020-11-26 16:01:31 +08:00
|
|
|
|
2021-10-23 10:53:12 +08:00
|
|
|
if err := validateCollectionName(collName); err != nil {
|
2020-11-26 16:01:31 +08:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2021-10-23 18:23:44 +08:00
|
|
|
if err := validatePartitionTag(partitionTag, true); err != nil {
|
2020-11-26 16:01:31 +08:00
|
|
|
return err
|
|
|
|
}
|
2020-11-19 17:09:22 +08:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (hpt *hasPartitionTask) Execute(ctx context.Context) (err error) {
|
2021-06-21 17:28:03 +08:00
|
|
|
hpt.result, err = hpt.rootCoord.HasPartition(ctx, hpt.HasPartitionRequest)
|
2022-09-16 11:12:47 +08:00
|
|
|
if err != nil {
|
|
|
|
return err
|
2021-02-04 19:34:35 +08:00
|
|
|
}
|
2021-03-10 22:06:22 +08:00
|
|
|
if hpt.result.Status.ErrorCode != commonpb.ErrorCode_Success {
|
2021-02-04 19:34:35 +08:00
|
|
|
return errors.New(hpt.result.Status.Reason)
|
|
|
|
}
|
2020-11-19 17:09:22 +08:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (hpt *hasPartitionTask) PostExecute(ctx context.Context) error {
|
2020-11-19 17:09:22 +08:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
type showPartitionsTask struct {
|
2020-11-19 17:09:22 +08:00
|
|
|
Condition
|
2021-03-12 14:22:09 +08:00
|
|
|
*milvuspb.ShowPartitionsRequest
|
2021-08-02 22:39:25 +08:00
|
|
|
ctx context.Context
|
|
|
|
rootCoord types.RootCoord
|
|
|
|
queryCoord types.QueryCoord
|
|
|
|
result *milvuspb.ShowPartitionsResponse
|
2020-11-19 17:09:22 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (spt *showPartitionsTask) TraceCtx() context.Context {
|
2021-02-23 09:58:06 +08:00
|
|
|
return spt.ctx
|
2021-01-22 09:36:18 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (spt *showPartitionsTask) ID() UniqueID {
|
2021-01-18 19:32:08 +08:00
|
|
|
return spt.Base.MsgID
|
2020-11-19 17:09:22 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (spt *showPartitionsTask) SetID(uid UniqueID) {
|
2021-01-18 19:32:08 +08:00
|
|
|
spt.Base.MsgID = uid
|
2020-11-23 16:52:17 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (spt *showPartitionsTask) Name() string {
|
2021-02-23 09:58:06 +08:00
|
|
|
return ShowPartitionTaskName
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (spt *showPartitionsTask) Type() commonpb.MsgType {
|
2021-01-18 19:32:08 +08:00
|
|
|
return spt.Base.MsgType
|
2020-11-19 17:09:22 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (spt *showPartitionsTask) BeginTs() Timestamp {
|
2021-01-18 19:32:08 +08:00
|
|
|
return spt.Base.Timestamp
|
2020-11-19 17:09:22 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (spt *showPartitionsTask) EndTs() Timestamp {
|
2021-01-18 19:32:08 +08:00
|
|
|
return spt.Base.Timestamp
|
2020-11-19 17:09:22 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (spt *showPartitionsTask) SetTs(ts Timestamp) {
|
2021-01-18 19:32:08 +08:00
|
|
|
spt.Base.Timestamp = ts
|
2020-11-19 17:09:22 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (spt *showPartitionsTask) OnEnqueue() error {
|
2021-02-23 09:58:06 +08:00
|
|
|
spt.Base = &commonpb.MsgBase{}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (spt *showPartitionsTask) PreExecute(ctx context.Context) error {
|
2021-03-10 14:45:35 +08:00
|
|
|
spt.Base.MsgType = commonpb.MsgType_ShowPartitions
|
2022-04-24 22:03:44 +08:00
|
|
|
spt.Base.SourceID = Params.ProxyCfg.GetNodeID()
|
2021-01-22 09:36:18 +08:00
|
|
|
|
2021-10-23 10:53:12 +08:00
|
|
|
if err := validateCollectionName(spt.CollectionName); err != nil {
|
2020-11-26 16:01:31 +08:00
|
|
|
return err
|
|
|
|
}
|
2021-08-02 22:39:25 +08:00
|
|
|
|
|
|
|
if spt.GetType() == milvuspb.ShowType_InMemory {
|
|
|
|
for _, partitionName := range spt.PartitionNames {
|
2021-10-23 18:23:44 +08:00
|
|
|
if err := validatePartitionTag(partitionName, true); err != nil {
|
2021-08-02 22:39:25 +08:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-11-19 17:09:22 +08:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (spt *showPartitionsTask) Execute(ctx context.Context) error {
|
2021-08-02 22:39:25 +08:00
|
|
|
respFromRootCoord, err := spt.rootCoord.ShowPartitions(ctx, spt.ShowPartitionsRequest)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
2021-01-31 14:55:36 +08:00
|
|
|
}
|
2021-08-02 22:39:25 +08:00
|
|
|
|
|
|
|
if respFromRootCoord == nil {
|
|
|
|
return errors.New("failed to show partitions")
|
2021-02-04 19:34:35 +08:00
|
|
|
}
|
2021-08-02 22:39:25 +08:00
|
|
|
|
|
|
|
if respFromRootCoord.Status.ErrorCode != commonpb.ErrorCode_Success {
|
|
|
|
return errors.New(respFromRootCoord.Status.Reason)
|
|
|
|
}
|
|
|
|
|
|
|
|
if spt.GetType() == milvuspb.ShowType_InMemory {
|
|
|
|
collectionName := spt.CollectionName
|
|
|
|
collectionID, err := globalMetaCache.GetCollectionID(ctx, collectionName)
|
|
|
|
if err != nil {
|
|
|
|
log.Debug("Failed to get collection id.", zap.Any("collectionName", collectionName),
|
|
|
|
zap.Any("requestID", spt.Base.MsgID), zap.Any("requestType", "showPartitions"))
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
IDs2Names := make(map[UniqueID]string)
|
|
|
|
for offset, partitionName := range respFromRootCoord.PartitionNames {
|
|
|
|
partitionID := respFromRootCoord.PartitionIDs[offset]
|
|
|
|
IDs2Names[partitionID] = partitionName
|
|
|
|
}
|
|
|
|
partitionIDs := make([]UniqueID, 0)
|
|
|
|
for _, partitionName := range spt.PartitionNames {
|
|
|
|
partitionID, err := globalMetaCache.GetPartitionID(ctx, collectionName, partitionName)
|
|
|
|
if err != nil {
|
|
|
|
log.Debug("Failed to get partition id.", zap.Any("partitionName", partitionName),
|
|
|
|
zap.Any("requestID", spt.Base.MsgID), zap.Any("requestType", "showPartitions"))
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
partitionIDs = append(partitionIDs, partitionID)
|
|
|
|
IDs2Names[partitionID] = partitionName
|
|
|
|
}
|
|
|
|
resp, err := spt.queryCoord.ShowPartitions(ctx, &querypb.ShowPartitionsRequest{
|
|
|
|
Base: &commonpb.MsgBase{
|
|
|
|
MsgType: commonpb.MsgType_ShowCollections,
|
|
|
|
MsgID: spt.Base.MsgID,
|
|
|
|
Timestamp: spt.Base.Timestamp,
|
|
|
|
SourceID: spt.Base.SourceID,
|
|
|
|
},
|
|
|
|
CollectionID: collectionID,
|
|
|
|
PartitionIDs: partitionIDs,
|
|
|
|
})
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
if resp == nil {
|
|
|
|
return errors.New("failed to show partitions")
|
|
|
|
}
|
|
|
|
|
|
|
|
if resp.Status.ErrorCode != commonpb.ErrorCode_Success {
|
|
|
|
return errors.New(resp.Status.Reason)
|
|
|
|
}
|
|
|
|
|
|
|
|
spt.result = &milvuspb.ShowPartitionsResponse{
|
|
|
|
Status: resp.Status,
|
|
|
|
PartitionNames: make([]string, 0, len(resp.PartitionIDs)),
|
|
|
|
PartitionIDs: make([]int64, 0, len(resp.PartitionIDs)),
|
|
|
|
CreatedTimestamps: make([]uint64, 0, len(resp.PartitionIDs)),
|
|
|
|
CreatedUtcTimestamps: make([]uint64, 0, len(resp.PartitionIDs)),
|
|
|
|
InMemoryPercentages: make([]int64, 0, len(resp.PartitionIDs)),
|
|
|
|
}
|
|
|
|
|
|
|
|
for offset, id := range resp.PartitionIDs {
|
|
|
|
partitionName, ok := IDs2Names[id]
|
|
|
|
if !ok {
|
|
|
|
log.Debug("Failed to get partition id.", zap.Any("partitionName", partitionName),
|
|
|
|
zap.Any("requestID", spt.Base.MsgID), zap.Any("requestType", "showPartitions"))
|
|
|
|
return errors.New("failed to show partitions")
|
|
|
|
}
|
|
|
|
partitionInfo, err := globalMetaCache.GetPartitionInfo(ctx, collectionName, partitionName)
|
|
|
|
if err != nil {
|
|
|
|
log.Debug("Failed to get partition id.", zap.Any("partitionName", partitionName),
|
|
|
|
zap.Any("requestID", spt.Base.MsgID), zap.Any("requestType", "showPartitions"))
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
spt.result.PartitionIDs = append(spt.result.PartitionIDs, id)
|
|
|
|
spt.result.PartitionNames = append(spt.result.PartitionNames, partitionName)
|
|
|
|
spt.result.CreatedTimestamps = append(spt.result.CreatedTimestamps, partitionInfo.createdTimestamp)
|
|
|
|
spt.result.CreatedUtcTimestamps = append(spt.result.CreatedUtcTimestamps, partitionInfo.createdUtcTimestamp)
|
|
|
|
spt.result.InMemoryPercentages = append(spt.result.InMemoryPercentages, resp.InMemoryPercentages[offset])
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
spt.result = respFromRootCoord
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
2020-11-19 17:09:22 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (spt *showPartitionsTask) PostExecute(ctx context.Context) error {
|
2020-11-19 17:09:22 +08:00
|
|
|
return nil
|
|
|
|
}
|
2020-12-22 15:39:10 +08:00
|
|
|
|
2021-09-11 11:36:22 +08:00
|
|
|
type flushTask struct {
|
2021-02-02 10:58:39 +08:00
|
|
|
Condition
|
|
|
|
*milvuspb.FlushRequest
|
2021-06-21 18:22:13 +08:00
|
|
|
ctx context.Context
|
|
|
|
dataCoord types.DataCoord
|
2021-06-23 16:56:11 +08:00
|
|
|
result *milvuspb.FlushResponse
|
2021-02-02 10:58:39 +08:00
|
|
|
}
|
|
|
|
|
2021-09-11 11:36:22 +08:00
|
|
|
func (ft *flushTask) TraceCtx() context.Context {
|
2021-02-23 09:58:06 +08:00
|
|
|
return ft.ctx
|
2021-02-02 10:58:39 +08:00
|
|
|
}
|
|
|
|
|
2021-09-11 11:36:22 +08:00
|
|
|
func (ft *flushTask) ID() UniqueID {
|
2021-02-02 10:58:39 +08:00
|
|
|
return ft.Base.MsgID
|
|
|
|
}
|
|
|
|
|
2021-09-11 11:36:22 +08:00
|
|
|
func (ft *flushTask) SetID(uid UniqueID) {
|
2021-02-02 10:58:39 +08:00
|
|
|
ft.Base.MsgID = uid
|
|
|
|
}
|
|
|
|
|
2021-09-11 11:36:22 +08:00
|
|
|
func (ft *flushTask) Name() string {
|
2021-02-23 09:58:06 +08:00
|
|
|
return FlushTaskName
|
|
|
|
}
|
|
|
|
|
2021-09-11 11:36:22 +08:00
|
|
|
func (ft *flushTask) Type() commonpb.MsgType {
|
2021-02-02 10:58:39 +08:00
|
|
|
return ft.Base.MsgType
|
|
|
|
}
|
|
|
|
|
2021-09-11 11:36:22 +08:00
|
|
|
func (ft *flushTask) BeginTs() Timestamp {
|
2021-02-02 10:58:39 +08:00
|
|
|
return ft.Base.Timestamp
|
|
|
|
}
|
|
|
|
|
2021-09-11 11:36:22 +08:00
|
|
|
func (ft *flushTask) EndTs() Timestamp {
|
2021-02-02 10:58:39 +08:00
|
|
|
return ft.Base.Timestamp
|
|
|
|
}
|
|
|
|
|
2021-09-11 11:36:22 +08:00
|
|
|
func (ft *flushTask) SetTs(ts Timestamp) {
|
2021-02-02 10:58:39 +08:00
|
|
|
ft.Base.Timestamp = ts
|
|
|
|
}
|
|
|
|
|
2021-09-11 11:36:22 +08:00
|
|
|
func (ft *flushTask) OnEnqueue() error {
|
2021-02-23 09:58:06 +08:00
|
|
|
ft.Base = &commonpb.MsgBase{}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-09-11 11:36:22 +08:00
|
|
|
func (ft *flushTask) PreExecute(ctx context.Context) error {
|
2021-03-10 14:45:35 +08:00
|
|
|
ft.Base.MsgType = commonpb.MsgType_Flush
|
2022-04-24 22:03:44 +08:00
|
|
|
ft.Base.SourceID = Params.ProxyCfg.GetNodeID()
|
2021-02-02 10:58:39 +08:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-09-11 11:36:22 +08:00
|
|
|
func (ft *flushTask) Execute(ctx context.Context) error {
|
2021-06-23 16:56:11 +08:00
|
|
|
coll2Segments := make(map[string]*schemapb.LongArray)
|
2022-09-09 09:58:37 +08:00
|
|
|
flushColl2Segments := make(map[string]*schemapb.LongArray)
|
|
|
|
coll2SealTimes := make(map[string]int64)
|
2021-02-03 17:30:10 +08:00
|
|
|
for _, collName := range ft.CollectionNames {
|
2021-02-26 17:44:24 +08:00
|
|
|
collID, err := globalMetaCache.GetCollectionID(ctx, collName)
|
2021-02-03 17:30:10 +08:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
flushReq := &datapb.FlushRequest{
|
|
|
|
Base: &commonpb.MsgBase{
|
2021-03-10 14:45:35 +08:00
|
|
|
MsgType: commonpb.MsgType_Flush,
|
2021-02-03 17:30:10 +08:00
|
|
|
MsgID: ft.Base.MsgID,
|
|
|
|
Timestamp: ft.Base.Timestamp,
|
|
|
|
SourceID: ft.Base.SourceID,
|
|
|
|
},
|
|
|
|
DbID: 0,
|
|
|
|
CollectionID: collID,
|
|
|
|
}
|
2021-06-23 16:56:11 +08:00
|
|
|
resp, err := ft.dataCoord.Flush(ctx, flushReq)
|
|
|
|
if err != nil {
|
2021-11-29 12:25:17 +08:00
|
|
|
return fmt.Errorf("failed to call flush to data coordinator: %s", err.Error())
|
2021-02-03 17:30:10 +08:00
|
|
|
}
|
2021-06-23 16:56:11 +08:00
|
|
|
if resp.Status.ErrorCode != commonpb.ErrorCode_Success {
|
|
|
|
return errors.New(resp.Status.Reason)
|
2021-02-03 17:30:10 +08:00
|
|
|
}
|
2021-06-23 16:56:11 +08:00
|
|
|
coll2Segments[collName] = &schemapb.LongArray{Data: resp.GetSegmentIDs()}
|
2022-09-09 09:58:37 +08:00
|
|
|
flushColl2Segments[collName] = &schemapb.LongArray{Data: resp.GetFlushSegmentIDs()}
|
|
|
|
coll2SealTimes[collName] = resp.GetTimeOfSeal()
|
2021-02-02 10:58:39 +08:00
|
|
|
}
|
2021-06-23 16:56:11 +08:00
|
|
|
ft.result = &milvuspb.FlushResponse{
|
|
|
|
Status: &commonpb.Status{
|
|
|
|
ErrorCode: commonpb.ErrorCode_Success,
|
|
|
|
Reason: "",
|
|
|
|
},
|
2022-09-09 09:58:37 +08:00
|
|
|
DbName: "",
|
|
|
|
CollSegIDs: coll2Segments,
|
|
|
|
FlushCollSegIDs: flushColl2Segments,
|
|
|
|
CollSealTimes: coll2SealTimes,
|
2021-02-02 10:58:39 +08:00
|
|
|
}
|
2021-02-03 17:30:10 +08:00
|
|
|
return nil
|
2021-02-02 10:58:39 +08:00
|
|
|
}
|
|
|
|
|
2021-09-11 11:36:22 +08:00
|
|
|
func (ft *flushTask) PostExecute(ctx context.Context) error {
|
2021-02-02 10:58:39 +08:00
|
|
|
return nil
|
|
|
|
}
|
2021-02-04 15:31:02 +08:00
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
type loadCollectionTask struct {
|
2021-02-04 15:31:02 +08:00
|
|
|
Condition
|
|
|
|
*milvuspb.LoadCollectionRequest
|
2021-06-22 16:44:09 +08:00
|
|
|
ctx context.Context
|
|
|
|
queryCoord types.QueryCoord
|
2022-10-17 18:01:25 +08:00
|
|
|
indexCoord types.IndexCoord
|
2021-06-22 16:44:09 +08:00
|
|
|
result *commonpb.Status
|
2022-03-02 16:23:55 +08:00
|
|
|
|
|
|
|
collectionID UniqueID
|
2021-02-04 15:31:02 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (lct *loadCollectionTask) TraceCtx() context.Context {
|
2021-02-23 09:58:06 +08:00
|
|
|
return lct.ctx
|
2021-02-04 15:31:02 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (lct *loadCollectionTask) ID() UniqueID {
|
2021-02-04 15:31:02 +08:00
|
|
|
return lct.Base.MsgID
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (lct *loadCollectionTask) SetID(uid UniqueID) {
|
2021-02-04 15:31:02 +08:00
|
|
|
lct.Base.MsgID = uid
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (lct *loadCollectionTask) Name() string {
|
2021-02-23 09:58:06 +08:00
|
|
|
return LoadCollectionTaskName
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (lct *loadCollectionTask) Type() commonpb.MsgType {
|
2021-02-04 15:31:02 +08:00
|
|
|
return lct.Base.MsgType
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (lct *loadCollectionTask) BeginTs() Timestamp {
|
2021-02-04 15:31:02 +08:00
|
|
|
return lct.Base.Timestamp
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (lct *loadCollectionTask) EndTs() Timestamp {
|
2021-02-04 15:31:02 +08:00
|
|
|
return lct.Base.Timestamp
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (lct *loadCollectionTask) SetTs(ts Timestamp) {
|
2021-02-04 15:31:02 +08:00
|
|
|
lct.Base.Timestamp = ts
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (lct *loadCollectionTask) OnEnqueue() error {
|
2021-02-23 09:58:06 +08:00
|
|
|
lct.Base = &commonpb.MsgBase{}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (lct *loadCollectionTask) PreExecute(ctx context.Context) error {
|
2021-12-13 10:01:18 +08:00
|
|
|
log.Debug("loadCollectionTask PreExecute", zap.String("role", typeutil.ProxyRole), zap.Int64("msgID", lct.Base.MsgID))
|
2021-03-10 14:45:35 +08:00
|
|
|
lct.Base.MsgType = commonpb.MsgType_LoadCollection
|
2022-04-24 22:03:44 +08:00
|
|
|
lct.Base.SourceID = Params.ProxyCfg.GetNodeID()
|
2021-02-04 15:31:02 +08:00
|
|
|
|
|
|
|
collName := lct.CollectionName
|
|
|
|
|
2021-10-23 10:53:12 +08:00
|
|
|
if err := validateCollectionName(collName); err != nil {
|
2021-02-04 15:31:02 +08:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2022-06-21 17:26:15 +08:00
|
|
|
// To compat with LoadCollcetion before Milvus@2.1
|
|
|
|
if lct.ReplicaNumber == 0 {
|
|
|
|
lct.ReplicaNumber = 1
|
|
|
|
}
|
|
|
|
|
2021-02-04 15:31:02 +08:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (lct *loadCollectionTask) Execute(ctx context.Context) (err error) {
|
2021-12-13 10:01:18 +08:00
|
|
|
log.Debug("loadCollectionTask Execute", zap.String("role", typeutil.ProxyRole), zap.Int64("msgID", lct.Base.MsgID))
|
2021-02-26 17:44:24 +08:00
|
|
|
collID, err := globalMetaCache.GetCollectionID(ctx, lct.CollectionName)
|
2021-02-04 15:31:02 +08:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2022-07-05 21:16:20 +08:00
|
|
|
|
2022-03-02 16:23:55 +08:00
|
|
|
lct.collectionID = collID
|
2021-02-26 17:44:24 +08:00
|
|
|
collSchema, err := globalMetaCache.GetCollectionSchema(ctx, lct.CollectionName)
|
2021-02-06 21:17:18 +08:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2022-10-17 18:01:25 +08:00
|
|
|
// check index
|
|
|
|
indexResponse, err := lct.indexCoord.DescribeIndex(ctx, &indexpb.DescribeIndexRequest{
|
|
|
|
CollectionID: collID,
|
|
|
|
IndexName: "",
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if indexResponse.Status.ErrorCode != commonpb.ErrorCode_Success {
|
|
|
|
return errors.New(indexResponse.Status.Reason)
|
|
|
|
}
|
|
|
|
|
|
|
|
hasVecIndex := false
|
|
|
|
fieldIndexIDs := make(map[int64]int64)
|
|
|
|
for _, index := range indexResponse.IndexInfos {
|
|
|
|
fieldIndexIDs[index.FieldID] = index.IndexID
|
|
|
|
for _, field := range collSchema.Fields {
|
|
|
|
if index.FieldID == field.FieldID && (field.DataType == schemapb.DataType_FloatVector || field.DataType == schemapb.DataType_BinaryVector) {
|
|
|
|
hasVecIndex = true
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if !hasVecIndex {
|
|
|
|
errMsg := fmt.Sprintf("there is no vector index on collection: %s, please create index firstly", lct.LoadCollectionRequest.CollectionName)
|
|
|
|
log.Ctx(ctx).Error(errMsg)
|
|
|
|
return errors.New(errMsg)
|
|
|
|
}
|
2021-02-04 15:31:02 +08:00
|
|
|
request := &querypb.LoadCollectionRequest{
|
|
|
|
Base: &commonpb.MsgBase{
|
2021-03-10 14:45:35 +08:00
|
|
|
MsgType: commonpb.MsgType_LoadCollection,
|
2021-02-04 15:31:02 +08:00
|
|
|
MsgID: lct.Base.MsgID,
|
|
|
|
Timestamp: lct.Base.Timestamp,
|
|
|
|
SourceID: lct.Base.SourceID,
|
|
|
|
},
|
2022-04-20 16:15:41 +08:00
|
|
|
DbID: 0,
|
|
|
|
CollectionID: collID,
|
|
|
|
Schema: collSchema,
|
|
|
|
ReplicaNumber: lct.ReplicaNumber,
|
2022-10-17 18:01:25 +08:00
|
|
|
FieldIndexID: fieldIndexIDs,
|
2021-02-04 15:31:02 +08:00
|
|
|
}
|
2021-12-13 10:01:18 +08:00
|
|
|
log.Debug("send LoadCollectionRequest to query coordinator", zap.String("role", typeutil.ProxyRole),
|
|
|
|
zap.Int64("msgID", request.Base.MsgID), zap.Int64("collectionID", request.CollectionID),
|
2021-03-13 11:59:24 +08:00
|
|
|
zap.Any("schema", request.Schema))
|
2021-06-22 16:44:09 +08:00
|
|
|
lct.result, err = lct.queryCoord.LoadCollection(ctx, request)
|
2021-03-13 11:59:24 +08:00
|
|
|
if err != nil {
|
2021-06-22 16:44:09 +08:00
|
|
|
return fmt.Errorf("call query coordinator LoadCollection: %s", err)
|
2021-03-13 11:59:24 +08:00
|
|
|
}
|
|
|
|
return nil
|
2021-02-04 15:31:02 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (lct *loadCollectionTask) PostExecute(ctx context.Context) error {
|
2021-12-13 10:01:18 +08:00
|
|
|
log.Debug("loadCollectionTask PostExecute", zap.String("role", typeutil.ProxyRole),
|
|
|
|
zap.Int64("msgID", lct.Base.MsgID))
|
2021-02-04 15:31:02 +08:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
type releaseCollectionTask struct {
|
2021-02-04 15:31:02 +08:00
|
|
|
Condition
|
|
|
|
*milvuspb.ReleaseCollectionRequest
|
2021-06-22 16:44:09 +08:00
|
|
|
ctx context.Context
|
|
|
|
queryCoord types.QueryCoord
|
|
|
|
result *commonpb.Status
|
|
|
|
chMgr channelsMgr
|
2022-03-02 16:23:55 +08:00
|
|
|
|
|
|
|
collectionID UniqueID
|
2021-02-04 15:31:02 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (rct *releaseCollectionTask) TraceCtx() context.Context {
|
2021-02-23 09:58:06 +08:00
|
|
|
return rct.ctx
|
2021-02-04 15:31:02 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (rct *releaseCollectionTask) ID() UniqueID {
|
2021-02-04 15:31:02 +08:00
|
|
|
return rct.Base.MsgID
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (rct *releaseCollectionTask) SetID(uid UniqueID) {
|
2021-02-04 15:31:02 +08:00
|
|
|
rct.Base.MsgID = uid
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (rct *releaseCollectionTask) Name() string {
|
2021-02-23 09:58:06 +08:00
|
|
|
return ReleaseCollectionTaskName
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (rct *releaseCollectionTask) Type() commonpb.MsgType {
|
2021-02-04 15:31:02 +08:00
|
|
|
return rct.Base.MsgType
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (rct *releaseCollectionTask) BeginTs() Timestamp {
|
2021-02-04 15:31:02 +08:00
|
|
|
return rct.Base.Timestamp
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (rct *releaseCollectionTask) EndTs() Timestamp {
|
2021-02-04 15:31:02 +08:00
|
|
|
return rct.Base.Timestamp
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (rct *releaseCollectionTask) SetTs(ts Timestamp) {
|
2021-02-04 15:31:02 +08:00
|
|
|
rct.Base.Timestamp = ts
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (rct *releaseCollectionTask) OnEnqueue() error {
|
2021-02-23 09:58:06 +08:00
|
|
|
rct.Base = &commonpb.MsgBase{}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (rct *releaseCollectionTask) PreExecute(ctx context.Context) error {
|
2021-03-10 14:45:35 +08:00
|
|
|
rct.Base.MsgType = commonpb.MsgType_ReleaseCollection
|
2022-04-24 22:03:44 +08:00
|
|
|
rct.Base.SourceID = Params.ProxyCfg.GetNodeID()
|
2021-02-04 15:31:02 +08:00
|
|
|
|
|
|
|
collName := rct.CollectionName
|
|
|
|
|
2021-10-23 10:53:12 +08:00
|
|
|
if err := validateCollectionName(collName); err != nil {
|
2021-02-04 15:31:02 +08:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (rct *releaseCollectionTask) Execute(ctx context.Context) (err error) {
|
2021-02-26 17:44:24 +08:00
|
|
|
collID, err := globalMetaCache.GetCollectionID(ctx, rct.CollectionName)
|
2021-02-04 15:31:02 +08:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2022-03-02 16:23:55 +08:00
|
|
|
rct.collectionID = collID
|
2021-02-04 15:31:02 +08:00
|
|
|
request := &querypb.ReleaseCollectionRequest{
|
|
|
|
Base: &commonpb.MsgBase{
|
2021-03-10 14:45:35 +08:00
|
|
|
MsgType: commonpb.MsgType_ReleaseCollection,
|
2021-02-04 15:31:02 +08:00
|
|
|
MsgID: rct.Base.MsgID,
|
|
|
|
Timestamp: rct.Base.Timestamp,
|
|
|
|
SourceID: rct.Base.SourceID,
|
|
|
|
},
|
|
|
|
DbID: 0,
|
|
|
|
CollectionID: collID,
|
|
|
|
}
|
2021-06-18 10:33:58 +08:00
|
|
|
|
2021-06-22 16:44:09 +08:00
|
|
|
rct.result, err = rct.queryCoord.ReleaseCollection(ctx, request)
|
2021-06-18 10:33:58 +08:00
|
|
|
|
2022-05-19 10:13:56 +08:00
|
|
|
globalMetaCache.RemoveCollection(ctx, rct.CollectionName)
|
2021-06-18 10:33:58 +08:00
|
|
|
|
2021-02-04 15:31:02 +08:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (rct *releaseCollectionTask) PostExecute(ctx context.Context) error {
|
2022-05-17 11:11:56 +08:00
|
|
|
globalMetaCache.ClearShards(rct.CollectionName)
|
2021-02-04 15:31:02 +08:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
type loadPartitionsTask struct {
|
2021-02-04 15:31:02 +08:00
|
|
|
Condition
|
2021-03-12 14:22:09 +08:00
|
|
|
*milvuspb.LoadPartitionsRequest
|
2021-06-22 16:44:09 +08:00
|
|
|
ctx context.Context
|
|
|
|
queryCoord types.QueryCoord
|
2022-10-17 18:01:25 +08:00
|
|
|
indexCoord types.IndexCoord
|
2021-06-22 16:44:09 +08:00
|
|
|
result *commonpb.Status
|
2022-03-02 16:23:55 +08:00
|
|
|
|
|
|
|
collectionID UniqueID
|
2021-02-04 15:31:02 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (lpt *loadPartitionsTask) TraceCtx() context.Context {
|
2021-03-25 14:41:46 +08:00
|
|
|
return lpt.ctx
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (lpt *loadPartitionsTask) ID() UniqueID {
|
2021-02-04 15:31:02 +08:00
|
|
|
return lpt.Base.MsgID
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (lpt *loadPartitionsTask) SetID(uid UniqueID) {
|
2021-02-04 15:31:02 +08:00
|
|
|
lpt.Base.MsgID = uid
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (lpt *loadPartitionsTask) Name() string {
|
2021-02-23 09:58:06 +08:00
|
|
|
return LoadPartitionTaskName
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (lpt *loadPartitionsTask) Type() commonpb.MsgType {
|
2021-02-04 15:31:02 +08:00
|
|
|
return lpt.Base.MsgType
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (lpt *loadPartitionsTask) BeginTs() Timestamp {
|
2021-02-04 15:31:02 +08:00
|
|
|
return lpt.Base.Timestamp
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (lpt *loadPartitionsTask) EndTs() Timestamp {
|
2021-02-04 15:31:02 +08:00
|
|
|
return lpt.Base.Timestamp
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (lpt *loadPartitionsTask) SetTs(ts Timestamp) {
|
2021-02-04 15:31:02 +08:00
|
|
|
lpt.Base.Timestamp = ts
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (lpt *loadPartitionsTask) OnEnqueue() error {
|
2021-02-23 09:58:06 +08:00
|
|
|
lpt.Base = &commonpb.MsgBase{}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (lpt *loadPartitionsTask) PreExecute(ctx context.Context) error {
|
2021-03-10 14:45:35 +08:00
|
|
|
lpt.Base.MsgType = commonpb.MsgType_LoadPartitions
|
2022-04-24 22:03:44 +08:00
|
|
|
lpt.Base.SourceID = Params.ProxyCfg.GetNodeID()
|
2021-02-04 15:31:02 +08:00
|
|
|
|
|
|
|
collName := lpt.CollectionName
|
|
|
|
|
2021-10-23 10:53:12 +08:00
|
|
|
if err := validateCollectionName(collName); err != nil {
|
2021-02-04 15:31:02 +08:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (lpt *loadPartitionsTask) Execute(ctx context.Context) error {
|
2021-02-04 15:31:02 +08:00
|
|
|
var partitionIDs []int64
|
2021-02-26 17:44:24 +08:00
|
|
|
collID, err := globalMetaCache.GetCollectionID(ctx, lpt.CollectionName)
|
2021-02-04 15:31:02 +08:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2022-03-02 16:23:55 +08:00
|
|
|
lpt.collectionID = collID
|
2021-02-26 17:44:24 +08:00
|
|
|
collSchema, err := globalMetaCache.GetCollectionSchema(ctx, lpt.CollectionName)
|
2021-02-06 21:17:18 +08:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2022-10-17 18:01:25 +08:00
|
|
|
// check index
|
|
|
|
indexResponse, err := lpt.indexCoord.DescribeIndex(ctx, &indexpb.DescribeIndexRequest{
|
|
|
|
CollectionID: collID,
|
|
|
|
IndexName: "",
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if indexResponse.Status.ErrorCode != commonpb.ErrorCode_Success {
|
|
|
|
return errors.New(indexResponse.Status.Reason)
|
|
|
|
}
|
|
|
|
|
|
|
|
hasVecIndex := false
|
|
|
|
fieldIndexIDs := make(map[int64]int64)
|
|
|
|
for _, index := range indexResponse.IndexInfos {
|
|
|
|
fieldIndexIDs[index.FieldID] = index.IndexID
|
|
|
|
for _, field := range collSchema.Fields {
|
|
|
|
if index.FieldID == field.FieldID && (field.DataType == schemapb.DataType_FloatVector || field.DataType == schemapb.DataType_BinaryVector) {
|
|
|
|
hasVecIndex = true
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if !hasVecIndex {
|
|
|
|
errMsg := fmt.Sprintf("there is no vector index on collection: %s, please create index firstly", lpt.LoadPartitionsRequest.CollectionName)
|
|
|
|
log.Ctx(ctx).Error(errMsg)
|
|
|
|
return errors.New(errMsg)
|
|
|
|
}
|
2021-02-04 15:31:02 +08:00
|
|
|
for _, partitionName := range lpt.PartitionNames {
|
2021-02-26 17:44:24 +08:00
|
|
|
partitionID, err := globalMetaCache.GetPartitionID(ctx, lpt.CollectionName, partitionName)
|
2021-02-04 15:31:02 +08:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
partitionIDs = append(partitionIDs, partitionID)
|
|
|
|
}
|
2021-03-12 14:22:09 +08:00
|
|
|
request := &querypb.LoadPartitionsRequest{
|
2021-02-04 15:31:02 +08:00
|
|
|
Base: &commonpb.MsgBase{
|
2021-03-10 14:45:35 +08:00
|
|
|
MsgType: commonpb.MsgType_LoadPartitions,
|
2021-02-04 15:31:02 +08:00
|
|
|
MsgID: lpt.Base.MsgID,
|
|
|
|
Timestamp: lpt.Base.Timestamp,
|
|
|
|
SourceID: lpt.Base.SourceID,
|
|
|
|
},
|
2022-04-20 16:15:41 +08:00
|
|
|
DbID: 0,
|
|
|
|
CollectionID: collID,
|
|
|
|
PartitionIDs: partitionIDs,
|
|
|
|
Schema: collSchema,
|
|
|
|
ReplicaNumber: lpt.ReplicaNumber,
|
2022-10-17 18:01:25 +08:00
|
|
|
FieldIndexID: fieldIndexIDs,
|
2021-02-04 15:31:02 +08:00
|
|
|
}
|
2021-06-22 16:44:09 +08:00
|
|
|
lpt.result, err = lpt.queryCoord.LoadPartitions(ctx, request)
|
2021-02-04 15:31:02 +08:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (lpt *loadPartitionsTask) PostExecute(ctx context.Context) error {
|
2021-02-04 15:31:02 +08:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
type releasePartitionsTask struct {
|
2021-02-04 15:31:02 +08:00
|
|
|
Condition
|
2021-03-12 14:22:09 +08:00
|
|
|
*milvuspb.ReleasePartitionsRequest
|
2021-06-22 16:44:09 +08:00
|
|
|
ctx context.Context
|
|
|
|
queryCoord types.QueryCoord
|
|
|
|
result *commonpb.Status
|
2022-03-02 16:23:55 +08:00
|
|
|
|
|
|
|
collectionID UniqueID
|
2021-02-04 15:31:02 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (rpt *releasePartitionsTask) TraceCtx() context.Context {
|
2021-02-23 09:58:06 +08:00
|
|
|
return rpt.ctx
|
2021-02-04 15:31:02 +08:00
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (rpt *releasePartitionsTask) ID() UniqueID {
|
2021-02-04 15:31:02 +08:00
|
|
|
return rpt.Base.MsgID
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (rpt *releasePartitionsTask) SetID(uid UniqueID) {
|
2021-02-04 15:31:02 +08:00
|
|
|
rpt.Base.MsgID = uid
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (rpt *releasePartitionsTask) Type() commonpb.MsgType {
|
2021-02-04 15:31:02 +08:00
|
|
|
return rpt.Base.MsgType
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (rpt *releasePartitionsTask) Name() string {
|
2021-02-23 09:58:06 +08:00
|
|
|
return ReleasePartitionTaskName
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (rpt *releasePartitionsTask) BeginTs() Timestamp {
|
2021-02-04 15:31:02 +08:00
|
|
|
return rpt.Base.Timestamp
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (rpt *releasePartitionsTask) EndTs() Timestamp {
|
2021-02-04 15:31:02 +08:00
|
|
|
return rpt.Base.Timestamp
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (rpt *releasePartitionsTask) SetTs(ts Timestamp) {
|
2021-02-04 15:31:02 +08:00
|
|
|
rpt.Base.Timestamp = ts
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (rpt *releasePartitionsTask) OnEnqueue() error {
|
2021-02-23 09:58:06 +08:00
|
|
|
rpt.Base = &commonpb.MsgBase{}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (rpt *releasePartitionsTask) PreExecute(ctx context.Context) error {
|
2021-03-10 14:45:35 +08:00
|
|
|
rpt.Base.MsgType = commonpb.MsgType_ReleasePartitions
|
2022-04-24 22:03:44 +08:00
|
|
|
rpt.Base.SourceID = Params.ProxyCfg.GetNodeID()
|
2021-02-04 15:31:02 +08:00
|
|
|
|
|
|
|
collName := rpt.CollectionName
|
|
|
|
|
2021-10-23 10:53:12 +08:00
|
|
|
if err := validateCollectionName(collName); err != nil {
|
2021-02-04 15:31:02 +08:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (rpt *releasePartitionsTask) Execute(ctx context.Context) (err error) {
|
2021-02-04 15:31:02 +08:00
|
|
|
var partitionIDs []int64
|
2021-02-26 17:44:24 +08:00
|
|
|
collID, err := globalMetaCache.GetCollectionID(ctx, rpt.CollectionName)
|
2021-02-04 15:31:02 +08:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2022-03-02 16:23:55 +08:00
|
|
|
rpt.collectionID = collID
|
2021-02-04 15:31:02 +08:00
|
|
|
for _, partitionName := range rpt.PartitionNames {
|
2021-02-26 17:44:24 +08:00
|
|
|
partitionID, err := globalMetaCache.GetPartitionID(ctx, rpt.CollectionName, partitionName)
|
2021-02-04 15:31:02 +08:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
partitionIDs = append(partitionIDs, partitionID)
|
|
|
|
}
|
2021-03-12 14:22:09 +08:00
|
|
|
request := &querypb.ReleasePartitionsRequest{
|
2021-02-04 15:31:02 +08:00
|
|
|
Base: &commonpb.MsgBase{
|
2021-03-10 14:45:35 +08:00
|
|
|
MsgType: commonpb.MsgType_ReleasePartitions,
|
2021-02-04 15:31:02 +08:00
|
|
|
MsgID: rpt.Base.MsgID,
|
|
|
|
Timestamp: rpt.Base.Timestamp,
|
|
|
|
SourceID: rpt.Base.SourceID,
|
|
|
|
},
|
|
|
|
DbID: 0,
|
|
|
|
CollectionID: collID,
|
|
|
|
PartitionIDs: partitionIDs,
|
|
|
|
}
|
2021-06-22 16:44:09 +08:00
|
|
|
rpt.result, err = rpt.queryCoord.ReleasePartitions(ctx, request)
|
2021-02-04 15:31:02 +08:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2021-09-09 19:02:08 +08:00
|
|
|
func (rpt *releasePartitionsTask) PostExecute(ctx context.Context) error {
|
2022-05-17 11:11:56 +08:00
|
|
|
globalMetaCache.ClearShards(rpt.CollectionName)
|
2021-02-04 15:31:02 +08:00
|
|
|
return nil
|
|
|
|
}
|
2021-08-26 12:15:52 +08:00
|
|
|
|
2021-12-28 22:22:46 +08:00
|
|
|
// CreateAliasTask contains task information of CreateAlias
|
2021-09-18 11:13:51 +08:00
|
|
|
type CreateAliasTask struct {
|
|
|
|
Condition
|
|
|
|
*milvuspb.CreateAliasRequest
|
|
|
|
ctx context.Context
|
|
|
|
rootCoord types.RootCoord
|
|
|
|
result *commonpb.Status
|
|
|
|
}
|
|
|
|
|
2021-11-17 16:13:17 +08:00
|
|
|
// TraceCtx returns the trace context of the task.
|
2021-09-18 11:13:51 +08:00
|
|
|
func (c *CreateAliasTask) TraceCtx() context.Context {
|
|
|
|
return c.ctx
|
|
|
|
}
|
|
|
|
|
2021-11-17 16:13:17 +08:00
|
|
|
// ID return the id of the task
|
2021-09-18 11:13:51 +08:00
|
|
|
func (c *CreateAliasTask) ID() UniqueID {
|
|
|
|
return c.Base.MsgID
|
|
|
|
}
|
|
|
|
|
2021-11-17 16:13:17 +08:00
|
|
|
// SetID sets the id of the task
|
2021-09-18 11:13:51 +08:00
|
|
|
func (c *CreateAliasTask) SetID(uid UniqueID) {
|
|
|
|
c.Base.MsgID = uid
|
|
|
|
}
|
|
|
|
|
2021-11-17 16:13:17 +08:00
|
|
|
// Name returns the name of the task
|
2021-09-18 11:13:51 +08:00
|
|
|
func (c *CreateAliasTask) Name() string {
|
|
|
|
return CreateAliasTaskName
|
|
|
|
}
|
|
|
|
|
2021-11-17 16:13:17 +08:00
|
|
|
// Type returns the type of the task
|
2021-09-18 11:13:51 +08:00
|
|
|
func (c *CreateAliasTask) Type() commonpb.MsgType {
|
|
|
|
return c.Base.MsgType
|
|
|
|
}
|
|
|
|
|
2021-12-23 11:05:09 +08:00
|
|
|
// BeginTs returns the ts
|
2021-09-18 11:13:51 +08:00
|
|
|
func (c *CreateAliasTask) BeginTs() Timestamp {
|
|
|
|
return c.Base.Timestamp
|
|
|
|
}
|
|
|
|
|
2021-12-23 11:05:09 +08:00
|
|
|
// EndTs returns the ts
|
2021-09-18 11:13:51 +08:00
|
|
|
func (c *CreateAliasTask) EndTs() Timestamp {
|
|
|
|
return c.Base.Timestamp
|
|
|
|
}
|
|
|
|
|
2021-12-23 11:05:09 +08:00
|
|
|
// SetTs sets the ts
|
2021-09-18 11:13:51 +08:00
|
|
|
func (c *CreateAliasTask) SetTs(ts Timestamp) {
|
|
|
|
c.Base.Timestamp = ts
|
|
|
|
}
|
|
|
|
|
2021-12-29 22:40:27 +08:00
|
|
|
// OnEnqueue defines the behavior task enqueued
|
2021-09-18 11:13:51 +08:00
|
|
|
func (c *CreateAliasTask) OnEnqueue() error {
|
|
|
|
c.Base = &commonpb.MsgBase{}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-12-29 22:40:27 +08:00
|
|
|
// PreExecute defines the action before task execution
|
2021-09-18 11:13:51 +08:00
|
|
|
func (c *CreateAliasTask) PreExecute(ctx context.Context) error {
|
|
|
|
c.Base.MsgType = commonpb.MsgType_CreateAlias
|
2022-04-24 22:03:44 +08:00
|
|
|
c.Base.SourceID = Params.ProxyCfg.GetNodeID()
|
2021-09-18 11:13:51 +08:00
|
|
|
|
|
|
|
collAlias := c.Alias
|
|
|
|
// collection alias uses the same format as collection name
|
|
|
|
if err := ValidateCollectionAlias(collAlias); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
collName := c.CollectionName
|
2021-10-23 10:53:12 +08:00
|
|
|
if err := validateCollectionName(collName); err != nil {
|
2021-09-18 11:13:51 +08:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-12-29 22:40:27 +08:00
|
|
|
// Execute defines the actual execution of create alias
|
2021-09-18 11:13:51 +08:00
|
|
|
func (c *CreateAliasTask) Execute(ctx context.Context) error {
|
|
|
|
var err error
|
|
|
|
c.result, err = c.rootCoord.CreateAlias(ctx, c.CreateAliasRequest)
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2021-12-29 22:40:27 +08:00
|
|
|
// PostExecute defines the post execution, do nothing for create alias
|
2021-09-18 11:13:51 +08:00
|
|
|
func (c *CreateAliasTask) PostExecute(ctx context.Context) error {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-12-17 18:46:38 +08:00
|
|
|
// DropAliasTask is the task to drop alias
|
2021-09-18 11:13:51 +08:00
|
|
|
type DropAliasTask struct {
|
|
|
|
Condition
|
|
|
|
*milvuspb.DropAliasRequest
|
|
|
|
ctx context.Context
|
|
|
|
rootCoord types.RootCoord
|
|
|
|
result *commonpb.Status
|
|
|
|
}
|
|
|
|
|
2022-01-10 18:33:46 +08:00
|
|
|
// TraceCtx returns the context for trace
|
2021-09-18 11:13:51 +08:00
|
|
|
func (d *DropAliasTask) TraceCtx() context.Context {
|
|
|
|
return d.ctx
|
|
|
|
}
|
|
|
|
|
2022-01-10 18:33:46 +08:00
|
|
|
// ID returns the MsgID
|
2021-09-18 11:13:51 +08:00
|
|
|
func (d *DropAliasTask) ID() UniqueID {
|
|
|
|
return d.Base.MsgID
|
|
|
|
}
|
|
|
|
|
2022-01-10 18:33:46 +08:00
|
|
|
// SetID sets the MsgID
|
2021-09-18 11:13:51 +08:00
|
|
|
func (d *DropAliasTask) SetID(uid UniqueID) {
|
|
|
|
d.Base.MsgID = uid
|
|
|
|
}
|
|
|
|
|
2022-01-10 18:33:46 +08:00
|
|
|
// Name returns the name of the task
|
2021-09-18 11:13:51 +08:00
|
|
|
func (d *DropAliasTask) Name() string {
|
|
|
|
return DropAliasTaskName
|
|
|
|
}
|
|
|
|
|
|
|
|
func (d *DropAliasTask) Type() commonpb.MsgType {
|
|
|
|
return d.Base.MsgType
|
|
|
|
}
|
|
|
|
|
|
|
|
func (d *DropAliasTask) BeginTs() Timestamp {
|
|
|
|
return d.Base.Timestamp
|
|
|
|
}
|
|
|
|
|
|
|
|
func (d *DropAliasTask) EndTs() Timestamp {
|
|
|
|
return d.Base.Timestamp
|
|
|
|
}
|
|
|
|
|
|
|
|
func (d *DropAliasTask) SetTs(ts Timestamp) {
|
|
|
|
d.Base.Timestamp = ts
|
|
|
|
}
|
|
|
|
|
|
|
|
func (d *DropAliasTask) OnEnqueue() error {
|
|
|
|
d.Base = &commonpb.MsgBase{}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (d *DropAliasTask) PreExecute(ctx context.Context) error {
|
|
|
|
d.Base.MsgType = commonpb.MsgType_DropAlias
|
2022-04-24 22:03:44 +08:00
|
|
|
d.Base.SourceID = Params.ProxyCfg.GetNodeID()
|
2021-09-18 11:13:51 +08:00
|
|
|
collAlias := d.Alias
|
|
|
|
if err := ValidateCollectionAlias(collAlias); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (d *DropAliasTask) Execute(ctx context.Context) error {
|
|
|
|
var err error
|
|
|
|
d.result, err = d.rootCoord.DropAlias(ctx, d.DropAliasRequest)
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
func (d *DropAliasTask) PostExecute(ctx context.Context) error {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-12-24 13:33:45 +08:00
|
|
|
// AlterAliasTask is the task to alter alias
|
2021-09-18 11:13:51 +08:00
|
|
|
type AlterAliasTask struct {
|
|
|
|
Condition
|
|
|
|
*milvuspb.AlterAliasRequest
|
|
|
|
ctx context.Context
|
|
|
|
rootCoord types.RootCoord
|
|
|
|
result *commonpb.Status
|
|
|
|
}
|
|
|
|
|
|
|
|
func (a *AlterAliasTask) TraceCtx() context.Context {
|
|
|
|
return a.ctx
|
|
|
|
}
|
|
|
|
|
|
|
|
func (a *AlterAliasTask) ID() UniqueID {
|
|
|
|
return a.Base.MsgID
|
|
|
|
}
|
|
|
|
|
|
|
|
func (a *AlterAliasTask) SetID(uid UniqueID) {
|
|
|
|
a.Base.MsgID = uid
|
|
|
|
}
|
|
|
|
|
|
|
|
func (a *AlterAliasTask) Name() string {
|
|
|
|
return AlterAliasTaskName
|
|
|
|
}
|
|
|
|
|
|
|
|
func (a *AlterAliasTask) Type() commonpb.MsgType {
|
|
|
|
return a.Base.MsgType
|
|
|
|
}
|
|
|
|
|
|
|
|
func (a *AlterAliasTask) BeginTs() Timestamp {
|
|
|
|
return a.Base.Timestamp
|
|
|
|
}
|
|
|
|
|
|
|
|
func (a *AlterAliasTask) EndTs() Timestamp {
|
|
|
|
return a.Base.Timestamp
|
|
|
|
}
|
|
|
|
|
|
|
|
func (a *AlterAliasTask) SetTs(ts Timestamp) {
|
|
|
|
a.Base.Timestamp = ts
|
|
|
|
}
|
|
|
|
|
|
|
|
func (a *AlterAliasTask) OnEnqueue() error {
|
|
|
|
a.Base = &commonpb.MsgBase{}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (a *AlterAliasTask) PreExecute(ctx context.Context) error {
|
|
|
|
a.Base.MsgType = commonpb.MsgType_AlterAlias
|
2022-04-24 22:03:44 +08:00
|
|
|
a.Base.SourceID = Params.ProxyCfg.GetNodeID()
|
2021-09-18 11:13:51 +08:00
|
|
|
|
|
|
|
collAlias := a.Alias
|
|
|
|
// collection alias uses the same format as collection name
|
|
|
|
if err := ValidateCollectionAlias(collAlias); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
collName := a.CollectionName
|
2021-10-23 10:53:12 +08:00
|
|
|
if err := validateCollectionName(collName); err != nil {
|
2021-09-18 11:13:51 +08:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (a *AlterAliasTask) Execute(ctx context.Context) error {
|
|
|
|
var err error
|
|
|
|
a.result, err = a.rootCoord.AlterAlias(ctx, a.AlterAliasRequest)
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
func (a *AlterAliasTask) PostExecute(ctx context.Context) error {
|
|
|
|
return nil
|
|
|
|
}
|