mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-02 03:48:37 +08:00
Add segment seeking and use real client
Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
parent
7c0c835b49
commit
7554246ace
93
cmd/masterservice/main.go
Normal file
93
cmd/masterservice/main.go
Normal file
@ -0,0 +1,93 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
|
||||
ds "github.com/zilliztech/milvus-distributed/internal/dataservice"
|
||||
dsc "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice"
|
||||
isc "github.com/zilliztech/milvus-distributed/internal/distributed/indexservice/client"
|
||||
msc "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice"
|
||||
psc "github.com/zilliztech/milvus-distributed/internal/distributed/proxyservice"
|
||||
is "github.com/zilliztech/milvus-distributed/internal/indexservice"
|
||||
ms "github.com/zilliztech/milvus-distributed/internal/masterservice"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
|
||||
)
|
||||
|
||||
const reTryCnt = 3
|
||||
|
||||
func main() {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
log.Printf("master service address : %s:%d", ms.Params.Address, ms.Params.Port)
|
||||
|
||||
svr, err := msc.NewGrpcServer(ctx)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
log.Printf("proxy service address : %s", psc.Params.NetworkAddress())
|
||||
//proxyService := psc.NewClient(ctx, psc.Params.NetworkAddress())
|
||||
|
||||
//TODO, test proxy service GetComponentStates, before set
|
||||
|
||||
//if err = svr.SetProxyService(proxyService); err != nil {
|
||||
// panic(err)
|
||||
//}
|
||||
|
||||
log.Printf("data service address : %s:%d", ds.Params.Address, ds.Params.Port)
|
||||
dataService := dsc.NewClient(fmt.Sprintf("%s:%d", ds.Params.Address, ds.Params.Port))
|
||||
if err = dataService.Init(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
if err = dataService.Start(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
cnt := 0
|
||||
for cnt = 0; cnt < reTryCnt; cnt++ {
|
||||
dsStates, err := dataService.GetComponentStates()
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
if dsStates.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
|
||||
continue
|
||||
}
|
||||
if dsStates.State.StateCode != internalpb2.StateCode_INITIALIZING && dsStates.State.StateCode != internalpb2.StateCode_HEALTHY {
|
||||
continue
|
||||
}
|
||||
break
|
||||
}
|
||||
if cnt >= reTryCnt {
|
||||
panic("connect to data service failed")
|
||||
}
|
||||
|
||||
//if err = svr.SetDataService(dataService); err != nil {
|
||||
// panic(err)
|
||||
//}
|
||||
|
||||
log.Printf("index service address : %s", is.Params.Address)
|
||||
indexService := isc.NewClient(is.Params.Address)
|
||||
|
||||
if err = svr.SetIndexService(indexService); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
if err = svr.Start(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
sc := make(chan os.Signal, 1)
|
||||
signal.Notify(sc,
|
||||
syscall.SIGHUP,
|
||||
syscall.SIGINT,
|
||||
syscall.SIGTERM,
|
||||
syscall.SIGQUIT)
|
||||
sig := <-sc
|
||||
log.Printf("Got %s signal to exit", sig.String())
|
||||
_ = svr.Stop()
|
||||
}
|
@ -29,10 +29,6 @@ msgChannel:
|
||||
queryNodeStats: "query-node-stats"
|
||||
# cmd for loadIndex, flush, etc...
|
||||
cmd: "cmd"
|
||||
dataServiceInsertChannel: "insert-channel-"
|
||||
dataServiceStatistic: "dataservice-statistics-channel"
|
||||
dataServiceTimeTick: "dataservice-timetick-channel"
|
||||
dataServiceSegmentInfo: "segment-info-channel"
|
||||
|
||||
# sub name generation rule: ${subNamePrefix}-${NodeID}
|
||||
subNamePrefix:
|
||||
@ -41,7 +37,6 @@ msgChannel:
|
||||
queryNodeSubNamePrefix: "queryNode"
|
||||
writeNodeSubNamePrefix: "writeNode" # GOOSE TODO: remove this
|
||||
dataNodeSubNamePrefix: "dataNode"
|
||||
dataServiceSubNamePrefix: "dataService"
|
||||
|
||||
# default channel range [0, 1)
|
||||
channelRange:
|
||||
|
@ -1,13 +0,0 @@
|
||||
dataservice:
|
||||
nodeID: 14040
|
||||
address: "127.0.0.1"
|
||||
port: 13333
|
||||
segment:
|
||||
# old name: segmentThreshold: 536870912
|
||||
size: 512 # MB
|
||||
sizeFactor: 0.75
|
||||
defaultSizePerRecord: 1024
|
||||
# old name: segmentExpireDuration: 2000
|
||||
IDAssignExpiration: 2000 # ms
|
||||
insertChannelNumPerCollection: 4
|
||||
dataNodeNum: 2
|
@ -24,3 +24,4 @@ master:
|
||||
|
||||
maxPartitionNum: 4096
|
||||
nodeID: 100
|
||||
timeout: 5 # time out, 5 seconds
|
@ -25,7 +25,7 @@ func (alloc *allocatorImpl) allocID() (UniqueID, error) {
|
||||
resp, err := alloc.masterService.AllocID(&masterpb.IDRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgType: commonpb.MsgType_kShowCollections,
|
||||
MsgID: 1, // GOOSE TODO add msg id
|
||||
MsgID: 1, // GOOSE TODO
|
||||
Timestamp: 0, // GOOSE TODO
|
||||
SourceID: Params.NodeID,
|
||||
},
|
||||
|
@ -1,9 +1,6 @@
|
||||
package datanode
|
||||
|
||||
import (
|
||||
"log"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
|
||||
)
|
||||
|
||||
@ -24,17 +21,9 @@ func (c *Collection) Schema() *schemapb.CollectionSchema {
|
||||
return c.schema
|
||||
}
|
||||
|
||||
func newCollection(collectionID UniqueID, schemaStr string) *Collection {
|
||||
|
||||
var schema schemapb.CollectionSchema
|
||||
err := proto.UnmarshalText(schemaStr, &schema)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
return nil
|
||||
}
|
||||
|
||||
func newCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) *Collection {
|
||||
var newCollection = &Collection{
|
||||
schema: &schema,
|
||||
schema: schema,
|
||||
id: collectionID,
|
||||
}
|
||||
return newCollection
|
||||
|
@ -6,13 +6,14 @@ import (
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/errors"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
|
||||
)
|
||||
|
||||
type collectionReplica interface {
|
||||
|
||||
// collection
|
||||
getCollectionNum() int
|
||||
addCollection(collectionID UniqueID, schemaBlob string) error
|
||||
addCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) error
|
||||
removeCollection(collectionID UniqueID) error
|
||||
getCollectionByID(collectionID UniqueID) (*Collection, error)
|
||||
getCollectionByName(collectionName string) (*Collection, error)
|
||||
@ -162,11 +163,11 @@ func (colReplica *collectionReplicaImpl) getCollectionNum() int {
|
||||
return len(colReplica.collections)
|
||||
}
|
||||
|
||||
func (colReplica *collectionReplicaImpl) addCollection(collectionID UniqueID, schemaBlob string) error {
|
||||
func (colReplica *collectionReplicaImpl) addCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) error {
|
||||
colReplica.mu.Lock()
|
||||
defer colReplica.mu.Unlock()
|
||||
|
||||
var newCollection = newCollection(collectionID, schemaBlob)
|
||||
var newCollection = newCollection(collectionID, schema)
|
||||
colReplica.collections = append(colReplica.collections, newCollection)
|
||||
log.Println("Create collection: ", newCollection.Name())
|
||||
|
||||
|
@ -3,7 +3,6 @@ package datanode
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
@ -23,10 +22,7 @@ func initTestReplicaMeta(t *testing.T, replica collectionReplica, collectionName
|
||||
Factory := &MetaFactory{}
|
||||
collectionMeta := Factory.CollectionMetaFactory(collectionID, collectionName)
|
||||
|
||||
schemaBlob := proto.MarshalTextString(collectionMeta.Schema)
|
||||
require.NotEqual(t, "", schemaBlob)
|
||||
|
||||
var err = replica.addCollection(collectionMeta.ID, schemaBlob)
|
||||
var err = replica.addCollection(collectionMeta.ID, collectionMeta.Schema)
|
||||
require.NoError(t, err)
|
||||
|
||||
collection, err := replica.getCollectionByName(collectionName)
|
||||
|
@ -3,7 +3,6 @@ package datanode
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
@ -13,10 +12,7 @@ func TestCollection_newCollection(t *testing.T) {
|
||||
Factory := &MetaFactory{}
|
||||
collectionMeta := Factory.CollectionMetaFactory(collectionID, collectionName)
|
||||
|
||||
schemaBlob := proto.MarshalTextString(collectionMeta.Schema)
|
||||
assert.NotEqual(t, "", schemaBlob)
|
||||
|
||||
collection := newCollection(collectionMeta.ID, schemaBlob)
|
||||
collection := newCollection(collectionMeta.ID, collectionMeta.Schema)
|
||||
assert.Equal(t, collection.Name(), collectionName)
|
||||
assert.Equal(t, collection.ID(), collectionID)
|
||||
}
|
||||
@ -27,10 +23,7 @@ func TestCollection_deleteCollection(t *testing.T) {
|
||||
Factory := &MetaFactory{}
|
||||
collectionMeta := Factory.CollectionMetaFactory(collectionID, collectionName)
|
||||
|
||||
schemaBlob := proto.MarshalTextString(collectionMeta.Schema)
|
||||
assert.NotEqual(t, "", schemaBlob)
|
||||
|
||||
collection := newCollection(collectionMeta.ID, schemaBlob)
|
||||
collection := newCollection(collectionMeta.ID, collectionMeta.Schema)
|
||||
assert.Equal(t, collection.Name(), collectionName)
|
||||
assert.Equal(t, collection.ID(), collectionID)
|
||||
}
|
||||
|
@ -26,8 +26,8 @@ const (
|
||||
type (
|
||||
Inteface interface {
|
||||
typeutil.Service
|
||||
typeutil.Component
|
||||
|
||||
GetComponentStates() (*internalpb2.ComponentStates, error)
|
||||
WatchDmChannels(in *datapb.WatchDmChannelRequest) (*commonpb.Status, error)
|
||||
FlushSegments(in *datapb.FlushSegRequest) (*commonpb.Status, error)
|
||||
}
|
||||
@ -43,6 +43,7 @@ type (
|
||||
}
|
||||
|
||||
DataNode struct {
|
||||
// GOOSE TODO: complete interface with component
|
||||
ctx context.Context
|
||||
NodeID UniqueID
|
||||
Role string
|
||||
@ -124,7 +125,7 @@ func (node *DataNode) Init() error {
|
||||
chanSize := 100
|
||||
flushChan := make(chan *flushMsg, chanSize)
|
||||
node.dataSyncService = newDataSyncService(node.ctx, flushChan, replica, alloc)
|
||||
node.metaService = newMetaService(node.ctx, replica)
|
||||
node.metaService = newMetaService(node.ctx, replica, node.masterService)
|
||||
node.replica = replica
|
||||
|
||||
// Opentracing
|
||||
@ -154,7 +155,7 @@ func (node *DataNode) Init() error {
|
||||
func (node *DataNode) Start() error {
|
||||
|
||||
go node.dataSyncService.start()
|
||||
node.metaService.start()
|
||||
node.metaService.init()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -7,7 +7,6 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/msgstream"
|
||||
@ -42,7 +41,7 @@ func TestDataSyncService_Start(t *testing.T) {
|
||||
replica := newReplica()
|
||||
allocFactory := AllocatorFactory{}
|
||||
sync := newDataSyncService(ctx, flushChan, replica, allocFactory)
|
||||
sync.replica.addCollection(collMeta.ID, proto.MarshalTextString(collMeta.Schema))
|
||||
sync.replica.addCollection(collMeta.ID, collMeta.Schema)
|
||||
go sync.start()
|
||||
|
||||
// test data generate
|
||||
|
@ -3,6 +3,8 @@ package datanode
|
||||
import (
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
|
||||
)
|
||||
|
||||
@ -15,6 +17,12 @@ type (
|
||||
|
||||
AllocatorFactory struct {
|
||||
}
|
||||
|
||||
MasterServiceFactory struct {
|
||||
ID UniqueID
|
||||
collectionName string
|
||||
collectionID UniqueID
|
||||
}
|
||||
)
|
||||
|
||||
func (mf *MetaFactory) CollectionMetaFactory(collectionID UniqueID, collectionName string) *etcdpb.CollectionMeta {
|
||||
@ -156,3 +164,42 @@ func (alloc AllocatorFactory) allocID() (UniqueID, error) {
|
||||
// GOOSE TODO: random ID generate
|
||||
return UniqueID(0), nil
|
||||
}
|
||||
|
||||
func (m *MasterServiceFactory) setID(id UniqueID) {
|
||||
m.ID = id // GOOSE TODO: random ID generator
|
||||
}
|
||||
|
||||
func (m *MasterServiceFactory) setCollectionID(id UniqueID) {
|
||||
m.collectionID = id
|
||||
}
|
||||
|
||||
func (m *MasterServiceFactory) setCollectionName(name string) {
|
||||
m.collectionName = name
|
||||
}
|
||||
|
||||
func (m *MasterServiceFactory) AllocID(in *masterpb.IDRequest) (*masterpb.IDResponse, error) {
|
||||
resp := &masterpb.IDResponse{
|
||||
Status: &commonpb.Status{},
|
||||
ID: m.ID,
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (m *MasterServiceFactory) ShowCollections(in *milvuspb.ShowCollectionRequest) (*milvuspb.ShowCollectionResponse, error) {
|
||||
resp := &milvuspb.ShowCollectionResponse{
|
||||
Status: &commonpb.Status{},
|
||||
CollectionNames: []string{m.collectionName},
|
||||
}
|
||||
return resp, nil
|
||||
|
||||
}
|
||||
func (m *MasterServiceFactory) DescribeCollection(in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
|
||||
f := MetaFactory{}
|
||||
meta := f.CollectionMetaFactory(m.collectionID, m.collectionName)
|
||||
resp := &milvuspb.DescribeCollectionResponse{
|
||||
Status: &commonpb.Status{},
|
||||
CollectionID: m.collectionID,
|
||||
Schema: meta.Schema,
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
@ -224,9 +224,8 @@ func (ddNode *ddNode) createCollection(msg *msgstream.CreateCollectionMsg) {
|
||||
return
|
||||
}
|
||||
|
||||
schemaStr := proto.MarshalTextString(&schema)
|
||||
// add collection
|
||||
err = ddNode.replica.addCollection(collectionID, schemaStr)
|
||||
err = ddNode.replica.addCollection(collectionID, &schema)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
return
|
||||
|
@ -9,7 +9,6 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/msgstream"
|
||||
@ -39,11 +38,9 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) {
|
||||
|
||||
Factory := &MetaFactory{}
|
||||
collMeta := Factory.CollectionMetaFactory(UniqueID(0), "coll1")
|
||||
schemaBlob := proto.MarshalTextString(collMeta.Schema)
|
||||
require.NotEqual(t, "", schemaBlob)
|
||||
|
||||
replica := newReplica()
|
||||
err = replica.addCollection(collMeta.ID, schemaBlob)
|
||||
err = replica.addCollection(collMeta.ID, collMeta.Schema)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Params.FlushInsertBufSize = 2
|
||||
|
@ -4,73 +4,91 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"path"
|
||||
"reflect"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
"go.etcd.io/etcd/clientv3"
|
||||
|
||||
etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd"
|
||||
"github.com/zilliztech/milvus-distributed/internal/errors"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
|
||||
)
|
||||
|
||||
type metaService struct {
|
||||
ctx context.Context
|
||||
kvBase *etcdkv.EtcdKV
|
||||
replica collectionReplica
|
||||
ctx context.Context
|
||||
replica collectionReplica
|
||||
masterClient MasterServiceInterface
|
||||
}
|
||||
|
||||
func newMetaService(ctx context.Context, replica collectionReplica) *metaService {
|
||||
ETCDAddr := Params.EtcdAddress
|
||||
MetaRootPath := Params.MetaRootPath
|
||||
|
||||
cli, _ := clientv3.New(clientv3.Config{
|
||||
Endpoints: []string{ETCDAddr},
|
||||
DialTimeout: 5 * time.Second,
|
||||
})
|
||||
|
||||
func newMetaService(ctx context.Context, replica collectionReplica, m MasterServiceInterface) *metaService {
|
||||
return &metaService{
|
||||
ctx: ctx,
|
||||
kvBase: etcdkv.NewEtcdKV(cli, MetaRootPath),
|
||||
replica: replica,
|
||||
ctx: ctx,
|
||||
replica: replica,
|
||||
masterClient: m,
|
||||
}
|
||||
}
|
||||
|
||||
func (mService *metaService) start() {
|
||||
// init from meta
|
||||
func (mService *metaService) init() {
|
||||
err := mService.loadCollections()
|
||||
if err != nil {
|
||||
log.Fatal("metaService loadCollections failed")
|
||||
log.Fatal("metaService init failed:", err)
|
||||
}
|
||||
}
|
||||
|
||||
func GetCollectionObjID(key string) string {
|
||||
ETCDRootPath := Params.MetaRootPath
|
||||
func (mService *metaService) loadCollections() error {
|
||||
names, err := mService.getCollectionNames()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
prefix := path.Join(ETCDRootPath, CollectionPrefix) + "/"
|
||||
return strings.TrimPrefix(key, prefix)
|
||||
for _, name := range names {
|
||||
err := mService.createCollection(name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func isCollectionObj(key string) bool {
|
||||
ETCDRootPath := Params.MetaRootPath
|
||||
func (mService *metaService) getCollectionNames() ([]string, error) {
|
||||
req := &milvuspb.ShowCollectionRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgType: commonpb.MsgType_kShowCollections,
|
||||
MsgID: 0, //GOOSE TODO
|
||||
Timestamp: 0, // GOOSE TODO
|
||||
SourceID: Params.NodeID,
|
||||
},
|
||||
DbName: "default", // GOOSE TODO
|
||||
}
|
||||
|
||||
prefix := path.Join(ETCDRootPath, CollectionPrefix) + "/"
|
||||
prefix = strings.TrimSpace(prefix)
|
||||
index := strings.Index(key, prefix)
|
||||
|
||||
return index == 0
|
||||
response, err := mService.masterClient.ShowCollections(req)
|
||||
if err != nil {
|
||||
return nil, errors.Errorf("Get collection names from master service wrong: %v", err)
|
||||
}
|
||||
return response.GetCollectionNames(), nil
|
||||
}
|
||||
|
||||
func isSegmentObj(key string) bool {
|
||||
ETCDRootPath := Params.MetaRootPath
|
||||
func (mService *metaService) createCollection(name string) error {
|
||||
req := &milvuspb.DescribeCollectionRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgType: commonpb.MsgType_kDescribeCollection,
|
||||
MsgID: 0, //GOOSE TODO
|
||||
Timestamp: 0, // GOOSE TODO
|
||||
SourceID: Params.NodeID,
|
||||
},
|
||||
DbName: "default", // GOOSE TODO
|
||||
CollectionName: name,
|
||||
}
|
||||
|
||||
prefix := path.Join(ETCDRootPath, SegmentPrefix) + "/"
|
||||
prefix = strings.TrimSpace(prefix)
|
||||
index := strings.Index(key, prefix)
|
||||
response, err := mService.masterClient.DescribeCollection(req)
|
||||
if err != nil {
|
||||
return errors.Errorf("Describe collection %v from master service wrong: %v", name, err)
|
||||
}
|
||||
|
||||
return index == 0
|
||||
err = mService.replica.addCollection(response.GetCollectionID(), response.GetSchema())
|
||||
if err != nil {
|
||||
return errors.Errorf("Add collection %v into collReplica wrong: %v", name, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func printCollectionStruct(obj *etcdpb.CollectionMeta) {
|
||||
@ -85,51 +103,3 @@ func printCollectionStruct(obj *etcdpb.CollectionMeta) {
|
||||
fmt.Printf("Field: %s\tValue: %v\n", typeOfS.Field(i).Name, v.Field(i).Interface())
|
||||
}
|
||||
}
|
||||
|
||||
func (mService *metaService) processCollectionCreate(id string, value string) {
|
||||
//println(fmt.Sprintf("Create Collection:$%s$", id))
|
||||
|
||||
col := mService.collectionUnmarshal(value)
|
||||
if col != nil {
|
||||
schema := col.Schema
|
||||
schemaBlob := proto.MarshalTextString(schema)
|
||||
err := mService.replica.addCollection(col.ID, schemaBlob)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (mService *metaService) loadCollections() error {
|
||||
keys, values, err := mService.kvBase.LoadWithPrefix(CollectionPrefix)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for i := range keys {
|
||||
objID := GetCollectionObjID(keys[i])
|
||||
mService.processCollectionCreate(objID, values[i])
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
//----------------------------------------------------------------------- Unmarshal and Marshal
|
||||
func (mService *metaService) collectionUnmarshal(value string) *etcdpb.CollectionMeta {
|
||||
col := etcdpb.CollectionMeta{}
|
||||
err := proto.UnmarshalText(value, &col)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
return nil
|
||||
}
|
||||
return &col
|
||||
}
|
||||
|
||||
func (mService *metaService) collectionMarshal(col *etcdpb.CollectionMeta) string {
|
||||
value := proto.MarshalTextString(col)
|
||||
if value == "" {
|
||||
log.Println("marshal collection failed")
|
||||
return ""
|
||||
}
|
||||
return value
|
||||
}
|
||||
|
@ -7,94 +7,46 @@ import (
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestMetaService_start(t *testing.T) {
|
||||
func TestMetaService_All(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
replica := newReplica()
|
||||
mFactory := &MasterServiceFactory{}
|
||||
mFactory.setCollectionID(0)
|
||||
mFactory.setCollectionName("a-collection")
|
||||
metaService := newMetaService(ctx, replica, mFactory)
|
||||
|
||||
metaService := newMetaService(ctx, replica)
|
||||
t.Run("Test getCollectionNames", func(t *testing.T) {
|
||||
names, err := metaService.getCollectionNames()
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 1, len(names))
|
||||
assert.Equal(t, "a-collection", names[0])
|
||||
})
|
||||
|
||||
t.Run("Test createCollection", func(t *testing.T) {
|
||||
hasColletion := metaService.replica.hasCollection(0)
|
||||
assert.False(t, hasColletion)
|
||||
|
||||
err := metaService.createCollection("a-collection")
|
||||
assert.NoError(t, err)
|
||||
hasColletion = metaService.replica.hasCollection(0)
|
||||
assert.True(t, hasColletion)
|
||||
})
|
||||
|
||||
t.Run("Test loadCollections", func(t *testing.T) {
|
||||
hasColletion := metaService.replica.hasCollection(1)
|
||||
assert.False(t, hasColletion)
|
||||
|
||||
mFactory.setCollectionID(1)
|
||||
mFactory.setCollectionName("a-collection-1")
|
||||
err := metaService.loadCollections()
|
||||
assert.NoError(t, err)
|
||||
|
||||
hasColletion = metaService.replica.hasCollection(1)
|
||||
assert.True(t, hasColletion)
|
||||
hasColletion = metaService.replica.hasCollection(0)
|
||||
assert.True(t, hasColletion)
|
||||
})
|
||||
|
||||
metaService.start()
|
||||
}
|
||||
|
||||
func TestMetaService_getCollectionObjId(t *testing.T) {
|
||||
var key = "/collection/collection0"
|
||||
var collectionObjID1 = GetCollectionObjID(key)
|
||||
|
||||
assert.Equal(t, collectionObjID1, "/collection/collection0")
|
||||
|
||||
key = "fakeKey"
|
||||
var collectionObjID2 = GetCollectionObjID(key)
|
||||
|
||||
assert.Equal(t, collectionObjID2, "fakeKey")
|
||||
}
|
||||
|
||||
func TestMetaService_isCollectionObj(t *testing.T) {
|
||||
var key = Params.MetaRootPath + "/collection/collection0"
|
||||
var b1 = isCollectionObj(key)
|
||||
|
||||
assert.Equal(t, b1, true)
|
||||
|
||||
key = Params.MetaRootPath + "/segment/segment0"
|
||||
var b2 = isCollectionObj(key)
|
||||
|
||||
assert.Equal(t, b2, false)
|
||||
}
|
||||
|
||||
func TestMetaService_processCollectionCreate(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
replica := newReplica()
|
||||
metaService := newMetaService(ctx, replica)
|
||||
defer cancel()
|
||||
id := "0"
|
||||
value := `schema: <
|
||||
name: "test"
|
||||
fields: <
|
||||
fieldID:100
|
||||
name: "vec"
|
||||
data_type: VECTOR_FLOAT
|
||||
type_params: <
|
||||
key: "dim"
|
||||
value: "16"
|
||||
>
|
||||
index_params: <
|
||||
key: "metric_type"
|
||||
value: "L2"
|
||||
>
|
||||
>
|
||||
fields: <
|
||||
fieldID:101
|
||||
name: "age"
|
||||
data_type: INT32
|
||||
type_params: <
|
||||
key: "dim"
|
||||
value: "1"
|
||||
>
|
||||
>
|
||||
>
|
||||
segmentIDs: 0
|
||||
partition_tags: "default"
|
||||
`
|
||||
|
||||
metaService.processCollectionCreate(id, value)
|
||||
|
||||
collectionNum := replica.getCollectionNum()
|
||||
assert.Equal(t, collectionNum, 1)
|
||||
|
||||
collection, err := replica.getCollectionByName("test")
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, collection.ID(), UniqueID(0))
|
||||
}
|
||||
|
||||
func TestMetaService_loadCollections(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
replica := newReplica()
|
||||
|
||||
metaService := newMetaService(ctx, replica)
|
||||
|
||||
err2 := (*metaService).loadCollections()
|
||||
assert.Nil(t, err2)
|
||||
}
|
||||
|
@ -1,8 +1,6 @@
|
||||
package dataservice
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/util/paramtable"
|
||||
)
|
||||
|
||||
@ -13,6 +11,8 @@ type ParamTable struct {
|
||||
Port int
|
||||
NodeID int64
|
||||
|
||||
MasterAddress string
|
||||
|
||||
EtcdAddress string
|
||||
MetaRootPath string
|
||||
KvRootPath string
|
||||
@ -31,7 +31,6 @@ type ParamTable struct {
|
||||
DataNodeNum int
|
||||
SegmentInfoChannelName string
|
||||
DataServiceSubscriptionName string
|
||||
K2SChannelNames []string
|
||||
}
|
||||
|
||||
var Params ParamTable
|
||||
@ -40,14 +39,15 @@ func (p *ParamTable) Init() {
|
||||
// load yaml
|
||||
p.BaseTable.Init()
|
||||
|
||||
if err := p.LoadYaml("advanced/data_service.yaml"); err != nil {
|
||||
err := p.LoadYaml("advanced/master.yaml")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
// set members
|
||||
p.initAddress()
|
||||
p.initPort()
|
||||
p.initNodeID()
|
||||
p.NodeID = 1 // todo
|
||||
|
||||
p.initEtcdAddress()
|
||||
p.initMetaRootPath()
|
||||
@ -68,19 +68,15 @@ func (p *ParamTable) Init() {
|
||||
}
|
||||
|
||||
func (p *ParamTable) initAddress() {
|
||||
dataserviceAddress, err := p.Load("dataservice.address")
|
||||
masterAddress, err := p.Load("master.address")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
p.Address = dataserviceAddress
|
||||
p.Address = masterAddress
|
||||
}
|
||||
|
||||
func (p *ParamTable) initPort() {
|
||||
p.Port = p.ParseInt("dataservice.port")
|
||||
}
|
||||
|
||||
func (p *ParamTable) initNodeID() {
|
||||
p.NodeID = p.ParseInt64("dataservice.nodeID")
|
||||
p.Port = p.ParseInt("master.port")
|
||||
}
|
||||
|
||||
func (p *ParamTable) initEtcdAddress() {
|
||||
@ -123,83 +119,46 @@ func (p *ParamTable) initKvRootPath() {
|
||||
p.KvRootPath = rootPath + "/" + subPath
|
||||
}
|
||||
func (p *ParamTable) initSegmentSize() {
|
||||
p.SegmentSize = p.ParseFloat("dataservice.segment.size")
|
||||
p.SegmentSize = p.ParseFloat("master.segment.size")
|
||||
}
|
||||
|
||||
func (p *ParamTable) initSegmentSizeFactor() {
|
||||
p.SegmentSizeFactor = p.ParseFloat("dataservice.segment.sizeFactor")
|
||||
p.SegmentSizeFactor = p.ParseFloat("master.segment.sizeFactor")
|
||||
}
|
||||
|
||||
func (p *ParamTable) initDefaultRecordSize() {
|
||||
p.DefaultRecordSize = p.ParseInt64("dataservice.segment.defaultSizePerRecord")
|
||||
p.DefaultRecordSize = p.ParseInt64("master.segment.defaultSizePerRecord")
|
||||
}
|
||||
|
||||
// TODO read from config/env
|
||||
func (p *ParamTable) initSegIDAssignExpiration() {
|
||||
p.SegIDAssignExpiration = p.ParseInt64("dataservice.segment.IDAssignExpiration") //ms
|
||||
p.SegIDAssignExpiration = 3000 //ms
|
||||
}
|
||||
|
||||
func (p *ParamTable) initInsertChannelPrefixName() {
|
||||
var err error
|
||||
p.InsertChannelPrefixName, err = p.Load("msgChannel.chanNamePrefix.dataServiceInsertChannel")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
p.InsertChannelPrefixName = "insert-channel-"
|
||||
}
|
||||
|
||||
func (p *ParamTable) initInsertChannelNumPerCollection() {
|
||||
p.InsertChannelNumPerCollection = p.ParseInt64("dataservice.insertChannelNumPerCollection")
|
||||
p.InsertChannelNumPerCollection = 4
|
||||
}
|
||||
|
||||
func (p *ParamTable) initStatisticsChannelName() {
|
||||
var err error
|
||||
p.StatisticsChannelName, err = p.Load("msgChannel.chanNamePrefix.dataServiceStatistic")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
p.StatisticsChannelName = "dataservice-statistics-channel"
|
||||
}
|
||||
|
||||
func (p *ParamTable) initTimeTickChannelName() {
|
||||
var err error
|
||||
p.TimeTickChannelName, err = p.Load("msgChannel.chanNamePrefix.dataServiceTimeTick")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
p.TimeTickChannelName = "dataservice-timetick-channel"
|
||||
}
|
||||
|
||||
func (p *ParamTable) initDataNodeNum() {
|
||||
p.DataNodeNum = p.ParseInt("dataservice.dataNodeNum")
|
||||
p.DataNodeNum = 2
|
||||
}
|
||||
|
||||
func (p *ParamTable) initSegmentInfoChannelName() {
|
||||
var err error
|
||||
p.SegmentInfoChannelName, err = p.Load("msgChannel.chanNamePrefix.dataServiceSegmentInfo")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
p.SegmentInfoChannelName = "segment-info-channel"
|
||||
}
|
||||
|
||||
func (p *ParamTable) initDataServiceSubscriptionName() {
|
||||
var err error
|
||||
p.DataServiceSubscriptionName, err = p.Load("msgChannel.chanNamePrefix.dataServiceSubNamePrefix")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
func (p *ParamTable) initK2SChannelNames() {
|
||||
prefix, err := p.Load("msgChannel.chanNamePrefix.k2s")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
prefix += "-"
|
||||
iRangeStr, err := p.Load("msgChannel.channelRange.k2s")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
channelIDs := paramtable.ConvertRangeToIntSlice(iRangeStr, ",")
|
||||
var ret []string
|
||||
for _, ID := range channelIDs {
|
||||
ret = append(ret, prefix+strconv.Itoa(ID))
|
||||
}
|
||||
p.K2SChannelNames = ret
|
||||
p.DataServiceSubscriptionName = "dataserive-sub"
|
||||
}
|
||||
|
@ -49,31 +49,28 @@ type (
|
||||
UniqueID = typeutil.UniqueID
|
||||
Timestamp = typeutil.Timestamp
|
||||
Server struct {
|
||||
ctx context.Context
|
||||
serverLoopCtx context.Context
|
||||
serverLoopCancel context.CancelFunc
|
||||
serverLoopWg sync.WaitGroup
|
||||
state internalpb2.StateCode
|
||||
client *etcdkv.EtcdKV
|
||||
meta *meta
|
||||
segAllocator segmentAllocator
|
||||
statsHandler *statsHandler
|
||||
insertChannelMgr *insertChannelManager
|
||||
allocator allocator
|
||||
cluster *dataNodeCluster
|
||||
msgProducer *timesync.MsgProducer
|
||||
registerFinishCh chan struct{}
|
||||
masterClient *masterservice.GrpcClient
|
||||
ttMsgStream msgstream.MsgStream
|
||||
k2sMsgStream msgstream.MsgStream
|
||||
ddChannelName string
|
||||
segmentInfoStream msgstream.MsgStream
|
||||
segmentFlushStream msgstream.MsgStream
|
||||
ctx context.Context
|
||||
serverLoopCtx context.Context
|
||||
serverLoopCancel context.CancelFunc
|
||||
serverLoopWg sync.WaitGroup
|
||||
state internalpb2.StateCode
|
||||
client *etcdkv.EtcdKV
|
||||
meta *meta
|
||||
segAllocator segmentAllocator
|
||||
statsHandler *statsHandler
|
||||
insertChannelMgr *insertChannelManager
|
||||
allocator allocator
|
||||
cluster *dataNodeCluster
|
||||
msgProducer *timesync.MsgProducer
|
||||
registerFinishCh chan struct{}
|
||||
masterClient *masterservice.GrpcClient
|
||||
ttMsgStream msgstream.MsgStream
|
||||
ddChannelName string
|
||||
segmentInfoStream msgstream.MsgStream
|
||||
}
|
||||
)
|
||||
|
||||
func CreateServer(ctx context.Context, client *masterservice.GrpcClient) (*Server, error) {
|
||||
Params.Init()
|
||||
ch := make(chan struct{})
|
||||
return &Server{
|
||||
ctx: ctx,
|
||||
@ -86,29 +83,32 @@ func CreateServer(ctx context.Context, client *masterservice.GrpcClient) (*Serve
|
||||
}
|
||||
|
||||
func (s *Server) Init() error {
|
||||
Params.Init()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Server) Start() error {
|
||||
var err error
|
||||
s.allocator = newAllocatorImpl(s.masterClient)
|
||||
if err = s.initMeta(); err != nil {
|
||||
if err := s.initMeta(); err != nil {
|
||||
return err
|
||||
}
|
||||
s.statsHandler = newStatsHandler(s.meta)
|
||||
s.segAllocator, err = newSegmentAllocator(s.meta, s.allocator)
|
||||
segAllocator, err := newSegmentAllocator(s.meta, s.allocator)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.initSegmentInfoChannel()
|
||||
if err = s.initMsgProducer(); err != nil {
|
||||
return err
|
||||
}
|
||||
s.segAllocator = segAllocator
|
||||
s.waitDataNodeRegister()
|
||||
|
||||
if err = s.loadMetaFromMaster(); err != nil {
|
||||
return err
|
||||
}
|
||||
if err = s.initMsgProducer(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.initSegmentInfoChannel()
|
||||
s.startServerLoop()
|
||||
s.waitDataNodeRegister()
|
||||
s.state = internalpb2.StateCode_HEALTHY
|
||||
log.Println("start success")
|
||||
return nil
|
||||
@ -128,28 +128,21 @@ func (s *Server) initMeta() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Server) initSegmentInfoChannel() {
|
||||
segmentInfoStream := pulsarms.NewPulsarMsgStream(s.ctx, 1024)
|
||||
segmentInfoStream.SetPulsarClient(Params.PulsarAddress)
|
||||
segmentInfoStream.CreatePulsarProducers([]string{Params.SegmentInfoChannelName})
|
||||
s.segmentInfoStream = segmentInfoStream
|
||||
s.segmentInfoStream.Start()
|
||||
func (s *Server) waitDataNodeRegister() {
|
||||
log.Println("waiting data node to register")
|
||||
<-s.registerFinishCh
|
||||
log.Println("all data nodes register")
|
||||
}
|
||||
|
||||
func (s *Server) initMsgProducer() error {
|
||||
ttMsgStream := pulsarms.NewPulsarMsgStream(s.ctx, 1024)
|
||||
ttMsgStream := pulsarms.NewPulsarTtMsgStream(s.ctx, 1024)
|
||||
ttMsgStream.SetPulsarClient(Params.PulsarAddress)
|
||||
ttMsgStream.CreatePulsarConsumers([]string{Params.TimeTickChannelName}, Params.DataServiceSubscriptionName, util.NewUnmarshalDispatcher(), 1024)
|
||||
s.ttMsgStream = ttMsgStream
|
||||
s.ttMsgStream.Start()
|
||||
timeTickBarrier := timesync.NewHardTimeTickBarrier(s.ttMsgStream, s.cluster.GetNodeIDs())
|
||||
dataNodeTTWatcher := newDataNodeTimeTickWatcher(s.meta, s.segAllocator, s.cluster)
|
||||
k2sStream := pulsarms.NewPulsarMsgStream(s.ctx, 1024)
|
||||
k2sStream.SetPulsarClient(Params.PulsarAddress)
|
||||
k2sStream.CreatePulsarProducers(Params.K2SChannelNames)
|
||||
s.k2sMsgStream = k2sStream
|
||||
s.k2sMsgStream.Start()
|
||||
k2sMsgWatcher := timesync.NewMsgTimeTickWatcher(s.k2sMsgStream)
|
||||
producer, err := timesync.NewTimeSyncMsgProducer(timeTickBarrier, dataNodeTTWatcher, k2sMsgWatcher)
|
||||
producer, err := timesync.NewTimeSyncMsgProducer(timeTickBarrier, dataNodeTTWatcher)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -158,6 +151,46 @@ func (s *Server) initMsgProducer() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Server) startServerLoop() {
|
||||
s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)
|
||||
s.serverLoopWg.Add(1)
|
||||
go s.startStatsChannel(s.serverLoopCtx)
|
||||
}
|
||||
|
||||
func (s *Server) startStatsChannel(ctx context.Context) {
|
||||
defer s.serverLoopWg.Done()
|
||||
statsStream := pulsarms.NewPulsarMsgStream(ctx, 1024)
|
||||
statsStream.SetPulsarClient(Params.PulsarAddress)
|
||||
statsStream.CreatePulsarConsumers([]string{Params.StatisticsChannelName}, Params.DataServiceSubscriptionName, util.NewUnmarshalDispatcher(), 1024)
|
||||
statsStream.Start()
|
||||
defer statsStream.Close()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
msgPack := statsStream.Consume()
|
||||
for _, msg := range msgPack.Msgs {
|
||||
statistics := msg.(*msgstream.SegmentStatisticsMsg)
|
||||
for _, stat := range statistics.SegStats {
|
||||
if err := s.statsHandler.HandleSegmentStat(stat); err != nil {
|
||||
log.Println(err.Error())
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) initSegmentInfoChannel() {
|
||||
segmentInfoStream := pulsarms.NewPulsarMsgStream(s.ctx, 1024)
|
||||
segmentInfoStream.SetPulsarClient(Params.PulsarAddress)
|
||||
segmentInfoStream.CreatePulsarProducers([]string{Params.SegmentInfoChannelName})
|
||||
s.segmentInfoStream = segmentInfoStream
|
||||
s.segmentInfoStream.Start()
|
||||
}
|
||||
|
||||
func (s *Server) loadMetaFromMaster() error {
|
||||
log.Println("loading collection meta from master")
|
||||
collections, err := s.masterClient.ShowCollections(&milvuspb.ShowCollectionRequest{
|
||||
@ -215,83 +248,9 @@ func (s *Server) loadMetaFromMaster() error {
|
||||
log.Println("load collection meta from master complete")
|
||||
return nil
|
||||
}
|
||||
func (s *Server) startServerLoop() {
|
||||
s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)
|
||||
s.serverLoopWg.Add(2)
|
||||
go s.startStatsChannel(s.serverLoopCtx)
|
||||
go s.startSegmentFlushChannel(s.serverLoopCtx)
|
||||
}
|
||||
|
||||
func (s *Server) startStatsChannel(ctx context.Context) {
|
||||
defer s.serverLoopWg.Done()
|
||||
statsStream := pulsarms.NewPulsarMsgStream(ctx, 1024)
|
||||
statsStream.SetPulsarClient(Params.PulsarAddress)
|
||||
statsStream.CreatePulsarConsumers([]string{Params.StatisticsChannelName}, Params.DataServiceSubscriptionName, util.NewUnmarshalDispatcher(), 1024)
|
||||
statsStream.Start()
|
||||
defer statsStream.Close()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
msgPack := statsStream.Consume()
|
||||
for _, msg := range msgPack.Msgs {
|
||||
statistics := msg.(*msgstream.SegmentStatisticsMsg)
|
||||
for _, stat := range statistics.SegStats {
|
||||
if err := s.statsHandler.HandleSegmentStat(stat); err != nil {
|
||||
log.Println(err.Error())
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) startSegmentFlushChannel(ctx context.Context) {
|
||||
defer s.serverLoopWg.Done()
|
||||
flushStream := pulsarms.NewPulsarMsgStream(ctx, 1024)
|
||||
flushStream.SetPulsarClient(Params.PulsarAddress)
|
||||
flushStream.CreatePulsarConsumers([]string{Params.SegmentInfoChannelName}, Params.DataServiceSubscriptionName, util.NewUnmarshalDispatcher(), 1024)
|
||||
flushStream.Start()
|
||||
defer flushStream.Close()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
log.Println("segment flush channel shut down")
|
||||
return
|
||||
default:
|
||||
}
|
||||
msgPack := flushStream.Consume()
|
||||
for _, msg := range msgPack.Msgs {
|
||||
if msg.Type() != commonpb.MsgType_kSegmentFlushDone {
|
||||
continue
|
||||
}
|
||||
realMsg := msg.(*msgstream.FlushCompletedMsg)
|
||||
|
||||
segmentInfo, err := s.meta.GetSegment(realMsg.SegmentID)
|
||||
if err != nil {
|
||||
log.Println(err.Error())
|
||||
continue
|
||||
}
|
||||
segmentInfo.FlushedTime = realMsg.BeginTimestamp
|
||||
if err = s.meta.UpdateSegment(segmentInfo); err != nil {
|
||||
log.Println(err.Error())
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) waitDataNodeRegister() {
|
||||
log.Println("waiting data node to register")
|
||||
<-s.registerFinishCh
|
||||
log.Println("all data nodes register")
|
||||
}
|
||||
|
||||
func (s *Server) Stop() error {
|
||||
s.ttMsgStream.Close()
|
||||
s.k2sMsgStream.Close()
|
||||
s.msgProducer.Close()
|
||||
s.segmentInfoStream.Close()
|
||||
s.stopServerLoop()
|
||||
@ -439,23 +398,6 @@ func (s *Server) openNewSegment(collectionID UniqueID, partitionID UniqueID, cha
|
||||
if err = s.segAllocator.OpenSegment(segmentInfo); err != nil {
|
||||
return err
|
||||
}
|
||||
infoMsg := &msgstream.SegmentInfoMsg{
|
||||
SegmentMsg: datapb.SegmentMsg{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgType: commonpb.MsgType_kSegmentInfo,
|
||||
MsgID: 0,
|
||||
Timestamp: 0, // todo
|
||||
SourceID: 0,
|
||||
},
|
||||
Segment: segmentInfo,
|
||||
},
|
||||
}
|
||||
msgPack := &pulsarms.MsgPack{
|
||||
Msgs: []msgstream.TsMsg{infoMsg},
|
||||
}
|
||||
if err = s.segmentInfoStream.Produce(msgPack); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -480,8 +422,7 @@ func (s *Server) GetSegmentStates(req *datapb.SegmentStatesRequest) (*datapb.Seg
|
||||
resp.CreateTime = segmentInfo.OpenTime
|
||||
resp.SealedTime = segmentInfo.SealedTime
|
||||
resp.FlushedTime = segmentInfo.FlushedTime
|
||||
resp.StartPositions = segmentInfo.StartPosition
|
||||
resp.EndPositions = segmentInfo.EndPosition
|
||||
// TODO start/end positions
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
|
@ -20,25 +20,10 @@ func (handler *statsHandler) HandleSegmentStat(segStats *internalpb2.SegmentStat
|
||||
return err
|
||||
}
|
||||
|
||||
if segStats.IsNewSegment {
|
||||
segMeta.OpenTime = segStats.CreateTime
|
||||
segMeta.StartPosition = append(segMeta.StartPosition, segStats.StartPositions...)
|
||||
}
|
||||
segMeta.SealedTime = segStats.EndTime
|
||||
for _, pos := range segStats.EndPositions {
|
||||
isNew := true
|
||||
for _, epos := range segMeta.EndPosition {
|
||||
if epos.ChannelName == pos.ChannelName {
|
||||
epos.Timestamp = pos.Timestamp
|
||||
epos.MsgID = pos.MsgID
|
||||
isNew = false
|
||||
break
|
||||
}
|
||||
}
|
||||
if isNew {
|
||||
segMeta.EndPosition = append(segMeta.EndPosition, pos)
|
||||
}
|
||||
}
|
||||
//if segStats.IsNewSegment {
|
||||
// segMeta.OpenTime = segStats.CreateTime
|
||||
// segMeta.segStats.StartPositions
|
||||
//}
|
||||
segMeta.NumRows = segStats.NumRows
|
||||
segMeta.MemSize = segStats.MemorySize
|
||||
|
||||
|
@ -5,6 +5,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/errors"
|
||||
cms "github.com/zilliztech/milvus-distributed/internal/masterservice"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
|
||||
@ -56,63 +57,93 @@ func (c *GrpcClient) Stop() error {
|
||||
}
|
||||
|
||||
func (c *GrpcClient) GetComponentStates() (*internalpb2.ComponentStates, error) {
|
||||
return c.grpcClient.GetComponentStatesRPC(context.Background(), &commonpb.Empty{})
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
|
||||
defer cancel()
|
||||
return c.grpcClient.GetComponentStatesRPC(ctx, &commonpb.Empty{})
|
||||
}
|
||||
|
||||
//DDL request
|
||||
func (c *GrpcClient) CreateCollection(in *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) {
|
||||
return c.grpcClient.CreateCollection(context.Background(), in)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
|
||||
defer cancel()
|
||||
return c.grpcClient.CreateCollection(ctx, in)
|
||||
}
|
||||
|
||||
func (c *GrpcClient) DropCollection(in *milvuspb.DropCollectionRequest) (*commonpb.Status, error) {
|
||||
return c.grpcClient.DropCollection(context.Background(), in)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
|
||||
defer cancel()
|
||||
return c.grpcClient.DropCollection(ctx, in)
|
||||
}
|
||||
func (c *GrpcClient) HasCollection(in *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) {
|
||||
return c.grpcClient.HasCollection(context.Background(), in)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
|
||||
defer cancel()
|
||||
return c.grpcClient.HasCollection(ctx, in)
|
||||
}
|
||||
func (c *GrpcClient) DescribeCollection(in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
|
||||
return c.grpcClient.DescribeCollection(context.Background(), in)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
|
||||
defer cancel()
|
||||
return c.grpcClient.DescribeCollection(ctx, in)
|
||||
}
|
||||
|
||||
func (c *GrpcClient) ShowCollections(in *milvuspb.ShowCollectionRequest) (*milvuspb.ShowCollectionResponse, error) {
|
||||
return c.grpcClient.ShowCollections(context.Background(), in)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
|
||||
defer cancel()
|
||||
return c.grpcClient.ShowCollections(ctx, in)
|
||||
}
|
||||
|
||||
func (c *GrpcClient) CreatePartition(in *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) {
|
||||
return c.grpcClient.CreatePartition(context.Background(), in)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
|
||||
defer cancel()
|
||||
return c.grpcClient.CreatePartition(ctx, in)
|
||||
}
|
||||
|
||||
func (c *GrpcClient) DropPartition(in *milvuspb.DropPartitionRequest) (*commonpb.Status, error) {
|
||||
return c.grpcClient.DropPartition(context.Background(), in)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
|
||||
defer cancel()
|
||||
return c.grpcClient.DropPartition(ctx, in)
|
||||
}
|
||||
|
||||
func (c *GrpcClient) HasPartition(in *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) {
|
||||
return c.grpcClient.HasPartition(context.Background(), in)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
|
||||
defer cancel()
|
||||
return c.grpcClient.HasPartition(ctx, in)
|
||||
}
|
||||
|
||||
func (c *GrpcClient) ShowPartitions(in *milvuspb.ShowPartitionRequest) (*milvuspb.ShowPartitionResponse, error) {
|
||||
return c.grpcClient.ShowPartitions(context.Background(), in)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
|
||||
defer cancel()
|
||||
return c.grpcClient.ShowPartitions(ctx, in)
|
||||
}
|
||||
|
||||
//index builder service
|
||||
func (c *GrpcClient) CreateIndex(in *milvuspb.CreateIndexRequest) (*commonpb.Status, error) {
|
||||
return c.grpcClient.CreateIndex(context.Background(), in)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
|
||||
defer cancel()
|
||||
return c.grpcClient.CreateIndex(ctx, in)
|
||||
}
|
||||
func (c *GrpcClient) DescribeIndex(in *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error) {
|
||||
return c.grpcClient.DescribeIndex(context.Background(), in)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
|
||||
defer cancel()
|
||||
return c.grpcClient.DescribeIndex(ctx, in)
|
||||
}
|
||||
|
||||
//global timestamp allocator
|
||||
func (c *GrpcClient) AllocTimestamp(in *masterpb.TsoRequest) (*masterpb.TsoResponse, error) {
|
||||
return c.grpcClient.AllocTimestamp(context.Background(), in)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
|
||||
defer cancel()
|
||||
return c.grpcClient.AllocTimestamp(ctx, in)
|
||||
}
|
||||
func (c *GrpcClient) AllocID(in *masterpb.IDRequest) (*masterpb.IDResponse, error) {
|
||||
return c.grpcClient.AllocID(context.Background(), in)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
|
||||
defer cancel()
|
||||
return c.grpcClient.AllocID(ctx, in)
|
||||
}
|
||||
|
||||
//receiver time tick from proxy service, and put it into this channel
|
||||
func (c *GrpcClient) GetTimeTickChannel() (string, error) {
|
||||
rsp, err := c.grpcClient.GetTimeTickChannelRPC(context.Background(), &commonpb.Empty{})
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
|
||||
defer cancel()
|
||||
rsp, err := c.grpcClient.GetTimeTickChannelRPC(ctx, &commonpb.Empty{})
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
@ -124,7 +155,9 @@ func (c *GrpcClient) GetTimeTickChannel() (string, error) {
|
||||
|
||||
//receive ddl from rpc and time tick from proxy service, and put them into this channel
|
||||
func (c *GrpcClient) GetDdChannel() (string, error) {
|
||||
rsp, err := c.grpcClient.GetDdChannelRPC(context.Background(), &commonpb.Empty{})
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
|
||||
defer cancel()
|
||||
rsp, err := c.grpcClient.GetDdChannelRPC(ctx, &commonpb.Empty{})
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
@ -136,7 +169,9 @@ func (c *GrpcClient) GetDdChannel() (string, error) {
|
||||
|
||||
//just define a channel, not used currently
|
||||
func (c *GrpcClient) GetStatisticsChannel() (string, error) {
|
||||
rsp, err := c.grpcClient.GetStatisticsChannelRPC(context.Background(), &commonpb.Empty{})
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
|
||||
defer cancel()
|
||||
rsp, err := c.grpcClient.GetStatisticsChannelRPC(ctx, &commonpb.Empty{})
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
@ -147,9 +182,13 @@ func (c *GrpcClient) GetStatisticsChannel() (string, error) {
|
||||
}
|
||||
|
||||
func (c *GrpcClient) DescribeSegment(in *milvuspb.DescribeSegmentRequest) (*milvuspb.DescribeSegmentResponse, error) {
|
||||
return c.grpcClient.DescribeSegment(context.Background(), in)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
|
||||
defer cancel()
|
||||
return c.grpcClient.DescribeSegment(ctx, in)
|
||||
}
|
||||
|
||||
func (c *GrpcClient) ShowSegments(in *milvuspb.ShowSegmentRequest) (*milvuspb.ShowSegmentResponse, error) {
|
||||
return c.grpcClient.ShowSegments(context.Background(), in)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
|
||||
defer cancel()
|
||||
return c.grpcClient.ShowSegments(ctx, in)
|
||||
}
|
||||
|
@ -1,6 +1,7 @@
|
||||
package masterservice
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"regexp"
|
||||
@ -26,7 +27,7 @@ func TestGrpcService(t *testing.T) {
|
||||
//cms.Params.Address = "127.0.0.1"
|
||||
cms.Params.Port = (randVal % 100) + 10000
|
||||
|
||||
svr, err := NewGrpcServer()
|
||||
svr, err := NewGrpcServer(context.Background())
|
||||
assert.Nil(t, err)
|
||||
|
||||
// cms.Params.NodeID = 0
|
||||
|
@ -6,6 +6,7 @@ import (
|
||||
"net"
|
||||
"sync"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/errors"
|
||||
cms "github.com/zilliztech/milvus-distributed/internal/masterservice"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
|
||||
@ -26,10 +27,10 @@ type GrpcServer struct {
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
func NewGrpcServer() (*GrpcServer, error) {
|
||||
func NewGrpcServer(ctx context.Context) (*GrpcServer, error) {
|
||||
s := &GrpcServer{}
|
||||
var err error
|
||||
s.ctx, s.cancel = context.WithCancel(context.Background())
|
||||
s.ctx, s.cancel = context.WithCancel(ctx)
|
||||
if s.core, err = cms.NewCore(s.ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -73,6 +74,30 @@ func (s *GrpcServer) Stop() error {
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *GrpcServer) SetProxyService(p cms.ProxyServiceInterface) error {
|
||||
c, ok := s.core.(*cms.Core)
|
||||
if !ok {
|
||||
return errors.Errorf("set proxy service failed")
|
||||
}
|
||||
return c.SetProxyService(p)
|
||||
}
|
||||
|
||||
func (s *GrpcServer) SetDataService(p cms.DataServiceInterface) error {
|
||||
c, ok := s.core.(*cms.Core)
|
||||
if !ok {
|
||||
return errors.Errorf("set data service failed")
|
||||
}
|
||||
return c.SetDataService(p)
|
||||
}
|
||||
|
||||
func (s *GrpcServer) SetIndexService(p cms.IndexServiceInterface) error {
|
||||
c, ok := s.core.(*cms.Core)
|
||||
if !ok {
|
||||
return errors.Errorf("set index service failed")
|
||||
}
|
||||
return c.SetIndexService(p)
|
||||
}
|
||||
|
||||
func (s *GrpcServer) GetComponentStatesRPC(ctx context.Context, empty *commonpb.Empty) (*internalpb2.ComponentStates, error) {
|
||||
return s.core.GetComponentStates()
|
||||
}
|
||||
|
@ -2,6 +2,7 @@ package masterservice
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"math/rand"
|
||||
"strconv"
|
||||
@ -735,6 +736,13 @@ func (c *Core) GetStatisticsChannel() (string, error) {
|
||||
}
|
||||
|
||||
func (c *Core) CreateCollection(in *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) {
|
||||
code := c.stateCode.Load().(internalpb2.StateCode)
|
||||
if code != internalpb2.StateCode_HEALTHY {
|
||||
return &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
|
||||
Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]),
|
||||
}, nil
|
||||
}
|
||||
t := &CreateCollectionReqTask{
|
||||
baseReqTask: baseReqTask{
|
||||
cv: make(chan error),
|
||||
@ -758,6 +766,13 @@ func (c *Core) CreateCollection(in *milvuspb.CreateCollectionRequest) (*commonpb
|
||||
}
|
||||
|
||||
func (c *Core) DropCollection(in *milvuspb.DropCollectionRequest) (*commonpb.Status, error) {
|
||||
code := c.stateCode.Load().(internalpb2.StateCode)
|
||||
if code != internalpb2.StateCode_HEALTHY {
|
||||
return &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
|
||||
Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]),
|
||||
}, nil
|
||||
}
|
||||
t := &DropCollectionReqTask{
|
||||
baseReqTask: baseReqTask{
|
||||
cv: make(chan error),
|
||||
@ -780,6 +795,16 @@ func (c *Core) DropCollection(in *milvuspb.DropCollectionRequest) (*commonpb.Sta
|
||||
}
|
||||
|
||||
func (c *Core) HasCollection(in *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) {
|
||||
code := c.stateCode.Load().(internalpb2.StateCode)
|
||||
if code != internalpb2.StateCode_HEALTHY {
|
||||
return &milvuspb.BoolResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
|
||||
Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]),
|
||||
},
|
||||
Value: false,
|
||||
}, nil
|
||||
}
|
||||
t := &HasCollectionReqTask{
|
||||
baseReqTask: baseReqTask{
|
||||
cv: make(chan error),
|
||||
@ -809,6 +834,17 @@ func (c *Core) HasCollection(in *milvuspb.HasCollectionRequest) (*milvuspb.BoolR
|
||||
}
|
||||
|
||||
func (c *Core) DescribeCollection(in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
|
||||
code := c.stateCode.Load().(internalpb2.StateCode)
|
||||
if code != internalpb2.StateCode_HEALTHY {
|
||||
return &milvuspb.DescribeCollectionResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
|
||||
Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]),
|
||||
},
|
||||
Schema: nil,
|
||||
CollectionID: 0,
|
||||
}, nil
|
||||
}
|
||||
t := &DescribeCollectionReqTask{
|
||||
baseReqTask: baseReqTask{
|
||||
cv: make(chan error),
|
||||
@ -836,6 +872,16 @@ func (c *Core) DescribeCollection(in *milvuspb.DescribeCollectionRequest) (*milv
|
||||
}
|
||||
|
||||
func (c *Core) ShowCollections(in *milvuspb.ShowCollectionRequest) (*milvuspb.ShowCollectionResponse, error) {
|
||||
code := c.stateCode.Load().(internalpb2.StateCode)
|
||||
if code != internalpb2.StateCode_HEALTHY {
|
||||
return &milvuspb.ShowCollectionResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
|
||||
Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]),
|
||||
},
|
||||
CollectionNames: nil,
|
||||
}, nil
|
||||
}
|
||||
t := &ShowCollectionReqTask{
|
||||
baseReqTask: baseReqTask{
|
||||
cv: make(chan error),
|
||||
@ -865,6 +911,13 @@ func (c *Core) ShowCollections(in *milvuspb.ShowCollectionRequest) (*milvuspb.Sh
|
||||
}
|
||||
|
||||
func (c *Core) CreatePartition(in *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) {
|
||||
code := c.stateCode.Load().(internalpb2.StateCode)
|
||||
if code != internalpb2.StateCode_HEALTHY {
|
||||
return &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
|
||||
Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]),
|
||||
}, nil
|
||||
}
|
||||
t := &CreatePartitionReqTask{
|
||||
baseReqTask: baseReqTask{
|
||||
cv: make(chan error),
|
||||
@ -887,6 +940,13 @@ func (c *Core) CreatePartition(in *milvuspb.CreatePartitionRequest) (*commonpb.S
|
||||
}
|
||||
|
||||
func (c *Core) DropPartition(in *milvuspb.DropPartitionRequest) (*commonpb.Status, error) {
|
||||
code := c.stateCode.Load().(internalpb2.StateCode)
|
||||
if code != internalpb2.StateCode_HEALTHY {
|
||||
return &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
|
||||
Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]),
|
||||
}, nil
|
||||
}
|
||||
t := &DropPartitionReqTask{
|
||||
baseReqTask: baseReqTask{
|
||||
cv: make(chan error),
|
||||
@ -909,6 +969,16 @@ func (c *Core) DropPartition(in *milvuspb.DropPartitionRequest) (*commonpb.Statu
|
||||
}
|
||||
|
||||
func (c *Core) HasPartition(in *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) {
|
||||
code := c.stateCode.Load().(internalpb2.StateCode)
|
||||
if code != internalpb2.StateCode_HEALTHY {
|
||||
return &milvuspb.BoolResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
|
||||
Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]),
|
||||
},
|
||||
Value: false,
|
||||
}, nil
|
||||
}
|
||||
t := &HasPartitionReqTask{
|
||||
baseReqTask: baseReqTask{
|
||||
cv: make(chan error),
|
||||
@ -938,6 +1008,17 @@ func (c *Core) HasPartition(in *milvuspb.HasPartitionRequest) (*milvuspb.BoolRes
|
||||
}
|
||||
|
||||
func (c *Core) ShowPartitions(in *milvuspb.ShowPartitionRequest) (*milvuspb.ShowPartitionResponse, error) {
|
||||
code := c.stateCode.Load().(internalpb2.StateCode)
|
||||
if code != internalpb2.StateCode_HEALTHY {
|
||||
return &milvuspb.ShowPartitionResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
|
||||
Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]),
|
||||
},
|
||||
PartitionNames: nil,
|
||||
PartitionIDs: nil,
|
||||
}, nil
|
||||
}
|
||||
t := &ShowPartitionReqTask{
|
||||
baseReqTask: baseReqTask{
|
||||
cv: make(chan error),
|
||||
@ -968,6 +1049,13 @@ func (c *Core) ShowPartitions(in *milvuspb.ShowPartitionRequest) (*milvuspb.Show
|
||||
}
|
||||
|
||||
func (c *Core) CreateIndex(in *milvuspb.CreateIndexRequest) (*commonpb.Status, error) {
|
||||
code := c.stateCode.Load().(internalpb2.StateCode)
|
||||
if code != internalpb2.StateCode_HEALTHY {
|
||||
return &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
|
||||
Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]),
|
||||
}, nil
|
||||
}
|
||||
t := &CreateIndexReqTask{
|
||||
baseReqTask: baseReqTask{
|
||||
cv: make(chan error),
|
||||
@ -990,6 +1078,16 @@ func (c *Core) CreateIndex(in *milvuspb.CreateIndexRequest) (*commonpb.Status, e
|
||||
}
|
||||
|
||||
func (c *Core) DescribeIndex(in *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error) {
|
||||
code := c.stateCode.Load().(internalpb2.StateCode)
|
||||
if code != internalpb2.StateCode_HEALTHY {
|
||||
return &milvuspb.DescribeIndexResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
|
||||
Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]),
|
||||
},
|
||||
IndexDescriptions: nil,
|
||||
}, nil
|
||||
}
|
||||
t := &DescribeIndexReqTask{
|
||||
baseReqTask: baseReqTask{
|
||||
cv: make(chan error),
|
||||
@ -1020,6 +1118,16 @@ func (c *Core) DescribeIndex(in *milvuspb.DescribeIndexRequest) (*milvuspb.Descr
|
||||
}
|
||||
|
||||
func (c *Core) DescribeSegment(in *milvuspb.DescribeSegmentRequest) (*milvuspb.DescribeSegmentResponse, error) {
|
||||
code := c.stateCode.Load().(internalpb2.StateCode)
|
||||
if code != internalpb2.StateCode_HEALTHY {
|
||||
return &milvuspb.DescribeSegmentResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
|
||||
Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]),
|
||||
},
|
||||
IndexID: 0,
|
||||
}, nil
|
||||
}
|
||||
t := &DescribeSegmentReqTask{
|
||||
baseReqTask: baseReqTask{
|
||||
cv: make(chan error),
|
||||
@ -1050,6 +1158,16 @@ func (c *Core) DescribeSegment(in *milvuspb.DescribeSegmentRequest) (*milvuspb.D
|
||||
}
|
||||
|
||||
func (c *Core) ShowSegments(in *milvuspb.ShowSegmentRequest) (*milvuspb.ShowSegmentResponse, error) {
|
||||
code := c.stateCode.Load().(internalpb2.StateCode)
|
||||
if code != internalpb2.StateCode_HEALTHY {
|
||||
return &milvuspb.ShowSegmentResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
|
||||
Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]),
|
||||
},
|
||||
SegmentIDs: nil,
|
||||
}, nil
|
||||
}
|
||||
t := &ShowSegmentReqTask{
|
||||
baseReqTask: baseReqTask{
|
||||
cv: make(chan error),
|
||||
|
@ -27,6 +27,8 @@ type ParamTable struct {
|
||||
MaxPartitionNum int64
|
||||
DefaultPartitionName string
|
||||
DefaultIndexName string
|
||||
|
||||
Timeout int
|
||||
}
|
||||
|
||||
func (p *ParamTable) Init() {
|
||||
@ -54,6 +56,8 @@ func (p *ParamTable) Init() {
|
||||
p.initMaxPartitionNum()
|
||||
p.initDefaultPartitionName()
|
||||
p.initDefaultIndexName()
|
||||
|
||||
p.initTimeout()
|
||||
}
|
||||
|
||||
func (p *ParamTable) initAddress() {
|
||||
@ -163,3 +167,7 @@ func (p *ParamTable) initDefaultIndexName() {
|
||||
}
|
||||
p.DefaultIndexName = name
|
||||
}
|
||||
|
||||
func (p *ParamTable) initTimeout() {
|
||||
p.Timeout = p.ParseInt("master.timeout")
|
||||
}
|
||||
|
@ -50,4 +50,7 @@ func TestParamTable(t *testing.T) {
|
||||
|
||||
assert.NotEqual(t, Params.DefaultIndexName, "")
|
||||
t.Logf("default index name = %s", Params.DefaultIndexName)
|
||||
|
||||
assert.NotZero(t, Params.Timeout)
|
||||
t.Logf("master timeout = %d", Params.Timeout)
|
||||
}
|
||||
|
@ -54,6 +54,9 @@ type collectionReplica interface {
|
||||
getPartitionByTag(collectionID UniqueID, partitionTag string) (*Partition, error)
|
||||
getPartitionByID(collectionID UniqueID, partitionID UniqueID) (*Partition, error)
|
||||
hasPartition(collectionID UniqueID, partitionTag string) bool
|
||||
enablePartitionDM(collectionID UniqueID, partitionID UniqueID) error
|
||||
disablePartitionDM(collectionID UniqueID, partitionID UniqueID) error
|
||||
getEnablePartitionDM(collectionID UniqueID, partitionID UniqueID) (bool, error)
|
||||
|
||||
// segment
|
||||
getSegmentNum() int
|
||||
@ -362,6 +365,43 @@ func (colReplica *collectionReplicaImpl) hasPartition(collectionID UniqueID, par
|
||||
return false
|
||||
}
|
||||
|
||||
func (colReplica *collectionReplicaImpl) enablePartitionDM(collectionID UniqueID, partitionID UniqueID) error {
|
||||
colReplica.mu.Lock()
|
||||
defer colReplica.mu.Unlock()
|
||||
|
||||
partition, err := colReplica.getPartitionByIDPrivate(collectionID, partitionID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
partition.enableDM = true
|
||||
return nil
|
||||
}
|
||||
|
||||
func (colReplica *collectionReplicaImpl) disablePartitionDM(collectionID UniqueID, partitionID UniqueID) error {
|
||||
colReplica.mu.Lock()
|
||||
defer colReplica.mu.Unlock()
|
||||
|
||||
partition, err := colReplica.getPartitionByIDPrivate(collectionID, partitionID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
partition.enableDM = false
|
||||
return nil
|
||||
}
|
||||
|
||||
func (colReplica *collectionReplicaImpl) getEnablePartitionDM(collectionID UniqueID, partitionID UniqueID) (bool, error) {
|
||||
colReplica.mu.Lock()
|
||||
defer colReplica.mu.Unlock()
|
||||
|
||||
partition, err := colReplica.getPartitionByIDPrivate(collectionID, partitionID)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return partition.enableDM, nil
|
||||
}
|
||||
|
||||
//----------------------------------------------------------------------------------------------------- segment
|
||||
func (colReplica *collectionReplicaImpl) getSegmentNum() int {
|
||||
colReplica.mu.RLock()
|
||||
|
@ -19,17 +19,18 @@ type dataSyncService struct {
|
||||
}
|
||||
|
||||
func newDataSyncService(ctx context.Context, replica collectionReplica) *dataSyncService {
|
||||
|
||||
return &dataSyncService{
|
||||
service := &dataSyncService{
|
||||
ctx: ctx,
|
||||
fg: nil,
|
||||
|
||||
replica: replica,
|
||||
}
|
||||
|
||||
service.initNodes()
|
||||
return service
|
||||
}
|
||||
|
||||
func (dsService *dataSyncService) start() {
|
||||
dsService.initNodes()
|
||||
dsService.fg.Start()
|
||||
}
|
||||
|
||||
@ -47,7 +48,7 @@ func (dsService *dataSyncService) initNodes() {
|
||||
var dmStreamNode node = dsService.newDmInputNode(dsService.ctx)
|
||||
var ddStreamNode node = dsService.newDDInputNode(dsService.ctx)
|
||||
|
||||
var filterDmNode node = newFilteredDmNode()
|
||||
var filterDmNode node = newFilteredDmNode(dsService.replica)
|
||||
var ddNode node = newDDNode(dsService.replica)
|
||||
|
||||
var insertNode node = newInsertNode(dsService.replica)
|
||||
|
@ -12,7 +12,8 @@ import (
|
||||
|
||||
type filterDmNode struct {
|
||||
baseNode
|
||||
ddMsg *ddMsg
|
||||
ddMsg *ddMsg
|
||||
replica collectionReplica
|
||||
}
|
||||
|
||||
func (fdmNode *filterDmNode) Name() string {
|
||||
@ -102,6 +103,12 @@ func (fdmNode *filterDmNode) Operate(in []*Msg) []*Msg {
|
||||
}
|
||||
|
||||
func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg) *msgstream.InsertMsg {
|
||||
// TODO: open this check
|
||||
// check if partition dm enable
|
||||
//if enable, _ := fdmNode.replica.getEnablePartitionDM(msg.CollectionID, msg.PartitionID); !enable {
|
||||
// return nil
|
||||
//}
|
||||
|
||||
// No dd record, do all insert requests.
|
||||
records, ok := fdmNode.ddMsg.collectionRecords[msg.CollectionName]
|
||||
if !ok {
|
||||
@ -154,7 +161,7 @@ func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg
|
||||
return msg
|
||||
}
|
||||
|
||||
func newFilteredDmNode() *filterDmNode {
|
||||
func newFilteredDmNode(replica collectionReplica) *filterDmNode {
|
||||
maxQueueLength := Params.FlowGraphMaxQueueLength
|
||||
maxParallelism := Params.FlowGraphMaxParallelism
|
||||
|
||||
@ -164,5 +171,6 @@ func newFilteredDmNode() *filterDmNode {
|
||||
|
||||
return &filterDmNode{
|
||||
baseNode: baseNode,
|
||||
replica: replica,
|
||||
}
|
||||
}
|
||||
|
@ -12,9 +12,11 @@ import (
|
||||
type ParamTable struct {
|
||||
paramtable.BaseTable
|
||||
|
||||
PulsarAddress string
|
||||
ETCDAddress string
|
||||
MetaRootPath string
|
||||
PulsarAddress string
|
||||
ETCDAddress string
|
||||
MetaRootPath string
|
||||
WriteNodeSegKvSubPath string
|
||||
IndexBuilderAddress string
|
||||
|
||||
QueryNodeIP string
|
||||
QueryNodePort int64
|
||||
@ -131,6 +133,8 @@ func (p *ParamTable) Init() {
|
||||
p.initPulsarAddress()
|
||||
p.initETCDAddress()
|
||||
p.initMetaRootPath()
|
||||
p.initWriteNodeSegKvSubPath()
|
||||
p.initIndexBuilderAddress()
|
||||
|
||||
p.initGracefulTime()
|
||||
p.initMsgChannelSubName()
|
||||
@ -246,6 +250,14 @@ func (p *ParamTable) initPulsarAddress() {
|
||||
p.PulsarAddress = url
|
||||
}
|
||||
|
||||
func (p *ParamTable) initIndexBuilderAddress() {
|
||||
ret, err := p.Load("_IndexBuilderAddress")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
p.IndexBuilderAddress = ret
|
||||
}
|
||||
|
||||
func (p *ParamTable) initInsertChannelRange() {
|
||||
insertChannelRange, err := p.Load("msgChannel.channelRange.insert")
|
||||
if err != nil {
|
||||
@ -338,6 +350,14 @@ func (p *ParamTable) initMetaRootPath() {
|
||||
p.MetaRootPath = rootPath + "/" + subPath
|
||||
}
|
||||
|
||||
func (p *ParamTable) initWriteNodeSegKvSubPath() {
|
||||
subPath, err := p.Load("etcd.writeNodeSegKvSubPath")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
p.WriteNodeSegKvSubPath = subPath + "/"
|
||||
}
|
||||
|
||||
func (p *ParamTable) initGracefulTime() {
|
||||
p.GracefulTime = p.ParseInt64("queryNode.gracefulTime")
|
||||
}
|
||||
|
@ -16,6 +16,7 @@ type Partition struct {
|
||||
partitionTag string
|
||||
id UniqueID
|
||||
segments []*Segment
|
||||
enableDM bool
|
||||
}
|
||||
|
||||
func (p *Partition) ID() UniqueID {
|
||||
@ -33,6 +34,7 @@ func (p *Partition) Segments() *[]*Segment {
|
||||
func newPartition2(partitionTag string) *Partition {
|
||||
var newPartition = &Partition{
|
||||
partitionTag: partitionTag,
|
||||
enableDM: false,
|
||||
}
|
||||
|
||||
return newPartition
|
||||
@ -40,7 +42,8 @@ func newPartition2(partitionTag string) *Partition {
|
||||
|
||||
func newPartition(partitionID UniqueID) *Partition {
|
||||
var newPartition = &Partition{
|
||||
id: partitionID,
|
||||
id: partitionID,
|
||||
enableDM: false,
|
||||
}
|
||||
|
||||
return newPartition
|
||||
|
@ -136,7 +136,7 @@ func (node *QueryNode) Start() error {
|
||||
node.metaService = newMetaService(node.queryNodeLoopCtx, node.replica)
|
||||
node.loadIndexService = newLoadIndexService(node.queryNodeLoopCtx, node.replica)
|
||||
node.statsService = newStatsService(node.queryNodeLoopCtx, node.replica, node.loadIndexService.fieldStatsChan)
|
||||
node.segManager = newSegmentManager(node.queryNodeLoopCtx, node.replica, node.loadIndexService.loadIndexReqChan)
|
||||
node.segManager = newSegmentManager(node.queryNodeLoopCtx, node.replica, node.dataSyncService.dmStream, node.loadIndexService.loadIndexReqChan)
|
||||
|
||||
// start services
|
||||
go node.dataSyncService.start()
|
||||
@ -344,14 +344,31 @@ func (node *QueryNode) LoadSegments(in *queryPb.LoadSegmentRequest) (*commonpb.S
|
||||
segmentIDs := in.SegmentIDs
|
||||
fieldIDs := in.FieldIDs
|
||||
|
||||
err := node.replica.enablePartitionDM(collectionID, partitionID)
|
||||
if err != nil {
|
||||
status := &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
|
||||
Reason: err.Error(),
|
||||
}
|
||||
return status, err
|
||||
}
|
||||
|
||||
// segments are ordered before LoadSegments calling
|
||||
if in.LastSegmentState.State == datapb.SegmentState_SegmentGrowing {
|
||||
segmentNum := len(segmentIDs)
|
||||
node.segManager.seekSegment(segmentIDs[segmentNum-1])
|
||||
positions := in.LastSegmentState.StartPositions
|
||||
err = node.segManager.seekSegment(positions)
|
||||
if err != nil {
|
||||
status := &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
|
||||
Reason: err.Error(),
|
||||
}
|
||||
return status, err
|
||||
}
|
||||
segmentIDs = segmentIDs[:segmentNum-1]
|
||||
}
|
||||
|
||||
err := node.segManager.loadSegment(collectionID, partitionID, segmentIDs, fieldIDs)
|
||||
err = node.segManager.loadSegment(collectionID, partitionID, segmentIDs, fieldIDs)
|
||||
if err != nil {
|
||||
status := &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
|
||||
@ -363,6 +380,17 @@ func (node *QueryNode) LoadSegments(in *queryPb.LoadSegmentRequest) (*commonpb.S
|
||||
}
|
||||
|
||||
func (node *QueryNode) ReleaseSegments(in *queryPb.ReleaseSegmentRequest) (*commonpb.Status, error) {
|
||||
for _, id := range in.PartitionIDs {
|
||||
err := node.replica.enablePartitionDM(in.CollectionID, id)
|
||||
if err != nil {
|
||||
status := &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
|
||||
Reason: err.Error(),
|
||||
}
|
||||
return status, err
|
||||
}
|
||||
}
|
||||
|
||||
// release all fields in the segments
|
||||
for _, id := range in.SegmentIDs {
|
||||
err := node.segManager.releaseSegment(id)
|
||||
|
@ -5,6 +5,7 @@ import (
|
||||
"errors"
|
||||
"strconv"
|
||||
|
||||
indexnodeclient "github.com/zilliztech/milvus-distributed/internal/indexnode/client"
|
||||
"github.com/zilliztech/milvus-distributed/internal/kv"
|
||||
miniokv "github.com/zilliztech/milvus-distributed/internal/kv/minio"
|
||||
"github.com/zilliztech/milvus-distributed/internal/msgstream"
|
||||
@ -13,52 +14,31 @@ import (
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/indexpb"
|
||||
internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
|
||||
"github.com/zilliztech/milvus-distributed/internal/storage"
|
||||
writerclient "github.com/zilliztech/milvus-distributed/internal/writenode/client"
|
||||
)
|
||||
|
||||
type segmentManager struct {
|
||||
replica collectionReplica
|
||||
|
||||
dmStream msgstream.MsgStream
|
||||
loadIndexReqChan chan []msgstream.TsMsg
|
||||
|
||||
// TODO: replace by client instead of grpc client
|
||||
dataClient datapb.DataServiceClient
|
||||
indexBuilderClient indexpb.IndexServiceClient
|
||||
dataClient *writerclient.Client
|
||||
indexClient *indexnodeclient.Client
|
||||
|
||||
kv kv.Base // minio kv
|
||||
iCodec *storage.InsertCodec
|
||||
}
|
||||
|
||||
func newSegmentManager(ctx context.Context, replica collectionReplica, loadIndexReqChan chan []msgstream.TsMsg) *segmentManager {
|
||||
bucketName := Params.MinioBucketName
|
||||
option := &miniokv.Option{
|
||||
Address: Params.MinioEndPoint,
|
||||
AccessKeyID: Params.MinioAccessKeyID,
|
||||
SecretAccessKeyID: Params.MinioSecretAccessKey,
|
||||
UseSSL: Params.MinioUseSSLStr,
|
||||
BucketName: bucketName,
|
||||
CreateBucket: true,
|
||||
}
|
||||
|
||||
minioKV, err := miniokv.NewMinIOKV(ctx, option)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
return &segmentManager{
|
||||
replica: replica,
|
||||
loadIndexReqChan: loadIndexReqChan,
|
||||
|
||||
// TODO: init clients
|
||||
dataClient: nil,
|
||||
indexBuilderClient: nil,
|
||||
|
||||
kv: minioKV,
|
||||
iCodec: &storage.InsertCodec{},
|
||||
}
|
||||
}
|
||||
|
||||
func (s *segmentManager) seekSegment(segmentID UniqueID) {
|
||||
// TODO: impl
|
||||
func (s *segmentManager) seekSegment(positions []*internalPb.MsgPosition) error {
|
||||
// TODO: open seek
|
||||
//for _, position := range positions {
|
||||
// err := s.dmStream.Seek(position)
|
||||
// if err != nil {
|
||||
// return err
|
||||
// }
|
||||
//}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *segmentManager) loadSegment(collectionID UniqueID, partitionID UniqueID, segmentIDs []UniqueID, fieldIDs []int64) error {
|
||||
@ -81,7 +61,11 @@ func (s *segmentManager) loadSegment(collectionID UniqueID, partitionID UniqueID
|
||||
}
|
||||
|
||||
targetFields := s.filterOutNeedlessFields(paths, srcFieldIDs, fieldIDs)
|
||||
// create segment
|
||||
// replace segment
|
||||
err = s.replica.removeSegment(segmentID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = s.replica.addSegment(segmentID, partitionID, collectionID, segTypeSealed)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -118,16 +102,25 @@ func (s *segmentManager) getInsertBinlogPaths(segmentID UniqueID) ([]*internalPb
|
||||
SegmentID: segmentID,
|
||||
}
|
||||
|
||||
pathResponse, err := s.dataClient.GetInsertBinlogPaths(context.TODO(), insertBinlogPathRequest)
|
||||
pathResponse, err := s.dataClient.GetInsertBinlogPaths(insertBinlogPathRequest.SegmentID)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
if len(pathResponse.FieldIDs) != len(pathResponse.Paths) {
|
||||
return nil, nil, errors.New("illegal InsertBinlogPathsResponse")
|
||||
//if len(pathResponse.FieldIDs) != len(pathResponse.Paths) {
|
||||
// return nil, nil, errors.New("illegal InsertBinlogPathsResponse")
|
||||
//}
|
||||
|
||||
fieldIDs := make([]int64, 0)
|
||||
paths := make([]*internalPb.StringList, 0)
|
||||
for k, v := range pathResponse {
|
||||
fieldIDs = append(fieldIDs, k)
|
||||
paths = append(paths, &internalPb.StringList{
|
||||
Values: v,
|
||||
})
|
||||
}
|
||||
|
||||
return pathResponse.Paths, pathResponse.FieldIDs, nil
|
||||
return paths, fieldIDs, nil
|
||||
}
|
||||
|
||||
func (s *segmentManager) filterOutNeedlessFields(paths []*internalPb.StringList, srcFieldIDS []int64, dstFields []int64) map[int64]*internalPb.StringList {
|
||||
@ -234,12 +227,15 @@ func (s *segmentManager) getIndexPaths(indexID UniqueID) ([]string, error) {
|
||||
indexFilePathRequest := &indexpb.IndexFilePathsRequest{
|
||||
IndexIDs: []UniqueID{indexID},
|
||||
}
|
||||
pathResponse, err := s.indexBuilderClient.GetIndexFilePaths(context.TODO(), indexFilePathRequest)
|
||||
if err != nil || pathResponse.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
|
||||
pathResponse, err := s.indexClient.GetIndexFilePaths(indexFilePathRequest.IndexIDs)
|
||||
//if err != nil || pathResponse.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
|
||||
// return nil, err
|
||||
//}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return pathResponse.FilePaths[0].IndexFilePaths, nil
|
||||
return pathResponse[0], nil
|
||||
}
|
||||
|
||||
func (s *segmentManager) getIndexParam() (indexParam, error) {
|
||||
@ -293,3 +289,42 @@ func (s *segmentManager) sendLoadIndex(indexPaths []string,
|
||||
messages := []msgstream.TsMsg{loadIndexMsg}
|
||||
s.loadIndexReqChan <- messages
|
||||
}
|
||||
|
||||
func newSegmentManager(ctx context.Context, replica collectionReplica, dmStream msgstream.MsgStream, loadIndexReqChan chan []msgstream.TsMsg) *segmentManager {
|
||||
bucketName := Params.MinioBucketName
|
||||
option := &miniokv.Option{
|
||||
Address: Params.MinioEndPoint,
|
||||
AccessKeyID: Params.MinioAccessKeyID,
|
||||
SecretAccessKeyID: Params.MinioSecretAccessKey,
|
||||
UseSSL: Params.MinioUseSSLStr,
|
||||
BucketName: bucketName,
|
||||
CreateBucket: true,
|
||||
}
|
||||
|
||||
minioKV, err := miniokv.NewMinIOKV(ctx, option)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
dataClient, err := writerclient.NewWriterClient(Params.ETCDAddress, Params.MetaRootPath, Params.WriteNodeSegKvSubPath, nil)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
indexClient, err := indexnodeclient.NewBuildIndexClient(ctx, Params.IndexBuilderAddress)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
return &segmentManager{
|
||||
replica: replica,
|
||||
dmStream: dmStream,
|
||||
loadIndexReqChan: loadIndexReqChan,
|
||||
|
||||
dataClient: dataClient,
|
||||
indexClient: indexClient,
|
||||
|
||||
kv: minioKV,
|
||||
iCodec: &storage.InsertCodec{},
|
||||
}
|
||||
}
|
||||
|
@ -16,6 +16,9 @@ import (
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/indexnode"
|
||||
minioKV "github.com/zilliztech/milvus-distributed/internal/kv/minio"
|
||||
"github.com/zilliztech/milvus-distributed/internal/msgstream"
|
||||
"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
|
||||
"github.com/zilliztech/milvus-distributed/internal/msgstream/util"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
|
||||
internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
|
||||
@ -23,7 +26,7 @@ import (
|
||||
"github.com/zilliztech/milvus-distributed/internal/storage"
|
||||
)
|
||||
|
||||
func generateInsertBinLog(collectionID UniqueID, partitionID UniqueID, segmentID UniqueID) ([]*internalPb.StringList, []int64, error) {
|
||||
func generateInsertBinLog(collectionID UniqueID, partitionID UniqueID, segmentID UniqueID, keyPrefix string) ([]*internalPb.StringList, []int64, error) {
|
||||
const (
|
||||
msgLength = 1000
|
||||
DIM = 16
|
||||
@ -108,10 +111,8 @@ func generateInsertBinLog(collectionID UniqueID, partitionID UniqueID, segmentID
|
||||
}
|
||||
|
||||
// binLogs -> minIO/S3
|
||||
collIDStr := strconv.FormatInt(collectionID, 10)
|
||||
partitionIDStr := strconv.FormatInt(partitionID, 10)
|
||||
segIDStr := strconv.FormatInt(segmentID, 10)
|
||||
keyPrefix := path.Join("query-node-seg-manager-test-minio-prefix", collIDStr, partitionIDStr, segIDStr)
|
||||
keyPrefix = path.Join(keyPrefix, segIDStr)
|
||||
|
||||
paths := make([]*internalPb.StringList, 0)
|
||||
fieldIDs := make([]int64, 0)
|
||||
@ -214,18 +215,197 @@ func generateIndex(segmentID UniqueID) ([]string, indexParam, error) {
|
||||
return indexPaths, indexParams, nil
|
||||
}
|
||||
|
||||
func doInsert(ctx context.Context, collectionName string, partitionTag string, segmentID UniqueID) error {
|
||||
const msgLength = 1000
|
||||
const DIM = 16
|
||||
|
||||
var vec = [DIM]float32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}
|
||||
var rawData []byte
|
||||
for _, ele := range vec {
|
||||
buf := make([]byte, 4)
|
||||
binary.LittleEndian.PutUint32(buf, math.Float32bits(ele))
|
||||
rawData = append(rawData, buf...)
|
||||
}
|
||||
bs := make([]byte, 4)
|
||||
binary.LittleEndian.PutUint32(bs, 1)
|
||||
rawData = append(rawData, bs...)
|
||||
|
||||
timeRange := TimeRange{
|
||||
timestampMin: 0,
|
||||
timestampMax: math.MaxUint64,
|
||||
}
|
||||
|
||||
// messages generate
|
||||
insertMessages := make([]msgstream.TsMsg, 0)
|
||||
for i := 0; i < msgLength; i++ {
|
||||
var msg msgstream.TsMsg = &msgstream.InsertMsg{
|
||||
BaseMsg: msgstream.BaseMsg{
|
||||
HashValues: []uint32{
|
||||
uint32(i),
|
||||
},
|
||||
},
|
||||
InsertRequest: internalPb.InsertRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgType: commonpb.MsgType_kInsert,
|
||||
MsgID: 0,
|
||||
Timestamp: uint64(i + 1000),
|
||||
SourceID: 0,
|
||||
},
|
||||
CollectionName: collectionName,
|
||||
PartitionName: partitionTag,
|
||||
SegmentID: segmentID,
|
||||
ChannelID: "0",
|
||||
Timestamps: []uint64{uint64(i + 1000)},
|
||||
RowIDs: []int64{int64(i)},
|
||||
RowData: []*commonpb.Blob{
|
||||
{Value: rawData},
|
||||
},
|
||||
},
|
||||
}
|
||||
insertMessages = append(insertMessages, msg)
|
||||
}
|
||||
|
||||
msgPack := msgstream.MsgPack{
|
||||
BeginTs: timeRange.timestampMin,
|
||||
EndTs: timeRange.timestampMax,
|
||||
Msgs: insertMessages,
|
||||
}
|
||||
|
||||
// generate timeTick
|
||||
timeTickMsgPack := msgstream.MsgPack{}
|
||||
baseMsg := msgstream.BaseMsg{
|
||||
BeginTimestamp: 1000,
|
||||
EndTimestamp: 1500,
|
||||
HashValues: []uint32{0},
|
||||
}
|
||||
timeTickResult := internalPb.TimeTickMsg{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgType: commonpb.MsgType_kTimeTick,
|
||||
MsgID: 0,
|
||||
Timestamp: 1000,
|
||||
SourceID: 0,
|
||||
},
|
||||
}
|
||||
timeTickMsg := &msgstream.TimeTickMsg{
|
||||
BaseMsg: baseMsg,
|
||||
TimeTickMsg: timeTickResult,
|
||||
}
|
||||
timeTickMsgPack.Msgs = append(timeTickMsgPack.Msgs, timeTickMsg)
|
||||
|
||||
// pulsar produce
|
||||
const receiveBufSize = 1024
|
||||
insertChannels := Params.InsertChannelNames
|
||||
ddChannels := Params.DDChannelNames
|
||||
pulsarURL := Params.PulsarAddress
|
||||
|
||||
insertStream := pulsarms.NewPulsarMsgStream(ctx, receiveBufSize)
|
||||
insertStream.SetPulsarClient(pulsarURL)
|
||||
insertStream.CreatePulsarProducers(insertChannels)
|
||||
unmarshalDispatcher := util.NewUnmarshalDispatcher()
|
||||
insertStream.CreatePulsarConsumers(insertChannels, Params.MsgChannelSubName, unmarshalDispatcher, receiveBufSize)
|
||||
|
||||
ddStream := pulsarms.NewPulsarMsgStream(ctx, receiveBufSize)
|
||||
ddStream.SetPulsarClient(pulsarURL)
|
||||
ddStream.CreatePulsarProducers(ddChannels)
|
||||
|
||||
var insertMsgStream msgstream.MsgStream = insertStream
|
||||
insertMsgStream.Start()
|
||||
|
||||
var ddMsgStream msgstream.MsgStream = ddStream
|
||||
ddMsgStream.Start()
|
||||
|
||||
err := insertMsgStream.Produce(&msgPack)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = insertMsgStream.Broadcast(&timeTickMsgPack)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = ddMsgStream.Broadcast(&timeTickMsgPack)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
//messages := insertStream.Consume()
|
||||
//for _, msg := range messages.Msgs {
|
||||
//
|
||||
//}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func sentTimeTick(ctx context.Context) error {
|
||||
timeTickMsgPack := msgstream.MsgPack{}
|
||||
baseMsg := msgstream.BaseMsg{
|
||||
BeginTimestamp: 1500,
|
||||
EndTimestamp: 2000,
|
||||
HashValues: []uint32{0},
|
||||
}
|
||||
timeTickResult := internalPb.TimeTickMsg{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgType: commonpb.MsgType_kTimeTick,
|
||||
MsgID: 0,
|
||||
Timestamp: math.MaxUint64,
|
||||
SourceID: 0,
|
||||
},
|
||||
}
|
||||
timeTickMsg := &msgstream.TimeTickMsg{
|
||||
BaseMsg: baseMsg,
|
||||
TimeTickMsg: timeTickResult,
|
||||
}
|
||||
timeTickMsgPack.Msgs = append(timeTickMsgPack.Msgs, timeTickMsg)
|
||||
|
||||
// pulsar produce
|
||||
const receiveBufSize = 1024
|
||||
insertChannels := Params.InsertChannelNames
|
||||
ddChannels := Params.DDChannelNames
|
||||
pulsarURL := Params.PulsarAddress
|
||||
|
||||
insertStream := pulsarms.NewPulsarMsgStream(ctx, receiveBufSize)
|
||||
insertStream.SetPulsarClient(pulsarURL)
|
||||
insertStream.CreatePulsarProducers(insertChannels)
|
||||
unmarshalDispatcher := util.NewUnmarshalDispatcher()
|
||||
insertStream.CreatePulsarConsumers(insertChannels, Params.MsgChannelSubName, unmarshalDispatcher, receiveBufSize)
|
||||
|
||||
ddStream := pulsarms.NewPulsarMsgStream(ctx, receiveBufSize)
|
||||
ddStream.SetPulsarClient(pulsarURL)
|
||||
ddStream.CreatePulsarProducers(ddChannels)
|
||||
|
||||
var insertMsgStream msgstream.MsgStream = insertStream
|
||||
insertMsgStream.Start()
|
||||
|
||||
var ddMsgStream msgstream.MsgStream = ddStream
|
||||
ddMsgStream.Start()
|
||||
|
||||
err := insertMsgStream.Broadcast(&timeTickMsgPack)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = ddMsgStream.Broadcast(&timeTickMsgPack)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestSegmentManager_load_release_and_search(t *testing.T) {
|
||||
collectionID := UniqueID(0)
|
||||
partitionID := UniqueID(1)
|
||||
segmentID := UniqueID(2)
|
||||
fieldIDs := []int64{0, 101}
|
||||
|
||||
// mock write insert bin log
|
||||
keyPrefix := path.Join("query-node-seg-manager-test-minio-prefix", strconv.FormatInt(collectionID, 10), strconv.FormatInt(partitionID, 10))
|
||||
Params.WriteNodeSegKvSubPath = keyPrefix
|
||||
|
||||
node := newQueryNodeMock()
|
||||
defer node.Stop()
|
||||
|
||||
ctx := node.queryNodeLoopCtx
|
||||
node.loadIndexService = newLoadIndexService(ctx, node.replica)
|
||||
node.segManager = newSegmentManager(ctx, node.replica, node.loadIndexService.loadIndexReqChan)
|
||||
node.segManager = newSegmentManager(ctx, node.replica, nil, node.loadIndexService.loadIndexReqChan)
|
||||
go node.loadIndexService.start()
|
||||
|
||||
collectionName := "collection0"
|
||||
@ -237,7 +417,7 @@ func TestSegmentManager_load_release_and_search(t *testing.T) {
|
||||
err = node.replica.addSegment(segmentID, partitionID, collectionID, segTypeSealed)
|
||||
assert.NoError(t, err)
|
||||
|
||||
paths, srcFieldIDs, err := generateInsertBinLog(collectionID, partitionID, segmentID)
|
||||
paths, srcFieldIDs, err := generateInsertBinLog(collectionID, partitionID, segmentID, keyPrefix)
|
||||
assert.NoError(t, err)
|
||||
|
||||
fieldsMap := node.segManager.filterOutNeedlessFields(paths, srcFieldIDs, fieldIDs)
|
||||
@ -299,3 +479,111 @@ func TestSegmentManager_load_release_and_search(t *testing.T) {
|
||||
|
||||
<-ctx.Done()
|
||||
}
|
||||
|
||||
//// NOTE: start pulsar before test
|
||||
//func TestSegmentManager_with_seek(t *testing.T) {
|
||||
// collectionID := UniqueID(0)
|
||||
// partitionID := UniqueID(1)
|
||||
// //segmentID := UniqueID(2)
|
||||
// fieldIDs := []int64{0, 101}
|
||||
//
|
||||
// //// mock write insert bin log
|
||||
// //keyPrefix := path.Join("query-node-seg-manager-test-minio-prefix", strconv.FormatInt(collectionID, 10), strconv.FormatInt(partitionID, 10))
|
||||
// //Params.WriteNodeSegKvSubPath = keyPrefix + "/"
|
||||
// node := newQueryNodeMock()
|
||||
//
|
||||
// ctx := node.queryNodeLoopCtx
|
||||
// go node.Start()
|
||||
//
|
||||
// collectionName := "collection0"
|
||||
// initTestMeta(t, node, collectionName, collectionID, 0)
|
||||
//
|
||||
// err := node.replica.addPartition(collectionID, partitionID)
|
||||
// assert.NoError(t, err)
|
||||
//
|
||||
// //err = node.replica.addSegment(segmentID, partitionID, collectionID, segTypeSealed)
|
||||
// //assert.NoError(t, err)
|
||||
//
|
||||
// //paths, srcFieldIDs, err := generateInsertBinLog(collectionID, partitionID, segmentID, keyPrefix)
|
||||
// //assert.NoError(t, err)
|
||||
//
|
||||
// //fieldsMap := node.segManager.filterOutNeedlessFields(paths, srcFieldIDs, fieldIDs)
|
||||
// //assert.Equal(t, len(fieldsMap), 2)
|
||||
//
|
||||
// segmentIDToInsert := UniqueID(3)
|
||||
// err = doInsert(ctx, collectionName, "default", segmentIDToInsert)
|
||||
// assert.NoError(t, err)
|
||||
//
|
||||
// startPositions := make([]*internalPb.MsgPosition, 0)
|
||||
// for _, ch := range Params.InsertChannelNames {
|
||||
// startPositions = append(startPositions, &internalPb.MsgPosition{
|
||||
// ChannelName: ch,
|
||||
// })
|
||||
// }
|
||||
// var positions []*internalPb.MsgPosition
|
||||
// lastSegStates := &datapb.SegmentStatesResponse{
|
||||
// State: datapb.SegmentState_SegmentGrowing,
|
||||
// StartPositions: positions,
|
||||
// }
|
||||
// loadReq := &querypb.LoadSegmentRequest{
|
||||
// CollectionID: collectionID,
|
||||
// PartitionID: partitionID,
|
||||
// SegmentIDs: []UniqueID{segmentIDToInsert},
|
||||
// FieldIDs: fieldIDs,
|
||||
// LastSegmentState: lastSegStates,
|
||||
// }
|
||||
// _, err = node.LoadSegments(loadReq)
|
||||
// assert.NoError(t, err)
|
||||
//
|
||||
// err = sentTimeTick(ctx)
|
||||
// assert.NoError(t, err)
|
||||
//
|
||||
// // do search
|
||||
// dslString := "{\"bool\": { \n\"vector\": {\n \"vec\": {\n \"metric_type\": \"L2\", \n \"params\": {\n \"nprobe\": 10 \n},\n \"query\": \"$0\",\"topk\": 10 \n } \n } \n } \n }"
|
||||
//
|
||||
// const DIM = 16
|
||||
// var searchRawData []byte
|
||||
// var vec = [DIM]float32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}
|
||||
// for _, ele := range vec {
|
||||
// buf := make([]byte, 4)
|
||||
// binary.LittleEndian.PutUint32(buf, math.Float32bits(ele))
|
||||
// searchRawData = append(searchRawData, buf...)
|
||||
// }
|
||||
// placeholderValue := milvuspb.PlaceholderValue{
|
||||
// Tag: "$0",
|
||||
// Type: milvuspb.PlaceholderType_VECTOR_FLOAT,
|
||||
// Values: [][]byte{searchRawData},
|
||||
// }
|
||||
//
|
||||
// placeholderGroup := milvuspb.PlaceholderGroup{
|
||||
// Placeholders: []*milvuspb.PlaceholderValue{&placeholderValue},
|
||||
// }
|
||||
//
|
||||
// placeHolderGroupBlob, err := proto.Marshal(&placeholderGroup)
|
||||
// assert.NoError(t, err)
|
||||
//
|
||||
// //searchTimestamp := Timestamp(1020)
|
||||
// collection, err := node.replica.getCollectionByID(collectionID)
|
||||
// assert.NoError(t, err)
|
||||
// plan, err := createPlan(*collection, dslString)
|
||||
// assert.NoError(t, err)
|
||||
// holder, err := parserPlaceholderGroup(plan, placeHolderGroupBlob)
|
||||
// assert.NoError(t, err)
|
||||
// placeholderGroups := make([]*PlaceholderGroup, 0)
|
||||
// placeholderGroups = append(placeholderGroups, holder)
|
||||
//
|
||||
// // wait for segment building index
|
||||
// time.Sleep(3 * time.Second)
|
||||
//
|
||||
// //segment, err := node.replica.getSegmentByID(segmentIDToInsert)
|
||||
// //assert.NoError(t, err)
|
||||
// //_, err = segment.segmentSearch(plan, placeholderGroups, []Timestamp{searchTimestamp})
|
||||
// //assert.Nil(t, err)
|
||||
//
|
||||
// plan.delete()
|
||||
// holder.delete()
|
||||
//
|
||||
// <-ctx.Done()
|
||||
// err = node.Stop()
|
||||
// assert.NoError(t, err)
|
||||
//}
|
||||
|
@ -17,14 +17,6 @@ type MsgTimeTickWatcher struct {
|
||||
msgQueue chan *ms.TimeTickMsg
|
||||
}
|
||||
|
||||
func NewMsgTimeTickWatcher(streams ...ms.MsgStream) *MsgTimeTickWatcher {
|
||||
watcher := &MsgTimeTickWatcher{
|
||||
streams: streams,
|
||||
msgQueue: make(chan *ms.TimeTickMsg),
|
||||
}
|
||||
return watcher
|
||||
}
|
||||
|
||||
func (watcher *MsgTimeTickWatcher) Watch(msg *ms.TimeTickMsg) {
|
||||
watcher.msgQueue <- msg
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user