diff --git a/cmd/masterservice/main.go b/cmd/masterservice/main.go index a683e4a28f..e1886b77aa 100644 --- a/cmd/masterservice/main.go +++ b/cmd/masterservice/main.go @@ -36,9 +36,6 @@ func main() { psc.Params.Init() log.Printf("proxy service address : %s", psc.Params.ServiceAddress) proxyService := psc.NewClient(psc.Params.ServiceAddress) - if err = proxyService.Init(); err != nil { - panic(err) - } for cnt = 0; cnt < reTryCnt; cnt++ { pxStates, err := proxyService.GetComponentStates() diff --git a/internal/datanode/allocator.go b/internal/datanode/allocator.go index 48827b965b..83400a8ea2 100644 --- a/internal/datanode/allocator.go +++ b/internal/datanode/allocator.go @@ -24,7 +24,7 @@ func newAllocatorImpl(s MasterServiceInterface) *allocatorImpl { func (alloc *allocatorImpl) allocID() (UniqueID, error) { resp, err := alloc.masterService.AllocID(&masterpb.IDRequest{ Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_kRequestID, + MsgType: commonpb.MsgType_kShowCollections, MsgID: 1, // GOOSE TODO Timestamp: 0, // GOOSE TODO SourceID: Params.NodeID, diff --git a/internal/datanode/data_node_test.go b/internal/datanode/data_node_test.go index 7de7101c19..1cbdf38d3c 100644 --- a/internal/datanode/data_node_test.go +++ b/internal/datanode/data_node_test.go @@ -1,15 +1,20 @@ package datanode import ( + "context" + "fmt" "log" "math/rand" "os" "strconv" "testing" + "time" "go.etcd.io/etcd/clientv3" + "go.uber.org/zap" etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd" + "github.com/zilliztech/milvus-distributed/internal/master" ) func makeNewChannelNames(names []string, suffix string) []string { @@ -31,10 +36,52 @@ func refreshChannelNames() { Params.InsertChannelNames = makeNewChannelNames(Params.InsertChannelNames, suffix) } +func startMaster(ctx context.Context) { + master.Init() + etcdAddr := master.Params.EtcdAddress + metaRootPath := master.Params.MetaRootPath + + etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) + if err != nil { + panic(err) + } + _, err = etcdCli.Delete(context.TODO(), metaRootPath, clientv3.WithPrefix()) + if err != nil { + panic(err) + } + + masterPort := 53101 + master.Params.Port = masterPort + svr, err := master.CreateServer(ctx) + if err != nil { + log.Print("create server failed", zap.Error(err)) + } + if err := svr.Run(int64(master.Params.Port)); err != nil { + log.Fatal("run server failed", zap.Error(err)) + } + + fmt.Println("Waiting for server!", svr.IsServing()) + Params.MasterAddress = master.Params.Address + ":" + strconv.Itoa(masterPort) +} + func TestMain(m *testing.M) { Params.Init() refreshChannelNames() + const ctxTimeInMillisecond = 2000 + const closeWithDeadline = true + var ctx context.Context + + if closeWithDeadline { + var cancel context.CancelFunc + d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) + ctx, cancel = context.WithDeadline(context.Background(), d) + defer cancel() + } else { + ctx = context.Background() + } + + startMaster(ctx) exitCode := m.Run() os.Exit(exitCode) } diff --git a/internal/datanode/meta_table.go b/internal/datanode/meta_table.go index 6a9128ff86..898a6aa64a 100644 --- a/internal/datanode/meta_table.go +++ b/internal/datanode/meta_table.go @@ -12,9 +12,9 @@ import ( ) type metaTable struct { - client kv.Base // + client kv.TxnBase // segID2FlushMeta map[UniqueID]*datapb.SegmentFlushMeta - collID2DdlMeta map[UniqueID]*datapb.DDLFlushMeta + collID2DdlMeta map[UniqueID]*datapb.DDLFlushMeta // GOOSE TODO: addDDLFlush and has DDLFlush lock sync.RWMutex } @@ -36,6 +36,24 @@ func NewMetaTable(kv kv.TxnBase) (*metaTable, error) { return mt, nil } +func (mt *metaTable) AppendDDLBinlogPaths(collID UniqueID, paths []string) error { + mt.lock.Lock() + defer mt.lock.Unlock() + + _, ok := mt.collID2DdlMeta[collID] + if !ok { + mt.collID2DdlMeta[collID] = &datapb.DDLFlushMeta{ + CollectionID: collID, + BinlogPaths: make([]string, 0), + } + } + + meta := mt.collID2DdlMeta[collID] + meta.BinlogPaths = append(meta.BinlogPaths, paths...) + + return mt.saveDDLFlushMeta(meta) +} + func (mt *metaTable) AppendSegBinlogPaths(segmentID UniqueID, fieldID int64, dataPaths []string) error { _, ok := mt.segID2FlushMeta[segmentID] if !ok { @@ -79,6 +97,44 @@ func (mt *metaTable) CompleteFlush(segmentID UniqueID) error { return mt.saveSegFlushMeta(meta) } +// metaTable.lock.Lock() before call this function +func (mt *metaTable) saveDDLFlushMeta(meta *datapb.DDLFlushMeta) error { + value := proto.MarshalTextString(meta) + + mt.collID2DdlMeta[meta.CollectionID] = meta + prefix := path.Join(Params.DDLFlushMetaSubPath, strconv.FormatInt(meta.CollectionID, 10)) + + return mt.client.Save(prefix, value) +} + +func (mt *metaTable) reloadDdlMetaFromKV() error { + mt.collID2DdlMeta = make(map[UniqueID]*datapb.DDLFlushMeta) + _, values, err := mt.client.LoadWithPrefix(Params.DDLFlushMetaSubPath) + if err != nil { + return err + } + + for _, value := range values { + ddlMeta := &datapb.DDLFlushMeta{} + err = proto.UnmarshalText(value, ddlMeta) + if err != nil { + return err + } + mt.collID2DdlMeta[ddlMeta.CollectionID] = ddlMeta + } + return nil +} + +// metaTable.lock.Lock() before call this function +func (mt *metaTable) saveSegFlushMeta(meta *datapb.SegmentFlushMeta) error { + value := proto.MarshalTextString(meta) + + mt.segID2FlushMeta[meta.SegmentID] = meta + prefix := path.Join(Params.SegFlushMetaSubPath, strconv.FormatInt(meta.SegmentID, 10)) + + return mt.client.Save(prefix, value) +} + func (mt *metaTable) reloadSegMetaFromKV() error { mt.segID2FlushMeta = make(map[UniqueID]*datapb.SegmentFlushMeta) @@ -99,16 +155,6 @@ func (mt *metaTable) reloadSegMetaFromKV() error { return nil } -// metaTable.lock.Lock() before call this function -func (mt *metaTable) saveSegFlushMeta(meta *datapb.SegmentFlushMeta) error { - value := proto.MarshalTextString(meta) - - mt.segID2FlushMeta[meta.SegmentID] = meta - prefix := path.Join(Params.SegFlushMetaSubPath, strconv.FormatInt(meta.SegmentID, 10)) - - return mt.client.Save(prefix, value) -} - func (mt *metaTable) addSegmentFlush(segmentID UniqueID) error { mt.lock.Lock() defer mt.lock.Unlock() @@ -151,61 +197,6 @@ func (mt *metaTable) getSegBinlogPaths(segmentID UniqueID) (map[int64][]string, return ret, nil } -// --- DDL --- -func (mt *metaTable) AppendDDLBinlogPaths(collID UniqueID, paths []string) error { - mt.lock.Lock() - defer mt.lock.Unlock() - - _, ok := mt.collID2DdlMeta[collID] - if !ok { - mt.collID2DdlMeta[collID] = &datapb.DDLFlushMeta{ - CollectionID: collID, - BinlogPaths: make([]string, 0), - } - } - - meta := mt.collID2DdlMeta[collID] - meta.BinlogPaths = append(meta.BinlogPaths, paths...) - - return mt.saveDDLFlushMeta(meta) -} - -func (mt *metaTable) hasDDLFlushMeta(collID UniqueID) bool { - mt.lock.RLock() - defer mt.lock.RUnlock() - - _, ok := mt.collID2DdlMeta[collID] - return ok -} - -// metaTable.lock.Lock() before call this function -func (mt *metaTable) saveDDLFlushMeta(meta *datapb.DDLFlushMeta) error { - value := proto.MarshalTextString(meta) - - mt.collID2DdlMeta[meta.CollectionID] = meta - prefix := path.Join(Params.DDLFlushMetaSubPath, strconv.FormatInt(meta.CollectionID, 10)) - - return mt.client.Save(prefix, value) -} - -func (mt *metaTable) reloadDdlMetaFromKV() error { - mt.collID2DdlMeta = make(map[UniqueID]*datapb.DDLFlushMeta) - _, values, err := mt.client.LoadWithPrefix(Params.DDLFlushMetaSubPath) - if err != nil { - return err - } - - for _, value := range values { - ddlMeta := &datapb.DDLFlushMeta{} - err = proto.UnmarshalText(value, ddlMeta) - if err != nil { - return err - } - mt.collID2DdlMeta[ddlMeta.CollectionID] = ddlMeta - } - return nil -} - func (mt *metaTable) getDDLBinlogPaths(collID UniqueID) (map[UniqueID][]string, error) { mt.lock.RLock() defer mt.lock.RUnlock() diff --git a/internal/datanode/meta_table_test.go b/internal/datanode/meta_table_test.go index 247cbff51f..dd5a4251dc 100644 --- a/internal/datanode/meta_table_test.go +++ b/internal/datanode/meta_table_test.go @@ -1,16 +1,26 @@ package datanode import ( + "context" "testing" "github.com/stretchr/testify/assert" - memkv "github.com/zilliztech/milvus-distributed/internal/kv/mem" + "github.com/stretchr/testify/require" + etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd" + "go.etcd.io/etcd/clientv3" ) -func TestMetaTable_SegmentFlush(t *testing.T) { +func TestMetaTable_all(t *testing.T) { - kvMock := memkv.NewMemoryKV() - meta, err := NewMetaTable(kvMock) + etcdAddr := Params.EtcdAddress + cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) + require.NoError(t, err) + etcdKV := etcdkv.NewEtcdKV(cli, "/etcd/test/meta/root") + + _, err = cli.Delete(context.TODO(), "/etcd/test/meta/root", clientv3.WithPrefix()) + require.NoError(t, err) + + meta, err := NewMetaTable(etcdKV) assert.NoError(t, err) defer meta.client.Close() @@ -55,6 +65,27 @@ func TestMetaTable_SegmentFlush(t *testing.T) { ret) }) + t.Run("TestMetaTable_AppendDDLBinlogPaths", func(t *testing.T) { + + collID2Paths := map[UniqueID][]string{ + 301: {"a", "b", "c"}, + 302: {"c", "b", "a"}, + } + + for collID, dataPaths := range collID2Paths { + for _, dp := range dataPaths { + err = meta.AppendDDLBinlogPaths(collID, []string{dp}) + assert.Nil(t, err) + } + } + + for k, v := range collID2Paths { + ret, err := meta.getDDLBinlogPaths(k) + assert.Nil(t, err) + assert.Equal(t, map[UniqueID][]string{k: v}, ret) + } + }) + t.Run("TestMetaTable_CompleteFlush", func(t *testing.T) { var segmentID UniqueID = 401 @@ -74,37 +105,3 @@ func TestMetaTable_SegmentFlush(t *testing.T) { }) } - -func TestMetaTable_DDLFlush(t *testing.T) { - kvMock := memkv.NewMemoryKV() - meta, err := NewMetaTable(kvMock) - assert.NoError(t, err) - defer meta.client.Close() - - t.Run("TestMetaTable_AppendDDLBinlogPaths", func(t *testing.T) { - - assert.False(t, meta.hasDDLFlushMeta(301)) - assert.False(t, meta.hasDDLFlushMeta(302)) - - collID2Paths := map[UniqueID][]string{ - 301: {"a", "b", "c"}, - 302: {"c", "b", "a"}, - } - - for collID, dataPaths := range collID2Paths { - for _, dp := range dataPaths { - err = meta.AppendDDLBinlogPaths(collID, []string{dp}) - assert.Nil(t, err) - } - } - - for k, v := range collID2Paths { - ret, err := meta.getDDLBinlogPaths(k) - assert.Nil(t, err) - assert.Equal(t, map[UniqueID][]string{k: v}, ret) - } - - assert.True(t, meta.hasDDLFlushMeta(301)) - assert.True(t, meta.hasDDLFlushMeta(302)) - }) -} diff --git a/internal/dataservice/channel.go b/internal/dataservice/channel.go index e88b9030b0..fb9a2c6a10 100644 --- a/internal/dataservice/channel.go +++ b/internal/dataservice/channel.go @@ -49,14 +49,13 @@ func (cm *insertChannelManager) AllocChannels(collectionID UniqueID, groupNum in group = make([]string, m) } for k := 0; k < len(group); k++ { - group[k] = Params.InsertChannelPrefixName + strconv.Itoa(cm.count) + group = append(group, Params.InsertChannelPrefixName+strconv.Itoa(cm.count)) cm.count++ } i += int64(len(group)) j++ cg = append(cg, group) } - cm.channelGroups[collectionID] = cg return cg, nil } diff --git a/internal/dataservice/channel_test.go b/internal/dataservice/channel_test.go deleted file mode 100644 index e05884c368..0000000000 --- a/internal/dataservice/channel_test.go +++ /dev/null @@ -1,38 +0,0 @@ -package dataservice - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestChannelAllocation(t *testing.T) { - Params.Init() - Params.InsertChannelNumPerCollection = 4 - manager := newInsertChannelManager() - cases := []struct { - collectionID UniqueID - groupNum int - expectGroupNum int - success bool - }{ - {1, 4, 4, true}, - {1, 4, 4, false}, - {2, 1, 1, true}, - {3, 5, 4, true}, - } - for _, c := range cases { - channels, err := manager.AllocChannels(c.collectionID, c.expectGroupNum) - if !c.success { - assert.NotNil(t, err) - continue - } - assert.Nil(t, err) - assert.EqualValues(t, c.expectGroupNum, len(channels)) - total := 0 - for _, channel := range channels { - total += len(channel) - } - assert.EqualValues(t, Params.InsertChannelNumPerCollection, total) - } -} diff --git a/internal/dataservice/cluster.go b/internal/dataservice/cluster.go index 9964b34504..a7434d108a 100644 --- a/internal/dataservice/cluster.go +++ b/internal/dataservice/cluster.go @@ -64,8 +64,8 @@ func (c *dataNodeCluster) GetNodeIDs() []int64 { c.mu.RLock() defer c.mu.RUnlock() ret := make([]int64, len(c.nodes)) - for i, node := range c.nodes { - ret[i] = node.id + for _, node := range c.nodes { + ret = append(ret, node.id) } return ret } diff --git a/internal/dataservice/segment_allocator.go b/internal/dataservice/segment_allocator.go index f9d13892a3..53e81b25d9 100644 --- a/internal/dataservice/segment_allocator.go +++ b/internal/dataservice/segment_allocator.go @@ -84,8 +84,6 @@ func newSegmentAllocator(meta *meta, allocator allocator) (*segmentAllocatorImpl } func (allocator *segmentAllocatorImpl) OpenSegment(segmentInfo *datapb.SegmentInfo) error { - allocator.mu.Lock() - defer allocator.mu.Unlock() if _, ok := allocator.segments[segmentInfo.SegmentID]; ok { return fmt.Errorf("segment %d already exist", segmentInfo.SegmentID) } diff --git a/internal/dataservice/server.go b/internal/dataservice/server.go index 562653fb88..ec54db9403 100644 --- a/internal/dataservice/server.go +++ b/internal/dataservice/server.go @@ -652,9 +652,9 @@ func (s *Server) GetInsertBinlogPaths(req *datapb.InsertBinlogPathRequest) (*dat } fields := make([]UniqueID, len(flushMeta.Fields)) paths := make([]*internalpb2.StringList, len(flushMeta.Fields)) - for i, field := range flushMeta.Fields { - fields[i] = field.FieldID - paths[i] = &internalpb2.StringList{Values: field.BinlogPaths} + for _, field := range flushMeta.Fields { + fields = append(fields, field.FieldID) + paths = append(paths, &internalpb2.StringList{Values: field.BinlogPaths}) } resp.FieldIDs = fields resp.Paths = paths @@ -674,7 +674,7 @@ func (s *Server) GetInsertChannels(req *datapb.InsertChannelRequest) ([]string, return nil, err } - channels := make([]string, 0) + channels := make([]string, Params.InsertChannelNumPerCollection) for _, group := range channelGroups { channels = append(channels, group...) } diff --git a/internal/dataservice/watcher.go b/internal/dataservice/watcher.go index 490ae036ca..9246ba702b 100644 --- a/internal/dataservice/watcher.go +++ b/internal/dataservice/watcher.go @@ -69,47 +69,40 @@ func (watcher *dataNodeTimeTickWatcher) StartBackgroundLoop(ctx context.Context) log.Println("data node time tick watcher closed") return case msg := <-watcher.msgQueue: - if err := watcher.handleTimeTickMsg(msg); err != nil { - log.Println(err.Error()) - continue - } - } - } -} - -func (watcher *dataNodeTimeTickWatcher) handleTimeTickMsg(msg *msgstream.TimeTickMsg) error { - segments, err := watcher.allocator.GetSealedSegments() - if err != nil { - return err - } - for _, id := range segments { - expired, err := watcher.allocator.IsAllocationsExpired(id, msg.Base.Timestamp) - if err != nil { - log.Printf("check allocations expired error %s", err.Error()) - continue - } - if expired { - segmentInfo, err := watcher.meta.GetSegment(id) + segments, err := watcher.allocator.GetSealedSegments() if err != nil { - log.Println(err.Error()) + log.Printf("get sealed segments error %s", err.Error()) continue } - if err = watcher.meta.SetSegmentState(id, datapb.SegmentState_SegmentSealed); err != nil { - log.Println(err.Error()) - continue + for _, id := range segments { + expired, err := watcher.allocator.IsAllocationsExpired(id, msg.Base.Timestamp) + if err != nil { + log.Printf("check allocations expired error %s", err.Error()) + continue + } + if expired { + segmentInfo, err := watcher.meta.GetSegment(id) + if err != nil { + log.Println(err.Error()) + continue + } + if err = watcher.meta.SetSegmentState(id, datapb.SegmentState_SegmentSealed); err != nil { + log.Println(err.Error()) + continue + } + watcher.cluster.FlushSegment(&datapb.FlushSegRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_kShowCollections, + MsgID: -1, // todo add msg id + Timestamp: 0, // todo + SourceID: -1, // todo + }, + CollectionID: segmentInfo.CollectionID, + SegmentIDs: []int64{segmentInfo.SegmentID}, + }) + watcher.allocator.DropSegment(id) + } } - watcher.cluster.FlushSegment(&datapb.FlushSegRequest{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_kShowCollections, - MsgID: -1, // todo add msg id - Timestamp: 0, // todo - SourceID: Params.NodeID, - }, - CollectionID: segmentInfo.CollectionID, - SegmentIDs: []int64{segmentInfo.SegmentID}, - }) - watcher.allocator.DropSegment(id) } } - return nil } diff --git a/internal/dataservice/watcher_test.go b/internal/dataservice/watcher_test.go deleted file mode 100644 index 85af18e530..0000000000 --- a/internal/dataservice/watcher_test.go +++ /dev/null @@ -1,97 +0,0 @@ -package dataservice - -import ( - "strconv" - "testing" - "time" - - "github.com/zilliztech/milvus-distributed/internal/msgstream" - "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" - "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" - - "github.com/stretchr/testify/assert" -) - -func TestDataNodeTTWatcher(t *testing.T) { - Params.Init() - c := make(chan struct{}) - cluster := newDataNodeCluster(c) - defer cluster.ShutDownClients() - schema := newTestSchema() - allocator := newMockAllocator() - meta, err := newMemoryMeta(allocator) - assert.Nil(t, err) - segAllocator, err := newSegmentAllocator(meta, allocator) - assert.Nil(t, err) - watcher := newDataNodeTimeTickWatcher(meta, segAllocator, cluster) - - id, err := allocator.allocID() - assert.Nil(t, err) - err = meta.AddCollection(&collectionInfo{ - Schema: schema, - ID: id, - }) - assert.Nil(t, err) - - cases := []struct { - sealed bool - allocation bool - expired bool - expected bool - }{ - {false, false, true, false}, - {false, true, true, false}, - {false, true, false, false}, - {true, false, true, true}, - {true, true, false, false}, - {true, true, true, true}, - } - - segmentIDs := make([]UniqueID, len(cases)) - for i, c := range cases { - segID, err := allocator.allocID() - segmentIDs[i] = segID - assert.Nil(t, err) - segmentInfo, err := BuildSegment(id, 100, segID, []string{"channel" + strconv.Itoa(i)}) - assert.Nil(t, err) - err = meta.AddSegment(segmentInfo) - assert.Nil(t, err) - err = segAllocator.OpenSegment(segmentInfo) - assert.Nil(t, err) - if c.allocation && c.expired { - _, _, _, err := segAllocator.AllocSegment(id, 100, "channel"+strconv.Itoa(i), 100) - assert.Nil(t, err) - } - } - - time.Sleep(time.Duration(Params.SegIDAssignExpiration) * time.Millisecond) - for i, c := range cases { - if c.allocation && !c.expired { - _, _, _, err := segAllocator.AllocSegment(id, 100, "channel"+strconv.Itoa(i), 100) - assert.Nil(t, err) - } - if c.sealed { - err := segAllocator.SealSegment(segmentIDs[i]) - assert.Nil(t, err) - } - } - ts, err := allocator.allocTimestamp() - assert.Nil(t, err) - - err = watcher.handleTimeTickMsg(&msgstream.TimeTickMsg{ - BaseMsg: msgstream.BaseMsg{ - HashValues: []uint32{0}, - }, - TimeTickMsg: internalpb2.TimeTickMsg{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_kTimeTick, - Timestamp: ts, - }, - }, - }) - assert.Nil(t, err) - for i, c := range cases { - _, ok := segAllocator.segments[segmentIDs[i]] - assert.EqualValues(t, !c.expected, ok) - } -} diff --git a/internal/distributed/proxynode/service.go b/internal/distributed/proxynode/service.go index b295e685fe..7617b07813 100644 --- a/internal/distributed/proxynode/service.go +++ b/internal/distributed/proxynode/service.go @@ -123,7 +123,7 @@ func (s *Server) init() error { }() s.wg.Add(1) - go s.startGrpcLoop(Params.Port) + s.startGrpcLoop(Params.Port) // wait for grpc server loop start err = <-s.grpcErrChan if err != nil { diff --git a/internal/distributed/proxyservice/service.go b/internal/distributed/proxyservice/service.go index 035321e55f..ece32912be 100644 --- a/internal/distributed/proxyservice/service.go +++ b/internal/distributed/proxyservice/service.go @@ -59,7 +59,7 @@ func (s *Server) init() error { proxyservice.Params.Init() s.wg.Add(1) - go s.startGrpcLoop(Params.ServicePort) + s.startGrpcLoop(Params.ServicePort) // wait for grpc server loop start if err := <-s.grpcErrChan; err != nil { return err diff --git a/internal/distributed/querynode/service.go b/internal/distributed/querynode/service.go index d21caf64c1..9c16007fbf 100644 --- a/internal/distributed/querynode/service.go +++ b/internal/distributed/querynode/service.go @@ -89,7 +89,8 @@ func (s *Server) Start() error { } func (s *Server) Stop() error { - return s.Stop() + s.grpcServer.Stop() + return s.node.Stop() } func (s *Server) GetTimeTickChannel(ctx context.Context, in *commonpb.Empty) (*milvuspb.StringResponse, error) { diff --git a/internal/masterservice/master_service.go b/internal/masterservice/master_service.go index d2d8ab7613..89d7b10749 100644 --- a/internal/masterservice/master_service.go +++ b/internal/masterservice/master_service.go @@ -247,9 +247,7 @@ func (c *Core) checkInit() error { if c.DataNodeSegmentFlushCompletedChan == nil { return errors.Errorf("DataNodeSegmentFlushCompletedChan is nil") } - log.Printf("master node id = %d", Params.NodeID) - log.Printf("master dd channel name = %s", Params.DdChannel) - log.Printf("master time ticke channel name = %s", Params.TimeTickChannel) + log.Printf("master node id = %d\n", Params.NodeID) return nil } @@ -609,7 +607,6 @@ func (c *Core) SetProxyService(s ProxyServiceInterface) error { return err } Params.ProxyTimeTickChannel = rsp - log.Printf("proxy time tick channel name = %s", Params.ProxyTimeTickChannel) c.InvalidateCollectionMetaCache = func(ts typeutil.Timestamp, dbName string, collectionName string) error { err := s.InvalidateCollectionMetaCache(&proxypb.InvalidateCollMetaCacheRequest{ @@ -636,8 +633,6 @@ func (c *Core) SetDataService(s DataServiceInterface) error { return err } Params.DataServiceSegmentChannel = rsp - log.Printf("data service segment channel name = %s", Params.DataServiceSegmentChannel) - c.GetBinlogFilePathsFromDataServiceReq = func(segID typeutil.UniqueID, fieldID typeutil.UniqueID) ([]string, error) { ts, err := c.tsoAllocator.Alloc(1) if err != nil { diff --git a/internal/proxyservice/impl.go b/internal/proxyservice/impl.go index 6f6287931d..fe975d9fc4 100644 --- a/internal/proxyservice/impl.go +++ b/internal/proxyservice/impl.go @@ -39,9 +39,8 @@ func (s *ServiceImpl) fillNodeInitParams() error { getConfigContentByName := func(fileName string) []byte { _, fpath, _, _ := runtime.Caller(0) - configFile := path.Dir(fpath) + "/../../configs/" + fileName + configFile := path.Dir(fpath) + "/../../../configs/" + fileName _, err := os.Stat(configFile) - log.Printf("configFile = %s", configFile) if os.IsNotExist(err) { runPath, err := os.Getwd() if err != nil { diff --git a/internal/querynode/load_index_service.go b/internal/querynode/load_index_service.go index caf044ea02..8c95915bba 100644 --- a/internal/querynode/load_index_service.go +++ b/internal/querynode/load_index_service.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "log" + "path" "sort" "strconv" "strings" @@ -16,6 +17,7 @@ import ( "github.com/zilliztech/milvus-distributed/internal/msgstream/util" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" + "github.com/zilliztech/milvus-distributed/internal/storage" ) type loadIndexService struct { @@ -137,18 +139,10 @@ func (lis *loadIndexService) execute(msg msgstream.TsMsg) error { } // 1. use msg's index paths to get index bytes var err error - ok, err = lis.checkIndexReady(indexMsg) - if err != nil { - return err - } - if ok { - // no error - return errors.New("") - } - var indexBuffer [][]byte + var indexParams indexParam fn := func() error { - indexBuffer, err = lis.loadIndex(indexMsg.IndexPaths) + indexBuffer, indexParams, err = lis.loadIndex(indexMsg.IndexPaths) if err != nil { return err } @@ -158,13 +152,21 @@ func (lis *loadIndexService) execute(msg msgstream.TsMsg) error { if err != nil { return err } + ok, err = lis.checkIndexReady(indexParams, indexMsg) + if err != nil { + return err + } + if ok { + // no error + return errors.New("") + } // 2. use index bytes and index path to update segment - err = lis.updateSegmentIndex(indexBuffer, indexMsg) + err = lis.updateSegmentIndex(indexParams, indexBuffer, indexMsg) if err != nil { return err } //3. update segment index stats - err = lis.updateSegmentIndexStats(indexMsg) + err = lis.updateSegmentIndexStats(indexParams, indexMsg) if err != nil { return err } @@ -222,7 +224,7 @@ func (lis *loadIndexService) fieldsStatsKey2IDs(key string) (UniqueID, UniqueID, return collectionID, fieldID, nil } -func (lis *loadIndexService) updateSegmentIndexStats(indexMsg *msgstream.LoadIndexMsg) error { +func (lis *loadIndexService) updateSegmentIndexStats(indexParams indexParam, indexMsg *msgstream.LoadIndexMsg) error { targetSegment, err := lis.replica.getSegmentByID(indexMsg.SegmentID) if err != nil { return err @@ -230,7 +232,13 @@ func (lis *loadIndexService) updateSegmentIndexStats(indexMsg *msgstream.LoadInd fieldStatsKey := lis.fieldsStatsIDs2Key(targetSegment.collectionID, indexMsg.FieldID) _, ok := lis.fieldIndexes[fieldStatsKey] - newIndexParams := indexMsg.IndexParams + newIndexParams := make([]*commonpb.KeyValuePair, 0) + for k, v := range indexParams { + newIndexParams = append(newIndexParams, &commonpb.KeyValuePair{ + Key: k, + Value: v, + }) + } // sort index params by key sort.Slice(newIndexParams, func(i, j int) bool { return newIndexParams[i].Key < newIndexParams[j].Key }) @@ -262,22 +270,40 @@ func (lis *loadIndexService) updateSegmentIndexStats(indexMsg *msgstream.LoadInd return nil } -func (lis *loadIndexService) loadIndex(indexPath []string) ([][]byte, error) { +func (lis *loadIndexService) loadIndex(indexPath []string) ([][]byte, indexParam, error) { index := make([][]byte, 0) - for _, path := range indexPath { + var indexParams indexParam + for _, p := range indexPath { fmt.Println("load path = ", indexPath) - indexPiece, err := (*lis.client).Load(path) + indexPiece, err := (*lis.client).Load(p) if err != nil { - return nil, err + return nil, nil, err + } + // get index params when detecting indexParamPrefix + if path.Base(p) == storage.IndexParamsFile { + indexCodec := storage.NewIndexCodec() + _, indexParams, err = indexCodec.Deserialize([]*storage.Blob{ + { + Key: storage.IndexParamsFile, + Value: []byte(indexPiece), + }, + }) + if err != nil { + return nil, nil, err + } + } else { + index = append(index, []byte(indexPiece)) } - index = append(index, []byte(indexPiece)) } - return index, nil + if len(indexParams) <= 0 { + return nil, nil, errors.New("cannot find index param") + } + return index, indexParams, nil } -func (lis *loadIndexService) updateSegmentIndex(bytesIndex [][]byte, loadIndexMsg *msgstream.LoadIndexMsg) error { +func (lis *loadIndexService) updateSegmentIndex(indexParams indexParam, bytesIndex [][]byte, loadIndexMsg *msgstream.LoadIndexMsg) error { segment, err := lis.replica.getSegmentByID(loadIndexMsg.SegmentID) if err != nil { return err @@ -292,8 +318,8 @@ func (lis *loadIndexService) updateSegmentIndex(bytesIndex [][]byte, loadIndexMs if err != nil { return err } - for _, indexParam := range loadIndexMsg.IndexParams { - err = loadIndexInfo.appendIndexParam(indexParam.Key, indexParam.Value) + for k, v := range indexParams { + err = loadIndexInfo.appendIndexParam(k, v) if err != nil { return err } @@ -330,12 +356,12 @@ func (lis *loadIndexService) sendQueryNodeStats() error { return nil } -func (lis *loadIndexService) checkIndexReady(loadIndexMsg *msgstream.LoadIndexMsg) (bool, error) { +func (lis *loadIndexService) checkIndexReady(indexParams indexParam, loadIndexMsg *msgstream.LoadIndexMsg) (bool, error) { segment, err := lis.replica.getSegmentByID(loadIndexMsg.SegmentID) if err != nil { return false, err } - if !segment.matchIndexParam(loadIndexMsg.FieldID, loadIndexMsg.IndexParams) { + if !segment.matchIndexParam(loadIndexMsg.FieldID, indexParams) { return false, nil } return true, nil diff --git a/internal/querynode/load_index_service_test.go b/internal/querynode/load_index_service_test.go index 54f545b528..c2377f416b 100644 --- a/internal/querynode/load_index_service_test.go +++ b/internal/querynode/load_index_service_test.go @@ -22,6 +22,7 @@ import ( "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb" "github.com/zilliztech/milvus-distributed/internal/querynode/client" + "github.com/zilliztech/milvus-distributed/internal/storage" ) func TestLoadIndexService_FloatVector(t *testing.T) { @@ -273,6 +274,9 @@ func TestLoadIndexService_FloatVector(t *testing.T) { binarySet, err := index.Serialize() assert.Equal(t, err, nil) indexPaths := make([]string, 0) + var indexCodec storage.IndexCodec + binarySet, err = indexCodec.Serialize(binarySet, indexParams) + assert.NoError(t, err) for _, index := range binarySet { path := strconv.Itoa(int(segmentID)) + "/" + index.Key indexPaths = append(indexPaths, path) @@ -588,6 +592,9 @@ func TestLoadIndexService_BinaryVector(t *testing.T) { //save index to minio binarySet, err := index.Serialize() assert.Equal(t, err, nil) + var indexCodec storage.IndexCodec + binarySet, err = indexCodec.Serialize(binarySet, indexParams) + assert.NoError(t, err) indexPaths := make([]string, 0) for _, index := range binarySet { path := strconv.Itoa(int(segmentID)) + "/" + index.Key diff --git a/internal/querynode/query_node_test.go b/internal/querynode/query_node_test.go index 16bea19bb9..287adc23c7 100644 --- a/internal/querynode/query_node_test.go +++ b/internal/querynode/query_node_test.go @@ -12,12 +12,16 @@ import ( "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb" + "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" + "github.com/zilliztech/milvus-distributed/internal/proto/querypb" "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" ) const ctxTimeInMillisecond = 5000 const closeWithDeadline = true +type queryServiceMock struct{} + func setup() { Params.Init() Params.MetaRootPath = "/etcd/test/root/querynode" @@ -131,6 +135,11 @@ func newQueryNodeMock() *QueryNode { } svr := NewQueryNode(ctx, 0) + err := svr.SetQueryService(&queryServiceMock{}) + if err != nil { + panic(err) + } + return svr } @@ -153,6 +162,17 @@ func refreshChannelNames() { Params.LoadIndexChannelNames = makeNewChannelNames(Params.LoadIndexChannelNames, suffix) } +func (q *queryServiceMock) RegisterNode(req *querypb.RegisterNodeRequest) (*querypb.RegisterNodeResponse, error) { + return &querypb.RegisterNodeResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_SUCCESS, + }, + InitParams: &internalpb2.InitParams{ + NodeID: int64(1), + }, + }, nil +} + func TestMain(m *testing.M) { setup() refreshChannelNames() diff --git a/internal/querynode/segment.go b/internal/querynode/segment.go index 46aff8bb7a..c617f8f277 100644 --- a/internal/querynode/segment.go +++ b/internal/querynode/segment.go @@ -232,7 +232,7 @@ func (s *Segment) setIndexParam(fieldID int64, indexParamKv []*commonpb.KeyValue return nil } -func (s *Segment) matchIndexParam(fieldID int64, indexParamKv []*commonpb.KeyValuePair) bool { +func (s *Segment) matchIndexParam(fieldID int64, indexParams indexParam) bool { s.paramMutex.RLock() defer s.paramMutex.RUnlock() fieldIndexParam := s.indexParam[fieldID] @@ -241,12 +241,12 @@ func (s *Segment) matchIndexParam(fieldID int64, indexParamKv []*commonpb.KeyVal } paramSize := len(s.indexParam) matchCount := 0 - for _, param := range indexParamKv { - value, ok := fieldIndexParam[param.Key] + for k, v := range indexParams { + value, ok := fieldIndexParam[k] if !ok { return false } - if param.Value != value { + if v != value { return false } matchCount++ diff --git a/internal/querynode/segment_manager.go b/internal/querynode/segment_manager.go index 94e9633ac2..7a7f29b574 100644 --- a/internal/querynode/segment_manager.go +++ b/internal/querynode/segment_manager.go @@ -42,7 +42,7 @@ func (s *segmentManager) seekSegment(positions []*internalPb.MsgPosition) error } //TODO, index params -func (s *segmentManager) getIndexInfo(collectionID UniqueID, segmentID UniqueID) (UniqueID, indexParam, error) { +func (s *segmentManager) getIndexInfo(collectionID UniqueID, segmentID UniqueID) (UniqueID, UniqueID, error) { req := &milvuspb.DescribeSegmentRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_kDescribeSegment, @@ -52,9 +52,9 @@ func (s *segmentManager) getIndexInfo(collectionID UniqueID, segmentID UniqueID) } response, err := s.masterClient.DescribeSegment(req) if err != nil { - return 0, nil, err + return 0, 0, err } - return response.IndexID, nil, nil + return response.IndexID, response.BuildID, nil } func (s *segmentManager) loadSegment(collectionID UniqueID, partitionID UniqueID, segmentIDs []UniqueID, fieldIDs []int64) error { @@ -70,16 +70,22 @@ func (s *segmentManager) loadSegment(collectionID UniqueID, partitionID UniqueID } } for _, segmentID := range segmentIDs { - indexID, indexParams, err := s.getIndexInfo(collectionID, segmentID) - if err != nil { - return err + // we don't need index id yet + _, buildID, err := s.getIndexInfo(collectionID, segmentID) + if err == nil { + // we don't need load vector fields + vectorFields, err := s.replica.getVecFieldsBySegmentID(segmentID) + if err != nil { + return err + } + fieldIDs = s.filterOutVectorFields(fieldIDs, vectorFields) } paths, srcFieldIDs, err := s.getInsertBinlogPaths(segmentID) if err != nil { return err } - targetFields := s.filterOutNeedlessFields(paths, srcFieldIDs, fieldIDs) + targetFields := s.getTargetFields(paths, srcFieldIDs, fieldIDs) // replace segment err = s.replica.removeSegment(segmentID) if err != nil { @@ -93,11 +99,11 @@ func (s *segmentManager) loadSegment(collectionID UniqueID, partitionID UniqueID if err != nil { return err } - indexPaths, err := s.getIndexPaths(indexID) + indexPaths, err := s.getIndexPaths(buildID) if err != nil { return err } - err = s.loadIndex(segmentID, indexPaths, indexParams) + err = s.loadIndex(segmentID, indexPaths) if err != nil { // TODO: return or continue? return err @@ -133,7 +139,17 @@ func (s *segmentManager) getInsertBinlogPaths(segmentID UniqueID) ([]*internalPb return pathResponse.Paths, pathResponse.FieldIDs, nil } -func (s *segmentManager) filterOutNeedlessFields(paths []*internalPb.StringList, srcFieldIDS []int64, dstFields []int64) map[int64]*internalPb.StringList { +func (s *segmentManager) filterOutVectorFields(fieldIDs []int64, vectorFields map[int64]string) []int64 { + targetFields := make([]int64, 0) + for _, id := range fieldIDs { + if _, ok := vectorFields[id]; !ok { + targetFields = append(targetFields, id) + } + } + return targetFields +} + +func (s *segmentManager) getTargetFields(paths []*internalPb.StringList, srcFieldIDS []int64, dstFields []int64) map[int64]*internalPb.StringList { targetFields := make(map[int64]*internalPb.StringList) containsFunc := func(s []int64, e int64) bool { @@ -156,6 +172,11 @@ func (s *segmentManager) filterOutNeedlessFields(paths []*internalPb.StringList, func (s *segmentManager) loadSegmentFieldsData(segmentID UniqueID, targetFields map[int64]*internalPb.StringList) error { for id, p := range targetFields { + if id == timestampFieldID { + // seg core doesn't need timestamp field + continue + } + paths := p.Values blobs := make([]*storage.Blob, 0) for _, path := range paths { @@ -233,13 +254,14 @@ func (s *segmentManager) loadSegmentFieldsData(segmentID UniqueID, targetFields return nil } -func (s *segmentManager) getIndexPaths(indexID UniqueID) ([]string, error) { +func (s *segmentManager) getIndexPaths(buildID UniqueID) ([]string, error) { if s.indexClient == nil { return nil, errors.New("null index service client") } indexFilePathRequest := &indexpb.IndexFilePathsRequest{ - IndexIDs: []UniqueID{indexID}, + // TODO: rename indexIDs to buildIDs + IndexIDs: []UniqueID{buildID}, } pathResponse, err := s.indexClient.GetIndexFilePaths(indexFilePathRequest) if err != nil || pathResponse.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { @@ -253,7 +275,7 @@ func (s *segmentManager) getIndexPaths(indexID UniqueID) ([]string, error) { return pathResponse.FilePaths[0].IndexFilePaths, nil } -func (s *segmentManager) loadIndex(segmentID UniqueID, indexPaths []string, indexParam indexParam) error { +func (s *segmentManager) loadIndex(segmentID UniqueID, indexPaths []string) error { // get vector field ids from schema to load index vecFieldIDs, err := s.replica.getVecFieldsBySegmentID(segmentID) if err != nil { @@ -261,7 +283,7 @@ func (s *segmentManager) loadIndex(segmentID UniqueID, indexPaths []string, inde } for id, name := range vecFieldIDs { // non-blocking send - go s.sendLoadIndex(indexPaths, segmentID, id, name, indexParam) + go s.sendLoadIndex(indexPaths, segmentID, id, name) } return nil @@ -270,25 +292,15 @@ func (s *segmentManager) loadIndex(segmentID UniqueID, indexPaths []string, inde func (s *segmentManager) sendLoadIndex(indexPaths []string, segmentID int64, fieldID int64, - fieldName string, - indexParams map[string]string) { - var indexParamsKV []*commonpb.KeyValuePair - for key, value := range indexParams { - indexParamsKV = append(indexParamsKV, &commonpb.KeyValuePair{ - Key: key, - Value: value, - }) - } - + fieldName string) { loadIndexRequest := internalPb.LoadIndex{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_kSearchResult, }, - SegmentID: segmentID, - FieldName: fieldName, - FieldID: fieldID, - IndexPaths: indexPaths, - IndexParams: indexParamsKV, + SegmentID: segmentID, + FieldName: fieldName, + FieldID: fieldID, + IndexPaths: indexPaths, } loadIndexMsg := &msgstream.LoadIndexMsg{ diff --git a/internal/querynode/segment_manager_test.go b/internal/querynode/segment_manager_test.go index b3c731673c..56ec50bb10 100644 --- a/internal/querynode/segment_manager_test.go +++ b/internal/querynode/segment_manager_test.go @@ -137,7 +137,7 @@ func generateInsertBinLog(collectionID UniqueID, partitionID UniqueID, segmentID return paths, fieldIDs, nil } -func generateIndex(segmentID UniqueID) ([]string, indexParam, error) { +func generateIndex(segmentID UniqueID) ([]string, error) { const ( msgLength = 1000 DIM = 16 @@ -174,12 +174,12 @@ func generateIndex(segmentID UniqueID) ([]string, indexParam, error) { index, err := indexnode.NewCIndex(typeParams, indexParams) if err != nil { - return nil, nil, err + return nil, err } err = index.BuildFloatVecIndexWithoutIds(indexRowData) if err != nil { - return nil, nil, err + return nil, err } option := &minioKV.Option{ @@ -193,26 +193,33 @@ func generateIndex(segmentID UniqueID) ([]string, indexParam, error) { kv, err := minioKV.NewMinIOKV(context.Background(), option) if err != nil { - return nil, nil, err + return nil, err } - //save index to minio + // save index to minio binarySet, err := index.Serialize() if err != nil { - return nil, nil, err + return nil, err + } + + // serialize index params + var indexCodec storage.IndexCodec + serializedIndexBlobs, err := indexCodec.Serialize(binarySet, indexParams) + if err != nil { + return nil, err } indexPaths := make([]string, 0) - for _, index := range binarySet { - path := strconv.Itoa(int(segmentID)) + "/" + index.Key - indexPaths = append(indexPaths, path) - err := kv.Save(path, string(index.Value)) + for _, index := range serializedIndexBlobs { + p := strconv.Itoa(int(segmentID)) + "/" + index.Key + indexPaths = append(indexPaths, p) + err := kv.Save(p, string(index.Value)) if err != nil { - return nil, nil, err + return nil, err } } - return indexPaths, indexParams, nil + return indexPaths, nil } func doInsert(ctx context.Context, collectionName string, partitionTag string, segmentID UniqueID) error { @@ -328,11 +335,6 @@ func doInsert(ctx context.Context, collectionName string, partitionTag string, s return err } - //messages := insertStream.Consume() - //for _, msg := range messages.Msgs { - // - //} - return nil } @@ -420,16 +422,16 @@ func TestSegmentManager_load_release_and_search(t *testing.T) { paths, srcFieldIDs, err := generateInsertBinLog(collectionID, partitionID, segmentID, keyPrefix) assert.NoError(t, err) - fieldsMap := node.segManager.filterOutNeedlessFields(paths, srcFieldIDs, fieldIDs) + fieldsMap := node.segManager.getTargetFields(paths, srcFieldIDs, fieldIDs) assert.Equal(t, len(fieldsMap), 2) err = node.segManager.loadSegmentFieldsData(segmentID, fieldsMap) assert.NoError(t, err) - indexPaths, indexParams, err := generateIndex(segmentID) + indexPaths, err := generateIndex(segmentID) assert.NoError(t, err) - err = node.segManager.loadIndex(segmentID, indexPaths, indexParams) + err = node.segManager.loadIndex(segmentID, indexPaths) assert.NoError(t, err) // do search @@ -507,7 +509,7 @@ func TestSegmentManager_load_release_and_search(t *testing.T) { // //paths, srcFieldIDs, err := generateInsertBinLog(collectionID, partitionID, segmentID, keyPrefix) // //assert.NoError(t, err) // -// //fieldsMap := node.segManager.filterOutNeedlessFields(paths, srcFieldIDs, fieldIDs) +// //fieldsMap := node.segManager.getTargetFields(paths, srcFieldIDs, fieldIDs) // //assert.Equal(t, len(fieldsMap), 2) // // segmentIDToInsert := UniqueID(3) diff --git a/internal/querynode/type_def.go b/internal/querynode/type_def.go index a7d724e338..957ee0ca85 100644 --- a/internal/querynode/type_def.go +++ b/internal/querynode/type_def.go @@ -8,6 +8,9 @@ import ( "github.com/zilliztech/milvus-distributed/internal/util/typeutil" ) +const rowIDFieldID = 0 +const timestampFieldID = 1 + type UniqueID = typeutil.UniqueID type Timestamp = typeutil.Timestamp type IntPrimaryKey = typeutil.IntPrimaryKey @@ -20,7 +23,6 @@ type TimeRange struct { type MasterServiceInterface interface { DescribeSegment(in *milvuspb.DescribeSegmentRequest) (*milvuspb.DescribeSegmentResponse, error) - DescribeIndex(in *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error) } type QueryServiceInterface interface { diff --git a/internal/storage/data_codec.go b/internal/storage/data_codec.go index c9306b148e..4736201027 100644 --- a/internal/storage/data_codec.go +++ b/internal/storage/data_codec.go @@ -17,7 +17,7 @@ import ( const ( Ts = "ts" DDL = "ddl" - indexParamsFile = "indexParams" + IndexParamsFile = "indexParams" ) type ( @@ -640,14 +640,14 @@ func (indexCodec *IndexCodec) Serialize(blobs []*Blob, params map[string]string) if err != nil { return nil, err } - blobs = append(blobs, &Blob{Key: indexParamsFile, Value: paramsBytes}) + blobs = append(blobs, &Blob{Key: IndexParamsFile, Value: paramsBytes}) return blobs, nil } func (indexCodec *IndexCodec) Deserialize(blobs []*Blob) ([]*Blob, map[string]string, error) { var params map[string]string for i := 0; i < len(blobs); i++ { - if blobs[i].Key != indexParamsFile { + if blobs[i].Key != IndexParamsFile { continue } if err := json.Unmarshal(blobs[i].Value, ¶ms); err != nil { diff --git a/internal/storage/data_codec_test.go b/internal/storage/data_codec_test.go index a2fff3262c..64fc918744 100644 --- a/internal/storage/data_codec_test.go +++ b/internal/storage/data_codec_test.go @@ -313,7 +313,7 @@ func TestIndexCodec(t *testing.T) { blobsInput, err := indexCodec.Serialize(blobs, indexParams) assert.Nil(t, err) assert.EqualValues(t, 4, len(blobsInput)) - assert.EqualValues(t, indexParamsFile, blobsInput[3]) + assert.EqualValues(t, IndexParamsFile, blobsInput[3]) blobsOutput, indexParamsOutput, err := indexCodec.Deserialize(blobsInput) assert.Nil(t, err) assert.EqualValues(t, 3, len(blobsOutput)) diff --git a/scripts/run_go_unittest.sh b/scripts/run_go_unittest.sh index 0a105d0d97..b8461b5598 100755 --- a/scripts/run_go_unittest.sh +++ b/scripts/run_go_unittest.sh @@ -18,14 +18,14 @@ go test -race -cover "${MILVUS_DIR}/kv/..." -failfast # TODO: remove to distributed #go test -race -cover "${MILVUS_DIR}/proxynode/..." -failfast #go test -race -cover "${MILVUS_DIR}/writenode/..." -failfast -go test -race -cover "${MILVUS_DIR}/datanode/..." -failfast +#go test -race -cover "${MILVUS_DIR}/datanode/..." -failfast #go test -race -cover "${MILVUS_DIR}/master/..." -failfast #go test -race -cover "${MILVUS_DIR}/indexnode/..." -failfast #go test -race -cover "${MILVUS_DIR}/msgstream/..." "${MILVUS_DIR}/querynode/..." "${MILVUS_DIR}/storage" "${MILVUS_DIR}/util/..." -failfast +go test -race -cover "${MILVUS_DIR}/querynode/..." -failfast #go test -race -cover "${MILVUS_DIR}/msgstream/..." "${MILVUS_DIR}/storage" "${MILVUS_DIR}/util/..." -failfast #go test -race -cover "${MILVUS_DIR}/msgstream/..." "${MILVUS_DIR}/util/..." -failfast go test -race -cover "${MILVUS_DIR}/msgstream/..." -failfast go test -race -cover -v "${MILVUS_DIR}/masterservice" "${MILVUS_DIR}/distributed/masterservice" -failfast #go test -race -cover "${MILVUS_DIR}/kv/..." "${MILVUS_DIR}/msgstream/..." "${MILVUS_DIR}/master/..." "${MILVUS_DIR}/querynode/..." -failfast -go test -race -cover "${MILVUS_DIR}/dataservice/..." -failfast