From 6053d693df4991a81049990441b465913eaf568a Mon Sep 17 00:00:00 2001 From: bigsheeper Date: Fri, 29 Oct 2021 17:12:43 +0800 Subject: [PATCH] Add query mutex to fix crash with panic (#10832) Signed-off-by: bigsheeper --- internal/querynode/collection_replica.go | 34 +++++++++++++++++++++--- internal/querynode/query_collection.go | 12 +++++++-- internal/querynode/query_node.go | 5 ++++ 3 files changed, 45 insertions(+), 6 deletions(-) diff --git a/internal/querynode/collection_replica.go b/internal/querynode/collection_replica.go index 2da2b080da..eec980bffd 100644 --- a/internal/querynode/collection_replica.go +++ b/internal/querynode/collection_replica.go @@ -28,13 +28,12 @@ import ( "strconv" "sync" - "github.com/milvus-io/milvus/internal/common" - etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" - "github.com/milvus-io/milvus/internal/proto/datapb" - "go.uber.org/zap" + "github.com/milvus-io/milvus/internal/common" + etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "github.com/milvus-io/milvus/internal/log" + "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/schemapb" ) @@ -81,6 +80,12 @@ type ReplicaInterface interface { addExcludedSegments(collectionID UniqueID, segmentInfos []*datapb.SegmentInfo) getExcludedSegments(collectionID UniqueID) ([]*datapb.SegmentInfo, error) + // query mu + queryLock() + queryUnlock() + queryRLock() + queryRUnlock() + getSegmentsMemSize() int64 freeAll() printReplica() @@ -94,11 +99,28 @@ type collectionReplica struct { partitions map[UniqueID]*Partition segments map[UniqueID]*Segment + queryMu sync.RWMutex excludedSegments map[UniqueID][]*datapb.SegmentInfo // map[collectionID]segmentIDs etcdKV *etcdkv.EtcdKV } +func (colReplica *collectionReplica) queryLock() { + colReplica.queryMu.Lock() +} + +func (colReplica *collectionReplica) queryUnlock() { + colReplica.queryMu.Unlock() +} + +func (colReplica 
*collectionReplica) queryRLock() { + colReplica.queryMu.RLock() +} + +func (colReplica *collectionReplica) queryRUnlock() { + colReplica.queryMu.RUnlock() +} + // getSegmentsMemSize get the memory size in bytes of all the Segments func (colReplica *collectionReplica) getSegmentsMemSize() int64 { colReplica.mu.RLock() @@ -152,6 +174,8 @@ func (colReplica *collectionReplica) addCollection(collectionID UniqueID, schema // removeCollection removes the collection from collectionReplica func (colReplica *collectionReplica) removeCollection(collectionID UniqueID) error { colReplica.mu.Lock() + colReplica.queryMu.Lock() + defer colReplica.queryMu.Unlock() defer colReplica.mu.Unlock() return colReplica.removeCollectionPrivate(collectionID) } @@ -300,6 +324,8 @@ func (colReplica *collectionReplica) addPartitionPrivate(collectionID UniqueID, // removePartition removes the partition from collectionReplica func (colReplica *collectionReplica) removePartition(partitionID UniqueID) error { colReplica.mu.Lock() + colReplica.queryMu.Lock() + defer colReplica.queryMu.Unlock() defer colReplica.mu.Unlock() return colReplica.removePartitionPrivate(partitionID) } diff --git a/internal/querynode/query_collection.go b/internal/querynode/query_collection.go index 432dcd9c36..14970451f6 100644 --- a/internal/querynode/query_collection.go +++ b/internal/querynode/query_collection.go @@ -410,9 +410,9 @@ func (q *queryCollection) receiveQueryMsg(msg queryMsg) error { } serviceTime := q.getServiceableTime() + gt, _ := tsoutil.ParseTS(guaranteeTs) + st, _ := tsoutil.ParseTS(serviceTime) if guaranteeTs > serviceTime && len(collection.getVChannels()) > 0 { - gt, _ := tsoutil.ParseTS(guaranteeTs) - st, _ := tsoutil.ParseTS(serviceTime) log.Debug("query node::receiveQueryMsg: add to unsolvedMsg", zap.Any("collectionID", q.collectionID), zap.Any("sm.GuaranteeTimestamp", gt), @@ -435,6 +435,9 @@ func (q *queryCollection) receiveQueryMsg(msg queryMsg) error { log.Debug("doing query in 
receiveQueryMsg...", zap.Int64("collectionID", collectionID), + zap.Any("sm.GuaranteeTimestamp", gt), + zap.Any("serviceTime", st), + zap.Any("delta seconds", (guaranteeTs-serviceTime)/(1000*1000*1000)), zap.Int64("msgID", msg.ID()), zap.String("msgType", msgTypeStr), ) @@ -850,6 +853,11 @@ func translateHits(schema *typeutil.SchemaHelper, fieldIDs []int64, rawHits [][] // TODO:: cache map[dsl]plan // TODO: reBatched search requests func (q *queryCollection) search(msg queryMsg) error { + q.streaming.replica.queryRLock() + q.historical.replica.queryRLock() + defer q.historical.replica.queryRUnlock() + defer q.streaming.replica.queryRUnlock() + searchMsg := msg.(*msgstream.SearchMsg) sp, ctx := trace.StartSpanFromContext(searchMsg.TraceCtx()) defer sp.Finish() diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go index ae653ce3de..9c488af844 100644 --- a/internal/querynode/query_node.go +++ b/internal/querynode/query_node.go @@ -358,6 +358,10 @@ func (node *QueryNode) adjustByChangeInfo(segmentChangeInfos *querypb.SealedSegm return err } + node.streaming.replica.queryLock() + node.historical.replica.queryLock() + defer node.streaming.replica.queryUnlock() + defer node.historical.replica.queryUnlock() for _, info := range segmentChangeInfos.Infos { // For online segments: for _, segmentInfo := range info.OnlineSegments { @@ -366,6 +370,7 @@ func (node *QueryNode) adjustByChangeInfo(segmentChangeInfos *querypb.SealedSegm if hasGrowingSegment { err := node.streaming.replica.removeSegment(segmentInfo.SegmentID) if err != nil { + return err } log.Debug("remove growing segment in adjustByChangeInfo",