// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2021-04-19 13:50:12 +08:00
|
|
|
|
2021-06-21 17:28:03 +08:00
|
|
|
package indexcoord
|
2021-01-15 14:38:36 +08:00
|
|
|
|
|
|
|
import (
|
2021-01-19 18:32:57 +08:00
|
|
|
"context"
|
2021-03-08 10:09:48 +08:00
|
|
|
"errors"
|
2022-08-25 15:48:54 +08:00
|
|
|
"fmt"
|
2021-03-08 15:25:55 +08:00
|
|
|
"math/rand"
|
2022-03-17 17:17:22 +08:00
|
|
|
"os"
|
2022-09-09 15:52:35 +08:00
|
|
|
"strconv"
|
2021-01-19 18:32:57 +08:00
|
|
|
"sync"
|
2021-06-04 16:29:35 +08:00
|
|
|
"sync/atomic"
|
2021-11-22 16:23:17 +08:00
|
|
|
"syscall"
|
2021-01-19 18:32:57 +08:00
|
|
|
"time"
|
|
|
|
|
2022-05-31 16:36:03 +08:00
|
|
|
"go.etcd.io/etcd/api/v3/mvccpb"
|
2022-08-25 15:48:54 +08:00
|
|
|
v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
|
2022-04-07 22:05:32 +08:00
|
|
|
clientv3 "go.etcd.io/etcd/client/v3"
|
2022-05-31 16:36:03 +08:00
|
|
|
"go.uber.org/zap"
|
2022-04-07 22:05:32 +08:00
|
|
|
|
2022-09-16 16:56:49 +08:00
|
|
|
"github.com/milvus-io/milvus/api/commonpb"
|
|
|
|
"github.com/milvus-io/milvus/api/milvuspb"
|
2021-12-16 15:30:04 +08:00
|
|
|
"github.com/milvus-io/milvus/internal/common"
|
2022-09-15 15:44:31 +08:00
|
|
|
"github.com/milvus-io/milvus/internal/kv"
|
2021-04-22 14:45:57 +08:00
|
|
|
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
|
|
|
|
"github.com/milvus-io/milvus/internal/log"
|
2022-08-25 15:48:54 +08:00
|
|
|
"github.com/milvus-io/milvus/internal/metastore/model"
|
2022-05-31 16:36:03 +08:00
|
|
|
"github.com/milvus-io/milvus/internal/metrics"
|
|
|
|
"github.com/milvus-io/milvus/internal/proto/datapb"
|
2021-04-22 14:45:57 +08:00
|
|
|
"github.com/milvus-io/milvus/internal/proto/indexpb"
|
|
|
|
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
2022-05-31 16:36:03 +08:00
|
|
|
"github.com/milvus-io/milvus/internal/storage"
|
2021-08-19 14:32:11 +08:00
|
|
|
"github.com/milvus-io/milvus/internal/types"
|
2022-08-25 15:48:54 +08:00
|
|
|
"github.com/milvus-io/milvus/internal/util"
|
2022-05-31 16:36:03 +08:00
|
|
|
"github.com/milvus-io/milvus/internal/util/dependency"
|
2021-08-19 14:32:11 +08:00
|
|
|
"github.com/milvus-io/milvus/internal/util/metricsinfo"
|
2021-12-23 18:39:11 +08:00
|
|
|
"github.com/milvus-io/milvus/internal/util/paramtable"
|
2021-04-22 14:45:57 +08:00
|
|
|
"github.com/milvus-io/milvus/internal/util/retry"
|
2021-05-21 19:28:52 +08:00
|
|
|
"github.com/milvus-io/milvus/internal/util/sessionutil"
|
2021-04-22 14:45:57 +08:00
|
|
|
"github.com/milvus-io/milvus/internal/util/typeutil"
|
2021-01-26 09:38:40 +08:00
|
|
|
)
|
|
|
|
|
2021-10-04 17:26:38 +08:00
|
|
|
// make sure IndexCoord implements types.IndexCoord
|
|
|
|
var _ types.IndexCoord = (*IndexCoord)(nil)
|
|
|
|
|
2022-02-08 20:57:47 +08:00
|
|
|
var Params paramtable.ComponentParam
|
2021-12-23 18:39:11 +08:00
|
|
|
|
2021-09-28 21:58:33 +08:00
|
|
|
// IndexCoord is a component responsible for scheduling index construction tasks and maintaining index status.
|
|
|
|
// IndexCoord accepts requests from rootcoord to build indexes, delete indexes, and query index information.
|
|
|
|
// IndexCoord is responsible for assigning IndexBuildID to the request to build the index, and forwarding the
|
|
|
|
// request to build the index to IndexNode. IndexCoord records the status of the index, and the index file.
|
2021-06-21 17:28:03 +08:00
|
|
|
type IndexCoord struct {
|
2021-07-14 14:15:55 +08:00
|
|
|
stateCode atomic.Value
|
2021-01-26 19:24:09 +08:00
|
|
|
|
2021-01-19 18:32:57 +08:00
|
|
|
loopCtx context.Context
|
|
|
|
loopCancel func()
|
|
|
|
loopWg sync.WaitGroup
|
|
|
|
|
2022-08-25 15:48:54 +08:00
|
|
|
sched *TaskScheduler
|
|
|
|
session *sessionutil.Session
|
|
|
|
serverID UniqueID
|
2021-01-19 18:32:57 +08:00
|
|
|
|
2021-05-27 22:24:29 +08:00
|
|
|
eventChan <-chan *sessionutil.SessionEvent
|
|
|
|
|
2022-04-07 22:05:32 +08:00
|
|
|
factory dependency.Factory
|
2022-03-17 18:03:23 +08:00
|
|
|
etcdCli *clientv3.Client
|
2022-09-09 15:52:35 +08:00
|
|
|
etcdKV kv.MetaKv
|
2022-03-17 18:03:23 +08:00
|
|
|
chunkManager storage.ChunkManager
|
2021-01-19 18:32:57 +08:00
|
|
|
|
2022-08-25 15:48:54 +08:00
|
|
|
metaTable *metaTable
|
|
|
|
nodeManager *NodeManager
|
|
|
|
indexBuilder *indexBuilder
|
|
|
|
garbageCollector *garbageCollector
|
|
|
|
flushedSegmentWatcher *flushedSegmentWatcher
|
2022-09-21 16:34:51 +08:00
|
|
|
handoff *handoff
|
2021-05-27 22:24:29 +08:00
|
|
|
|
2021-09-03 17:15:26 +08:00
|
|
|
metricsCacheManager *metricsinfo.MetricsCacheManager
|
|
|
|
|
2021-01-26 09:38:40 +08:00
|
|
|
nodeLock sync.RWMutex
|
|
|
|
|
2021-09-22 19:31:54 +08:00
|
|
|
initOnce sync.Once
|
|
|
|
startOnce sync.Once
|
2021-09-17 20:17:53 +08:00
|
|
|
|
2021-09-14 10:41:21 +08:00
|
|
|
reqTimeoutInterval time.Duration
|
|
|
|
|
2022-05-31 16:36:03 +08:00
|
|
|
dataCoordClient types.DataCoord
|
2022-08-25 15:48:54 +08:00
|
|
|
rootCoordClient types.RootCoord
|
2022-05-31 16:36:03 +08:00
|
|
|
|
2022-09-29 18:35:02 +08:00
|
|
|
enableActiveStandBy bool
|
|
|
|
activateFunc func()
|
|
|
|
|
2021-01-19 18:32:57 +08:00
|
|
|
// Add callback functions at different stages
|
|
|
|
startCallbacks []func()
|
|
|
|
closeCallbacks []func()
|
2021-01-15 14:38:36 +08:00
|
|
|
}
|
|
|
|
|
2021-09-30 11:06:04 +08:00
|
|
|
// UniqueID is an alias of int64, is used as a unique identifier for the request.
|
2021-01-19 18:32:57 +08:00
|
|
|
type UniqueID = typeutil.UniqueID
|
|
|
|
|
2021-09-28 21:58:33 +08:00
|
|
|
// NewIndexCoord creates a new IndexCoord component.
|
2022-04-07 22:05:32 +08:00
|
|
|
func NewIndexCoord(ctx context.Context, factory dependency.Factory) (*IndexCoord, error) {
|
2021-03-08 15:25:55 +08:00
|
|
|
rand.Seed(time.Now().UnixNano())
|
2021-01-26 09:38:40 +08:00
|
|
|
ctx1, cancel := context.WithCancel(ctx)
|
2021-06-21 17:28:03 +08:00
|
|
|
i := &IndexCoord{
|
2022-09-29 18:35:02 +08:00
|
|
|
loopCtx: ctx1,
|
|
|
|
loopCancel: cancel,
|
|
|
|
reqTimeoutInterval: time.Second * 10,
|
|
|
|
factory: factory,
|
|
|
|
enableActiveStandBy: Params.IndexCoordCfg.EnableActiveStandby,
|
2021-01-26 09:38:40 +08:00
|
|
|
}
|
2022-10-10 15:55:22 +08:00
|
|
|
i.UpdateStateCode(commonpb.StateCode_Abnormal)
|
2021-01-29 17:08:31 +08:00
|
|
|
return i, nil
|
|
|
|
}
|
|
|
|
|
2021-09-28 21:58:33 +08:00
|
|
|
// Register register IndexCoord role at etcd.
|
2021-06-21 17:28:03 +08:00
|
|
|
func (i *IndexCoord) Register() error {
|
2021-12-15 11:47:10 +08:00
|
|
|
i.session.Register()
|
2022-09-29 18:35:02 +08:00
|
|
|
if i.enableActiveStandBy {
|
|
|
|
i.session.ProcessActiveStandBy(i.activateFunc)
|
|
|
|
}
|
2021-12-15 11:47:10 +08:00
|
|
|
go i.session.LivenessCheck(i.loopCtx, func() {
|
|
|
|
log.Error("Index Coord disconnected from etcd, process will exit", zap.Int64("Server Id", i.session.ServerID))
|
|
|
|
if err := i.Stop(); err != nil {
|
|
|
|
log.Fatal("failed to stop server", zap.Error(err))
|
|
|
|
}
|
|
|
|
// manually send signal to starter goroutine
|
2021-12-29 14:35:21 +08:00
|
|
|
if i.session.TriggerKill {
|
2022-03-17 17:17:22 +08:00
|
|
|
if p, err := os.FindProcess(os.Getpid()); err == nil {
|
|
|
|
p.Signal(syscall.SIGINT)
|
|
|
|
}
|
2021-12-29 14:35:21 +08:00
|
|
|
}
|
2021-12-15 11:47:10 +08:00
|
|
|
})
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (i *IndexCoord) initSession() error {
|
2022-02-07 10:09:45 +08:00
|
|
|
i.session = sessionutil.NewSession(i.loopCtx, Params.EtcdCfg.MetaRootPath, i.etcdCli)
|
2021-09-09 10:06:29 +08:00
|
|
|
if i.session == nil {
|
|
|
|
return errors.New("failed to initialize session")
|
|
|
|
}
|
2021-12-29 14:35:21 +08:00
|
|
|
i.session.Init(typeutil.IndexCoordRole, Params.IndexCoordCfg.Address, true, true)
|
2022-09-29 18:35:02 +08:00
|
|
|
i.session.SetEnableActiveStandBy(i.enableActiveStandBy)
|
2022-02-07 10:09:45 +08:00
|
|
|
Params.SetLogger(i.session.ServerID)
|
2022-08-25 15:48:54 +08:00
|
|
|
i.serverID = i.session.ServerID
|
2021-05-25 15:06:05 +08:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-09-28 21:58:33 +08:00
|
|
|
// Init initializes the IndexCoord component.
|
2021-06-21 17:28:03 +08:00
|
|
|
func (i *IndexCoord) Init() error {
|
2021-12-14 15:31:07 +08:00
|
|
|
var initErr error
|
2021-09-23 10:53:53 +08:00
|
|
|
Params.InitOnce()
|
2021-09-22 19:31:54 +08:00
|
|
|
i.initOnce.Do(func() {
|
2022-10-10 15:55:22 +08:00
|
|
|
i.UpdateStateCode(commonpb.StateCode_Initializing)
|
|
|
|
log.Debug("IndexCoord init", zap.Any("stateCode", i.stateCode.Load().(commonpb.StateCode)))
|
2021-12-15 11:47:10 +08:00
|
|
|
|
2022-04-07 22:05:32 +08:00
|
|
|
i.factory.Init(&Params)
|
|
|
|
|
2021-12-15 11:47:10 +08:00
|
|
|
err := i.initSession()
|
|
|
|
if err != nil {
|
|
|
|
log.Error(err.Error())
|
|
|
|
initErr = err
|
|
|
|
return
|
|
|
|
}
|
2021-09-22 19:31:54 +08:00
|
|
|
|
|
|
|
connectEtcdFn := func() error {
|
2022-08-25 15:48:54 +08:00
|
|
|
i.etcdKV = etcdkv.NewEtcdKV(i.etcdCli, Params.EtcdCfg.MetaRootPath)
|
|
|
|
i.metaTable, err = NewMetaTable(i.etcdKV)
|
2021-01-26 09:38:40 +08:00
|
|
|
return err
|
|
|
|
}
|
2021-09-22 19:31:54 +08:00
|
|
|
log.Debug("IndexCoord try to connect etcd")
|
2022-05-05 09:31:51 +08:00
|
|
|
err = retry.Do(i.loopCtx, connectEtcdFn, retry.Attempts(100))
|
2021-01-26 09:38:40 +08:00
|
|
|
if err != nil {
|
2021-09-26 21:23:57 +08:00
|
|
|
log.Error("IndexCoord try to connect etcd failed", zap.Error(err))
|
2021-09-22 19:31:54 +08:00
|
|
|
initErr = err
|
|
|
|
return
|
2021-01-26 09:38:40 +08:00
|
|
|
}
|
2022-07-07 14:44:21 +08:00
|
|
|
|
2021-09-22 19:31:54 +08:00
|
|
|
log.Debug("IndexCoord try to connect etcd success")
|
2021-12-23 21:35:52 +08:00
|
|
|
i.nodeManager = NewNodeManager(i.loopCtx)
|
2021-07-14 14:15:55 +08:00
|
|
|
|
2021-09-22 19:31:54 +08:00
|
|
|
sessions, revision, err := i.session.GetSessions(typeutil.IndexNodeRole)
|
2021-09-26 21:23:57 +08:00
|
|
|
log.Debug("IndexCoord", zap.Int("session number", len(sessions)), zap.Int64("revision", revision))
|
2021-09-22 19:31:54 +08:00
|
|
|
if err != nil {
|
2021-09-26 21:23:57 +08:00
|
|
|
log.Error("IndexCoord Get IndexNode Sessions error", zap.Error(err))
|
2021-09-22 19:31:54 +08:00
|
|
|
initErr = err
|
|
|
|
return
|
|
|
|
}
|
2022-07-07 14:44:21 +08:00
|
|
|
aliveNodeID := make([]UniqueID, 0)
|
2022-09-27 22:38:55 +08:00
|
|
|
if Params.IndexCoordCfg.BindIndexNodeMode {
|
|
|
|
if err = i.nodeManager.AddNode(Params.IndexCoordCfg.IndexNodeID, Params.IndexCoordCfg.IndexNodeAddress); err != nil {
|
|
|
|
log.Error("IndexCoord add node fail", zap.Int64("ServerID", Params.IndexCoordCfg.IndexNodeID),
|
|
|
|
zap.String("address", Params.IndexCoordCfg.IndexNodeAddress), zap.Error(err))
|
|
|
|
initErr = err
|
|
|
|
return
|
|
|
|
}
|
|
|
|
log.Debug("IndexCoord add node success", zap.String("IndexNode address", Params.IndexCoordCfg.IndexNodeAddress),
|
|
|
|
zap.Int64("nodeID", Params.IndexCoordCfg.IndexNodeID))
|
|
|
|
aliveNodeID = append(aliveNodeID, Params.IndexCoordCfg.IndexNodeID)
|
|
|
|
metrics.IndexCoordIndexNodeNum.WithLabelValues().Inc()
|
|
|
|
} else {
|
|
|
|
for _, session := range sessions {
|
|
|
|
session := session
|
|
|
|
if err := i.nodeManager.AddNode(session.ServerID, session.Address); err != nil {
|
|
|
|
log.Error("IndexCoord", zap.Int64("ServerID", session.ServerID),
|
|
|
|
zap.Error(err))
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
aliveNodeID = append(aliveNodeID, session.ServerID)
|
2022-08-25 15:48:54 +08:00
|
|
|
}
|
2021-09-22 19:31:54 +08:00
|
|
|
}
|
2022-09-06 17:19:11 +08:00
|
|
|
log.Debug("IndexCoord", zap.Int("IndexNode number", len(i.nodeManager.GetAllClients())))
|
2022-07-07 14:44:21 +08:00
|
|
|
i.indexBuilder = newIndexBuilder(i.loopCtx, i, i.metaTable, aliveNodeID)
|
|
|
|
|
2022-02-15 15:07:48 +08:00
|
|
|
// TODO silverxia add Rewatch logic
|
2021-12-08 10:11:04 +08:00
|
|
|
i.eventChan = i.session.WatchServices(typeutil.IndexNodeRole, revision+1, nil)
|
2021-05-14 10:05:18 +08:00
|
|
|
|
2022-09-23 14:40:51 +08:00
|
|
|
chunkManager, err := i.factory.NewPersistentStorageChunkManager(i.loopCtx)
|
2021-09-22 19:31:54 +08:00
|
|
|
if err != nil {
|
2022-03-17 18:03:23 +08:00
|
|
|
log.Error("IndexCoord new minio chunkManager failed", zap.Error(err))
|
2021-09-22 19:31:54 +08:00
|
|
|
initErr = err
|
|
|
|
return
|
|
|
|
}
|
2022-03-17 18:03:23 +08:00
|
|
|
log.Debug("IndexCoord new minio chunkManager success")
|
|
|
|
i.chunkManager = chunkManager
|
2021-01-26 09:38:40 +08:00
|
|
|
|
2022-08-25 15:48:54 +08:00
|
|
|
i.garbageCollector = newGarbageCollector(i.loopCtx, i.metaTable, i.chunkManager, i)
|
2022-09-21 16:34:51 +08:00
|
|
|
i.handoff = newHandoff(i.loopCtx, i.metaTable, i.etcdKV, i)
|
|
|
|
i.flushedSegmentWatcher, err = newFlushSegmentWatcher(i.loopCtx, i.etcdKV, i.metaTable, i.indexBuilder, i.handoff, i)
|
2022-08-25 15:48:54 +08:00
|
|
|
if err != nil {
|
|
|
|
initErr = err
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
i.sched, err = NewTaskScheduler(i.loopCtx, i.rootCoordClient, i.chunkManager, i.metaTable)
|
2021-09-22 19:31:54 +08:00
|
|
|
if err != nil {
|
2021-09-26 21:23:57 +08:00
|
|
|
log.Error("IndexCoord new task scheduler failed", zap.Error(err))
|
2021-09-22 19:31:54 +08:00
|
|
|
initErr = err
|
|
|
|
return
|
|
|
|
}
|
|
|
|
log.Debug("IndexCoord new task scheduler success")
|
|
|
|
|
|
|
|
i.metricsCacheManager = metricsinfo.NewMetricsCacheManager()
|
|
|
|
})
|
2021-09-03 17:15:26 +08:00
|
|
|
|
2021-09-22 19:31:54 +08:00
|
|
|
log.Debug("IndexCoord init finished", zap.Error(initErr))
|
2021-09-03 17:15:26 +08:00
|
|
|
|
2021-09-22 19:31:54 +08:00
|
|
|
return initErr
|
2021-01-15 14:38:36 +08:00
|
|
|
}
|
|
|
|
|
2021-09-28 21:58:33 +08:00
|
|
|
// Start starts the IndexCoord component.
|
2021-06-21 17:28:03 +08:00
|
|
|
func (i *IndexCoord) Start() error {
|
2021-12-14 15:31:07 +08:00
|
|
|
var startErr error
|
2021-09-22 19:31:54 +08:00
|
|
|
i.startOnce.Do(func() {
|
2021-09-17 20:17:53 +08:00
|
|
|
i.loopWg.Add(1)
|
|
|
|
go i.watchNodeLoop()
|
2021-05-27 22:24:29 +08:00
|
|
|
|
2021-09-17 20:17:53 +08:00
|
|
|
i.loopWg.Add(1)
|
2022-08-25 15:48:54 +08:00
|
|
|
go i.watchFlushedSegmentLoop()
|
2021-05-27 22:24:29 +08:00
|
|
|
|
2021-09-17 20:17:53 +08:00
|
|
|
startErr = i.sched.Start()
|
2021-04-16 15:37:13 +08:00
|
|
|
|
2022-07-07 14:44:21 +08:00
|
|
|
i.indexBuilder.Start()
|
|
|
|
i.garbageCollector.Start()
|
2022-09-21 16:34:51 +08:00
|
|
|
i.handoff.Start()
|
2022-08-25 15:48:54 +08:00
|
|
|
i.flushedSegmentWatcher.Start()
|
2022-07-07 14:44:21 +08:00
|
|
|
|
2022-10-10 15:55:22 +08:00
|
|
|
i.UpdateStateCode(commonpb.StateCode_Healthy)
|
2021-09-17 20:17:53 +08:00
|
|
|
})
|
2021-01-26 09:38:40 +08:00
|
|
|
// Start callbacks
|
|
|
|
for _, cb := range i.startCallbacks {
|
|
|
|
cb()
|
|
|
|
}
|
2021-09-17 20:17:53 +08:00
|
|
|
|
2021-12-23 18:39:11 +08:00
|
|
|
Params.IndexCoordCfg.CreatedTime = time.Now()
|
|
|
|
Params.IndexCoordCfg.UpdatedTime = time.Now()
|
2021-09-26 17:54:06 +08:00
|
|
|
|
2022-09-29 18:35:02 +08:00
|
|
|
if i.enableActiveStandBy {
|
|
|
|
i.activateFunc = func() {
|
|
|
|
log.Info("IndexCoord switch from standby to active, reload the KV")
|
|
|
|
i.metaTable.reloadFromKV()
|
2022-10-10 15:55:22 +08:00
|
|
|
i.UpdateStateCode(commonpb.StateCode_Healthy)
|
2022-09-29 18:35:02 +08:00
|
|
|
}
|
2022-10-10 15:55:22 +08:00
|
|
|
i.UpdateStateCode(commonpb.StateCode_StandBy)
|
2022-09-29 18:35:02 +08:00
|
|
|
log.Info("IndexCoord start successfully", zap.Any("state", i.stateCode.Load()))
|
|
|
|
} else {
|
2022-10-10 15:55:22 +08:00
|
|
|
i.UpdateStateCode(commonpb.StateCode_Healthy)
|
2022-09-29 18:35:02 +08:00
|
|
|
log.Info("IndexCoord start successfully", zap.Any("state", i.stateCode.Load()))
|
|
|
|
}
|
2021-01-26 09:38:40 +08:00
|
|
|
|
2021-09-17 20:17:53 +08:00
|
|
|
return startErr
|
2021-01-15 14:38:36 +08:00
|
|
|
}
|
|
|
|
|
2021-09-28 21:58:33 +08:00
|
|
|
// Stop stops the IndexCoord component.
|
2021-06-21 17:28:03 +08:00
|
|
|
func (i *IndexCoord) Stop() error {
|
2021-12-06 10:37:40 +08:00
|
|
|
// https://github.com/milvus-io/milvus/issues/12282
|
2022-10-10 15:55:22 +08:00
|
|
|
i.UpdateStateCode(commonpb.StateCode_Abnormal)
|
2021-12-06 10:37:40 +08:00
|
|
|
|
2022-01-04 10:19:18 +08:00
|
|
|
if i.loopCancel != nil {
|
|
|
|
i.loopCancel()
|
|
|
|
log.Info("cancel the loop of IndexCoord")
|
|
|
|
}
|
|
|
|
|
|
|
|
if i.sched != nil {
|
|
|
|
i.sched.Close()
|
|
|
|
log.Info("close the task scheduler of IndexCoord")
|
|
|
|
}
|
2021-09-06 17:54:41 +08:00
|
|
|
i.loopWg.Wait()
|
2022-01-04 10:19:18 +08:00
|
|
|
|
2022-07-07 14:44:21 +08:00
|
|
|
if i.indexBuilder != nil {
|
|
|
|
i.indexBuilder.Stop()
|
|
|
|
log.Info("stop the index builder of IndexCoord")
|
|
|
|
}
|
|
|
|
if i.garbageCollector != nil {
|
|
|
|
i.garbageCollector.Stop()
|
|
|
|
log.Info("stop the garbage collector of IndexCoord")
|
|
|
|
}
|
2022-08-25 15:48:54 +08:00
|
|
|
if i.flushedSegmentWatcher != nil {
|
|
|
|
i.flushedSegmentWatcher.Stop()
|
|
|
|
log.Info("stop the flushed segment watcher")
|
|
|
|
}
|
2022-07-07 14:44:21 +08:00
|
|
|
|
2021-01-26 09:38:40 +08:00
|
|
|
for _, cb := range i.closeCallbacks {
|
|
|
|
cb()
|
|
|
|
}
|
2021-11-16 22:31:14 +08:00
|
|
|
i.session.Revoke(time.Second)
|
2021-11-26 11:39:16 +08:00
|
|
|
|
2021-01-26 09:38:40 +08:00
|
|
|
return nil
|
2021-01-15 14:38:36 +08:00
|
|
|
}
|
|
|
|
|
2021-12-29 14:35:21 +08:00
|
|
|
func (i *IndexCoord) SetEtcdClient(etcdClient *clientv3.Client) {
|
|
|
|
i.etcdCli = etcdClient
|
|
|
|
}
|
|
|
|
|
2022-05-31 16:36:03 +08:00
|
|
|
// SetDataCoord sets data coordinator's client
|
|
|
|
func (i *IndexCoord) SetDataCoord(dataCoord types.DataCoord) error {
|
|
|
|
if dataCoord == nil {
|
|
|
|
return errors.New("null DataCoord interface")
|
|
|
|
}
|
|
|
|
|
|
|
|
i.dataCoordClient = dataCoord
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2022-08-25 15:48:54 +08:00
|
|
|
// SetRootCoord sets data coordinator's client
|
|
|
|
func (i *IndexCoord) SetRootCoord(rootCoord types.RootCoord) error {
|
|
|
|
if rootCoord == nil {
|
|
|
|
return errors.New("null RootCoord interface")
|
|
|
|
}
|
|
|
|
|
|
|
|
i.rootCoordClient = rootCoord
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-09-28 21:58:33 +08:00
|
|
|
// UpdateStateCode updates the component state of IndexCoord.
|
2022-10-10 15:55:22 +08:00
|
|
|
func (i *IndexCoord) UpdateStateCode(code commonpb.StateCode) {
|
2021-06-04 16:29:35 +08:00
|
|
|
i.stateCode.Store(code)
|
2021-01-29 17:08:31 +08:00
|
|
|
}
|
|
|
|
|
2021-08-19 10:28:10 +08:00
|
|
|
func (i *IndexCoord) isHealthy() bool {
|
2022-10-10 15:55:22 +08:00
|
|
|
code := i.stateCode.Load().(commonpb.StateCode)
|
|
|
|
return code == commonpb.StateCode_Healthy
|
2021-08-19 10:28:10 +08:00
|
|
|
}
|
|
|
|
|
2021-09-28 21:58:33 +08:00
|
|
|
// GetComponentStates gets the component states of IndexCoord.
|
2022-10-10 15:55:22 +08:00
|
|
|
func (i *IndexCoord) GetComponentStates(ctx context.Context) (*milvuspb.ComponentStates, error) {
|
2021-06-21 17:28:03 +08:00
|
|
|
log.Debug("get IndexCoord component states ...")
|
2021-11-19 13:57:12 +08:00
|
|
|
|
|
|
|
nodeID := common.NotRegisteredID
|
|
|
|
if i.session != nil && i.session.Registered() {
|
|
|
|
nodeID = i.session.ServerID
|
|
|
|
}
|
|
|
|
|
2022-10-10 15:55:22 +08:00
|
|
|
stateInfo := &milvuspb.ComponentInfo{
|
2021-11-19 13:57:12 +08:00
|
|
|
NodeID: nodeID,
|
2021-06-21 17:28:03 +08:00
|
|
|
Role: "IndexCoord",
|
2022-10-10 15:55:22 +08:00
|
|
|
StateCode: i.stateCode.Load().(commonpb.StateCode),
|
2021-01-26 19:24:09 +08:00
|
|
|
}
|
|
|
|
|
2022-10-10 15:55:22 +08:00
|
|
|
ret := &milvuspb.ComponentStates{
|
2021-01-26 19:24:09 +08:00
|
|
|
State: stateInfo,
|
|
|
|
SubcomponentStates: nil, // todo add subcomponents states
|
|
|
|
Status: &commonpb.Status{
|
2021-03-10 22:06:22 +08:00
|
|
|
ErrorCode: commonpb.ErrorCode_Success,
|
2021-01-26 19:24:09 +08:00
|
|
|
},
|
|
|
|
}
|
2021-07-14 14:15:55 +08:00
|
|
|
log.Debug("IndexCoord GetComponentStates", zap.Any("IndexCoord component state", stateInfo))
|
2021-01-26 19:24:09 +08:00
|
|
|
return ret, nil
|
2021-01-15 14:38:36 +08:00
|
|
|
}
|
|
|
|
|
2021-09-28 21:58:33 +08:00
|
|
|
// GetStatisticsChannel gets the statistics channel of IndexCoord.
|
2021-06-21 17:28:03 +08:00
|
|
|
func (i *IndexCoord) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
|
|
|
|
log.Debug("get IndexCoord statistics channel ...")
|
2021-02-26 17:44:24 +08:00
|
|
|
return &milvuspb.StringResponse{
|
|
|
|
Status: &commonpb.Status{
|
2021-03-10 22:06:22 +08:00
|
|
|
ErrorCode: commonpb.ErrorCode_Success,
|
2021-02-26 17:44:24 +08:00
|
|
|
Reason: "",
|
|
|
|
},
|
|
|
|
Value: "",
|
|
|
|
}, nil
|
2021-01-15 14:38:36 +08:00
|
|
|
}
|
|
|
|
|
2022-08-25 15:48:54 +08:00
|
|
|
// CreateIndex create an index on collection.
|
|
|
|
// Index building is asynchronous, so when an index building request comes, an IndexID is assigned to the task and
|
|
|
|
// will get all flushed segments from DataCoord and record tasks with these segments. The background process
|
|
|
|
// indexBuilder will find this task and assign it to IndexNode for execution.
|
|
|
|
func (i *IndexCoord) CreateIndex(ctx context.Context, req *indexpb.CreateIndexRequest) (*commonpb.Status, error) {
|
2021-12-02 23:11:35 +08:00
|
|
|
if !i.isHealthy() {
|
2022-08-25 15:48:54 +08:00
|
|
|
log.Warn(msgIndexCoordIsUnhealthy(i.serverID))
|
|
|
|
return &commonpb.Status{
|
2022-05-31 16:36:03 +08:00
|
|
|
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
2022-08-25 15:48:54 +08:00
|
|
|
Reason: msgIndexCoordIsUnhealthy(i.serverID),
|
|
|
|
}, nil
|
2022-05-31 16:36:03 +08:00
|
|
|
}
|
2022-08-25 15:48:54 +08:00
|
|
|
log.Debug("IndexCoord receive create index request", zap.Int64("CollectionID", req.CollectionID),
|
|
|
|
zap.String("IndexName", req.IndexName), zap.Int64("fieldID", req.FieldID),
|
|
|
|
zap.Any("TypeParams", req.TypeParams),
|
|
|
|
zap.Any("IndexParams", req.IndexParams))
|
2022-05-31 16:36:03 +08:00
|
|
|
|
2022-08-25 15:48:54 +08:00
|
|
|
ret := &commonpb.Status{
|
|
|
|
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
|
|
|
}
|
2022-05-31 16:36:03 +08:00
|
|
|
|
2022-08-25 15:48:54 +08:00
|
|
|
if !i.metaTable.CanCreateIndex(req) {
|
|
|
|
ret.Reason = "CreateIndex failed: index already exist, but parameters are inconsistent"
|
|
|
|
return ret, nil
|
2021-05-27 22:24:29 +08:00
|
|
|
}
|
2022-05-31 16:36:03 +08:00
|
|
|
|
2022-08-25 15:48:54 +08:00
|
|
|
t := &CreateIndexTask{
|
2021-02-23 09:58:06 +08:00
|
|
|
BaseTask: BaseTask{
|
|
|
|
ctx: ctx,
|
|
|
|
done: make(chan error),
|
|
|
|
table: i.metaTable,
|
|
|
|
},
|
2022-08-25 15:48:54 +08:00
|
|
|
dataCoordClient: i.dataCoordClient,
|
|
|
|
rootCoordClient: i.rootCoordClient,
|
|
|
|
indexCoordClient: i,
|
|
|
|
req: req,
|
2021-02-23 09:58:06 +08:00
|
|
|
}
|
2021-01-26 09:38:40 +08:00
|
|
|
|
2022-08-25 15:48:54 +08:00
|
|
|
err := i.sched.IndexAddQueue.Enqueue(t)
|
2021-01-26 09:38:40 +08:00
|
|
|
if err != nil {
|
2022-08-25 15:48:54 +08:00
|
|
|
ret.ErrorCode = commonpb.ErrorCode_UnexpectedError
|
|
|
|
ret.Reason = err.Error()
|
2021-01-26 09:38:40 +08:00
|
|
|
return ret, nil
|
|
|
|
}
|
2022-08-25 15:48:54 +08:00
|
|
|
log.Debug("IndexCoord create index enqueue successfully", zap.Int64("IndexID", t.indexID))
|
2021-01-20 18:26:20 +08:00
|
|
|
|
2021-01-26 09:38:40 +08:00
|
|
|
err = t.WaitToFinish()
|
|
|
|
if err != nil {
|
2022-08-25 15:48:54 +08:00
|
|
|
log.Error("IndexCoord scheduler creating index task fail", zap.Int64("collectionID", req.CollectionID),
|
|
|
|
zap.Int64("fieldID", req.FieldID), zap.String("indexName", req.IndexName), zap.Error(err))
|
|
|
|
ret.ErrorCode = commonpb.ErrorCode_UnexpectedError
|
|
|
|
ret.Reason = err.Error()
|
2021-01-26 09:38:40 +08:00
|
|
|
return ret, nil
|
|
|
|
}
|
2022-08-25 15:48:54 +08:00
|
|
|
|
|
|
|
ret.ErrorCode = commonpb.ErrorCode_Success
|
2021-01-26 09:38:40 +08:00
|
|
|
return ret, nil
|
2021-01-15 14:38:36 +08:00
|
|
|
}
|
|
|
|
|
2022-08-25 15:48:54 +08:00
|
|
|
// GetIndexState gets the index state of the index name in the request from Proxy.
|
|
|
|
func (i *IndexCoord) GetIndexState(ctx context.Context, req *indexpb.GetIndexStateRequest) (*indexpb.GetIndexStateResponse, error) {
|
|
|
|
log.Info("IndexCoord get index state", zap.Int64("collectionID", req.CollectionID),
|
|
|
|
zap.String("indexName", req.IndexName))
|
|
|
|
|
2021-12-09 11:43:09 +08:00
|
|
|
if !i.isHealthy() {
|
2022-08-25 15:48:54 +08:00
|
|
|
log.Warn(msgIndexCoordIsUnhealthy(i.serverID))
|
|
|
|
return &indexpb.GetIndexStateResponse{
|
|
|
|
Status: &commonpb.Status{
|
|
|
|
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
|
|
|
Reason: msgIndexCoordIsUnhealthy(i.serverID),
|
|
|
|
},
|
|
|
|
}, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
indexID2CreateTs := i.metaTable.GetIndexIDByName(req.CollectionID, req.IndexName)
|
|
|
|
if len(indexID2CreateTs) == 0 {
|
|
|
|
errMsg := fmt.Sprintf("there is no index on collection: %d with the index name: %s", req.CollectionID, req.IndexName)
|
|
|
|
log.Error("IndexCoord get index state fail", zap.Int64("collectionID", req.CollectionID),
|
|
|
|
zap.String("indexName", req.IndexName), zap.String("fail reason", errMsg))
|
|
|
|
return &indexpb.GetIndexStateResponse{
|
2021-12-09 11:43:09 +08:00
|
|
|
Status: &commonpb.Status{
|
|
|
|
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
|
|
|
Reason: errMsg,
|
|
|
|
},
|
|
|
|
}, nil
|
|
|
|
}
|
2022-08-25 15:48:54 +08:00
|
|
|
ret := &indexpb.GetIndexStateResponse{
|
|
|
|
Status: &commonpb.Status{
|
|
|
|
ErrorCode: commonpb.ErrorCode_Success,
|
|
|
|
},
|
|
|
|
State: commonpb.IndexState_Finished,
|
|
|
|
}
|
|
|
|
|
|
|
|
for indexID, createTs := range indexID2CreateTs {
|
2022-09-30 10:56:54 +08:00
|
|
|
indexStates, _ := i.metaTable.GetIndexStates(indexID, createTs)
|
2022-08-25 15:48:54 +08:00
|
|
|
for _, state := range indexStates {
|
|
|
|
if state.state != commonpb.IndexState_Finished {
|
|
|
|
ret.State = state.state
|
|
|
|
ret.FailReason = state.failReason
|
|
|
|
log.Info("IndexCoord get index state success", zap.Int64("collectionID", req.CollectionID),
|
|
|
|
zap.String("indexName", req.IndexName), zap.String("state", ret.State.String()))
|
|
|
|
return ret, nil
|
|
|
|
}
|
2021-01-20 15:02:23 +08:00
|
|
|
}
|
|
|
|
}
|
2021-06-30 14:32:19 +08:00
|
|
|
|
2022-08-25 15:48:54 +08:00
|
|
|
log.Info("IndexCoord get index state success", zap.Int64("collectionID", req.CollectionID),
|
|
|
|
zap.String("indexName", req.IndexName), zap.String("state", ret.State.String()))
|
|
|
|
return ret, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (i *IndexCoord) GetSegmentIndexState(ctx context.Context, req *indexpb.GetSegmentIndexStateRequest) (*indexpb.GetSegmentIndexStateResponse, error) {
|
|
|
|
log.Info("IndexCoord get index state", zap.Int64("collectionID", req.CollectionID),
|
|
|
|
zap.String("indexName", req.IndexName))
|
|
|
|
|
|
|
|
if !i.isHealthy() {
|
|
|
|
log.Warn(msgIndexCoordIsUnhealthy(i.serverID))
|
|
|
|
return &indexpb.GetSegmentIndexStateResponse{
|
|
|
|
Status: &commonpb.Status{
|
|
|
|
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
|
|
|
Reason: msgIndexCoordIsUnhealthy(i.serverID),
|
|
|
|
},
|
|
|
|
}, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
ret := &indexpb.GetSegmentIndexStateResponse{
|
2021-01-19 18:32:57 +08:00
|
|
|
Status: &commonpb.Status{
|
2021-03-10 22:06:22 +08:00
|
|
|
ErrorCode: commonpb.ErrorCode_Success,
|
2021-01-19 18:32:57 +08:00
|
|
|
},
|
2022-08-25 15:48:54 +08:00
|
|
|
States: make([]*indexpb.SegmentIndexState, 0),
|
|
|
|
}
|
|
|
|
indexID2CreateTs := i.metaTable.GetIndexIDByName(req.CollectionID, req.IndexName)
|
2022-09-15 15:44:31 +08:00
|
|
|
if len(indexID2CreateTs) == 0 {
|
2022-08-25 15:48:54 +08:00
|
|
|
errMsg := fmt.Sprintf("there is no index on collection: %d with the index name: %s", req.CollectionID, req.IndexName)
|
|
|
|
log.Error("IndexCoord get index state fail", zap.Int64("collectionID", req.CollectionID),
|
|
|
|
zap.String("indexName", req.IndexName), zap.String("fail reason", errMsg))
|
|
|
|
return &indexpb.GetSegmentIndexStateResponse{
|
|
|
|
Status: &commonpb.Status{
|
|
|
|
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
|
|
|
Reason: errMsg,
|
|
|
|
},
|
|
|
|
}, nil
|
|
|
|
}
|
2022-09-15 15:44:31 +08:00
|
|
|
for _, segID := range req.SegmentIDs {
|
|
|
|
state := i.metaTable.GetSegmentIndexState(segID)
|
|
|
|
ret.States = append(ret.States, &indexpb.SegmentIndexState{
|
|
|
|
SegmentID: segID,
|
|
|
|
State: state.state,
|
|
|
|
FailReason: state.failReason,
|
|
|
|
})
|
2021-01-19 18:32:57 +08:00
|
|
|
}
|
|
|
|
return ret, nil
|
2021-01-15 14:38:36 +08:00
|
|
|
}
|
|
|
|
|
2022-09-30 10:56:54 +08:00
|
|
|
// completeIndexInfo get the building index progress and index state
|
2022-09-27 19:12:54 +08:00
|
|
|
func (i *IndexCoord) completeIndexInfo(ctx context.Context, indexInfo *indexpb.IndexInfo) error {
|
|
|
|
collectionID := indexInfo.CollectionID
|
|
|
|
indexName := indexInfo.IndexName
|
|
|
|
log.Info("IndexCoord completeIndexInfo", zap.Int64("collID", collectionID),
|
|
|
|
zap.String("indexName", indexName))
|
|
|
|
|
2022-09-30 10:56:54 +08:00
|
|
|
calculateTotalRow := func() (int64, error) {
|
|
|
|
totalRows := int64(0)
|
|
|
|
flushSegments, err := i.dataCoordClient.GetFlushedSegments(ctx, &datapb.GetFlushedSegmentsRequest{
|
|
|
|
CollectionID: collectionID,
|
|
|
|
PartitionID: -1,
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
return totalRows, err
|
|
|
|
}
|
|
|
|
|
|
|
|
resp, err := i.dataCoordClient.GetSegmentInfo(ctx, &datapb.GetSegmentInfoRequest{
|
|
|
|
SegmentIDs: flushSegments.Segments,
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
return totalRows, err
|
|
|
|
}
|
2022-09-27 19:12:54 +08:00
|
|
|
|
2022-09-30 10:56:54 +08:00
|
|
|
for _, seg := range resp.Infos {
|
|
|
|
if seg.State == commonpb.SegmentState_Flushed {
|
|
|
|
totalRows += seg.NumOfRows
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return totalRows, nil
|
2022-09-27 19:12:54 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
indexID2CreateTs := i.metaTable.GetIndexIDByName(collectionID, indexName)
|
|
|
|
if len(indexID2CreateTs) < 1 {
|
|
|
|
log.Error("there is no index on collection", zap.Int64("collectionID", collectionID), zap.String("indexName", indexName))
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2022-09-30 10:56:54 +08:00
|
|
|
var indexID int64
|
|
|
|
var createTs uint64
|
|
|
|
// the size of `indexID2CreateTs` map is one
|
|
|
|
// and we need to get key and value through the `for` statement
|
|
|
|
for k, v := range indexID2CreateTs {
|
|
|
|
indexID = k
|
|
|
|
createTs = v
|
2022-09-27 19:12:54 +08:00
|
|
|
break
|
|
|
|
}
|
|
|
|
|
2022-09-30 10:56:54 +08:00
|
|
|
indexStates, indexStateCnt := i.metaTable.GetIndexStates(indexID, createTs)
|
|
|
|
allCnt := len(indexStates)
|
|
|
|
switch {
|
|
|
|
case indexStateCnt.Failed > 0:
|
|
|
|
indexInfo.State = commonpb.IndexState_Failed
|
|
|
|
indexInfo.IndexStateFailReason = indexStateCnt.FailReason
|
|
|
|
case indexStateCnt.Finished == allCnt:
|
|
|
|
indexInfo.State = commonpb.IndexState_Finished
|
|
|
|
default:
|
|
|
|
indexInfo.State = commonpb.IndexState_InProgress
|
|
|
|
indexInfo.IndexedRows = i.metaTable.GetIndexBuildProgress(indexID, createTs)
|
|
|
|
totalRow, err := calculateTotalRow()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
2022-09-27 19:12:54 +08:00
|
|
|
}
|
2022-09-30 10:56:54 +08:00
|
|
|
indexInfo.TotalRows = totalRow
|
2022-09-27 19:12:54 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
log.Debug("IndexCoord completeIndexInfo success", zap.Int64("collID", collectionID),
|
2022-09-30 10:56:54 +08:00
|
|
|
zap.Int64("totalRows", indexInfo.TotalRows), zap.Int64("indexRows", indexInfo.IndexedRows),
|
|
|
|
zap.Any("state", indexInfo.State), zap.String("failReason", indexInfo.IndexStateFailReason))
|
2022-09-27 19:12:54 +08:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2022-08-25 15:48:54 +08:00
|
|
|
// GetIndexBuildProgress get the index building progress by num rows.
|
|
|
|
func (i *IndexCoord) GetIndexBuildProgress(ctx context.Context, req *indexpb.GetIndexBuildProgressRequest) (*indexpb.GetIndexBuildProgressResponse, error) {
|
|
|
|
log.Info("IndexCoord receive GetIndexBuildProgress request", zap.Int64("collID", req.CollectionID),
|
|
|
|
zap.String("indexName", req.IndexName))
|
2021-12-07 21:07:22 +08:00
|
|
|
if !i.isHealthy() {
|
2022-08-25 15:48:54 +08:00
|
|
|
log.Warn(msgIndexCoordIsUnhealthy(i.serverID))
|
|
|
|
return &indexpb.GetIndexBuildProgressResponse{
|
|
|
|
Status: &commonpb.Status{
|
|
|
|
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
|
|
|
Reason: msgIndexCoordIsUnhealthy(i.serverID),
|
|
|
|
},
|
2021-12-07 21:07:22 +08:00
|
|
|
}, nil
|
|
|
|
}
|
2021-02-23 11:57:18 +08:00
|
|
|
|
2022-08-25 15:48:54 +08:00
|
|
|
flushSegments, err := i.dataCoordClient.GetFlushedSegments(ctx, &datapb.GetFlushedSegmentsRequest{
|
|
|
|
CollectionID: req.CollectionID,
|
|
|
|
PartitionID: -1,
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
return &indexpb.GetIndexBuildProgressResponse{
|
|
|
|
Status: &commonpb.Status{
|
|
|
|
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
|
|
|
},
|
|
|
|
}, err
|
2021-04-16 15:37:13 +08:00
|
|
|
}
|
2022-08-25 15:48:54 +08:00
|
|
|
|
|
|
|
resp, err := i.dataCoordClient.GetSegmentInfo(ctx, &datapb.GetSegmentInfoRequest{
|
|
|
|
SegmentIDs: flushSegments.Segments,
|
|
|
|
})
|
2021-02-23 11:57:18 +08:00
|
|
|
if err != nil {
|
2022-08-25 15:48:54 +08:00
|
|
|
return &indexpb.GetIndexBuildProgressResponse{
|
|
|
|
Status: &commonpb.Status{
|
|
|
|
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
|
|
|
},
|
|
|
|
}, err
|
2021-02-23 11:57:18 +08:00
|
|
|
}
|
2022-08-25 15:48:54 +08:00
|
|
|
totalRows, indexRows := int64(0), int64(0)
|
|
|
|
|
|
|
|
for _, seg := range resp.Infos {
|
|
|
|
totalRows += seg.NumOfRows
|
2022-07-07 14:44:21 +08:00
|
|
|
}
|
2021-02-23 11:57:18 +08:00
|
|
|
|
2022-08-25 15:48:54 +08:00
|
|
|
indexID2CreateTs := i.metaTable.GetIndexIDByName(req.CollectionID, req.IndexName)
|
|
|
|
if len(indexID2CreateTs) < 1 {
|
|
|
|
errMsg := fmt.Sprintf("there is no index on collection: %d with the index name: %s", req.CollectionID, req.IndexName)
|
|
|
|
log.Error("IndexCoord get index state fail", zap.Int64("collectionID", req.CollectionID),
|
|
|
|
zap.String("indexName", req.IndexName), zap.String("fail reason", errMsg))
|
|
|
|
return &indexpb.GetIndexBuildProgressResponse{
|
|
|
|
Status: &commonpb.Status{
|
|
|
|
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
|
|
|
Reason: errMsg,
|
|
|
|
},
|
|
|
|
}, nil
|
|
|
|
}
|
2021-02-23 11:57:18 +08:00
|
|
|
|
2022-08-25 15:48:54 +08:00
|
|
|
for indexID, createTs := range indexID2CreateTs {
|
|
|
|
indexRows = i.metaTable.GetIndexBuildProgress(indexID, createTs)
|
|
|
|
break
|
|
|
|
}
|
2021-02-23 11:57:18 +08:00
|
|
|
|
2022-08-25 15:48:54 +08:00
|
|
|
log.Debug("IndexCoord get index build progress success", zap.Int64("collID", req.CollectionID),
|
|
|
|
zap.Int64("totalRows", totalRows), zap.Int64("indexRows", indexRows), zap.Int("seg num", len(resp.Infos)))
|
|
|
|
return &indexpb.GetIndexBuildProgressResponse{
|
|
|
|
Status: &commonpb.Status{
|
|
|
|
ErrorCode: commonpb.ErrorCode_Success,
|
|
|
|
},
|
|
|
|
IndexedRows: indexRows,
|
|
|
|
TotalRows: totalRows,
|
|
|
|
}, nil
|
|
|
|
}
|
2022-06-17 18:08:12 +08:00
|
|
|
|
2022-08-25 15:48:54 +08:00
|
|
|
// DropIndex deletes indexes based on IndexName. One IndexName corresponds to the index of an entire column. A column is
|
|
|
|
// divided into many segments, and each segment corresponds to an IndexBuildID. IndexCoord uses IndexBuildID to record
|
|
|
|
// index tasks.
|
|
|
|
func (i *IndexCoord) DropIndex(ctx context.Context, req *indexpb.DropIndexRequest) (*commonpb.Status, error) {
|
|
|
|
log.Info("IndexCoord DropIndex", zap.Int64("collectionID", req.CollectionID),
|
2022-09-23 09:36:51 +08:00
|
|
|
zap.Int64s("partitionIDs", req.PartitionIDs), zap.String("indexName", req.IndexName))
|
2022-06-17 18:08:12 +08:00
|
|
|
if !i.isHealthy() {
|
2022-08-25 15:48:54 +08:00
|
|
|
log.Warn(msgIndexCoordIsUnhealthy(i.serverID))
|
2022-06-17 18:08:12 +08:00
|
|
|
return &commonpb.Status{
|
|
|
|
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
2022-08-25 15:48:54 +08:00
|
|
|
Reason: msgIndexCoordIsUnhealthy(i.serverID),
|
2022-06-17 18:08:12 +08:00
|
|
|
}, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
ret := &commonpb.Status{
|
|
|
|
ErrorCode: commonpb.ErrorCode_Success,
|
|
|
|
}
|
2022-06-23 19:22:15 +08:00
|
|
|
|
2022-08-25 15:48:54 +08:00
|
|
|
indexID2CreateTs := i.metaTable.GetIndexIDByName(req.CollectionID, req.IndexName)
|
|
|
|
if len(indexID2CreateTs) == 0 {
|
|
|
|
log.Warn(fmt.Sprintf("there is no index on collection: %d with the index name: %s", req.CollectionID, req.IndexName))
|
|
|
|
return ret, nil
|
|
|
|
}
|
|
|
|
indexIDs := make([]UniqueID, 0)
|
|
|
|
for indexID := range indexID2CreateTs {
|
|
|
|
indexIDs = append(indexIDs, indexID)
|
|
|
|
}
|
2022-09-23 09:36:51 +08:00
|
|
|
if len(req.GetPartitionIDs()) == 0 {
|
|
|
|
// drop collection index
|
|
|
|
err := i.metaTable.MarkIndexAsDeleted(req.CollectionID, indexIDs)
|
|
|
|
if err != nil {
|
|
|
|
log.Error("IndexCoord drop index fail", zap.Int64("collectionID", req.CollectionID),
|
|
|
|
zap.String("indexName", req.IndexName), zap.Error(err))
|
|
|
|
ret.ErrorCode = commonpb.ErrorCode_UnexpectedError
|
|
|
|
ret.Reason = err.Error()
|
|
|
|
return ret, nil
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
err := i.metaTable.MarkSegmentsIndexAsDeleted(func(segIndex *model.SegmentIndex) bool {
|
|
|
|
for _, partitionID := range req.PartitionIDs {
|
|
|
|
if segIndex.CollectionID == req.CollectionID && segIndex.PartitionID == partitionID {
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return false
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
log.Error("IndexCoord drop index fail", zap.Int64("collectionID", req.CollectionID),
|
|
|
|
zap.Int64s("partitionIDs", req.PartitionIDs), zap.String("indexName", req.IndexName), zap.Error(err))
|
|
|
|
ret.ErrorCode = commonpb.ErrorCode_UnexpectedError
|
|
|
|
ret.Reason = err.Error()
|
|
|
|
return ret, nil
|
|
|
|
}
|
2022-06-17 18:08:12 +08:00
|
|
|
}
|
|
|
|
|
2022-08-25 15:48:54 +08:00
|
|
|
log.Info("IndexCoord DropIndex success", zap.Int64("collID", req.CollectionID),
|
2022-09-23 09:36:51 +08:00
|
|
|
zap.Int64s("partitionIDs", req.PartitionIDs), zap.String("indexName", req.IndexName),
|
|
|
|
zap.Int64s("indexIDs", indexIDs))
|
2022-06-17 18:08:12 +08:00
|
|
|
return ret, nil
|
|
|
|
}
|
|
|
|
|
2022-08-25 15:48:54 +08:00
|
|
|
// TODO @xiaocai2333: drop index on the segments when drop partition. (need?)
|
|
|
|
|
|
|
|
// GetIndexInfos gets the index file paths from IndexCoord.
|
|
|
|
func (i *IndexCoord) GetIndexInfos(ctx context.Context, req *indexpb.GetIndexInfoRequest) (*indexpb.GetIndexInfoResponse, error) {
|
2021-12-17 23:10:42 +08:00
|
|
|
if !i.isHealthy() {
|
2022-08-25 15:48:54 +08:00
|
|
|
log.Warn(msgIndexCoordIsUnhealthy(i.serverID))
|
|
|
|
return &indexpb.GetIndexInfoResponse{
|
2021-12-17 23:10:42 +08:00
|
|
|
Status: &commonpb.Status{
|
|
|
|
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
2022-08-25 15:48:54 +08:00
|
|
|
Reason: msgIndexCoordIsUnhealthy(i.serverID),
|
2021-12-17 23:10:42 +08:00
|
|
|
},
|
2022-08-25 15:48:54 +08:00
|
|
|
SegmentInfo: nil,
|
2021-12-17 23:10:42 +08:00
|
|
|
}, nil
|
|
|
|
}
|
2022-08-25 15:48:54 +08:00
|
|
|
ret := &indexpb.GetIndexInfoResponse{
|
|
|
|
Status: &commonpb.Status{
|
|
|
|
ErrorCode: commonpb.ErrorCode_Success,
|
|
|
|
},
|
|
|
|
SegmentInfo: map[int64]*indexpb.SegmentInfo{},
|
|
|
|
}
|
2021-01-15 14:38:36 +08:00
|
|
|
|
2022-08-25 15:48:54 +08:00
|
|
|
for _, segID := range req.SegmentIDs {
|
|
|
|
segIdxes := i.metaTable.GetSegmentIndexes(segID)
|
|
|
|
ret.SegmentInfo[segID] = &indexpb.SegmentInfo{
|
|
|
|
CollectionID: req.CollectionID,
|
|
|
|
SegmentID: segID,
|
|
|
|
EnableIndex: false,
|
|
|
|
IndexInfos: make([]*indexpb.IndexFilePathInfo, 0),
|
2021-02-07 17:02:13 +08:00
|
|
|
}
|
2022-08-25 15:48:54 +08:00
|
|
|
if len(segIdxes) != 0 {
|
|
|
|
ret.SegmentInfo[segID].EnableIndex = true
|
|
|
|
for _, segIdx := range segIdxes {
|
|
|
|
ret.SegmentInfo[segID].IndexInfos = append(ret.SegmentInfo[segID].IndexInfos,
|
|
|
|
&indexpb.IndexFilePathInfo{
|
|
|
|
SegmentID: segID,
|
|
|
|
FieldID: i.metaTable.GetFieldIDByIndexID(segIdx.CollectionID, segIdx.IndexID),
|
|
|
|
IndexID: segIdx.IndexID,
|
|
|
|
BuildID: segIdx.BuildID,
|
|
|
|
IndexName: i.metaTable.GetIndexNameByID(segIdx.CollectionID, segIdx.IndexID),
|
|
|
|
IndexParams: i.metaTable.GetIndexParams(segIdx.CollectionID, segIdx.IndexID),
|
|
|
|
IndexFilePaths: segIdx.IndexFilePaths,
|
|
|
|
SerializedSize: segIdx.IndexSize,
|
2022-09-21 20:16:51 +08:00
|
|
|
IndexVersion: segIdx.IndexVersion,
|
2022-08-25 15:48:54 +08:00
|
|
|
})
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return ret, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// DescribeIndex describe the index info of the collection.
|
|
|
|
func (i *IndexCoord) DescribeIndex(ctx context.Context, req *indexpb.DescribeIndexRequest) (*indexpb.DescribeIndexResponse, error) {
|
|
|
|
if !i.isHealthy() {
|
|
|
|
log.Warn(msgIndexCoordIsUnhealthy(i.serverID))
|
|
|
|
return &indexpb.DescribeIndexResponse{
|
|
|
|
Status: &commonpb.Status{
|
|
|
|
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
|
|
|
Reason: msgIndexCoordIsUnhealthy(i.serverID),
|
|
|
|
},
|
|
|
|
IndexInfos: nil,
|
|
|
|
}, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
indexes := i.metaTable.GetIndexesForCollection(req.GetCollectionID(), req.GetIndexName())
|
|
|
|
if len(indexes) == 0 {
|
|
|
|
return &indexpb.DescribeIndexResponse{
|
|
|
|
Status: &commonpb.Status{
|
|
|
|
ErrorCode: commonpb.ErrorCode_IndexNotExist,
|
|
|
|
Reason: "index not exist",
|
|
|
|
},
|
|
|
|
}, nil
|
|
|
|
}
|
|
|
|
indexInfos := make([]*indexpb.IndexInfo, 0)
|
|
|
|
for _, index := range indexes {
|
2022-09-27 19:12:54 +08:00
|
|
|
indexInfo := &indexpb.IndexInfo{
|
2022-10-08 15:38:58 +08:00
|
|
|
CollectionID: index.CollectionID,
|
|
|
|
FieldID: index.FieldID,
|
|
|
|
IndexName: index.IndexName,
|
|
|
|
TypeParams: index.TypeParams,
|
|
|
|
IndexParams: index.IndexParams,
|
|
|
|
IsAutoIndex: index.IsAutoIndex,
|
|
|
|
UserIndexParams: index.UserIndexParams,
|
2022-09-27 19:12:54 +08:00
|
|
|
}
|
|
|
|
if err := i.completeIndexInfo(ctx, indexInfo); err != nil {
|
|
|
|
log.Error("IndexCoord describe index fail", zap.Int64("collectionID", req.CollectionID),
|
|
|
|
zap.String("indexName", req.IndexName), zap.Error(err))
|
|
|
|
return &indexpb.DescribeIndexResponse{
|
|
|
|
Status: &commonpb.Status{
|
|
|
|
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
|
|
|
Reason: err.Error(),
|
|
|
|
},
|
|
|
|
}, nil
|
|
|
|
}
|
|
|
|
indexInfos = append(indexInfos, indexInfo)
|
2021-01-26 09:38:40 +08:00
|
|
|
}
|
2021-01-19 18:32:57 +08:00
|
|
|
|
2022-08-25 15:48:54 +08:00
|
|
|
return &indexpb.DescribeIndexResponse{
|
2021-01-26 09:38:40 +08:00
|
|
|
Status: &commonpb.Status{
|
2021-03-10 22:06:22 +08:00
|
|
|
ErrorCode: commonpb.ErrorCode_Success,
|
2021-01-26 09:38:40 +08:00
|
|
|
},
|
2022-08-25 15:48:54 +08:00
|
|
|
IndexInfos: indexInfos,
|
|
|
|
}, nil
|
2021-01-26 09:38:40 +08:00
|
|
|
}
|
2021-01-19 18:32:57 +08:00
|
|
|
|
2022-08-25 15:48:54 +08:00
|
|
|
// ShowConfigurations returns the configurations of indexCoord matching req.Pattern
|
2022-08-12 13:20:39 +08:00
|
|
|
func (i *IndexCoord) ShowConfigurations(ctx context.Context, req *internalpb.ShowConfigurationsRequest) (*internalpb.ShowConfigurationsResponse, error) {
|
|
|
|
log.Debug("IndexCoord.ShowConfigurations", zap.String("pattern", req.Pattern))
|
|
|
|
if !i.isHealthy() {
|
|
|
|
log.Warn("IndexCoord.ShowConfigurations failed",
|
2022-08-25 15:48:54 +08:00
|
|
|
zap.Int64("nodeId", i.serverID),
|
2022-08-12 13:20:39 +08:00
|
|
|
zap.String("req", req.Pattern),
|
|
|
|
zap.Error(errIndexCoordIsUnhealthy(Params.QueryNodeCfg.GetNodeID())))
|
|
|
|
|
|
|
|
return &internalpb.ShowConfigurationsResponse{
|
|
|
|
Status: &commonpb.Status{
|
|
|
|
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
|
|
|
Reason: msgIndexCoordIsUnhealthy(Params.QueryNodeCfg.GetNodeID()),
|
|
|
|
},
|
|
|
|
Configuations: nil,
|
|
|
|
}, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
return getComponentConfigurations(ctx, req), nil
|
|
|
|
}
|
|
|
|
|
2021-09-28 21:58:33 +08:00
|
|
|
// GetMetrics gets the metrics info of IndexCoord.
|
2021-08-19 10:28:10 +08:00
|
|
|
func (i *IndexCoord) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
|
2022-08-25 15:48:54 +08:00
|
|
|
log.Debug("IndexCoord.GetMetrics", zap.Int64("node id", i.serverID), zap.String("req", req.Request))
|
2021-08-19 10:28:10 +08:00
|
|
|
|
|
|
|
if !i.isHealthy() {
|
2022-08-25 15:48:54 +08:00
|
|
|
log.Warn(msgIndexCoordIsUnhealthy(i.serverID))
|
2021-08-19 10:28:10 +08:00
|
|
|
|
|
|
|
return &milvuspb.GetMetricsResponse{
|
|
|
|
Status: &commonpb.Status{
|
|
|
|
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
2022-08-25 15:48:54 +08:00
|
|
|
Reason: msgIndexCoordIsUnhealthy(i.serverID),
|
2021-08-19 10:28:10 +08:00
|
|
|
},
|
|
|
|
Response: "",
|
|
|
|
}, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
metricType, err := metricsinfo.ParseMetricType(req.Request)
|
|
|
|
if err != nil {
|
2021-09-26 21:23:57 +08:00
|
|
|
log.Error("IndexCoord.GetMetrics failed to parse metric type",
|
2021-12-17 22:17:03 +08:00
|
|
|
zap.Int64("node id", i.session.ServerID),
|
2021-08-19 10:28:10 +08:00
|
|
|
zap.String("req", req.Request),
|
|
|
|
zap.Error(err))
|
|
|
|
|
|
|
|
return &milvuspb.GetMetricsResponse{
|
|
|
|
Status: &commonpb.Status{
|
|
|
|
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
|
|
|
Reason: err.Error(),
|
|
|
|
},
|
|
|
|
Response: "",
|
|
|
|
}, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
log.Debug("IndexCoord.GetMetrics",
|
2021-12-17 22:17:03 +08:00
|
|
|
zap.String("metric type", metricType))
|
2021-08-19 10:28:10 +08:00
|
|
|
|
|
|
|
if metricType == metricsinfo.SystemInfoMetrics {
|
2021-09-03 17:15:26 +08:00
|
|
|
ret, err := i.metricsCacheManager.GetSystemInfoMetrics()
|
|
|
|
if err == nil && ret != nil {
|
|
|
|
return ret, nil
|
|
|
|
}
|
2022-08-25 15:48:54 +08:00
|
|
|
log.Error("failed to get system info metrics from cache, recompute instead",
|
2021-09-03 17:15:26 +08:00
|
|
|
zap.Error(err))
|
|
|
|
|
2021-08-19 10:28:10 +08:00
|
|
|
metrics, err := getSystemInfoMetrics(ctx, req, i)
|
|
|
|
|
|
|
|
log.Debug("IndexCoord.GetMetrics",
|
2021-12-17 22:17:03 +08:00
|
|
|
zap.Int64("node id", i.session.ServerID),
|
2021-08-19 10:28:10 +08:00
|
|
|
zap.String("req", req.Request),
|
2021-12-17 22:17:03 +08:00
|
|
|
zap.String("metric type", metricType),
|
|
|
|
zap.String("metrics", metrics.Response), // TODO(dragondriver): necessary? may be very large
|
2021-08-19 10:28:10 +08:00
|
|
|
zap.Error(err))
|
|
|
|
|
2021-09-03 17:15:26 +08:00
|
|
|
i.metricsCacheManager.UpdateSystemInfoMetrics(metrics)
|
|
|
|
|
2021-12-01 22:17:46 +08:00
|
|
|
return metrics, nil
|
2021-08-19 10:28:10 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
log.Debug("IndexCoord.GetMetrics failed, request metric type is not implemented yet",
|
2021-12-17 22:17:03 +08:00
|
|
|
zap.Int64("node id", i.session.ServerID),
|
2021-08-19 10:28:10 +08:00
|
|
|
zap.String("req", req.Request),
|
2021-12-17 22:17:03 +08:00
|
|
|
zap.String("metric type", metricType))
|
2021-08-19 10:28:10 +08:00
|
|
|
|
|
|
|
return &milvuspb.GetMetricsResponse{
|
|
|
|
Status: &commonpb.Status{
|
|
|
|
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
|
|
|
Reason: metricsinfo.MsgUnimplementedMetric,
|
|
|
|
},
|
|
|
|
Response: "",
|
|
|
|
}, nil
|
|
|
|
}
|
|
|
|
|
2021-09-15 22:09:50 +08:00
|
|
|
// watchNodeLoop is used to monitor IndexNode going online and offline.
|
2021-10-14 19:20:35 +08:00
|
|
|
// fix datarace in unittest
|
|
|
|
// startWatchService will only be invoked at start procedure
|
|
|
|
// otherwise, remove the annotation and add atomic protection
|
2022-08-25 15:48:54 +08:00
|
|
|
//
|
|
|
|
//go:norace
|
2021-06-21 17:28:03 +08:00
|
|
|
func (i *IndexCoord) watchNodeLoop() {
|
2021-05-27 22:24:29 +08:00
|
|
|
ctx, cancel := context.WithCancel(i.loopCtx)
|
|
|
|
|
|
|
|
defer cancel()
|
|
|
|
defer i.loopWg.Done()
|
2021-06-21 17:28:03 +08:00
|
|
|
log.Debug("IndexCoord watchNodeLoop start")
|
2021-05-27 22:24:29 +08:00
|
|
|
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case <-ctx.Done():
|
|
|
|
return
|
2021-10-14 19:20:35 +08:00
|
|
|
case event, ok := <-i.eventChan:
|
|
|
|
if !ok {
|
2022-02-15 15:07:48 +08:00
|
|
|
// ErrCompacted is handled inside SessionWatcher
|
|
|
|
log.Error("Session Watcher channel closed", zap.Int64("server id", i.session.ServerID))
|
|
|
|
go i.Stop()
|
|
|
|
if i.session.TriggerKill {
|
2022-03-17 17:17:22 +08:00
|
|
|
if p, err := os.FindProcess(os.Getpid()); err == nil {
|
|
|
|
p.Signal(syscall.SIGINT)
|
|
|
|
}
|
2022-02-15 15:07:48 +08:00
|
|
|
}
|
2021-10-14 19:20:35 +08:00
|
|
|
return
|
|
|
|
}
|
2022-09-27 22:38:55 +08:00
|
|
|
if Params.IndexCoordCfg.BindIndexNodeMode {
|
|
|
|
continue
|
|
|
|
}
|
2021-07-14 14:15:55 +08:00
|
|
|
log.Debug("IndexCoord watchNodeLoop event updated")
|
2021-05-27 22:24:29 +08:00
|
|
|
switch event.EventType {
|
|
|
|
case sessionutil.SessionAddEvent:
|
|
|
|
serverID := event.Session.ServerID
|
2021-09-26 21:23:57 +08:00
|
|
|
log.Debug("IndexCoord watchNodeLoop SessionAddEvent", zap.Int64("serverID", serverID),
|
|
|
|
zap.String("address", event.Session.Address))
|
2021-08-13 18:12:08 +08:00
|
|
|
go func() {
|
|
|
|
err := i.nodeManager.AddNode(serverID, event.Session.Address)
|
|
|
|
if err != nil {
|
|
|
|
log.Error("IndexCoord", zap.Any("Add IndexNode err", err))
|
|
|
|
}
|
|
|
|
}()
|
2021-09-03 17:15:26 +08:00
|
|
|
i.metricsCacheManager.InvalidateSystemInfoMetrics()
|
2021-05-27 22:24:29 +08:00
|
|
|
case sessionutil.SessionDelEvent:
|
|
|
|
serverID := event.Session.ServerID
|
2021-09-26 21:23:57 +08:00
|
|
|
log.Debug("IndexCoord watchNodeLoop SessionDelEvent", zap.Int64("serverID", serverID))
|
2021-07-14 14:15:55 +08:00
|
|
|
i.nodeManager.RemoveNode(serverID)
|
2022-07-07 14:44:21 +08:00
|
|
|
// remove tasks on nodeID
|
|
|
|
i.indexBuilder.nodeDown(serverID)
|
2021-09-03 17:15:26 +08:00
|
|
|
i.metricsCacheManager.InvalidateSystemInfoMetrics()
|
2021-05-27 22:24:29 +08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-07-07 14:44:21 +08:00
|
|
|
func (i *IndexCoord) tryAcquireSegmentReferLock(ctx context.Context, buildID UniqueID, nodeID UniqueID, segIDs []UniqueID) error {
|
2022-06-15 21:38:10 +08:00
|
|
|
// IndexCoord use buildID instead of taskID.
|
2022-07-07 14:44:21 +08:00
|
|
|
log.Info("try to acquire segment reference lock", zap.Int64("buildID", buildID),
|
|
|
|
zap.Int64("ndoeID", nodeID), zap.Int64s("segIDs", segIDs))
|
2022-05-31 16:36:03 +08:00
|
|
|
status, err := i.dataCoordClient.AcquireSegmentLock(ctx, &datapb.AcquireSegmentLockRequest{
|
2022-06-15 21:38:10 +08:00
|
|
|
TaskID: buildID,
|
2022-07-07 14:44:21 +08:00
|
|
|
NodeID: nodeID,
|
2022-05-31 16:36:03 +08:00
|
|
|
SegmentIDs: segIDs,
|
|
|
|
})
|
|
|
|
if err != nil {
|
2022-07-07 14:44:21 +08:00
|
|
|
log.Error("IndexCoord try to acquire segment reference lock failed", zap.Int64("buildID", buildID),
|
|
|
|
zap.Int64("nodeID", nodeID), zap.Int64s("segIDs", segIDs), zap.Error(err))
|
2022-05-31 16:36:03 +08:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
if status.ErrorCode != commonpb.ErrorCode_Success {
|
2022-07-07 14:44:21 +08:00
|
|
|
log.Error("IndexCoord try to acquire segment reference lock failed", zap.Int64("buildID", buildID),
|
|
|
|
zap.Int64("nodeID", nodeID), zap.Int64s("segIDs", segIDs), zap.Error(errors.New(status.Reason)))
|
2022-05-31 16:36:03 +08:00
|
|
|
return errors.New(status.Reason)
|
|
|
|
}
|
2022-07-07 14:44:21 +08:00
|
|
|
log.Info("try to acquire segment reference lock success", zap.Int64("buildID", buildID),
|
|
|
|
zap.Int64("ndoeID", nodeID), zap.Int64s("segIDs", segIDs))
|
2022-05-31 16:36:03 +08:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2022-07-07 14:44:21 +08:00
|
|
|
func (i *IndexCoord) tryReleaseSegmentReferLock(ctx context.Context, buildID UniqueID, nodeID UniqueID) error {
|
2022-05-31 16:36:03 +08:00
|
|
|
releaseLock := func() error {
|
|
|
|
status, err := i.dataCoordClient.ReleaseSegmentLock(ctx, &datapb.ReleaseSegmentLockRequest{
|
2022-06-15 21:38:10 +08:00
|
|
|
TaskID: buildID,
|
2022-07-07 14:44:21 +08:00
|
|
|
NodeID: nodeID,
|
2022-05-31 16:36:03 +08:00
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if status.ErrorCode != commonpb.ErrorCode_Success {
|
|
|
|
return errors.New(status.Reason)
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
err := retry.Do(ctx, releaseLock, retry.Attempts(100))
|
|
|
|
if err != nil {
|
2022-07-07 14:44:21 +08:00
|
|
|
log.Error("IndexCoord try to release segment reference lock failed", zap.Int64("buildID", buildID),
|
|
|
|
zap.Int64("nodeID", nodeID), zap.Error(err))
|
2022-05-31 16:36:03 +08:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-12-06 18:29:54 +08:00
|
|
|
// assignTask sends the index task to the IndexNode, it has a timeout interval, if the IndexNode doesn't respond within
|
|
|
|
// the interval, it is considered that the task sending failed.
|
2022-08-25 15:48:54 +08:00
|
|
|
func (i *IndexCoord) assignTask(builderClient types.IndexNode, req *indexpb.CreateJobRequest) error {
|
2021-09-14 10:41:21 +08:00
|
|
|
ctx, cancel := context.WithTimeout(i.loopCtx, i.reqTimeoutInterval)
|
2021-08-19 14:32:11 +08:00
|
|
|
defer cancel()
|
2022-08-25 15:48:54 +08:00
|
|
|
resp, err := builderClient.CreateJob(ctx, req)
|
2021-08-19 14:32:11 +08:00
|
|
|
if err != nil {
|
2021-09-26 21:23:57 +08:00
|
|
|
log.Error("IndexCoord assignmentTasksLoop builderClient.CreateIndex failed", zap.Error(err))
|
2022-07-07 14:44:21 +08:00
|
|
|
return err
|
2021-08-19 14:32:11 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
if resp.ErrorCode != commonpb.ErrorCode_Success {
|
2021-09-26 21:23:57 +08:00
|
|
|
log.Error("IndexCoord assignmentTasksLoop builderClient.CreateIndex failed", zap.String("Reason", resp.Reason))
|
2022-07-07 14:44:21 +08:00
|
|
|
return errors.New(resp.Reason)
|
2021-06-11 16:52:09 +08:00
|
|
|
}
|
2022-07-07 14:44:21 +08:00
|
|
|
return nil
|
2021-05-27 22:24:29 +08:00
|
|
|
}
|
2022-08-25 15:48:54 +08:00
|
|
|
|
|
|
|
func (i *IndexCoord) createIndexForSegment(segIdx *model.SegmentIndex) (bool, UniqueID, error) {
|
|
|
|
log.Info("create index for flushed segment", zap.Int64("collID", segIdx.CollectionID),
|
|
|
|
zap.Int64("segID", segIdx.SegmentID), zap.Int64("numRows", segIdx.NumRows))
|
2022-09-09 15:52:35 +08:00
|
|
|
//if segIdx.NumRows < Params.IndexCoordCfg.MinSegmentNumRowsToEnableIndex {
|
|
|
|
// log.Debug("no need to build index", zap.Int64("collID", segIdx.CollectionID),
|
|
|
|
// zap.Int64("segID", segIdx.SegmentID), zap.Int64("numRows", segIdx.NumRows))
|
|
|
|
// return false, 0, nil
|
|
|
|
//}
|
2022-08-25 15:48:54 +08:00
|
|
|
|
|
|
|
hasIndex, indexBuildID := i.metaTable.HasSameIndex(segIdx.SegmentID, segIdx.IndexID)
|
|
|
|
if hasIndex {
|
|
|
|
log.Debug("IndexCoord has same index", zap.Int64("buildID", indexBuildID), zap.Int64("segmentID", segIdx.SegmentID))
|
|
|
|
return true, indexBuildID, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
t := &IndexAddTask{
|
|
|
|
BaseTask: BaseTask{
|
|
|
|
ctx: i.loopCtx,
|
|
|
|
done: make(chan error),
|
|
|
|
table: i.metaTable,
|
|
|
|
},
|
|
|
|
segmentIndex: segIdx,
|
|
|
|
rootcoordClient: i.rootCoordClient,
|
|
|
|
}
|
|
|
|
|
|
|
|
metrics.IndexCoordIndexRequestCounter.WithLabelValues(metrics.TotalLabel).Inc()
|
|
|
|
|
|
|
|
err := i.sched.IndexAddQueue.Enqueue(t)
|
|
|
|
if err != nil {
|
|
|
|
metrics.IndexCoordIndexRequestCounter.WithLabelValues(metrics.FailLabel).Inc()
|
|
|
|
log.Error("IndexCoord createIndex enqueue failed", zap.Int64("collID", segIdx.CollectionID),
|
|
|
|
zap.Int64("segID", segIdx.SegmentID), zap.Error(err))
|
|
|
|
return false, 0, err
|
|
|
|
}
|
|
|
|
log.Debug("IndexCoord createIndex Enqueue successfully", zap.Int64("collID", segIdx.CollectionID),
|
|
|
|
zap.Int64("segID", segIdx.SegmentID), zap.Int64("IndexBuildID", t.segmentIndex.BuildID))
|
|
|
|
|
|
|
|
err = t.WaitToFinish()
|
|
|
|
if err != nil {
|
|
|
|
log.Error("IndexCoord scheduler index task failed", zap.Int64("buildID", t.segmentIndex.BuildID))
|
|
|
|
metrics.IndexCoordIndexRequestCounter.WithLabelValues(metrics.FailLabel).Inc()
|
|
|
|
return false, 0, err
|
|
|
|
}
|
|
|
|
metrics.IndexCoordIndexRequestCounter.WithLabelValues(metrics.SuccessLabel).Inc()
|
|
|
|
|
|
|
|
return false, t.segmentIndex.BuildID, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (i *IndexCoord) watchFlushedSegmentLoop() {
|
|
|
|
log.Info("IndexCoord start watching flushed segments...")
|
|
|
|
defer i.loopWg.Done()
|
|
|
|
|
|
|
|
watchChan := i.etcdKV.WatchWithRevision(util.FlushedSegmentPrefix, i.flushedSegmentWatcher.etcdRevision+1)
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case <-i.loopCtx.Done():
|
|
|
|
log.Warn("IndexCoord context done, exit...")
|
|
|
|
return
|
|
|
|
case resp, ok := <-watchChan:
|
|
|
|
if !ok {
|
|
|
|
log.Warn("IndexCoord watch flush segments loop failed because watch channel closed")
|
|
|
|
return
|
|
|
|
}
|
|
|
|
if err := resp.Err(); err != nil {
|
|
|
|
log.Warn("IndexCoord watchFlushedSegmentLoo receive etcd compacted error")
|
|
|
|
if err == v3rpc.ErrCompacted {
|
|
|
|
err = i.flushedSegmentWatcher.reloadFromKV()
|
|
|
|
if err != nil {
|
|
|
|
log.Error("Constructing flushed segment watcher fails when etcd has a compaction error",
|
|
|
|
zap.String("etcd error", err.Error()), zap.Error(err))
|
|
|
|
panic("failed to handle etcd request, exit..")
|
|
|
|
}
|
|
|
|
i.loopWg.Add(1)
|
|
|
|
go i.watchFlushedSegmentLoop()
|
|
|
|
return
|
|
|
|
}
|
|
|
|
log.Error("received error event from flushed segment watcher",
|
|
|
|
zap.String("prefix", util.FlushedSegmentPrefix), zap.Error(err))
|
|
|
|
panic("failed to handle etcd request, exit..")
|
|
|
|
}
|
|
|
|
events := resp.Events
|
|
|
|
for _, event := range events {
|
|
|
|
switch event.Type {
|
|
|
|
case mvccpb.PUT:
|
2022-09-09 15:52:35 +08:00
|
|
|
segmentID, err := strconv.ParseInt(string(event.Kv.Value), 10, 64)
|
|
|
|
if err != nil {
|
|
|
|
log.Error("IndexCoord watch flushed segment, but parse segmentID fail",
|
|
|
|
zap.String("event.Value", string(event.Kv.Value)), zap.Error(err))
|
2022-08-25 15:48:54 +08:00
|
|
|
continue
|
|
|
|
}
|
2022-09-09 15:52:35 +08:00
|
|
|
log.Debug("watchFlushedSegmentLoop watch event", zap.Int64("segID", segmentID))
|
|
|
|
i.flushedSegmentWatcher.enqueueInternalTask(segmentID)
|
2022-08-25 15:48:54 +08:00
|
|
|
case mvccpb.DELETE:
|
|
|
|
log.Debug("the segment info has been deleted", zap.String("key", string(event.Kv.Key)))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2022-09-21 16:34:51 +08:00
|
|
|
|
|
|
|
func (i *IndexCoord) pullSegmentInfo(ctx context.Context, segmentID UniqueID) (*datapb.SegmentInfo, error) {
|
|
|
|
resp, err := i.dataCoordClient.GetSegmentInfo(ctx, &datapb.GetSegmentInfoRequest{
|
|
|
|
SegmentIDs: []int64{segmentID},
|
|
|
|
IncludeUnHealthy: false,
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
log.Error("IndexCoord get segment info fail", zap.Int64("segID", segmentID), zap.Error(err))
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
if resp.Status.GetErrorCode() != commonpb.ErrorCode_Success {
|
|
|
|
log.Error("IndexCoord get segment info fail", zap.Int64("segID", segmentID),
|
|
|
|
zap.String("fail reason", resp.Status.GetReason()))
|
|
|
|
if resp.Status.GetReason() == msgSegmentNotFound(segmentID) {
|
|
|
|
return nil, errSegmentNotFound(segmentID)
|
|
|
|
}
|
|
|
|
return nil, errors.New(resp.Status.GetReason())
|
|
|
|
}
|
|
|
|
for _, info := range resp.Infos {
|
|
|
|
if info.ID == segmentID {
|
|
|
|
return info, nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
errMsg := msgSegmentNotFound(segmentID)
|
|
|
|
log.Error(errMsg)
|
|
|
|
return nil, errSegmentNotFound(segmentID)
|
|
|
|
}
|