mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-11-30 10:59:32 +08:00
Add indexbuilder client
Signed-off-by: cai.zhang <cai.zhang@zilliz.com>
This commit is contained in:
parent
07d68de2a3
commit
54f2b79f1b
@ -1,4 +1,4 @@
|
||||
package indexbuilder
|
||||
package indexbuilderclient
|
||||
|
||||
import (
|
||||
"context"
|
||||
@ -8,6 +8,7 @@ import (
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/indexbuilderpb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/util/tsoutil"
|
||||
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
|
||||
)
|
||||
|
||||
@ -19,13 +20,6 @@ type Client struct {
|
||||
|
||||
type IndexStatus int32
|
||||
|
||||
const (
|
||||
NOTEXIST IndexStatus = 0
|
||||
UNISSUED IndexStatus = 1
|
||||
INPROGRESS IndexStatus = 2
|
||||
FINISHED IndexStatus = 3
|
||||
)
|
||||
|
||||
type IndexDescription struct {
|
||||
ID UniqueID
|
||||
Status IndexStatus
|
||||
@ -41,8 +35,6 @@ func NewBuildIndexClient(conn *grpc.ClientConn) *Client {
|
||||
}
|
||||
|
||||
func (c *Client) BuildIndexWithoutID(columnDataPaths []string, typeParams map[string]string, indexParams map[string]string) (UniqueID, error) {
|
||||
//first new a build service client
|
||||
|
||||
var typeParamsKV []*commonpb.KeyValuePair
|
||||
for typeParam := range typeParams {
|
||||
typeParamsKV = append(typeParamsKV, &commonpb.KeyValuePair{
|
||||
@ -75,24 +67,47 @@ func (c *Client) BuildIndexWithoutID(columnDataPaths []string, typeParams map[st
|
||||
}
|
||||
|
||||
func (c *Client) DescribeIndex(indexID UniqueID) (IndexDescription, error) {
|
||||
//ctx := context.TODO()
|
||||
//request := &indexbuilderpb.DescribleIndexRequest{
|
||||
// IndexID: indexID,
|
||||
ctx := context.TODO()
|
||||
request := &indexbuilderpb.DescribleIndexRequest{
|
||||
IndexID: indexID,
|
||||
}
|
||||
response, err := c.client.DescribeIndex(ctx, request)
|
||||
if err != nil {
|
||||
return IndexDescription{}, err
|
||||
}
|
||||
|
||||
enqueueTime, _ := tsoutil.ParseTS(response.EnqueTime)
|
||||
scheduleTime, _ := tsoutil.ParseTS(response.ScheduleTime)
|
||||
buildCompleteTime, _ := tsoutil.ParseTS(response.BuildCompleteTime)
|
||||
indexDescription := IndexDescription{
|
||||
ID: indexID,
|
||||
Status: IndexStatus(response.IndexStatus),
|
||||
EnqueueTime: enqueueTime,
|
||||
ScheduleTime: scheduleTime,
|
||||
BuildCompleteTime: buildCompleteTime,
|
||||
}
|
||||
|
||||
//indexDescription := IndexDescription{
|
||||
// ID: indexID,
|
||||
// Status: IndexStatus(response.IndexStatus),
|
||||
// EnqueueTime: time.Unix(0, response.EnqueTime),
|
||||
// ScheduleTime: time.Unix(-, response.ScheduleTime),
|
||||
// BuildCompleteTime: time.Unix(0, response.BuildCompleteTime),
|
||||
//}
|
||||
//response, err := c.client.DescribeIndex(ctx, request)
|
||||
//if err != nil {
|
||||
// return IndexDescription{}, err
|
||||
//}
|
||||
//
|
||||
//indexDescription := &IndexDescription{
|
||||
// ID: indexID,
|
||||
// Status: response.IndexStatus,
|
||||
// EnqueueTime: time.Unix(),
|
||||
//}
|
||||
return IndexDescription{}, nil
|
||||
|
||||
return indexDescription, nil
|
||||
}
|
||||
|
||||
func (c *Client) GetIndexFilePaths(IndexID UniqueID) ([]string, error) {
|
||||
func (c *Client) GetIndexFilePaths(indexID UniqueID) ([]string, error) {
|
||||
ctx := context.TODO()
|
||||
request := &indexbuilderpb.GetIndexFilePathsRequest{
|
||||
IndexID: indexID,
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
response, err := c.client.GetIndexFilePaths(ctx, request)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return response.IndexFilePaths, nil
|
||||
}
|
||||
|
12
internal/indexbuilder/client/client_test.go
Normal file
12
internal/indexbuilder/client/client_test.go
Normal file
@ -0,0 +1,12 @@
|
||||
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
// or implied. See the License for the specific language governing permissions and limitations under the License.
|
||||
|
||||
package indexbuilderclient
|
@ -11,24 +11,15 @@ import (
|
||||
type BinlogReader struct {
|
||||
magicNumber int32
|
||||
descriptorEvent
|
||||
currentEventReader *EventReader
|
||||
buffer *bytes.Buffer
|
||||
bufferLength int
|
||||
currentOffset int32
|
||||
isClose bool
|
||||
buffer *bytes.Buffer
|
||||
eventList []*EventReader
|
||||
isClose bool
|
||||
}
|
||||
|
||||
func (reader *BinlogReader) NextEventReader() (*EventReader, error) {
|
||||
if reader.isClose {
|
||||
return nil, errors.New("bin log reader is closed")
|
||||
}
|
||||
if reader.currentEventReader != nil {
|
||||
reader.currentOffset = reader.currentEventReader.NextPosition
|
||||
if err := reader.currentEventReader.Close(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
reader.currentEventReader = nil
|
||||
}
|
||||
if reader.buffer.Len() <= 0 {
|
||||
return nil, nil
|
||||
}
|
||||
@ -36,15 +27,14 @@ func (reader *BinlogReader) NextEventReader() (*EventReader, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
reader.currentEventReader = eventReader
|
||||
return reader.currentEventReader, nil
|
||||
reader.eventList = append(reader.eventList, eventReader)
|
||||
return eventReader, nil
|
||||
}
|
||||
|
||||
func (reader *BinlogReader) readMagicNumber() (int32, error) {
|
||||
if err := binary.Read(reader.buffer, binary.LittleEndian, &reader.magicNumber); err != nil {
|
||||
return -1, err
|
||||
}
|
||||
reader.currentOffset = 4
|
||||
if reader.magicNumber != MagicNumber {
|
||||
return -1, errors.New("parse magic number failed, expected: " + strconv.Itoa(int(MagicNumber)) +
|
||||
", actual: " + strconv.Itoa(int(reader.magicNumber)))
|
||||
@ -55,7 +45,6 @@ func (reader *BinlogReader) readMagicNumber() (int32, error) {
|
||||
|
||||
func (reader *BinlogReader) readDescriptorEvent() (*descriptorEvent, error) {
|
||||
event, err := ReadDescriptorEvent(reader.buffer)
|
||||
reader.currentOffset = event.NextPosition
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -67,20 +56,20 @@ func (reader *BinlogReader) Close() error {
|
||||
if reader.isClose {
|
||||
return nil
|
||||
}
|
||||
reader.isClose = true
|
||||
if reader.currentEventReader != nil {
|
||||
if err := reader.currentEventReader.Close(); err != nil {
|
||||
for _, e := range reader.eventList {
|
||||
if err := e.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
reader.isClose = true
|
||||
return nil
|
||||
}
|
||||
|
||||
func NewBinlogReader(data []byte) (*BinlogReader, error) {
|
||||
reader := &BinlogReader{
|
||||
buffer: bytes.NewBuffer(data),
|
||||
bufferLength: len(data),
|
||||
isClose: false,
|
||||
buffer: bytes.NewBuffer(data),
|
||||
eventList: []*EventReader{},
|
||||
isClose: false,
|
||||
}
|
||||
|
||||
if _, err := reader.readMagicNumber(); err != nil {
|
||||
|
@ -241,4 +241,812 @@ func TestInsertBinlog(t *testing.T) {
|
||||
|
||||
assert.Equal(t, int(e2NxtPos), len(buf))
|
||||
|
||||
//read binlog
|
||||
r, err := NewBinlogReader(buf)
|
||||
assert.Nil(t, err)
|
||||
event1, err := r.NextEventReader()
|
||||
assert.Nil(t, err)
|
||||
assert.NotNil(t, event1)
|
||||
p1, err := event1.GetInt64FromPayload()
|
||||
assert.Equal(t, p1, []int64{1, 2, 3, 4, 5, 6})
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, event1.TypeCode, InsertEventType)
|
||||
ed1, ok := (event1.eventData).(*insertEventData)
|
||||
assert.True(t, ok)
|
||||
assert.Equal(t, ed1.StartTimestamp, Timestamp(100))
|
||||
assert.Equal(t, ed1.EndTimestamp, Timestamp(200))
|
||||
|
||||
event2, err := r.NextEventReader()
|
||||
assert.Nil(t, err)
|
||||
assert.NotNil(t, event2)
|
||||
p2, err := event2.GetInt64FromPayload()
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, p2, []int64{7, 8, 9, 10, 11, 12})
|
||||
assert.Equal(t, event2.TypeCode, InsertEventType)
|
||||
ed2, ok := (event2.eventData).(*insertEventData)
|
||||
assert.True(t, ok)
|
||||
_, ok = (event2.eventData).(*deleteEventData)
|
||||
assert.False(t, ok)
|
||||
assert.Equal(t, ed2.StartTimestamp, Timestamp(300))
|
||||
assert.Equal(t, ed2.EndTimestamp, Timestamp(400))
|
||||
}
|
||||
|
||||
func TestDeleteBinlog(t *testing.T) {
|
||||
w, err := NewDeleteBinlogWriter(schemapb.DataType_INT64, 50)
|
||||
assert.Nil(t, err)
|
||||
|
||||
e1, err := w.NextDeleteEventWriter()
|
||||
assert.Nil(t, err)
|
||||
err = e1.AddDataToPayload([]int64{1, 2, 3})
|
||||
assert.Nil(t, err)
|
||||
err = e1.AddDataToPayload([]int32{4, 5, 6})
|
||||
assert.NotNil(t, err)
|
||||
err = e1.AddDataToPayload([]int64{4, 5, 6})
|
||||
assert.Nil(t, err)
|
||||
e1.SetStartTimestamp(100)
|
||||
e1.SetEndTimestamp(200)
|
||||
|
||||
e2, err := w.NextDeleteEventWriter()
|
||||
assert.Nil(t, err)
|
||||
err = e2.AddDataToPayload([]int64{7, 8, 9})
|
||||
assert.Nil(t, err)
|
||||
err = e2.AddDataToPayload([]bool{true, false, true})
|
||||
assert.NotNil(t, err)
|
||||
err = e2.AddDataToPayload([]int64{10, 11, 12})
|
||||
assert.Nil(t, err)
|
||||
e2.SetStartTimestamp(300)
|
||||
e2.SetEndTimestamp(400)
|
||||
|
||||
w.SetStartTimeStamp(1000)
|
||||
w.SetEndTimeStamp(2000)
|
||||
|
||||
_, err = w.GetBuffer()
|
||||
assert.NotNil(t, err)
|
||||
err = w.Close()
|
||||
assert.Nil(t, err)
|
||||
buf, err := w.GetBuffer()
|
||||
assert.Nil(t, err)
|
||||
|
||||
//magic number
|
||||
magicNum := UnsafeReadInt32(buf, 0)
|
||||
assert.Equal(t, magicNum, MagicNumber)
|
||||
pos := int(unsafe.Sizeof(MagicNumber))
|
||||
|
||||
//descriptor header, timestamp
|
||||
ts := UnsafeReadInt64(buf, pos)
|
||||
assert.Greater(t, ts, int64(0))
|
||||
curts := time.Now().UnixNano() / int64(time.Millisecond)
|
||||
curts = int64(tsoutil.ComposeTS(curts, 0))
|
||||
diffts := curts - ts
|
||||
maxdiff := int64(tsoutil.ComposeTS(1000, 0))
|
||||
assert.LessOrEqual(t, diffts, maxdiff)
|
||||
pos += int(unsafe.Sizeof(ts))
|
||||
|
||||
//descriptor header, type code
|
||||
tc := UnsafeReadInt8(buf, pos)
|
||||
assert.Equal(t, EventTypeCode(tc), DescriptorEventType)
|
||||
pos += int(unsafe.Sizeof(tc))
|
||||
|
||||
//descriptor header, server id
|
||||
svrID := UnsafeReadInt32(buf, pos)
|
||||
assert.Equal(t, svrID, int32(ServerID))
|
||||
pos += int(unsafe.Sizeof(svrID))
|
||||
|
||||
//descriptor header, event length
|
||||
descEventLen := UnsafeReadInt32(buf, pos)
|
||||
pos += int(unsafe.Sizeof(descEventLen))
|
||||
|
||||
//descriptor header, next position
|
||||
descNxtPos := UnsafeReadInt32(buf, pos)
|
||||
assert.Equal(t, descEventLen+int32(unsafe.Sizeof(MagicNumber)), descNxtPos)
|
||||
pos += int(unsafe.Sizeof(descNxtPos))
|
||||
|
||||
//descriptor data fix, binlog version
|
||||
binLogVer := UnsafeReadInt16(buf, pos)
|
||||
assert.Equal(t, binLogVer, int16(BinlogVersion))
|
||||
pos += int(unsafe.Sizeof(binLogVer))
|
||||
|
||||
//descriptor data fix, server version
|
||||
svrVer := UnsafeReadInt64(buf, pos)
|
||||
assert.Equal(t, svrVer, int64(ServerVersion))
|
||||
pos += int(unsafe.Sizeof(svrVer))
|
||||
|
||||
//descriptor data fix, commit id
|
||||
cmitID := UnsafeReadInt64(buf, pos)
|
||||
assert.Equal(t, cmitID, int64(CommitID))
|
||||
pos += int(unsafe.Sizeof(cmitID))
|
||||
|
||||
//descriptor data fix, header length
|
||||
headLen := UnsafeReadInt8(buf, pos)
|
||||
assert.Equal(t, headLen, int8(binary.Size(eventHeader{})))
|
||||
pos += int(unsafe.Sizeof(headLen))
|
||||
|
||||
//descriptor data fix, collection id
|
||||
collID := UnsafeReadInt64(buf, pos)
|
||||
assert.Equal(t, collID, int64(50))
|
||||
pos += int(unsafe.Sizeof(collID))
|
||||
|
||||
//descriptor data fix, partition id
|
||||
partID := UnsafeReadInt64(buf, pos)
|
||||
assert.Equal(t, partID, int64(-1))
|
||||
pos += int(unsafe.Sizeof(partID))
|
||||
|
||||
//descriptor data fix, segment id
|
||||
segID := UnsafeReadInt64(buf, pos)
|
||||
assert.Equal(t, segID, int64(-1))
|
||||
pos += int(unsafe.Sizeof(segID))
|
||||
|
||||
//descriptor data fix, field id
|
||||
fieldID := UnsafeReadInt64(buf, pos)
|
||||
assert.Equal(t, fieldID, int64(-1))
|
||||
pos += int(unsafe.Sizeof(fieldID))
|
||||
|
||||
//descriptor data fix, start time stamp
|
||||
startts := UnsafeReadInt64(buf, pos)
|
||||
assert.Equal(t, startts, int64(1000))
|
||||
pos += int(unsafe.Sizeof(startts))
|
||||
|
||||
//descriptor data fix, end time stamp
|
||||
endts := UnsafeReadInt64(buf, pos)
|
||||
assert.Equal(t, endts, int64(2000))
|
||||
pos += int(unsafe.Sizeof(endts))
|
||||
|
||||
//descriptor data fix, payload type
|
||||
colType := UnsafeReadInt32(buf, pos)
|
||||
assert.Equal(t, schemapb.DataType(colType), schemapb.DataType_INT64)
|
||||
pos += int(unsafe.Sizeof(colType))
|
||||
|
||||
//descriptor data, post header lengths
|
||||
for i := DescriptorEventType; i < EventTypeEnd; i++ {
|
||||
size := getEventFixPartSize(i)
|
||||
assert.Equal(t, uint8(size), buf[pos])
|
||||
pos++
|
||||
}
|
||||
|
||||
//start of e1
|
||||
assert.Equal(t, pos, int(descNxtPos))
|
||||
|
||||
//insert e1 header, Timestamp
|
||||
e1ts := UnsafeReadInt64(buf, pos)
|
||||
diffts = curts - e1ts
|
||||
assert.LessOrEqual(t, diffts, maxdiff)
|
||||
pos += int(unsafe.Sizeof(e1ts))
|
||||
|
||||
//insert e1 header, type code
|
||||
e1tc := UnsafeReadInt8(buf, pos)
|
||||
assert.Equal(t, EventTypeCode(e1tc), DeleteEventType)
|
||||
pos += int(unsafe.Sizeof(e1tc))
|
||||
|
||||
//insert e1 header, Server id
|
||||
e1svrID := UnsafeReadInt32(buf, pos)
|
||||
assert.Equal(t, e1svrID, int32(ServerID))
|
||||
pos += int(unsafe.Sizeof(e1svrID))
|
||||
|
||||
//insert e1 header, event length
|
||||
e1EventLen := UnsafeReadInt32(buf, pos)
|
||||
pos += int(unsafe.Sizeof(e1EventLen))
|
||||
|
||||
//insert e1 header, next position
|
||||
e1NxtPos := UnsafeReadInt32(buf, pos)
|
||||
assert.Equal(t, descNxtPos+e1EventLen, e1NxtPos)
|
||||
pos += int(unsafe.Sizeof(descNxtPos))
|
||||
|
||||
//insert e1 data, start time stamp
|
||||
e1st := UnsafeReadInt64(buf, pos)
|
||||
assert.Equal(t, e1st, int64(100))
|
||||
pos += int(unsafe.Sizeof(e1st))
|
||||
|
||||
//insert e1 data, end time stamp
|
||||
e1et := UnsafeReadInt64(buf, pos)
|
||||
assert.Equal(t, e1et, int64(200))
|
||||
pos += int(unsafe.Sizeof(e1et))
|
||||
|
||||
//insert e1, payload
|
||||
e1Payload := buf[pos:e1NxtPos]
|
||||
e1r, err := NewPayloadReader(schemapb.DataType_INT64, e1Payload)
|
||||
assert.Nil(t, err)
|
||||
e1a, err := e1r.GetInt64FromPayload()
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, e1a, []int64{1, 2, 3, 4, 5, 6})
|
||||
err = e1r.Close()
|
||||
assert.Nil(t, err)
|
||||
|
||||
//start of e2
|
||||
pos = int(e1NxtPos)
|
||||
|
||||
//insert e2 header, Timestamp
|
||||
e2ts := UnsafeReadInt64(buf, pos)
|
||||
diffts = curts - e2ts
|
||||
assert.LessOrEqual(t, diffts, maxdiff)
|
||||
pos += int(unsafe.Sizeof(e2ts))
|
||||
|
||||
//insert e2 header, type code
|
||||
e2tc := UnsafeReadInt8(buf, pos)
|
||||
assert.Equal(t, EventTypeCode(e2tc), DeleteEventType)
|
||||
pos += int(unsafe.Sizeof(e2tc))
|
||||
|
||||
//insert e2 header, Server id
|
||||
e2svrID := UnsafeReadInt32(buf, pos)
|
||||
assert.Equal(t, e2svrID, int32(ServerID))
|
||||
pos += int(unsafe.Sizeof(e2svrID))
|
||||
|
||||
//insert e2 header, event length
|
||||
e2EventLen := UnsafeReadInt32(buf, pos)
|
||||
pos += int(unsafe.Sizeof(e2EventLen))
|
||||
|
||||
//insert e2 header, next position
|
||||
e2NxtPos := UnsafeReadInt32(buf, pos)
|
||||
assert.Equal(t, e1NxtPos+e2EventLen, e2NxtPos)
|
||||
pos += int(unsafe.Sizeof(descNxtPos))
|
||||
|
||||
//insert e2 data, start time stamp
|
||||
e2st := UnsafeReadInt64(buf, pos)
|
||||
assert.Equal(t, e2st, int64(300))
|
||||
pos += int(unsafe.Sizeof(e2st))
|
||||
|
||||
//insert e2 data, end time stamp
|
||||
e2et := UnsafeReadInt64(buf, pos)
|
||||
assert.Equal(t, e2et, int64(400))
|
||||
pos += int(unsafe.Sizeof(e2et))
|
||||
|
||||
//insert e2, payload
|
||||
e2Payload := buf[pos:]
|
||||
e2r, err := NewPayloadReader(schemapb.DataType_INT64, e2Payload)
|
||||
assert.Nil(t, err)
|
||||
e2a, err := e2r.GetInt64FromPayload()
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, e2a, []int64{7, 8, 9, 10, 11, 12})
|
||||
err = e2r.Close()
|
||||
assert.Nil(t, err)
|
||||
|
||||
assert.Equal(t, int(e2NxtPos), len(buf))
|
||||
|
||||
//read binlog
|
||||
r, err := NewBinlogReader(buf)
|
||||
assert.Nil(t, err)
|
||||
event1, err := r.NextEventReader()
|
||||
assert.Nil(t, err)
|
||||
assert.NotNil(t, event1)
|
||||
p1, err := event1.GetInt64FromPayload()
|
||||
assert.Equal(t, p1, []int64{1, 2, 3, 4, 5, 6})
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, event1.TypeCode, DeleteEventType)
|
||||
ed1, ok := (event1.eventData).(*deleteEventData)
|
||||
assert.True(t, ok)
|
||||
assert.Equal(t, ed1.StartTimestamp, Timestamp(100))
|
||||
assert.Equal(t, ed1.EndTimestamp, Timestamp(200))
|
||||
|
||||
event2, err := r.NextEventReader()
|
||||
assert.Nil(t, err)
|
||||
assert.NotNil(t, event2)
|
||||
p2, err := event2.GetInt64FromPayload()
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, p2, []int64{7, 8, 9, 10, 11, 12})
|
||||
assert.Equal(t, event2.TypeCode, DeleteEventType)
|
||||
ed2, ok := (event2.eventData).(*deleteEventData)
|
||||
assert.True(t, ok)
|
||||
_, ok = (event2.eventData).(*insertEventData)
|
||||
assert.False(t, ok)
|
||||
assert.Equal(t, ed2.StartTimestamp, Timestamp(300))
|
||||
assert.Equal(t, ed2.EndTimestamp, Timestamp(400))
|
||||
}
|
||||
|
||||
func TestDDLBinlog1(t *testing.T) {
|
||||
w, err := NewDDLBinlogWriter(schemapb.DataType_INT64, 50)
|
||||
assert.Nil(t, err)
|
||||
|
||||
e1, err := w.NextCreateCollectionEventWriter()
|
||||
assert.Nil(t, err)
|
||||
err = e1.AddDataToPayload([]int64{1, 2, 3})
|
||||
assert.Nil(t, err)
|
||||
err = e1.AddDataToPayload([]int32{4, 5, 6})
|
||||
assert.NotNil(t, err)
|
||||
err = e1.AddDataToPayload([]int64{4, 5, 6})
|
||||
assert.Nil(t, err)
|
||||
e1.SetStartTimestamp(100)
|
||||
e1.SetEndTimestamp(200)
|
||||
|
||||
e2, err := w.NextDropCollectionEventWriter()
|
||||
assert.Nil(t, err)
|
||||
err = e2.AddDataToPayload([]int64{7, 8, 9})
|
||||
assert.Nil(t, err)
|
||||
err = e2.AddDataToPayload([]bool{true, false, true})
|
||||
assert.NotNil(t, err)
|
||||
err = e2.AddDataToPayload([]int64{10, 11, 12})
|
||||
assert.Nil(t, err)
|
||||
e2.SetStartTimestamp(300)
|
||||
e2.SetEndTimestamp(400)
|
||||
|
||||
w.SetStartTimeStamp(1000)
|
||||
w.SetEndTimeStamp(2000)
|
||||
|
||||
_, err = w.GetBuffer()
|
||||
assert.NotNil(t, err)
|
||||
err = w.Close()
|
||||
assert.Nil(t, err)
|
||||
buf, err := w.GetBuffer()
|
||||
assert.Nil(t, err)
|
||||
|
||||
//magic number
|
||||
magicNum := UnsafeReadInt32(buf, 0)
|
||||
assert.Equal(t, magicNum, MagicNumber)
|
||||
pos := int(unsafe.Sizeof(MagicNumber))
|
||||
|
||||
//descriptor header, timestamp
|
||||
ts := UnsafeReadInt64(buf, pos)
|
||||
assert.Greater(t, ts, int64(0))
|
||||
curts := time.Now().UnixNano() / int64(time.Millisecond)
|
||||
curts = int64(tsoutil.ComposeTS(curts, 0))
|
||||
diffts := curts - ts
|
||||
maxdiff := int64(tsoutil.ComposeTS(1000, 0))
|
||||
assert.LessOrEqual(t, diffts, maxdiff)
|
||||
pos += int(unsafe.Sizeof(ts))
|
||||
|
||||
//descriptor header, type code
|
||||
tc := UnsafeReadInt8(buf, pos)
|
||||
assert.Equal(t, EventTypeCode(tc), DescriptorEventType)
|
||||
pos += int(unsafe.Sizeof(tc))
|
||||
|
||||
//descriptor header, server id
|
||||
svrID := UnsafeReadInt32(buf, pos)
|
||||
assert.Equal(t, svrID, int32(ServerID))
|
||||
pos += int(unsafe.Sizeof(svrID))
|
||||
|
||||
//descriptor header, event length
|
||||
descEventLen := UnsafeReadInt32(buf, pos)
|
||||
pos += int(unsafe.Sizeof(descEventLen))
|
||||
|
||||
//descriptor header, next position
|
||||
descNxtPos := UnsafeReadInt32(buf, pos)
|
||||
assert.Equal(t, descEventLen+int32(unsafe.Sizeof(MagicNumber)), descNxtPos)
|
||||
pos += int(unsafe.Sizeof(descNxtPos))
|
||||
|
||||
//descriptor data fix, binlog version
|
||||
binLogVer := UnsafeReadInt16(buf, pos)
|
||||
assert.Equal(t, binLogVer, int16(BinlogVersion))
|
||||
pos += int(unsafe.Sizeof(binLogVer))
|
||||
|
||||
//descriptor data fix, server version
|
||||
svrVer := UnsafeReadInt64(buf, pos)
|
||||
assert.Equal(t, svrVer, int64(ServerVersion))
|
||||
pos += int(unsafe.Sizeof(svrVer))
|
||||
|
||||
//descriptor data fix, commit id
|
||||
cmitID := UnsafeReadInt64(buf, pos)
|
||||
assert.Equal(t, cmitID, int64(CommitID))
|
||||
pos += int(unsafe.Sizeof(cmitID))
|
||||
|
||||
//descriptor data fix, header length
|
||||
headLen := UnsafeReadInt8(buf, pos)
|
||||
assert.Equal(t, headLen, int8(binary.Size(eventHeader{})))
|
||||
pos += int(unsafe.Sizeof(headLen))
|
||||
|
||||
//descriptor data fix, collection id
|
||||
collID := UnsafeReadInt64(buf, pos)
|
||||
assert.Equal(t, collID, int64(50))
|
||||
pos += int(unsafe.Sizeof(collID))
|
||||
|
||||
//descriptor data fix, partition id
|
||||
partID := UnsafeReadInt64(buf, pos)
|
||||
assert.Equal(t, partID, int64(-1))
|
||||
pos += int(unsafe.Sizeof(partID))
|
||||
|
||||
//descriptor data fix, segment id
|
||||
segID := UnsafeReadInt64(buf, pos)
|
||||
assert.Equal(t, segID, int64(-1))
|
||||
pos += int(unsafe.Sizeof(segID))
|
||||
|
||||
//descriptor data fix, field id
|
||||
fieldID := UnsafeReadInt64(buf, pos)
|
||||
assert.Equal(t, fieldID, int64(-1))
|
||||
pos += int(unsafe.Sizeof(fieldID))
|
||||
|
||||
//descriptor data fix, start time stamp
|
||||
startts := UnsafeReadInt64(buf, pos)
|
||||
assert.Equal(t, startts, int64(1000))
|
||||
pos += int(unsafe.Sizeof(startts))
|
||||
|
||||
//descriptor data fix, end time stamp
|
||||
endts := UnsafeReadInt64(buf, pos)
|
||||
assert.Equal(t, endts, int64(2000))
|
||||
pos += int(unsafe.Sizeof(endts))
|
||||
|
||||
//descriptor data fix, payload type
|
||||
colType := UnsafeReadInt32(buf, pos)
|
||||
assert.Equal(t, schemapb.DataType(colType), schemapb.DataType_INT64)
|
||||
pos += int(unsafe.Sizeof(colType))
|
||||
|
||||
//descriptor data, post header lengths
|
||||
for i := DescriptorEventType; i < EventTypeEnd; i++ {
|
||||
size := getEventFixPartSize(i)
|
||||
assert.Equal(t, uint8(size), buf[pos])
|
||||
pos++
|
||||
}
|
||||
|
||||
//start of e1
|
||||
assert.Equal(t, pos, int(descNxtPos))
|
||||
|
||||
//insert e1 header, Timestamp
|
||||
e1ts := UnsafeReadInt64(buf, pos)
|
||||
diffts = curts - e1ts
|
||||
assert.LessOrEqual(t, diffts, maxdiff)
|
||||
pos += int(unsafe.Sizeof(e1ts))
|
||||
|
||||
//insert e1 header, type code
|
||||
e1tc := UnsafeReadInt8(buf, pos)
|
||||
assert.Equal(t, EventTypeCode(e1tc), CreateCollectionEventType)
|
||||
pos += int(unsafe.Sizeof(e1tc))
|
||||
|
||||
//insert e1 header, Server id
|
||||
e1svrID := UnsafeReadInt32(buf, pos)
|
||||
assert.Equal(t, e1svrID, int32(ServerID))
|
||||
pos += int(unsafe.Sizeof(e1svrID))
|
||||
|
||||
//insert e1 header, event length
|
||||
e1EventLen := UnsafeReadInt32(buf, pos)
|
||||
pos += int(unsafe.Sizeof(e1EventLen))
|
||||
|
||||
//insert e1 header, next position
|
||||
e1NxtPos := UnsafeReadInt32(buf, pos)
|
||||
assert.Equal(t, descNxtPos+e1EventLen, e1NxtPos)
|
||||
pos += int(unsafe.Sizeof(descNxtPos))
|
||||
|
||||
//insert e1 data, start time stamp
|
||||
e1st := UnsafeReadInt64(buf, pos)
|
||||
assert.Equal(t, e1st, int64(100))
|
||||
pos += int(unsafe.Sizeof(e1st))
|
||||
|
||||
//insert e1 data, end time stamp
|
||||
e1et := UnsafeReadInt64(buf, pos)
|
||||
assert.Equal(t, e1et, int64(200))
|
||||
pos += int(unsafe.Sizeof(e1et))
|
||||
|
||||
//insert e1, payload
|
||||
e1Payload := buf[pos:e1NxtPos]
|
||||
e1r, err := NewPayloadReader(schemapb.DataType_INT64, e1Payload)
|
||||
assert.Nil(t, err)
|
||||
e1a, err := e1r.GetInt64FromPayload()
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, e1a, []int64{1, 2, 3, 4, 5, 6})
|
||||
err = e1r.Close()
|
||||
assert.Nil(t, err)
|
||||
|
||||
//start of e2
|
||||
pos = int(e1NxtPos)
|
||||
|
||||
//insert e2 header, Timestamp
|
||||
e2ts := UnsafeReadInt64(buf, pos)
|
||||
diffts = curts - e2ts
|
||||
assert.LessOrEqual(t, diffts, maxdiff)
|
||||
pos += int(unsafe.Sizeof(e2ts))
|
||||
|
||||
//insert e2 header, type code
|
||||
e2tc := UnsafeReadInt8(buf, pos)
|
||||
assert.Equal(t, EventTypeCode(e2tc), DropCollectionEventType)
|
||||
pos += int(unsafe.Sizeof(e2tc))
|
||||
|
||||
//insert e2 header, Server id
|
||||
e2svrID := UnsafeReadInt32(buf, pos)
|
||||
assert.Equal(t, e2svrID, int32(ServerID))
|
||||
pos += int(unsafe.Sizeof(e2svrID))
|
||||
|
||||
//insert e2 header, event length
|
||||
e2EventLen := UnsafeReadInt32(buf, pos)
|
||||
pos += int(unsafe.Sizeof(e2EventLen))
|
||||
|
||||
//insert e2 header, next position
|
||||
e2NxtPos := UnsafeReadInt32(buf, pos)
|
||||
assert.Equal(t, e1NxtPos+e2EventLen, e2NxtPos)
|
||||
pos += int(unsafe.Sizeof(descNxtPos))
|
||||
|
||||
//insert e2 data, start time stamp
|
||||
e2st := UnsafeReadInt64(buf, pos)
|
||||
assert.Equal(t, e2st, int64(300))
|
||||
pos += int(unsafe.Sizeof(e2st))
|
||||
|
||||
//insert e2 data, end time stamp
|
||||
e2et := UnsafeReadInt64(buf, pos)
|
||||
assert.Equal(t, e2et, int64(400))
|
||||
pos += int(unsafe.Sizeof(e2et))
|
||||
|
||||
//insert e2, payload
|
||||
e2Payload := buf[pos:]
|
||||
e2r, err := NewPayloadReader(schemapb.DataType_INT64, e2Payload)
|
||||
assert.Nil(t, err)
|
||||
e2a, err := e2r.GetInt64FromPayload()
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, e2a, []int64{7, 8, 9, 10, 11, 12})
|
||||
err = e2r.Close()
|
||||
assert.Nil(t, err)
|
||||
|
||||
assert.Equal(t, int(e2NxtPos), len(buf))
|
||||
|
||||
//read binlog
|
||||
r, err := NewBinlogReader(buf)
|
||||
assert.Nil(t, err)
|
||||
event1, err := r.NextEventReader()
|
||||
assert.Nil(t, err)
|
||||
assert.NotNil(t, event1)
|
||||
p1, err := event1.GetInt64FromPayload()
|
||||
assert.Equal(t, p1, []int64{1, 2, 3, 4, 5, 6})
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, event1.TypeCode, CreateCollectionEventType)
|
||||
ed1, ok := (event1.eventData).(*createCollectionEventData)
|
||||
assert.True(t, ok)
|
||||
assert.Equal(t, ed1.StartTimestamp, Timestamp(100))
|
||||
assert.Equal(t, ed1.EndTimestamp, Timestamp(200))
|
||||
|
||||
event2, err := r.NextEventReader()
|
||||
assert.Nil(t, err)
|
||||
assert.NotNil(t, event2)
|
||||
p2, err := event2.GetInt64FromPayload()
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, p2, []int64{7, 8, 9, 10, 11, 12})
|
||||
assert.Equal(t, event2.TypeCode, DropCollectionEventType)
|
||||
ed2, ok := (event2.eventData).(*dropCollectionEventData)
|
||||
assert.True(t, ok)
|
||||
_, ok = (event2.eventData).(*insertEventData)
|
||||
assert.False(t, ok)
|
||||
assert.Equal(t, ed2.StartTimestamp, Timestamp(300))
|
||||
assert.Equal(t, ed2.EndTimestamp, Timestamp(400))
|
||||
}
|
||||
|
||||
func TestDDLBinlog2(t *testing.T) {
|
||||
w, err := NewDDLBinlogWriter(schemapb.DataType_INT64, 50)
|
||||
assert.Nil(t, err)
|
||||
|
||||
e1, err := w.NextCreatePartitionEventWriter()
|
||||
assert.Nil(t, err)
|
||||
err = e1.AddDataToPayload([]int64{1, 2, 3})
|
||||
assert.Nil(t, err)
|
||||
err = e1.AddDataToPayload([]int32{4, 5, 6})
|
||||
assert.NotNil(t, err)
|
||||
err = e1.AddDataToPayload([]int64{4, 5, 6})
|
||||
assert.Nil(t, err)
|
||||
e1.SetStartTimestamp(100)
|
||||
e1.SetEndTimestamp(200)
|
||||
|
||||
e2, err := w.NextDropPartitionEventWriter()
|
||||
assert.Nil(t, err)
|
||||
err = e2.AddDataToPayload([]int64{7, 8, 9})
|
||||
assert.Nil(t, err)
|
||||
err = e2.AddDataToPayload([]bool{true, false, true})
|
||||
assert.NotNil(t, err)
|
||||
err = e2.AddDataToPayload([]int64{10, 11, 12})
|
||||
assert.Nil(t, err)
|
||||
e2.SetStartTimestamp(300)
|
||||
e2.SetEndTimestamp(400)
|
||||
|
||||
w.SetStartTimeStamp(1000)
|
||||
w.SetEndTimeStamp(2000)
|
||||
|
||||
_, err = w.GetBuffer()
|
||||
assert.NotNil(t, err)
|
||||
err = w.Close()
|
||||
assert.Nil(t, err)
|
||||
buf, err := w.GetBuffer()
|
||||
assert.Nil(t, err)
|
||||
|
||||
//magic number
|
||||
magicNum := UnsafeReadInt32(buf, 0)
|
||||
assert.Equal(t, magicNum, MagicNumber)
|
||||
pos := int(unsafe.Sizeof(MagicNumber))
|
||||
|
||||
//descriptor header, timestamp
|
||||
ts := UnsafeReadInt64(buf, pos)
|
||||
assert.Greater(t, ts, int64(0))
|
||||
curts := time.Now().UnixNano() / int64(time.Millisecond)
|
||||
curts = int64(tsoutil.ComposeTS(curts, 0))
|
||||
diffts := curts - ts
|
||||
maxdiff := int64(tsoutil.ComposeTS(1000, 0))
|
||||
assert.LessOrEqual(t, diffts, maxdiff)
|
||||
pos += int(unsafe.Sizeof(ts))
|
||||
|
||||
//descriptor header, type code
|
||||
tc := UnsafeReadInt8(buf, pos)
|
||||
assert.Equal(t, EventTypeCode(tc), DescriptorEventType)
|
||||
pos += int(unsafe.Sizeof(tc))
|
||||
|
||||
//descriptor header, server id
|
||||
svrID := UnsafeReadInt32(buf, pos)
|
||||
assert.Equal(t, svrID, int32(ServerID))
|
||||
pos += int(unsafe.Sizeof(svrID))
|
||||
|
||||
//descriptor header, event length
|
||||
descEventLen := UnsafeReadInt32(buf, pos)
|
||||
pos += int(unsafe.Sizeof(descEventLen))
|
||||
|
||||
//descriptor header, next position
|
||||
descNxtPos := UnsafeReadInt32(buf, pos)
|
||||
assert.Equal(t, descEventLen+int32(unsafe.Sizeof(MagicNumber)), descNxtPos)
|
||||
pos += int(unsafe.Sizeof(descNxtPos))
|
||||
|
||||
//descriptor data fix, binlog version
|
||||
binLogVer := UnsafeReadInt16(buf, pos)
|
||||
assert.Equal(t, binLogVer, int16(BinlogVersion))
|
||||
pos += int(unsafe.Sizeof(binLogVer))
|
||||
|
||||
//descriptor data fix, server version
|
||||
svrVer := UnsafeReadInt64(buf, pos)
|
||||
assert.Equal(t, svrVer, int64(ServerVersion))
|
||||
pos += int(unsafe.Sizeof(svrVer))
|
||||
|
||||
//descriptor data fix, commit id
|
||||
cmitID := UnsafeReadInt64(buf, pos)
|
||||
assert.Equal(t, cmitID, int64(CommitID))
|
||||
pos += int(unsafe.Sizeof(cmitID))
|
||||
|
||||
//descriptor data fix, header length
|
||||
headLen := UnsafeReadInt8(buf, pos)
|
||||
assert.Equal(t, headLen, int8(binary.Size(eventHeader{})))
|
||||
pos += int(unsafe.Sizeof(headLen))
|
||||
|
||||
//descriptor data fix, collection id
|
||||
collID := UnsafeReadInt64(buf, pos)
|
||||
assert.Equal(t, collID, int64(50))
|
||||
pos += int(unsafe.Sizeof(collID))
|
||||
|
||||
//descriptor data fix, partition id
|
||||
partID := UnsafeReadInt64(buf, pos)
|
||||
assert.Equal(t, partID, int64(-1))
|
||||
pos += int(unsafe.Sizeof(partID))
|
||||
|
||||
//descriptor data fix, segment id
|
||||
segID := UnsafeReadInt64(buf, pos)
|
||||
assert.Equal(t, segID, int64(-1))
|
||||
pos += int(unsafe.Sizeof(segID))
|
||||
|
||||
//descriptor data fix, field id
|
||||
fieldID := UnsafeReadInt64(buf, pos)
|
||||
assert.Equal(t, fieldID, int64(-1))
|
||||
pos += int(unsafe.Sizeof(fieldID))
|
||||
|
||||
//descriptor data fix, start time stamp
|
||||
startts := UnsafeReadInt64(buf, pos)
|
||||
assert.Equal(t, startts, int64(1000))
|
||||
pos += int(unsafe.Sizeof(startts))
|
||||
|
||||
//descriptor data fix, end time stamp
|
||||
endts := UnsafeReadInt64(buf, pos)
|
||||
assert.Equal(t, endts, int64(2000))
|
||||
pos += int(unsafe.Sizeof(endts))
|
||||
|
||||
//descriptor data fix, payload type
|
||||
colType := UnsafeReadInt32(buf, pos)
|
||||
assert.Equal(t, schemapb.DataType(colType), schemapb.DataType_INT64)
|
||||
pos += int(unsafe.Sizeof(colType))
|
||||
|
||||
//descriptor data, post header lengths
|
||||
for i := DescriptorEventType; i < EventTypeEnd; i++ {
|
||||
size := getEventFixPartSize(i)
|
||||
assert.Equal(t, uint8(size), buf[pos])
|
||||
pos++
|
||||
}
|
||||
|
||||
//start of e1
|
||||
assert.Equal(t, pos, int(descNxtPos))
|
||||
|
||||
//insert e1 header, Timestamp
|
||||
e1ts := UnsafeReadInt64(buf, pos)
|
||||
diffts = curts - e1ts
|
||||
assert.LessOrEqual(t, diffts, maxdiff)
|
||||
pos += int(unsafe.Sizeof(e1ts))
|
||||
|
||||
//insert e1 header, type code
|
||||
e1tc := UnsafeReadInt8(buf, pos)
|
||||
assert.Equal(t, EventTypeCode(e1tc), CreatePartitionEventType)
|
||||
pos += int(unsafe.Sizeof(e1tc))
|
||||
|
||||
//insert e1 header, Server id
|
||||
e1svrID := UnsafeReadInt32(buf, pos)
|
||||
assert.Equal(t, e1svrID, int32(ServerID))
|
||||
pos += int(unsafe.Sizeof(e1svrID))
|
||||
|
||||
//insert e1 header, event length
|
||||
e1EventLen := UnsafeReadInt32(buf, pos)
|
||||
pos += int(unsafe.Sizeof(e1EventLen))
|
||||
|
||||
//insert e1 header, next position
|
||||
e1NxtPos := UnsafeReadInt32(buf, pos)
|
||||
assert.Equal(t, descNxtPos+e1EventLen, e1NxtPos)
|
||||
pos += int(unsafe.Sizeof(descNxtPos))
|
||||
|
||||
//insert e1 data, start time stamp
|
||||
e1st := UnsafeReadInt64(buf, pos)
|
||||
assert.Equal(t, e1st, int64(100))
|
||||
pos += int(unsafe.Sizeof(e1st))
|
||||
|
||||
//insert e1 data, end time stamp
|
||||
e1et := UnsafeReadInt64(buf, pos)
|
||||
assert.Equal(t, e1et, int64(200))
|
||||
pos += int(unsafe.Sizeof(e1et))
|
||||
|
||||
//insert e1, payload
|
||||
e1Payload := buf[pos:e1NxtPos]
|
||||
e1r, err := NewPayloadReader(schemapb.DataType_INT64, e1Payload)
|
||||
assert.Nil(t, err)
|
||||
e1a, err := e1r.GetInt64FromPayload()
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, e1a, []int64{1, 2, 3, 4, 5, 6})
|
||||
err = e1r.Close()
|
||||
assert.Nil(t, err)
|
||||
|
||||
//start of e2
|
||||
pos = int(e1NxtPos)
|
||||
|
||||
//insert e2 header, Timestamp
|
||||
e2ts := UnsafeReadInt64(buf, pos)
|
||||
diffts = curts - e2ts
|
||||
assert.LessOrEqual(t, diffts, maxdiff)
|
||||
pos += int(unsafe.Sizeof(e2ts))
|
||||
|
||||
//insert e2 header, type code
|
||||
e2tc := UnsafeReadInt8(buf, pos)
|
||||
assert.Equal(t, EventTypeCode(e2tc), DropPartitionEventType)
|
||||
pos += int(unsafe.Sizeof(e2tc))
|
||||
|
||||
//insert e2 header, Server id
|
||||
e2svrID := UnsafeReadInt32(buf, pos)
|
||||
assert.Equal(t, e2svrID, int32(ServerID))
|
||||
pos += int(unsafe.Sizeof(e2svrID))
|
||||
|
||||
//insert e2 header, event length
|
||||
e2EventLen := UnsafeReadInt32(buf, pos)
|
||||
pos += int(unsafe.Sizeof(e2EventLen))
|
||||
|
||||
//insert e2 header, next position
|
||||
e2NxtPos := UnsafeReadInt32(buf, pos)
|
||||
assert.Equal(t, e1NxtPos+e2EventLen, e2NxtPos)
|
||||
pos += int(unsafe.Sizeof(descNxtPos))
|
||||
|
||||
//insert e2 data, start time stamp
|
||||
e2st := UnsafeReadInt64(buf, pos)
|
||||
assert.Equal(t, e2st, int64(300))
|
||||
pos += int(unsafe.Sizeof(e2st))
|
||||
|
||||
//insert e2 data, end time stamp
|
||||
e2et := UnsafeReadInt64(buf, pos)
|
||||
assert.Equal(t, e2et, int64(400))
|
||||
pos += int(unsafe.Sizeof(e2et))
|
||||
|
||||
//insert e2, payload
|
||||
e2Payload := buf[pos:]
|
||||
e2r, err := NewPayloadReader(schemapb.DataType_INT64, e2Payload)
|
||||
assert.Nil(t, err)
|
||||
e2a, err := e2r.GetInt64FromPayload()
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, e2a, []int64{7, 8, 9, 10, 11, 12})
|
||||
err = e2r.Close()
|
||||
assert.Nil(t, err)
|
||||
|
||||
assert.Equal(t, int(e2NxtPos), len(buf))
|
||||
|
||||
//read binlog
|
||||
r, err := NewBinlogReader(buf)
|
||||
assert.Nil(t, err)
|
||||
event1, err := r.NextEventReader()
|
||||
assert.Nil(t, err)
|
||||
assert.NotNil(t, event1)
|
||||
p1, err := event1.GetInt64FromPayload()
|
||||
assert.Equal(t, p1, []int64{1, 2, 3, 4, 5, 6})
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, event1.TypeCode, CreatePartitionEventType)
|
||||
ed1, ok := (event1.eventData).(*createPartitionEventData)
|
||||
assert.True(t, ok)
|
||||
assert.Equal(t, ed1.StartTimestamp, Timestamp(100))
|
||||
assert.Equal(t, ed1.EndTimestamp, Timestamp(200))
|
||||
|
||||
event2, err := r.NextEventReader()
|
||||
assert.Nil(t, err)
|
||||
assert.NotNil(t, event2)
|
||||
p2, err := event2.GetInt64FromPayload()
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, p2, []int64{7, 8, 9, 10, 11, 12})
|
||||
assert.Equal(t, event2.TypeCode, DropPartitionEventType)
|
||||
ed2, ok := (event2.eventData).(*dropPartitionEventData)
|
||||
assert.True(t, ok)
|
||||
_, ok = (event2.eventData).(*insertEventData)
|
||||
assert.False(t, ok)
|
||||
assert.Equal(t, ed2.StartTimestamp, Timestamp(300))
|
||||
assert.Equal(t, ed2.EndTimestamp, Timestamp(400))
|
||||
}
|
||||
|
@ -223,12 +223,13 @@ func NewInsertBinlogWriter(dataType schemapb.DataType, collectionID, partitionID
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
func NewDeleteBinlogWriter(dataType schemapb.DataType) (*DeleteBinlogWriter, error) {
|
||||
func NewDeleteBinlogWriter(dataType schemapb.DataType, collectionID int64) (*DeleteBinlogWriter, error) {
|
||||
descriptorEvent, err := newDescriptorEvent()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
descriptorEvent.PayloadDataType = dataType
|
||||
descriptorEvent.CollectionID = collectionID
|
||||
return &DeleteBinlogWriter{
|
||||
baseBinlogWriter: baseBinlogWriter{
|
||||
descriptorEvent: *descriptorEvent,
|
||||
@ -239,12 +240,13 @@ func NewDeleteBinlogWriter(dataType schemapb.DataType) (*DeleteBinlogWriter, err
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
func NewDDLBinlogWriter(dataType schemapb.DataType) (*DDLBinlogWriter, error) {
|
||||
func NewDDLBinlogWriter(dataType schemapb.DataType, collectionID int64) (*DDLBinlogWriter, error) {
|
||||
descriptorEvent, err := newDescriptorEvent()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
descriptorEvent.PayloadDataType = dataType
|
||||
descriptorEvent.CollectionID = collectionID
|
||||
return &DDLBinlogWriter{
|
||||
baseBinlogWriter: baseBinlogWriter{
|
||||
descriptorEvent: *descriptorEvent,
|
||||
|
@ -54,6 +54,5 @@ func TestBinlogWriterReader(t *testing.T) {
|
||||
|
||||
reader, err := binlogReader.NextEventReader()
|
||||
assert.Nil(t, err)
|
||||
fmt.Println("reader offset : " + strconv.Itoa(int(binlogReader.currentOffset)))
|
||||
assert.Nil(t, reader)
|
||||
}
|
||||
|
@ -359,7 +359,7 @@ type DataDefinitionCodec struct {
|
||||
}
|
||||
|
||||
func (dataDefinitionCodec *DataDefinitionCodec) Serialize(ts []Timestamp, ddRequests []string, eventTypes []EventTypeCode) ([]*Blob, error) {
|
||||
writer, err := NewDDLBinlogWriter(schemapb.DataType_STRING)
|
||||
writer, err := NewDDLBinlogWriter(schemapb.DataType_STRING, dataDefinitionCodec.Schema.ID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -431,7 +431,7 @@ func (dataDefinitionCodec *DataDefinitionCodec) Serialize(ts []Timestamp, ddRequ
|
||||
value: buffer,
|
||||
})
|
||||
|
||||
writer, err = NewDDLBinlogWriter(schemapb.DataType_INT64)
|
||||
writer, err = NewDDLBinlogWriter(schemapb.DataType_INT64, dataDefinitionCodec.Schema.ID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user