2021-12-20 19:01:12 +08:00
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
2021-04-19 11:12:56 +08:00
// with the License. You may obtain a copy of the License at
//
2021-12-20 19:01:12 +08:00
// http://www.apache.org/licenses/LICENSE-2.0
2021-04-19 11:12:56 +08:00
//
2021-12-20 19:01:12 +08:00
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2021-04-19 11:12:56 +08:00
2021-06-18 21:30:08 +08:00
package rootcoord
2021-01-19 14:44:03 +08:00
import (
2021-03-13 14:42:53 +08:00
"context"
2021-02-07 17:02:13 +08:00
"fmt"
2021-02-05 14:09:55 +08:00
2021-01-19 14:44:03 +08:00
"github.com/golang/protobuf/proto"
2021-09-14 11:59:47 +08:00
"github.com/milvus-io/milvus/internal/common"
2021-04-22 14:45:57 +08:00
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
2021-08-18 14:36:10 +08:00
"github.com/milvus-io/milvus/internal/util/tsoutil"
2021-04-22 14:45:57 +08:00
"github.com/milvus-io/milvus/internal/util/typeutil"
2021-09-14 11:59:47 +08:00
"go.uber.org/zap"
2021-01-19 14:44:03 +08:00
)
type reqTask interface {
2021-03-13 14:42:53 +08:00
Ctx ( ) context . Context
2021-01-19 14:44:03 +08:00
Type ( ) commonpb . MsgType
2021-03-13 14:42:53 +08:00
Execute ( ctx context . Context ) error
2021-06-26 09:22:11 +08:00
Core ( ) * Core
2021-01-19 14:44:03 +08:00
}
type baseReqTask struct {
2021-03-13 14:42:53 +08:00
ctx context . Context
2021-01-19 14:44:03 +08:00
core * Core
}
2021-06-26 09:22:11 +08:00
func ( b * baseReqTask ) Core ( ) * Core {
return b . core
2021-01-19 14:44:03 +08:00
}
2021-06-26 09:22:11 +08:00
func ( b * baseReqTask ) Ctx ( ) context . Context {
return b . ctx
}
func executeTask ( t reqTask ) error {
errChan := make ( chan error )
go func ( ) {
err := t . Execute ( t . Ctx ( ) )
errChan <- err
} ( )
2021-01-19 14:44:03 +08:00
select {
2021-06-26 09:22:11 +08:00
case <- t . Core ( ) . ctx . Done ( ) :
return fmt . Errorf ( "context canceled" )
case <- t . Ctx ( ) . Done ( ) :
return fmt . Errorf ( "context canceled" )
case err := <- errChan :
2021-07-14 17:11:54 +08:00
if t . Core ( ) . ctx . Err ( ) != nil || t . Ctx ( ) . Err ( ) != nil {
return fmt . Errorf ( "context canceled" )
}
2021-01-19 14:44:03 +08:00
return err
}
}
2021-09-23 15:10:00 +08:00
// CreateCollectionReqTask create collection request task
2021-01-19 14:44:03 +08:00
type CreateCollectionReqTask struct {
baseReqTask
Req * milvuspb . CreateCollectionRequest
}
2021-09-23 15:10:00 +08:00
// Type return msg type
2021-01-19 14:44:03 +08:00
func ( t * CreateCollectionReqTask ) Type ( ) commonpb . MsgType {
return t . Req . Base . MsgType
}
2021-01-23 10:12:41 +08:00
2021-09-23 15:10:00 +08:00
// Execute task execution
2021-03-13 14:42:53 +08:00
func ( t * CreateCollectionReqTask ) Execute ( ctx context . Context ) error {
2021-04-08 15:26:18 +08:00
if t . Type ( ) != commonpb . MsgType_CreateCollection {
return fmt . Errorf ( "create collection, msg type = %s" , commonpb . MsgType_name [ int32 ( t . Type ( ) ) ] )
}
2021-01-19 14:44:03 +08:00
var schema schemapb . CollectionSchema
err := proto . Unmarshal ( t . Req . Schema , & schema )
if err != nil {
2021-07-13 22:19:55 +08:00
return fmt . Errorf ( "unmarshal schema error= %w" , err )
2021-01-19 14:44:03 +08:00
}
2021-01-23 10:12:41 +08:00
if t . Req . CollectionName != schema . Name {
2021-03-05 10:15:27 +08:00
return fmt . Errorf ( "collection name = %s, schema.Name=%s" , t . Req . CollectionName , schema . Name )
2021-01-23 10:12:41 +08:00
}
2021-05-17 19:15:01 +08:00
if t . Req . ShardsNum <= 0 {
2021-09-14 11:59:47 +08:00
t . Req . ShardsNum = common . DefaultShardsNum
2021-05-17 19:15:01 +08:00
}
2021-09-08 15:00:00 +08:00
log . Debug ( "CreateCollectionReqTask Execute" , zap . Any ( "CollectionName" , t . Req . CollectionName ) ,
2021-12-21 19:49:02 +08:00
zap . Int32 ( "ShardsNum" , t . Req . ShardsNum ) ,
zap . String ( "ConsistencyLevel" , t . Req . ConsistencyLevel . String ( ) ) )
2021-05-17 19:15:01 +08:00
2021-01-19 14:44:03 +08:00
for idx , field := range schema . Fields {
field . FieldID = int64 ( idx + StartOfUserFieldID )
}
rowIDField := & schemapb . FieldSchema {
FieldID : int64 ( RowIDField ) ,
Name : RowIDFieldName ,
IsPrimaryKey : false ,
Description : "row id" ,
2021-03-12 14:22:09 +08:00
DataType : schemapb . DataType_Int64 ,
2021-01-19 14:44:03 +08:00
}
timeStampField := & schemapb . FieldSchema {
FieldID : int64 ( TimeStampField ) ,
Name : TimeStampFieldName ,
IsPrimaryKey : false ,
Description : "time stamp" ,
2021-03-12 14:22:09 +08:00
DataType : schemapb . DataType_Int64 ,
2021-01-19 14:44:03 +08:00
}
schema . Fields = append ( schema . Fields , rowIDField , timeStampField )
2021-05-20 14:14:14 +08:00
collID , _ , err := t . core . IDAllocator ( 1 )
2021-01-19 14:44:03 +08:00
if err != nil {
2021-07-13 22:19:55 +08:00
return fmt . Errorf ( "alloc collection id error = %w" , err )
2021-01-19 14:44:03 +08:00
}
2021-05-20 14:14:14 +08:00
partID , _ , err := t . core . IDAllocator ( 1 )
2021-01-19 14:44:03 +08:00
if err != nil {
2021-07-13 22:19:55 +08:00
return fmt . Errorf ( "alloc partition id error = %w" , err )
2021-01-19 14:44:03 +08:00
}
2021-05-17 19:15:01 +08:00
2021-06-19 11:34:08 +08:00
log . Debug ( "collection name -> id" ,
zap . String ( "collection name" , t . Req . CollectionName ) ,
2021-07-06 09:16:03 +08:00
zap . Int64 ( "collection_id" , collID ) ,
2021-06-19 11:34:08 +08:00
zap . Int64 ( "default partition id" , partID ) )
2021-05-17 19:15:01 +08:00
vchanNames := make ( [ ] string , t . Req . ShardsNum )
chanNames := make ( [ ] string , t . Req . ShardsNum )
2021-11-03 21:04:14 +08:00
deltaChanNames := make ( [ ] string , t . Req . ShardsNum )
2021-05-17 19:15:01 +08:00
for i := int32 ( 0 ) ; i < t . Req . ShardsNum ; i ++ {
2021-11-25 10:07:15 +08:00
vchanNames [ i ] = fmt . Sprintf ( "%s_%dv%d" , t . core . chanTimeTick . getDmlChannelName ( ) , collID , i )
2021-06-19 14:18:08 +08:00
chanNames [ i ] = ToPhysicalChannel ( vchanNames [ i ] )
2021-11-03 21:04:14 +08:00
2021-11-25 10:07:15 +08:00
deltaChanNames [ i ] = t . core . chanTimeTick . getDeltaChannelName ( )
2021-12-23 18:39:11 +08:00
deltaChanName , err1 := ConvertChannelName ( chanNames [ i ] , Params . RootCoordCfg . DmlChannelName , Params . RootCoordCfg . DeltaChannelName )
2021-11-03 21:04:14 +08:00
if err1 != nil || deltaChanName != deltaChanNames [ i ] {
return fmt . Errorf ( "dmlChanName %s and deltaChanName %s mis-match" , chanNames [ i ] , deltaChanNames [ i ] )
}
2021-05-17 19:15:01 +08:00
}
2021-05-14 21:26:06 +08:00
collInfo := etcdpb . CollectionInfo {
2021-07-21 18:00:14 +08:00
ID : collID ,
Schema : & schema ,
PartitionIDs : [ ] typeutil . UniqueID { partID } ,
2022-01-10 19:03:35 +08:00
PartitionNames : [ ] string { Params . CommonCfg . DefaultPartitionName } ,
2021-07-21 18:00:14 +08:00
FieldIndexes : make ( [ ] * etcdpb . FieldIndexInfo , 0 , 16 ) ,
VirtualChannelNames : vchanNames ,
PhysicalChannelNames : chanNames ,
2021-09-08 15:00:00 +08:00
ShardsNum : t . Req . ShardsNum ,
2021-07-23 14:36:12 +08:00
PartitionCreatedTimestamps : [ ] uint64 { 0 } ,
2021-12-21 19:49:02 +08:00
ConsistencyLevel : t . Req . ConsistencyLevel ,
2021-01-20 09:36:50 +08:00
}
2021-05-12 15:33:53 +08:00
2021-02-11 08:41:59 +08:00
idxInfo := make ( [ ] * etcdpb . IndexInfo , 0 , 16 )
2021-01-20 09:36:50 +08:00
2021-05-12 15:33:53 +08:00
// schema is modified (add RowIDField and TimestampField),
// so need Marshal again
2021-01-20 09:36:50 +08:00
schemaBytes , err := proto . Marshal ( & schema )
if err != nil {
2021-07-13 22:19:55 +08:00
return fmt . Errorf ( "marshal schema error = %w" , err )
2021-01-20 09:36:50 +08:00
}
2021-05-14 21:26:06 +08:00
ddCollReq := internalpb . CreateCollectionRequest {
2021-05-17 19:15:01 +08:00
Base : t . Req . Base ,
DbName : t . Req . DbName ,
CollectionName : t . Req . CollectionName ,
2022-01-10 19:03:35 +08:00
PartitionName : Params . CommonCfg . DefaultPartitionName ,
2021-05-17 19:15:01 +08:00
DbID : 0 , //TODO,not used
CollectionID : collID ,
2021-07-06 09:16:03 +08:00
PartitionID : partID ,
2021-05-17 19:15:01 +08:00
Schema : schemaBytes ,
VirtualChannelNames : vchanNames ,
PhysicalChannelNames : chanNames ,
2021-01-20 09:36:50 +08:00
}
2021-08-18 14:36:10 +08:00
reason := fmt . Sprintf ( "create collection %d" , collID )
ts , err := t . core . TSOAllocator ( 1 )
2021-05-14 21:26:06 +08:00
if err != nil {
2021-12-10 10:19:10 +08:00
return fmt . Errorf ( "tso alloc fail, error = %w" , err )
2021-05-14 21:26:06 +08:00
}
2021-05-12 15:33:53 +08:00
2021-10-13 15:54:33 +08:00
// build DdOperation and save it into etcd, when ddmsg send fail,
// system can restore ddmsg from etcd and re-send
ddCollReq . Base . Timestamp = ts
ddOpStr , err := EncodeDdOperation ( & ddCollReq , CreateCollectionDDType )
if err != nil {
2021-11-16 14:27:11 +08:00
return fmt . Errorf ( "encodeDdOperation fail, error = %w" , err )
2021-10-13 15:54:33 +08:00
}
2021-08-18 14:36:10 +08:00
// use lambda function here to guarantee all resources to be released
createCollectionFn := func ( ) error {
// lock for ddl operation
t . core . ddlLock . Lock ( )
defer t . core . ddlLock . Unlock ( )
2021-06-04 15:00:34 +08:00
2021-11-25 10:07:15 +08:00
t . core . chanTimeTick . addDdlTimeTick ( ts , reason )
2021-08-18 14:36:10 +08:00
// clear ddl timetick in all conditions
2021-11-25 10:07:15 +08:00
defer t . core . chanTimeTick . removeDdlTimeTick ( ts , reason )
2021-08-18 14:36:10 +08:00
// add dml channel before send dd msg
2021-11-25 10:07:15 +08:00
t . core . chanTimeTick . addDmlChannels ( chanNames ... )
2021-05-20 14:14:14 +08:00
2021-11-03 21:04:14 +08:00
// also add delta channels
2021-11-25 10:07:15 +08:00
t . core . chanTimeTick . addDeltaChannels ( deltaChanNames ... )
2021-11-03 21:04:14 +08:00
2021-09-27 18:10:00 +08:00
ids , err := t . core . SendDdCreateCollectionReq ( ctx , & ddCollReq , chanNames )
2021-08-18 14:36:10 +08:00
if err != nil {
return fmt . Errorf ( "send dd create collection req failed, error = %w" , err )
}
2021-09-27 18:10:00 +08:00
for _ , pchan := range collInfo . PhysicalChannelNames {
collInfo . StartPositions = append ( collInfo . StartPositions , & commonpb . KeyDataPair {
Key : pchan ,
Data : ids [ pchan ] ,
} )
}
2021-12-15 08:59:08 +08:00
// update meta table after send dd operation
if err = t . core . MetaTable . AddCollection ( & collInfo , ts , idxInfo , ddOpStr ) ; err != nil {
2021-11-25 10:07:15 +08:00
t . core . chanTimeTick . removeDmlChannels ( chanNames ... )
t . core . chanTimeTick . removeDeltaChannels ( deltaChanNames ... )
2021-09-27 18:10:00 +08:00
// it's ok just to leave create collection message sent, datanode and querynode does't process CreateCollection logic
return fmt . Errorf ( "meta table add collection failed,error = %w" , err )
}
2021-08-18 14:36:10 +08:00
2021-12-15 08:59:08 +08:00
// use addDdlTimeTick and removeDdlTimeTick to mark DDL operation in process
2021-11-25 10:07:15 +08:00
t . core . chanTimeTick . removeDdlTimeTick ( ts , reason )
2022-01-14 23:55:34 +08:00
errTimeTick := t . core . SendTimeTick ( ts , reason )
if errTimeTick != nil {
log . Warn ( "Failed to send timetick" , zap . Error ( errTimeTick ) )
}
2021-08-18 14:36:10 +08:00
return nil
}
2021-12-15 08:59:08 +08:00
if err = createCollectionFn ( ) ; err != nil {
2021-08-18 14:36:10 +08:00
return err
2021-07-13 22:19:55 +08:00
}
2021-08-18 14:36:10 +08:00
2021-12-15 08:59:08 +08:00
if err = t . core . CallWatchChannels ( ctx , collID , vchanNames ) ; err != nil {
2021-11-11 00:54:45 +08:00
return err
}
2021-08-18 14:36:10 +08:00
// Update DDOperation in etcd
return t . core . setDdMsgSendFlag ( true )
2021-01-19 14:44:03 +08:00
}
2021-09-23 15:10:00 +08:00
// DropCollectionReqTask drop collection request task
2021-01-19 14:44:03 +08:00
type DropCollectionReqTask struct {
baseReqTask
Req * milvuspb . DropCollectionRequest
}
2021-09-23 15:10:00 +08:00
// Type return msg type
2021-01-19 14:44:03 +08:00
func ( t * DropCollectionReqTask ) Type ( ) commonpb . MsgType {
return t . Req . Base . MsgType
}
2021-09-23 15:10:00 +08:00
// Execute task execution
2021-03-13 14:42:53 +08:00
func ( t * DropCollectionReqTask ) Execute ( ctx context . Context ) error {
2021-04-08 15:26:18 +08:00
if t . Type ( ) != commonpb . MsgType_DropCollection {
return fmt . Errorf ( "drop collection, msg type = %s" , commonpb . MsgType_name [ int32 ( t . Type ( ) ) ] )
}
2021-10-08 17:37:53 +08:00
if t . core . MetaTable . IsAlias ( t . Req . CollectionName ) {
return fmt . Errorf ( "cannot drop the collection via alias = %s" , t . Req . CollectionName )
}
2021-04-08 15:26:18 +08:00
2021-05-18 14:18:02 +08:00
collMeta , err := t . core . MetaTable . GetCollectionByName ( t . Req . CollectionName , 0 )
2021-01-19 14:44:03 +08:00
if err != nil {
return err
}
2021-03-12 14:22:09 +08:00
ddReq := internalpb . DropCollectionRequest {
2021-01-20 09:36:50 +08:00
Base : t . Req . Base ,
DbName : t . Req . DbName ,
CollectionName : t . Req . CollectionName ,
DbID : 0 , //not used
CollectionID : collMeta . ID ,
}
2021-08-18 14:36:10 +08:00
reason := fmt . Sprintf ( "drop collection %d" , collMeta . ID )
ts , err := t . core . TSOAllocator ( 1 )
2021-05-14 21:26:06 +08:00
if err != nil {
2021-08-18 14:36:10 +08:00
return fmt . Errorf ( "TSO alloc fail, error = %w" , err )
}
2021-10-13 15:54:33 +08:00
// build DdOperation and save it into etcd, when ddmsg send fail,
// system can restore ddmsg from etcd and re-send
ddReq . Base . Timestamp = ts
ddOpStr , err := EncodeDdOperation ( & ddReq , DropCollectionDDType )
if err != nil {
2021-11-16 14:27:11 +08:00
return fmt . Errorf ( "encodeDdOperation fail, error = %w" , err )
2021-10-13 15:54:33 +08:00
}
2021-11-23 23:01:15 +08:00
// drop all indices
if err = t . core . RemoveIndex ( ctx , t . Req . CollectionName , "" ) ; err != nil {
return err
}
2021-09-28 21:52:20 +08:00
2021-11-25 18:03:17 +08:00
// get all aliases before meta table updated
aliases := t . core . MetaTable . ListAliases ( collMeta . ID )
2021-08-18 14:36:10 +08:00
// use lambda function here to guarantee all resources to be released
dropCollectionFn := func ( ) error {
// lock for ddl operation
t . core . ddlLock . Lock ( )
defer t . core . ddlLock . Unlock ( )
2021-11-25 10:07:15 +08:00
t . core . chanTimeTick . addDdlTimeTick ( ts , reason )
2021-08-18 14:36:10 +08:00
// clear ddl timetick in all conditions
2021-11-25 10:07:15 +08:00
defer t . core . chanTimeTick . removeDdlTimeTick ( ts , reason )
2021-08-18 14:36:10 +08:00
2021-12-15 08:59:08 +08:00
if err = t . core . SendDdDropCollectionReq ( ctx , & ddReq , collMeta . PhysicalChannelNames ) ; err != nil {
2021-08-18 14:36:10 +08:00
return err
}
2021-12-15 08:59:08 +08:00
// update meta table after send dd operation
if err = t . core . MetaTable . DeleteCollection ( collMeta . ID , ts , ddOpStr ) ; err != nil {
2021-08-18 14:36:10 +08:00
return err
}
2021-12-15 08:59:08 +08:00
// use addDdlTimeTick and removeDdlTimeTick to mark DDL operation in process
2021-11-25 10:07:15 +08:00
t . core . chanTimeTick . removeDdlTimeTick ( ts , reason )
2022-01-14 23:55:34 +08:00
errTimeTick := t . core . SendTimeTick ( ts , reason )
if errTimeTick != nil {
log . Warn ( "Failed to send timetick" , zap . Error ( errTimeTick ) )
}
2021-09-18 09:13:50 +08:00
// send tt into deleted channels to tell data_node to clear flowgragh
2022-01-15 18:53:34 +08:00
err := t . core . chanTimeTick . sendTimeTickToChannel ( collMeta . PhysicalChannelNames , ts )
if err != nil {
log . Warn ( "failed to send time tick to channel" , zap . Any ( "physical names" , collMeta . PhysicalChannelNames ) , zap . Error ( err ) )
}
2021-08-18 14:36:10 +08:00
// remove dml channel after send dd msg
2021-11-25 10:07:15 +08:00
t . core . chanTimeTick . removeDmlChannels ( collMeta . PhysicalChannelNames ... )
2021-11-03 21:04:14 +08:00
// remove delta channels
deltaChanNames := make ( [ ] string , len ( collMeta . PhysicalChannelNames ) )
for i , chanName := range collMeta . PhysicalChannelNames {
2021-12-23 18:39:11 +08:00
if deltaChanNames [ i ] , err = ConvertChannelName ( chanName , Params . RootCoordCfg . DmlChannelName , Params . RootCoordCfg . DeltaChannelName ) ; err != nil {
2021-11-03 21:04:14 +08:00
return err
}
}
2021-11-25 10:07:15 +08:00
t . core . chanTimeTick . removeDeltaChannels ( deltaChanNames ... )
2021-08-18 14:36:10 +08:00
return nil
2021-05-14 21:26:06 +08:00
}
2021-12-15 08:59:08 +08:00
if err = dropCollectionFn ( ) ; err != nil {
2021-01-19 14:44:03 +08:00
return err
}
2021-02-05 14:09:55 +08:00
//notify query service to release collection
2021-08-03 21:47:24 +08:00
if err = t . core . CallReleaseCollectionService ( t . core . ctx , ts , 0 , collMeta . ID ) ; err != nil {
2021-11-23 23:01:15 +08:00
log . Error ( "Failed to CallReleaseCollectionService" , zap . Error ( err ) )
2021-08-03 21:47:24 +08:00
return err
}
2021-02-05 14:09:55 +08:00
2021-11-23 23:01:15 +08:00
t . core . ExpireMetaCache ( ctx , [ ] string { t . Req . CollectionName } , ts )
t . core . ExpireMetaCache ( ctx , aliases , ts )
2021-09-28 21:52:20 +08:00
2021-05-14 21:26:06 +08:00
// Update DDOperation in etcd
return t . core . setDdMsgSendFlag ( true )
2021-01-19 14:44:03 +08:00
}
2021-09-23 15:10:00 +08:00
// HasCollectionReqTask has collection request task
2021-01-19 14:44:03 +08:00
type HasCollectionReqTask struct {
baseReqTask
Req * milvuspb . HasCollectionRequest
HasCollection bool
}
2021-09-23 15:10:00 +08:00
// Type return msg type
2021-01-19 14:44:03 +08:00
func ( t * HasCollectionReqTask ) Type ( ) commonpb . MsgType {
return t . Req . Base . MsgType
}
2021-09-23 15:10:00 +08:00
// Execute task execution
2021-04-08 15:26:18 +08:00
func ( t * HasCollectionReqTask ) Execute ( ctx context . Context ) error {
if t . Type ( ) != commonpb . MsgType_HasCollection {
return fmt . Errorf ( "has collection, msg type = %s" , commonpb . MsgType_name [ int32 ( t . Type ( ) ) ] )
}
2021-05-18 17:12:17 +08:00
_ , err := t . core . MetaTable . GetCollectionByName ( t . Req . CollectionName , t . Req . TimeStamp )
2021-01-19 14:44:03 +08:00
if err == nil {
t . HasCollection = true
} else {
t . HasCollection = false
}
return nil
}
2021-09-23 15:10:00 +08:00
// DescribeCollectionReqTask describe collection request task
2021-01-19 14:44:03 +08:00
type DescribeCollectionReqTask struct {
baseReqTask
Req * milvuspb . DescribeCollectionRequest
Rsp * milvuspb . DescribeCollectionResponse
}
2021-09-23 15:10:00 +08:00
// Type return msg type
2021-01-19 14:44:03 +08:00
func ( t * DescribeCollectionReqTask ) Type ( ) commonpb . MsgType {
return t . Req . Base . MsgType
}
2021-09-23 15:10:00 +08:00
// Execute task execution
2021-03-13 14:42:53 +08:00
func ( t * DescribeCollectionReqTask ) Execute ( ctx context . Context ) error {
2021-04-08 15:26:18 +08:00
if t . Type ( ) != commonpb . MsgType_DescribeCollection {
return fmt . Errorf ( "describe collection, msg type = %s" , commonpb . MsgType_name [ int32 ( t . Type ( ) ) ] )
}
2021-05-17 19:15:01 +08:00
var collInfo * etcdpb . CollectionInfo
2021-02-25 16:08:56 +08:00
var err error
if t . Req . CollectionName != "" {
2021-05-18 17:12:17 +08:00
collInfo , err = t . core . MetaTable . GetCollectionByName ( t . Req . CollectionName , t . Req . TimeStamp )
2021-02-25 16:08:56 +08:00
if err != nil {
return err
}
} else {
2021-05-18 17:12:17 +08:00
collInfo , err = t . core . MetaTable . GetCollectionByID ( t . Req . CollectionID , t . Req . TimeStamp )
2021-02-25 16:08:56 +08:00
if err != nil {
return err
}
2021-01-19 14:44:03 +08:00
}
2021-02-25 16:08:56 +08:00
2021-05-17 19:15:01 +08:00
t . Rsp . Schema = proto . Clone ( collInfo . Schema ) . ( * schemapb . CollectionSchema )
t . Rsp . CollectionID = collInfo . ID
t . Rsp . VirtualChannelNames = collInfo . VirtualChannelNames
t . Rsp . PhysicalChannelNames = collInfo . PhysicalChannelNames
2021-09-08 15:00:00 +08:00
if collInfo . ShardsNum == 0 {
collInfo . ShardsNum = int32 ( len ( collInfo . VirtualChannelNames ) )
}
t . Rsp . ShardsNum = collInfo . ShardsNum
2021-12-21 19:49:02 +08:00
t . Rsp . ConsistencyLevel = collInfo . ConsistencyLevel
2021-07-21 18:00:14 +08:00
t . Rsp . CreatedTimestamp = collInfo . CreateTime
createdPhysicalTime , _ := tsoutil . ParseHybridTs ( collInfo . CreateTime )
2021-10-27 16:30:28 +08:00
t . Rsp . CreatedUtcTimestamp = uint64 ( createdPhysicalTime )
2021-09-22 16:20:48 +08:00
t . Rsp . Aliases = t . core . MetaTable . ListAliases ( collInfo . ID )
2021-09-27 18:10:00 +08:00
t . Rsp . StartPositions = collInfo . GetStartPositions ( )
2021-01-19 14:44:03 +08:00
return nil
}
2021-09-23 15:10:00 +08:00
// ShowCollectionReqTask show collection request task
2021-01-19 14:44:03 +08:00
type ShowCollectionReqTask struct {
baseReqTask
2021-03-12 14:22:09 +08:00
Req * milvuspb . ShowCollectionsRequest
Rsp * milvuspb . ShowCollectionsResponse
2021-01-19 14:44:03 +08:00
}
2021-09-23 15:10:00 +08:00
// Type return msg type
2021-01-19 14:44:03 +08:00
func ( t * ShowCollectionReqTask ) Type ( ) commonpb . MsgType {
return t . Req . Base . MsgType
}
2021-09-23 15:10:00 +08:00
// Execute task execution
2021-04-08 15:26:18 +08:00
func ( t * ShowCollectionReqTask ) Execute ( ctx context . Context ) error {
if t . Type ( ) != commonpb . MsgType_ShowCollections {
return fmt . Errorf ( "show collection, msg type = %s" , commonpb . MsgType_name [ int32 ( t . Type ( ) ) ] )
}
2021-05-18 17:12:17 +08:00
coll , err := t . core . MetaTable . ListCollections ( t . Req . TimeStamp )
2021-01-19 14:44:03 +08:00
if err != nil {
return err
}
2021-07-21 18:00:14 +08:00
for name , meta := range coll {
2021-06-03 19:09:33 +08:00
t . Rsp . CollectionNames = append ( t . Rsp . CollectionNames , name )
2021-07-21 18:00:14 +08:00
t . Rsp . CollectionIds = append ( t . Rsp . CollectionIds , meta . ID )
t . Rsp . CreatedTimestamps = append ( t . Rsp . CreatedTimestamps , meta . CreateTime )
physical , _ := tsoutil . ParseHybridTs ( meta . CreateTime )
2021-10-27 16:30:28 +08:00
t . Rsp . CreatedUtcTimestamps = append ( t . Rsp . CreatedUtcTimestamps , uint64 ( physical ) )
2021-06-03 19:09:33 +08:00
}
2021-01-19 14:44:03 +08:00
return nil
}
2021-09-23 15:10:00 +08:00
// CreatePartitionReqTask create partition request task
2021-01-19 14:44:03 +08:00
type CreatePartitionReqTask struct {
baseReqTask
Req * milvuspb . CreatePartitionRequest
}
2021-09-23 15:10:00 +08:00
// Type return msg type
2021-01-19 14:44:03 +08:00
func ( t * CreatePartitionReqTask ) Type ( ) commonpb . MsgType {
return t . Req . Base . MsgType
}
2021-09-23 15:10:00 +08:00
// Execute task execution
2021-03-13 14:42:53 +08:00
func ( t * CreatePartitionReqTask ) Execute ( ctx context . Context ) error {
2021-04-08 15:26:18 +08:00
if t . Type ( ) != commonpb . MsgType_CreatePartition {
return fmt . Errorf ( "create partition, msg type = %s" , commonpb . MsgType_name [ int32 ( t . Type ( ) ) ] )
}
2021-05-18 14:18:02 +08:00
collMeta , err := t . core . MetaTable . GetCollectionByName ( t . Req . CollectionName , 0 )
2021-01-19 14:44:03 +08:00
if err != nil {
return err
}
2021-05-20 14:14:14 +08:00
partID , _ , err := t . core . IDAllocator ( 1 )
2021-01-19 14:44:03 +08:00
if err != nil {
return err
}
2021-03-12 14:22:09 +08:00
ddReq := internalpb . CreatePartitionRequest {
2021-01-20 09:36:50 +08:00
Base : t . Req . Base ,
DbName : t . Req . DbName ,
CollectionName : t . Req . CollectionName ,
PartitionName : t . Req . PartitionName ,
DbID : 0 , // todo, not used
CollectionID : collMeta . ID ,
2021-05-14 21:26:06 +08:00
PartitionID : partID ,
2021-01-20 09:36:50 +08:00
}
2021-08-18 14:36:10 +08:00
reason := fmt . Sprintf ( "create partition %s" , t . Req . PartitionName )
ts , err := t . core . TSOAllocator ( 1 )
2021-05-14 21:26:06 +08:00
if err != nil {
2021-08-18 14:36:10 +08:00
return fmt . Errorf ( "TSO alloc fail, error = %w" , err )
2021-05-14 21:26:06 +08:00
}
2021-03-22 16:36:10 +08:00
2021-10-13 15:54:33 +08:00
// build DdOperation and save it into etcd, when ddmsg send fail,
// system can restore ddmsg from etcd and re-send
ddReq . Base . Timestamp = ts
ddOpStr , err := EncodeDdOperation ( & ddReq , CreatePartitionDDType )
if err != nil {
2021-11-16 14:27:11 +08:00
return fmt . Errorf ( "encodeDdOperation fail, error = %w" , err )
2021-10-13 15:54:33 +08:00
}
2021-08-18 14:36:10 +08:00
// use lambda function here to guarantee all resources to be released
createPartitionFn := func ( ) error {
// lock for ddl operation
t . core . ddlLock . Lock ( )
defer t . core . ddlLock . Unlock ( )
2021-11-25 10:07:15 +08:00
t . core . chanTimeTick . addDdlTimeTick ( ts , reason )
2021-08-18 14:36:10 +08:00
// clear ddl timetick in all conditions
2021-11-25 10:07:15 +08:00
defer t . core . chanTimeTick . removeDdlTimeTick ( ts , reason )
2021-08-18 14:36:10 +08:00
2021-12-15 08:59:08 +08:00
if err = t . core . SendDdCreatePartitionReq ( ctx , & ddReq , collMeta . PhysicalChannelNames ) ; err != nil {
2021-08-18 14:36:10 +08:00
return err
}
2021-12-15 08:59:08 +08:00
// update meta table after send dd operation
if err = t . core . MetaTable . AddPartition ( collMeta . ID , t . Req . PartitionName , partID , ts , ddOpStr ) ; err != nil {
2021-08-18 14:36:10 +08:00
return err
}
2021-12-15 08:59:08 +08:00
// use addDdlTimeTick and removeDdlTimeTick to mark DDL operation in process
2021-11-25 10:07:15 +08:00
t . core . chanTimeTick . removeDdlTimeTick ( ts , reason )
2022-01-14 23:55:34 +08:00
errTimeTick := t . core . SendTimeTick ( ts , reason )
if errTimeTick != nil {
log . Warn ( "Failed to send timetick" , zap . Error ( errTimeTick ) )
}
2021-08-18 14:36:10 +08:00
return nil
}
2021-12-15 08:59:08 +08:00
if err = createPartitionFn ( ) ; err != nil {
2021-05-14 21:26:06 +08:00
return err
}
2021-05-12 15:33:53 +08:00
2021-11-23 23:01:15 +08:00
t . core . ExpireMetaCache ( ctx , [ ] string { t . Req . CollectionName } , ts )
2021-05-14 21:26:06 +08:00
// Update DDOperation in etcd
return t . core . setDdMsgSendFlag ( true )
2021-01-19 14:44:03 +08:00
}
2021-09-23 15:10:00 +08:00
// DropPartitionReqTask drop partition request task
2021-01-19 14:44:03 +08:00
type DropPartitionReqTask struct {
baseReqTask
Req * milvuspb . DropPartitionRequest
}
2021-09-23 15:10:00 +08:00
// Type return msg type
2021-01-19 14:44:03 +08:00
func ( t * DropPartitionReqTask ) Type ( ) commonpb . MsgType {
return t . Req . Base . MsgType
}
2021-09-23 15:10:00 +08:00
// Execute task execution
2021-03-13 14:42:53 +08:00
func ( t * DropPartitionReqTask ) Execute ( ctx context . Context ) error {
2021-04-08 15:26:18 +08:00
if t . Type ( ) != commonpb . MsgType_DropPartition {
return fmt . Errorf ( "drop partition, msg type = %s" , commonpb . MsgType_name [ int32 ( t . Type ( ) ) ] )
}
2021-05-18 14:18:02 +08:00
collInfo , err := t . core . MetaTable . GetCollectionByName ( t . Req . CollectionName , 0 )
2021-01-19 14:44:03 +08:00
if err != nil {
return err
}
2021-07-03 14:36:18 +08:00
partID , err := t . core . MetaTable . GetPartitionByName ( collInfo . ID , t . Req . PartitionName , 0 )
2021-01-19 14:44:03 +08:00
if err != nil {
return err
}
2021-03-12 14:22:09 +08:00
ddReq := internalpb . DropPartitionRequest {
2021-01-20 09:36:50 +08:00
Base : t . Req . Base ,
DbName : t . Req . DbName ,
CollectionName : t . Req . CollectionName ,
PartitionName : t . Req . PartitionName ,
DbID : 0 , //todo,not used
2021-05-14 21:26:06 +08:00
CollectionID : collInfo . ID ,
2021-07-03 14:36:18 +08:00
PartitionID : partID ,
2021-01-20 09:36:50 +08:00
}
2021-08-18 14:36:10 +08:00
reason := fmt . Sprintf ( "drop partition %s" , t . Req . PartitionName )
ts , err := t . core . TSOAllocator ( 1 )
2021-05-14 21:26:06 +08:00
if err != nil {
2021-08-18 14:36:10 +08:00
return fmt . Errorf ( "TSO alloc fail, error = %w" , err )
2021-05-14 21:26:06 +08:00
}
2021-05-12 15:33:53 +08:00
2021-10-13 15:54:33 +08:00
// build DdOperation and save it into etcd, when ddmsg send fail,
// system can restore ddmsg from etcd and re-send
ddReq . Base . Timestamp = ts
ddOpStr , err := EncodeDdOperation ( & ddReq , DropPartitionDDType )
if err != nil {
2021-11-16 14:27:11 +08:00
return fmt . Errorf ( "encodeDdOperation fail, error = %w" , err )
2021-10-13 15:54:33 +08:00
}
2021-08-18 14:36:10 +08:00
// use lambda function here to guarantee all resources to be released
dropPartitionFn := func ( ) error {
// lock for ddl operation
t . core . ddlLock . Lock ( )
defer t . core . ddlLock . Unlock ( )
2021-11-25 10:07:15 +08:00
t . core . chanTimeTick . addDdlTimeTick ( ts , reason )
2021-08-18 14:36:10 +08:00
// clear ddl timetick in all conditions
2021-11-25 10:07:15 +08:00
defer t . core . chanTimeTick . removeDdlTimeTick ( ts , reason )
2021-08-18 14:36:10 +08:00
2021-12-15 08:59:08 +08:00
if err = t . core . SendDdDropPartitionReq ( ctx , & ddReq , collInfo . PhysicalChannelNames ) ; err != nil {
2021-08-18 14:36:10 +08:00
return err
}
2021-12-15 08:59:08 +08:00
// update meta table after send dd operation
if _ , err = t . core . MetaTable . DeletePartition ( collInfo . ID , t . Req . PartitionName , ts , ddOpStr ) ; err != nil {
2021-08-18 14:36:10 +08:00
return err
}
2021-12-15 08:59:08 +08:00
// use addDdlTimeTick and removeDdlTimeTick to mark DDL operation in process
2021-11-25 10:07:15 +08:00
t . core . chanTimeTick . removeDdlTimeTick ( ts , reason )
2022-01-14 23:55:34 +08:00
errTimeTick := t . core . SendTimeTick ( ts , reason )
if errTimeTick != nil {
log . Warn ( "Failed to send timetick" , zap . Error ( errTimeTick ) )
}
2021-08-18 14:36:10 +08:00
return nil
}
2021-12-15 08:59:08 +08:00
if err = dropPartitionFn ( ) ; err != nil {
2021-05-14 21:26:06 +08:00
return err
}
2021-05-12 15:33:53 +08:00
2021-11-23 23:01:15 +08:00
t . core . ExpireMetaCache ( ctx , [ ] string { t . Req . CollectionName } , ts )
2021-05-14 21:26:06 +08:00
2021-06-22 16:08:08 +08:00
//notify query service to release partition
2021-12-15 22:11:09 +08:00
// TODO::xige-16, reOpen when queryCoord support release partitions after load collection
//if err = t.core.CallReleasePartitionService(t.core.ctx, ts, 0, collInfo.ID, []typeutil.UniqueID{partID}); err != nil {
// log.Error("Failed to CallReleaseCollectionService", zap.Error(err))
// return err
//}
2021-06-22 16:08:08 +08:00
2021-05-14 21:26:06 +08:00
// Update DDOperation in etcd
return t . core . setDdMsgSendFlag ( true )
2021-01-19 14:44:03 +08:00
}
2021-09-23 15:10:00 +08:00
// HasPartitionReqTask has partition request task
2021-01-19 14:44:03 +08:00
type HasPartitionReqTask struct {
baseReqTask
Req * milvuspb . HasPartitionRequest
HasPartition bool
}
2021-09-23 15:10:00 +08:00
// Type return msg type
2021-01-19 14:44:03 +08:00
func ( t * HasPartitionReqTask ) Type ( ) commonpb . MsgType {
return t . Req . Base . MsgType
}
2021-09-23 15:10:00 +08:00
// Execute task execution
2021-04-08 15:26:18 +08:00
func ( t * HasPartitionReqTask ) Execute ( ctx context . Context ) error {
if t . Type ( ) != commonpb . MsgType_HasPartition {
return fmt . Errorf ( "has partition, msg type = %s" , commonpb . MsgType_name [ int32 ( t . Type ( ) ) ] )
}
2021-05-18 14:18:02 +08:00
coll , err := t . core . MetaTable . GetCollectionByName ( t . Req . CollectionName , 0 )
2021-01-19 14:44:03 +08:00
if err != nil {
return err
}
2021-05-18 14:18:02 +08:00
t . HasPartition = t . core . MetaTable . HasPartition ( coll . ID , t . Req . PartitionName , 0 )
2021-01-19 14:44:03 +08:00
return nil
}
2021-09-23 15:10:00 +08:00
// ShowPartitionReqTask show partition request task
2021-01-19 14:44:03 +08:00
type ShowPartitionReqTask struct {
baseReqTask
2021-03-12 14:22:09 +08:00
Req * milvuspb . ShowPartitionsRequest
Rsp * milvuspb . ShowPartitionsResponse
2021-01-19 14:44:03 +08:00
}
2021-09-23 15:10:00 +08:00
// Type return msg type
2021-01-19 14:44:03 +08:00
func ( t * ShowPartitionReqTask ) Type ( ) commonpb . MsgType {
return t . Req . Base . MsgType
}
2021-09-23 15:10:00 +08:00
// Execute task execution
2021-04-08 15:26:18 +08:00
func ( t * ShowPartitionReqTask ) Execute ( ctx context . Context ) error {
if t . Type ( ) != commonpb . MsgType_ShowPartitions {
return fmt . Errorf ( "show partition, msg type = %s" , commonpb . MsgType_name [ int32 ( t . Type ( ) ) ] )
}
2021-02-05 16:32:03 +08:00
var coll * etcdpb . CollectionInfo
var err error
if t . Req . CollectionName == "" {
2021-05-18 14:18:02 +08:00
coll , err = t . core . MetaTable . GetCollectionByID ( t . Req . CollectionID , 0 )
2021-02-05 16:32:03 +08:00
} else {
2021-05-18 14:18:02 +08:00
coll , err = t . core . MetaTable . GetCollectionByName ( t . Req . CollectionName , 0 )
2021-02-05 16:32:03 +08:00
}
2021-01-19 14:44:03 +08:00
if err != nil {
return err
}
2021-07-03 14:36:18 +08:00
t . Rsp . PartitionIDs = coll . PartitionIDs
2021-07-06 09:16:03 +08:00
t . Rsp . PartitionNames = coll . PartitionNames
2021-07-21 18:00:14 +08:00
t . Rsp . CreatedTimestamps = coll . PartitionCreatedTimestamps
t . Rsp . CreatedUtcTimestamps = make ( [ ] uint64 , 0 , len ( coll . PartitionCreatedTimestamps ) )
for _ , ts := range coll . PartitionCreatedTimestamps {
physical , _ := tsoutil . ParseHybridTs ( ts )
2021-10-27 16:30:28 +08:00
t . Rsp . CreatedUtcTimestamps = append ( t . Rsp . CreatedUtcTimestamps , uint64 ( physical ) )
2021-07-21 18:00:14 +08:00
}
2021-07-03 14:36:18 +08:00
2021-01-19 14:44:03 +08:00
return nil
}
2021-01-21 10:01:29 +08:00
2021-09-23 15:10:00 +08:00
// DescribeSegmentReqTask describe segment request task
2021-01-21 10:01:29 +08:00
type DescribeSegmentReqTask struct {
baseReqTask
Req * milvuspb . DescribeSegmentRequest
Rsp * milvuspb . DescribeSegmentResponse //TODO,return repeated segment id in the future
}
2021-09-23 15:10:00 +08:00
// Type return msg type
2021-01-21 10:01:29 +08:00
func ( t * DescribeSegmentReqTask ) Type ( ) commonpb . MsgType {
return t . Req . Base . MsgType
}
2021-09-23 15:10:00 +08:00
// Execute task execution
2021-04-08 15:26:18 +08:00
func ( t * DescribeSegmentReqTask ) Execute ( ctx context . Context ) error {
if t . Type ( ) != commonpb . MsgType_DescribeSegment {
return fmt . Errorf ( "describe segment, msg type = %s" , commonpb . MsgType_name [ int32 ( t . Type ( ) ) ] )
}
2021-05-18 14:18:02 +08:00
coll , err := t . core . MetaTable . GetCollectionByID ( t . Req . CollectionID , 0 )
2021-01-21 10:01:29 +08:00
if err != nil {
return err
}
exist := false
2021-07-03 14:36:18 +08:00
segIDs , err := t . core . CallGetFlushedSegmentsService ( ctx , t . Req . CollectionID , - 1 )
if err != nil {
2021-10-22 14:10:06 +08:00
log . Debug ( "Get flushed segment from data coord failed" , zap . String ( "collection_name" , coll . Schema . Name ) , zap . Error ( err ) )
2021-07-03 14:36:18 +08:00
exist = true
} else {
for _ , id := range segIDs {
if id == t . Req . SegmentID {
2021-01-21 10:01:29 +08:00
exist = true
break
}
}
}
2021-07-03 14:36:18 +08:00
2021-01-21 10:01:29 +08:00
if ! exist {
2021-03-05 10:15:27 +08:00
return fmt . Errorf ( "segment id %d not belong to collection id %d" , t . Req . SegmentID , t . Req . CollectionID )
2021-01-21 10:01:29 +08:00
}
//TODO, get filed_id and index_name from request
segIdxInfo , err := t . core . MetaTable . GetSegmentIndexInfoByID ( t . Req . SegmentID , - 1 , "" )
2021-06-23 15:28:09 +08:00
log . Debug ( "RootCoord DescribeSegmentReqTask, MetaTable.GetSegmentIndexInfoByID" , zap . Any ( "SegmentID" , t . Req . SegmentID ) ,
2021-06-06 09:41:35 +08:00
zap . Any ( "segIdxInfo" , segIdxInfo ) , zap . Error ( err ) )
2021-01-21 10:01:29 +08:00
if err != nil {
return err
}
t . Rsp . IndexID = segIdxInfo . IndexID
2021-01-28 17:25:43 +08:00
t . Rsp . BuildID = segIdxInfo . BuildID
2021-03-08 15:46:51 +08:00
t . Rsp . EnableIndex = segIdxInfo . EnableIndex
2021-12-03 11:29:33 +08:00
t . Rsp . FieldID = segIdxInfo . FieldID
2021-01-21 10:01:29 +08:00
return nil
}
2021-09-23 15:10:00 +08:00
// ShowSegmentReqTask show segment request task
2021-01-21 10:01:29 +08:00
type ShowSegmentReqTask struct {
baseReqTask
2021-03-12 14:22:09 +08:00
Req * milvuspb . ShowSegmentsRequest
Rsp * milvuspb . ShowSegmentsResponse
2021-01-21 10:01:29 +08:00
}
2021-09-23 15:10:00 +08:00
// Type return msg type
2021-01-21 10:01:29 +08:00
func ( t * ShowSegmentReqTask ) Type ( ) commonpb . MsgType {
return t . Req . Base . MsgType
}
2021-09-23 15:10:00 +08:00
// Execute task execution
2021-04-08 15:26:18 +08:00
func ( t * ShowSegmentReqTask ) Execute ( ctx context . Context ) error {
if t . Type ( ) != commonpb . MsgType_ShowSegments {
return fmt . Errorf ( "show segments, msg type = %s" , commonpb . MsgType_name [ int32 ( t . Type ( ) ) ] )
}
2021-05-18 14:18:02 +08:00
coll , err := t . core . MetaTable . GetCollectionByID ( t . Req . CollectionID , 0 )
2021-01-21 10:01:29 +08:00
if err != nil {
return err
}
2021-02-22 16:40:26 +08:00
exist := false
2021-01-21 10:01:29 +08:00
for _ , partID := range coll . PartitionIDs {
2021-02-22 16:40:26 +08:00
if partID == t . Req . PartitionID {
exist = true
break
2021-01-21 10:01:29 +08:00
}
}
2021-02-22 16:40:26 +08:00
if ! exist {
2021-03-05 10:15:27 +08:00
return fmt . Errorf ( "partition id = %d not belong to collection id = %d" , t . Req . PartitionID , t . Req . CollectionID )
2021-02-22 16:40:26 +08:00
}
2021-07-03 14:36:18 +08:00
segIDs , err := t . core . CallGetFlushedSegmentsService ( ctx , t . Req . CollectionID , t . Req . PartitionID )
2021-02-22 16:40:26 +08:00
if err != nil {
2021-10-22 14:10:06 +08:00
log . Debug ( "Get flushed segments from data coord failed" , zap . String ( "collection name" , coll . Schema . Name ) , zap . Int64 ( "partition id" , t . Req . PartitionID ) , zap . Error ( err ) )
2021-02-22 16:40:26 +08:00
return err
}
2021-07-03 14:36:18 +08:00
t . Rsp . SegmentIDs = append ( t . Rsp . SegmentIDs , segIDs ... )
2021-01-21 10:01:29 +08:00
return nil
}
2021-09-23 15:10:00 +08:00
// CreateIndexReqTask create index request task
2021-01-21 10:01:29 +08:00
type CreateIndexReqTask struct {
baseReqTask
Req * milvuspb . CreateIndexRequest
}
2021-09-23 15:10:00 +08:00
// Type return msg type
2021-01-21 10:01:29 +08:00
func ( t * CreateIndexReqTask ) Type ( ) commonpb . MsgType {
return t . Req . Base . MsgType
}
2021-09-23 15:10:00 +08:00
// Execute task execution
2021-03-13 14:42:53 +08:00
func ( t * CreateIndexReqTask ) Execute ( ctx context . Context ) error {
2021-04-08 15:26:18 +08:00
if t . Type ( ) != commonpb . MsgType_CreateIndex {
return fmt . Errorf ( "create index, msg type = %s" , commonpb . MsgType_name [ int32 ( t . Type ( ) ) ] )
}
2022-01-10 19:03:35 +08:00
indexName := Params . CommonCfg . DefaultIndexName //TODO, get name from request
2021-05-20 14:14:14 +08:00
indexID , _ , err := t . core . IDAllocator ( 1 )
2021-06-23 15:28:09 +08:00
log . Debug ( "RootCoord CreateIndexReqTask" , zap . Any ( "indexID" , indexID ) , zap . Error ( err ) )
2021-02-09 13:11:55 +08:00
if err != nil {
return err
}
idxInfo := & etcdpb . IndexInfo {
IndexName : indexName ,
IndexID : indexID ,
IndexParams : t . Req . ExtraParams ,
}
2021-07-03 14:36:18 +08:00
collMeta , err := t . core . MetaTable . GetCollectionByName ( t . Req . CollectionName , 0 )
if err != nil {
return err
}
segID2PartID , err := t . core . getSegments ( ctx , collMeta . ID )
flushedSegs := make ( [ ] typeutil . UniqueID , 0 , len ( segID2PartID ) )
for k := range segID2PartID {
flushedSegs = append ( flushedSegs , k )
}
2021-01-21 10:01:29 +08:00
if err != nil {
2021-10-22 14:10:06 +08:00
log . Debug ( "Get flushed segments from data coord failed" , zap . String ( "collection_name" , collMeta . Schema . Name ) , zap . Error ( err ) )
2021-07-03 14:36:18 +08:00
return err
}
2021-10-21 14:04:36 +08:00
segIDs , field , err := t . core . MetaTable . GetNotIndexedSegments ( t . Req . CollectionName , t . Req . FieldName , idxInfo , flushedSegs )
2021-07-03 14:36:18 +08:00
if err != nil {
log . Debug ( "RootCoord CreateIndexReqTask metaTable.GetNotIndexedSegments" , zap . Error ( err ) )
2021-01-21 10:01:29 +08:00
return err
}
2021-03-12 14:22:09 +08:00
if field . DataType != schemapb . DataType_FloatVector && field . DataType != schemapb . DataType_BinaryVector {
2021-03-05 10:15:27 +08:00
return fmt . Errorf ( "field name = %s, data type = %s" , t . Req . FieldName , schemapb . DataType_name [ int32 ( field . DataType ) ] )
2021-01-22 15:41:54 +08:00
}
2021-05-24 14:19:52 +08:00
2021-05-15 18:08:08 +08:00
for _ , segID := range segIDs {
2021-05-24 14:19:52 +08:00
info := etcdpb . SegmentIndexInfo {
2021-07-03 20:00:40 +08:00
CollectionID : collMeta . ID ,
PartitionID : segID2PartID [ segID ] ,
SegmentID : segID ,
FieldID : field . FieldID ,
IndexID : idxInfo . IndexID ,
EnableIndex : false ,
2021-05-24 14:19:52 +08:00
}
2021-06-30 16:18:13 +08:00
info . BuildID , err = t . core . BuildIndex ( ctx , segID , & field , idxInfo , false )
2021-05-24 14:19:52 +08:00
if err != nil {
2021-05-15 18:08:08 +08:00
return err
2021-01-21 10:01:29 +08:00
}
2021-06-06 09:41:35 +08:00
if info . BuildID != 0 {
info . EnableIndex = true
}
2021-10-21 14:04:36 +08:00
if err := t . core . MetaTable . AddIndex ( & info ) ; err != nil {
2021-07-03 14:36:18 +08:00
log . Debug ( "Add index into meta table failed" , zap . Int64 ( "collection_id" , collMeta . ID ) , zap . Int64 ( "index_id" , info . IndexID ) , zap . Int64 ( "build_id" , info . BuildID ) , zap . Error ( err ) )
}
2021-01-21 10:01:29 +08:00
}
2021-05-24 14:19:52 +08:00
2021-07-03 14:36:18 +08:00
return nil
2021-01-21 10:01:29 +08:00
}
2021-09-23 15:10:00 +08:00
// DescribeIndexReqTask describe index request task
2021-01-21 10:01:29 +08:00
type DescribeIndexReqTask struct {
baseReqTask
Req * milvuspb . DescribeIndexRequest
Rsp * milvuspb . DescribeIndexResponse
}
2021-09-23 15:10:00 +08:00
// Type return msg type
2021-01-21 10:01:29 +08:00
func ( t * DescribeIndexReqTask ) Type ( ) commonpb . MsgType {
return t . Req . Base . MsgType
}
2021-09-23 15:10:00 +08:00
// Execute task execution
2021-04-08 15:26:18 +08:00
func ( t * DescribeIndexReqTask ) Execute ( ctx context . Context ) error {
if t . Type ( ) != commonpb . MsgType_DescribeIndex {
return fmt . Errorf ( "describe index, msg type = %s" , commonpb . MsgType_name [ int32 ( t . Type ( ) ) ] )
}
2021-04-27 10:30:55 +08:00
coll , idx , err := t . core . MetaTable . GetIndexByName ( t . Req . CollectionName , t . Req . IndexName )
2021-01-21 10:01:29 +08:00
if err != nil {
return err
}
for _ , i := range idx {
2021-04-27 10:30:55 +08:00
f , err := GetFieldSchemaByIndexID ( & coll , typeutil . UniqueID ( i . IndexID ) )
if err != nil {
2021-10-22 14:10:06 +08:00
log . Warn ( "Get field schema by index id failed" , zap . String ( "collection name" , t . Req . CollectionName ) , zap . String ( "index name" , t . Req . IndexName ) , zap . Error ( err ) )
2021-04-27 10:30:55 +08:00
continue
}
2021-01-21 10:01:29 +08:00
desc := & milvuspb . IndexDescription {
IndexName : i . IndexName ,
Params : i . IndexParams ,
2021-02-08 14:20:29 +08:00
IndexID : i . IndexID ,
2021-04-27 10:30:55 +08:00
FieldName : f . Name ,
2021-01-21 10:01:29 +08:00
}
t . Rsp . IndexDescriptions = append ( t . Rsp . IndexDescriptions , desc )
}
return nil
}
2021-09-23 15:10:00 +08:00
// DropIndexReqTask drop index request task
2021-02-20 15:38:44 +08:00
type DropIndexReqTask struct {
baseReqTask
Req * milvuspb . DropIndexRequest
}
2021-09-23 15:10:00 +08:00
// Type return msg type
2021-02-20 15:38:44 +08:00
func ( t * DropIndexReqTask ) Type ( ) commonpb . MsgType {
return t . Req . Base . MsgType
}
2021-09-23 15:10:00 +08:00
// Execute task execution
2021-03-13 14:42:53 +08:00
func ( t * DropIndexReqTask ) Execute ( ctx context . Context ) error {
2021-04-08 15:26:18 +08:00
if t . Type ( ) != commonpb . MsgType_DropIndex {
return fmt . Errorf ( "drop index, msg type = %s" , commonpb . MsgType_name [ int32 ( t . Type ( ) ) ] )
}
2021-11-23 23:01:15 +08:00
if err := t . core . RemoveIndex ( ctx , t . Req . CollectionName , t . Req . IndexName ) ; err != nil {
2021-03-04 14:54:16 +08:00
return err
}
2021-11-23 23:01:15 +08:00
_ , _ , err := t . core . MetaTable . DropIndex ( t . Req . CollectionName , t . Req . FieldName , t . Req . IndexName )
2021-03-04 14:54:16 +08:00
return err
2021-02-20 15:38:44 +08:00
}
2021-09-18 11:13:51 +08:00
2021-09-23 15:10:00 +08:00
// CreateAliasReqTask create alias request task
2021-09-18 11:13:51 +08:00
type CreateAliasReqTask struct {
baseReqTask
Req * milvuspb . CreateAliasRequest
}
2021-09-23 15:10:00 +08:00
// Type return msg type
2021-09-18 11:13:51 +08:00
func ( t * CreateAliasReqTask ) Type ( ) commonpb . MsgType {
return t . Req . Base . MsgType
}
2021-09-23 15:10:00 +08:00
// Execute task execution
2021-09-18 11:13:51 +08:00
func ( t * CreateAliasReqTask ) Execute ( ctx context . Context ) error {
if t . Type ( ) != commonpb . MsgType_CreateAlias {
return fmt . Errorf ( "create alias, msg type = %s" , commonpb . MsgType_name [ int32 ( t . Type ( ) ) ] )
}
ts , err := t . core . TSOAllocator ( 1 )
if err != nil {
return fmt . Errorf ( "TSO alloc fail, error = %w" , err )
}
2021-10-12 17:40:34 +08:00
err = t . core . MetaTable . AddAlias ( t . Req . Alias , t . Req . CollectionName , ts )
2021-09-18 11:13:51 +08:00
if err != nil {
2021-10-12 17:40:34 +08:00
return fmt . Errorf ( "meta table add alias failed, error = %w" , err )
2021-09-18 11:13:51 +08:00
}
2021-10-12 17:40:34 +08:00
return nil
2021-09-18 11:13:51 +08:00
}
2021-09-23 15:10:00 +08:00
// DropAliasReqTask drop alias request task
2021-09-18 11:13:51 +08:00
type DropAliasReqTask struct {
baseReqTask
Req * milvuspb . DropAliasRequest
}
2021-09-23 15:10:00 +08:00
// Type return msg type
2021-09-18 11:13:51 +08:00
func ( t * DropAliasReqTask ) Type ( ) commonpb . MsgType {
return t . Req . Base . MsgType
}
2021-09-23 15:10:00 +08:00
// Execute task execution
2021-09-18 11:13:51 +08:00
func ( t * DropAliasReqTask ) Execute ( ctx context . Context ) error {
if t . Type ( ) != commonpb . MsgType_DropAlias {
return fmt . Errorf ( "create alias, msg type = %s" , commonpb . MsgType_name [ int32 ( t . Type ( ) ) ] )
}
ts , err := t . core . TSOAllocator ( 1 )
if err != nil {
return fmt . Errorf ( "TSO alloc fail, error = %w" , err )
}
2021-10-12 17:40:34 +08:00
err = t . core . MetaTable . DropAlias ( t . Req . Alias , ts )
2021-09-18 11:13:51 +08:00
if err != nil {
2021-10-12 17:40:34 +08:00
return fmt . Errorf ( "meta table drop alias failed, error = %w" , err )
2021-09-18 11:13:51 +08:00
}
2021-11-23 23:01:15 +08:00
t . core . ExpireMetaCache ( ctx , [ ] string { t . Req . Alias } , ts )
2021-09-18 11:13:51 +08:00
2021-10-12 17:40:34 +08:00
return nil
2021-09-18 11:13:51 +08:00
}
2021-09-23 15:10:00 +08:00
// AlterAliasReqTask alter alias request task
2021-09-18 11:13:51 +08:00
type AlterAliasReqTask struct {
baseReqTask
Req * milvuspb . AlterAliasRequest
}
2021-09-23 15:10:00 +08:00
// Type return msg type
2021-09-18 11:13:51 +08:00
func ( t * AlterAliasReqTask ) Type ( ) commonpb . MsgType {
return t . Req . Base . MsgType
}
2021-09-23 15:10:00 +08:00
// Execute task execution
2021-09-18 11:13:51 +08:00
func ( t * AlterAliasReqTask ) Execute ( ctx context . Context ) error {
if t . Type ( ) != commonpb . MsgType_AlterAlias {
return fmt . Errorf ( "alter alias, msg type = %s" , commonpb . MsgType_name [ int32 ( t . Type ( ) ) ] )
}
ts , err := t . core . TSOAllocator ( 1 )
if err != nil {
return fmt . Errorf ( "TSO alloc fail, error = %w" , err )
}
2021-10-12 17:40:34 +08:00
err = t . core . MetaTable . AlterAlias ( t . Req . Alias , t . Req . CollectionName , ts )
2021-09-18 11:13:51 +08:00
if err != nil {
2021-10-12 17:40:34 +08:00
return fmt . Errorf ( "meta table alter alias failed, error = %w" , err )
2021-09-18 11:13:51 +08:00
}
2021-11-23 23:01:15 +08:00
t . core . ExpireMetaCache ( ctx , [ ] string { t . Req . Alias } , ts )
2021-09-18 11:13:51 +08:00
2021-10-12 17:40:34 +08:00
return nil
2021-09-18 11:13:51 +08:00
}