Add serialization function

Signed-off-by: become-nice <995581097@qq.com>
become-nice 2020-08-26 18:29:42 +08:00 committed by yefu.chen
parent fc6e31c8b0
commit 22a44c995d
7 changed files with 57 additions and 91 deletions
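
The heart of this commit is swapping InsertMsg.Serialization from an empty bytes.Buffer stub to JSON encoding via encoding/json (see the schema hunk below). A minimal runnable sketch of the new behavior follows; the InsertMsg struct here is abridged to the fields visible in this diff (CollectionName, EntityId, Timestamp), so treat it as an assumption rather than the real definition.

package main

import (
	"encoding/json"
	"fmt"
)

// Abridged InsertMsg: only the fields that appear elsewhere in this diff.
type InsertMsg struct {
	CollectionName string
	EntityId       int64
	Timestamp      uint64
}

// Serialization mirrors the committed code: marshal the message to JSON bytes.
func (ims *InsertMsg) Serialization() []byte {
	data, err := json.Marshal(ims)
	if err != nil {
		fmt.Println("Can't serialize:", err)
	}
	return data
}

func main() {
	msg := InsertMsg{CollectionName: "demo", EntityId: 42, Timestamp: 1}
	fmt.Printf("%s\n", msg.Serialization()) // {"CollectionName":"demo","EntityId":42,"Timestamp":1}
}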

View File

@@ -2,7 +2,8 @@ project(sulvim_core)
cmake_minimum_required(VERSION 3.16)
set( CMAKE_CXX_STANDARD 17 )
set( CMAKE_CXX_STANDARD_REQUIRED on )
set (CMAKE_MODULE_PATH "${CMAKE_MODULE_PATH};${CMAKE_CURRENT_SOURCE_DIR}/cmake")
include_directories(src)
add_subdirectory(src)
add_subdirectory(unittest)

View File

@@ -1,43 +0,0 @@
########################### GTEST
# Enable ExternalProject CMake module
INCLUDE(ExternalProject)
# Set default ExternalProject root directory
SET_DIRECTORY_PROPERTIES(PROPERTIES EP_PREFIX ${CMAKE_BINARY_DIR}/third_party)
# Add gtest
# http://stackoverflow.com/questions/9689183/cmake-googletest
ExternalProject_Add(
googletest
URL http://ss2.fluorinedog.com/data/gtest_v1.10.x.zip
# TIMEOUT 10
# # Force separate output paths for debug and release builds to allow easy
# # identification of correct lib in subsequent TARGET_LINK_LIBRARIES commands
# CMAKE_ARGS -DCMAKE_ARCHIVE_OUTPUT_DIRECTORY_DEBUG:PATH=DebugLibs
# -DCMAKE_ARCHIVE_OUTPUT_DIRECTORY_RELEASE:PATH=ReleaseLibs
# -Dgtest_force_shared_crt=ON
# Disable install step
INSTALL_COMMAND ""
# Wrap download, configure and build steps in a script to log output
LOG_DOWNLOAD ON
LOG_CONFIGURE ON
LOG_BUILD ON)
# Specify include dir
ExternalProject_Get_Property(googletest source_dir)
set(GTEST_INCLUDE_DIR ${source_dir}/include)
# Library
ExternalProject_Get_Property(googletest binary_dir)
# set(GTEST_LIBRARY_PATH ${binary_dir}/lib/${CMAKE_FIND_LIBRARY_PREFIXES}gtest.a)
# set(GTEST_LIBRARY gtest)
# add_library(${GTEST_LIBRARY} UNKNOWN IMPORTED)
# set_property(TARGET ${GTEST_LIBRARY} PROPERTY IMPORTED_LOCATION
# ${GTEST_LIBRARY_PATH} )
# add_dependencies(${GTEST_LIBRARY} googletest)
set(GTEST_LIBRARY_PATH ${binary_dir}/lib)
add_library(gtest UNKNOWN IMPORTED)
add_library(gtest_main UNKNOWN IMPORTED)
set_property(TARGET gtest PROPERTY IMPORTED_LOCATION ${GTEST_LIBRARY_PATH}/libgtest.a)
set_property(TARGET gtest_main PROPERTY IMPORTED_LOCATION ${GTEST_LIBRARY_PATH}/libgtest_main.a)

View File

@@ -7,4 +7,4 @@ add_library(milvus_dog_segment
${DOG_SEGMENT_FILES}
)
#add_dependencies( segment sqlite mysqlpp )
target_link_libraries(milvus_dog_segment tbb milvus_utils pthread)
target_link_libraries(milvus_dog_segment tbb milvus_utils)

View File

@@ -1,15 +0,0 @@
enable_testing()
find_package(GTest REQUIRED)
set(MILVUS_TEST_FILES
test_dog_segment.cpp
)
add_executable(all_tests
${MILVUS_TEST_FILES}
)
target_link_libraries(all_tests
gtest
gtest_main
milvus_dog_segment
pthread
)

View File

@@ -26,16 +26,14 @@
// #include "segment/SegmentWriter.h"
// #include "src/dog_segment/SegmentBase.h"
// #include "utils/Json.h"
#include <random>
#include <gtest/gtest.h>
#include "dog_segment/SegmentBase.h"
using std::cin;
using std::cout;
using std::endl;
// using SegmentVisitor = milvus::engine::SegmentVisitor;
using SegmentVisitor = milvus::engine::SegmentVisitor;
// namespace {
namespace {
// milvus::Status
// CreateCollection(std::shared_ptr<DB> db, const std::string& collection_name, const LSN_TYPE& lsn) {
// CreateCollectionContext context;
@@ -75,7 +73,7 @@ using std::endl;
// }
// } // namespace
TEST(DogSegmentTest, TestABI) {
TEST_F(DogSegmentTest, TestABI) {
using namespace milvus::engine;
using namespace milvus::dog_segment;
ASSERT_EQ(TestABI(), 42);
@@ -137,7 +135,7 @@ TEST(DogSegmentTest, TestABI) {
TEST(DogSegmentTest, MockTest) {
TEST_F(DogSegmentTest, MockTest) {
using namespace milvus::dog_segment;
using namespace milvus::engine;
auto schema = std::make_shared<Schema>();
@@ -163,12 +161,11 @@ TEST(DogSegmentTest, MockTest) {
auto line_sizeof = (sizeof(int) + sizeof(float) * 16);
assert(raw_data.size() == line_sizeof * N);
auto segment = CreateSegment(schema).release();
auto segment = CreateSegment(schema);
DogDataChunk data_chunk{raw_data.data(), (int)line_sizeof, N};
segment->Insert(N, uids.data(), timestamps.data(), data_chunk);
QueryResult query_result;
segment->Query(nullptr, 0, query_result);
delete segment;
int i = 0;
i++;
}

View File

@@ -1,6 +1,9 @@
package schema
import "bytes"
import (
"encoding/json"
"fmt"
)
type ErrorCode int32
@@ -110,7 +113,7 @@ type InsertMsg struct {
type DeleteMsg struct {
CollectionName string
EntityId int64
Timestamp int64
Timestamp uint64
ClientId int64
MsgType OpType
}
@@ -141,8 +144,11 @@ func (ims *InsertMsg) GetType() OpType {
}
func (ims *InsertMsg) Serialization() []byte {
var serialization_data bytes.Buffer
return serialization_data.Bytes()
data, err := json.Marshal(ims)
if err != nil {
fmt.Println("Can't serialization")
}
return data
}
func (ims *InsertMsg) Deserialization(serializationData []byte) {
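
The hunk above ends at the Deserialization signature, so its body is not part of this diff. Assuming it mirrors the JSON-based Serialization, a plausible sketch (hypothetical, not the committed body) is:

func (ims *InsertMsg) Deserialization(serializationData []byte) {
	// Assumed inverse of the JSON Serialization above; the real body is cut off in this diff.
	if err := json.Unmarshal(serializationData, ims); err != nil {
		fmt.Println("Can't deserialize:", err)
	}
}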

View File

@@ -24,10 +24,10 @@ type writeNode struct {
func NewWriteNode(ctx context.Context,
openSegmentId string,
timeSync uint64,
closeTime uint64,
nextSegmentId string,
nextCloseSegmentTime uint64) (*writeNode, error) {
nextCloseSegmentTime uint64,
timeSync uint64) (*writeNode, error) {
ctx = context.Background()
store, err := storage.NewStore(ctx, "TIKV")
writeTableTimeSync := &writeNodeTimeSync{deleteTimeSync: timeSync, insertTimeSync: timeSync}
@@ -44,7 +44,7 @@ func NewWriteNode(ctx context.Context,
}, nil
}
func (s *writeNode) InsertBatchData(ctx context.Context, data []schema.InsertMsg, time_sync uint64) error {
func (s *writeNode) InsertBatchData(ctx context.Context, data []schema.InsertMsg, timeSync uint64) error {
var i int
var storeKey string
@@ -52,37 +52,57 @@ func (s *writeNode) InsertBatchData(ctx context.Context, data []schema.InsertMsg
var binaryData [][]byte
var timeStamps []uint64
for i = 0; i < cap(data); i++ {
for i = 0; i < len(data); i++ {
storeKey = data[i].CollectionName + strconv.FormatInt(data[i].EntityId, 10)
keys = append(keys, []byte(storeKey))
binaryData = append(binaryData, data[i].Serialization())
timeStamps = append(timeStamps, data[i].Timestamp)
}
if s.segmentCloseTime <= time_sync {
if s.segmentCloseTime <= timeSync {
s.openSegmentId = s.nextSegmentId
s.segmentCloseTime = s.nextSegmentCloseTime
}
(*s.kvStore).PutRows(ctx, keys, binaryData, s.openSegmentId, timeStamps)
s.UpdateInsertTimeSync(time_sync)
err := (*s.kvStore).PutRows(ctx, keys, binaryData, s.openSegmentId, timeStamps)
s.UpdateInsertTimeSync(timeSync)
return err
}
func (s *writeNode) DeleteBatchData(ctx context.Context, data []schema.DeleteMsg, timeSync uint64) error {
var i int
var storeKey string
var keys [][]byte
var timeStamps []uint64
for i = 0; i < len(data); i++ {
storeKey = data[i].CollectionName + strconv.FormatInt(data[i].EntityId, 10)
keys = append(keys, []byte(storeKey))
timeStamps = append(timeStamps, data[i].Timestamp)
}
// TODO: get the segment id for the delete data and deliver those messages to the specified topic
err := (*s.kvStore).DeleteRows(ctx, keys, timeStamps)
s.UpdateDeleteTimeSync(timeSync)
return err
}
func (s *writeNode) AddNewSegment(segmentId string, closeSegmentTime uint64) error {
s.nextSegmentId = segmentId
s.nextSegmentCloseTime = closeSegmentTime
return nil
}
func (s *writeNode) DeleteBatchData(ctx context.Context, data []schema.DeleteMsg, time_sync uint64) error {
return nil
func (s *writeNode) UpdateInsertTimeSync(timeSync uint64) {
s.timeSyncTable.insertTimeSync = timeSync
}
func (s *writeNode) AddNewSegment(segment_id string, close_segment_time uint64) error {
s.nextSegmentId = segment_id
s.nextSegmentCloseTime = close_segment_time
return nil
func (s *writeNode) UpdateDeleteTimeSync(timeSync uint64) {
s.timeSyncTable.deleteTimeSync = timeSync
}
func (s *writeNode) UpdateInsertTimeSync(time_sync uint64) {
s.timeSyncTable.insertTimeSync = time_sync
}
func (s *writeNode) UpdateDeleteTimeSync(time_sync uint64) {
s.timeSyncTable.deleteTimeSync = time_sync
func (s *writeNode) UpdateCloseTime(closeTime uint64) {
s.segmentCloseTime = closeTime
}
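
Taken together, the write path now iterates over len(data) instead of cap(data), propagates the PutRows/DeleteRows error to the caller, and rolls over to the next segment once timeSync reaches segmentCloseTime. A schematic caller inside the same package, with hypothetical segment ids and times (and assuming storage.NewStore can reach a TiKV instance):

func demoWrite(ctx context.Context) error {
	// Hypothetical ids/times: segment-0 closes at 100, segment-1 at 200.
	node, err := NewWriteNode(ctx, "segment-0", 100, "segment-1", 200, 0)
	if err != nil {
		return err
	}
	msgs := []schema.InsertMsg{{CollectionName: "demo", EntityId: 1, Timestamp: 50}}
	// timeSync (150) >= segmentCloseTime (100), so this batch lands in segment-1.
	return node.InsertBatchData(ctx, msgs, 150)
}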