mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-11-29 18:38:44 +08:00
Add ctx parameter for tsafe pkg & NewDelegator method (#27877)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent
977e508a0f
commit
13877a07ff
@ -618,11 +618,11 @@ func (sd *shardDelegator) Close() {
|
||||
}
|
||||
|
||||
// NewShardDelegator creates a new ShardDelegator instance with all fields initialized.
|
||||
func NewShardDelegator(collectionID UniqueID, replicaID UniqueID, channel string, version int64,
|
||||
func NewShardDelegator(ctx context.Context, collectionID UniqueID, replicaID UniqueID, channel string, version int64,
|
||||
workerManager cluster.Manager, manager *segments.Manager, tsafeManager tsafe.Manager, loader segments.Loader,
|
||||
factory msgstream.Factory, startTs uint64,
|
||||
) (ShardDelegator, error) {
|
||||
log := log.With(zap.Int64("collectionID", collectionID),
|
||||
log := log.Ctx(ctx).With(zap.Int64("collectionID", collectionID),
|
||||
zap.Int64("replicaID", replicaID),
|
||||
zap.String("channel", channel),
|
||||
zap.Int64("version", version),
|
||||
|
@ -130,7 +130,7 @@ func (s *DelegatorDataSuite) SetupTest() {
|
||||
s.mq = &msgstream.MockMsgStream{}
|
||||
|
||||
var err error
|
||||
s.delegator, err = NewShardDelegator(s.collectionID, s.replicaID, s.vchannelName, s.version, s.workerManager, s.manager, s.tsafeManager, s.loader, &msgstream.MockMqFactory{
|
||||
s.delegator, err = NewShardDelegator(context.Background(), s.collectionID, s.replicaID, s.vchannelName, s.version, s.workerManager, s.manager, s.tsafeManager, s.loader, &msgstream.MockMqFactory{
|
||||
NewMsgStreamFunc: func(_ context.Context) (msgstream.MsgStream, error) {
|
||||
return s.mq, nil
|
||||
},
|
||||
|
@ -155,7 +155,7 @@ func (s *DelegatorSuite) SetupTest() {
|
||||
|
||||
var err error
|
||||
// s.delegator, err = NewShardDelegator(s.collectionID, s.replicaID, s.vchannelName, s.version, s.workerManager, s.manager, s.tsafeManager, s.loader)
|
||||
s.delegator, err = NewShardDelegator(s.collectionID, s.replicaID, s.vchannelName, s.version, s.workerManager, s.manager, s.tsafeManager, s.loader, &msgstream.MockMqFactory{
|
||||
s.delegator, err = NewShardDelegator(context.Background(), s.collectionID, s.replicaID, s.vchannelName, s.version, s.workerManager, s.manager, s.tsafeManager, s.loader, &msgstream.MockMqFactory{
|
||||
NewMsgStreamFunc: func(_ context.Context) (msgstream.MsgStream, error) {
|
||||
return s.mq, nil
|
||||
},
|
||||
@ -1046,7 +1046,7 @@ func TestDelegatorWatchTsafe(t *testing.T) {
|
||||
channelName := "default_dml_channel"
|
||||
|
||||
tsafeManager := tsafe.NewTSafeReplica()
|
||||
tsafeManager.Add(channelName, 100)
|
||||
tsafeManager.Add(context.Background(), channelName, 100)
|
||||
sd := &shardDelegator{
|
||||
tsafeManager: tsafeManager,
|
||||
vchannelName: channelName,
|
||||
@ -1073,7 +1073,7 @@ func TestDelegatorTSafeListenerClosed(t *testing.T) {
|
||||
channelName := "default_dml_channel"
|
||||
|
||||
tsafeManager := tsafe.NewTSafeReplica()
|
||||
tsafeManager.Add(channelName, 100)
|
||||
tsafeManager.Add(context.Background(), channelName, 100)
|
||||
sd := &shardDelegator{
|
||||
tsafeManager: tsafeManager,
|
||||
vchannelName: channelName,
|
||||
@ -1098,7 +1098,7 @@ func TestDelegatorTSafeListenerClosed(t *testing.T) {
|
||||
case <-time.After(time.Millisecond * 10):
|
||||
}
|
||||
|
||||
tsafeManager.Remove(channelName)
|
||||
tsafeManager.Remove(context.Background(), channelName)
|
||||
|
||||
select {
|
||||
case <-signal:
|
||||
|
@ -17,6 +17,7 @@
|
||||
package pipeline
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/samber/lo"
|
||||
@ -93,7 +94,7 @@ func (suite *DeleteNodeSuite) TestBasic() {
|
||||
})
|
||||
// init dependency
|
||||
suite.tSafeManager = tsafe.NewTSafeReplica()
|
||||
suite.tSafeManager.Add(suite.channel, 0)
|
||||
suite.tSafeManager.Add(context.Background(), suite.channel, 0)
|
||||
// build delete node and data
|
||||
node := newDeleteNode(suite.collectionID, suite.channel, suite.manager, suite.tSafeManager, suite.delegator, 8)
|
||||
in := suite.buildDeleteNodeMsg()
|
||||
|
@ -17,6 +17,7 @@
|
||||
package pipeline
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/mock"
|
||||
@ -60,7 +61,7 @@ func (suite *PipelineManagerTestSuite) SetupTest() {
|
||||
// init dependency
|
||||
// init tsafeManager
|
||||
suite.tSafeManager = tsafe.NewTSafeReplica()
|
||||
suite.tSafeManager.Add(suite.channel, 0)
|
||||
suite.tSafeManager.Add(context.Background(), suite.channel, 0)
|
||||
suite.delegators = typeutil.NewConcurrentMap[string, delegator.ShardDelegator]()
|
||||
|
||||
// init mock
|
||||
|
@ -17,6 +17,7 @@
|
||||
package pipeline
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/samber/lo"
|
||||
@ -101,7 +102,7 @@ func (suite *PipelineTestSuite) SetupTest() {
|
||||
// init dependency
|
||||
// init tsafeManager
|
||||
suite.tSafeManager = tsafe.NewTSafeReplica()
|
||||
suite.tSafeManager.Add(suite.channel, 0)
|
||||
suite.tSafeManager.Add(context.Background(), suite.channel, 0)
|
||||
}
|
||||
|
||||
func (suite *PipelineTestSuite) TestBasic() {
|
||||
|
@ -95,12 +95,14 @@ func (node *QueryNode) GetStatisticsChannel(ctx context.Context, req *internalpb
|
||||
|
||||
// GetStatistics returns loaded statistics of collection.
|
||||
func (node *QueryNode) GetStatistics(ctx context.Context, req *querypb.GetStatisticsRequest) (*internalpb.GetStatisticsResponse, error) {
|
||||
log.Debug("received GetStatisticsRequest",
|
||||
log := log.Ctx(ctx).With(
|
||||
zap.Int64("msgID", req.GetReq().GetBase().GetMsgID()),
|
||||
zap.Strings("vChannels", req.GetDmlChannels()),
|
||||
zap.Int64s("segmentIDs", req.GetSegmentIDs()),
|
||||
zap.Uint64("guaranteeTimestamp", req.GetReq().GetGuaranteeTimestamp()),
|
||||
zap.Uint64("timeTravel", req.GetReq().GetTravelTimestamp()))
|
||||
zap.Uint64("timeTravel", req.GetReq().GetTravelTimestamp()),
|
||||
)
|
||||
log.Debug("received GetStatisticsRequest")
|
||||
|
||||
if err := node.lifetime.Add(merr.IsHealthy); err != nil {
|
||||
return &internalpb.GetStatisticsResponse{
|
||||
@ -246,7 +248,7 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDm
|
||||
node.composeIndexMeta(req.GetIndexInfoList(), req.Schema), req.GetLoadMeta())
|
||||
collection := node.manager.Collection.Get(req.GetCollectionID())
|
||||
collection.SetMetricType(req.GetLoadMeta().GetMetricType())
|
||||
delegator, err := delegator.NewShardDelegator(req.GetCollectionID(), req.GetReplicaID(), channel.GetChannelName(), req.GetVersion(),
|
||||
delegator, err := delegator.NewShardDelegator(ctx, req.GetCollectionID(), req.GetReplicaID(), channel.GetChannelName(), req.GetVersion(),
|
||||
node.clusterManager, node.manager, node.tSafeManager, node.loader, node.factory, channel.GetSeekPosition().GetTimestamp())
|
||||
if err != nil {
|
||||
log.Warn("failed to create shard delegator", zap.Error(err))
|
||||
@ -260,10 +262,10 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDm
|
||||
}()
|
||||
|
||||
// create tSafe
|
||||
node.tSafeManager.Add(channel.ChannelName, channel.GetSeekPosition().GetTimestamp())
|
||||
node.tSafeManager.Add(ctx, channel.ChannelName, channel.GetSeekPosition().GetTimestamp())
|
||||
defer func() {
|
||||
if err != nil {
|
||||
node.tSafeManager.Remove(channel.ChannelName)
|
||||
node.tSafeManager.Remove(ctx, channel.ChannelName)
|
||||
}
|
||||
}()
|
||||
|
||||
@ -361,7 +363,7 @@ func (node *QueryNode) UnsubDmChannel(ctx context.Context, req *querypb.UnsubDmC
|
||||
|
||||
node.pipelineManager.Remove(req.GetChannelName())
|
||||
node.manager.Segment.RemoveBy(segments.WithChannel(req.GetChannelName()), segments.WithType(segments.SegmentTypeGrowing))
|
||||
node.tSafeManager.Remove(req.GetChannelName())
|
||||
node.tSafeManager.Remove(ctx, req.GetChannelName())
|
||||
|
||||
node.manager.Collection.Unref(req.GetCollectionID(), 1)
|
||||
}
|
||||
@ -1194,7 +1196,7 @@ func (node *QueryNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsR
|
||||
}
|
||||
|
||||
func (node *QueryNode) GetDataDistribution(ctx context.Context, req *querypb.GetDataDistributionRequest) (*querypb.GetDataDistributionResponse, error) {
|
||||
log := log.With(
|
||||
log := log.Ctx(ctx).With(
|
||||
zap.Int64("msgID", req.GetBase().GetMsgID()),
|
||||
zap.Int64("nodeID", paramtable.GetNodeID()),
|
||||
)
|
||||
|
@ -17,6 +17,7 @@
|
||||
package tsafe
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
@ -31,8 +32,8 @@ import (
|
||||
type Manager interface {
|
||||
Get(vChannel string) (Timestamp, error)
|
||||
Set(vChannel string, timestamp Timestamp) error
|
||||
Add(vChannel string, timestamp Timestamp)
|
||||
Remove(vChannel string)
|
||||
Add(ctx context.Context, vChannel string, timestamp Timestamp)
|
||||
Remove(ctx context.Context, vChannel string)
|
||||
Watch() Listener
|
||||
WatchChannel(channel string) Listener
|
||||
|
||||
@ -58,15 +59,15 @@ func (t *tSafeManager) WatchChannel(channel string) Listener {
|
||||
return l
|
||||
}
|
||||
|
||||
func (t *tSafeManager) Add(vChannel string, timestamp uint64) {
|
||||
func (t *tSafeManager) Add(ctx context.Context, vChannel string, timestamp uint64) {
|
||||
ts, _ := tsoutil.ParseTS(timestamp)
|
||||
log.Info("add tSafe done",
|
||||
zap.String("channel", vChannel), zap.Time("timestamp", ts))
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
if _, ok := t.tSafes[vChannel]; !ok {
|
||||
t.tSafes[vChannel] = newTSafe(vChannel, timestamp)
|
||||
}
|
||||
log.Ctx(ctx).Info("add tSafe done",
|
||||
zap.String("channel", vChannel), zap.Time("timestamp", ts))
|
||||
}
|
||||
|
||||
func (t *tSafeManager) Get(vChannel string) (Timestamp, error) {
|
||||
@ -91,9 +92,7 @@ func (t *tSafeManager) Set(vChannel string, timestamp Timestamp) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *tSafeManager) Remove(vChannel string) {
|
||||
log.Info("remove tSafe replica",
|
||||
zap.String("vChannel", vChannel))
|
||||
func (t *tSafeManager) Remove(ctx context.Context, vChannel string) {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
tsafe, ok := t.tSafes[vChannel]
|
||||
@ -105,6 +104,8 @@ func (t *tSafeManager) Remove(vChannel string) {
|
||||
}
|
||||
delete(t.tSafes, vChannel)
|
||||
delete(t.listeners, vChannel)
|
||||
log.Ctx(ctx).Info("remove tSafe replica",
|
||||
zap.String("vChannel", vChannel))
|
||||
}
|
||||
|
||||
func (t *tSafeManager) Min() (string, Timestamp) {
|
||||
|
@ -17,6 +17,7 @@
|
||||
package tsafe
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@ -43,7 +44,7 @@ func (suite *TSafeTestSuite) SetupTest() {
|
||||
|
||||
// test Basic use of TSafeReplica
|
||||
func (suite *TSafeTestSuite) TestBasic() {
|
||||
suite.tSafeReplica.Add(suite.channel, ZeroTimestamp)
|
||||
suite.tSafeReplica.Add(context.Background(), suite.channel, ZeroTimestamp)
|
||||
t, err := suite.tSafeReplica.Get(suite.channel)
|
||||
suite.NoError(err)
|
||||
suite.Equal(ZeroTimestamp, t)
|
||||
@ -75,12 +76,12 @@ func (suite *TSafeTestSuite) TestBasic() {
|
||||
}
|
||||
|
||||
func (suite *TSafeTestSuite) TestRemoveAndInvalid() {
|
||||
suite.tSafeReplica.Add(suite.channel, ZeroTimestamp)
|
||||
suite.tSafeReplica.Add(context.Background(), suite.channel, ZeroTimestamp)
|
||||
t, err := suite.tSafeReplica.Get(suite.channel)
|
||||
suite.NoError(err)
|
||||
suite.Equal(ZeroTimestamp, t)
|
||||
|
||||
suite.tSafeReplica.Remove(suite.channel)
|
||||
suite.tSafeReplica.Remove(context.Background(), suite.channel)
|
||||
_, err = suite.tSafeReplica.Get(suite.channel)
|
||||
suite.Error(err)
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user