2021-04-19 13:47:10 +08:00
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
2021-01-16 10:12:14 +08:00
package querynode
2020-08-25 15:45:19 +08:00
2020-09-01 16:23:39 +08:00
/ *
2020-10-23 18:01:24 +08:00
# cgo CFLAGS : - I $ { SRCDIR } / . . / core / output / include
2020-09-01 16:23:39 +08:00
2020-10-31 15:11:47 +08:00
# cgo LDFLAGS : - L $ { SRCDIR } / . . / core / output / lib - lmilvus_segcore - Wl , - rpath = $ { SRCDIR } / . . / core / output / lib
2020-09-01 16:23:39 +08:00
2020-11-25 10:31:51 +08:00
# include "segcore/collection_c.h"
# include "segcore/segment_c.h"
2020-09-01 16:23:39 +08:00
* /
2020-08-25 15:45:19 +08:00
import "C"
2021-01-18 10:38:41 +08:00
import (
2021-06-19 18:38:07 +08:00
"errors"
"fmt"
2021-06-15 12:41:40 +08:00
"math"
"sync"
2021-06-09 11:37:55 +08:00
"unsafe"
"go.uber.org/zap"
2021-01-18 10:38:41 +08:00
"github.com/golang/protobuf/proto"
2021-04-22 14:45:57 +08:00
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/schemapb"
2021-01-18 10:38:41 +08:00
)
2020-08-25 15:45:19 +08:00
type Collection struct {
2021-06-15 20:06:10 +08:00
collectionPtr C . CCollection
id UniqueID
partitionIDs [ ] UniqueID
schema * schemapb . CollectionSchema
vChannels [ ] Channel
pChannels [ ] Channel
2021-06-15 12:41:40 +08:00
2021-06-19 18:38:07 +08:00
loadType loadType
releaseMu sync . RWMutex // guards release
releasedPartitions map [ UniqueID ] struct { }
releaseTime Timestamp
2020-08-25 15:45:19 +08:00
}
2020-11-09 16:27:11 +08:00
func ( c * Collection ) ID ( ) UniqueID {
2020-12-10 16:31:09 +08:00
return c . id
2020-11-09 16:27:11 +08:00
}
2020-08-25 15:45:19 +08:00
2021-01-18 10:38:41 +08:00
// Schema returns the schema the collection was created with. The pointer is
// returned as stored; no copy is made.
func (c *Collection) Schema() *schemapb.CollectionSchema {
	return c.schema
}
2021-02-05 10:53:11 +08:00
func ( c * Collection ) addPartitionID ( partitionID UniqueID ) {
2021-06-26 16:08:11 +08:00
c . releaseMu . Lock ( )
defer c . releaseMu . Unlock ( )
log . Debug ( "queryNode collection add a partition" , zap . Int64 ( "collection" , c . id ) , zap . Int64 ( "partitionID" , partitionID ) )
2021-02-05 10:53:11 +08:00
c . partitionIDs = append ( c . partitionIDs , partitionID )
2021-06-26 16:08:11 +08:00
log . Debug ( "queryNode collection info after add a partition" , zap . Int64 ( "collectionID" , c . id ) , zap . Int64s ( "partitions" , c . partitionIDs ) , zap . Any ( "releasePartitions" , c . releasedPartitions ) )
2021-02-05 10:53:11 +08:00
}
func ( c * Collection ) removePartitionID ( partitionID UniqueID ) {
tmpIDs := make ( [ ] UniqueID , 0 )
for _ , id := range c . partitionIDs {
2021-04-29 15:59:08 +08:00
if id != partitionID {
2021-02-05 10:53:11 +08:00
tmpIDs = append ( tmpIDs , id )
}
}
c . partitionIDs = tmpIDs
}
2021-06-15 20:06:10 +08:00
func ( c * Collection ) addVChannels ( channels [ ] Channel ) {
2021-09-07 15:45:59 +08:00
OUTER :
for _ , dstChan := range channels {
for _ , srcChan := range c . vChannels {
if dstChan == srcChan {
log . Debug ( "vChannel has been existed in collection's vChannels" ,
zap . Any ( "collectionID" , c . ID ( ) ) ,
zap . Any ( "vChannel" , dstChan ) ,
)
continue OUTER
}
}
log . Debug ( "add vChannel to collection" ,
zap . Any ( "collectionID" , c . ID ( ) ) ,
zap . Any ( "vChannel" , dstChan ) ,
)
c . vChannels = append ( c . vChannels , dstChan )
}
2021-05-28 15:40:32 +08:00
}
2021-06-15 20:06:10 +08:00
// getVChannels returns the collection's vChannels slice as stored; the caller
// receives the same backing array, not a copy.
func (c *Collection) getVChannels() []Channel {
	return c.vChannels
}
func ( c * Collection ) addPChannels ( channels [ ] Channel ) {
2021-09-07 15:45:59 +08:00
OUTER :
for _ , dstChan := range channels {
for _ , srcChan := range c . pChannels {
if dstChan == srcChan {
log . Debug ( "pChannel has been existed in collection's pChannels" ,
zap . Any ( "collectionID" , c . ID ( ) ) ,
zap . Any ( "pChannel" , dstChan ) ,
)
continue OUTER
}
}
log . Debug ( "add pChannel to collection" ,
zap . Any ( "collectionID" , c . ID ( ) ) ,
zap . Any ( "pChannel" , dstChan ) ,
)
c . pChannels = append ( c . pChannels , dstChan )
}
2021-06-15 20:06:10 +08:00
}
func ( c * Collection ) getPChannels ( ) [ ] Channel {
return c . pChannels
2021-05-28 15:40:32 +08:00
}
2021-06-15 12:41:40 +08:00
// setReleaseTime stores t as the collection's release time under the write
// lock of releaseMu.
func (c *Collection) setReleaseTime(t Timestamp) {
	c.releaseMu.Lock()
	c.releaseTime = t
	c.releaseMu.Unlock()
}
// getReleaseTime returns the collection's release time, read under the read
// lock of releaseMu.
func (c *Collection) getReleaseTime() Timestamp {
	c.releaseMu.RLock()
	releaseTime := c.releaseTime
	c.releaseMu.RUnlock()
	return releaseTime
}
2021-06-19 18:38:07 +08:00
func ( c * Collection ) addReleasedPartition ( partitionID UniqueID ) {
c . releaseMu . Lock ( )
defer c . releaseMu . Unlock ( )
2021-06-26 16:08:11 +08:00
log . Debug ( "queryNode collection release a partition" , zap . Int64 ( "collectionID" , c . id ) , zap . Int64 ( "partition" , partitionID ) )
2021-06-19 18:38:07 +08:00
c . releasedPartitions [ partitionID ] = struct { } { }
2021-06-26 16:08:11 +08:00
partitions := make ( [ ] UniqueID , 0 )
for _ , id := range c . partitionIDs {
if id != partitionID {
partitions = append ( partitions , id )
}
}
c . partitionIDs = partitions
log . Debug ( "queryNode collection info after release a partition" , zap . Int64 ( "collectionID" , c . id ) , zap . Int64s ( "partitions" , c . partitionIDs ) , zap . Any ( "releasePartitions" , c . releasedPartitions ) )
}
func ( c * Collection ) deleteReleasedPartition ( partitionID UniqueID ) {
c . releaseMu . Lock ( )
defer c . releaseMu . Unlock ( )
log . Debug ( "queryNode collection reload a released partition" , zap . Int64 ( "collectionID" , c . id ) , zap . Int64 ( "partition" , partitionID ) )
delete ( c . releasedPartitions , partitionID )
log . Debug ( "queryNode collection info after reload a released partition" , zap . Int64 ( "collectionID" , c . id ) , zap . Int64s ( "partitions" , c . partitionIDs ) , zap . Any ( "releasePartitions" , c . releasedPartitions ) )
2021-06-19 18:38:07 +08:00
}
// checkReleasedPartitions reports whether any of the given partitions has been
// released. It returns an error naming the collection and the first released
// partition found in partitionIDs, or nil when none of them is in the released
// set (including when partitionIDs is empty). The lookup runs under the read
// lock of releaseMu.
func (c *Collection) checkReleasedPartitions(partitionIDs []UniqueID) error {
	c.releaseMu.RLock()
	defer c.releaseMu.RUnlock()

	for _, id := range partitionIDs {
		if _, released := c.releasedPartitions[id]; released {
			// fmt.Sprint, not fmt.Sprintln: Sprintln appends a newline after
			// each ID, which would break the message across several lines.
			return errors.New("partition has been released" +
				", collectionID = " + fmt.Sprint(c.ID()) +
				", partitionID = " + fmt.Sprint(id))
		}
	}
	return nil
}
// setLoadType records l as the collection's load type. No lock is taken.
func (c *Collection) setLoadType(l loadType) {
	c.loadType = l
}
// getLoadType returns the collection's current load type. No lock is taken.
func (c *Collection) getLoadType() loadType {
	return c.loadType
}
2021-01-18 10:38:41 +08:00
func newCollection ( collectionID UniqueID , schema * schemapb . CollectionSchema ) * Collection {
2020-09-21 18:16:06 +08:00
/ *
2020-11-09 16:27:11 +08:00
CCollection
2020-12-10 16:31:09 +08:00
NewCollection ( const char * schema_proto_blob ) ;
2020-10-24 10:45:57 +08:00
* /
2021-01-18 10:38:41 +08:00
schemaBlob := proto . MarshalTextString ( schema )
2020-12-10 16:31:09 +08:00
cSchemaBlob := C . CString ( schemaBlob )
collection := C . NewCollection ( cSchemaBlob )
var newCollection = & Collection {
2021-06-19 18:38:07 +08:00
collectionPtr : collection ,
id : collectionID ,
schema : schema ,
vChannels : make ( [ ] Channel , 0 ) ,
pChannels : make ( [ ] Channel , 0 ) ,
releasedPartitions : make ( map [ UniqueID ] struct { } ) ,
2020-12-10 16:31:09 +08:00
}
2021-03-26 18:40:04 +08:00
C . free ( unsafe . Pointer ( cSchemaBlob ) )
2020-10-24 10:45:57 +08:00
2021-03-05 09:21:35 +08:00
log . Debug ( "create collection" , zap . Int64 ( "collectionID" , collectionID ) )
2021-06-15 12:41:40 +08:00
newCollection . setReleaseTime ( Timestamp ( math . MaxUint64 ) )
2020-11-09 16:27:11 +08:00
return newCollection
2020-08-25 15:45:19 +08:00
}
2020-11-05 10:52:50 +08:00
2020-11-09 16:27:11 +08:00
func deleteCollection ( collection * Collection ) {
/ *
void
deleteCollection ( CCollection collection ) ;
* /
cPtr := collection . collectionPtr
C . DeleteCollection ( cPtr )
2021-03-05 09:21:35 +08:00
collection . collectionPtr = nil
log . Debug ( "delete collection" , zap . Int64 ( "collectionID" , collection . ID ( ) ) )
collection = nil
2020-11-05 10:52:50 +08:00
}