Rename IndexService to IndexCoord (#5932)

* rename package indexservice to indexcoord

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* rename indexservice to indexcoord

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* fix queryservice static-check

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* move distributed/indexservice to distributed/indexcoord

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* move internal/indexservice to internal/indexcoord

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* rename indexservice to indexcoord

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* rename MasterComponent to RootCoordComponent

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* rename master to rootcoord for queryservice

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* rename master to rootcoord for dataservice

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* rename master to rootcoord for datanode

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* rename master to rootcoord for proxynode

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* rename master to rootcoord for querynode

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* rename master to rootcoord

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* rename IndexService to IndexCoord

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* rename IndexService to IndexCoord

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* fix rebase issue

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>
This commit is contained in:
Cai Yudong 2021-06-21 17:28:03 +08:00 committed by GitHub
parent f8a391aa4f
commit 92e429d812
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
68 changed files with 611 additions and 616 deletions

View File

@ -14,18 +14,18 @@ package components
import (
"context"
grpcindexserver "github.com/milvus-io/milvus/internal/distributed/indexservice"
grpcindexcoord "github.com/milvus-io/milvus/internal/distributed/indexcoord"
)
type IndexCoord struct {
svr *grpcindexserver.Server
svr *grpcindexcoord.Server
}
// NewIndexService creates a new IndexCoord
// NewIndexCoord creates a new IndexCoord
func NewIndexCoord(ctx context.Context) (*IndexCoord, error) {
var err error
s := &IndexCoord{}
svr, err := grpcindexserver.NewServer(ctx)
svr, err := grpcindexcoord.NewServer(ctx)
if err != nil {
return nil, err

View File

@ -24,8 +24,8 @@ import (
"github.com/milvus-io/milvus/cmd/components"
"github.com/milvus-io/milvus/internal/datanode"
"github.com/milvus-io/milvus/internal/dataservice"
"github.com/milvus-io/milvus/internal/indexcoord"
"github.com/milvus-io/milvus/internal/indexnode"
"github.com/milvus-io/milvus/internal/indexservice"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/logutil"
"github.com/milvus-io/milvus/internal/metrics"
@ -251,10 +251,10 @@ func (mr *MilvusRoles) runIndexCoord(ctx context.Context, localMsg bool) *compon
wg.Add(1)
go func() {
indexservice.Params.Init()
indexcoord.Params.Init()
if !localMsg {
logutil.SetupLogger(&indexservice.Params.Log)
logutil.SetupLogger(&indexcoord.Params.Log)
defer log.Sync()
}

View File

@ -35,9 +35,9 @@ type UniqueID = typeutil.UniqueID
type IDAllocator struct {
Allocator
etcdEndpoints []string
metaRoot string
masterClient types.MasterService
etcdEndpoints []string
metaRoot string
rootCoordClient types.RootCoord
countPerRPC uint32
@ -71,16 +71,16 @@ func NewIDAllocator(ctx context.Context, metaRoot string, etcdEndpoints []string
func (ia *IDAllocator) Start() error {
var err error
ia.masterClient, err = rcc.NewClient(ia.Ctx, ia.metaRoot, ia.etcdEndpoints, 3*time.Second)
ia.rootCoordClient, err = rcc.NewClient(ia.Ctx, ia.metaRoot, ia.etcdEndpoints, 3*time.Second)
if err != nil {
panic(err)
}
if err = ia.masterClient.Init(); err != nil {
if err = ia.rootCoordClient.Init(); err != nil {
panic(err)
}
if err = ia.masterClient.Start(); err != nil {
if err = ia.rootCoordClient.Start(); err != nil {
panic(err)
}
return ia.Allocator.Start()
@ -112,7 +112,7 @@ func (ia *IDAllocator) syncID() (bool, error) {
},
Count: need,
}
resp, err := ia.masterClient.AllocID(ctx, req)
resp, err := ia.rootCoordClient.AllocID(ctx, req)
cancel()
if err != nil {

View File

@ -29,20 +29,20 @@ type allocatorInterface interface {
}
type allocator struct {
masterService types.MasterService
rootCoord types.RootCoord
}
var _ allocatorInterface = &allocator{}
func newAllocator(s types.MasterService) *allocator {
func newAllocator(s types.RootCoord) *allocator {
return &allocator{
masterService: s,
rootCoord: s,
}
}
func (alloc *allocator) allocID() (UniqueID, error) {
ctx := context.TODO()
resp, err := alloc.masterService.AllocID(ctx, &masterpb.AllocIDRequest{
resp, err := alloc.rootCoord.AllocID(ctx, &masterpb.AllocIDRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_RequestID,
MsgID: 1, // GOOSE TODO

View File

@ -18,7 +18,7 @@ import (
)
func TestAllocator_Basic(t *testing.T) {
ms := &MasterServiceFactory{}
ms := &RootCoordFactory{}
allocator := newAllocator(ms)
t.Run("Test allocID", func(t *testing.T) {
@ -88,7 +88,7 @@ func TestAllocator_Basic(t *testing.T) {
assert.Errorf(t, err, "number: %d", i)
}
// MasterService's unavailability doesn't affects genKey when alloc == false
// RootCoord's unavailability doesn't affects genKey when alloc == false
tests = []Test{
{in{false, []UniqueID{1, 2, 3}}, out{"1/2/3", nil}},
{in{false, []UniqueID{1}}, out{"1", nil}},

View File

@ -48,7 +48,7 @@ const (
// services of data node.
//
// DataNode struct implements `types.Component`, `types.DataNode` interfaces.
// `masterService` holds a grpc client of master service.
// `rootCoord` holds a grpc client of root coordinator.
// `dataService` holds a grpc client of data service.
// `NodeID` is unique to each data node.
// `State` is current statement of this data node, indicating whether it's healthy.
@ -70,8 +70,8 @@ type DataNode struct {
clearSignal chan UniqueID // collection ID
segmentCache *Cache
masterService types.MasterService
dataService types.DataService
rootCoord types.RootCoord
dataService types.DataService
session *sessionutil.Session
@ -90,10 +90,10 @@ func NewDataNode(ctx context.Context, factory msgstream.Factory) *DataNode {
Role: typeutil.DataNodeRole,
watchDm: make(chan struct{}, 1),
masterService: nil,
dataService: nil,
msFactory: factory,
segmentCache: newCache(),
rootCoord: nil,
dataService: nil,
msFactory: factory,
segmentCache: newCache(),
vchan2SyncService: make(map[string]*dataSyncService),
vchan2FlushCh: make(map[string]chan<- *flushMsg),
@ -103,13 +103,13 @@ func NewDataNode(ctx context.Context, factory msgstream.Factory) *DataNode {
return node
}
// SetMasterServiceInterface sets master service's grpc client, error is returned if repeatedly set.
func (node *DataNode) SetMasterServiceInterface(ms types.MasterService) error {
// SetRootCoordInterface sets master service's grpc client, error is returned if repeatedly set.
func (node *DataNode) SetRootCoordInterface(rc types.RootCoord) error {
switch {
case ms == nil, node.masterService != nil:
case rc == nil, node.rootCoord != nil:
return errors.New("Nil parameter or repeatly set")
default:
node.masterService = ms
node.rootCoord = rc
return nil
}
}
@ -156,9 +156,9 @@ func (node *DataNode) NewDataSyncService(vchan *datapb.VchannelInfo) error {
return nil
}
replica := newReplica(node.masterService, vchan.CollectionID)
replica := newReplica(node.rootCoord, vchan.CollectionID)
var alloc allocatorInterface = newAllocator(node.masterService)
var alloc allocatorInterface = newAllocator(node.rootCoord)
log.Debug("Received Vchannel Info",
zap.Int("Unflushed Segment Number", len(vchan.GetUnflushedSegments())),
@ -218,7 +218,7 @@ var FilterThreshold Timestamp
// Start will update DataNode state to HEALTHY
func (node *DataNode) Start() error {
rep, err := node.masterService.AllocTimestamp(node.ctx, &masterpb.AllocTimestampRequest{
rep, err := node.rootCoord.AllocTimestamp(node.ctx, &masterpb.AllocTimestampRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_RequestTSO,
MsgID: 0,

View File

@ -39,11 +39,11 @@ func TestDataSyncService_Start(t *testing.T) {
Factory := &MetaFactory{}
collMeta := Factory.CollectionMetaFactory(UniqueID(0), "coll1")
mockMaster := &MasterServiceFactory{}
mockRootCoord := &RootCoordFactory{}
collectionID := UniqueID(1)
flushChan := make(chan *flushMsg, 100)
replica := newReplica(mockMaster, collectionID)
replica := newReplica(mockRootCoord, collectionID)
allocFactory := NewAllocatorFactory(1)
msFactory := msgstream.NewPmsFactory()

View File

@ -49,9 +49,9 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) {
Factory := &MetaFactory{}
collMeta := Factory.CollectionMetaFactory(UniqueID(0), "coll1")
mockMaster := &MasterServiceFactory{}
mockRootCoord := &RootCoordFactory{}
replica := newReplica(mockMaster, collMeta.ID)
replica := newReplica(mockRootCoord, collMeta.ID)
err = replica.addNewSegment(1, collMeta.ID, 0, insertChannelName, &internalpb.MsgPosition{}, &internalpb.MsgPosition{})
require.NoError(t, err)
@ -137,9 +137,9 @@ func TestFlushSegment(t *testing.T) {
collMeta := genCollectionMeta(collectionID, "test_flush_segment_txn")
flushMap := sync.Map{}
mockMaster := &MasterServiceFactory{}
mockRootCoord := &RootCoordFactory{}
replica := newReplica(mockMaster, collMeta.ID)
replica := newReplica(mockRootCoord, collMeta.ID)
err := replica.addNewSegment(segmentID, collMeta.ID, 0, insertChannelName, &internalpb.MsgPosition{}, &internalpb.MsgPosition{})
require.NoError(t, err)
@ -263,7 +263,7 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) {
collMeta := Factory.CollectionMetaFactory(UniqueID(0), "coll1")
dataFactory := NewDataFactory()
mockMaster := &MasterServiceFactory{}
mockRootCoord := &RootCoordFactory{}
colRep := &SegmentReplica{
collectionID: collMeta.ID,
@ -272,7 +272,7 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) {
flushedSegments: make(map[UniqueID]*Segment),
}
colRep.metaService = newMetaService(mockMaster, collMeta.ID)
colRep.metaService = newMetaService(mockRootCoord, collMeta.ID)
msFactory := msgstream.NewPmsFactory()
m := map[string]interface{}{

View File

@ -24,18 +24,18 @@ import (
"github.com/milvus-io/milvus/internal/proto/schemapb"
)
// metaService initialize replica collections in data node from master service.
// metaService initialize replica collections in data node from root coord.
// Initializing replica collections happens on data node starting. It depends on
// a healthy master service and a valid master service grpc client.
// a healthy root coord and a valid root coord grpc client.
type metaService struct {
replica Replica
collectionID UniqueID
masterClient types.MasterService
rootCoord types.RootCoord
}
func newMetaService(m types.MasterService, collectionID UniqueID) *metaService {
func newMetaService(rc types.RootCoord, collectionID UniqueID) *metaService {
return &metaService{
masterClient: m,
rootCoord: rc,
collectionID: collectionID,
}
}
@ -54,13 +54,13 @@ func (mService *metaService) getCollectionSchema(ctx context.Context, collID Uni
TimeStamp: timestamp,
}
response, err := mService.masterClient.DescribeCollection(ctx, req)
response, err := mService.rootCoord.DescribeCollection(ctx, req)
if response.Status.ErrorCode != commonpb.ErrorCode_Success {
return nil, fmt.Errorf("Describe collection %v from master service wrong: %s", collID, err.Error())
return nil, fmt.Errorf("Describe collection %v from rootcoord wrong: %s", collID, err.Error())
}
if err != nil {
return nil, fmt.Errorf("Grpc error when describe collection %v from master service: %s", collID, err.Error())
return nil, fmt.Errorf("Grpc error when describe collection %v from rootcoord: %s", collID, err.Error())
}
return response.GetSchema(), nil

View File

@ -29,7 +29,7 @@ func TestMetaService_All(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
mFactory := &MasterServiceFactory{}
mFactory := &RootCoordFactory{}
mFactory.setCollectionID(collectionID0)
mFactory.setCollectionName(collectionName0)
ms := newMetaService(mFactory, collectionID0)

View File

@ -48,13 +48,13 @@ func newIDLEDataNodeMock(ctx context.Context) *DataNode {
msFactory := msgstream.NewPmsFactory()
node := NewDataNode(ctx, msFactory)
ms := &MasterServiceFactory{
rc := &RootCoordFactory{
ID: 0,
collectionID: 1,
collectionName: "collection-1",
}
node.SetMasterServiceInterface(ms)
node.SetRootCoordInterface(rc)
ds := &DataServiceFactory{}
node.SetDataServiceInterface(ds)
@ -80,13 +80,13 @@ func newHEALTHDataNodeMock(dmChannelName string) *DataNode {
msFactory := msgstream.NewPmsFactory()
node := NewDataNode(ctx, msFactory)
ms := &MasterServiceFactory{
ms := &RootCoordFactory{
ID: 0,
collectionID: 1,
collectionName: "collection-1",
}
node.SetMasterServiceInterface(ms)
node.SetRootCoordInterface(ms)
ds := &DataServiceFactory{}
node.SetDataServiceInterface(ds)
@ -156,8 +156,8 @@ type DataFactory struct {
rawData []byte
}
type MasterServiceFactory struct {
types.MasterService
type RootCoordFactory struct {
types.RootCoord
ID UniqueID
collectionName string
collectionID UniqueID
@ -487,19 +487,19 @@ func (alloc *AllocatorFactory) genKey(isalloc bool, ids ...UniqueID) (key string
// If id == 0, AllocID will return not successful status
// If id == -1, AllocID will return err
func (m *MasterServiceFactory) setID(id UniqueID) {
func (m *RootCoordFactory) setID(id UniqueID) {
m.ID = id // GOOSE TODO: random ID generator
}
func (m *MasterServiceFactory) setCollectionID(id UniqueID) {
func (m *RootCoordFactory) setCollectionID(id UniqueID) {
m.collectionID = id
}
func (m *MasterServiceFactory) setCollectionName(name string) {
func (m *RootCoordFactory) setCollectionName(name string) {
m.collectionName = name
}
func (m *MasterServiceFactory) AllocID(ctx context.Context, in *masterpb.AllocIDRequest) (*masterpb.AllocIDResponse, error) {
func (m *RootCoordFactory) AllocID(ctx context.Context, in *masterpb.AllocIDRequest) (*masterpb.AllocIDResponse, error) {
resp := &masterpb.AllocIDResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
@ -520,7 +520,7 @@ func (m *MasterServiceFactory) AllocID(ctx context.Context, in *masterpb.AllocID
return resp, nil
}
func (m *MasterServiceFactory) AllocTimestamp(ctx context.Context, in *masterpb.AllocTimestampRequest) (*masterpb.AllocTimestampResponse, error) {
func (m *RootCoordFactory) AllocTimestamp(ctx context.Context, in *masterpb.AllocTimestampRequest) (*masterpb.AllocTimestampResponse, error) {
resp := &masterpb.AllocTimestampResponse{
Status: &commonpb.Status{},
Timestamp: 1000,
@ -528,7 +528,7 @@ func (m *MasterServiceFactory) AllocTimestamp(ctx context.Context, in *masterpb.
return resp, nil
}
func (m *MasterServiceFactory) ShowCollections(ctx context.Context, in *milvuspb.ShowCollectionsRequest) (*milvuspb.ShowCollectionsResponse, error) {
func (m *RootCoordFactory) ShowCollections(ctx context.Context, in *milvuspb.ShowCollectionsRequest) (*milvuspb.ShowCollectionsResponse, error) {
resp := &milvuspb.ShowCollectionsResponse{
Status: &commonpb.Status{},
CollectionNames: []string{m.collectionName},
@ -537,7 +537,7 @@ func (m *MasterServiceFactory) ShowCollections(ctx context.Context, in *milvuspb
}
func (m *MasterServiceFactory) DescribeCollection(ctx context.Context, in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
func (m *RootCoordFactory) DescribeCollection(ctx context.Context, in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
f := MetaFactory{}
meta := f.CollectionMetaFactory(m.collectionID, m.collectionName)
resp := &milvuspb.DescribeCollectionResponse{
@ -548,7 +548,7 @@ func (m *MasterServiceFactory) DescribeCollection(ctx context.Context, in *milvu
return resp, nil
}
func (m *MasterServiceFactory) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
func (m *RootCoordFactory) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
return &internalpb.ComponentStates{
State: &internalpb.ComponentInfo{},
SubcomponentStates: make([]*internalpb.ComponentInfo, 0),

View File

@ -76,8 +76,8 @@ type SegmentReplica struct {
var _ Replica = &SegmentReplica{}
func newReplica(ms types.MasterService, collID UniqueID) Replica {
metaService := newMetaService(ms, collID)
func newReplica(rc types.RootCoord, collID UniqueID) Replica {
metaService := newMetaService(rc, collID)
var replica Replica = &SegmentReplica{
collectionID: collID,

View File

@ -21,8 +21,8 @@ import (
"github.com/milvus-io/milvus/internal/types"
)
func newSegmentReplica(ms types.MasterService, collID UniqueID) *SegmentReplica {
metaService := newMetaService(ms, collID)
func newSegmentReplica(rc types.RootCoord, collID UniqueID) *SegmentReplica {
metaService := newMetaService(rc, collID)
var replica = &SegmentReplica{
collectionID: collID,
@ -37,11 +37,11 @@ func newSegmentReplica(ms types.MasterService, collID UniqueID) *SegmentReplica
}
func TestSegmentReplica(t *testing.T) {
mockMaster := &MasterServiceFactory{}
rc := &RootCoordFactory{}
collID := UniqueID(1)
t.Run("Test inner function segment", func(t *testing.T) {
replica := newSegmentReplica(mockMaster, collID)
replica := newSegmentReplica(rc, collID)
assert.False(t, replica.hasSegment(0))
startPos := &internalpb.MsgPosition{ChannelName: "insert-01", Timestamp: Timestamp(100)}

View File

@ -24,19 +24,19 @@ type allocator interface {
allocID() (UniqueID, error)
}
type masterAllocator struct {
masterClient types.MasterService
type rootCoordAllocator struct {
rootCoordClient types.RootCoord
}
func newAllocator(masterClient types.MasterService) *masterAllocator {
return &masterAllocator{
masterClient: masterClient,
func newAllocator(rootCoordClient types.RootCoord) *rootCoordAllocator {
return &rootCoordAllocator{
rootCoordClient: rootCoordClient,
}
}
func (allocator *masterAllocator) allocTimestamp() (Timestamp, error) {
func (allocator *rootCoordAllocator) allocTimestamp() (Timestamp, error) {
ctx := context.TODO()
resp, err := allocator.masterClient.AllocTimestamp(ctx, &masterpb.AllocTimestampRequest{
resp, err := allocator.rootCoordClient.AllocTimestamp(ctx, &masterpb.AllocTimestampRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_RequestTSO,
MsgID: -1, // todo add msg id
@ -51,9 +51,9 @@ func (allocator *masterAllocator) allocTimestamp() (Timestamp, error) {
return resp.Timestamp, nil
}
func (allocator *masterAllocator) allocID() (UniqueID, error) {
func (allocator *rootCoordAllocator) allocID() (UniqueID, error) {
ctx := context.TODO()
resp, err := allocator.masterClient.AllocID(ctx, &masterpb.AllocIDRequest{
resp, err := allocator.rootCoordClient.AllocID(ctx, &masterpb.AllocIDRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_RequestID,
MsgID: -1, // todo add msg id

View File

@ -18,7 +18,7 @@ import (
)
func TestAllocator_Basic(t *testing.T) {
ms := newMockMasterService()
ms := newMockRootCoordService()
allocator := newAllocator(ms)
t.Run("Test allocTimestamp", func(t *testing.T) {

View File

@ -84,10 +84,10 @@ func (s *Server) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentI
zap.Uint32("count", r.GetCount()))
if !s.meta.HasCollection(r.CollectionID) {
if err := s.loadCollectionFromMaster(ctx, r.CollectionID); err != nil {
if err := s.loadCollectionFromRootCoord(ctx, r.CollectionID); err != nil {
errMsg := fmt.Sprintf("can not load collection %d", r.CollectionID)
appendFailedAssignment(errMsg)
log.Error("load collection from master error",
log.Error("load collection from rootcoord error",
zap.Int64("collectionID", r.CollectionID),
zap.Error(err))
continue
@ -435,7 +435,7 @@ func (s *Server) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInf
binlogs = append(binlogs, sbl)
}
dresp, err := s.masterClient.DescribeCollection(s.ctx, &milvuspb.DescribeCollectionRequest{
dresp, err := s.rootCoordClient.DescribeCollection(s.ctx, &milvuspb.DescribeCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_DescribeCollection,
SourceID: Params.NodeID,

View File

@ -121,35 +121,35 @@ func (c *mockDataNodeClient) Stop() error {
return nil
}
type mockMasterService struct {
type mockRootCoordService struct {
cnt int64
}
func newMockMasterService() *mockMasterService {
return &mockMasterService{}
func newMockRootCoordService() *mockRootCoordService {
return &mockRootCoordService{}
}
func (m *mockMasterService) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
func (m *mockRootCoordService) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
return nil, nil
}
func (m *mockMasterService) Init() error {
func (m *mockRootCoordService) Init() error {
return nil
}
func (m *mockMasterService) Start() error {
func (m *mockRootCoordService) Start() error {
return nil
}
func (m *mockMasterService) Stop() error {
func (m *mockRootCoordService) Stop() error {
return nil
}
func (m *mockMasterService) Register() error {
func (m *mockRootCoordService) Register() error {
return nil
}
func (m *mockMasterService) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
func (m *mockRootCoordService) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
return &internalpb.ComponentStates{
State: &internalpb.ComponentInfo{
NodeID: 0,
@ -165,24 +165,24 @@ func (m *mockMasterService) GetComponentStates(ctx context.Context) (*internalpb
}, nil
}
func (m *mockMasterService) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
func (m *mockRootCoordService) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
panic("not implemented") // TODO: Implement
}
//DDL request
func (m *mockMasterService) CreateCollection(ctx context.Context, req *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) {
func (m *mockRootCoordService) CreateCollection(ctx context.Context, req *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) {
panic("not implemented") // TODO: Implement
}
func (m *mockMasterService) DropCollection(ctx context.Context, req *milvuspb.DropCollectionRequest) (*commonpb.Status, error) {
func (m *mockRootCoordService) DropCollection(ctx context.Context, req *milvuspb.DropCollectionRequest) (*commonpb.Status, error) {
panic("not implemented") // TODO: Implement
}
func (m *mockMasterService) HasCollection(ctx context.Context, req *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) {
func (m *mockRootCoordService) HasCollection(ctx context.Context, req *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) {
panic("not implemented") // TODO: Implement
}
func (m *mockMasterService) DescribeCollection(ctx context.Context, req *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
func (m *mockRootCoordService) DescribeCollection(ctx context.Context, req *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
return &milvuspb.DescribeCollectionResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
@ -196,7 +196,7 @@ func (m *mockMasterService) DescribeCollection(ctx context.Context, req *milvusp
}, nil
}
func (m *mockMasterService) ShowCollections(ctx context.Context, req *milvuspb.ShowCollectionsRequest) (*milvuspb.ShowCollectionsResponse, error) {
func (m *mockRootCoordService) ShowCollections(ctx context.Context, req *milvuspb.ShowCollectionsRequest) (*milvuspb.ShowCollectionsResponse, error) {
return &milvuspb.ShowCollectionsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
@ -206,19 +206,19 @@ func (m *mockMasterService) ShowCollections(ctx context.Context, req *milvuspb.S
}, nil
}
func (m *mockMasterService) CreatePartition(ctx context.Context, req *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) {
func (m *mockRootCoordService) CreatePartition(ctx context.Context, req *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) {
panic("not implemented") // TODO: Implement
}
func (m *mockMasterService) DropPartition(ctx context.Context, req *milvuspb.DropPartitionRequest) (*commonpb.Status, error) {
func (m *mockRootCoordService) DropPartition(ctx context.Context, req *milvuspb.DropPartitionRequest) (*commonpb.Status, error) {
panic("not implemented") // TODO: Implement
}
func (m *mockMasterService) HasPartition(ctx context.Context, req *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) {
func (m *mockRootCoordService) HasPartition(ctx context.Context, req *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) {
panic("not implemented") // TODO: Implement
}
func (m *mockMasterService) ShowPartitions(ctx context.Context, req *milvuspb.ShowPartitionsRequest) (*milvuspb.ShowPartitionsResponse, error) {
func (m *mockRootCoordService) ShowPartitions(ctx context.Context, req *milvuspb.ShowPartitionsRequest) (*milvuspb.ShowPartitionsResponse, error) {
return &milvuspb.ShowPartitionsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
@ -230,20 +230,20 @@ func (m *mockMasterService) ShowPartitions(ctx context.Context, req *milvuspb.Sh
}
//index builder service
func (m *mockMasterService) CreateIndex(ctx context.Context, req *milvuspb.CreateIndexRequest) (*commonpb.Status, error) {
func (m *mockRootCoordService) CreateIndex(ctx context.Context, req *milvuspb.CreateIndexRequest) (*commonpb.Status, error) {
panic("not implemented") // TODO: Implement
}
func (m *mockMasterService) DescribeIndex(ctx context.Context, req *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error) {
func (m *mockRootCoordService) DescribeIndex(ctx context.Context, req *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error) {
panic("not implemented") // TODO: Implement
}
func (m *mockMasterService) DropIndex(ctx context.Context, req *milvuspb.DropIndexRequest) (*commonpb.Status, error) {
func (m *mockRootCoordService) DropIndex(ctx context.Context, req *milvuspb.DropIndexRequest) (*commonpb.Status, error) {
panic("not implemented") // TODO: Implement
}
//global timestamp allocator
func (m *mockMasterService) AllocTimestamp(ctx context.Context, req *masterpb.AllocTimestampRequest) (*masterpb.AllocTimestampResponse, error) {
func (m *mockRootCoordService) AllocTimestamp(ctx context.Context, req *masterpb.AllocTimestampRequest) (*masterpb.AllocTimestampResponse, error) {
val := atomic.AddInt64(&m.cnt, int64(req.Count))
phy := time.Now().UnixNano() / int64(time.Millisecond)
ts := tsoutil.ComposeTS(phy, val)
@ -257,7 +257,7 @@ func (m *mockMasterService) AllocTimestamp(ctx context.Context, req *masterpb.Al
}, nil
}
func (m *mockMasterService) AllocID(ctx context.Context, req *masterpb.AllocIDRequest) (*masterpb.AllocIDResponse, error) {
func (m *mockRootCoordService) AllocID(ctx context.Context, req *masterpb.AllocIDRequest) (*masterpb.AllocIDResponse, error) {
val := atomic.AddInt64(&m.cnt, int64(req.Count))
return &masterpb.AllocIDResponse{
Status: &commonpb.Status{
@ -270,15 +270,15 @@ func (m *mockMasterService) AllocID(ctx context.Context, req *masterpb.AllocIDRe
}
//segment
func (m *mockMasterService) DescribeSegment(ctx context.Context, req *milvuspb.DescribeSegmentRequest) (*milvuspb.DescribeSegmentResponse, error) {
func (m *mockRootCoordService) DescribeSegment(ctx context.Context, req *milvuspb.DescribeSegmentRequest) (*milvuspb.DescribeSegmentResponse, error) {
panic("not implemented") // TODO: Implement
}
func (m *mockMasterService) ShowSegments(ctx context.Context, req *milvuspb.ShowSegmentsRequest) (*milvuspb.ShowSegmentsResponse, error) {
func (m *mockRootCoordService) ShowSegments(ctx context.Context, req *milvuspb.ShowSegmentsRequest) (*milvuspb.ShowSegmentsResponse, error) {
panic("not implemented") // TODO: Implement
}
func (m *mockMasterService) GetDdChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
func (m *mockRootCoordService) GetDdChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
return &milvuspb.StringResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
@ -288,11 +288,11 @@ func (m *mockMasterService) GetDdChannel(ctx context.Context) (*milvuspb.StringR
}, nil
}
func (m *mockMasterService) UpdateChannelTimeTick(ctx context.Context, req *internalpb.ChannelTimeTickMsg) (*commonpb.Status, error) {
func (m *mockRootCoordService) UpdateChannelTimeTick(ctx context.Context, req *internalpb.ChannelTimeTickMsg) (*commonpb.Status, error) {
panic("not implemented") // TODO: Implement
}
func (m *mockMasterService) ReleaseDQLMessageStream(ctx context.Context, req *proxypb.ReleaseDQLMessageStreamRequest) (*commonpb.Status, error) {
func (m *mockRootCoordService) ReleaseDQLMessageStream(ctx context.Context, req *proxypb.ReleaseDQLMessageStreamRequest) (*commonpb.Status, error) {
panic("not implemented") // TODO: Implement
}

View File

@ -37,7 +37,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/milvuspb"
)
const masterClientTimout = 20 * time.Second
const rootCoordClientTimout = 20 * time.Second
type (
UniqueID = typeutil.UniqueID
@ -56,7 +56,7 @@ type Server struct {
segmentManager Manager
allocator allocator
cluster *cluster
masterClient types.MasterService
rootCoordClient types.RootCoord
ddChannelName string
flushCh chan UniqueID
@ -67,8 +67,8 @@ type Server struct {
activeCh <-chan bool
eventCh <-chan *sessionutil.SessionEvent
dataClientCreator func(addr string) (types.DataNode, error)
masterClientCreator func(addr string) (types.MasterService, error)
dataClientCreator func(addr string) (types.DataNode, error)
rootCoordClientCreator func(addr string) (types.RootCoord, error)
}
func CreateServer(ctx context.Context, factory msgstream.Factory) (*Server, error) {
@ -81,8 +81,8 @@ func CreateServer(ctx context.Context, factory msgstream.Factory) (*Server, erro
s.dataClientCreator = func(addr string) (types.DataNode, error) {
return datanodeclient.NewClient(addr, 3*time.Second)
}
s.masterClientCreator = func(addr string) (types.MasterService, error) {
return rootcoordclient.NewClient(ctx, Params.MetaRootPath, Params.EtcdEndpoints, masterClientTimout)
s.rootCoordClientCreator = func(addr string) (types.RootCoord, error) {
return rootcoordclient.NewClient(ctx, Params.MetaRootPath, Params.EtcdEndpoints, rootCoordClientTimout)
}
return s, nil
@ -111,7 +111,7 @@ func (s *Server) Start() error {
if err != nil {
return err
}
if err = s.initMasterClient(); err != nil {
if err = s.initRootCoordClient(); err != nil {
return err
}
@ -127,7 +127,7 @@ func (s *Server) Start() error {
return err
}
s.allocator = newAllocator(s.masterClient)
s.allocator = newAllocator(s.rootCoordClient)
s.startSegmentManager()
if err = s.initFlushMsgStream(); err != nil {
@ -173,7 +173,7 @@ func (s *Server) initServiceDiscovery() error {
}
if err := s.cluster.startup(datanodes); err != nil {
log.Debug("DataCoord loadMetaFromMaster failed", zap.Error(err))
log.Debug("DataCoord loadMetaFromRootCoord failed", zap.Error(err))
return err
}
@ -436,16 +436,16 @@ func (s *Server) handleFlushingSegments(ctx context.Context) {
}
}
func (s *Server) initMasterClient() error {
func (s *Server) initRootCoordClient() error {
var err error
s.masterClient, err = s.masterClientCreator("")
s.rootCoordClient, err = s.rootCoordClientCreator("")
if err != nil {
return err
}
if err = s.masterClient.Init(); err != nil {
if err = s.rootCoordClient.Init(); err != nil {
return err
}
return s.masterClient.Start()
return s.rootCoordClient.Start()
}
func (s *Server) Stop() error {
@ -487,8 +487,8 @@ func (s *Server) stopServerLoop() {
// return fmt.Errorf("can not find channel %s", channelName)
//}
func (s *Server) loadCollectionFromMaster(ctx context.Context, collectionID int64) error {
resp, err := s.masterClient.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
func (s *Server) loadCollectionFromRootCoord(ctx context.Context, collectionID int64) error {
resp, err := s.rootCoordClient.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_DescribeCollection,
SourceID: Params.NodeID,
@ -499,7 +499,7 @@ func (s *Server) loadCollectionFromMaster(ctx context.Context, collectionID int6
if err = VerifyResponse(resp, err); err != nil {
return err
}
presp, err := s.masterClient.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{
presp, err := s.rootCoordClient.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_ShowPartitions,
MsgID: -1, // todo

View File

@ -687,8 +687,8 @@ func TestGetRecoveryInfo(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)
svr.masterClientCreator = func(addr string) (types.MasterService, error) {
return newMockMasterService(), nil
svr.rootCoordClientCreator = func(addr string) (types.RootCoord, error) {
return newMockRootCoordService(), nil
}
t.Run("test get recovery info with no segments", func(t *testing.T) {
@ -827,8 +827,8 @@ func newTestServer(t *testing.T, receiveCh chan interface{}) *Server {
svr.dataClientCreator = func(addr string) (types.DataNode, error) {
return newMockDataNodeClient(0, receiveCh)
}
svr.masterClientCreator = func(addr string) (types.MasterService, error) {
return newMockMasterService(), nil
svr.rootCoordClientCreator = func(addr string) (types.RootCoord, error) {
return newMockRootCoordService(), nil
}
assert.Nil(t, err)
err = svr.Register()

View File

@ -29,23 +29,23 @@ import (
"github.com/stretchr/testify/assert"
)
type mockMaster struct {
types.MasterService
type mockRootCoord struct {
types.RootCoord
}
func (m *mockMaster) Init() error {
func (m *mockRootCoord) Init() error {
return nil
}
func (m *mockMaster) Start() error {
func (m *mockRootCoord) Start() error {
return nil
}
func (m *mockMaster) Stop() error {
func (m *mockRootCoord) Stop() error {
return fmt.Errorf("stop error")
}
func (m *mockMaster) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
func (m *mockRootCoord) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
return &internalpb.ComponentStates{
State: &internalpb.ComponentInfo{
StateCode: internalpb.StateCode_Healthy,
@ -61,7 +61,7 @@ func (m *mockMaster) GetComponentStates(ctx context.Context) (*internalpb.Compon
}, nil
}
func (m *mockMaster) ShowCollections(ctx context.Context, req *milvuspb.ShowCollectionsRequest) (*milvuspb.ShowCollectionsResponse, error) {
func (m *mockRootCoord) ShowCollections(ctx context.Context, req *milvuspb.ShowCollectionsRequest) (*milvuspb.ShowCollectionsResponse, error) {
return &milvuspb.ShowCollectionsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
@ -122,8 +122,8 @@ func TestRun(t *testing.T) {
Params.Init()
dnServer.newMasterServiceClient = func() (types.MasterService, error) {
return &mockMaster{}, nil
dnServer.newRootCoordClient = func() (types.RootCoord, error) {
return &mockRootCoord{}, nil
}
dnServer.newDataServiceClient = func(string, []string, time.Duration) types.DataService {
return &mockDataService{}

View File

@ -51,11 +51,11 @@ type Server struct {
msFactory msgstream.Factory
masterService types.MasterService
dataService types.DataService
rootCoord types.RootCoord
dataService types.DataService
newMasterServiceClient func() (types.MasterService, error)
newDataServiceClient func(string, []string, time.Duration) types.DataService
newRootCoordClient func() (types.RootCoord, error)
newDataServiceClient func(string, []string, time.Duration) types.DataService
closer io.Closer
}
@ -68,7 +68,7 @@ func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error)
cancel: cancel,
msFactory: factory,
grpcErrChan: make(chan error),
newMasterServiceClient: func() (types.MasterService, error) {
newRootCoordClient: func() (types.RootCoord, error) {
return rcc.NewClient(ctx1, dn.Params.MetaRootPath, dn.Params.EtcdEndpoints, 3*time.Second)
},
newDataServiceClient: func(etcdMetaRoot string, etcdEndpoints []string, timeout time.Duration) types.DataService {
@ -113,8 +113,8 @@ func (s *Server) startGrpcLoop(listener net.Listener) {
}
func (s *Server) SetMasterServiceInterface(ms types.MasterService) error {
return s.datanode.SetMasterServiceInterface(ms)
func (s *Server) SetRootCoordInterface(ms types.RootCoord) error {
return s.datanode.SetRootCoordInterface(ms)
}
func (s *Server) SetDataServiceInterface(ds types.DataService) error {
@ -174,29 +174,29 @@ func (s *Server) init() error {
}
// --- Master Server Client ---
if s.newMasterServiceClient != nil {
log.Debug("Master service address", zap.String("address", Params.MasterAddress))
log.Debug("Init master service client ...")
masterServiceClient, err := s.newMasterServiceClient()
if s.newRootCoordClient != nil {
log.Debug("RootCoord address", zap.String("address", Params.MasterAddress))
log.Debug("Init root coord client ...")
rootCoordClient, err := s.newRootCoordClient()
if err != nil {
log.Debug("DataNode newMasterServiceClient failed", zap.Error(err))
log.Debug("DataNode newRootCoordClient failed", zap.Error(err))
panic(err)
}
if err = masterServiceClient.Init(); err != nil {
log.Debug("DataNode masterServiceClient Init failed", zap.Error(err))
if err = rootCoordClient.Init(); err != nil {
log.Debug("DataNode rootCoordClient Init failed", zap.Error(err))
panic(err)
}
if err = masterServiceClient.Start(); err != nil {
log.Debug("DataNode masterServiceClient Start failed", zap.Error(err))
if err = rootCoordClient.Start(); err != nil {
log.Debug("DataNode rootCoordClient Start failed", zap.Error(err))
panic(err)
}
err = funcutil.WaitForComponentHealthy(ctx, masterServiceClient, "MasterService", 1000000, time.Millisecond*200)
err = funcutil.WaitForComponentHealthy(ctx, rootCoordClient, "RootCoord", 1000000, time.Millisecond*200)
if err != nil {
log.Debug("DataNode wait masterService ready failed", zap.Error(err))
log.Debug("DataNode wait rootCoord ready failed", zap.Error(err))
panic(err)
}
log.Debug("DataNode masterService is ready")
if err = s.SetMasterServiceInterface(masterServiceClient); err != nil {
log.Debug("DataNode rootCoord is ready")
if err = s.SetRootCoordInterface(rootCoordClient); err != nil {
panic(err)
}
}

View File

@ -26,23 +26,23 @@ import (
"github.com/stretchr/testify/assert"
)
type mockMaster struct {
types.MasterService
type mockRootCoord struct {
types.RootCoord
}
func (m *mockMaster) Init() error {
func (m *mockRootCoord) Init() error {
return nil
}
func (m *mockMaster) Start() error {
func (m *mockRootCoord) Start() error {
return nil
}
func (m *mockMaster) Stop() error {
func (m *mockRootCoord) Stop() error {
return fmt.Errorf("stop error")
}
func (m *mockMaster) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
func (m *mockRootCoord) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
return &internalpb.ComponentStates{
State: &internalpb.ComponentInfo{
StateCode: internalpb.StateCode_Healthy,
@ -58,7 +58,7 @@ func (m *mockMaster) GetComponentStates(ctx context.Context) (*internalpb.Compon
}, nil
}
func (m *mockMaster) GetDdChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
func (m *mockRootCoord) GetDdChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
return &milvuspb.StringResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
@ -68,7 +68,7 @@ func (m *mockMaster) GetDdChannel(ctx context.Context) (*milvuspb.StringResponse
}, nil
}
func (m *mockMaster) ShowCollections(ctx context.Context, req *milvuspb.ShowCollectionsRequest) (*milvuspb.ShowCollectionsResponse, error) {
func (m *mockRootCoord) ShowCollections(ctx context.Context, req *milvuspb.ShowCollectionsRequest) (*milvuspb.ShowCollectionsResponse, error) {
return &milvuspb.ShowCollectionsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,

View File

@ -48,8 +48,8 @@ type Server struct {
grpcErrChan chan error
wg sync.WaitGroup
grpcServer *grpc.Server
masterService types.MasterService
grpcServer *grpc.Server
rootCoord types.RootCoord
closer io.Closer
}

View File

@ -9,7 +9,7 @@
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package grpcindexserviceclient
package grpcindexcoordclient
import (
"context"
@ -47,18 +47,18 @@ type Client struct {
reconnTry int
}
func getIndexServiceaddr(sess *sessionutil.Session) (string, error) {
key := typeutil.IndexServiceRole
func getIndexCoordAddr(sess *sessionutil.Session) (string, error) {
key := typeutil.IndexCoordRole
msess, _, err := sess.GetSessions(key)
if err != nil {
log.Debug("IndexServiceClient GetSessions failed", zap.Any("key", key), zap.Error(err))
log.Debug("IndexCoordClient GetSessions failed", zap.Any("key", key), zap.Error(err))
return "", err
}
log.Debug("IndexServiceClient GetSessions success", zap.Any("key", key), zap.Any("msess", msess))
log.Debug("IndexCoordClient GetSessions success", zap.Any("key", key), zap.Any("msess", msess))
ms, ok := msess[key]
if !ok {
log.Debug("IndexServiceClient msess key not existed", zap.Any("key", key), zap.Any("len of msess", len(msess)))
return "", fmt.Errorf("number of indexservice is incorrect, %d", len(msess))
log.Debug("IndexCoordClient msess key not existed", zap.Any("key", key), zap.Any("len of msess", len(msess)))
return "", fmt.Errorf("number of indexcoord is incorrect, %d", len(msess))
}
return ms.Address, nil
}
@ -85,24 +85,24 @@ func (c *Client) Init() error {
func (c *Client) connect() error {
var err error
getIndexServiceaddrFn := func() error {
c.addr, err = getIndexServiceaddr(c.sess)
getIndexCoordaddrFn := func() error {
c.addr, err = getIndexCoordAddr(c.sess)
if err != nil {
return err
}
return nil
}
err = retry.Retry(c.reconnTry, 3*time.Second, getIndexServiceaddrFn)
err = retry.Retry(c.reconnTry, 3*time.Second, getIndexCoordaddrFn)
if err != nil {
log.Debug("IndexServiceClient getIndexServiceAddress failed", zap.Error(err))
log.Debug("IndexCoordClient getIndexCoordAddress failed", zap.Error(err))
return err
}
log.Debug("IndexServiceClient getIndexServiceAddress success")
log.Debug("IndexCoordClient getIndexCoordAddress success")
connectGrpcFunc := func() error {
ctx, cancelFunc := context.WithTimeout(c.ctx, c.timeout)
defer cancelFunc()
opts := trace.GetInterceptorOpts()
log.Debug("IndexServiceClient try connect ", zap.String("address", c.addr))
log.Debug("IndexCoordClient try connect ", zap.String("address", c.addr))
conn, err := grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(),
grpc.WithUnaryInterceptor(
grpc_opentracing.UnaryClientInterceptor(opts...)),
@ -117,10 +117,10 @@ func (c *Client) connect() error {
err = retry.Retry(c.reconnTry, 500*time.Millisecond, connectGrpcFunc)
if err != nil {
log.Debug("IndexServiceClient try connect failed", zap.Error(err))
log.Debug("IndexCoordClient try connect failed", zap.Error(err))
return err
}
log.Debug("IndexServiceClient connect success")
log.Debug("IndexCoordClient connect success")
c.grpcClient = indexpb.NewIndexServiceClient(c.conn)
return nil
}

View File

@ -9,7 +9,7 @@
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package grpcindexservice
package grpcindexcoord
import (
"sync"

View File

@ -9,7 +9,7 @@
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package grpcindexservice
package grpcindexcoord
import (
"context"
@ -20,9 +20,10 @@ import (
"sync"
"go.uber.org/zap"
"google.golang.org/grpc"
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
"github.com/milvus-io/milvus/internal/indexservice"
ot "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
"github.com/milvus-io/milvus/internal/indexcoord"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
@ -31,14 +32,13 @@ import (
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/trace"
"github.com/milvus-io/milvus/internal/util/typeutil"
"google.golang.org/grpc"
)
type UniqueID = typeutil.UniqueID
type Timestamp = typeutil.Timestamp
type Server struct {
indexservice *indexservice.IndexService
indexcoord *indexcoord.IndexCoord
grpcServer *grpc.Server
grpcErrChan chan error
@ -64,36 +64,36 @@ func (s *Server) Run() error {
func (s *Server) init() error {
Params.Init()
indexservice.Params.Init()
indexservice.Params.Address = Params.ServiceAddress
indexservice.Params.Port = Params.ServicePort
indexcoord.Params.Init()
indexcoord.Params.Address = Params.ServiceAddress
indexcoord.Params.Port = Params.ServicePort
closer := trace.InitTracing("index_service")
closer := trace.InitTracing("index_coord")
s.closer = closer
if err := s.indexservice.Register(); err != nil {
if err := s.indexcoord.Register(); err != nil {
return err
}
s.loopWg.Add(1)
go s.startGrpcLoop(Params.ServicePort)
// wait for grpc IndexService loop start
// wait for grpc IndexCoord loop start
if err := <-s.grpcErrChan; err != nil {
return err
}
s.indexservice.UpdateStateCode(internalpb.StateCode_Initializing)
s.indexcoord.UpdateStateCode(internalpb.StateCode_Initializing)
if err := s.indexservice.Init(); err != nil {
if err := s.indexcoord.Init(); err != nil {
return err
}
return nil
}
func (s *Server) start() error {
if err := s.indexservice.Start(); err != nil {
if err := s.indexcoord.Start(); err != nil {
return err
}
log.Debug("indexService started")
log.Debug("indexCoord started")
return nil
}
@ -103,8 +103,8 @@ func (s *Server) Stop() error {
return err
}
}
if s.indexservice != nil {
s.indexservice.Stop()
if s.indexcoord != nil {
s.indexcoord.Stop()
}
s.loopCancel()
@ -117,45 +117,45 @@ func (s *Server) Stop() error {
}
func (s *Server) GetComponentStates(ctx context.Context, req *internalpb.GetComponentStatesRequest) (*internalpb.ComponentStates, error) {
return s.indexservice.GetComponentStates(ctx)
return s.indexcoord.GetComponentStates(ctx)
}
func (s *Server) GetTimeTickChannel(ctx context.Context, req *internalpb.GetTimeTickChannelRequest) (*milvuspb.StringResponse, error) {
return s.indexservice.GetTimeTickChannel(ctx)
return s.indexcoord.GetTimeTickChannel(ctx)
}
func (s *Server) GetStatisticsChannel(ctx context.Context, req *internalpb.GetStatisticsChannelRequest) (*milvuspb.StringResponse, error) {
return s.indexservice.GetStatisticsChannel(ctx)
return s.indexcoord.GetStatisticsChannel(ctx)
}
func (s *Server) RegisterNode(ctx context.Context, req *indexpb.RegisterNodeRequest) (*indexpb.RegisterNodeResponse, error) {
return s.indexservice.RegisterNode(ctx, req)
return s.indexcoord.RegisterNode(ctx, req)
}
func (s *Server) BuildIndex(ctx context.Context, req *indexpb.BuildIndexRequest) (*indexpb.BuildIndexResponse, error) {
return s.indexservice.BuildIndex(ctx, req)
return s.indexcoord.BuildIndex(ctx, req)
}
func (s *Server) GetIndexStates(ctx context.Context, req *indexpb.GetIndexStatesRequest) (*indexpb.GetIndexStatesResponse, error) {
return s.indexservice.GetIndexStates(ctx, req)
return s.indexcoord.GetIndexStates(ctx, req)
}
func (s *Server) DropIndex(ctx context.Context, request *indexpb.DropIndexRequest) (*commonpb.Status, error) {
return s.indexservice.DropIndex(ctx, request)
return s.indexcoord.DropIndex(ctx, request)
}
func (s *Server) GetIndexFilePaths(ctx context.Context, req *indexpb.GetIndexFilePathsRequest) (*indexpb.GetIndexFilePathsResponse, error) {
return s.indexservice.GetIndexFilePaths(ctx, req)
return s.indexcoord.GetIndexFilePaths(ctx, req)
}
func (s *Server) startGrpcLoop(grpcPort int) {
defer s.loopWg.Done()
log.Debug("IndexService", zap.Int("network port", grpcPort))
log.Debug("IndexCoord", zap.Int("network port", grpcPort))
lis, err := net.Listen("tcp", ":"+strconv.Itoa(grpcPort))
if err != nil {
log.Warn("IndexService", zap.String("GrpcServer:failed to listen", err.Error()))
log.Warn("IndexCoord", zap.String("GrpcServer:failed to listen", err.Error()))
s.grpcErrChan <- err
return
}
@ -167,32 +167,29 @@ func (s *Server) startGrpcLoop(grpcPort int) {
s.grpcServer = grpc.NewServer(
grpc.MaxRecvMsgSize(math.MaxInt32),
grpc.MaxSendMsgSize(math.MaxInt32),
grpc.UnaryInterceptor(
grpc_opentracing.UnaryServerInterceptor(opts...)),
grpc.StreamInterceptor(
grpc_opentracing.StreamServerInterceptor(opts...)))
grpc.UnaryInterceptor(ot.UnaryServerInterceptor(opts...)),
grpc.StreamInterceptor(ot.StreamServerInterceptor(opts...)))
indexpb.RegisterIndexServiceServer(s.grpcServer, s)
go funcutil.CheckGrpcReady(ctx, s.grpcErrChan)
if err := s.grpcServer.Serve(lis); err != nil {
s.grpcErrChan <- err
}
log.Debug("IndexService grpcServer loop exit")
log.Debug("IndexCoord grpcServer loop exit")
}
func NewServer(ctx context.Context) (*Server, error) {
ctx1, cancel := context.WithCancel(ctx)
serverImp, err := indexservice.NewIndexService(ctx)
serverImp, err := indexcoord.NewIndexCoord(ctx)
if err != nil {
defer cancel()
return nil, err
}
s := &Server{
loopCtx: ctx1,
loopCancel: cancel,
indexservice: serverImp,
grpcErrChan: make(chan error),
loopCtx: ctx1,
loopCancel: cancel,
indexcoord: serverImp,
grpcErrChan: make(chan error),
}
return s, nil

View File

@ -24,7 +24,7 @@ import (
"go.uber.org/zap"
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
grpcindexserviceclient "github.com/milvus-io/milvus/internal/distributed/indexservice/client"
grpcindexcoordclient "github.com/milvus-io/milvus/internal/distributed/indexcoord/client"
"github.com/milvus-io/milvus/internal/indexnode"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
@ -43,10 +43,10 @@ type Server struct {
grpcServer *grpc.Server
grpcErrChan chan error
indexServiceClient types.IndexService
loopCtx context.Context
loopCancel func()
loopWg sync.WaitGroup
indexCoordClient types.IndexCoord
loopCtx context.Context
loopCancel func()
loopWg sync.WaitGroup
closer io.Closer
}
@ -82,10 +82,8 @@ func (s *Server) startGrpcLoop(grpcPort int) {
s.grpcServer = grpc.NewServer(
grpc.MaxRecvMsgSize(math.MaxInt32),
grpc.MaxSendMsgSize(math.MaxInt32),
grpc.UnaryInterceptor(
grpc_opentracing.UnaryServerInterceptor(opts...)),
grpc.StreamInterceptor(
grpc_opentracing.StreamServerInterceptor(opts...)))
grpc.UnaryInterceptor(grpc_opentracing.UnaryServerInterceptor(opts...)),
grpc.StreamInterceptor(grpc_opentracing.StreamServerInterceptor(opts...)))
indexpb.RegisterIndexNodeServer(s.grpcServer, s)
go funcutil.CheckGrpcReady(ctx, s.grpcErrChan)
if err := s.grpcServer.Serve(lis); err != nil {
@ -138,13 +136,13 @@ func (s *Server) init() error {
return err
}
s.indexServiceClient = grpcindexserviceclient.NewClient(indexnode.Params.MetaRootPath, indexnode.Params.EtcdEndpoints, 3*time.Second)
err = s.indexServiceClient.Init()
s.indexCoordClient = grpcindexcoordclient.NewClient(indexnode.Params.MetaRootPath, indexnode.Params.EtcdEndpoints, 3*time.Second)
err = s.indexCoordClient.Init()
if err != nil {
log.Debug("IndexNode indexSerticeClient init failed", zap.Error(err))
return err
}
s.indexnode.SetIndexServiceClient(s.indexServiceClient)
s.indexnode.SetIndexCoordClient(s.indexCoordClient)
s.indexnode.UpdateStateCode(internalpb.StateCode_Initializing)
log.Debug("IndexNode", zap.Any("State", internalpb.StateCode_Initializing))

View File

@ -25,7 +25,7 @@ import (
"google.golang.org/grpc"
grpcdataserviceclient "github.com/milvus-io/milvus/internal/distributed/dataservice/client"
grpcindexserviceclient "github.com/milvus-io/milvus/internal/distributed/indexservice/client"
grpcindexcoordclient "github.com/milvus-io/milvus/internal/distributed/indexcoord/client"
grpcqueryserviceclient "github.com/milvus-io/milvus/internal/distributed/queryservice/client"
rcc "github.com/milvus-io/milvus/internal/distributed/rootcoord/client"
@ -57,7 +57,7 @@ type Server struct {
rootCoordClient *rcc.GrpcClient
dataServiceClient *grpcdataserviceclient.Client
queryServiceClient *grpcqueryserviceclient.Client
indexServiceClient *grpcindexserviceclient.Client
indexCoordClient *grpcindexcoordclient.Client
tracer opentracing.Tracer
closer io.Closer
@ -185,8 +185,8 @@ func (s *Server) init() error {
log.Debug("ProxyNode WaitForComponentHealthy RootCoord failed ", zap.Error(err))
panic(err)
}
s.proxynode.SetMasterClient(s.rootCoordClient)
log.Debug("set master client ...")
s.proxynode.SetRootCoordClient(s.rootCoordClient)
log.Debug("set rootcoord client ...")
dataServiceAddr := Params.DataServiceAddress
log.Debug("ProxyNode", zap.String("data service address", dataServiceAddr))
@ -201,13 +201,13 @@ func (s *Server) init() error {
indexServiceAddr := Params.IndexServerAddress
log.Debug("ProxyNode", zap.String("index server address", indexServiceAddr))
s.indexServiceClient = grpcindexserviceclient.NewClient(proxynode.Params.MetaRootPath, proxynode.Params.EtcdEndpoints, timeout)
err = s.indexServiceClient.Init()
s.indexCoordClient = grpcindexcoordclient.NewClient(proxynode.Params.MetaRootPath, proxynode.Params.EtcdEndpoints, timeout)
err = s.indexCoordClient.Init()
if err != nil {
log.Debug("ProxyNode indexServiceClient init failed ", zap.Error(err))
log.Debug("ProxyNode indexCoordClient init failed ", zap.Error(err))
return err
}
s.proxynode.SetIndexServiceClient(s.indexServiceClient)
s.proxynode.SetIndexCoordClient(s.indexCoordClient)
log.Debug("set index service client ...")
queryServiceAddr := Params.QueryServiceAddress

View File

@ -156,11 +156,11 @@ func (data *DataServiceMock) GetSegmentStates(req *datapb.GetSegmentStatesReques
}, nil
}
type IndexServiceMock struct {
type IndexCoordMock struct {
Count int
}
func (index *IndexServiceMock) GetIndexFilePaths(req *indexpb.GetIndexFilePathsRequest) (*indexpb.GetIndexFilePathsResponse, error) {
func (index *IndexCoordMock) GetIndexFilePaths(req *indexpb.GetIndexFilePathsRequest) (*indexpb.GetIndexFilePathsResponse, error) {
if index.Count < 30 {
index.Count++
return nil, errors.New("index path not exist")

View File

@ -30,7 +30,7 @@ import (
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
dsc "github.com/milvus-io/milvus/internal/distributed/dataservice/client"
isc "github.com/milvus-io/milvus/internal/distributed/indexservice/client"
isc "github.com/milvus-io/milvus/internal/distributed/indexcoord/client"
qsc "github.com/milvus-io/milvus/internal/distributed/queryservice/client"
rcc "github.com/milvus-io/milvus/internal/distributed/rootcoord/client"
"github.com/milvus-io/milvus/internal/log"
@ -56,10 +56,10 @@ type Server struct {
grpcServer *grpc.Server
dataService *dsc.Client
masterService *rcc.GrpcClient
indexService *isc.Client
queryService *qsc.Client
dataService *dsc.Client
rootCoord *rcc.GrpcClient
indexCoord *isc.Client
queryService *qsc.Client
closer io.Closer
}
@ -163,29 +163,29 @@ func (s *Server) init() error {
panic(err)
}
// --- IndexService ---
log.Debug("Index service", zap.String("address", Params.IndexServiceAddress))
indexService := isc.NewClient(qn.Params.MetaRootPath, qn.Params.EtcdEndpoints, 3*time.Second)
// --- IndexCoord ---
log.Debug("Index coord", zap.String("address", Params.IndexServiceAddress))
indexCoord := isc.NewClient(qn.Params.MetaRootPath, qn.Params.EtcdEndpoints, 3*time.Second)
if err := indexService.Init(); err != nil {
log.Debug("QueryNode IndexServiceClient Init failed", zap.Error(err))
if err := indexCoord.Init(); err != nil {
log.Debug("QueryNode IndexCoordClient Init failed", zap.Error(err))
panic(err)
}
if err := indexService.Start(); err != nil {
log.Debug("QueryNode IndexServiceClient Start failed", zap.Error(err))
if err := indexCoord.Start(); err != nil {
log.Debug("QueryNode IndexCoordClient Start failed", zap.Error(err))
panic(err)
}
// wait IndexService healthy
log.Debug("QueryNode start to wait for IndexService ready")
err = funcutil.WaitForComponentHealthy(s.ctx, indexService, "IndexService", 1000000, time.Millisecond*200)
// wait IndexCoord healthy
log.Debug("QueryNode start to wait for IndexCoord ready")
err = funcutil.WaitForComponentHealthy(s.ctx, indexCoord, "IndexCoord", 1000000, time.Millisecond*200)
if err != nil {
log.Debug("QueryNode wait for IndexService ready failed", zap.Error(err))
log.Debug("QueryNode wait for IndexCoord ready failed", zap.Error(err))
panic(err)
}
log.Debug("QueryNode report IndexService is ready")
log.Debug("QueryNode report IndexCoord is ready")
if err := s.SetIndexService(indexService); err != nil {
if err := s.SetIndexCoord(indexCoord); err != nil {
panic(err)
}
@ -302,16 +302,16 @@ func (s *Server) Stop() error {
return nil
}
func (s *Server) SetMasterService(masterService types.MasterService) error {
return s.querynode.SetMasterService(masterService)
func (s *Server) SetMasterService(rootCoord types.RootCoord) error {
return s.querynode.SetRootCoord(rootCoord)
}
func (s *Server) SetQueryService(queryService types.QueryService) error {
return s.querynode.SetQueryService(queryService)
}
func (s *Server) SetIndexService(indexService types.IndexService) error {
return s.querynode.SetIndexService(indexService)
func (s *Server) SetIndexCoord(indexCoord types.IndexCoord) error {
return s.querynode.SetIndexCoord(indexCoord)
}
func (s *Server) SetDataService(dataService types.DataService) error {

View File

@ -130,7 +130,7 @@ func (s *Server) init() error {
panic(err)
}
if err := s.SetMasterService(rootCoord); err != nil {
if err := s.SetRootCoord(rootCoord); err != nil {
panic(err)
}
log.Debug("QueryService report RootCoord ready")
@ -215,8 +215,8 @@ func (s *Server) Stop() error {
return err
}
func (s *Server) SetMasterService(m types.MasterService) error {
s.queryservice.SetMasterService(m)
func (s *Server) SetRootCoord(m types.RootCoord) error {
s.queryservice.SetRootCoord(m)
return nil
}

View File

@ -25,7 +25,7 @@ import (
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
dsc "github.com/milvus-io/milvus/internal/distributed/dataservice/client"
isc "github.com/milvus-io/milvus/internal/distributed/indexservice/client"
isc "github.com/milvus-io/milvus/internal/distributed/indexcoord/client"
pnc "github.com/milvus-io/milvus/internal/distributed/proxynode/client"
qsc "github.com/milvus-io/milvus/internal/distributed/queryservice/client"
"github.com/milvus-io/milvus/internal/log"
@ -44,7 +44,7 @@ import (
// Server grpc wrapper
type Server struct {
rootCoord types.MasterComponent
rootCoord types.RootCoordComponent
grpcServer *grpc.Server
grpcErrChan chan error
@ -54,10 +54,10 @@ type Server struct {
cancel context.CancelFunc
dataCoord types.DataService
indexCoord types.IndexService
indexCoord types.IndexCoord
queryCoord types.QueryService
newIndexCoordClient func(string, []string, time.Duration) types.IndexService
newIndexCoordClient func(string, []string, time.Duration) types.IndexCoord
newDataCoordClient func(string, []string, time.Duration) types.DataService
newQueryCoordClient func(string, []string, time.Duration) types.QueryService
@ -96,7 +96,7 @@ func (s *Server) setClient() {
}
return dsClient
}
s.newIndexCoordClient = func(metaRootPath string, etcdEndpoints []string, timeout time.Duration) types.IndexService {
s.newIndexCoordClient = func(metaRootPath string, etcdEndpoints []string, timeout time.Duration) types.IndexCoord {
isClient := isc.NewClient(metaRootPath, etcdEndpoints, timeout)
if err := isClient.Init(); err != nil {
panic(err)

View File

@ -815,7 +815,7 @@ func TestGrpcService(t *testing.T) {
}
type mockCore struct {
types.MasterComponent
types.RootCoordComponent
}
func (m *mockCore) UpdateStateCode(internalpb.StateCode) {
@ -824,7 +824,7 @@ func (m *mockCore) UpdateStateCode(internalpb.StateCode) {
func (m *mockCore) SetDataCoord(context.Context, types.DataService) error {
return nil
}
func (m *mockCore) SetIndexCoord(types.IndexService) error {
func (m *mockCore) SetIndexCoord(types.IndexCoord) error {
return nil
}
@ -881,7 +881,7 @@ func (m *mockDataCoord) Stop() error {
}
type mockIndex struct {
types.IndexService
types.IndexCoord
}
func (m *mockIndex) Init() error {
@ -925,7 +925,7 @@ func TestRun(t *testing.T) {
svr.newDataCoordClient = func(string, []string, time.Duration) types.DataService {
return &mockDataCoord{}
}
svr.newIndexCoordClient = func(string, []string, time.Duration) types.IndexService {
svr.newIndexCoordClient = func(string, []string, time.Duration) types.IndexCoord {
return &mockIndex{}
}
svr.newQueryCoordClient = func(string, []string, time.Duration) types.QueryService {

View File

@ -10,5 +10,5 @@ approvers:
- scsven
labels:
- component/indexservice
- component/indexcoord

View File

@ -9,7 +9,7 @@
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package indexservice
package indexcoord
import (
"context"
@ -48,7 +48,7 @@ const (
recycleIndexLimit = 20
)
type IndexService struct {
type IndexCoord struct {
nodeClients *PriorityQueue
nodeStates map[UniqueID]*internalpb.ComponentStates
stateCode atomic.Value
@ -84,10 +84,10 @@ type IndexService struct {
type UniqueID = typeutil.UniqueID
type Timestamp = typeutil.Timestamp
func NewIndexService(ctx context.Context) (*IndexService, error) {
func NewIndexCoord(ctx context.Context) (*IndexCoord, error) {
rand.Seed(time.Now().UnixNano())
ctx1, cancel := context.WithCancel(ctx)
i := &IndexService{
i := &IndexCoord{
loopCtx: ctx1,
loopCancel: cancel,
nodeClients: &PriorityQueue{},
@ -98,15 +98,15 @@ func NewIndexService(ctx context.Context) (*IndexService, error) {
}
// Register register index service at etcd
func (i *IndexService) Register() error {
func (i *IndexCoord) Register() error {
i.session = sessionutil.NewSession(i.loopCtx, Params.MetaRootPath, Params.EtcdEndpoints)
i.session.Init(typeutil.IndexServiceRole, Params.Address, true)
i.session.Init(typeutil.IndexCoordRole, Params.Address, true)
i.eventChan = i.session.WatchServices(typeutil.IndexNodeRole, 0)
return nil
}
func (i *IndexService) Init() error {
log.Debug("IndexService", zap.Any("etcd endpoints", Params.EtcdEndpoints))
func (i *IndexCoord) Init() error {
log.Debug("IndexCoord", zap.Any("etcd endpoints", Params.EtcdEndpoints))
i.assignChan = make(chan []UniqueID, 1024)
connectEtcdFn := func() error {
@ -122,19 +122,19 @@ func (i *IndexService) Init() error {
i.metaTable = metakv
return err
}
log.Debug("IndexService try to connect etcd")
log.Debug("IndexCoord try to connect etcd")
err := retry.Retry(100000, time.Millisecond*200, connectEtcdFn)
if err != nil {
log.Debug("IndexService try to connect etcd failed", zap.Error(err))
log.Debug("IndexCoord try to connect etcd failed", zap.Error(err))
return err
}
log.Debug("IndexService try to connect etcd success")
log.Debug("IndexCoord try to connect etcd success")
//init idAllocator
kvRootPath := Params.KvRootPath
i.idAllocator = allocator.NewGlobalIDAllocator("idTimestamp", tsoutil.NewTSOKVBase(Params.EtcdEndpoints, kvRootPath, "index_gid"))
if err := i.idAllocator.Initialize(); err != nil {
log.Debug("IndexService idAllocator initialize failed", zap.Error(err))
log.Debug("IndexCoord idAllocator initialize failed", zap.Error(err))
return err
}
@ -154,32 +154,32 @@ func (i *IndexService) Init() error {
i.kv, err = miniokv.NewMinIOKV(i.loopCtx, option)
if err != nil {
log.Debug("IndexService new minio kv failed", zap.Error(err))
log.Debug("IndexCoord new minio kv failed", zap.Error(err))
return err
}
log.Debug("IndexService new minio kv success")
log.Debug("IndexCoord new minio kv success")
i.sched, err = NewTaskScheduler(i.loopCtx, i.idAllocator, i.kv, i.metaTable)
if err != nil {
log.Debug("IndexService new task scheduler failed", zap.Error(err))
log.Debug("IndexCoord new task scheduler failed", zap.Error(err))
return err
}
log.Debug("IndexService new task scheduler success")
log.Debug("IndexCoord new task scheduler success")
i.UpdateStateCode(internalpb.StateCode_Healthy)
log.Debug("IndexService", zap.Any("State", i.stateCode.Load()))
log.Debug("IndexCoord", zap.Any("State", i.stateCode.Load()))
i.nodeTasks = NewNodeTasks()
err = i.assignTasksServerStart()
if err != nil {
log.Debug("IndexService assign tasks server start failed", zap.Error(err))
log.Debug("IndexCoord assign tasks server start failed", zap.Error(err))
return err
}
log.Debug("IndexService assign tasks server success", zap.Error(err))
log.Debug("IndexCoord assign tasks server success", zap.Error(err))
return nil
}
func (i *IndexService) Start() error {
func (i *IndexCoord) Start() error {
i.loopWg.Add(1)
go i.tsLoop()
@ -200,12 +200,12 @@ func (i *IndexService) Start() error {
for _, cb := range i.startCallbacks {
cb()
}
log.Debug("IndexService start")
log.Debug("IndexCoord start")
return nil
}
func (i *IndexService) Stop() error {
func (i *IndexCoord) Stop() error {
i.loopCancel()
i.sched.Close()
for _, cb := range i.closeCallbacks {
@ -214,15 +214,15 @@ func (i *IndexService) Stop() error {
return nil
}
func (i *IndexService) UpdateStateCode(code internalpb.StateCode) {
func (i *IndexCoord) UpdateStateCode(code internalpb.StateCode) {
i.stateCode.Store(code)
}
func (i *IndexService) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
log.Debug("get IndexService component states ...")
func (i *IndexCoord) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
log.Debug("get IndexCoord component states ...")
stateInfo := &internalpb.ComponentInfo{
NodeID: i.ID,
Role: "IndexService",
Role: "IndexCoord",
StateCode: i.stateCode.Load().(internalpb.StateCode),
}
@ -236,8 +236,8 @@ func (i *IndexService) GetComponentStates(ctx context.Context) (*internalpb.Comp
return ret, nil
}
func (i *IndexService) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
log.Debug("get IndexService time tick channel ...")
func (i *IndexCoord) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
log.Debug("get IndexCoord time tick channel ...")
return &milvuspb.StringResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
@ -247,8 +247,8 @@ func (i *IndexService) GetTimeTickChannel(ctx context.Context) (*milvuspb.String
}, nil
}
func (i *IndexService) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
log.Debug("get IndexService statistics channel ...")
func (i *IndexCoord) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
log.Debug("get IndexCoord statistics channel ...")
return &milvuspb.StringResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
@ -258,8 +258,8 @@ func (i *IndexService) GetStatisticsChannel(ctx context.Context) (*milvuspb.Stri
}, nil
}
func (i *IndexService) BuildIndex(ctx context.Context, req *indexpb.BuildIndexRequest) (*indexpb.BuildIndexResponse, error) {
log.Debug("IndexService building index ...",
func (i *IndexCoord) BuildIndex(ctx context.Context, req *indexpb.BuildIndexRequest) (*indexpb.BuildIndexResponse, error) {
log.Debug("IndexCoord building index ...",
zap.Int64("IndexBuildID", req.IndexBuildID),
zap.String("IndexName = ", req.IndexName),
zap.Int64("IndexID = ", req.IndexID),
@ -268,7 +268,7 @@ func (i *IndexService) BuildIndex(ctx context.Context, req *indexpb.BuildIndexRe
zap.Any("IndexParams", req.IndexParams))
hasIndex, indexBuildID := i.metaTable.HasSameReq(req)
if hasIndex {
log.Debug("IndexService", zap.Any("hasIndex true", indexBuildID))
log.Debug("IndexCoord", zap.Any("hasIndex true", indexBuildID))
return &indexpb.BuildIndexResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
@ -318,7 +318,7 @@ func (i *IndexService) BuildIndex(ctx context.Context, req *indexpb.BuildIndexRe
ret.Status.Reason = err.Error()
return ret, nil
}
log.Debug("IndexService BuildIndex Enqueue successfully", zap.Any("IndexBuildID", indexBuildID))
log.Debug("IndexCoord BuildIndex Enqueue successfully", zap.Any("IndexBuildID", indexBuildID))
err = t.WaitToFinish()
if err != nil {
@ -332,8 +332,8 @@ func (i *IndexService) BuildIndex(ctx context.Context, req *indexpb.BuildIndexRe
return ret, nil
}
func (i *IndexService) GetIndexStates(ctx context.Context, req *indexpb.GetIndexStatesRequest) (*indexpb.GetIndexStatesResponse, error) {
log.Debug("IndexService get index states ...", zap.Int64s("IndexBuildIDs", req.IndexBuildIDs))
func (i *IndexCoord) GetIndexStates(ctx context.Context, req *indexpb.GetIndexStatesRequest) (*indexpb.GetIndexStatesResponse, error) {
log.Debug("IndexCoord get index states ...", zap.Int64s("IndexBuildIDs", req.IndexBuildIDs))
var indexStates []*indexpb.IndexInfo
for _, indexID := range req.IndexBuildIDs {
indexState, err := i.metaTable.GetIndexState(indexID)
@ -348,15 +348,15 @@ func (i *IndexService) GetIndexStates(ctx context.Context, req *indexpb.GetIndex
},
States: indexStates,
}
log.Debug("IndexService get index states success",
log.Debug("IndexCoord get index states success",
zap.Any("index status", ret.Status),
zap.Any("index states", ret.States))
return ret, nil
}
func (i *IndexService) DropIndex(ctx context.Context, req *indexpb.DropIndexRequest) (*commonpb.Status, error) {
log.Debug("IndexService DropIndex", zap.Any("IndexID", req.IndexID))
func (i *IndexCoord) DropIndex(ctx context.Context, req *indexpb.DropIndexRequest) (*commonpb.Status, error) {
log.Debug("IndexCoord DropIndex", zap.Any("IndexID", req.IndexID))
ret := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
@ -377,12 +377,12 @@ func (i *IndexService) DropIndex(ctx context.Context, req *indexpb.DropIndexRequ
}()
}()
log.Debug("IndexService DropIndex success", zap.Any("IndexID", req.IndexID))
log.Debug("IndexCoord DropIndex success", zap.Any("IndexID", req.IndexID))
return ret, nil
}
func (i *IndexService) GetIndexFilePaths(ctx context.Context, req *indexpb.GetIndexFilePathsRequest) (*indexpb.GetIndexFilePathsResponse, error) {
log.Debug("IndexService GetIndexFilePaths", zap.Int64s("IndexBuildIds", req.IndexBuildIDs))
func (i *IndexCoord) GetIndexFilePaths(ctx context.Context, req *indexpb.GetIndexFilePathsRequest) (*indexpb.GetIndexFilePathsResponse, error) {
log.Debug("IndexCoord GetIndexFilePaths", zap.Int64s("IndexBuildIds", req.IndexBuildIDs))
var indexPaths []*indexpb.IndexFilePathInfo = nil
for _, indexID := range req.IndexBuildIDs {
@ -392,7 +392,7 @@ func (i *IndexService) GetIndexFilePaths(ctx context.Context, req *indexpb.GetIn
}
indexPaths = append(indexPaths, indexPathInfo)
}
log.Debug("IndexService GetIndexFilePaths success")
log.Debug("IndexCoord GetIndexFilePaths success")
ret := &indexpb.GetIndexFilePathsResponse{
Status: &commonpb.Status{
@ -400,12 +400,12 @@ func (i *IndexService) GetIndexFilePaths(ctx context.Context, req *indexpb.GetIn
},
FilePaths: indexPaths,
}
log.Debug("IndexService GetIndexFilePaths ", zap.Any("FilePaths", ret.FilePaths))
log.Debug("IndexCoord GetIndexFilePaths ", zap.Any("FilePaths", ret.FilePaths))
return ret, nil
}
func (i *IndexService) tsLoop() {
func (i *IndexCoord) tsLoop() {
tsoTicker := time.NewTicker(tso.UpdateTimestampStep)
defer tsoTicker.Stop()
ctx, cancel := context.WithCancel(i.loopCtx)
@ -415,25 +415,25 @@ func (i *IndexService) tsLoop() {
select {
case <-tsoTicker.C:
if err := i.idAllocator.UpdateID(); err != nil {
log.Debug("IndexService tsLoop UpdateID failed", zap.Error(err))
log.Debug("IndexCoord tsLoop UpdateID failed", zap.Error(err))
return
}
case <-ctx.Done():
// Server is closed and it should return nil.
log.Debug("IndexService tsLoop is closed")
log.Debug("IndexCoord tsLoop is closed")
return
}
}
}
func (i *IndexService) recycleUnusedIndexFiles() {
func (i *IndexCoord) recycleUnusedIndexFiles() {
ctx, cancel := context.WithCancel(i.loopCtx)
defer cancel()
defer i.loopWg.Done()
timeTicker := time.NewTicker(durationInterval)
log.Debug("IndexService start recycleUnusedIndexFiles loop")
log.Debug("IndexCoord start recycleUnusedIndexFiles loop")
for {
select {
@ -445,7 +445,7 @@ func (i *IndexService) recycleUnusedIndexFiles() {
if meta.indexMeta.MarkDeleted {
unusedIndexFilePathPrefix := strconv.Itoa(int(meta.indexMeta.IndexBuildID))
if err := i.kv.RemoveWithPrefix(unusedIndexFilePathPrefix); err != nil {
log.Debug("IndexService recycleUnusedIndexFiles Remove index files failed",
log.Debug("IndexCoord recycleUnusedIndexFiles Remove index files failed",
zap.Any("MarkDeleted", true), zap.Error(err))
}
i.metaTable.DeleteIndex(meta.indexMeta.IndexBuildID)
@ -453,12 +453,12 @@ func (i *IndexService) recycleUnusedIndexFiles() {
for j := 1; j < int(meta.indexMeta.Version); j++ {
unusedIndexFilePathPrefix := strconv.Itoa(int(meta.indexMeta.IndexBuildID)) + "/" + strconv.Itoa(j)
if err := i.kv.RemoveWithPrefix(unusedIndexFilePathPrefix); err != nil {
log.Debug("IndexService recycleUnusedIndexFiles Remove index files failed",
log.Debug("IndexCoord recycleUnusedIndexFiles Remove index files failed",
zap.Any("MarkDeleted", false), zap.Error(err))
}
}
if err := i.metaTable.UpdateRecycleState(meta.indexMeta.IndexBuildID); err != nil {
log.Debug("IndexService recycleUnusedIndexFiles UpdateRecycleState failed", zap.Error(err))
log.Debug("IndexCoord recycleUnusedIndexFiles UpdateRecycleState failed", zap.Error(err))
}
}
}
@ -466,13 +466,13 @@ func (i *IndexService) recycleUnusedIndexFiles() {
}
}
func (i *IndexService) assignmentTasksLoop() {
func (i *IndexCoord) assignmentTasksLoop() {
ctx, cancel := context.WithCancel(i.loopCtx)
defer cancel()
defer i.loopWg.Done()
log.Debug("IndexService start assignmentTasksLoop start")
log.Debug("IndexCoord start assignmentTasksLoop start")
for {
select {
@ -481,16 +481,16 @@ func (i *IndexService) assignmentTasksLoop() {
case indexBuildIDs := <-i.assignChan:
for _, indexBuildID := range indexBuildIDs {
meta := i.metaTable.GetIndexMeta(indexBuildID)
log.Debug("IndexService assignmentTasksLoop ", zap.Any("Meta", meta))
log.Debug("IndexCoord assignmentTasksLoop ", zap.Any("Meta", meta))
if meta.indexMeta.State == commonpb.IndexState_Finished {
continue
}
if err := i.metaTable.UpdateVersion(indexBuildID); err != nil {
log.Debug("IndexService assignmentTasksLoop metaTable.UpdateVersion failed", zap.Error(err))
log.Debug("IndexCoord assignmentTasksLoop metaTable.UpdateVersion failed", zap.Error(err))
}
nodeID, builderClient := i.nodeClients.PeekClient()
if builderClient == nil {
log.Debug("IndexService assignmentTasksLoop can not find available IndexNode")
log.Debug("IndexCoord assignmentTasksLoop can not find available IndexNode")
i.assignChan <- []UniqueID{indexBuildID}
continue
}
@ -507,15 +507,15 @@ func (i *IndexService) assignmentTasksLoop() {
}
resp, err := builderClient.CreateIndex(ctx, req)
if err != nil {
log.Debug("IndexService assignmentTasksLoop builderClient.CreateIndex failed", zap.Error(err))
log.Debug("IndexCoord assignmentTasksLoop builderClient.CreateIndex failed", zap.Error(err))
continue
}
if resp.ErrorCode != commonpb.ErrorCode_Success {
log.Debug("IndexService assignmentTasksLoop builderClient.CreateIndex failed", zap.String("Reason", resp.Reason))
log.Debug("IndexCoord assignmentTasksLoop builderClient.CreateIndex failed", zap.String("Reason", resp.Reason))
continue
}
if err = i.metaTable.BuildIndex(indexBuildID, nodeID); err != nil {
log.Debug("IndexService assignmentTasksLoop metaTable.BuildIndex failed", zap.Error(err))
log.Debug("IndexCoord assignmentTasksLoop metaTable.BuildIndex failed", zap.Error(err))
}
i.nodeClients.IncPriority(nodeID, 1)
}
@ -523,12 +523,12 @@ func (i *IndexService) assignmentTasksLoop() {
}
}
func (i *IndexService) watchNodeLoop() {
func (i *IndexCoord) watchNodeLoop() {
ctx, cancel := context.WithCancel(i.loopCtx)
defer cancel()
defer i.loopWg.Done()
log.Debug("IndexService watchNodeLoop start")
log.Debug("IndexCoord watchNodeLoop start")
for {
select {
@ -538,11 +538,11 @@ func (i *IndexService) watchNodeLoop() {
switch event.EventType {
case sessionutil.SessionAddEvent:
serverID := event.Session.ServerID
log.Debug("IndexService watchNodeLoop SessionAddEvent", zap.Any("serverID", serverID))
log.Debug("IndexCoord watchNodeLoop SessionAddEvent", zap.Any("serverID", serverID))
case sessionutil.SessionDelEvent:
serverID := event.Session.ServerID
i.removeNode(serverID)
log.Debug("IndexService watchNodeLoop SessionDelEvent ", zap.Any("serverID", serverID))
log.Debug("IndexCoord watchNodeLoop SessionDelEvent ", zap.Any("serverID", serverID))
indexBuildIDs := i.nodeTasks.getTasksByNodeID(serverID)
log.Debug("IndexNode crashed", zap.Any("IndexNode ID", serverID), zap.Any("task IDs", indexBuildIDs))
i.assignChan <- indexBuildIDs
@ -552,12 +552,12 @@ func (i *IndexService) watchNodeLoop() {
}
}
func (i *IndexService) watchMetaLoop() {
func (i *IndexCoord) watchMetaLoop() {
ctx, cancel := context.WithCancel(i.loopCtx)
defer cancel()
defer i.loopWg.Done()
log.Debug("IndexService watchMetaLoop start")
log.Debug("IndexCoord watchMetaLoop start")
watchChan := i.metaTable.client.WatchWithPrefix("indexes")
@ -566,19 +566,19 @@ func (i *IndexService) watchMetaLoop() {
case <-ctx.Done():
return
case resp := <-watchChan:
log.Debug("IndexService watchMetaLoop find meta updated.")
log.Debug("IndexCoord watchMetaLoop find meta updated.")
for _, event := range resp.Events {
eventRevision := event.Kv.Version
indexMeta := &indexpb.IndexMeta{}
err := proto.UnmarshalText(string(event.Kv.Value), indexMeta)
indexBuildID := indexMeta.IndexBuildID
log.Debug("IndexService watchMetaLoop", zap.Any("event.Key", event.Kv.Key),
log.Debug("IndexCoord watchMetaLoop", zap.Any("event.Key", event.Kv.Key),
zap.Any("event.V", indexMeta), zap.Any("IndexBuildID", indexBuildID), zap.Error(err))
switch event.Type {
case mvccpb.PUT:
//TODO: get indexBuildID fast
reload := i.metaTable.LoadMetaFromETCD(indexBuildID, eventRevision)
log.Debug("IndexService watchMetaLoop PUT", zap.Any("IndexBuildID", indexBuildID), zap.Any("reload", reload))
log.Debug("IndexCoord watchMetaLoop PUT", zap.Any("IndexBuildID", indexBuildID), zap.Any("reload", reload))
if reload {
i.nodeTasks.finishTask(indexBuildID)
}
@ -589,7 +589,7 @@ func (i *IndexService) watchMetaLoop() {
}
}
func (i *IndexService) assignTasksServerStart() error {
func (i *IndexCoord) assignTasksServerStart() error {
sessions, _, err := i.session.GetSessions(typeutil.IndexNodeRole)
if err != nil {
return err
@ -610,7 +610,7 @@ func (i *IndexService) assignTasksServerStart() error {
NodeID: session.ServerID,
}
if err = i.addNode(session.ServerID, req); err != nil {
log.Debug("IndexService", zap.Any("IndexService start find node fatal, err = ", err))
log.Debug("IndexCoord", zap.Any("IndexCoord start find node fatal, err = ", err))
}
}
var serverIDs []int64

View File

@ -9,7 +9,7 @@
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package indexservice
package indexcoord
import (
"fmt"
@ -56,7 +56,7 @@ func NewMetaTable(kv *etcdkv.EtcdKV) (*metaTable, error) {
func (mt *metaTable) reloadFromKV() error {
mt.indexBuildID2Meta = make(map[UniqueID]Meta)
key := "indexes"
log.Debug("IndexService metaTable LoadWithPrefix ", zap.String("prefix", key))
log.Debug("IndexCoord metaTable LoadWithPrefix ", zap.String("prefix", key))
_, values, versions, err := mt.client.LoadWithPrefix2(key)
if err != nil {
@ -67,7 +67,7 @@ func (mt *metaTable) reloadFromKV() error {
indexMeta := indexpb.IndexMeta{}
err = proto.UnmarshalText(values[i], &indexMeta)
if err != nil {
return fmt.Errorf("IndexService metaTable reloadFromKV UnmarshalText indexpb.IndexMeta err:%w", err)
return fmt.Errorf("IndexCoord metaTable reloadFromKV UnmarshalText indexpb.IndexMeta err:%w", err)
}
meta := &Meta{
@ -85,13 +85,13 @@ func (mt *metaTable) saveIndexMeta(meta *Meta) error {
key := "indexes/" + strconv.FormatInt(meta.indexMeta.IndexBuildID, 10)
err := mt.client.CompareVersionAndSwap(key, meta.revision, value)
log.Debug("IndexService metaTable saveIndexMeta ", zap.String("key", key), zap.Error(err))
log.Debug("IndexCoord metaTable saveIndexMeta ", zap.String("key", key), zap.Error(err))
if err != nil {
return err
}
meta.revision = meta.revision + 1
mt.indexBuildID2Meta[meta.indexMeta.IndexBuildID] = *meta
log.Debug("IndexService metaTable saveIndexMeta success", zap.Any("meta.revision", meta.revision))
log.Debug("IndexCoord metaTable saveIndexMeta success", zap.Any("meta.revision", meta.revision))
return nil
}
@ -100,7 +100,7 @@ func (mt *metaTable) reloadMeta(indexBuildID UniqueID) (*Meta, error) {
key := "indexes/" + strconv.FormatInt(indexBuildID, 10)
_, values, version, err := mt.client.LoadWithPrefix2(key)
log.Debug("IndexService reloadMeta mt.client.LoadWithPrefix2", zap.Any("indexBuildID", indexBuildID), zap.Error(err))
log.Debug("IndexCoord reloadMeta mt.client.LoadWithPrefix2", zap.Any("indexBuildID", indexBuildID), zap.Error(err))
if err != nil {
return nil, err
}
@ -124,7 +124,7 @@ func (mt *metaTable) AddIndex(indexBuildID UniqueID, req *indexpb.BuildIndexRequ
mt.lock.Lock()
defer mt.lock.Unlock()
_, ok := mt.indexBuildID2Meta[indexBuildID]
log.Debug("IndexService metaTable AddIndex", zap.Any(" index already exist", ok))
log.Debug("IndexCoord metaTable AddIndex", zap.Any(" index already exist", ok))
if ok {
return fmt.Errorf("index already exists with ID = %d", indexBuildID)
}
@ -144,11 +144,11 @@ func (mt *metaTable) AddIndex(indexBuildID UniqueID, req *indexpb.BuildIndexRequ
func (mt *metaTable) BuildIndex(indexBuildID UniqueID, nodeID int64) error {
mt.lock.Lock()
defer mt.lock.Unlock()
log.Debug("IndexService metaTable BuildIndex")
log.Debug("IndexCoord metaTable BuildIndex")
meta, ok := mt.indexBuildID2Meta[indexBuildID]
if !ok {
log.Debug("IndexService metaTable BuildIndex index not exists", zap.Any("indexBuildID", indexBuildID))
log.Debug("IndexCoord metaTable BuildIndex index not exists", zap.Any("indexBuildID", indexBuildID))
return fmt.Errorf("index not exists with ID = %d", indexBuildID)
}
@ -181,10 +181,10 @@ func (mt *metaTable) BuildIndex(indexBuildID UniqueID, nodeID int64) error {
func (mt *metaTable) UpdateVersion(indexBuildID UniqueID) error {
mt.lock.Lock()
defer mt.lock.Unlock()
log.Debug("IndexService metaTable update UpdateVersion", zap.Any("IndexBuildId", indexBuildID))
log.Debug("IndexCoord metaTable update UpdateVersion", zap.Any("IndexBuildId", indexBuildID))
meta, ok := mt.indexBuildID2Meta[indexBuildID]
if !ok {
log.Debug("IndexService metaTable update UpdateVersion indexBuildID not exists", zap.Any("IndexBuildId", indexBuildID))
log.Debug("IndexCoord metaTable update UpdateVersion indexBuildID not exists", zap.Any("IndexBuildId", indexBuildID))
return fmt.Errorf("index not exists with ID = %d", indexBuildID)
}
@ -193,7 +193,7 @@ func (mt *metaTable) UpdateVersion(indexBuildID UniqueID) error {
//}
meta.indexMeta.Version = meta.indexMeta.Version + 1
log.Debug("IndexService metaTable update UpdateVersion", zap.Any("IndexBuildId", indexBuildID),
log.Debug("IndexCoord metaTable update UpdateVersion", zap.Any("IndexBuildId", indexBuildID),
zap.Any("Version", meta.indexMeta.Version))
err := mt.saveIndexMeta(&meta)
@ -218,13 +218,13 @@ func (mt *metaTable) MarkIndexAsDeleted(indexID UniqueID) error {
mt.lock.Lock()
defer mt.lock.Unlock()
log.Debug("IndexService metaTable MarkIndexAsDeleted ", zap.Int64("indexID", indexID))
log.Debug("IndexCoord metaTable MarkIndexAsDeleted ", zap.Int64("indexID", indexID))
for _, meta := range mt.indexBuildID2Meta {
if meta.indexMeta.Req.IndexID == indexID {
meta.indexMeta.MarkDeleted = true
if err := mt.saveIndexMeta(&meta); err != nil {
log.Debug("IndexService metaTable MarkIndexAsDeleted saveIndexMeta failed", zap.Error(err))
log.Debug("IndexCoord metaTable MarkIndexAsDeleted saveIndexMeta failed", zap.Error(err))
fn := func() error {
m, err := mt.reloadMeta(meta.indexMeta.IndexBuildID)
if m == nil {
@ -290,7 +290,7 @@ func (mt *metaTable) DeleteIndex(indexBuildID UniqueID) {
key := "indexes/" + strconv.FormatInt(indexBuildID, 10)
err := mt.client.Remove(key)
log.Debug("IndexService metaTable DeleteIndex", zap.Error(err))
log.Debug("IndexCoord metaTable DeleteIndex", zap.Error(err))
}
func (mt *metaTable) UpdateRecycleState(indexBuildID UniqueID) error {
@ -298,7 +298,7 @@ func (mt *metaTable) UpdateRecycleState(indexBuildID UniqueID) error {
defer mt.lock.Unlock()
meta, ok := mt.indexBuildID2Meta[indexBuildID]
log.Debug("IndexService metaTable UpdateRecycleState", zap.Any("indexBuildID", indexBuildID),
log.Debug("IndexCoord metaTable UpdateRecycleState", zap.Any("indexBuildID", indexBuildID),
zap.Any("exists", ok))
if !ok {
return fmt.Errorf("index not exists with ID = %d", indexBuildID)
@ -322,7 +322,7 @@ func (mt *metaTable) UpdateRecycleState(indexBuildID UniqueID) error {
err2 := retry.Retry(5, time.Millisecond*200, fn)
if err2 != nil {
meta.indexMeta.Recycled = false
log.Debug("IndexService metaTable UpdateRecycleState failed", zap.Error(err2))
log.Debug("IndexCoord metaTable UpdateRecycleState failed", zap.Error(err2))
return err2
}
}
@ -352,7 +352,7 @@ func (mt *metaTable) GetIndexMeta(indexBuildID UniqueID) Meta {
defer mt.lock.Unlock()
meta, ok := mt.indexBuildID2Meta[indexBuildID]
log.Debug("IndexService metaTable GetIndexMeta", zap.Any("indexBuildID", indexBuildID),
log.Debug("IndexCoord metaTable GetIndexMeta", zap.Any("indexBuildID", indexBuildID),
zap.Any("exist", ok))
return meta
}
@ -408,10 +408,10 @@ func (mt *metaTable) LoadMetaFromETCD(indexBuildID int64, revision int64) bool {
mt.lock.Lock()
defer mt.lock.Unlock()
meta, ok := mt.indexBuildID2Meta[indexBuildID]
log.Debug("IndexService metaTable LoadMetaFromETCD", zap.Any("indexBuildID", indexBuildID),
log.Debug("IndexCoord metaTable LoadMetaFromETCD", zap.Any("indexBuildID", indexBuildID),
zap.Any("revision", revision), zap.Any("ok", ok))
if ok {
log.Debug("IndexService metaTable LoadMetaFromETCD",
log.Debug("IndexCoord metaTable LoadMetaFromETCD",
zap.Any("meta.revision", meta.revision),
zap.Any("revision", revision))
@ -422,12 +422,12 @@ func (mt *metaTable) LoadMetaFromETCD(indexBuildID int64, revision int64) bool {
m, err := mt.reloadMeta(meta.indexMeta.IndexBuildID)
if m == nil {
log.Debug("IndexService metaTable reloadMeta failed", zap.Error(err))
log.Debug("IndexCoord metaTable reloadMeta failed", zap.Error(err))
return false
}
mt.indexBuildID2Meta[indexBuildID] = *m
log.Debug("IndexService LoadMetaFromETCD success", zap.Any("IndexMeta", m))
log.Debug("IndexCoord LoadMetaFromETCD success", zap.Any("IndexMeta", m))
return true
}

View File

@ -9,7 +9,7 @@
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package indexservice
package indexcoord
import (
"context"
@ -25,21 +25,21 @@ import (
"github.com/milvus-io/milvus/internal/proto/internalpb"
)
func (i *IndexService) removeNode(nodeID UniqueID) {
func (i *IndexCoord) removeNode(nodeID UniqueID) {
i.nodeLock.Lock()
defer i.nodeLock.Unlock()
log.Debug("IndexService", zap.Any("Remove node with ID", nodeID))
log.Debug("IndexCoord", zap.Any("Remove node with ID", nodeID))
i.nodeClients.Remove(nodeID)
}
func (i *IndexService) addNode(nodeID UniqueID, req *indexpb.RegisterNodeRequest) error {
func (i *IndexCoord) addNode(nodeID UniqueID, req *indexpb.RegisterNodeRequest) error {
i.nodeLock.Lock()
defer i.nodeLock.Unlock()
log.Debug("IndexService addNode", zap.Any("nodeID", nodeID), zap.Any("node address", req.Address))
log.Debug("IndexCoord addNode", zap.Any("nodeID", nodeID), zap.Any("node address", req.Address))
if i.nodeClients.CheckAddressExist(req.Address) {
log.Debug("IndexService", zap.Any("Node client already exist with ID:", nodeID))
log.Debug("IndexCoord", zap.Any("Node client already exist with ID:", nodeID))
return nil
}
@ -62,7 +62,7 @@ func (i *IndexService) addNode(nodeID UniqueID, req *indexpb.RegisterNodeRequest
return nil
}
func (i *IndexService) prepareNodeInitParams() []*commonpb.KeyValuePair {
func (i *IndexCoord) prepareNodeInitParams() []*commonpb.KeyValuePair {
var params []*commonpb.KeyValuePair
params = append(params, &commonpb.KeyValuePair{Key: "minio.address", Value: Params.MinIOAddress})
params = append(params, &commonpb.KeyValuePair{Key: "minio.accessKeyID", Value: Params.MinIOAccessKeyID})
@ -72,8 +72,8 @@ func (i *IndexService) prepareNodeInitParams() []*commonpb.KeyValuePair {
return params
}
func (i *IndexService) RegisterNode(ctx context.Context, req *indexpb.RegisterNodeRequest) (*indexpb.RegisterNodeResponse, error) {
log.Debug("indexservice", zap.Any("register index node, node address = ", req.Address), zap.Any("node ID = ", req.NodeID))
func (i *IndexCoord) RegisterNode(ctx context.Context, req *indexpb.RegisterNodeRequest) (*indexpb.RegisterNodeResponse, error) {
log.Debug("indexcoord", zap.Any("register index node, node address = ", req.Address), zap.Any("node ID = ", req.NodeID))
ret := &indexpb.RegisterNodeResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,

View File

@ -9,7 +9,7 @@
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package indexservice
package indexcoord
import (
"path"
@ -164,7 +164,7 @@ func (pt *ParamTable) initLogCfg() {
panic(err)
}
if len(rootPath) != 0 {
pt.Log.File.Filename = path.Join(rootPath, "indexservice.log")
pt.Log.File.Filename = path.Join(rootPath, "indexcoord.log")
} else {
pt.Log.File.Filename = ""
}

View File

@ -9,7 +9,7 @@
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package indexservice
package indexcoord
import (
"container/heap"

View File

@ -9,7 +9,7 @@
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package indexservice
package indexcoord
import (
"container/heap"

View File

@ -9,7 +9,7 @@
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package indexservice
package indexcoord
import (
"context"
@ -107,13 +107,13 @@ func (it *IndexAddTask) OnEnqueue() error {
}
func (it *IndexAddTask) PreExecute(ctx context.Context) error {
log.Debug("IndexService IndexAddTask PreExecute", zap.Any("IndexBuildID", it.indexBuildID))
log.Debug("IndexCoord IndexAddTask PreExecute", zap.Any("IndexBuildID", it.indexBuildID))
it.req.IndexBuildID = it.indexBuildID
return nil
}
func (it *IndexAddTask) Execute(ctx context.Context) error {
log.Debug("IndexService IndexAddTask Execute", zap.Any("IndexBuildID", it.indexBuildID))
log.Debug("IndexCoord IndexAddTask Execute", zap.Any("IndexBuildID", it.indexBuildID))
err := it.table.AddIndex(it.indexBuildID, it.req)
if err != nil {
return err
@ -122,6 +122,6 @@ func (it *IndexAddTask) Execute(ctx context.Context) error {
}
func (it *IndexAddTask) PostExecute(ctx context.Context) error {
log.Debug("IndexService IndexAddTask PostExecute", zap.Any("IndexBuildID", it.indexBuildID))
log.Debug("IndexCoord IndexAddTask PostExecute", zap.Any("IndexBuildID", it.indexBuildID))
return nil
}

View File

@ -9,7 +9,7 @@
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package indexservice
package indexcoord
import (
"container/list"
@ -112,7 +112,7 @@ func (queue *BaseTaskQueue) AddActiveTask(t task) {
tID := t.ID()
_, ok := queue.activeTasks[tID]
if ok {
log.Warn("indexservice", zap.Int64("task with ID already in active task list!", tID))
log.Warn("indexcoord", zap.Int64("task with ID already in active task list!", tID))
}
queue.activeTasks[tID] = t
@ -127,13 +127,13 @@ func (queue *BaseTaskQueue) PopActiveTask(tID UniqueID) task {
delete(queue.activeTasks, tID)
return t
}
log.Debug("indexservice", zap.Int64("sorry, but the ID was not found in the active task list!", tID))
log.Debug("indexcoord", zap.Int64("sorry, but the ID was not found in the active task list!", tID))
return nil
}
func (queue *BaseTaskQueue) Enqueue(t task) error {
tID, _ := queue.sched.idAllocator.AllocOne()
log.Debug("indexservice", zap.Int64("[Builder] allocate reqID", tID))
log.Debug("indexcoord", zap.Int64("[Builder] allocate reqID", tID))
t.SetID(tID)
err := t.OnEnqueue()
if err != nil {

View File

@ -9,7 +9,7 @@
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package indexservice
package indexcoord
import (
"github.com/milvus-io/milvus/internal/log"
@ -41,7 +41,7 @@ func compare2Array(arr1, arr2 interface{}) bool {
}
return true
}
log.Error("IndexService compare2Array arr2 should be commonpb.KeyValuePair")
log.Error("IndexCoord compare2Array arr2 should be commonpb.KeyValuePair")
return false
}
v1, ok2 := arr1.([]string)
@ -61,9 +61,9 @@ func compare2Array(arr1, arr2 interface{}) bool {
}
return true
}
log.Error("IndexService compare2Array arr2 type should be string array")
log.Error("IndexCoord compare2Array arr2 type should be string array")
return false
}
log.Error("IndexService compare2Array param type should be commonpb.KeyValuePair or string array")
log.Error("IndexCoord compare2Array param type should be commonpb.KeyValuePair or string array")
return false
}

View File

@ -53,7 +53,7 @@ type IndexNode struct {
kv kv.BaseKV
session *sessionutil.Session
serviceClient types.IndexService // method factory
serviceClient types.IndexCoord // method factory
// Add callback functions at different stages
startCallbacks []func()
@ -104,14 +104,14 @@ func (i *IndexNode) Init() error {
return err
}
log.Debug("IndexNode try connect etcd success")
log.Debug("IndexNode start to wait for IndexService ready")
log.Debug("IndexNode start to wait for IndexCoord ready")
err = funcutil.WaitForComponentHealthy(ctx, i.serviceClient, "IndexService", 1000000, time.Millisecond*200)
err = funcutil.WaitForComponentHealthy(ctx, i.serviceClient, "IndexCoord", 1000000, time.Millisecond*200)
if err != nil {
log.Debug("IndexNode wait for IndexService ready failed", zap.Error(err))
log.Debug("IndexNode wait for IndexCoord ready failed", zap.Error(err))
return err
}
log.Debug("IndexNode report IndexService is ready")
log.Debug("IndexNode report IndexCoord is ready")
request := &indexpb.RegisterNodeRequest{
Base: nil,
Address: &commonpb.Address{
@ -186,7 +186,7 @@ func (i *IndexNode) UpdateStateCode(code internalpb.StateCode) {
i.stateCode.Store(code)
}
func (i *IndexNode) SetIndexServiceClient(serviceClient types.IndexService) {
func (i *IndexNode) SetIndexCoordClient(serviceClient types.IndexCoord) {
i.serviceClient = serviceClient
}

View File

@ -115,7 +115,7 @@ func (node *ProxyNode) CreateCollection(ctx context.Context, request *milvuspb.C
ctx: ctx,
Condition: NewTaskCondition(ctx),
CreateCollectionRequest: request,
masterService: node.masterService,
rootCoord: node.rootCoord,
dataServiceClient: node.dataService,
}
@ -161,7 +161,7 @@ func (node *ProxyNode) DropCollection(ctx context.Context, request *milvuspb.Dro
ctx: ctx,
Condition: NewTaskCondition(ctx),
DropCollectionRequest: request,
masterService: node.masterService,
rootCoord: node.rootCoord,
chMgr: node.chMgr,
chTicker: node.chTicker,
}
@ -206,7 +206,7 @@ func (node *ProxyNode) HasCollection(ctx context.Context, request *milvuspb.HasC
ctx: ctx,
Condition: NewTaskCondition(ctx),
HasCollectionRequest: request,
masterService: node.masterService,
rootCoord: node.rootCoord,
}
err := node.sched.DdQueue.Enqueue(hct)
@ -341,7 +341,7 @@ func (node *ProxyNode) DescribeCollection(ctx context.Context, request *milvuspb
ctx: ctx,
Condition: NewTaskCondition(ctx),
DescribeCollectionRequest: request,
masterService: node.masterService,
rootCoord: node.rootCoord,
}
err := node.sched.DdQueue.Enqueue(dct)
@ -435,7 +435,7 @@ func (node *ProxyNode) ShowCollections(ctx context.Context, request *milvuspb.Sh
ctx: ctx,
Condition: NewTaskCondition(ctx),
ShowCollectionsRequest: request,
masterService: node.masterService,
rootCoord: node.rootCoord,
queryService: node.queryService,
}
@ -481,7 +481,7 @@ func (node *ProxyNode) CreatePartition(ctx context.Context, request *milvuspb.Cr
ctx: ctx,
Condition: NewTaskCondition(ctx),
CreatePartitionRequest: request,
masterService: node.masterService,
rootCoord: node.rootCoord,
result: nil,
}
@ -526,7 +526,7 @@ func (node *ProxyNode) DropPartition(ctx context.Context, request *milvuspb.Drop
ctx: ctx,
Condition: NewTaskCondition(ctx),
DropPartitionRequest: request,
masterService: node.masterService,
rootCoord: node.rootCoord,
result: nil,
}
@ -572,7 +572,7 @@ func (node *ProxyNode) HasPartition(ctx context.Context, request *milvuspb.HasPa
ctx: ctx,
Condition: NewTaskCondition(ctx),
HasPartitionRequest: request,
masterService: node.masterService,
rootCoord: node.rootCoord,
result: nil,
}
@ -763,7 +763,7 @@ func (node *ProxyNode) ShowPartitions(ctx context.Context, request *milvuspb.Sho
ctx: ctx,
Condition: NewTaskCondition(ctx),
ShowPartitionsRequest: request,
masterService: node.masterService,
rootCoord: node.rootCoord,
result: nil,
}
@ -811,7 +811,7 @@ func (node *ProxyNode) CreateIndex(ctx context.Context, request *milvuspb.Create
ctx: ctx,
Condition: NewTaskCondition(ctx),
CreateIndexRequest: request,
masterService: node.masterService,
rootCoord: node.rootCoord,
}
err := node.sched.DdQueue.Enqueue(cit)
@ -858,7 +858,7 @@ func (node *ProxyNode) DescribeIndex(ctx context.Context, request *milvuspb.Desc
ctx: ctx,
Condition: NewTaskCondition(ctx),
DescribeIndexRequest: request,
masterService: node.masterService,
rootCoord: node.rootCoord,
}
err := node.sched.DdQueue.Enqueue(dit)
@ -913,7 +913,7 @@ func (node *ProxyNode) DropIndex(ctx context.Context, request *milvuspb.DropInde
ctx: ctx,
Condition: NewTaskCondition(ctx),
DropIndexRequest: request,
masterService: node.masterService,
rootCoord: node.rootCoord,
}
err := node.sched.DdQueue.Enqueue(dit)
if err != nil {
@ -960,8 +960,8 @@ func (node *ProxyNode) GetIndexBuildProgress(ctx context.Context, request *milvu
ctx: ctx,
Condition: NewTaskCondition(ctx),
GetIndexBuildProgressRequest: request,
indexService: node.indexService,
masterService: node.masterService,
indexCoord: node.indexCoord,
rootCoord: node.rootCoord,
dataService: node.dataService,
}
@ -1015,8 +1015,8 @@ func (node *ProxyNode) GetIndexState(ctx context.Context, request *milvuspb.GetI
ctx: ctx,
Condition: NewTaskCondition(ctx),
GetIndexStateRequest: request,
indexService: node.indexService,
masterService: node.masterService,
indexCoord: node.indexCoord,
rootCoord: node.rootCoord,
}
err := node.sched.DdQueue.Enqueue(dipt)
@ -1560,7 +1560,7 @@ func (node *ProxyNode) GetQuerySegmentInfo(ctx context.Context, req *milvuspb.Ge
}
func (node *ProxyNode) getSegmentsOfCollection(ctx context.Context, dbName string, collectionName string) ([]UniqueID, error) {
describeCollectionResponse, err := node.masterService.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
describeCollectionResponse, err := node.rootCoord.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_DescribeCollection,
MsgID: 0,
@ -1577,7 +1577,7 @@ func (node *ProxyNode) getSegmentsOfCollection(ctx context.Context, dbName strin
return nil, errors.New(describeCollectionResponse.Status.Reason)
}
collectionID := describeCollectionResponse.CollectionID
showPartitionsResp, err := node.masterService.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{
showPartitionsResp, err := node.rootCoord.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_ShowPartitions,
MsgID: 0,
@ -1597,7 +1597,7 @@ func (node *ProxyNode) getSegmentsOfCollection(ctx context.Context, dbName strin
ret := make([]UniqueID, 0)
for _, partitionID := range showPartitionsResp.PartitionIDs {
showSegmentResponse, err := node.masterService.ShowSegments(ctx, &milvuspb.ShowSegmentsRequest{
showSegmentResponse, err := node.rootCoord.ShowSegments(ctx, &milvuspb.ShowSegmentsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_ShowSegments,
MsgID: 0,

View File

@ -43,7 +43,7 @@ type collectionInfo struct {
}
type MetaCache struct {
client types.MasterService
client types.RootCoord
collInfo map[string]*collectionInfo
mu sync.RWMutex
@ -51,7 +51,7 @@ type MetaCache struct {
var globalMetaCache Cache
func InitMetaCache(client types.MasterService) error {
func InitMetaCache(client types.RootCoord) error {
var err error
globalMetaCache, err = NewMetaCache(client)
if err != nil {
@ -60,7 +60,7 @@ func InitMetaCache(client types.MasterService) error {
return nil
}
func NewMetaCache(client types.MasterService) (*MetaCache, error) {
func NewMetaCache(client types.RootCoord) (*MetaCache, error) {
return &MetaCache{
client: client,
collInfo: map[string]*collectionInfo{},

View File

@ -24,11 +24,11 @@ import (
"github.com/stretchr/testify/assert"
)
type MockMasterClientInterface struct {
types.MasterService
type MockRootCoordClientInterface struct {
types.RootCoord
}
func (m *MockMasterClientInterface) ShowPartitions(ctx context.Context, in *milvuspb.ShowPartitionsRequest) (*milvuspb.ShowPartitionsResponse, error) {
func (m *MockRootCoordClientInterface) ShowPartitions(ctx context.Context, in *milvuspb.ShowPartitionsRequest) (*milvuspb.ShowPartitionsResponse, error) {
if in.CollectionName == "collection1" {
return &milvuspb.ShowPartitionsResponse{
Status: &commonpb.Status{
@ -47,7 +47,7 @@ func (m *MockMasterClientInterface) ShowPartitions(ctx context.Context, in *milv
}, nil
}
func (m *MockMasterClientInterface) DescribeCollection(ctx context.Context, in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
func (m *MockRootCoordClientInterface) DescribeCollection(ctx context.Context, in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
if in.CollectionName == "collection1" {
return &milvuspb.DescribeCollectionResponse{
Status: &commonpb.Status{
@ -70,7 +70,7 @@ func (m *MockMasterClientInterface) DescribeCollection(ctx context.Context, in *
func TestMetaCache_GetCollection(t *testing.T) {
ctx := context.Background()
client := &MockMasterClientInterface{}
client := &MockRootCoordClientInterface{}
err := InitMetaCache(client)
assert.Nil(t, err)
@ -92,7 +92,7 @@ func TestMetaCache_GetCollection(t *testing.T) {
func TestMetaCache_GetPartitionID(t *testing.T) {
ctx := context.Background()
client := &MockMasterClientInterface{}
client := &MockRootCoordClientInterface{}
err := InitMetaCache(client)
assert.Nil(t, err)

View File

@ -52,10 +52,10 @@ type ProxyNode struct {
stateCode atomic.Value
masterService types.MasterService
indexService types.IndexService
dataService types.DataService
queryService types.QueryService
rootCoord types.RootCoord
indexCoord types.IndexCoord
dataService types.DataService
queryService types.QueryService
chMgr channelsMgr
@ -122,15 +122,15 @@ func (node *ProxyNode) Init() error {
log.Debug("ProxyNode queryService is ready")
}
// wait for indexservice state changed to Healthy
if node.indexService != nil {
log.Debug("ProxyNode wait for indexService ready")
err := funcutil.WaitForComponentHealthy(node.ctx, node.indexService, "IndexService", 1000000, time.Millisecond*200)
// wait for indexcoord state changed to Healthy
if node.indexCoord != nil {
log.Debug("ProxyNode wait for indexCoord ready")
err := funcutil.WaitForComponentHealthy(node.ctx, node.indexCoord, "IndexCoord", 1000000, time.Millisecond*200)
if err != nil {
log.Debug("ProxyNode wait for indexService ready failed", zap.Error(err))
log.Debug("ProxyNode wait for indexCoord ready failed", zap.Error(err))
return err
}
log.Debug("ProxyNode indexService is ready")
log.Debug("ProxyNode indexCoord is ready")
}
if node.queryService != nil {
@ -174,7 +174,7 @@ func (node *ProxyNode) Init() error {
node.idAllocator = idAllocator
node.idAllocator.PeerID = Params.ProxyID
tsoAllocator, err := NewTimestampAllocator(node.ctx, node.masterService, Params.ProxyID)
tsoAllocator, err := NewTimestampAllocator(node.ctx, node.rootCoord, Params.ProxyID)
if err != nil {
return err
}
@ -200,7 +200,7 @@ func (node *ProxyNode) Init() error {
CollectionID: collectionID,
TimeStamp: 0, // todo
}
resp, err := node.masterService.DescribeCollection(node.ctx, req)
resp, err := node.rootCoord.DescribeCollection(node.ctx, req)
if err != nil {
log.Warn("DescribeCollection", zap.Error(err))
return nil, err
@ -319,7 +319,7 @@ func (node *ProxyNode) sendChannelsTimeTickLoop() {
DefaultTimestamp: maxTs,
}
status, err := node.masterService.UpdateChannelTimeTick(node.ctx, req)
status, err := node.rootCoord.UpdateChannelTimeTick(node.ctx, req)
if err != nil {
log.Warn("sendChannelsTimeTickLoop.UpdateChannelTimeTick", zap.Error(err))
continue
@ -336,7 +336,7 @@ func (node *ProxyNode) sendChannelsTimeTickLoop() {
}
func (node *ProxyNode) Start() error {
err := InitMetaCache(node.masterService)
err := InitMetaCache(node.rootCoord)
if err != nil {
return err
}
@ -408,12 +408,12 @@ func (node *ProxyNode) AddCloseCallback(callbacks ...func()) {
node.closeCallbacks = append(node.closeCallbacks, callbacks...)
}
func (node *ProxyNode) SetMasterClient(cli types.MasterService) {
node.masterService = cli
func (node *ProxyNode) SetRootCoordClient(cli types.RootCoord) {
node.rootCoord = cli
}
func (node *ProxyNode) SetIndexServiceClient(cli types.IndexService) {
node.indexService = cli
func (node *ProxyNode) SetIndexCoordClient(cli types.IndexCoord) {
node.indexCoord = cli
}
func (node *ProxyNode) SetDataServiceClient(cli types.DataService) {

View File

@ -917,7 +917,7 @@ type CreateCollectionTask struct {
Condition
*milvuspb.CreateCollectionRequest
ctx context.Context
masterService types.MasterService
rootCoord types.RootCoord
dataServiceClient types.DataService
result *commonpb.Status
schema *schemapb.CollectionSchema
@ -1032,7 +1032,7 @@ func (cct *CreateCollectionTask) PreExecute(ctx context.Context) error {
func (cct *CreateCollectionTask) Execute(ctx context.Context) error {
var err error
cct.result, err = cct.masterService.CreateCollection(ctx, cct.CreateCollectionRequest)
cct.result, err = cct.rootCoord.CreateCollection(ctx, cct.CreateCollectionRequest)
return err
}
@ -1043,11 +1043,11 @@ func (cct *CreateCollectionTask) PostExecute(ctx context.Context) error {
type DropCollectionTask struct {
Condition
*milvuspb.DropCollectionRequest
ctx context.Context
masterService types.MasterService
result *commonpb.Status
chMgr channelsMgr
chTicker channelsTimeTicker
ctx context.Context
rootCoord types.RootCoord
result *commonpb.Status
chMgr channelsMgr
chTicker channelsTimeTicker
}
func (dct *DropCollectionTask) TraceCtx() context.Context {
@ -1103,7 +1103,7 @@ func (dct *DropCollectionTask) Execute(ctx context.Context) error {
return err
}
dct.result, err = dct.masterService.DropCollection(ctx, dct.DropCollectionRequest)
dct.result, err = dct.rootCoord.DropCollection(ctx, dct.DropCollectionRequest)
if err != nil {
return err
}
@ -2066,9 +2066,9 @@ func (rt *RetrieveTask) PostExecute(ctx context.Context) error {
type HasCollectionTask struct {
Condition
*milvuspb.HasCollectionRequest
ctx context.Context
masterService types.MasterService
result *milvuspb.BoolResponse
ctx context.Context
rootCoord types.RootCoord
result *milvuspb.BoolResponse
}
func (hct *HasCollectionTask) TraceCtx() context.Context {
@ -2120,7 +2120,7 @@ func (hct *HasCollectionTask) PreExecute(ctx context.Context) error {
func (hct *HasCollectionTask) Execute(ctx context.Context) error {
var err error
hct.result, err = hct.masterService.HasCollection(ctx, hct.HasCollectionRequest)
hct.result, err = hct.rootCoord.HasCollection(ctx, hct.HasCollectionRequest)
if hct.result == nil {
return errors.New("has collection resp is nil")
}
@ -2137,9 +2137,9 @@ func (hct *HasCollectionTask) PostExecute(ctx context.Context) error {
type DescribeCollectionTask struct {
Condition
*milvuspb.DescribeCollectionRequest
ctx context.Context
masterService types.MasterService
result *milvuspb.DescribeCollectionResponse
ctx context.Context
rootCoord types.RootCoord
result *milvuspb.DescribeCollectionResponse
}
func (dct *DescribeCollectionTask) TraceCtx() context.Context {
@ -2206,7 +2206,7 @@ func (dct *DescribeCollectionTask) Execute(ctx context.Context) error {
PhysicalChannelNames: nil,
}
result, err := dct.masterService.DescribeCollection(ctx, dct.DescribeCollectionRequest)
result, err := dct.rootCoord.DescribeCollection(ctx, dct.DescribeCollectionRequest)
if err != nil {
return err
@ -2426,10 +2426,10 @@ func (g *GetPartitionStatisticsTask) PostExecute(ctx context.Context) error {
type ShowCollectionsTask struct {
Condition
*milvuspb.ShowCollectionsRequest
ctx context.Context
masterService types.MasterService
queryService types.QueryService
result *milvuspb.ShowCollectionsResponse
ctx context.Context
rootCoord types.RootCoord
queryService types.QueryService
result *milvuspb.ShowCollectionsResponse
}
func (sct *ShowCollectionsTask) TraceCtx() context.Context {
@ -2479,18 +2479,18 @@ func (sct *ShowCollectionsTask) PreExecute(ctx context.Context) error {
func (sct *ShowCollectionsTask) Execute(ctx context.Context) error {
var err error
respFromMaster, err := sct.masterService.ShowCollections(ctx, sct.ShowCollectionsRequest)
respFromRootCoord, err := sct.rootCoord.ShowCollections(ctx, sct.ShowCollectionsRequest)
if err != nil {
return err
}
if respFromMaster == nil {
if respFromRootCoord == nil {
return errors.New("failed to show collections")
}
if respFromMaster.Status.ErrorCode != commonpb.ErrorCode_Success {
return errors.New(respFromMaster.Status.Reason)
if respFromRootCoord.Status.ErrorCode != commonpb.ErrorCode_Success {
return errors.New(respFromRootCoord.Status.Reason)
}
if sct.ShowCollectionsRequest.Type == milvuspb.ShowCollectionsType_InMemory {
@ -2523,8 +2523,8 @@ func (sct *ShowCollectionsTask) Execute(ctx context.Context) error {
}
idMap := make(map[int64]string)
for i, name := range respFromMaster.CollectionNames {
idMap[respFromMaster.CollectionIds[i]] = name
for i, name := range respFromRootCoord.CollectionNames {
idMap[respFromRootCoord.CollectionIds[i]] = name
}
for _, id := range resp.CollectionIDs {
@ -2533,7 +2533,7 @@ func (sct *ShowCollectionsTask) Execute(ctx context.Context) error {
}
}
sct.result = respFromMaster
sct.result = respFromRootCoord
return nil
}
@ -2545,9 +2545,9 @@ func (sct *ShowCollectionsTask) PostExecute(ctx context.Context) error {
type CreatePartitionTask struct {
Condition
*milvuspb.CreatePartitionRequest
ctx context.Context
masterService types.MasterService
result *commonpb.Status
ctx context.Context
rootCoord types.RootCoord
result *commonpb.Status
}
func (cpt *CreatePartitionTask) TraceCtx() context.Context {
@ -2605,7 +2605,7 @@ func (cpt *CreatePartitionTask) PreExecute(ctx context.Context) error {
}
func (cpt *CreatePartitionTask) Execute(ctx context.Context) (err error) {
cpt.result, err = cpt.masterService.CreatePartition(ctx, cpt.CreatePartitionRequest)
cpt.result, err = cpt.rootCoord.CreatePartition(ctx, cpt.CreatePartitionRequest)
if cpt.result == nil {
return errors.New("get collection statistics resp is nil")
}
@ -2622,9 +2622,9 @@ func (cpt *CreatePartitionTask) PostExecute(ctx context.Context) error {
type DropPartitionTask struct {
Condition
*milvuspb.DropPartitionRequest
ctx context.Context
masterService types.MasterService
result *commonpb.Status
ctx context.Context
rootCoord types.RootCoord
result *commonpb.Status
}
func (dpt *DropPartitionTask) TraceCtx() context.Context {
@ -2682,7 +2682,7 @@ func (dpt *DropPartitionTask) PreExecute(ctx context.Context) error {
}
func (dpt *DropPartitionTask) Execute(ctx context.Context) (err error) {
dpt.result, err = dpt.masterService.DropPartition(ctx, dpt.DropPartitionRequest)
dpt.result, err = dpt.rootCoord.DropPartition(ctx, dpt.DropPartitionRequest)
if dpt.result == nil {
return errors.New("get collection statistics resp is nil")
}
@ -2699,9 +2699,9 @@ func (dpt *DropPartitionTask) PostExecute(ctx context.Context) error {
type HasPartitionTask struct {
Condition
*milvuspb.HasPartitionRequest
ctx context.Context
masterService types.MasterService
result *milvuspb.BoolResponse
ctx context.Context
rootCoord types.RootCoord
result *milvuspb.BoolResponse
}
func (hpt *HasPartitionTask) TraceCtx() context.Context {
@ -2758,7 +2758,7 @@ func (hpt *HasPartitionTask) PreExecute(ctx context.Context) error {
}
func (hpt *HasPartitionTask) Execute(ctx context.Context) (err error) {
hpt.result, err = hpt.masterService.HasPartition(ctx, hpt.HasPartitionRequest)
hpt.result, err = hpt.rootCoord.HasPartition(ctx, hpt.HasPartitionRequest)
if hpt.result == nil {
return errors.New("get collection statistics resp is nil")
}
@ -2775,9 +2775,9 @@ func (hpt *HasPartitionTask) PostExecute(ctx context.Context) error {
type ShowPartitionsTask struct {
Condition
*milvuspb.ShowPartitionsRequest
ctx context.Context
masterService types.MasterService
result *milvuspb.ShowPartitionsResponse
ctx context.Context
rootCoord types.RootCoord
result *milvuspb.ShowPartitionsResponse
}
func (spt *ShowPartitionsTask) TraceCtx() context.Context {
@ -2829,7 +2829,7 @@ func (spt *ShowPartitionsTask) PreExecute(ctx context.Context) error {
func (spt *ShowPartitionsTask) Execute(ctx context.Context) error {
var err error
spt.result, err = spt.masterService.ShowPartitions(ctx, spt.ShowPartitionsRequest)
spt.result, err = spt.rootCoord.ShowPartitions(ctx, spt.ShowPartitionsRequest)
if spt.result == nil {
return errors.New("get collection statistics resp is nil")
}
@ -2846,9 +2846,9 @@ func (spt *ShowPartitionsTask) PostExecute(ctx context.Context) error {
type CreateIndexTask struct {
Condition
*milvuspb.CreateIndexRequest
ctx context.Context
masterService types.MasterService
result *commonpb.Status
ctx context.Context
rootCoord types.RootCoord
result *commonpb.Status
}
func (cit *CreateIndexTask) TraceCtx() context.Context {
@ -2907,7 +2907,7 @@ func (cit *CreateIndexTask) PreExecute(ctx context.Context) error {
func (cit *CreateIndexTask) Execute(ctx context.Context) error {
var err error
cit.result, err = cit.masterService.CreateIndex(ctx, cit.CreateIndexRequest)
cit.result, err = cit.rootCoord.CreateIndex(ctx, cit.CreateIndexRequest)
if cit.result == nil {
return errors.New("get collection statistics resp is nil")
}
@ -2924,9 +2924,9 @@ func (cit *CreateIndexTask) PostExecute(ctx context.Context) error {
type DescribeIndexTask struct {
Condition
*milvuspb.DescribeIndexRequest
ctx context.Context
masterService types.MasterService
result *milvuspb.DescribeIndexResponse
ctx context.Context
rootCoord types.RootCoord
result *milvuspb.DescribeIndexResponse
}
func (dit *DescribeIndexTask) TraceCtx() context.Context {
@ -2984,7 +2984,7 @@ func (dit *DescribeIndexTask) PreExecute(ctx context.Context) error {
func (dit *DescribeIndexTask) Execute(ctx context.Context) error {
var err error
dit.result, err = dit.masterService.DescribeIndex(ctx, dit.DescribeIndexRequest)
dit.result, err = dit.rootCoord.DescribeIndex(ctx, dit.DescribeIndexRequest)
if dit.result == nil {
return errors.New("get collection statistics resp is nil")
}
@ -3002,8 +3002,8 @@ type DropIndexTask struct {
Condition
ctx context.Context
*milvuspb.DropIndexRequest
masterService types.MasterService
result *commonpb.Status
rootCoord types.RootCoord
result *commonpb.Status
}
func (dit *DropIndexTask) TraceCtx() context.Context {
@ -3062,7 +3062,7 @@ func (dit *DropIndexTask) PreExecute(ctx context.Context) error {
func (dit *DropIndexTask) Execute(ctx context.Context) error {
var err error
dit.result, err = dit.masterService.DropIndex(ctx, dit.DropIndexRequest)
dit.result, err = dit.rootCoord.DropIndex(ctx, dit.DropIndexRequest)
if dit.result == nil {
return errors.New("drop index resp is nil")
}
@ -3079,11 +3079,11 @@ func (dit *DropIndexTask) PostExecute(ctx context.Context) error {
type GetIndexBuildProgressTask struct {
Condition
*milvuspb.GetIndexBuildProgressRequest
ctx context.Context
indexService types.IndexService
masterService types.MasterService
dataService types.DataService
result *milvuspb.GetIndexBuildProgressResponse
ctx context.Context
indexCoord types.IndexCoord
rootCoord types.RootCoord
dataService types.DataService
result *milvuspb.GetIndexBuildProgressResponse
}
func (gibpt *GetIndexBuildProgressTask) TraceCtx() context.Context {
@ -3152,7 +3152,7 @@ func (gibpt *GetIndexBuildProgressTask) Execute(ctx context.Context) error {
CollectionName: collectionName,
CollectionID: collectionID,
}
partitions, err := gibpt.masterService.ShowPartitions(ctx, showPartitionRequest)
partitions, err := gibpt.rootCoord.ShowPartitions(ctx, showPartitionRequest)
if err != nil {
return err
}
@ -3173,7 +3173,7 @@ func (gibpt *GetIndexBuildProgressTask) Execute(ctx context.Context) error {
// IndexName: gibpt.IndexName,
}
indexDescriptionResp, err2 := gibpt.masterService.DescribeIndex(ctx, &describeIndexReq)
indexDescriptionResp, err2 := gibpt.rootCoord.DescribeIndex(ctx, &describeIndexReq)
if err2 != nil {
return err2
}
@ -3203,7 +3203,7 @@ func (gibpt *GetIndexBuildProgressTask) Execute(ctx context.Context) error {
CollectionID: collectionID,
PartitionID: partitionID,
}
segments, err := gibpt.masterService.ShowSegments(ctx, showSegmentsRequest)
segments, err := gibpt.rootCoord.ShowSegments(ctx, showSegmentsRequest)
if err != nil {
return err
}
@ -3229,7 +3229,7 @@ func (gibpt *GetIndexBuildProgressTask) Execute(ctx context.Context) error {
CollectionID: collectionID,
SegmentID: segmentID,
}
segmentDesc, err := gibpt.masterService.DescribeSegment(ctx, describeSegmentRequest)
segmentDesc, err := gibpt.rootCoord.DescribeSegment(ctx, describeSegmentRequest)
if err != nil {
return err
}
@ -3241,7 +3241,7 @@ func (gibpt *GetIndexBuildProgressTask) Execute(ctx context.Context) error {
}
}
states, err := gibpt.indexService.GetIndexStates(ctx, getIndexStatesRequest)
states, err := gibpt.indexCoord.GetIndexStates(ctx, getIndexStatesRequest)
if err != nil {
return err
}
@ -3301,10 +3301,10 @@ func (gibpt *GetIndexBuildProgressTask) PostExecute(ctx context.Context) error {
type GetIndexStateTask struct {
Condition
*milvuspb.GetIndexStateRequest
ctx context.Context
indexService types.IndexService
masterService types.MasterService
result *milvuspb.GetIndexStateResponse
ctx context.Context
indexCoord types.IndexCoord
rootCoord types.RootCoord
result *milvuspb.GetIndexStateResponse
}
func (gist *GetIndexStateTask) TraceCtx() context.Context {
@ -3373,7 +3373,7 @@ func (gist *GetIndexStateTask) Execute(ctx context.Context) error {
CollectionName: collectionName,
CollectionID: collectionID,
}
partitions, err := gist.masterService.ShowPartitions(ctx, showPartitionRequest)
partitions, err := gist.rootCoord.ShowPartitions(ctx, showPartitionRequest)
if err != nil {
return err
}
@ -3394,7 +3394,7 @@ func (gist *GetIndexStateTask) Execute(ctx context.Context) error {
IndexName: gist.IndexName,
}
indexDescriptionResp, err2 := gist.masterService.DescribeIndex(ctx, &describeIndexReq)
indexDescriptionResp, err2 := gist.rootCoord.DescribeIndex(ctx, &describeIndexReq)
if err2 != nil {
return err2
}
@ -3424,7 +3424,7 @@ func (gist *GetIndexStateTask) Execute(ctx context.Context) error {
CollectionID: collectionID,
PartitionID: partitionID,
}
segments, err := gist.masterService.ShowSegments(ctx, showSegmentsRequest)
segments, err := gist.rootCoord.ShowSegments(ctx, showSegmentsRequest)
if err != nil {
return err
}
@ -3451,7 +3451,7 @@ func (gist *GetIndexStateTask) Execute(ctx context.Context) error {
CollectionID: collectionID,
SegmentID: segmentID,
}
segmentDesc, err := gist.masterService.DescribeSegment(ctx, describeSegmentRequest)
segmentDesc, err := gist.rootCoord.DescribeSegment(ctx, describeSegmentRequest)
if err != nil {
return err
}
@ -3483,7 +3483,7 @@ func (gist *GetIndexStateTask) Execute(ctx context.Context) error {
getIndexStatesRequest.IndexBuildIDs = append(getIndexStatesRequest.IndexBuildIDs, indexBuildIDs[idx])
}
}
states, err := gist.indexService.GetIndexStates(ctx, getIndexStatesRequest)
states, err := gist.indexCoord.GetIndexStates(ctx, getIndexStatesRequest)
if err != nil {
return err
}

View File

@ -22,16 +22,16 @@ import (
)
type TimestampAllocator struct {
ctx context.Context
masterService types.MasterService
peerID UniqueID
ctx context.Context
rootCoord types.RootCoord
peerID UniqueID
}
func NewTimestampAllocator(ctx context.Context, master types.MasterService, peerID UniqueID) (*TimestampAllocator, error) {
func NewTimestampAllocator(ctx context.Context, rc types.RootCoord, peerID UniqueID) (*TimestampAllocator, error) {
a := &TimestampAllocator{
ctx: ctx,
peerID: peerID,
masterService: master,
ctx: ctx,
peerID: peerID,
rootCoord: rc,
}
return a, nil
}
@ -48,7 +48,7 @@ func (ta *TimestampAllocator) Alloc(count uint32) ([]Timestamp, error) {
Count: count,
}
resp, err := ta.masterService.AllocTimestamp(ctx, req)
resp, err := ta.rootCoord.AllocTimestamp(ctx, req)
defer cancel()
if err != nil {

View File

@ -34,13 +34,13 @@ type historical struct {
}
func newHistorical(ctx context.Context,
masterService types.MasterService,
rootCoord types.RootCoord,
dataService types.DataService,
indexService types.IndexService,
indexCoord types.IndexCoord,
factory msgstream.Factory,
etcdKV *etcdkv.EtcdKV) *historical {
replica := newCollectionReplica(etcdKV)
loader := newSegmentLoader(ctx, masterService, indexService, dataService, replica, etcdKV)
loader := newSegmentLoader(ctx, rootCoord, indexCoord, dataService, replica, etcdKV)
ss := newStatsService(ctx, replica, loader.indexLoader.fieldStatsChan, factory)
return &historical{

View File

@ -44,8 +44,8 @@ type indexLoader struct {
fieldIndexes map[string][]*internalpb.IndexStats
fieldStatsChan chan []*internalpb.FieldStats
masterService types.MasterService
indexService types.IndexService
rootCoord types.RootCoord
indexCoord types.IndexCoord
kv kv.BaseKV // minio kv
}
@ -293,7 +293,7 @@ func (loader *indexLoader) setIndexInfo(collectionID UniqueID, segment *Segment,
CollectionID: collectionID,
SegmentID: segment.segmentID,
}
response, err := loader.masterService.DescribeSegment(ctx, req)
response, err := loader.rootCoord.DescribeSegment(ctx, req)
if err != nil {
return err
}
@ -305,14 +305,14 @@ func (loader *indexLoader) setIndexInfo(collectionID UniqueID, segment *Segment,
return errors.New("there are no indexes on this segment")
}
if loader.indexService == nil {
if loader.indexCoord == nil {
return errors.New("null index service client")
}
indexFilePathRequest := &indexpb.GetIndexFilePathsRequest{
IndexBuildIDs: []UniqueID{response.BuildID},
}
pathResponse, err := loader.indexService.GetIndexFilePaths(ctx, indexFilePathRequest)
pathResponse, err := loader.indexCoord.GetIndexFilePaths(ctx, indexFilePathRequest)
if err != nil || pathResponse.Status.ErrorCode != commonpb.ErrorCode_Success {
return err
}
@ -338,14 +338,14 @@ func (loader *indexLoader) setIndexInfo(collectionID UniqueID, segment *Segment,
func (loader *indexLoader) getIndexPaths(indexBuildID UniqueID) ([]string, error) {
ctx := context.TODO()
if loader.indexService == nil {
if loader.indexCoord == nil {
return nil, errors.New("null index service client")
}
indexFilePathRequest := &indexpb.GetIndexFilePathsRequest{
IndexBuildIDs: []UniqueID{indexBuildID},
}
pathResponse, err := loader.indexService.GetIndexFilePaths(ctx, indexFilePathRequest)
pathResponse, err := loader.indexCoord.GetIndexFilePaths(ctx, indexFilePathRequest)
if err != nil || pathResponse.Status.ErrorCode != commonpb.ErrorCode_Success {
return nil, err
}
@ -357,7 +357,7 @@ func (loader *indexLoader) getIndexPaths(indexBuildID UniqueID) ([]string, error
return pathResponse.FilePaths[0].IndexFilePaths, nil
}
func newIndexLoader(ctx context.Context, masterService types.MasterService, indexService types.IndexService, replica ReplicaInterface) *indexLoader {
func newIndexLoader(ctx context.Context, rootCoord types.RootCoord, indexCoord types.IndexCoord, replica ReplicaInterface) *indexLoader {
option := &minioKV.Option{
Address: Params.MinioEndPoint,
AccessKeyID: Params.MinioAccessKeyID,
@ -378,8 +378,8 @@ func newIndexLoader(ctx context.Context, masterService types.MasterService, inde
fieldIndexes: make(map[string][]*internalpb.IndexStats),
fieldStatsChan: make(chan []*internalpb.FieldStats, 1024),
masterService: masterService,
indexService: indexService,
rootCoord: rootCoord,
indexCoord: indexCoord,
kv: client,
}

View File

@ -61,10 +61,10 @@ type QueryNode struct {
retrieveService *retrieveService
// clients
masterService types.MasterService
queryService types.QueryService
indexService types.IndexService
dataService types.DataService
rootCoord types.RootCoord
queryService types.QueryService
indexCoord types.IndexCoord
dataService types.DataService
msFactory msgstream.Factory
scheduler *taskScheduler
@ -141,9 +141,9 @@ func (node *QueryNode) Init() error {
log.Debug("queryNode try to connect etcd success")
node.historical = newHistorical(node.queryNodeLoopCtx,
node.masterService,
node.rootCoord,
node.dataService,
node.indexService,
node.indexCoord,
node.msFactory,
node.etcdKV)
node.streaming = newStreaming(node.queryNodeLoopCtx, node.msFactory, node.etcdKV)
@ -187,12 +187,12 @@ func (node *QueryNode) Init() error {
//
//log.Debug("QueryNode Init ", zap.Int64("QueryNodeID", Params.QueryNodeID), zap.Any("searchChannelNames", Params.SearchChannelNames))
if node.masterService == nil {
log.Error("null master service detected")
if node.rootCoord == nil {
log.Error("null rootCoord detected")
}
if node.indexService == nil {
log.Error("null index service detected")
if node.indexCoord == nil {
log.Error("null indexCoord detected")
}
if node.dataService == nil {
@ -260,11 +260,11 @@ func (node *QueryNode) UpdateStateCode(code internalpb.StateCode) {
node.stateCode.Store(code)
}
func (node *QueryNode) SetMasterService(master types.MasterService) error {
if master == nil {
return errors.New("null master service interface")
func (node *QueryNode) SetRootCoord(rc types.RootCoord) error {
if rc == nil {
return errors.New("null root coord interface")
}
node.masterService = master
node.rootCoord = rc
return nil
}
@ -276,11 +276,11 @@ func (node *QueryNode) SetQueryService(query types.QueryService) error {
return nil
}
func (node *QueryNode) SetIndexService(index types.IndexService) error {
func (node *QueryNode) SetIndexCoord(index types.IndexCoord) error {
if index == nil {
return errors.New("null index service interface")
return errors.New("null indexCoord interface")
}
node.indexService = index
node.indexCoord = index
return nil
}

View File

@ -302,7 +302,7 @@ func (loader *segmentLoader) loadSegmentFieldsData(segment *Segment, binlogPaths
return nil
}
func newSegmentLoader(ctx context.Context, masterService types.MasterService, indexService types.IndexService, dataService types.DataService, replica ReplicaInterface, etcdKV *etcdkv.EtcdKV) *segmentLoader {
func newSegmentLoader(ctx context.Context, rootCoord types.RootCoord, indexCoord types.IndexCoord, dataService types.DataService, replica ReplicaInterface, etcdKV *etcdkv.EtcdKV) *segmentLoader {
option := &minioKV.Option{
Address: Params.MinioEndPoint,
AccessKeyID: Params.MinioAccessKeyID,
@ -317,7 +317,7 @@ func newSegmentLoader(ctx context.Context, masterService types.MasterService, in
panic(err)
}
iLoader := newIndexLoader(ctx, masterService, indexService, replica)
iLoader := newIndexLoader(ctx, rootCoord, indexCoord, replica)
return &segmentLoader{
historicalReplica: replica,

View File

@ -133,7 +133,7 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
},
CollectionID: collectionID,
}
desColRsp, err := w.node.masterService.DescribeCollection(ctx, desColReq)
desColRsp, err := w.node.rootCoord.DescribeCollection(ctx, desColReq)
if err != nil {
log.Error("get channels failed, err = " + err.Error())
return err

View File

@ -153,7 +153,7 @@ func (qs *QueryService) LoadCollection(ctx context.Context, req *querypb.LoadCol
triggerCondition: querypb.TriggerCondition_grpcRequest,
},
LoadCollectionRequest: req,
masterService: qs.masterServiceClient,
rootCoord: qs.rootCoordClient,
dataService: qs.dataServiceClient,
cluster: qs.cluster,
meta: qs.meta,

View File

@ -28,14 +28,14 @@ const (
numSegment = 12
)
type MasterMock struct {
types.MasterService
type RootCoordMock struct {
types.RootCoord
CollectionIDs []UniqueID
Col2partition map[UniqueID][]UniqueID
Partition2segment map[UniqueID][]UniqueID
}
func NewMasterMock() *MasterMock {
func NewRootCoordMock() *RootCoordMock {
collectionIDs := make([]UniqueID, 0)
collectionIDs = append(collectionIDs, 1)
@ -51,19 +51,19 @@ func NewMasterMock() *MasterMock {
}
partition2segment[1] = segmentIDs
return &MasterMock{
return &RootCoordMock{
CollectionIDs: collectionIDs,
Col2partition: col2partition,
Partition2segment: partition2segment,
}
}
func (master *MasterMock) ShowPartitions(ctx context.Context, in *milvuspb.ShowPartitionsRequest) (*milvuspb.ShowPartitionsResponse, error) {
func (rc *RootCoordMock) ShowPartitions(ctx context.Context, in *milvuspb.ShowPartitionsRequest) (*milvuspb.ShowPartitionsResponse, error) {
collectionID := in.CollectionID
partitionIDs := make([]UniqueID, 0)
for _, id := range master.CollectionIDs {
for _, id := range rc.CollectionIDs {
if id == collectionID {
partitions := master.Col2partition[collectionID]
partitions := rc.Col2partition[collectionID]
partitionIDs = append(partitionIDs, partitions...)
}
}
@ -77,20 +77,20 @@ func (master *MasterMock) ShowPartitions(ctx context.Context, in *milvuspb.ShowP
return response, nil
}
func (master *MasterMock) ShowSegments(ctx context.Context, in *milvuspb.ShowSegmentsRequest) (*milvuspb.ShowSegmentsResponse, error) {
func (rc *RootCoordMock) ShowSegments(ctx context.Context, in *milvuspb.ShowSegmentsRequest) (*milvuspb.ShowSegmentsResponse, error) {
collectionID := in.CollectionID
partitionID := in.PartitionID
for _, id := range master.CollectionIDs {
for _, id := range rc.CollectionIDs {
if id == collectionID {
partitions := master.Col2partition[collectionID]
partitions := rc.Col2partition[collectionID]
for _, partition := range partitions {
if partition == partitionID {
return &milvuspb.ShowSegmentsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
//SegmentIDs: master.Partition2segment[partition],
//SegmentIDs: rc.Partition2segment[partition],
}, nil
}
}

View File

@ -55,8 +55,8 @@ type QueryService struct {
cluster *queryNodeCluster
scheduler *TaskScheduler
dataServiceClient types.DataService
masterServiceClient types.MasterService
dataServiceClient types.DataService
rootCoordClient types.RootCoord
session *sessionutil.Session
eventChan <-chan *sessionutil.SessionEvent
@ -94,7 +94,7 @@ func (qs *QueryService) Init() error {
return err
}
qs.scheduler, err = NewTaskScheduler(qs.loopCtx, metaKV, qs.cluster, etcdKV, qs.masterServiceClient, qs.dataServiceClient)
qs.scheduler, err = NewTaskScheduler(qs.loopCtx, metaKV, qs.cluster, etcdKV, qs.rootCoordClient, qs.dataServiceClient)
return err
}
log.Debug("queryService try to connect etcd")
@ -161,8 +161,8 @@ func NewQueryService(ctx context.Context, factory msgstream.Factory) (*QueryServ
return service, nil
}
func (qs *QueryService) SetMasterService(masterService types.MasterService) {
qs.masterServiceClient = masterService
func (qs *QueryService) SetRootCoord(rootCoord types.RootCoord) {
qs.rootCoordClient = rootCoord
}
func (qs *QueryService) SetDataService(dataService types.DataService) {
@ -209,7 +209,7 @@ func (qs *QueryService) watchNodeLoop() {
triggerCondition: querypb.TriggerCondition_nodeDown,
},
LoadBalanceRequest: loadBalanceSegment,
master: qs.masterServiceClient,
rootCoord: qs.rootCoordClient,
dataService: qs.dataServiceClient,
cluster: qs.cluster,
meta: qs.meta,
@ -252,7 +252,7 @@ func (qs *QueryService) watchNodeLoop() {
triggerCondition: querypb.TriggerCondition_nodeDown,
},
LoadBalanceRequest: loadBalanceSegment,
master: qs.masterServiceClient,
rootCoord: qs.rootCoordClient,
dataService: qs.dataServiceClient,
cluster: qs.cluster,
meta: qs.meta,

View File

@ -60,7 +60,7 @@ func TestQueryService_load(t *testing.T) {
assert.Nil(t, err)
service.Init()
service.Start()
service.SetMasterService(NewMasterMock())
service.SetRootCoord(NewRootCoordMock())
service.SetDataService(NewDataMock())
registerNodeRequest := &querypb.RegisterNodeRequest{
Address: &commonpb.Address{},

View File

@ -126,10 +126,10 @@ func (bt *BaseTask) SetState(state taskState) {
type LoadCollectionTask struct {
BaseTask
*querypb.LoadCollectionRequest
masterService types.MasterService
dataService types.DataService
cluster *queryNodeCluster
meta *meta
rootCoord types.RootCoord
dataService types.DataService
cluster *queryNodeCluster
meta *meta
}
func (lct *LoadCollectionTask) Marshal() string {
@ -167,7 +167,7 @@ func (lct *LoadCollectionTask) Execute(ctx context.Context) error {
},
CollectionID: collectionID,
}
showPartitionResponse, err := lct.masterService.ShowPartitions(ctx, showPartitionRequest)
showPartitionResponse, err := lct.rootCoord.ShowPartitions(ctx, showPartitionRequest)
if err != nil {
status.Reason = err.Error()
lct.result = status
@ -1047,7 +1047,7 @@ type HandoffTask struct {
type LoadBalanceTask struct {
BaseTask
*querypb.LoadBalanceRequest
master types.MasterService
rootCoord types.RootCoord
dataService types.DataService
cluster *queryNodeCluster
meta *meta

View File

@ -126,7 +126,7 @@ type TaskScheduler struct {
taskIDAllocator func() (UniqueID, error)
client *etcdkv.EtcdKV
master types.MasterService
rootCoord types.RootCoord
dataService types.DataService
wg sync.WaitGroup
@ -134,7 +134,7 @@ type TaskScheduler struct {
cancel context.CancelFunc
}
func NewTaskScheduler(ctx context.Context, meta *meta, cluster *queryNodeCluster, kv *etcdkv.EtcdKV, master types.MasterService, dataService types.DataService) (*TaskScheduler, error) {
func NewTaskScheduler(ctx context.Context, meta *meta, cluster *queryNodeCluster, kv *etcdkv.EtcdKV, rootCoord types.RootCoord, dataService types.DataService) (*TaskScheduler, error) {
ctx1, cancel := context.WithCancel(ctx)
taskChan := make(chan task, 1024)
s := &TaskScheduler{
@ -144,7 +144,7 @@ func NewTaskScheduler(ctx context.Context, meta *meta, cluster *queryNodeCluster
cluster: cluster,
activateTaskChan: taskChan,
client: kv,
master: master,
rootCoord: rootCoord,
dataService: dataService,
}
s.triggerTaskQueue = NewTaskQueue()
@ -258,7 +258,7 @@ func (scheduler *TaskScheduler) unmarshalTask(t string) (task, error) {
triggerCondition: querypb.TriggerCondition_grpcRequest,
},
LoadCollectionRequest: &loadReq,
masterService: scheduler.master,
rootCoord: scheduler.rootCoord,
dataService: scheduler.dataService,
cluster: scheduler.cluster,
meta: scheduler.meta,
@ -393,7 +393,7 @@ func (scheduler *TaskScheduler) unmarshalTask(t string) (task, error) {
triggerCondition: loadReq.BalanceReason,
},
LoadBalanceRequest: &loadReq,
master: scheduler.master,
rootCoord: scheduler.rootCoord,
dataService: scheduler.dataService,
cluster: scheduler.cluster,
meta: scheduler.meta,
@ -419,7 +419,7 @@ func (scheduler *TaskScheduler) Enqueue(tasks []task) {
taskKey := fmt.Sprintf("%s/%d", triggerTaskPrefix, t.ID())
kvs[taskKey] = t.Marshal()
stateKey := fmt.Sprintf("%s/%d", taskInfoPrefix, t.ID())
kvs[stateKey] = string(taskUndo)
kvs[stateKey] = strconv.Itoa(int(taskUndo))
err = scheduler.client.MultiSave(kvs)
if err != nil {
log.Error("error when save trigger task to etcd", zap.Int64("taskID", t.ID()))
@ -439,7 +439,7 @@ func (scheduler *TaskScheduler) processTask(t task) error {
defer span.Finish()
span.LogFields(oplog.Int64("processTask: scheduler process PreExecute", t.ID()))
key := fmt.Sprintf("%s/%d", taskInfoPrefix, t.ID())
err := scheduler.client.Save(key, string(taskDoing))
err := scheduler.client.Save(key, strconv.Itoa(int(taskDoing)))
if err != nil {
log.Debug("processTask: update task state err", zap.String("reason", err.Error()))
@ -484,7 +484,7 @@ func (scheduler *TaskScheduler) processTask(t task) error {
taskKey := fmt.Sprintf("%s/%d", activeTaskPrefix, childTask.ID())
kvs[taskKey] = t.Marshal()
stateKey := fmt.Sprintf("%s/%d", taskInfoPrefix, childTask.ID())
kvs[stateKey] = string(taskUndo)
kvs[stateKey] = strconv.Itoa(int(taskUndo))
err = scheduler.client.MultiSave(kvs)
if err != nil {
return err
@ -492,7 +492,7 @@ func (scheduler *TaskScheduler) processTask(t task) error {
log.Debug("processTask: save active task to etcd", zap.Int64("parent taskID", t.ID()), zap.Int64("child taskID", childTask.ID()))
}
err = scheduler.client.Save(key, string(taskDone))
err = scheduler.client.Save(key, strconv.Itoa(int(taskDone)))
if err != nil {
log.Debug("processTask: update task state err", zap.String("reason", err.Error()))
trace.LogError(span, err)
@ -577,7 +577,7 @@ func (scheduler *TaskScheduler) waitActivateTaskDone(wg *sync.WaitGroup, t task)
taskKey := fmt.Sprintf("%s/%d", activeTaskPrefix, rt.ID())
saves[taskKey] = rt.Marshal()
stateKey := fmt.Sprintf("%s/%d", taskInfoPrefix, rt.ID())
saves[stateKey] = string(taskUndo)
saves[stateKey] = strconv.Itoa(int(taskUndo))
reSchedID = append(reSchedID, rt.ID())
}
}
@ -655,7 +655,7 @@ func (scheduler *TaskScheduler) processActivateTaskLoop() {
continue
}
stateKey := fmt.Sprintf("%s/%d", taskInfoPrefix, t.ID())
err := scheduler.client.Save(stateKey, string(taskDoing))
err := scheduler.client.Save(stateKey, strconv.Itoa(int(taskDoing)))
if err != nil {
t.Notify(err)
continue

View File

@ -13,5 +13,5 @@ approvers:
- scsven
labels:
- component/master
- component/rootcoord

View File

@ -778,7 +778,7 @@ func (c *Core) SetDataCoord(ctx context.Context, s types.DataService) error {
return nil
}
func (c *Core) SetIndexCoord(s types.IndexService) error {
func (c *Core) SetIndexCoord(s types.IndexCoord) error {
c.CallBuildIndexService = func(ctx context.Context, binlog []string, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo) (retID typeutil.UniqueID, retErr error) {
defer func() {
if err := recover(); err != nil {

View File

@ -132,7 +132,7 @@ func (q *queryMock) ReleaseCollection(ctx context.Context, req *querypb.ReleaseC
}
type indexMock struct {
types.IndexService
types.IndexCoord
fileArray []string
idxBuildID []int64
idxID []int64

View File

@ -60,7 +60,7 @@ func (*tbq) ReleaseCollection(context.Context, *querypb.ReleaseCollectionRequest
}
type tbi struct {
types.IndexService
types.IndexCoord
}
func (*tbi) BuildIndex(context.Context, *indexpb.BuildIndexRequest) (*indexpb.BuildIndexResponse, error) {

View File

@ -72,7 +72,7 @@ type IndexNode interface {
CreateIndex(ctx context.Context, req *indexpb.CreateIndexRequest) (*commonpb.Status, error)
}
type IndexService interface {
type IndexCoord interface {
Component
TimeTickProvider
@ -83,7 +83,7 @@ type IndexService interface {
GetIndexFilePaths(ctx context.Context, req *indexpb.GetIndexFilePathsRequest) (*indexpb.GetIndexFilePathsResponse, error)
}
type MasterService interface {
type RootCoord interface {
Component
TimeTickProvider
@ -114,13 +114,13 @@ type MasterService interface {
ReleaseDQLMessageStream(ctx context.Context, in *proxypb.ReleaseDQLMessageStreamRequest) (*commonpb.Status, error)
}
// MasterComponent is used by grpc server of master service
type MasterComponent interface {
MasterService
// RootCoordComponent is used by grpc server of master service
type RootCoordComponent interface {
RootCoord
UpdateStateCode(internalpb.StateCode)
SetDataCoord(context.Context, DataService) error
SetIndexCoord(IndexService) error
SetIndexCoord(IndexCoord) error
SetQueryCoord(QueryService) error
SetNewProxyClient(func(sess *sessionutil.Session) (ProxyNode, error))
}

View File

@ -20,7 +20,7 @@ const (
ProxyNodeRole = "ProxyNode"
QueryServiceRole = "QueryService"
QueryNodeRole = "QueryNode"
IndexServiceRole = "IndexService"
IndexCoordRole = "IndexCoord"
IndexNodeRole = "IndexNode"
DataCoordRole = "DataCoord"
DataNodeRole = "DataNode"

View File

@ -38,4 +38,4 @@ go test -race -cover "${MILVUS_DIR}/querynode/..." -failfast
go test -race -cover -v "${MILVUS_DIR}/distributed/rootcoord" -failfast
go test -race -cover -v "${MILVUS_DIR}/rootcoord" -failfast
go test -race -cover -v "${MILVUS_DIR}/dataservice/..." -failfast
go test -race -cover -v "${MILVUS_DIR}/indexservice/..." -failfast
go test -race -cover -v "${MILVUS_DIR}/indexcoord/..." -failfast