2021-04-19 11:12:56 +08:00
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
2021-06-18 21:30:08 +08:00
package rootcoord
2021-01-19 14:44:03 +08:00
import (
2021-03-13 14:42:53 +08:00
"context"
2021-02-07 17:02:13 +08:00
"fmt"
2021-02-05 14:09:55 +08:00
2021-01-19 14:44:03 +08:00
"github.com/golang/protobuf/proto"
2021-09-14 11:59:47 +08:00
"github.com/milvus-io/milvus/internal/common"
2021-04-22 14:45:57 +08:00
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
2021-05-26 20:14:30 +08:00
"github.com/milvus-io/milvus/internal/proto/proxypb"
2021-04-22 14:45:57 +08:00
"github.com/milvus-io/milvus/internal/proto/schemapb"
2021-08-18 14:36:10 +08:00
"github.com/milvus-io/milvus/internal/util/tsoutil"
2021-04-22 14:45:57 +08:00
"github.com/milvus-io/milvus/internal/util/typeutil"
2021-09-14 11:59:47 +08:00
"go.uber.org/zap"
2021-01-19 14:44:03 +08:00
)
type reqTask interface {
2021-03-13 14:42:53 +08:00
Ctx ( ) context . Context
2021-01-19 14:44:03 +08:00
Type ( ) commonpb . MsgType
2021-03-13 14:42:53 +08:00
Execute ( ctx context . Context ) error
2021-06-26 09:22:11 +08:00
Core ( ) * Core
2021-01-19 14:44:03 +08:00
}
type baseReqTask struct {
2021-03-13 14:42:53 +08:00
ctx context . Context
2021-01-19 14:44:03 +08:00
core * Core
}
2021-06-26 09:22:11 +08:00
func ( b * baseReqTask ) Core ( ) * Core {
return b . core
2021-01-19 14:44:03 +08:00
}
2021-06-26 09:22:11 +08:00
// Ctx returns the context stored in the task.
func (b *baseReqTask) Ctx() context.Context {
	return b.ctx
}
func executeTask ( t reqTask ) error {
errChan := make ( chan error )
go func ( ) {
err := t . Execute ( t . Ctx ( ) )
errChan <- err
} ( )
2021-01-19 14:44:03 +08:00
select {
2021-06-26 09:22:11 +08:00
case <- t . Core ( ) . ctx . Done ( ) :
return fmt . Errorf ( "context canceled" )
case <- t . Ctx ( ) . Done ( ) :
return fmt . Errorf ( "context canceled" )
case err := <- errChan :
2021-07-14 17:11:54 +08:00
if t . Core ( ) . ctx . Err ( ) != nil || t . Ctx ( ) . Err ( ) != nil {
return fmt . Errorf ( "context canceled" )
}
2021-01-19 14:44:03 +08:00
return err
}
}
// CreateCollectionReqTask is the task that serves a create-collection request.
type CreateCollectionReqTask struct {
	baseReqTask
	// Req is the request to execute.
	Req *milvuspb.CreateCollectionRequest
}
// Type returns the message type recorded in the request's Base header.
//
// The generated protobuf getters are nil-safe, so a nil request or a request
// without a Base header yields the zero MsgType instead of a nil-pointer
// panic.
func (t *CreateCollectionReqTask) Type() commonpb.MsgType {
	return t.Req.GetBase().GetMsgType()
}
2021-01-23 10:12:41 +08:00
2021-03-13 14:42:53 +08:00
func ( t * CreateCollectionReqTask ) Execute ( ctx context . Context ) error {
2021-04-08 15:26:18 +08:00
if t . Type ( ) != commonpb . MsgType_CreateCollection {
return fmt . Errorf ( "create collection, msg type = %s" , commonpb . MsgType_name [ int32 ( t . Type ( ) ) ] )
}
2021-01-19 14:44:03 +08:00
var schema schemapb . CollectionSchema
err := proto . Unmarshal ( t . Req . Schema , & schema )
if err != nil {
2021-07-13 22:19:55 +08:00
return fmt . Errorf ( "unmarshal schema error= %w" , err )
2021-01-19 14:44:03 +08:00
}
2021-01-23 10:12:41 +08:00
if t . Req . CollectionName != schema . Name {
2021-03-05 10:15:27 +08:00
return fmt . Errorf ( "collection name = %s, schema.Name=%s" , t . Req . CollectionName , schema . Name )
2021-01-23 10:12:41 +08:00
}
2021-05-17 19:15:01 +08:00
if t . Req . ShardsNum <= 0 {
2021-09-14 11:59:47 +08:00
t . Req . ShardsNum = common . DefaultShardsNum
2021-05-17 19:15:01 +08:00
}
2021-09-08 15:00:00 +08:00
log . Debug ( "CreateCollectionReqTask Execute" , zap . Any ( "CollectionName" , t . Req . CollectionName ) ,
zap . Any ( "ShardsNum" , t . Req . ShardsNum ) )
2021-05-17 19:15:01 +08:00
2021-01-19 14:44:03 +08:00
for idx , field := range schema . Fields {
field . FieldID = int64 ( idx + StartOfUserFieldID )
}
rowIDField := & schemapb . FieldSchema {
FieldID : int64 ( RowIDField ) ,
Name : RowIDFieldName ,
IsPrimaryKey : false ,
Description : "row id" ,
2021-03-12 14:22:09 +08:00
DataType : schemapb . DataType_Int64 ,
2021-01-19 14:44:03 +08:00
}
timeStampField := & schemapb . FieldSchema {
FieldID : int64 ( TimeStampField ) ,
Name : TimeStampFieldName ,
IsPrimaryKey : false ,
Description : "time stamp" ,
2021-03-12 14:22:09 +08:00
DataType : schemapb . DataType_Int64 ,
2021-01-19 14:44:03 +08:00
}
schema . Fields = append ( schema . Fields , rowIDField , timeStampField )
2021-05-20 14:14:14 +08:00
collID , _ , err := t . core . IDAllocator ( 1 )
2021-01-19 14:44:03 +08:00
if err != nil {
2021-07-13 22:19:55 +08:00
return fmt . Errorf ( "alloc collection id error = %w" , err )
2021-01-19 14:44:03 +08:00
}
2021-05-20 14:14:14 +08:00
partID , _ , err := t . core . IDAllocator ( 1 )
2021-01-19 14:44:03 +08:00
if err != nil {
2021-07-13 22:19:55 +08:00
return fmt . Errorf ( "alloc partition id error = %w" , err )
2021-01-19 14:44:03 +08:00
}
2021-05-17 19:15:01 +08:00
2021-06-19 11:34:08 +08:00
log . Debug ( "collection name -> id" ,
zap . String ( "collection name" , t . Req . CollectionName ) ,
2021-07-06 09:16:03 +08:00
zap . Int64 ( "collection_id" , collID ) ,
2021-06-19 11:34:08 +08:00
zap . Int64 ( "default partition id" , partID ) )
2021-05-17 19:15:01 +08:00
vchanNames := make ( [ ] string , t . Req . ShardsNum )
chanNames := make ( [ ] string , t . Req . ShardsNum )
for i := int32 ( 0 ) ; i < t . Req . ShardsNum ; i ++ {
2021-09-18 09:13:50 +08:00
vchanNames [ i ] = fmt . Sprintf ( "%s_%dv%d" , t . core . dmlChannels . GetDmlMsgStreamName ( ) , collID , i )
2021-06-19 14:18:08 +08:00
chanNames [ i ] = ToPhysicalChannel ( vchanNames [ i ] )
2021-05-17 19:15:01 +08:00
}
2021-05-14 21:26:06 +08:00
collInfo := etcdpb . CollectionInfo {
2021-07-21 18:00:14 +08:00
ID : collID ,
Schema : & schema ,
PartitionIDs : [ ] typeutil . UniqueID { partID } ,
PartitionNames : [ ] string { Params . DefaultPartitionName } ,
FieldIndexes : make ( [ ] * etcdpb . FieldIndexInfo , 0 , 16 ) ,
VirtualChannelNames : vchanNames ,
PhysicalChannelNames : chanNames ,
2021-09-08 15:00:00 +08:00
ShardsNum : t . Req . ShardsNum ,
2021-07-23 14:36:12 +08:00
PartitionCreatedTimestamps : [ ] uint64 { 0 } ,
2021-01-20 09:36:50 +08:00
}
2021-05-12 15:33:53 +08:00
2021-02-11 08:41:59 +08:00
idxInfo := make ( [ ] * etcdpb . IndexInfo , 0 , 16 )
2021-01-20 09:36:50 +08:00
2021-05-12 15:33:53 +08:00
// schema is modified (add RowIDField and TimestampField),
// so need Marshal again
2021-01-20 09:36:50 +08:00
schemaBytes , err := proto . Marshal ( & schema )
if err != nil {
2021-07-13 22:19:55 +08:00
return fmt . Errorf ( "marshal schema error = %w" , err )
2021-01-20 09:36:50 +08:00
}
2021-05-14 21:26:06 +08:00
ddCollReq := internalpb . CreateCollectionRequest {
2021-05-17 19:15:01 +08:00
Base : t . Req . Base ,
DbName : t . Req . DbName ,
CollectionName : t . Req . CollectionName ,
2021-07-06 09:16:03 +08:00
PartitionName : Params . DefaultPartitionName ,
2021-05-17 19:15:01 +08:00
DbID : 0 , //TODO,not used
CollectionID : collID ,
2021-07-06 09:16:03 +08:00
PartitionID : partID ,
2021-05-17 19:15:01 +08:00
Schema : schemaBytes ,
VirtualChannelNames : vchanNames ,
PhysicalChannelNames : chanNames ,
2021-01-20 09:36:50 +08:00
}
2021-05-14 21:26:06 +08:00
// build DdOperation and save it into etcd, when ddmsg send fail,
// system can restore ddmsg from etcd and re-send
2021-05-20 14:14:14 +08:00
ddOp := func ( ts typeutil . Timestamp ) ( string , error ) {
2021-05-31 16:48:31 +08:00
ddCollReq . Base . Timestamp = ts
2021-07-06 09:16:03 +08:00
return EncodeDdOperation ( & ddCollReq , CreateCollectionDDType )
2021-02-03 20:04:29 +08:00
}
2021-08-18 14:36:10 +08:00
reason := fmt . Sprintf ( "create collection %d" , collID )
ts , err := t . core . TSOAllocator ( 1 )
2021-05-14 21:26:06 +08:00
if err != nil {
2021-08-18 14:36:10 +08:00
return fmt . Errorf ( "TSO alloc fail, error = %w" , err )
2021-05-14 21:26:06 +08:00
}
2021-05-12 15:33:53 +08:00
2021-08-18 14:36:10 +08:00
// use lambda function here to guarantee all resources to be released
createCollectionFn := func ( ) error {
// lock for ddl operation
t . core . ddlLock . Lock ( )
defer t . core . ddlLock . Unlock ( )
2021-06-04 15:00:34 +08:00
2021-08-18 14:36:10 +08:00
t . core . chanTimeTick . AddDdlTimeTick ( ts , reason )
// clear ddl timetick in all conditions
defer t . core . chanTimeTick . RemoveDdlTimeTick ( ts , reason )
err = t . core . MetaTable . AddCollection ( & collInfo , ts , idxInfo , ddOp )
if err != nil {
return fmt . Errorf ( "meta table add collection failed,error = %w" , err )
}
2021-05-14 21:26:06 +08:00
2021-08-18 14:36:10 +08:00
// add dml channel before send dd msg
t . core . dmlChannels . AddProducerChannels ( chanNames ... )
2021-05-20 14:14:14 +08:00
2021-08-18 14:36:10 +08:00
err = t . core . SendDdCreateCollectionReq ( ctx , & ddCollReq , chanNames )
if err != nil {
return fmt . Errorf ( "send dd create collection req failed, error = %w" , err )
}
t . core . chanTimeTick . RemoveDdlTimeTick ( ts , reason )
t . core . SendTimeTick ( ts , reason )
return nil
}
err = createCollectionFn ( )
2021-07-13 22:19:55 +08:00
if err != nil {
2021-08-18 14:36:10 +08:00
return err
2021-07-13 22:19:55 +08:00
}
2021-08-18 14:36:10 +08:00
// Update DDOperation in etcd
return t . core . setDdMsgSendFlag ( true )
2021-01-19 14:44:03 +08:00
}
// DropCollectionReqTask is the task that serves a drop-collection request.
type DropCollectionReqTask struct {
	baseReqTask
	// Req is the request to execute.
	Req *milvuspb.DropCollectionRequest
}
// Type returns the message type recorded in the request's Base header.
//
// The generated protobuf getters are nil-safe, so a nil request or a request
// without a Base header yields the zero MsgType instead of a nil-pointer
// panic.
func (t *DropCollectionReqTask) Type() commonpb.MsgType {
	return t.Req.GetBase().GetMsgType()
}
2021-03-13 14:42:53 +08:00
func ( t * DropCollectionReqTask ) Execute ( ctx context . Context ) error {
2021-04-08 15:26:18 +08:00
if t . Type ( ) != commonpb . MsgType_DropCollection {
return fmt . Errorf ( "drop collection, msg type = %s" , commonpb . MsgType_name [ int32 ( t . Type ( ) ) ] )
}
2021-05-18 14:18:02 +08:00
collMeta , err := t . core . MetaTable . GetCollectionByName ( t . Req . CollectionName , 0 )
2021-01-19 14:44:03 +08:00
if err != nil {
return err
}
2021-03-12 14:22:09 +08:00
ddReq := internalpb . DropCollectionRequest {
2021-01-20 09:36:50 +08:00
Base : t . Req . Base ,
DbName : t . Req . DbName ,
CollectionName : t . Req . CollectionName ,
DbID : 0 , //not used
CollectionID : collMeta . ID ,
}
2021-05-14 21:26:06 +08:00
// build DdOperation and save it into etcd, when ddmsg send fail,
// system can restore ddmsg from etcd and re-send
2021-05-20 14:14:14 +08:00
ddOp := func ( ts typeutil . Timestamp ) ( string , error ) {
2021-05-31 16:48:31 +08:00
ddReq . Base . Timestamp = ts
2021-07-06 09:16:03 +08:00
return EncodeDdOperation ( & ddReq , DropCollectionDDType )
2021-05-14 21:26:06 +08:00
}
2021-08-18 14:36:10 +08:00
reason := fmt . Sprintf ( "drop collection %d" , collMeta . ID )
ts , err := t . core . TSOAllocator ( 1 )
2021-05-14 21:26:06 +08:00
if err != nil {
2021-08-18 14:36:10 +08:00
return fmt . Errorf ( "TSO alloc fail, error = %w" , err )
}
// use lambda function here to guarantee all resources to be released
dropCollectionFn := func ( ) error {
// lock for ddl operation
t . core . ddlLock . Lock ( )
defer t . core . ddlLock . Unlock ( )
t . core . chanTimeTick . AddDdlTimeTick ( ts , reason )
// clear ddl timetick in all conditions
defer t . core . chanTimeTick . RemoveDdlTimeTick ( ts , reason )
err = t . core . MetaTable . DeleteCollection ( collMeta . ID , ts , ddOp )
if err != nil {
return err
}
err = t . core . SendDdDropCollectionReq ( ctx , & ddReq , collMeta . PhysicalChannelNames )
if err != nil {
return err
}
t . core . chanTimeTick . RemoveDdlTimeTick ( ts , reason )
t . core . SendTimeTick ( ts , reason )
2021-09-18 09:13:50 +08:00
// send tt into deleted channels to tell data_node to clear flowgragh
t . core . chanTimeTick . SendTimeTickToChannel ( collMeta . PhysicalChannelNames , ts )
2021-08-19 15:06:12 +08:00
2021-08-18 14:36:10 +08:00
// remove dml channel after send dd msg
t . core . dmlChannels . RemoveProducerChannels ( collMeta . PhysicalChannelNames ... )
return nil
2021-05-14 21:26:06 +08:00
}
2021-08-18 14:36:10 +08:00
err = dropCollectionFn ( )
2021-01-19 14:44:03 +08:00
if err != nil {
return err
}
2021-02-05 14:09:55 +08:00
//notify query service to release collection
2021-08-03 21:47:24 +08:00
if err = t . core . CallReleaseCollectionService ( t . core . ctx , ts , 0 , collMeta . ID ) ; err != nil {
log . Error ( "CallReleaseCollectionService failed" , zap . String ( "error" , err . Error ( ) ) )
return err
}
2021-02-05 14:09:55 +08:00
2021-05-26 20:14:30 +08:00
req := proxypb . InvalidateCollMetaCacheRequest {
Base : & commonpb . MsgBase {
MsgType : 0 , //TODO, msg type
MsgID : 0 , //TODO, msg id
Timestamp : ts ,
SourceID : t . core . session . ServerID ,
} ,
DbName : t . Req . DbName ,
CollectionName : t . Req . CollectionName ,
}
2021-05-14 21:26:06 +08:00
// error doesn't matter here
2021-05-26 20:14:30 +08:00
t . core . proxyClientManager . InvalidateCollectionMetaCache ( ctx , & req )
2021-05-12 15:33:53 +08:00
2021-05-14 21:26:06 +08:00
// Update DDOperation in etcd
return t . core . setDdMsgSendFlag ( true )
2021-01-19 14:44:03 +08:00
}
// HasCollectionReqTask is the task that serves a has-collection request.
type HasCollectionReqTask struct {
	baseReqTask
	// Req is the request to execute.
	Req *milvuspb.HasCollectionRequest
	// HasCollection is the result, filled in by Execute.
	HasCollection bool
}
// Type returns the message type recorded in the request's Base header.
//
// The generated protobuf getters are nil-safe, so a nil request or a request
// without a Base header yields the zero MsgType instead of a nil-pointer
// panic.
func (t *HasCollectionReqTask) Type() commonpb.MsgType {
	return t.Req.GetBase().GetMsgType()
}
2021-04-08 15:26:18 +08:00
func ( t * HasCollectionReqTask ) Execute ( ctx context . Context ) error {
if t . Type ( ) != commonpb . MsgType_HasCollection {
return fmt . Errorf ( "has collection, msg type = %s" , commonpb . MsgType_name [ int32 ( t . Type ( ) ) ] )
}
2021-05-18 17:12:17 +08:00
_ , err := t . core . MetaTable . GetCollectionByName ( t . Req . CollectionName , t . Req . TimeStamp )
2021-01-19 14:44:03 +08:00
if err == nil {
t . HasCollection = true
} else {
t . HasCollection = false
}
return nil
}
// DescribeCollectionReqTask is the task that serves a describe-collection
// request.
type DescribeCollectionReqTask struct {
	baseReqTask
	// Req is the request to execute.
	Req *milvuspb.DescribeCollectionRequest
	// Rsp is the response that Execute fills in.
	Rsp *milvuspb.DescribeCollectionResponse
}
// Type returns the message type recorded in the request's Base header.
//
// The generated protobuf getters are nil-safe, so a nil request or a request
// without a Base header yields the zero MsgType instead of a nil-pointer
// panic.
func (t *DescribeCollectionReqTask) Type() commonpb.MsgType {
	return t.Req.GetBase().GetMsgType()
}
2021-03-13 14:42:53 +08:00
func ( t * DescribeCollectionReqTask ) Execute ( ctx context . Context ) error {
2021-04-08 15:26:18 +08:00
if t . Type ( ) != commonpb . MsgType_DescribeCollection {
return fmt . Errorf ( "describe collection, msg type = %s" , commonpb . MsgType_name [ int32 ( t . Type ( ) ) ] )
}
2021-05-17 19:15:01 +08:00
var collInfo * etcdpb . CollectionInfo
2021-02-25 16:08:56 +08:00
var err error
if t . Req . CollectionName != "" {
2021-05-18 17:12:17 +08:00
collInfo , err = t . core . MetaTable . GetCollectionByName ( t . Req . CollectionName , t . Req . TimeStamp )
2021-02-25 16:08:56 +08:00
if err != nil {
return err
}
} else {
2021-05-18 17:12:17 +08:00
collInfo , err = t . core . MetaTable . GetCollectionByID ( t . Req . CollectionID , t . Req . TimeStamp )
2021-02-25 16:08:56 +08:00
if err != nil {
return err
}
2021-01-19 14:44:03 +08:00
}
2021-02-25 16:08:56 +08:00
2021-05-17 19:15:01 +08:00
t . Rsp . Schema = proto . Clone ( collInfo . Schema ) . ( * schemapb . CollectionSchema )
t . Rsp . CollectionID = collInfo . ID
t . Rsp . VirtualChannelNames = collInfo . VirtualChannelNames
t . Rsp . PhysicalChannelNames = collInfo . PhysicalChannelNames
2021-09-08 15:00:00 +08:00
if collInfo . ShardsNum == 0 {
collInfo . ShardsNum = int32 ( len ( collInfo . VirtualChannelNames ) )
}
t . Rsp . ShardsNum = collInfo . ShardsNum
2021-07-21 18:00:14 +08:00
t . Rsp . CreatedTimestamp = collInfo . CreateTime
createdPhysicalTime , _ := tsoutil . ParseHybridTs ( collInfo . CreateTime )
t . Rsp . CreatedUtcTimestamp = createdPhysicalTime
2021-09-22 16:20:48 +08:00
t . Rsp . Aliases = t . core . MetaTable . ListAliases ( collInfo . ID )
2021-01-19 14:44:03 +08:00
return nil
}
type ShowCollectionReqTask struct {
baseReqTask
2021-03-12 14:22:09 +08:00
Req * milvuspb . ShowCollectionsRequest
Rsp * milvuspb . ShowCollectionsResponse
2021-01-19 14:44:03 +08:00
}
// Type returns the message type recorded in the request's Base header.
//
// The generated protobuf getters are nil-safe, so a nil request or a request
// without a Base header yields the zero MsgType instead of a nil-pointer
// panic.
func (t *ShowCollectionReqTask) Type() commonpb.MsgType {
	return t.Req.GetBase().GetMsgType()
}
2021-04-08 15:26:18 +08:00
func ( t * ShowCollectionReqTask ) Execute ( ctx context . Context ) error {
if t . Type ( ) != commonpb . MsgType_ShowCollections {
return fmt . Errorf ( "show collection, msg type = %s" , commonpb . MsgType_name [ int32 ( t . Type ( ) ) ] )
}
2021-05-18 17:12:17 +08:00
coll , err := t . core . MetaTable . ListCollections ( t . Req . TimeStamp )
2021-01-19 14:44:03 +08:00
if err != nil {
return err
}
2021-07-21 18:00:14 +08:00
for name , meta := range coll {
2021-06-03 19:09:33 +08:00
t . Rsp . CollectionNames = append ( t . Rsp . CollectionNames , name )
2021-07-21 18:00:14 +08:00
t . Rsp . CollectionIds = append ( t . Rsp . CollectionIds , meta . ID )
t . Rsp . CreatedTimestamps = append ( t . Rsp . CreatedTimestamps , meta . CreateTime )
physical , _ := tsoutil . ParseHybridTs ( meta . CreateTime )
t . Rsp . CreatedUtcTimestamps = append ( t . Rsp . CreatedUtcTimestamps , physical )
2021-06-03 19:09:33 +08:00
}
2021-01-19 14:44:03 +08:00
return nil
}
// CreatePartitionReqTask is the task that serves a create-partition request.
type CreatePartitionReqTask struct {
	baseReqTask
	// Req is the request to execute.
	Req *milvuspb.CreatePartitionRequest
}
// Type returns the message type recorded in the request's Base header.
//
// The generated protobuf getters are nil-safe, so a nil request or a request
// without a Base header yields the zero MsgType instead of a nil-pointer
// panic.
func (t *CreatePartitionReqTask) Type() commonpb.MsgType {
	return t.Req.GetBase().GetMsgType()
}
2021-03-13 14:42:53 +08:00
func ( t * CreatePartitionReqTask ) Execute ( ctx context . Context ) error {
2021-04-08 15:26:18 +08:00
if t . Type ( ) != commonpb . MsgType_CreatePartition {
return fmt . Errorf ( "create partition, msg type = %s" , commonpb . MsgType_name [ int32 ( t . Type ( ) ) ] )
}
2021-05-18 14:18:02 +08:00
collMeta , err := t . core . MetaTable . GetCollectionByName ( t . Req . CollectionName , 0 )
2021-01-19 14:44:03 +08:00
if err != nil {
return err
}
2021-05-20 14:14:14 +08:00
partID , _ , err := t . core . IDAllocator ( 1 )
2021-01-19 14:44:03 +08:00
if err != nil {
return err
}
2021-03-12 14:22:09 +08:00
ddReq := internalpb . CreatePartitionRequest {
2021-01-20 09:36:50 +08:00
Base : t . Req . Base ,
DbName : t . Req . DbName ,
CollectionName : t . Req . CollectionName ,
PartitionName : t . Req . PartitionName ,
DbID : 0 , // todo, not used
CollectionID : collMeta . ID ,
2021-05-14 21:26:06 +08:00
PartitionID : partID ,
2021-01-20 09:36:50 +08:00
}
2021-05-14 21:26:06 +08:00
// build DdOperation and save it into etcd, when ddmsg send fail,
// system can restore ddmsg from etcd and re-send
2021-05-20 14:14:14 +08:00
ddOp := func ( ts typeutil . Timestamp ) ( string , error ) {
2021-05-31 16:48:31 +08:00
ddReq . Base . Timestamp = ts
2021-07-06 09:16:03 +08:00
return EncodeDdOperation ( & ddReq , CreatePartitionDDType )
2021-01-19 14:44:03 +08:00
}
2021-08-18 14:36:10 +08:00
reason := fmt . Sprintf ( "create partition %s" , t . Req . PartitionName )
ts , err := t . core . TSOAllocator ( 1 )
2021-05-14 21:26:06 +08:00
if err != nil {
2021-08-18 14:36:10 +08:00
return fmt . Errorf ( "TSO alloc fail, error = %w" , err )
2021-05-14 21:26:06 +08:00
}
2021-03-22 16:36:10 +08:00
2021-08-18 14:36:10 +08:00
// use lambda function here to guarantee all resources to be released
createPartitionFn := func ( ) error {
// lock for ddl operation
t . core . ddlLock . Lock ( )
defer t . core . ddlLock . Unlock ( )
t . core . chanTimeTick . AddDdlTimeTick ( ts , reason )
// clear ddl timetick in all conditions
defer t . core . chanTimeTick . RemoveDdlTimeTick ( ts , reason )
err = t . core . MetaTable . AddPartition ( collMeta . ID , t . Req . PartitionName , partID , ts , ddOp )
if err != nil {
return err
}
err = t . core . SendDdCreatePartitionReq ( ctx , & ddReq , collMeta . PhysicalChannelNames )
if err != nil {
return err
}
t . core . chanTimeTick . RemoveDdlTimeTick ( ts , reason )
t . core . SendTimeTick ( ts , reason )
return nil
}
err = createPartitionFn ( )
2021-05-14 21:26:06 +08:00
if err != nil {
return err
}
2021-05-12 15:33:53 +08:00
2021-05-26 20:14:30 +08:00
req := proxypb . InvalidateCollMetaCacheRequest {
Base : & commonpb . MsgBase {
MsgType : 0 , //TODO, msg type
MsgID : 0 , //TODO, msg id
Timestamp : ts ,
SourceID : t . core . session . ServerID ,
} ,
DbName : t . Req . DbName ,
CollectionName : t . Req . CollectionName ,
}
2021-05-14 21:26:06 +08:00
// error doesn't matter here
2021-05-26 20:14:30 +08:00
t . core . proxyClientManager . InvalidateCollectionMetaCache ( ctx , & req )
2021-05-14 21:26:06 +08:00
// Update DDOperation in etcd
return t . core . setDdMsgSendFlag ( true )
2021-01-19 14:44:03 +08:00
}
// DropPartitionReqTask is the task that serves a drop-partition request.
type DropPartitionReqTask struct {
	baseReqTask
	// Req is the request to execute.
	Req *milvuspb.DropPartitionRequest
}
// Type returns the message type recorded in the request's Base header.
//
// The generated protobuf getters are nil-safe, so a nil request or a request
// without a Base header yields the zero MsgType instead of a nil-pointer
// panic.
func (t *DropPartitionReqTask) Type() commonpb.MsgType {
	return t.Req.GetBase().GetMsgType()
}
2021-03-13 14:42:53 +08:00
func ( t * DropPartitionReqTask ) Execute ( ctx context . Context ) error {
2021-04-08 15:26:18 +08:00
if t . Type ( ) != commonpb . MsgType_DropPartition {
return fmt . Errorf ( "drop partition, msg type = %s" , commonpb . MsgType_name [ int32 ( t . Type ( ) ) ] )
}
2021-05-18 14:18:02 +08:00
collInfo , err := t . core . MetaTable . GetCollectionByName ( t . Req . CollectionName , 0 )
2021-01-19 14:44:03 +08:00
if err != nil {
return err
}
2021-07-03 14:36:18 +08:00
partID , err := t . core . MetaTable . GetPartitionByName ( collInfo . ID , t . Req . PartitionName , 0 )
2021-01-19 14:44:03 +08:00
if err != nil {
return err
}
2021-03-12 14:22:09 +08:00
ddReq := internalpb . DropPartitionRequest {
2021-01-20 09:36:50 +08:00
Base : t . Req . Base ,
DbName : t . Req . DbName ,
CollectionName : t . Req . CollectionName ,
PartitionName : t . Req . PartitionName ,
DbID : 0 , //todo,not used
2021-05-14 21:26:06 +08:00
CollectionID : collInfo . ID ,
2021-07-03 14:36:18 +08:00
PartitionID : partID ,
2021-01-20 09:36:50 +08:00
}
2021-05-14 21:26:06 +08:00
// build DdOperation and save it into etcd, when ddmsg send fail,
// system can restore ddmsg from etcd and re-send
2021-05-20 14:14:14 +08:00
ddOp := func ( ts typeutil . Timestamp ) ( string , error ) {
2021-05-31 16:48:31 +08:00
ddReq . Base . Timestamp = ts
2021-07-06 09:16:03 +08:00
return EncodeDdOperation ( & ddReq , DropPartitionDDType )
2021-01-19 14:44:03 +08:00
}
2021-03-22 16:36:10 +08:00
2021-08-18 14:36:10 +08:00
reason := fmt . Sprintf ( "drop partition %s" , t . Req . PartitionName )
ts , err := t . core . TSOAllocator ( 1 )
2021-05-14 21:26:06 +08:00
if err != nil {
2021-08-18 14:36:10 +08:00
return fmt . Errorf ( "TSO alloc fail, error = %w" , err )
2021-05-14 21:26:06 +08:00
}
2021-05-12 15:33:53 +08:00
2021-08-18 14:36:10 +08:00
// use lambda function here to guarantee all resources to be released
dropPartitionFn := func ( ) error {
// lock for ddl operation
t . core . ddlLock . Lock ( )
defer t . core . ddlLock . Unlock ( )
t . core . chanTimeTick . AddDdlTimeTick ( ts , reason )
// clear ddl timetick in all conditions
defer t . core . chanTimeTick . RemoveDdlTimeTick ( ts , reason )
_ , err = t . core . MetaTable . DeletePartition ( collInfo . ID , t . Req . PartitionName , ts , ddOp )
if err != nil {
return err
}
err = t . core . SendDdDropPartitionReq ( ctx , & ddReq , collInfo . PhysicalChannelNames )
if err != nil {
return err
}
t . core . chanTimeTick . RemoveDdlTimeTick ( ts , reason )
t . core . SendTimeTick ( ts , reason )
return nil
}
err = dropPartitionFn ( )
2021-05-14 21:26:06 +08:00
if err != nil {
return err
}
2021-05-12 15:33:53 +08:00
2021-05-26 20:14:30 +08:00
req := proxypb . InvalidateCollMetaCacheRequest {
Base : & commonpb . MsgBase {
MsgType : 0 , //TODO, msg type
MsgID : 0 , //TODO, msg id
Timestamp : ts ,
SourceID : t . core . session . ServerID ,
} ,
DbName : t . Req . DbName ,
CollectionName : t . Req . CollectionName ,
}
2021-05-14 21:26:06 +08:00
// error doesn't matter here
2021-05-26 20:14:30 +08:00
t . core . proxyClientManager . InvalidateCollectionMetaCache ( ctx , & req )
2021-05-14 21:26:06 +08:00
2021-06-22 16:08:08 +08:00
//notify query service to release partition
2021-08-03 21:47:24 +08:00
if err = t . core . CallReleasePartitionService ( t . core . ctx , ts , 0 , collInfo . ID , [ ] typeutil . UniqueID { partID } ) ; err != nil {
log . Error ( "CallReleaseCollectionService failed" , zap . String ( "error" , err . Error ( ) ) )
return err
}
2021-06-22 16:08:08 +08:00
2021-05-14 21:26:06 +08:00
// Update DDOperation in etcd
return t . core . setDdMsgSendFlag ( true )
2021-01-19 14:44:03 +08:00
}
// HasPartitionReqTask is the task that serves a has-partition request.
type HasPartitionReqTask struct {
	baseReqTask
	// Req is the request to execute.
	Req *milvuspb.HasPartitionRequest
	// HasPartition is the result, filled in by Execute.
	HasPartition bool
}
// Type returns the message type recorded in the request's Base header.
//
// The generated protobuf getters are nil-safe, so a nil request or a request
// without a Base header yields the zero MsgType instead of a nil-pointer
// panic.
func (t *HasPartitionReqTask) Type() commonpb.MsgType {
	return t.Req.GetBase().GetMsgType()
}
2021-04-08 15:26:18 +08:00
func ( t * HasPartitionReqTask ) Execute ( ctx context . Context ) error {
if t . Type ( ) != commonpb . MsgType_HasPartition {
return fmt . Errorf ( "has partition, msg type = %s" , commonpb . MsgType_name [ int32 ( t . Type ( ) ) ] )
}
2021-05-18 14:18:02 +08:00
coll , err := t . core . MetaTable . GetCollectionByName ( t . Req . CollectionName , 0 )
2021-01-19 14:44:03 +08:00
if err != nil {
return err
}
2021-05-18 14:18:02 +08:00
t . HasPartition = t . core . MetaTable . HasPartition ( coll . ID , t . Req . PartitionName , 0 )
2021-01-19 14:44:03 +08:00
return nil
}
type ShowPartitionReqTask struct {
baseReqTask
2021-03-12 14:22:09 +08:00
Req * milvuspb . ShowPartitionsRequest
Rsp * milvuspb . ShowPartitionsResponse
2021-01-19 14:44:03 +08:00
}
// Type returns the message type recorded in the request's Base header.
//
// The generated protobuf getters are nil-safe, so a nil request or a request
// without a Base header yields the zero MsgType instead of a nil-pointer
// panic.
func (t *ShowPartitionReqTask) Type() commonpb.MsgType {
	return t.Req.GetBase().GetMsgType()
}
2021-04-08 15:26:18 +08:00
func ( t * ShowPartitionReqTask ) Execute ( ctx context . Context ) error {
if t . Type ( ) != commonpb . MsgType_ShowPartitions {
return fmt . Errorf ( "show partition, msg type = %s" , commonpb . MsgType_name [ int32 ( t . Type ( ) ) ] )
}
2021-02-05 16:32:03 +08:00
var coll * etcdpb . CollectionInfo
var err error
if t . Req . CollectionName == "" {
2021-05-18 14:18:02 +08:00
coll , err = t . core . MetaTable . GetCollectionByID ( t . Req . CollectionID , 0 )
2021-02-05 16:32:03 +08:00
} else {
2021-05-18 14:18:02 +08:00
coll , err = t . core . MetaTable . GetCollectionByName ( t . Req . CollectionName , 0 )
2021-02-05 16:32:03 +08:00
}
2021-01-19 14:44:03 +08:00
if err != nil {
return err
}
2021-07-03 14:36:18 +08:00
t . Rsp . PartitionIDs = coll . PartitionIDs
2021-07-06 09:16:03 +08:00
t . Rsp . PartitionNames = coll . PartitionNames
2021-07-21 18:00:14 +08:00
t . Rsp . CreatedTimestamps = coll . PartitionCreatedTimestamps
t . Rsp . CreatedUtcTimestamps = make ( [ ] uint64 , 0 , len ( coll . PartitionCreatedTimestamps ) )
for _ , ts := range coll . PartitionCreatedTimestamps {
physical , _ := tsoutil . ParseHybridTs ( ts )
t . Rsp . CreatedUtcTimestamps = append ( t . Rsp . CreatedUtcTimestamps , physical )
}
2021-07-03 14:36:18 +08:00
2021-01-19 14:44:03 +08:00
return nil
}
2021-01-21 10:01:29 +08:00
// DescribeSegmentReqTask is the task that serves a describe-segment request.
type DescribeSegmentReqTask struct {
	baseReqTask
	// Req is the request to execute.
	Req *milvuspb.DescribeSegmentRequest
	// Rsp is the response that Execute fills in.
	Rsp *milvuspb.DescribeSegmentResponse //TODO,return repeated segment id in the future
}
// Type returns the message type recorded in the request's Base header.
//
// The generated protobuf getters are nil-safe, so a nil request or a request
// without a Base header yields the zero MsgType instead of a nil-pointer
// panic.
func (t *DescribeSegmentReqTask) Type() commonpb.MsgType {
	return t.Req.GetBase().GetMsgType()
}
2021-04-08 15:26:18 +08:00
func ( t * DescribeSegmentReqTask ) Execute ( ctx context . Context ) error {
if t . Type ( ) != commonpb . MsgType_DescribeSegment {
return fmt . Errorf ( "describe segment, msg type = %s" , commonpb . MsgType_name [ int32 ( t . Type ( ) ) ] )
}
2021-05-18 14:18:02 +08:00
coll , err := t . core . MetaTable . GetCollectionByID ( t . Req . CollectionID , 0 )
2021-01-21 10:01:29 +08:00
if err != nil {
return err
}
exist := false
2021-07-03 14:36:18 +08:00
segIDs , err := t . core . CallGetFlushedSegmentsService ( ctx , t . Req . CollectionID , - 1 )
if err != nil {
log . Debug ( "get flushed segment from data coord failed" , zap . String ( "collection_name" , coll . Schema . Name ) , zap . Error ( err ) )
exist = true
} else {
for _ , id := range segIDs {
if id == t . Req . SegmentID {
2021-01-21 10:01:29 +08:00
exist = true
break
}
}
}
2021-07-03 14:36:18 +08:00
2021-01-21 10:01:29 +08:00
if ! exist {
2021-03-05 10:15:27 +08:00
return fmt . Errorf ( "segment id %d not belong to collection id %d" , t . Req . SegmentID , t . Req . CollectionID )
2021-01-21 10:01:29 +08:00
}
//TODO, get filed_id and index_name from request
segIdxInfo , err := t . core . MetaTable . GetSegmentIndexInfoByID ( t . Req . SegmentID , - 1 , "" )
2021-06-23 15:28:09 +08:00
log . Debug ( "RootCoord DescribeSegmentReqTask, MetaTable.GetSegmentIndexInfoByID" , zap . Any ( "SegmentID" , t . Req . SegmentID ) ,
2021-06-06 09:41:35 +08:00
zap . Any ( "segIdxInfo" , segIdxInfo ) , zap . Error ( err ) )
2021-01-21 10:01:29 +08:00
if err != nil {
return err
}
t . Rsp . IndexID = segIdxInfo . IndexID
2021-01-28 17:25:43 +08:00
t . Rsp . BuildID = segIdxInfo . BuildID
2021-03-08 15:46:51 +08:00
t . Rsp . EnableIndex = segIdxInfo . EnableIndex
2021-01-21 10:01:29 +08:00
return nil
}
type ShowSegmentReqTask struct {
baseReqTask
2021-03-12 14:22:09 +08:00
Req * milvuspb . ShowSegmentsRequest
Rsp * milvuspb . ShowSegmentsResponse
2021-01-21 10:01:29 +08:00
}
// Type returns the message type recorded in the request's Base header.
//
// The generated protobuf getters are nil-safe, so a nil request or a request
// without a Base header yields the zero MsgType instead of a nil-pointer
// panic.
func (t *ShowSegmentReqTask) Type() commonpb.MsgType {
	return t.Req.GetBase().GetMsgType()
}
2021-04-08 15:26:18 +08:00
func ( t * ShowSegmentReqTask ) Execute ( ctx context . Context ) error {
if t . Type ( ) != commonpb . MsgType_ShowSegments {
return fmt . Errorf ( "show segments, msg type = %s" , commonpb . MsgType_name [ int32 ( t . Type ( ) ) ] )
}
2021-05-18 14:18:02 +08:00
coll , err := t . core . MetaTable . GetCollectionByID ( t . Req . CollectionID , 0 )
2021-01-21 10:01:29 +08:00
if err != nil {
return err
}
2021-02-22 16:40:26 +08:00
exist := false
2021-01-21 10:01:29 +08:00
for _ , partID := range coll . PartitionIDs {
2021-02-22 16:40:26 +08:00
if partID == t . Req . PartitionID {
exist = true
break
2021-01-21 10:01:29 +08:00
}
}
2021-02-22 16:40:26 +08:00
if ! exist {
2021-03-05 10:15:27 +08:00
return fmt . Errorf ( "partition id = %d not belong to collection id = %d" , t . Req . PartitionID , t . Req . CollectionID )
2021-02-22 16:40:26 +08:00
}
2021-07-03 14:36:18 +08:00
segIDs , err := t . core . CallGetFlushedSegmentsService ( ctx , t . Req . CollectionID , t . Req . PartitionID )
2021-02-22 16:40:26 +08:00
if err != nil {
2021-07-03 14:36:18 +08:00
log . Debug ( "get flushed segments from data coord failed" , zap . String ( "collection name" , coll . Schema . Name ) , zap . Int64 ( "partition id" , t . Req . PartitionID ) , zap . Error ( err ) )
2021-02-22 16:40:26 +08:00
return err
}
2021-07-03 14:36:18 +08:00
t . Rsp . SegmentIDs = append ( t . Rsp . SegmentIDs , segIDs ... )
2021-01-21 10:01:29 +08:00
return nil
}
// CreateIndexReqTask is the task that serves a create-index request.
type CreateIndexReqTask struct {
	baseReqTask
	// Req is the request to execute.
	Req *milvuspb.CreateIndexRequest
}
// Type returns the message type recorded in the request's Base header.
//
// The generated protobuf getters are nil-safe, so a nil request or a request
// without a Base header yields the zero MsgType instead of a nil-pointer
// panic.
func (t *CreateIndexReqTask) Type() commonpb.MsgType {
	return t.Req.GetBase().GetMsgType()
}
2021-03-13 14:42:53 +08:00
func ( t * CreateIndexReqTask ) Execute ( ctx context . Context ) error {
2021-04-08 15:26:18 +08:00
if t . Type ( ) != commonpb . MsgType_CreateIndex {
return fmt . Errorf ( "create index, msg type = %s" , commonpb . MsgType_name [ int32 ( t . Type ( ) ) ] )
}
2021-02-08 14:49:12 +08:00
indexName := Params . DefaultIndexName //TODO, get name from request
2021-05-20 14:14:14 +08:00
indexID , _ , err := t . core . IDAllocator ( 1 )
2021-06-23 15:28:09 +08:00
log . Debug ( "RootCoord CreateIndexReqTask" , zap . Any ( "indexID" , indexID ) , zap . Error ( err ) )
2021-02-09 13:11:55 +08:00
if err != nil {
return err
}
idxInfo := & etcdpb . IndexInfo {
IndexName : indexName ,
IndexID : indexID ,
IndexParams : t . Req . ExtraParams ,
}
2021-07-03 14:36:18 +08:00
collMeta , err := t . core . MetaTable . GetCollectionByName ( t . Req . CollectionName , 0 )
if err != nil {
return err
}
segID2PartID , err := t . core . getSegments ( ctx , collMeta . ID )
flushedSegs := make ( [ ] typeutil . UniqueID , 0 , len ( segID2PartID ) )
for k := range segID2PartID {
flushedSegs = append ( flushedSegs , k )
}
2021-01-21 10:01:29 +08:00
if err != nil {
2021-07-03 14:36:18 +08:00
log . Debug ( "get flushed segments from data coord failed" , zap . String ( "collection_name" , collMeta . Schema . Name ) , zap . Error ( err ) )
return err
}
2021-08-24 21:15:52 +08:00
segIDs , field , err := t . core . MetaTable . GetNotIndexedSegments ( t . Req . CollectionName , t . Req . FieldName , idxInfo , flushedSegs , t . Req . Base . GetTimestamp ( ) )
2021-07-03 14:36:18 +08:00
if err != nil {
log . Debug ( "RootCoord CreateIndexReqTask metaTable.GetNotIndexedSegments" , zap . Error ( err ) )
2021-01-21 10:01:29 +08:00
return err
}
2021-03-12 14:22:09 +08:00
if field . DataType != schemapb . DataType_FloatVector && field . DataType != schemapb . DataType_BinaryVector {
2021-03-05 10:15:27 +08:00
return fmt . Errorf ( "field name = %s, data type = %s" , t . Req . FieldName , schemapb . DataType_name [ int32 ( field . DataType ) ] )
2021-01-22 15:41:54 +08:00
}
2021-05-24 14:19:52 +08:00
2021-05-15 18:08:08 +08:00
for _ , segID := range segIDs {
2021-05-24 14:19:52 +08:00
info := etcdpb . SegmentIndexInfo {
2021-07-03 20:00:40 +08:00
CollectionID : collMeta . ID ,
PartitionID : segID2PartID [ segID ] ,
SegmentID : segID ,
FieldID : field . FieldID ,
IndexID : idxInfo . IndexID ,
EnableIndex : false ,
2021-05-24 14:19:52 +08:00
}
2021-06-30 16:18:13 +08:00
info . BuildID , err = t . core . BuildIndex ( ctx , segID , & field , idxInfo , false )
2021-05-24 14:19:52 +08:00
if err != nil {
2021-05-15 18:08:08 +08:00
return err
2021-01-21 10:01:29 +08:00
}
2021-06-06 09:41:35 +08:00
if info . BuildID != 0 {
info . EnableIndex = true
}
2021-08-18 14:36:10 +08:00
ts , _ := t . core . TSOAllocator ( 1 )
if err := t . core . MetaTable . AddIndex ( & info , ts ) ; err != nil {
2021-07-03 14:36:18 +08:00
log . Debug ( "Add index into meta table failed" , zap . Int64 ( "collection_id" , collMeta . ID ) , zap . Int64 ( "index_id" , info . IndexID ) , zap . Int64 ( "build_id" , info . BuildID ) , zap . Error ( err ) )
}
2021-01-21 10:01:29 +08:00
}
2021-05-24 14:19:52 +08:00
2021-07-03 14:36:18 +08:00
return nil
2021-01-21 10:01:29 +08:00
}
// DescribeIndexReqTask is the request task that serves a DescribeIndex message.
// It embeds baseReqTask, which carries the task's context and the owning Core.
type DescribeIndexReqTask struct {
baseReqTask
// Req is the incoming request; Execute reads its CollectionName and IndexName.
Req * milvuspb . DescribeIndexRequest
// Rsp is the response; Execute appends one entry per index to Rsp.IndexDescriptions.
Rsp * milvuspb . DescribeIndexResponse
}
// Type reports the message type stored in the request's base header.
func (t *DescribeIndexReqTask) Type() commonpb.MsgType {
	base := t.Req.Base
	return base.MsgType
}
2021-04-08 15:26:18 +08:00
func ( t * DescribeIndexReqTask ) Execute ( ctx context . Context ) error {
if t . Type ( ) != commonpb . MsgType_DescribeIndex {
return fmt . Errorf ( "describe index, msg type = %s" , commonpb . MsgType_name [ int32 ( t . Type ( ) ) ] )
}
2021-04-27 10:30:55 +08:00
coll , idx , err := t . core . MetaTable . GetIndexByName ( t . Req . CollectionName , t . Req . IndexName )
2021-01-21 10:01:29 +08:00
if err != nil {
return err
}
for _ , i := range idx {
2021-04-27 10:30:55 +08:00
f , err := GetFieldSchemaByIndexID ( & coll , typeutil . UniqueID ( i . IndexID ) )
if err != nil {
log . Warn ( "get field schema by index id failed" , zap . String ( "collection name" , t . Req . CollectionName ) , zap . String ( "index name" , t . Req . IndexName ) , zap . Error ( err ) )
continue
}
2021-01-21 10:01:29 +08:00
desc := & milvuspb . IndexDescription {
IndexName : i . IndexName ,
Params : i . IndexParams ,
2021-02-08 14:20:29 +08:00
IndexID : i . IndexID ,
2021-04-27 10:30:55 +08:00
FieldName : f . Name ,
2021-01-21 10:01:29 +08:00
}
t . Rsp . IndexDescriptions = append ( t . Rsp . IndexDescriptions , desc )
}
return nil
}
2021-02-20 15:38:44 +08:00
// DropIndexReqTask is the request task that serves a DropIndex message.
// It embeds baseReqTask, which carries the task's context and the owning Core.
type DropIndexReqTask struct {
baseReqTask
// Req is the incoming request; Execute reads its CollectionName, FieldName
// and IndexName.
Req * milvuspb . DropIndexRequest
}
// Type reports the message type stored in the request's base header.
func (t *DropIndexReqTask) Type() commonpb.MsgType {
	base := t.Req.Base
	return base.MsgType
}
2021-03-13 14:42:53 +08:00
func ( t * DropIndexReqTask ) Execute ( ctx context . Context ) error {
2021-04-08 15:26:18 +08:00
if t . Type ( ) != commonpb . MsgType_DropIndex {
return fmt . Errorf ( "drop index, msg type = %s" , commonpb . MsgType_name [ int32 ( t . Type ( ) ) ] )
}
2021-07-03 20:00:40 +08:00
_ , info , err := t . core . MetaTable . GetIndexByName ( t . Req . CollectionName , t . Req . IndexName )
2021-02-20 15:38:44 +08:00
if err != nil {
2021-03-04 14:54:16 +08:00
log . Warn ( "GetIndexByName failed," , zap . String ( "collection name" , t . Req . CollectionName ) , zap . String ( "field name" , t . Req . FieldName ) , zap . String ( "index name" , t . Req . IndexName ) , zap . Error ( err ) )
2021-02-20 15:38:44 +08:00
return err
}
2021-03-04 14:54:16 +08:00
if len ( info ) == 0 {
return nil
2021-02-20 15:38:44 +08:00
}
2021-03-04 14:54:16 +08:00
if len ( info ) != 1 {
2021-03-05 10:15:27 +08:00
return fmt . Errorf ( "len(index) = %d" , len ( info ) )
2021-03-04 14:54:16 +08:00
}
2021-05-25 14:03:06 +08:00
err = t . core . CallDropIndexService ( ctx , info [ 0 ] . IndexID )
2021-03-04 14:54:16 +08:00
if err != nil {
return err
}
2021-08-18 14:36:10 +08:00
ts , _ := t . core . TSOAllocator ( 1 )
_ , _ , err = t . core . MetaTable . DropIndex ( t . Req . CollectionName , t . Req . FieldName , t . Req . IndexName , ts )
2021-03-04 14:54:16 +08:00
return err
2021-02-20 15:38:44 +08:00
}
2021-09-18 11:13:51 +08:00
// CreateAliasReqTask is the request task that serves a CreateAlias message.
// It embeds baseReqTask, which carries the task's context and the owning Core.
type CreateAliasReqTask struct {
baseReqTask
// Req is the incoming request; Execute reads its Base, CollectionName and Alias.
Req * milvuspb . CreateAliasRequest
}
// Type reports the message type stored in the request's base header.
func (t *CreateAliasReqTask) Type() commonpb.MsgType {
	base := t.Req.Base
	return base.MsgType
}
// Execute registers t.Req.Alias as an alias of t.Req.CollectionName.
//
// A timestamp is allocated first. Then, while holding the DDL lock and with a
// DDL time tick registered for that timestamp, the alias is added to the meta
// table and a time tick is sent. On success the DD-message send flag is set.
//
// It returns an error when the message type is not CreateAlias, or when the
// timestamp allocation, the meta-table update, the time-tick send or the flag
// update fails.
func (t *CreateAliasReqTask) Execute(ctx context.Context) error {
	if msgType := t.Type(); msgType != commonpb.MsgType_CreateAlias {
		return fmt.Errorf("create alias, msg type = %s", commonpb.MsgType_name[int32(msgType)])
	}

	aliasReq := internalpb.CreateAliasRequest{
		Base:           t.Req.Base,
		CollectionName: t.Req.CollectionName,
		Alias:          t.Req.Alias,
	}
	// The encoded DdOperation is saved into etcd so that, if sending the dd
	// message fails, the system can restore it from etcd and re-send it.
	encodeDdOp := func(stamp typeutil.Timestamp) (string, error) {
		aliasReq.Base.Timestamp = stamp
		return EncodeDdOperation(&aliasReq, CreateAliasDDType)
	}

	ts, err := t.core.TSOAllocator(1)
	if err != nil {
		return fmt.Errorf("TSO alloc fail, error = %w", err)
	}
	reason := fmt.Sprintf("create alias %s", t.Req.Alias)

	// Run the locked section in a closure so the deferred releases fire as
	// soon as it finishes.
	if err := func() error {
		// Serialize with other DDL operations.
		t.core.ddlLock.Lock()
		defer t.core.ddlLock.Unlock()

		// The DDL time tick is removed again on every exit path.
		t.core.chanTimeTick.AddDdlTimeTick(ts, reason)
		defer t.core.chanTimeTick.RemoveDdlTimeTick(ts, reason)

		if addErr := t.core.MetaTable.AddAlias(t.Req.Alias, t.Req.CollectionName, ts, encodeDdOp); addErr != nil {
			return addErr
		}
		return t.core.SendTimeTick(ts, reason)
	}(); err != nil {
		return err
	}

	// Update DDOperation in etcd
	return t.core.setDdMsgSendFlag(true)
}
// DropAliasReqTask is the request task that serves a DropAlias message.
// It embeds baseReqTask, which carries the task's context and the owning Core.
type DropAliasReqTask struct {
baseReqTask
// Req is the incoming request; Execute reads its Base and Alias.
Req * milvuspb . DropAliasRequest
}
// Type reports the message type stored in the request's base header.
func (t *DropAliasReqTask) Type() commonpb.MsgType {
	base := t.Req.Base
	return base.MsgType
}
// Execute removes the alias t.Req.Alias.
//
// A timestamp is allocated first. Then, while holding the DDL lock and with a
// DDL time tick registered for that timestamp, the alias is deleted from the
// meta table and a time tick is sent. Afterwards the proxies are asked to
// invalidate their collection meta cache for the alias name, and the
// DD-message send flag is set.
//
// It returns an error when the message type is not DropAlias, or when the
// timestamp allocation, the meta-table update or the flag update fails. A
// failed time-tick send is logged but does not fail the task.
func (t *DropAliasReqTask) Execute(ctx context.Context) error {
	if t.Type() != commonpb.MsgType_DropAlias {
		return fmt.Errorf("drop alias, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
	}

	ddReq := internalpb.DropAliasRequest{
		Base:  t.Req.Base,
		Alias: t.Req.Alias,
	}

	// build DdOperation and save it into etcd, when ddmsg send fail,
	// system can restore ddmsg from etcd and re-send
	ddOp := func(ts typeutil.Timestamp) (string, error) {
		ddReq.Base.Timestamp = ts
		return EncodeDdOperation(&ddReq, DropAliasDDType)
	}

	reason := fmt.Sprintf("drop alias %s", t.Req.Alias)
	ts, err := t.core.TSOAllocator(1)
	if err != nil {
		return fmt.Errorf("TSO alloc fail, error = %w", err)
	}

	// use lambda function here to guarantee all resources to be released
	dropAliasFn := func() error {
		// lock for ddl operation
		t.core.ddlLock.Lock()
		defer t.core.ddlLock.Unlock()

		t.core.chanTimeTick.AddDdlTimeTick(ts, reason)
		// clear ddl timetick in all conditions
		defer t.core.chanTimeTick.RemoveDdlTimeTick(ts, reason)

		if dropErr := t.core.MetaTable.DeleteAlias(t.Req.Alias, ts, ddOp); dropErr != nil {
			return dropErr
		}

		// The alias is already gone from meta at this point, so a failed time
		// tick is only logged: the proxy cache below still has to be invalidated.
		if tickErr := t.core.SendTimeTick(ts, reason); tickErr != nil {
			log.Warn("send time tick failed", zap.String("reason", reason), zap.Error(tickErr))
		}
		return nil
	}

	if err = dropAliasFn(); err != nil {
		return err
	}

	req := proxypb.InvalidateCollMetaCacheRequest{
		Base: &commonpb.MsgBase{
			MsgType:   0, //TODO, msg type
			MsgID:     0, //TODO, msg id
			Timestamp: ts,
			SourceID:  t.core.session.ServerID,
		},
		CollectionName: t.Req.Alias,
	}
	// error doesn't matter here
	t.core.proxyClientManager.InvalidateCollectionMetaCache(ctx, &req)

	// Update DDOperation in etcd
	return t.core.setDdMsgSendFlag(true)
}
// AlterAliasReqTask is the request task that serves an AlterAlias message.
// It embeds baseReqTask, which carries the task's context and the owning Core.
type AlterAliasReqTask struct {
baseReqTask
// Req is the incoming request; Execute reads its Base, CollectionName and Alias.
Req * milvuspb . AlterAliasRequest
}
// Type reports the message type stored in the request's base header.
func (t *AlterAliasReqTask) Type() commonpb.MsgType {
	base := t.Req.Base
	return base.MsgType
}
// Execute re-points the alias t.Req.Alias at t.Req.CollectionName.
//
// A timestamp is allocated first. Then, while holding the DDL lock and with a
// DDL time tick registered for that timestamp, the alias is altered in the
// meta table and a time tick is sent. Afterwards the proxies are asked to
// invalidate their collection meta cache for the alias name, and the
// DD-message send flag is set.
//
// It returns an error when the message type is not AlterAlias, or when the
// timestamp allocation, the meta-table update or the flag update fails. A
// failed time-tick send is logged but does not fail the task.
func (t *AlterAliasReqTask) Execute(ctx context.Context) error {
	if t.Type() != commonpb.MsgType_AlterAlias {
		return fmt.Errorf("alter alias, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
	}

	// NOTE(review): the DD operation is encoded from a DropAliasRequest under
	// AlterAliasDDType, so the target collection name is not part of the saved
	// operation. Confirm whether a dedicated alter-alias request carrying
	// CollectionName should be used here.
	ddReq := internalpb.DropAliasRequest{
		Base:  t.Req.Base,
		Alias: t.Req.Alias,
	}

	// build DdOperation and save it into etcd, when ddmsg send fail,
	// system can restore ddmsg from etcd and re-send
	ddOp := func(ts typeutil.Timestamp) (string, error) {
		ddReq.Base.Timestamp = ts
		return EncodeDdOperation(&ddReq, AlterAliasDDType)
	}

	reason := fmt.Sprintf("alter alias %s", t.Req.Alias)
	ts, err := t.core.TSOAllocator(1)
	if err != nil {
		return fmt.Errorf("TSO alloc fail, error = %w", err)
	}

	// use lambda function here to guarantee all resources to be released
	alterAliasFn := func() error {
		// lock for ddl operation
		t.core.ddlLock.Lock()
		defer t.core.ddlLock.Unlock()

		t.core.chanTimeTick.AddDdlTimeTick(ts, reason)
		// clear ddl timetick in all conditions
		defer t.core.chanTimeTick.RemoveDdlTimeTick(ts, reason)

		if alterErr := t.core.MetaTable.AlterAlias(t.Req.Alias, t.Req.CollectionName, ts, ddOp); alterErr != nil {
			return alterErr
		}

		// Meta has already been changed at this point, so a failed time tick
		// is only logged: the proxy cache below still has to be invalidated.
		if tickErr := t.core.SendTimeTick(ts, reason); tickErr != nil {
			log.Warn("send time tick failed", zap.String("reason", reason), zap.Error(tickErr))
		}
		return nil
	}

	if err = alterAliasFn(); err != nil {
		return err
	}

	req := proxypb.InvalidateCollMetaCacheRequest{
		Base: &commonpb.MsgBase{
			MsgType:   0, //TODO, msg type
			MsgID:     0, //TODO, msg id
			Timestamp: ts,
			SourceID:  t.core.session.ServerID,
		},
		CollectionName: t.Req.Alias,
	}
	// error doesn't matter here
	t.core.proxyClientManager.InvalidateCollectionMetaCache(ctx, &req)

	// Update DDOperation in etcd
	return t.core.setDdMsgSendFlag(true)
}