Add query mutex to fix crash with panic (#10832)
Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
parent 745bda478a
commit 6053d693df
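The change, in short: a query or search could still be reading a collection's segments while another goroutine removed the collection, a partition, or a growing segment underneath it, which is the kind of race the panic in the title suggests. The commit serializes those paths with a dedicated sync.RWMutex (queryMu): read paths take the read lock, destructive paths take the write lock. The sketch below illustrates the pattern with minimal, hypothetical types; it is not the actual Milvus code.

```go
package main

import (
	"fmt"
	"sync"
)

// replica is a stand-in for the query node's collectionReplica: collection
// state guarded by a dedicated query mutex (hypothetical types and fields).
type replica struct {
	queryMu     sync.RWMutex
	collections map[int64][]string // collectionID -> segment names (placeholder data)
}

// search is the read path: many searches may run concurrently, so a read lock
// is enough to keep them cheap.
func (r *replica) search(collectionID int64) []string {
	r.queryMu.RLock()
	defer r.queryMu.RUnlock()
	// No removal can run while any search holds the read lock.
	return append([]string(nil), r.collections[collectionID]...)
}

// removeCollection is the write path: it must be exclusive, otherwise a
// concurrent search could observe partially freed state and panic.
func (r *replica) removeCollection(collectionID int64) {
	r.queryMu.Lock()
	defer r.queryMu.Unlock()
	delete(r.collections, collectionID)
}

func main() {
	r := &replica{collections: map[int64][]string{1: {"segment-1", "segment-2"}}}

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = r.search(1) // readers may overlap with each other
		}()
	}
	r.removeCollection(1) // the writer either waits for or precedes the readers
	wg.Wait()
	fmt.Println("remaining:", r.search(1))
}
```

Using a read-write mutex rather than a plain mutex keeps concurrent searches from serializing against each other; only removal needs exclusivity.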
@@ -28,13 +28,12 @@ import (
 	"strconv"
 	"sync"
 
-	"github.com/milvus-io/milvus/internal/common"
-	etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
-	"github.com/milvus-io/milvus/internal/proto/datapb"
-
 	"go.uber.org/zap"
 
+	"github.com/milvus-io/milvus/internal/common"
+	etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
 	"github.com/milvus-io/milvus/internal/log"
+	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/internal/proto/internalpb"
 	"github.com/milvus-io/milvus/internal/proto/schemapb"
 )
@@ -81,6 +80,12 @@ type ReplicaInterface interface {
 	addExcludedSegments(collectionID UniqueID, segmentInfos []*datapb.SegmentInfo)
 	getExcludedSegments(collectionID UniqueID) ([]*datapb.SegmentInfo, error)
+
+	// query mu
+	queryLock()
+	queryUnlock()
+	queryRLock()
+	queryRUnlock()
 
 	getSegmentsMemSize() int64
 	freeAll()
 	printReplica()
@@ -94,11 +99,28 @@ type collectionReplica struct {
 	partitions map[UniqueID]*Partition
 	segments   map[UniqueID]*Segment
 
+	queryMu          sync.RWMutex
 	excludedSegments map[UniqueID][]*datapb.SegmentInfo // map[collectionID]segmentIDs
 
 	etcdKV *etcdkv.EtcdKV
 }
 
+func (colReplica *collectionReplica) queryLock() {
+	colReplica.queryMu.Lock()
+}
+
+func (colReplica *collectionReplica) queryUnlock() {
+	colReplica.queryMu.Unlock()
+}
+
+func (colReplica *collectionReplica) queryRLock() {
+	colReplica.queryMu.RLock()
+}
+
+func (colReplica *collectionReplica) queryRUnlock() {
+	colReplica.queryMu.RUnlock()
+}
+
 // getSegmentsMemSize get the memory size in bytes of all the Segments
 func (colReplica *collectionReplica) getSegmentsMemSize() int64 {
 	colReplica.mu.RLock()
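The interface methods above keep queryMu itself unexported: components such as the query collection and the query node can only take or release the lock through ReplicaInterface, so every consumer goes through the same mutex. A rough sketch of that shape, with made-up names and nothing beyond the standard library:

```go
package main

import "sync"

// locker mirrors the idea behind the new ReplicaInterface methods: lock
// operations are exposed as methods, while the mutex stays a private field.
type locker interface {
	queryLock()
	queryUnlock()
	queryRLock()
	queryRUnlock()
}

type segmentReplica struct {
	queryMu  sync.RWMutex
	segments map[int64]string
}

func (r *segmentReplica) queryLock()    { r.queryMu.Lock() }
func (r *segmentReplica) queryUnlock()  { r.queryMu.Unlock() }
func (r *segmentReplica) queryRLock()   { r.queryMu.RLock() }
func (r *segmentReplica) queryRUnlock() { r.queryMu.RUnlock() }

// guardedRead is a caller that only knows the interface: it can bracket a
// multi-step read without ever touching the underlying sync.RWMutex.
func guardedRead(l locker, body func()) {
	l.queryRLock()
	defer l.queryRUnlock()
	body()
}

func main() {
	r := &segmentReplica{segments: map[int64]string{1: "sealed"}}
	guardedRead(r, func() { _ = r.segments[1] })
}
```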
@@ -152,6 +174,8 @@ func (colReplica *collectionReplica) addCollection(collectionID UniqueID, schema
 // removeCollection removes the collection from collectionReplica
 func (colReplica *collectionReplica) removeCollection(collectionID UniqueID) error {
 	colReplica.mu.Lock()
+	colReplica.queryMu.Lock()
+	defer colReplica.queryMu.Unlock()
 	defer colReplica.mu.Unlock()
 	return colReplica.removeCollectionPrivate(collectionID)
 }
@@ -300,6 +324,8 @@ func (colReplica *collectionReplica) addPartitionPrivate(collectionID UniqueID,
 // removePartition removes the partition from collectionReplica
 func (colReplica *collectionReplica) removePartition(partitionID UniqueID) error {
 	colReplica.mu.Lock()
+	colReplica.queryMu.Lock()
+	defer colReplica.queryMu.Unlock()
 	defer colReplica.mu.Unlock()
 	return colReplica.removePartitionPrivate(partitionID)
 }
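Both removal paths acquire the replica's existing mu and the new queryMu back to back and release them with defer. Deferred calls run on every return path and in last-in-first-out order, so in the code above mu is released first and queryMu second; for mutexes the release order does not matter, the point is that no exit path can leave either lock held. A tiny, stand-alone illustration of that defer ordering (not tied to the Milvus types):

```go
package main

import "fmt"

// deferOrder mimics the lock/unlock shape in removeCollection: two "locks"
// taken in order, two deferred "unlocks" that run last-in-first-out.
func deferOrder() {
	fmt.Println("lock mu")
	fmt.Println("lock queryMu")
	defer fmt.Println("unlock queryMu") // registered first, runs last
	defer fmt.Println("unlock mu")      // registered last, runs first
	fmt.Println("critical section")
}

func main() {
	deferOrder()
	// Output:
	// lock mu
	// lock queryMu
	// critical section
	// unlock mu
	// unlock queryMu
}
```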
@@ -410,9 +410,9 @@ func (q *queryCollection) receiveQueryMsg(msg queryMsg) error {
 	}
 
 	serviceTime := q.getServiceableTime()
+	gt, _ := tsoutil.ParseTS(guaranteeTs)
+	st, _ := tsoutil.ParseTS(serviceTime)
 	if guaranteeTs > serviceTime && len(collection.getVChannels()) > 0 {
-		gt, _ := tsoutil.ParseTS(guaranteeTs)
-		st, _ := tsoutil.ParseTS(serviceTime)
 		log.Debug("query node::receiveQueryMsg: add to unsolvedMsg",
 			zap.Any("collectionID", q.collectionID),
 			zap.Any("sm.GuaranteeTimestamp", gt),
@@ -435,6 +435,9 @@ func (q *queryCollection) receiveQueryMsg(msg queryMsg) error {
 
 	log.Debug("doing query in receiveQueryMsg...",
 		zap.Int64("collectionID", collectionID),
+		zap.Any("sm.GuaranteeTimestamp", gt),
+		zap.Any("serviceTime", st),
+		zap.Any("delta seconds", (guaranteeTs-serviceTime)/(1000*1000*1000)),
 		zap.Int64("msgID", msg.ID()),
 		zap.String("msgType", msgTypeStr),
 	)
@@ -850,6 +853,11 @@ func translateHits(schema *typeutil.SchemaHelper, fieldIDs []int64, rawHits [][]
 // TODO:: cache map[dsl]plan
 // TODO: reBatched search requests
 func (q *queryCollection) search(msg queryMsg) error {
+	q.streaming.replica.queryRLock()
+	q.historical.replica.queryRLock()
+	defer q.historical.replica.queryRUnlock()
+	defer q.streaming.replica.queryRUnlock()
+
 	searchMsg := msg.(*msgstream.SearchMsg)
 	sp, ctx := trace.StartSpanFromContext(searchMsg.TraceCtx())
 	defer sp.Finish()
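Note that this read path takes the streaming replica's lock first and the historical replica's lock second, and the write path in adjustByChangeInfo (next hunk) acquires its locks in the same streaming-then-historical order. Keeping one global acquisition order is the usual way to avoid deadlock when readers and a writer contend for two locks, since a writer waiting on a Go RWMutex blocks later readers. The sketch below, with hypothetical names, shows one way to pin that ordering in a single helper:

```go
package main

import "sync"

// twoReplicaGuard is a hypothetical helper that fixes the acquisition order
// (streaming before historical) in one place, so readers and writers cannot
// disagree on it and deadlock.
type twoReplicaGuard struct {
	streamingMu  *sync.RWMutex
	historicalMu *sync.RWMutex
}

// rlockBoth takes both read locks in the fixed order and returns the matching
// unlock, suitable for "defer unlock()" at the top of a query path.
func (g twoReplicaGuard) rlockBoth() (unlock func()) {
	g.streamingMu.RLock()
	g.historicalMu.RLock()
	return func() {
		g.historicalMu.RUnlock()
		g.streamingMu.RUnlock()
	}
}

// lockBoth is the exclusive counterpart for paths that mutate segments.
func (g twoReplicaGuard) lockBoth() (unlock func()) {
	g.streamingMu.Lock()
	g.historicalMu.Lock()
	return func() {
		g.historicalMu.Unlock()
		g.streamingMu.Unlock()
	}
}

func main() {
	var streaming, historical sync.RWMutex
	g := twoReplicaGuard{streamingMu: &streaming, historicalMu: &historical}

	done := make(chan struct{})
	go func() { // a search-like reader
		unlock := g.rlockBoth()
		defer unlock()
		close(done)
	}()
	<-done

	unlock := g.lockBoth() // a segment-change-like writer
	unlock()
}
```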
@@ -358,6 +358,10 @@ func (node *QueryNode) adjustByChangeInfo(segmentChangeInfos *querypb.SealedSegm
 		return err
 	}
 
+	node.streaming.replica.queryLock()
+	node.historical.replica.queryLock()
+	defer node.streaming.replica.queryUnlock()
+	defer node.historical.replica.queryUnlock()
 	for _, info := range segmentChangeInfos.Infos {
 		// For online segments:
 		for _, segmentInfo := range info.OnlineSegments {
@@ -366,6 +370,7 @@ func (node *QueryNode) adjustByChangeInfo(segmentChangeInfos *querypb.SealedSegm
 			if hasGrowingSegment {
 				err := node.streaming.replica.removeSegment(segmentInfo.SegmentID)
 				if err != nil {

 					return err
 				}
 				log.Debug("remove growing segment in adjustByChangeInfo",