Mirror of https://gitee.com/milvus-io/milvus.git (synced 2024-11-29 18:38:44 +08:00)
fix: Fix data race for clustering buffer writer (#35145)

issue: #34495

Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
parent 089fb2b5fb
commit 9412002d7d
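Background for the fix: each clusterBuffer's writer is shared between the goroutines that map rows into cluster buffers and the code that decides when to flush, so reading the writer's state while another goroutine mutates it is a data race. The hunks below move those reads under the existing per-buffer lock. What follows is a minimal, self-contained sketch of that locking pattern, assuming illustrative names (bufferWriter, IsFull, a single sync.RWMutex standing in for the per-ID lock table), not the actual Milvus types:

package main

import (
	"fmt"
	"sync"
)

// bufferWriter stands in for the segment writer whose state is shared
// between the mapping goroutine and the flush-trigger check.
type bufferWriter struct {
	rowNum int64
}

func (w *bufferWriter) Write()       { w.rowNum++ }
func (w *bufferWriter) IsFull() bool { return w.rowNum >= 1000 }

func main() {
	var mu sync.RWMutex // stands in for the per-buffer lock for a single buffer ID
	writer := &bufferWriter{}

	var wg sync.WaitGroup
	wg.Add(1)
	// Mapping path: mutates the writer under the write lock.
	go func() {
		defer wg.Done()
		for i := 0; i < 2000; i++ {
			mu.Lock()
			writer.Write()
			mu.Unlock()
		}
	}()

	// Flush-trigger path: reads writer state under the read lock and caches it,
	// so the writer is never touched without holding the lock.
	mu.RLock()
	full := writer.IsFull()
	mu.RUnlock()
	fmt.Println("writer full at check time:", full)

	wg.Wait()
}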
@@ -464,14 +464,6 @@ func (t *clusteringCompactionTask) getBufferTotalUsedMemorySize() int64 {
 	return totalBufferSize
 }
 
-func (t *clusteringCompactionTask) getCurrentBufferWrittenMemorySize() int64 {
-	var totalBufferSize int64 = 0
-	for _, buffer := range t.clusterBuffers {
-		totalBufferSize = totalBufferSize + int64(buffer.writer.WrittenMemorySize())
-	}
-	return totalBufferSize
-}
-
 // read insert log of one segment, mappingSegment into buckets according to clusteringKey. flush data to file when necessary
 func (t *clusteringCompactionTask) mappingSegment(
 	ctx context.Context,
@@ -602,9 +594,12 @@ func (t *clusteringCompactionTask) mappingSegment(
 			if (remained+1)%100 == 0 {
 				currentBufferTotalMemorySize := t.getBufferTotalUsedMemorySize()
 				// trigger flushBinlog
+
+				t.clusterBufferLocks.RLock(clusterBuffer.id)
+				currentBufferWriterFull := clusterBuffer.writer.IsFull()
+				t.clusterBufferLocks.RUnlock(clusterBuffer.id)
 				currentSegmentNumRows := clusterBuffer.currentSegmentRowNum.Load()
-				if currentSegmentNumRows > t.plan.GetMaxSegmentRows() ||
-					clusterBuffer.writer.IsFull() {
+				if currentSegmentNumRows > t.plan.GetMaxSegmentRows() || currentBufferWriterFull {
 					// reach segment/binlog max size
 					t.clusterBufferLocks.Lock(clusterBuffer.id)
 					writer := clusterBuffer.writer
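The hunk above reads IsFull() once under the read lock and reuses the cached result in the flush condition, instead of dereferencing the writer again outside the lock. A small sketch of that shape, with hypothetical types standing in for the real writer and lock table:

package main

import (
	"fmt"
	"sync"
)

type segmentWriter struct{ rows int64 }

func (w *segmentWriter) IsFull() bool { return w.rows >= 1024 }

// shouldFlush reads the writer's fullness once under the read lock and reuses
// the cached value in the condition, mirroring the shape of the changed check.
func shouldFlush(mu *sync.RWMutex, w *segmentWriter, segmentRows, maxSegmentRows int64) bool {
	mu.RLock()
	full := w.IsFull()
	mu.RUnlock()
	return segmentRows > maxSegmentRows || full
}

func main() {
	var mu sync.RWMutex
	fmt.Println(shouldFlush(&mu, &segmentWriter{rows: 2048}, 100, 1000)) // true
}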
@@ -754,12 +749,15 @@ func (t *clusteringCompactionTask) flushLargestBuffers(ctx context.Context) error {
 	_, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "flushLargestBuffers")
 	defer span.End()
 	bufferIDs := make([]int, 0)
+	bufferRowNums := make([]int64, 0)
 	for _, buffer := range t.clusterBuffers {
 		bufferIDs = append(bufferIDs, buffer.id)
+		t.clusterBufferLocks.RLock(buffer.id)
+		bufferRowNums = append(bufferRowNums, buffer.writer.GetRowNum())
+		t.clusterBufferLocks.RUnlock(buffer.id)
 	}
 	sort.Slice(bufferIDs, func(i, j int) bool {
-		return t.clusterBuffers[i].writer.GetRowNum() >
-			t.clusterBuffers[j].writer.GetRowNum()
+		return bufferRowNums[i] > bufferRowNums[j]
 	})
 	log.Info("start flushLargestBuffers", zap.Ints("bufferIDs", bufferIDs), zap.Int64("currentMemorySize", currentMemorySize))
 
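In this last hunk the row counts are captured under the per-buffer lock before sorting, so the sort comparator works on a local snapshot instead of calling GetRowNum() on shared writers. A sketch of the same idea under assumed names; it keeps each buffer ID and its row count in one struct so the pair stays aligned while sorting:

package main

import (
	"fmt"
	"sort"
	"sync"
)

type bufferWriter struct{ rows int64 }

func (w *bufferWriter) GetRowNum() int64 { return w.rows }

type clusterBuffer struct {
	id     int
	writer *bufferWriter
	mu     sync.RWMutex // stands in for the per-ID lock table
}

func main() {
	buffers := []*clusterBuffer{
		{id: 0, writer: &bufferWriter{rows: 10}},
		{id: 1, writer: &bufferWriter{rows: 30}},
		{id: 2, writer: &bufferWriter{rows: 20}},
	}

	// Snapshot ID and row count together under the lock so the comparator
	// below only reads local data and the pair stays consistent while sorting.
	type stat struct {
		id     int
		rowNum int64
	}
	stats := make([]stat, 0, len(buffers))
	for _, buf := range buffers {
		buf.mu.RLock()
		stats = append(stats, stat{id: buf.id, rowNum: buf.writer.GetRowNum()})
		buf.mu.RUnlock()
	}
	sort.Slice(stats, func(i, j int) bool { return stats[i].rowNum > stats[j].rowNum })

	order := make([]int, 0, len(stats))
	for _, s := range stats {
		order = append(order, s.id)
	}
	fmt.Println("flush order (largest first):", order) // [1 2 0]
}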