mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-02 11:59:00 +08:00
Fix storage memory leak caused by runtime.SetFinalizer (#15100)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent
d719f1edb7
commit
4369e08f2a
@ -19,9 +19,6 @@ package storage
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"runtime"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
)
|
||||
|
||||
// BinlogReader is an object to read binlog file. Binlog file's format can be
|
||||
@ -91,10 +88,5 @@ func NewBinlogReader(data []byte) (*BinlogReader, error) {
|
||||
if _, err := reader.readDescriptorEvent(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
runtime.SetFinalizer(reader, func(reader *BinlogReader) {
|
||||
if !reader.isClose {
|
||||
log.Error("binlog reader is leaking.. please check")
|
||||
}
|
||||
})
|
||||
return reader, nil
|
||||
}
|
||||
|
@ -20,10 +20,8 @@ import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"runtime"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/common"
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/proto/schemapb"
|
||||
)
|
||||
|
||||
@ -276,11 +274,6 @@ func NewInsertBinlogWriter(dataType schemapb.DataType, collectionID, partitionID
|
||||
},
|
||||
}
|
||||
|
||||
runtime.SetFinalizer(w, func(writer *InsertBinlogWriter) {
|
||||
if !w.isClosed() {
|
||||
log.Error("insert binlog writer is leaking.. please check")
|
||||
}
|
||||
})
|
||||
return w
|
||||
}
|
||||
|
||||
@ -300,11 +293,6 @@ func NewDeleteBinlogWriter(dataType schemapb.DataType, collectionID, partitionID
|
||||
buffer: nil,
|
||||
},
|
||||
}
|
||||
runtime.SetFinalizer(w, func(writer *DeleteBinlogWriter) {
|
||||
if !w.isClosed() {
|
||||
log.Error("delete binlog writer is leaking.. please check")
|
||||
}
|
||||
})
|
||||
return w
|
||||
}
|
||||
|
||||
@ -322,11 +310,6 @@ func NewDDLBinlogWriter(dataType schemapb.DataType, collectionID int64) *DDLBinl
|
||||
buffer: nil,
|
||||
},
|
||||
}
|
||||
runtime.SetFinalizer(w, func(writer *DDLBinlogWriter) {
|
||||
if !w.isClosed() {
|
||||
log.Error("ddl binlog writer is leaking.. please check")
|
||||
}
|
||||
})
|
||||
return w
|
||||
}
|
||||
|
||||
@ -362,10 +345,5 @@ func NewIndexFileBinlogWriter(
|
||||
buffer: nil,
|
||||
},
|
||||
}
|
||||
runtime.SetFinalizer(w, func(writer *IndexFileBinlogWriter) {
|
||||
if !w.isClosed() {
|
||||
log.Error("index file binlog writer is leaking.. please check")
|
||||
}
|
||||
})
|
||||
return w
|
||||
}
|
||||
|
@ -19,9 +19,7 @@ package storage
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"runtime"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/proto/schemapb"
|
||||
)
|
||||
|
||||
@ -110,10 +108,5 @@ func newEventReader(datatype schemapb.DataType, buffer *bytes.Buffer) (*EventRea
|
||||
return nil, err
|
||||
}
|
||||
reader.PayloadReaderInterface = payloadReader
|
||||
runtime.SetFinalizer(reader, func(reader *EventReader) {
|
||||
if !reader.isClosed {
|
||||
log.Error("event reader is leaking.. please check")
|
||||
}
|
||||
})
|
||||
return reader, nil
|
||||
}
|
||||
|
@ -22,10 +22,8 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"runtime"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/common"
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/proto/schemapb"
|
||||
)
|
||||
|
||||
@ -259,11 +257,6 @@ func newInsertEventWriter(dataType schemapb.DataType) (*insertEventWriter, error
|
||||
}
|
||||
writer.baseEventWriter.getEventDataSize = writer.insertEventData.GetEventDataFixPartSize
|
||||
writer.baseEventWriter.writeEventData = writer.insertEventData.WriteEventData
|
||||
runtime.SetFinalizer(writer, func(writer *insertEventWriter) {
|
||||
if !writer.isClosed {
|
||||
log.Error("insert event binlog writer is leaking.. please check")
|
||||
}
|
||||
})
|
||||
return writer, nil
|
||||
}
|
||||
|
||||
@ -286,11 +279,6 @@ func newDeleteEventWriter(dataType schemapb.DataType) (*deleteEventWriter, error
|
||||
}
|
||||
writer.baseEventWriter.getEventDataSize = writer.deleteEventData.GetEventDataFixPartSize
|
||||
writer.baseEventWriter.writeEventData = writer.deleteEventData.WriteEventData
|
||||
runtime.SetFinalizer(writer, func(writer *deleteEventWriter) {
|
||||
if !writer.isClosed {
|
||||
log.Error("delete event binlog writer is leaking.. please check")
|
||||
}
|
||||
})
|
||||
return writer, nil
|
||||
}
|
||||
|
||||
@ -317,11 +305,6 @@ func newCreateCollectionEventWriter(dataType schemapb.DataType) (*createCollecti
|
||||
}
|
||||
writer.baseEventWriter.getEventDataSize = writer.createCollectionEventData.GetEventDataFixPartSize
|
||||
writer.baseEventWriter.writeEventData = writer.createCollectionEventData.WriteEventData
|
||||
runtime.SetFinalizer(writer, func(writer *createCollectionEventWriter) {
|
||||
if !writer.isClosed {
|
||||
log.Error("create collection event binlog writer is leaking.. please check")
|
||||
}
|
||||
})
|
||||
return writer, nil
|
||||
}
|
||||
|
||||
@ -348,11 +331,6 @@ func newDropCollectionEventWriter(dataType schemapb.DataType) (*dropCollectionEv
|
||||
}
|
||||
writer.baseEventWriter.getEventDataSize = writer.dropCollectionEventData.GetEventDataFixPartSize
|
||||
writer.baseEventWriter.writeEventData = writer.dropCollectionEventData.WriteEventData
|
||||
runtime.SetFinalizer(writer, func(writer *dropCollectionEventWriter) {
|
||||
if !writer.isClosed {
|
||||
log.Error("drop collection event binlog writer is leaking.. please check")
|
||||
}
|
||||
})
|
||||
return writer, nil
|
||||
}
|
||||
|
||||
@ -379,11 +357,6 @@ func newCreatePartitionEventWriter(dataType schemapb.DataType) (*createPartition
|
||||
}
|
||||
writer.baseEventWriter.getEventDataSize = writer.createPartitionEventData.GetEventDataFixPartSize
|
||||
writer.baseEventWriter.writeEventData = writer.createPartitionEventData.WriteEventData
|
||||
runtime.SetFinalizer(writer, func(writer *createPartitionEventWriter) {
|
||||
if !writer.isClosed {
|
||||
log.Error("create partition binlog writer is leaking.. please check")
|
||||
}
|
||||
})
|
||||
return writer, nil
|
||||
}
|
||||
|
||||
@ -410,11 +383,6 @@ func newDropPartitionEventWriter(dataType schemapb.DataType) (*dropPartitionEven
|
||||
}
|
||||
writer.baseEventWriter.getEventDataSize = writer.dropPartitionEventData.GetEventDataFixPartSize
|
||||
writer.baseEventWriter.writeEventData = writer.dropPartitionEventData.WriteEventData
|
||||
runtime.SetFinalizer(writer, func(writer *dropPartitionEventWriter) {
|
||||
if !writer.isClosed {
|
||||
log.Error("drop partition event binlog writer is leaking.. please check")
|
||||
}
|
||||
})
|
||||
return writer, nil
|
||||
}
|
||||
|
||||
@ -437,11 +405,6 @@ func newIndexFileEventWriter() (*indexFileEventWriter, error) {
|
||||
}
|
||||
writer.baseEventWriter.getEventDataSize = writer.indexFileEventData.GetEventDataFixPartSize
|
||||
writer.baseEventWriter.writeEventData = writer.indexFileEventData.WriteEventData
|
||||
runtime.SetFinalizer(writer, func(writer *indexFileEventWriter) {
|
||||
if !writer.isClosed {
|
||||
log.Error("index file event binlog writer is leaking.. please check")
|
||||
}
|
||||
})
|
||||
|
||||
return writer, nil
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user