diff --git a/internal/datanode/client/client.go b/internal/datanode/client/client.go deleted file mode 100644 index f987f20d64..0000000000 --- a/internal/datanode/client/client.go +++ /dev/null @@ -1,137 +0,0 @@ -package writerclient - -import ( - "strconv" - - "github.com/golang/protobuf/proto" - "go.etcd.io/etcd/clientv3" - - "github.com/zilliztech/milvus-distributed/internal/errors" - "github.com/zilliztech/milvus-distributed/internal/kv" - etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd" - "github.com/zilliztech/milvus-distributed/internal/msgstream" - "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" - "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" - pb "github.com/zilliztech/milvus-distributed/internal/proto/writerpb" - "github.com/zilliztech/milvus-distributed/internal/util/typeutil" -) - -type UniqueID = typeutil.UniqueID - -type Timestamp = typeutil.Timestamp - -type Client struct { - kvClient kv.TxnBase // client of a reliable kv service, i.e. etcd client - kvPrefix string - - flushStream msgstream.MsgStream -} - -func NewWriterClient(etcdAddress string, kvRootPath string, writeNodeSegKvSubPath string, flushStream msgstream.MsgStream) (*Client, error) { - // init kv client - etcdClient, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddress}}) - if err != nil { - return nil, err - } - kvClient := etcdkv.NewEtcdKV(etcdClient, kvRootPath) - - return &Client{ - kvClient: kvClient, - kvPrefix: writeNodeSegKvSubPath, - flushStream: flushStream, - }, nil -} - -type SegmentDescription struct { - SegmentID UniqueID - IsClosed bool - OpenTime Timestamp - CloseTime Timestamp -} - -func (c *Client) FlushSegment(segmentID UniqueID, collectionID UniqueID, partitionTag string, timestamp Timestamp) error { - baseMsg := msgstream.BaseMsg{ - BeginTimestamp: 0, - EndTimestamp: 0, - HashValues: []uint32{0}, - } - - flushMsg := internalpb2.FlushMsg{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_kFlush, - Timestamp: timestamp, - }, - SegmentID: segmentID, - CollectionID: collectionID, - PartitionTag: partitionTag, - } - - fMsg := &msgstream.FlushMsg{ - BaseMsg: baseMsg, - FlushMsg: flushMsg, - } - msgPack := msgstream.MsgPack{} - msgPack.Msgs = append(msgPack.Msgs, fMsg) - - err := c.flushStream.Produce(&msgPack) - return err -} - -func (c *Client) DescribeSegment(segmentID UniqueID) (*SegmentDescription, error) { - // query etcd - ret := &SegmentDescription{ - SegmentID: segmentID, - IsClosed: false, - } - - key := c.kvPrefix + strconv.FormatInt(segmentID, 10) - - etcdKV, ok := c.kvClient.(*etcdkv.EtcdKV) - if !ok { - return nil, errors.New("type assertion failed for etcd kv") - } - count, err := etcdKV.GetCount(key) - if err != nil { - return nil, err - } - - if count <= 0 { - ret.IsClosed = false - return ret, nil - } - - value, err := c.kvClient.Load(key) - if err != nil { - return ret, err - } - - flushMeta := pb.SegmentFlushMeta{} - err = proto.UnmarshalText(value, &flushMeta) - if err != nil { - return ret, err - } - ret.IsClosed = flushMeta.IsClosed - ret.OpenTime = flushMeta.OpenTime - ret.CloseTime = flushMeta.CloseTime - return ret, nil -} - -func (c *Client) GetInsertBinlogPaths(segmentID UniqueID) (map[int64][]string, error) { - key := c.kvPrefix + strconv.FormatInt(segmentID, 10) - - value, err := c.kvClient.Load(key) - if err != nil { - return nil, err - } - - flushMeta := pb.SegmentFlushMeta{} - err = proto.UnmarshalText(value, &flushMeta) - if err != nil { - return nil, err - } - ret := make(map[int64][]string) - for _, field := range flushMeta.Fields { - ret[field.FieldID] = field.BinlogPaths - } - return ret, nil -} diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index ca55446b5d..0ae0497e47 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -2,6 +2,7 @@ package datanode import ( "context" + "fmt" "io" "log" "time" @@ -82,9 +83,9 @@ func NewDataNode(ctx context.Context) *DataNode { node := &DataNode{ ctx: ctx2, cancel: cancel2, - NodeID: Params.NodeID, // GOOSE TODO How to init + NodeID: Params.NodeID, // GOOSE TODO: How to init Role: typeutil.DataNodeRole, - State: internalpb2.StateCode_INITIALIZING, + State: internalpb2.StateCode_INITIALIZING, // GOOSE TODO: atomic dataSyncService: nil, metaService: nil, masterService: nil, @@ -96,15 +97,26 @@ func NewDataNode(ctx context.Context) *DataNode { } func (node *DataNode) SetMasterServiceInterface(ms MasterServiceInterface) error { - node.masterService = ms - return nil + switch { + case ms == nil, node.masterService != nil: + return errors.New("Nil parameter or repeatly set") + default: + node.masterService = ms + return nil + } } func (node *DataNode) SetDataServiceInterface(ds DataServiceInterface) error { - node.dataService = ds - return nil + switch { + case ds == nil, node.dataService != nil: + return errors.New("Nil parameter or repeatly set") + default: + node.dataService = ds + return nil + } } +// Suppose dataservice is in INITIALIZING func (node *DataNode) Init() error { req := &datapb.RegisterNodeRequest{ @@ -145,11 +157,15 @@ func (node *DataNode) Init() error { } var alloc allocator = newAllocatorImpl(node.masterService) + chanSize := 100 node.flushChan = make(chan *flushMsg, chanSize) + node.dataSyncService = newDataSyncService(node.ctx, node.flushChan, replica, alloc) node.metaService = newMetaService(node.ctx, replica, node.masterService) + node.replica = replica + node.dataSyncService.initNodes() // --- Opentracing --- cfg := &config.Configuration{ @@ -174,19 +190,38 @@ func (node *DataNode) Init() error { } func (node *DataNode) Start() error { - - go node.dataSyncService.start() node.metaService.init() - node.State = internalpb2.StateCode_HEALTHY - return nil } -func (node *DataNode) WatchDmChannels(in *datapb.WatchDmChannelRequest) (*commonpb.Status, error) { - log.Println("Init insert channel names:", in.GetChannelNames()) - Params.InsertChannelNames = append(Params.InsertChannelNames, in.GetChannelNames()...) +// DataNode is HEALTHY until StartSync() is called +func (node *DataNode) StartSync() { + node.dataSyncService.init() + go node.dataSyncService.start() + node.State = internalpb2.StateCode_HEALTHY +} - return &commonpb.Status{ErrorCode: commonpb.ErrorCode_SUCCESS}, nil +func (node *DataNode) WatchDmChannels(in *datapb.WatchDmChannelRequest) (*commonpb.Status, error) { + status := &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + } + + switch { + + case node.State != internalpb2.StateCode_HEALTHY: + status.Reason = fmt.Sprintf("DataNode %d not healthy!", node.NodeID) + return status, errors.New(status.GetReason()) + + case len(Params.InsertChannelNames) != 0: + status.Reason = fmt.Sprintf("DataNode has %d already set insert channels!", node.NodeID) + return status, errors.New(status.GetReason()) + + default: + Params.InsertChannelNames = in.GetChannelNames() + status.ErrorCode = commonpb.ErrorCode_SUCCESS + node.StartSync() + return status, nil + } } func (node *DataNode) GetComponentStates() (*internalpb2.ComponentStates, error) { diff --git a/internal/datanode/data_node_test.go b/internal/datanode/data_node_test.go index 7de7101c19..f0626a8571 100644 --- a/internal/datanode/data_node_test.go +++ b/internal/datanode/data_node_test.go @@ -1,7 +1,10 @@ package datanode import ( + "bytes" + "encoding/binary" "log" + "math" "math/rand" "os" "strconv" @@ -10,6 +13,14 @@ import ( "go.etcd.io/etcd/clientv3" etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd" + + "github.com/zilliztech/milvus-distributed/internal/msgstream" + "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" + "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb" + "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" + "github.com/zilliztech/milvus-distributed/internal/proto/masterpb" + "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb" + "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" ) func makeNewChannelNames(names []string, suffix string) []string { @@ -77,3 +88,366 @@ func clearEtcd(rootPath string) error { return nil } + +type ( + Factory interface { + } + + MetaFactory struct { + } + + DataFactory struct { + rawData []byte + } + + AllocatorFactory struct { + ID UniqueID + } + + MasterServiceFactory struct { + ID UniqueID + collectionName string + collectionID UniqueID + } +) + +func (mf *MetaFactory) CollectionMetaFactory(collectionID UniqueID, collectionName string) *etcdpb.CollectionMeta { + sch := schemapb.CollectionSchema{ + Name: collectionName, + Description: "test collection by meta factory", + AutoID: true, + Fields: []*schemapb.FieldSchema{ + { + FieldID: 0, + Name: "RowID", + Description: "RowID field", + DataType: schemapb.DataType_INT64, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "f0_tk1", + Value: "f0_tv1", + }, + }, + }, + { + FieldID: 1, + Name: "Timestamp", + Description: "Timestamp field", + DataType: schemapb.DataType_INT64, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "f1_tk1", + Value: "f1_tv1", + }, + }, + }, + { + FieldID: 100, + Name: "float_vector_field", + Description: "field 100", + DataType: schemapb.DataType_VECTOR_FLOAT, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "2", + }, + }, + IndexParams: []*commonpb.KeyValuePair{ + { + Key: "indexkey", + Value: "indexvalue", + }, + }, + }, + { + FieldID: 101, + Name: "binary_vector_field", + Description: "field 101", + DataType: schemapb.DataType_VECTOR_BINARY, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "32", + }, + }, + IndexParams: []*commonpb.KeyValuePair{ + { + Key: "indexkey", + Value: "indexvalue", + }, + }, + }, + { + FieldID: 102, + Name: "bool_field", + Description: "field 102", + DataType: schemapb.DataType_BOOL, + TypeParams: []*commonpb.KeyValuePair{}, + IndexParams: []*commonpb.KeyValuePair{}, + }, + { + FieldID: 103, + Name: "int8_field", + Description: "field 103", + DataType: schemapb.DataType_INT8, + TypeParams: []*commonpb.KeyValuePair{}, + IndexParams: []*commonpb.KeyValuePair{}, + }, + { + FieldID: 104, + Name: "int16_field", + Description: "field 104", + DataType: schemapb.DataType_INT16, + TypeParams: []*commonpb.KeyValuePair{}, + IndexParams: []*commonpb.KeyValuePair{}, + }, + { + FieldID: 105, + Name: "int32_field", + Description: "field 105", + DataType: schemapb.DataType_INT32, + TypeParams: []*commonpb.KeyValuePair{}, + IndexParams: []*commonpb.KeyValuePair{}, + }, + { + FieldID: 106, + Name: "int64_field", + Description: "field 106", + DataType: schemapb.DataType_INT64, + TypeParams: []*commonpb.KeyValuePair{}, + IndexParams: []*commonpb.KeyValuePair{}, + }, + { + FieldID: 107, + Name: "float32_field", + Description: "field 107", + DataType: schemapb.DataType_FLOAT, + TypeParams: []*commonpb.KeyValuePair{}, + IndexParams: []*commonpb.KeyValuePair{}, + }, + { + FieldID: 108, + Name: "float64_field", + Description: "field 108", + DataType: schemapb.DataType_DOUBLE, + TypeParams: []*commonpb.KeyValuePair{}, + IndexParams: []*commonpb.KeyValuePair{}, + }, + }, + } + + collection := etcdpb.CollectionMeta{ + ID: collectionID, + Schema: &sch, + CreateTime: Timestamp(1), + SegmentIDs: make([]UniqueID, 0), + PartitionTags: make([]string, 0), + } + return &collection +} + +func NewDataFactory() *DataFactory { + return &DataFactory{rawData: GenRowData()} +} + +func GenRowData() (rawData []byte) { + const DIM = 2 + const N = 1 + + // Float vector + var fvector = [DIM]float32{1, 2} + for _, ele := range fvector { + buf := make([]byte, 4) + binary.LittleEndian.PutUint32(buf, math.Float32bits(ele)) + rawData = append(rawData, buf...) + } + + // Binary vector + // Dimension of binary vector is 32 + // size := 4, = 32 / 8 + var bvector = []byte{255, 255, 255, 0} + rawData = append(rawData, bvector...) + + // Bool + var fieldBool = true + buf := new(bytes.Buffer) + if err := binary.Write(buf, binary.LittleEndian, fieldBool); err != nil { + panic(err) + } + + rawData = append(rawData, buf.Bytes()...) + + // int8 + var dataInt8 int8 = 100 + bint8 := new(bytes.Buffer) + if err := binary.Write(bint8, binary.LittleEndian, dataInt8); err != nil { + panic(err) + } + rawData = append(rawData, bint8.Bytes()...) + + // int16 + var dataInt16 int16 = 200 + bint16 := new(bytes.Buffer) + if err := binary.Write(bint16, binary.LittleEndian, dataInt16); err != nil { + panic(err) + } + rawData = append(rawData, bint16.Bytes()...) + + // int32 + var dataInt32 int32 = 300 + bint32 := new(bytes.Buffer) + if err := binary.Write(bint32, binary.LittleEndian, dataInt32); err != nil { + panic(err) + } + rawData = append(rawData, bint32.Bytes()...) + + // int64 + var dataInt64 int64 = 400 + bint64 := new(bytes.Buffer) + if err := binary.Write(bint64, binary.LittleEndian, dataInt64); err != nil { + panic(err) + } + rawData = append(rawData, bint64.Bytes()...) + + // float32 + var datafloat float32 = 1.1 + bfloat32 := new(bytes.Buffer) + if err := binary.Write(bfloat32, binary.LittleEndian, datafloat); err != nil { + panic(err) + } + rawData = append(rawData, bfloat32.Bytes()...) + + // float64 + var datafloat64 float64 = 2.2 + bfloat64 := new(bytes.Buffer) + if err := binary.Write(bfloat64, binary.LittleEndian, datafloat64); err != nil { + panic(err) + } + rawData = append(rawData, bfloat64.Bytes()...) + log.Println("Rawdata length:", len(rawData)) + return +} + +// n: number of TsinsertMsgs to generate +func (df *DataFactory) GetMsgStreamTsInsertMsgs(n int) (inMsgs []msgstream.TsMsg) { + for i := 0; i < n; i++ { + var msg msgstream.TsMsg = &msgstream.InsertMsg{ + BaseMsg: msgstream.BaseMsg{ + HashValues: []uint32{uint32(i)}, + }, + InsertRequest: internalpb2.InsertRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_kInsert, + MsgID: 0, // GOOSE TODO + Timestamp: Timestamp(i + 1000), + SourceID: 0, + }, + CollectionName: "col1", // GOOSE TODO + PartitionName: "default", + SegmentID: 1, // GOOSE TODO + ChannelID: "0", // GOOSE TODO + Timestamps: []Timestamp{Timestamp(i + 1000)}, + RowIDs: []UniqueID{UniqueID(i)}, + RowData: []*commonpb.Blob{{Value: df.rawData}}, + }, + } + inMsgs = append(inMsgs, msg) + } + return +} + +// n: number of insertMsgs to generate +func (df *DataFactory) GetMsgStreamInsertMsgs(n int) (inMsgs []*msgstream.InsertMsg) { + for i := 0; i < n; i++ { + var msg = &msgstream.InsertMsg{ + BaseMsg: msgstream.BaseMsg{ + HashValues: []uint32{uint32(i)}, + }, + InsertRequest: internalpb2.InsertRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_kInsert, + MsgID: 0, // GOOSE TODO + Timestamp: Timestamp(i + 1000), + SourceID: 0, + }, + CollectionName: "col1", // GOOSE TODO + PartitionName: "default", + SegmentID: 1, // GOOSE TODO + ChannelID: "0", // GOOSE TODO + Timestamps: []Timestamp{Timestamp(i + 1000)}, + RowIDs: []UniqueID{UniqueID(i)}, + RowData: []*commonpb.Blob{{Value: df.rawData}}, + }, + } + inMsgs = append(inMsgs, msg) + } + return +} + +func NewAllocatorFactory(id ...UniqueID) *AllocatorFactory { + f := &AllocatorFactory{} + if len(id) == 1 { + f.ID = id[0] + } + return f +} + +func (alloc AllocatorFactory) setID(id UniqueID) { + alloc.ID = id +} + +func (alloc AllocatorFactory) allocID() (UniqueID, error) { + if alloc.ID == 0 { + return UniqueID(0), nil // GOOSE TODO: random ID generating + } + return alloc.ID, nil +} + +func (m *MasterServiceFactory) setID(id UniqueID) { + m.ID = id // GOOSE TODO: random ID generator +} + +func (m *MasterServiceFactory) setCollectionID(id UniqueID) { + m.collectionID = id +} + +func (m *MasterServiceFactory) setCollectionName(name string) { + m.collectionName = name +} + +func (m *MasterServiceFactory) AllocID(in *masterpb.IDRequest) (*masterpb.IDResponse, error) { + resp := &masterpb.IDResponse{ + Status: &commonpb.Status{}, + ID: m.ID, + } + return resp, nil +} + +func (m *MasterServiceFactory) ShowCollections(in *milvuspb.ShowCollectionRequest) (*milvuspb.ShowCollectionResponse, error) { + resp := &milvuspb.ShowCollectionResponse{ + Status: &commonpb.Status{}, + CollectionNames: []string{m.collectionName}, + } + return resp, nil + +} +func (m *MasterServiceFactory) DescribeCollection(in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) { + f := MetaFactory{} + meta := f.CollectionMetaFactory(m.collectionID, m.collectionName) + resp := &milvuspb.DescribeCollectionResponse{ + Status: &commonpb.Status{}, + CollectionID: m.collectionID, + Schema: meta.Schema, + } + return resp, nil +} + +func (m *MasterServiceFactory) GetComponentStates() (*internalpb2.ComponentStates, error) { + return &internalpb2.ComponentStates{ + State: &internalpb2.ComponentInfo{}, + SubcomponentStates: make([]*internalpb2.ComponentInfo, 0), + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_SUCCESS, + }, + }, nil +} diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go index fa8b09ec0a..4a49b79af0 100644 --- a/internal/datanode/data_sync_service.go +++ b/internal/datanode/data_sync_service.go @@ -19,18 +19,27 @@ type dataSyncService struct { func newDataSyncService(ctx context.Context, flushChan chan *flushMsg, replica collectionReplica, alloc allocator) *dataSyncService { - - return &dataSyncService{ + service := &dataSyncService{ ctx: ctx, fg: nil, flushChan: flushChan, replica: replica, idAllocator: alloc, } + return service +} + +func (dsService *dataSyncService) init() { + if len(Params.InsertChannelNames) == 0 { + log.Println("InsertChannels not readly, init datasync service failed") + return + } + + dsService.initNodes() } func (dsService *dataSyncService) start() { - dsService.initNodes() + log.Println("Data Sync Service Start Successfully") dsService.fg.Start() } @@ -60,7 +69,6 @@ func (dsService *dataSyncService) initNodes() { var ddStreamNode Node = newDDInputNode(dsService.ctx) var filterDmNode Node = newFilteredDmNode() - var ddNode Node = newDDNode(dsService.ctx, mt, dsService.flushChan, dsService.replica, dsService.idAllocator) var insertBufferNode Node = newInsertBufferNode(dsService.ctx, mt, dsService.replica, dsService.idAllocator) var gcNode Node = newGCNode(dsService.replica) diff --git a/internal/datanode/data_sync_service_test.go b/internal/datanode/data_sync_service_test.go index 3b40ea1a20..1a10794989 100644 --- a/internal/datanode/data_sync_service_test.go +++ b/internal/datanode/data_sync_service_test.go @@ -2,7 +2,6 @@ package datanode import ( "context" - "encoding/binary" "math" "testing" "time" @@ -42,116 +41,15 @@ func TestDataSyncService_Start(t *testing.T) { allocFactory := AllocatorFactory{} sync := newDataSyncService(ctx, flushChan, replica, allocFactory) sync.replica.addCollection(collMeta.ID, collMeta.Schema) + sync.init() go sync.start() - // test data generate - // GOOSE TODO orgnize - const DIM = 2 - const N = 1 - var rawData []byte - - // Float vector - var fvector = [DIM]float32{1, 2} - for _, ele := range fvector { - buf := make([]byte, 4) - binary.LittleEndian.PutUint32(buf, math.Float32bits(ele)) - rawData = append(rawData, buf...) - } - - // Binary vector - // Dimension of binary vector is 32 - var bvector = [4]byte{255, 255, 255, 0} - for _, ele := range bvector { - bs := make([]byte, 4) - binary.LittleEndian.PutUint32(bs, uint32(ele)) - rawData = append(rawData, bs...) - } - - // Bool - bb := make([]byte, 4) - var fieldBool = true - var fieldBoolInt uint32 - if fieldBool { - fieldBoolInt = 1 - } else { - fieldBoolInt = 0 - } - - binary.LittleEndian.PutUint32(bb, fieldBoolInt) - rawData = append(rawData, bb...) - - // int8 - var dataInt8 int8 = 100 - bint8 := make([]byte, 4) - binary.LittleEndian.PutUint32(bint8, uint32(dataInt8)) - rawData = append(rawData, bint8...) - - // int16 - var dataInt16 int16 = 200 - bint16 := make([]byte, 4) - binary.LittleEndian.PutUint32(bint16, uint32(dataInt16)) - rawData = append(rawData, bint16...) - - // int32 - var dataInt32 int32 = 300 - bint32 := make([]byte, 4) - binary.LittleEndian.PutUint32(bint32, uint32(dataInt32)) - rawData = append(rawData, bint32...) - - // int64 - var dataInt64 int64 = 300 - bint64 := make([]byte, 4) - binary.LittleEndian.PutUint32(bint64, uint32(dataInt64)) - rawData = append(rawData, bint64...) - - // float32 - var datafloat float32 = 1.1 - bfloat32 := make([]byte, 4) - binary.LittleEndian.PutUint32(bfloat32, math.Float32bits(datafloat)) - rawData = append(rawData, bfloat32...) - - // float64 - var datafloat64 float64 = 2.2 - bfloat64 := make([]byte, 8) - binary.LittleEndian.PutUint64(bfloat64, math.Float64bits(datafloat64)) - rawData = append(rawData, bfloat64...) - timeRange := TimeRange{ timestampMin: 0, timestampMax: math.MaxUint64, } - - // messages generate - const MSGLENGTH = 1 - insertMessages := make([]msgstream.TsMsg, 0) - for i := 0; i < MSGLENGTH; i++ { - var msg msgstream.TsMsg = &msgstream.InsertMsg{ - BaseMsg: msgstream.BaseMsg{ - HashValues: []uint32{ - uint32(i), - }, - }, - InsertRequest: internalpb2.InsertRequest{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_kInsert, - MsgID: UniqueID(0), - Timestamp: Timestamp(i + 1000), - SourceID: 0, - }, - CollectionName: "col1", - PartitionName: "default", - SegmentID: UniqueID(1), - ChannelID: "0", - Timestamps: []Timestamp{Timestamp(i + 1000)}, - RowIDs: []UniqueID{UniqueID(i)}, - - RowData: []*commonpb.Blob{ - {Value: rawData}, - }, - }, - } - insertMessages = append(insertMessages, msg) - } + dataFactory := NewDataFactory() + insertMessages := dataFactory.GetMsgStreamTsInsertMsgs(2) msgPack := msgstream.MsgPack{ BeginTs: timeRange.timestampMin, @@ -208,6 +106,7 @@ func TestDataSyncService_Start(t *testing.T) { // dataSync Params.FlushInsertBufferSize = 1 + <-sync.ctx.Done() sync.close() } diff --git a/internal/datanode/factory.go b/internal/datanode/factory.go deleted file mode 100644 index c0fbebb346..0000000000 --- a/internal/datanode/factory.go +++ /dev/null @@ -1,231 +0,0 @@ -package datanode - -import ( - "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" - "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb" - "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" - "github.com/zilliztech/milvus-distributed/internal/proto/masterpb" - "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb" - "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" -) - -type ( - Factory interface { - } - - MetaFactory struct { - } - - AllocatorFactory struct { - ID UniqueID - } - - MasterServiceFactory struct { - ID UniqueID - collectionName string - collectionID UniqueID - } -) - -func (mf *MetaFactory) CollectionMetaFactory(collectionID UniqueID, collectionName string) *etcdpb.CollectionMeta { - sch := schemapb.CollectionSchema{ - Name: collectionName, - Description: "test collection by meta factory", - AutoID: true, - Fields: []*schemapb.FieldSchema{ - { - FieldID: 0, - Name: "RowID", - Description: "RowID field", - DataType: schemapb.DataType_INT64, - TypeParams: []*commonpb.KeyValuePair{ - { - Key: "f0_tk1", - Value: "f0_tv1", - }, - }, - }, - { - FieldID: 1, - Name: "Timestamp", - Description: "Timestamp field", - DataType: schemapb.DataType_INT64, - TypeParams: []*commonpb.KeyValuePair{ - { - Key: "f1_tk1", - Value: "f1_tv1", - }, - }, - }, - { - FieldID: 100, - Name: "float_vector_field", - Description: "field 100", - DataType: schemapb.DataType_VECTOR_FLOAT, - TypeParams: []*commonpb.KeyValuePair{ - { - Key: "dim", - Value: "2", - }, - }, - IndexParams: []*commonpb.KeyValuePair{ - { - Key: "indexkey", - Value: "indexvalue", - }, - }, - }, - { - FieldID: 101, - Name: "binary_vector_field", - Description: "field 101", - DataType: schemapb.DataType_VECTOR_BINARY, - TypeParams: []*commonpb.KeyValuePair{ - { - Key: "dim", - Value: "32", - }, - }, - IndexParams: []*commonpb.KeyValuePair{ - { - Key: "indexkey", - Value: "indexvalue", - }, - }, - }, - { - FieldID: 102, - Name: "bool_field", - Description: "field 102", - DataType: schemapb.DataType_BOOL, - TypeParams: []*commonpb.KeyValuePair{}, - IndexParams: []*commonpb.KeyValuePair{}, - }, - { - FieldID: 103, - Name: "int8_field", - Description: "field 103", - DataType: schemapb.DataType_INT8, - TypeParams: []*commonpb.KeyValuePair{}, - IndexParams: []*commonpb.KeyValuePair{}, - }, - { - FieldID: 104, - Name: "int16_field", - Description: "field 104", - DataType: schemapb.DataType_INT16, - TypeParams: []*commonpb.KeyValuePair{}, - IndexParams: []*commonpb.KeyValuePair{}, - }, - { - FieldID: 105, - Name: "int32_field", - Description: "field 105", - DataType: schemapb.DataType_INT32, - TypeParams: []*commonpb.KeyValuePair{}, - IndexParams: []*commonpb.KeyValuePair{}, - }, - { - FieldID: 106, - Name: "int64_field", - Description: "field 106", - DataType: schemapb.DataType_INT64, - TypeParams: []*commonpb.KeyValuePair{}, - IndexParams: []*commonpb.KeyValuePair{}, - }, - { - FieldID: 107, - Name: "float32_field", - Description: "field 107", - DataType: schemapb.DataType_FLOAT, - TypeParams: []*commonpb.KeyValuePair{}, - IndexParams: []*commonpb.KeyValuePair{}, - }, - { - FieldID: 108, - Name: "float64_field", - Description: "field 108", - DataType: schemapb.DataType_DOUBLE, - TypeParams: []*commonpb.KeyValuePair{}, - IndexParams: []*commonpb.KeyValuePair{}, - }, - }, - } - - collection := etcdpb.CollectionMeta{ - ID: collectionID, - Schema: &sch, - CreateTime: Timestamp(1), - SegmentIDs: make([]UniqueID, 0), - PartitionTags: make([]string, 0), - } - return &collection -} - -func NewAllocatorFactory(id ...UniqueID) *AllocatorFactory { - f := &AllocatorFactory{} - if len(id) == 1 { - f.ID = id[0] - } - return f -} - -func (alloc AllocatorFactory) setID(id UniqueID) { - alloc.ID = id -} - -func (alloc AllocatorFactory) allocID() (UniqueID, error) { - if alloc.ID == 0 { - return UniqueID(0), nil // GOOSE TODO: random ID generating - } - return alloc.ID, nil -} - -func (m *MasterServiceFactory) setID(id UniqueID) { - m.ID = id // GOOSE TODO: random ID generator -} - -func (m *MasterServiceFactory) setCollectionID(id UniqueID) { - m.collectionID = id -} - -func (m *MasterServiceFactory) setCollectionName(name string) { - m.collectionName = name -} - -func (m *MasterServiceFactory) AllocID(in *masterpb.IDRequest) (*masterpb.IDResponse, error) { - resp := &masterpb.IDResponse{ - Status: &commonpb.Status{}, - ID: m.ID, - } - return resp, nil -} - -func (m *MasterServiceFactory) ShowCollections(in *milvuspb.ShowCollectionRequest) (*milvuspb.ShowCollectionResponse, error) { - resp := &milvuspb.ShowCollectionResponse{ - Status: &commonpb.Status{}, - CollectionNames: []string{m.collectionName}, - } - return resp, nil - -} -func (m *MasterServiceFactory) DescribeCollection(in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) { - f := MetaFactory{} - meta := f.CollectionMetaFactory(m.collectionID, m.collectionName) - resp := &milvuspb.DescribeCollectionResponse{ - Status: &commonpb.Status{}, - CollectionID: m.collectionID, - Schema: meta.Schema, - } - return resp, nil -} - -func (m *MasterServiceFactory) GetComponentStates() (*internalpb2.ComponentStates, error) { - return &internalpb2.ComponentStates{ - State: &internalpb2.ComponentInfo{}, - SubcomponentStates: make([]*internalpb2.ComponentInfo, 0), - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_SUCCESS, - }, - }, nil -} diff --git a/internal/datanode/flow_graph_dd_node.go b/internal/datanode/flow_graph_dd_node.go index 00e80f28be..30b97ab138 100644 --- a/internal/datanode/flow_graph_dd_node.go +++ b/internal/datanode/flow_graph_dd_node.go @@ -132,7 +132,6 @@ func (ddNode *ddNode) Operate(in []*Msg) []*Msg { } default: - //log.Println(". default: do nothing ...") } // generate binlog diff --git a/internal/datanode/flow_graph_insert_buffer_node_test.go b/internal/datanode/flow_graph_insert_buffer_node_test.go index 28295a2df3..12c776b981 100644 --- a/internal/datanode/flow_graph_insert_buffer_node_test.go +++ b/internal/datanode/flow_graph_insert_buffer_node_test.go @@ -1,10 +1,7 @@ package datanode import ( - "bytes" "context" - "encoding/binary" - "log" "math" "testing" "time" @@ -12,8 +9,6 @@ import ( "github.com/stretchr/testify/require" "github.com/zilliztech/milvus-distributed/internal/msgstream" - "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" - "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" "github.com/zilliztech/milvus-distributed/internal/util/flowgraph" ) @@ -43,7 +38,6 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) { err = replica.addCollection(collMeta.ID, collMeta.Schema) require.NoError(t, err) - // Params.FlushInsertBufSize = 2 idFactory := AllocatorFactory{} iBNode := newInsertBufferNode(ctx, newMetaTable(), replica, idFactory) inMsg := genInsertMsg() @@ -52,82 +46,6 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) { } func genInsertMsg() insertMsg { - // test data generate - const DIM = 2 - const N = 1 - var rawData []byte - - // Float vector - var fvector = [DIM]float32{1, 2} - for _, ele := range fvector { - buf := make([]byte, 4) - binary.LittleEndian.PutUint32(buf, math.Float32bits(ele)) - rawData = append(rawData, buf...) - } - - // Binary vector - // Dimension of binary vector is 32 - // size := 4, = 32 / 8 - var bvector = []byte{255, 255, 255, 0} - rawData = append(rawData, bvector...) - - // Bool - var fieldBool = true - buf := new(bytes.Buffer) - if err := binary.Write(buf, binary.LittleEndian, fieldBool); err != nil { - panic(err) - } - - rawData = append(rawData, buf.Bytes()...) - - // int8 - var dataInt8 int8 = 100 - bint8 := new(bytes.Buffer) - if err := binary.Write(bint8, binary.LittleEndian, dataInt8); err != nil { - panic(err) - } - rawData = append(rawData, bint8.Bytes()...) - - // int16 - var dataInt16 int16 = 200 - bint16 := new(bytes.Buffer) - if err := binary.Write(bint16, binary.LittleEndian, dataInt16); err != nil { - panic(err) - } - rawData = append(rawData, bint16.Bytes()...) - - // int32 - var dataInt32 int32 = 300 - bint32 := new(bytes.Buffer) - if err := binary.Write(bint32, binary.LittleEndian, dataInt32); err != nil { - panic(err) - } - rawData = append(rawData, bint32.Bytes()...) - - // int64 - var dataInt64 int64 = 400 - bint64 := new(bytes.Buffer) - if err := binary.Write(bint64, binary.LittleEndian, dataInt64); err != nil { - panic(err) - } - rawData = append(rawData, bint64.Bytes()...) - - // float32 - var datafloat float32 = 1.1 - bfloat32 := new(bytes.Buffer) - if err := binary.Write(bfloat32, binary.LittleEndian, datafloat); err != nil { - panic(err) - } - rawData = append(rawData, bfloat32.Bytes()...) - - // float64 - var datafloat64 float64 = 2.2 - bfloat64 := new(bytes.Buffer) - if err := binary.Write(bfloat64, binary.LittleEndian, datafloat64); err != nil { - panic(err) - } - rawData = append(rawData, bfloat64.Bytes()...) - log.Println("Test rawdata length:", len(rawData)) timeRange := TimeRange{ timestampMin: 0, @@ -143,55 +61,8 @@ func genInsertMsg() insertMsg { }, } - // messages generate - const MSGLENGTH = 1 - // insertMessages := make([]msgstream.TsMsg, 0) - for i := 0; i < MSGLENGTH; i++ { - var msg = &msgstream.InsertMsg{ - BaseMsg: msgstream.BaseMsg{ - HashValues: []uint32{ - uint32(i), - }, - }, - InsertRequest: internalpb2.InsertRequest{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_kInsert, - MsgID: 0, - Timestamp: Timestamp(i + 1000), - SourceID: 0, - }, - CollectionName: "col1", - PartitionName: "default", - CollectionID: 0, - PartitionID: 1, - SegmentID: UniqueID(1), - ChannelID: "0", - Timestamps: []Timestamp{ - Timestamp(i + 1000), - Timestamp(i + 1000), - Timestamp(i + 1000), - Timestamp(i + 1000), - Timestamp(i + 1000), - }, - RowIDs: []UniqueID{ - UniqueID(i), - UniqueID(i), - UniqueID(i), - UniqueID(i), - UniqueID(i), - }, - - RowData: []*commonpb.Blob{ - {Value: rawData}, - {Value: rawData}, - {Value: rawData}, - {Value: rawData}, - {Value: rawData}, - }, - }, - } - iMsg.insertMessages = append(iMsg.insertMessages, msg) - } + dataFactory := NewDataFactory() + iMsg.insertMessages = append(iMsg.insertMessages, dataFactory.GetMsgStreamInsertMsgs(2)...) fmsg := &flushMsg{ msgID: 1, diff --git a/internal/datanode/flow_graph_msg_stream_input_node.go b/internal/datanode/flow_graph_msg_stream_input_node.go index 163ac7fc6c..57a2ff1646 100644 --- a/internal/datanode/flow_graph_msg_stream_input_node.go +++ b/internal/datanode/flow_graph_msg_stream_input_node.go @@ -10,42 +10,35 @@ import ( ) func newDmInputNode(ctx context.Context) *flowgraph.InputNode { - msgStreamURL := Params.PulsarAddress + maxQueueLength := Params.FlowGraphMaxQueueLength + maxParallelism := Params.FlowGraphMaxParallelism consumeChannels := Params.InsertChannelNames consumeSubName := Params.MsgChannelSubName + unmarshalDispatcher := util.NewUnmarshalDispatcher() insertStream := pulsarms.NewPulsarTtMsgStream(ctx, 1024) - - insertStream.SetPulsarClient(msgStreamURL) - unmarshalDispatcher := util.NewUnmarshalDispatcher() + insertStream.SetPulsarClient(Params.PulsarAddress) insertStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, 1024) var stream msgstream.MsgStream = insertStream - - maxQueueLength := Params.FlowGraphMaxQueueLength - maxParallelism := Params.FlowGraphMaxParallelism - node := flowgraph.NewInputNode(&stream, "dmInputNode", maxQueueLength, maxParallelism) return node } func newDDInputNode(ctx context.Context) *flowgraph.InputNode { - consumeChannels := Params.DDChannelNames - consumeSubName := Params.MsgChannelSubName - - ddStream := pulsarms.NewPulsarTtMsgStream(ctx, 1024) - ddStream.SetPulsarClient(Params.PulsarAddress) - unmarshalDispatcher := util.NewUnmarshalDispatcher() - ddStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, 1024) - - var stream msgstream.MsgStream = ddStream - maxQueueLength := Params.FlowGraphMaxQueueLength maxParallelism := Params.FlowGraphMaxParallelism + consumeSubName := Params.MsgChannelSubName + unmarshalDispatcher := util.NewUnmarshalDispatcher() + tmpStream := pulsarms.NewPulsarTtMsgStream(ctx, 1024) + tmpStream.SetPulsarClient(Params.PulsarAddress) + tmpStream.CreatePulsarConsumers(Params.DDChannelNames, consumeSubName, unmarshalDispatcher, 1024) + + var stream msgstream.MsgStream = tmpStream node := flowgraph.NewInputNode(&stream, "ddInputNode", maxQueueLength, maxParallelism) return node } diff --git a/internal/masterservice/master_service_test.go b/internal/masterservice/master_service_test.go index b133267f9a..46b9a47215 100644 --- a/internal/masterservice/master_service_test.go +++ b/internal/masterservice/master_service_test.go @@ -657,10 +657,18 @@ func TestMasterService(t *testing.T) { rsp, err := core.DescribeIndex(req) assert.Nil(t, err) assert.Equal(t, rsp.Status.ErrorCode, commonpb.ErrorCode_SUCCESS) + assert.Equal(t, len(rsp.IndexDescriptions), 3) - assert.Equal(t, rsp.IndexDescriptions[0].IndexName, Params.DefaultIndexName) - assert.Equal(t, rsp.IndexDescriptions[1].IndexName, "index_field_100_0") - assert.Equal(t, rsp.IndexDescriptions[2].IndexName, "index_field_100_1") + indexNames := make([]string, 0) + for _, d := range rsp.IndexDescriptions { + indexNames = append(indexNames, d.IndexName) + } + + assert.ElementsMatch(t, indexNames, []string{ + "index_field_100_0", + "index_field_100_1", + Params.DefaultIndexName, + }) }) t.Run("drop partition", func(t *testing.T) {