mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-11-30 02:48:45 +08:00
Avoid to copy during converting C bytes to Go bytes (#15551)
* Avoid to copy during converting C bytes to Go bytes Signed-off-by: yah01 <yah2er0ne@outlook.com> * Add BytesConverter to achieve no-copy converting C bytes to Go bytes Signed-off-by: yah01 <yah2er0ne@outlook.com>
This commit is contained in:
parent
4f62da2918
commit
684110bc9a
@ -49,6 +49,7 @@ import (
|
||||
"github.com/milvus-io/milvus/internal/proto/schemapb"
|
||||
"github.com/milvus-io/milvus/internal/proto/segcorepb"
|
||||
"github.com/milvus-io/milvus/internal/storage"
|
||||
"github.com/milvus-io/milvus/internal/util/cgoconverter"
|
||||
"github.com/milvus-io/milvus/internal/util/typeutil"
|
||||
)
|
||||
|
||||
@ -320,8 +321,9 @@ func HandleCProto(cRes *C.CProto, msg proto.Message) error {
|
||||
// Standalone CProto is protobuf created by C side,
|
||||
// Passed from c side
|
||||
// memory is managed manually
|
||||
blob := C.GoBytes(unsafe.Pointer(cRes.proto_blob), C.int32_t(cRes.proto_size))
|
||||
defer C.free(cRes.proto_blob)
|
||||
lease, blob := cgoconverter.UnsafeGoBytes(&cRes.proto_blob, int(cRes.proto_size))
|
||||
defer cgoconverter.Release(lease)
|
||||
|
||||
return proto.Unmarshal(blob, msg)
|
||||
}
|
||||
|
||||
|
93
internal/util/cgoconverter/bytes_converter.go
Normal file
93
internal/util/cgoconverter/bytes_converter.go
Normal file
@ -0,0 +1,93 @@
|
||||
package cgoconverter
|
||||
|
||||
/*
|
||||
#include <stdlib.h>
|
||||
*/
|
||||
import "C"
|
||||
import (
|
||||
"math"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"unsafe"
|
||||
)
|
||||
|
||||
const maxByteArrayLen = math.MaxInt32
|
||||
|
||||
var globalConverter = NewBytesConverter()
|
||||
|
||||
type BytesConverter struct {
|
||||
pointers sync.Map // leaseId -> unsafe.Pointer
|
||||
nextLease int32
|
||||
}
|
||||
|
||||
func NewBytesConverter() *BytesConverter {
|
||||
return &BytesConverter{
|
||||
pointers: sync.Map{},
|
||||
nextLease: 0,
|
||||
}
|
||||
}
|
||||
|
||||
func (converter *BytesConverter) add(p unsafe.Pointer) int32 {
|
||||
lease := atomic.AddInt32(&converter.nextLease, 1)
|
||||
converter.pointers.Store(lease, p)
|
||||
|
||||
return lease
|
||||
}
|
||||
|
||||
// Return a lease and []byte from C bytes (typically char*)
|
||||
// which references the same memory of C bytes
|
||||
// Call Release(lease) after you don't need the returned []byte
|
||||
func (converter *BytesConverter) UnsafeGoBytes(cbytes *unsafe.Pointer, len int) (int32, []byte) {
|
||||
var (
|
||||
goBytes []byte = nil
|
||||
lease int32 = 0
|
||||
)
|
||||
|
||||
if len > maxByteArrayLen {
|
||||
// C.GoBytes takes the length as C.int,
|
||||
// which is always 32-bit (not depends on platform)
|
||||
panic("UnsafeGoBytes: out of length")
|
||||
}
|
||||
goBytes = (*[maxByteArrayLen]byte)(*cbytes)[:len:len]
|
||||
lease = converter.add(*cbytes)
|
||||
*cbytes = nil
|
||||
|
||||
return lease, goBytes
|
||||
}
|
||||
|
||||
func (converter *BytesConverter) Release(lease int32) {
|
||||
pI, ok := converter.pointers.LoadAndDelete(lease)
|
||||
if !ok {
|
||||
panic("try to release the resource that doesn't exist")
|
||||
}
|
||||
|
||||
p, ok := pI.(unsafe.Pointer)
|
||||
if !ok {
|
||||
panic("incorrect value type")
|
||||
}
|
||||
|
||||
C.free(p)
|
||||
}
|
||||
|
||||
// Make sure only the caller own the converter
|
||||
// or this would release someone's memory
|
||||
func (converter *BytesConverter) ReleaseAll() {
|
||||
converter.pointers.Range(func(key, value interface{}) bool {
|
||||
pointer := value.(unsafe.Pointer)
|
||||
|
||||
converter.pointers.Delete(key)
|
||||
C.free(pointer)
|
||||
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
func UnsafeGoBytes(cbytes *unsafe.Pointer, len int) (int32, []byte) {
|
||||
return globalConverter.UnsafeGoBytes(cbytes, len)
|
||||
}
|
||||
|
||||
func Release(lease int32) {
|
||||
globalConverter.Release(lease)
|
||||
}
|
||||
|
||||
// DO NOT provide ReleaseAll() method for global converter
|
90
internal/util/cgoconverter/bytes_converter_test.go
Normal file
90
internal/util/cgoconverter/bytes_converter_test.go
Normal file
@ -0,0 +1,90 @@
|
||||
package cgoconverter
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"runtime"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestBytesConverter(t *testing.T) {
|
||||
data := []byte("bytes converter test\n")
|
||||
length := len(data)
|
||||
cbytes := copyToCBytes(data)
|
||||
|
||||
lease, goBytes := UnsafeGoBytes(&cbytes, length)
|
||||
defer Release(lease)
|
||||
equalBytes(t, data, goBytes)
|
||||
|
||||
// data = make([]byte, maxByteArrayLen)
|
||||
v := byte(0x57)
|
||||
length = maxByteArrayLen
|
||||
cbytes = mallocCBytes(v, maxByteArrayLen)
|
||||
|
||||
lease, goBytes = UnsafeGoBytes(&cbytes, length)
|
||||
defer Release(lease)
|
||||
|
||||
if !isAll(goBytes, v) {
|
||||
t.Errorf("incorrect value, all bytes should be %v", v)
|
||||
}
|
||||
|
||||
// equalBytes(t, data, goBytes)
|
||||
}
|
||||
|
||||
func TestConcurrentBytesConverter(t *testing.T) {
|
||||
concurrency := runtime.GOMAXPROCS(0)
|
||||
if concurrency <= 1 {
|
||||
concurrency = 4
|
||||
}
|
||||
|
||||
length := maxByteArrayLen / concurrency
|
||||
|
||||
errCh := make(chan error, concurrency)
|
||||
for i := 0; i < concurrency; i++ {
|
||||
go func(iter int) {
|
||||
v := byte(iter)
|
||||
cbytes := mallocCBytes(v, length)
|
||||
|
||||
lease, goBytes := UnsafeGoBytes(&cbytes, length)
|
||||
defer Release(lease)
|
||||
|
||||
if !isAll(goBytes, v) {
|
||||
errCh <- fmt.Errorf("iter %d: incorrect value, all bytes should be %v", iter, v)
|
||||
} else {
|
||||
errCh <- nil
|
||||
}
|
||||
}(i)
|
||||
}
|
||||
|
||||
hasErr := false
|
||||
for i := 0; i < concurrency; i++ {
|
||||
err := <-errCh
|
||||
if err != nil {
|
||||
t.Logf("err=%+v", err)
|
||||
hasErr = true
|
||||
}
|
||||
}
|
||||
if hasErr {
|
||||
t.Fail()
|
||||
}
|
||||
}
|
||||
|
||||
func equalBytes(t *testing.T, origin []byte, new []byte) {
|
||||
if len(origin) != len(new) {
|
||||
t.Errorf("len(new)=%d new=%+v", len(new), new)
|
||||
}
|
||||
|
||||
if !bytes.Equal(origin, new) {
|
||||
t.Errorf("data is not consistent")
|
||||
}
|
||||
}
|
||||
|
||||
func isAll(data []byte, v byte) bool {
|
||||
for _, b := range data {
|
||||
if b != v {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
19
internal/util/cgoconverter/test_utils.go
Normal file
19
internal/util/cgoconverter/test_utils.go
Normal file
@ -0,0 +1,19 @@
|
||||
package cgoconverter
|
||||
|
||||
/*
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
*/
|
||||
import "C"
|
||||
import "unsafe"
|
||||
|
||||
func copyToCBytes(data []byte) unsafe.Pointer {
|
||||
return C.CBytes(data)
|
||||
}
|
||||
|
||||
func mallocCBytes(v byte, len int) unsafe.Pointer {
|
||||
p := C.malloc(C.ulong(len))
|
||||
C.memset(p, C.int(v), C.ulong(len))
|
||||
|
||||
return p
|
||||
}
|
Loading…
Reference in New Issue
Block a user