Fix panic while concurrent releasing payload writer/reader (#21218)

Signed-off-by: yah01 <yang.cen@zilliz.com>

Signed-off-by: yah01 <yang.cen@zilliz.com>
This commit is contained in:
yah01 2022-12-14 16:23:22 +08:00 committed by GitHub
parent 6d30dc8cc3
commit cc56d58052
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -14,6 +14,8 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
#include <mutex>
#include "storage/parquet_c.h" #include "storage/parquet_c.h"
#include "storage/PayloadReader.h" #include "storage/PayloadReader.h"
#include "storage/PayloadWriter.h" #include "storage/PayloadWriter.h"
@ -23,6 +25,19 @@ using Payload = milvus::storage::Payload;
using PayloadWriter = milvus::storage::PayloadWriter; using PayloadWriter = milvus::storage::PayloadWriter;
using PayloadReader = milvus::storage::PayloadReader; using PayloadReader = milvus::storage::PayloadReader;
void
ReleaseArrowUnused() {
static std::mutex release_mutex;
// While multiple threads are releasing memory,
// we don't need everyone do releasing,
// just let some of them do this also works well
if (release_mutex.try_lock()) {
arrow::default_memory_pool()->ReleaseUnused();
release_mutex.unlock();
}
}
static const char* static const char*
ErrorMsg(const std::string& msg) { ErrorMsg(const std::string& msg) {
if (msg.empty()) if (msg.empty())
@ -174,9 +189,10 @@ GetPayloadLengthFromWriter(CPayloadWriter payloadWriter) {
extern "C" void extern "C" void
ReleasePayloadWriter(CPayloadWriter handler) { ReleasePayloadWriter(CPayloadWriter handler) {
auto p = reinterpret_cast<PayloadWriter*>(handler); auto p = reinterpret_cast<PayloadWriter*>(handler);
if (p != nullptr) if (p != nullptr) {
delete p; delete p;
arrow::default_memory_pool()->ReleaseUnused(); ReleaseArrowUnused();
}
} }
extern "C" CPayloadReader extern "C" CPayloadReader
@ -350,5 +366,5 @@ extern "C" void
ReleasePayloadReader(CPayloadReader payloadReader) { ReleasePayloadReader(CPayloadReader payloadReader) {
auto p = reinterpret_cast<PayloadReader*>(payloadReader); auto p = reinterpret_cast<PayloadReader*>(payloadReader);
delete (p); delete (p);
arrow::default_memory_pool()->ReleaseUnused(); ReleaseArrowUnused();
} }