Add config logic and tikv_benchmark.go

Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>
This commit is contained in:
zhenshan.cao 2020-09-17 14:31:50 +08:00 committed by yefu.chen
parent 7f798c5fce
commit 3bb0da1f57
6 changed files with 273 additions and 6 deletions

View File

@ -0,0 +1,124 @@
// s3-benchmark.go
// Copyright (c) 2017 Wasabi Technology, Inc.
package main
import (
"github.com/czs007/suvlim/storage/pkg"
. "github.com/czs007/suvlim/storage/pkg/types"
"crypto/md5"
"flag"
"fmt"
"code.cloudfoundry.org/bytefmt"
"io/ioutil"
"log"
"math/rand"
"net/http"
"os"
"sync/atomic"
"time"
"context"
)
// Global variables
var duration_secs, threads int
var object_size uint64
var object_data []byte
var running_threads, upload_count, upload_slowdown_count int32
var endtime, upload_finish time.Time
func logit(msg string) {
fmt.Println(msg)
logfile, _ := os.OpenFile("benchmark.log", os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666)
if logfile != nil {
logfile.WriteString(time.Now().Format(http.TimeFormat) + ": " + msg + "\n")
logfile.Close()
}
}
func _putFile(ctx context.Context, store Store){
//objnum := atomic.AddInt32(&upload_count, 1)
key := "collection_abc"
err := store.PutRow(ctx, []byte(key), object_data, "abc", uint64(time.Now().Unix()))
if err != nil {
atomic.AddInt32(&upload_slowdown_count, 1)
}
}
func runPutFile(thread_num int) {
var store Store
var err error
ctx := context.Background()
store, err = storage.NewStore(ctx, TIKVDriver)
if err != nil {
panic(err.Error())
}
for time.Now().Before(endtime) {
_putFile(ctx, store)
}
// Remember last done time
upload_finish = time.Now()
// One less thread
atomic.AddInt32(&running_threads, -1)
}
func main() {
// Hello
// Parse command line
myflag := flag.NewFlagSet("myflag", flag.ExitOnError)
myflag.IntVar(&duration_secs, "d", 1, "Duration of each test in seconds")
myflag.IntVar(&threads, "t", 1, "Number of threads to run")
var sizeArg string
myflag.StringVar(&sizeArg, "z", "1M", "Size of objects in bytes with postfix K, M, and G")
if err := myflag.Parse(os.Args[1:]); err != nil {
os.Exit(1)
}
// Check the arguments
var err error
if object_size, err = bytefmt.ToBytes(sizeArg); err != nil {
log.Fatalf("Invalid -z argument for object size: %v", err)
}
logit(fmt.Sprintf("Parameters: duration=%d, threads=%d, size=%s",
duration_secs, threads, sizeArg))
// Initialize data for the bucket
object_data = make([]byte, object_size)
rand.Read(object_data)
hasher := md5.New()
hasher.Write(object_data)
// reset counters
upload_count = 0
upload_slowdown_count = 0
running_threads = int32(threads)
// Run the upload case
starttime := time.Now()
endtime = starttime.Add(time.Second * time.Duration(duration_secs))
for n := 1; n <= threads; n++ {
go runPutFile(n)
}
// Wait for it to finish
for atomic.LoadInt32(&running_threads) > 0 {
time.Sleep(time.Millisecond)
}
upload_time := upload_finish.Sub(starttime).Seconds()
bps := float64(uint64(upload_count)*object_size) / upload_time
logit(fmt.Sprintf("PUT time %.1f secs, objects = %d, speed = %sB/sec, %.1f operations/sec. Slowdowns = %d",
upload_time, upload_count, bytefmt.ByteSize(uint64(bps)), float64(upload_count)/upload_time, upload_slowdown_count))
fmt.Println(" upload_count :", upload_count)
}

View File

@ -152,6 +152,64 @@ TEST(CApiTest, SearchTest) {
}
TEST(CApiTest, SearchSimpleTest) {
auto collection_name = "collection0";
auto schema_tmp_conf = "null_schema";
auto collection = NewCollection(collection_name, schema_tmp_conf);
auto partition_name = "partition0";
auto partition = NewPartition(collection, partition_name);
auto segment = NewSegment(partition, 0);
std::vector<char> raw_data;
std::vector<uint64_t> timestamps;
std::vector<int64_t> uids;
int N = 3;
int DIM = 16;
std::vector<float> vec(DIM);
for (int i = 0; i < DIM; i++) {
vec[i] = i;
}
for (int i = 0; i < N; i++) {
uids.push_back(i);
timestamps.emplace_back(i);
// append vec
raw_data.insert(raw_data.end(), (const char *) &vec, ((const char *) &vec) + sizeof(float) * vec.size());
int age = i;
raw_data.insert(raw_data.end(), (const char *) &age, ((const char *) &age) + sizeof(age));
}
auto line_sizeof = (sizeof(int) + sizeof(float) * DIM);
auto offset = PreInsert(segment, N);
auto ins_res = Insert(segment, offset, N, uids.data(), timestamps.data(), raw_data.data(), (int) line_sizeof, N);
assert(ins_res == 0);
Close(segment);
BuildIndex(segment);
long result_ids[10];
float result_distances[10];
auto query_json = std::string(R"({"field_name":"fakevec","num_queries":1,"topK":10})");
std::vector<float> query_raw_data(DIM);
for (int i = 0; i < DIM; i++) {
query_raw_data[i] = i;
}
auto sea_res = Search(segment, query_json.data(), 1, query_raw_data.data(), DIM, result_ids, result_distances);
assert(sea_res == 0);
DeleteCollection(collection);
DeletePartition(partition);
DeleteSegment(segment);
}
TEST(CApiTest, IsOpenedTest) {
auto collection_name = "collection0";
auto schema_tmp_conf = "null_schema";

74
reader/index_test.go Normal file
View File

@ -0,0 +1,74 @@
package reader
import (
"encoding/binary"
"fmt"
msgPb "github.com/czs007/suvlim/pkg/master/grpc/message"
"github.com/stretchr/testify/assert"
"math"
"testing"
)
func TestIndex_BuildIndex(t *testing.T) {
// 1. Construct node, collection, partition and segment
node := NewQueryNode(0, 0)
var collection = node.NewCollection("collection0", "fake schema")
var partition = collection.NewPartition("partition0")
var segment = partition.NewSegment(0)
// 2. Create ids and timestamps
ids := make([]int64, 0)
timestamps := make([]uint64, 0)
// 3. Create records, use schema below:
// schema_tmp->AddField("fakeVec", DataType::VECTOR_FLOAT, 16);
// schema_tmp->AddField("age", DataType::INT32);
const DIM = 16
const N = 10000
var vec = [DIM]float32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}
var rawData []byte
for _, ele := range vec {
buf := make([]byte, 4)
binary.LittleEndian.PutUint32(buf, math.Float32bits(ele))
rawData = append(rawData, buf...)
}
bs := make([]byte, 4)
binary.LittleEndian.PutUint32(bs, 1)
rawData = append(rawData, bs...)
var records [][]byte
for i := 0; i < N; i++ {
ids = append(ids, int64(i))
timestamps = append(timestamps, uint64(i))
records = append(records, rawData)
}
// 4. Do PreInsert
var offset = segment.SegmentPreInsert(N)
assert.GreaterOrEqual(t, offset, int64(0))
// 5. Do Insert
var err = segment.SegmentInsert(offset, &ids, &timestamps, &records)
assert.NoError(t, err)
// 6. Close segment, and build index
err = segment.Close()
assert.NoError(t, err)
// 7. Do search
var queryJson = "{\"field_name\":\"fakevec\",\"num_queries\":1,\"topK\":10}"
var queryRawData = make([]float32, 0)
for i := 0; i < 16; i++ {
queryRawData = append(queryRawData, float32(i))
}
var vectorRecord = msgPb.VectorRowRecord{
FloatData: queryRawData,
}
var searchRes, searchErr = segment.SegmentSearch(queryJson, timestamps[N/2], &vectorRecord)
assert.NoError(t, searchErr)
fmt.Println(searchRes)
// 8. Destruct node, collection, and segment
partition.DeleteSegment(segment)
collection.DeletePartition(partition)
node.DeleteCollection(collection)
}

View File

@ -383,6 +383,9 @@ func (node *QueryNode) DoInsertAndDelete() msgPb.Status {
// Do delete
for segmentID, deleteIDs := range node.deleteData.deleteIDs {
if segmentID < 0 {
continue
}
wg.Add(1)
var deleteTimestamps = node.deleteData.deleteTimestamps[segmentID]
fmt.Println("Doing delete......")

View File

@ -100,14 +100,14 @@ func TestSegment_SegmentSearch(t *testing.T) {
var segment = partition.NewSegment(0)
// 2. Create ids and timestamps
ids := []int64{1, 2, 3}
timestamps := []uint64{0, 0, 0}
ids := make([]int64, 0)
timestamps := make([]uint64, 0)
// 3. Create records, use schema below:
// schema_tmp->AddField("fakeVec", DataType::VECTOR_FLOAT, 16);
// schema_tmp->AddField("age", DataType::INT32);
const DIM = 16
const N = 3
const N = 100
var vec = [DIM]float32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}
var rawData []byte
for _, ele := range vec {
@ -120,6 +120,8 @@ func TestSegment_SegmentSearch(t *testing.T) {
rawData = append(rawData, bs...)
var records [][]byte
for i := 0; i < N; i++ {
ids = append(ids, int64(i))
timestamps = append(timestamps, uint64(i + 1))
records = append(records, rawData)
}
@ -140,7 +142,7 @@ func TestSegment_SegmentSearch(t *testing.T) {
var vectorRecord = msgPb.VectorRowRecord {
FloatData: queryRawData,
}
var searchRes, searchErr = segment.SegmentSearch(queryJson, timestamps[0], &vectorRecord)
var searchRes, searchErr = segment.SegmentSearch(queryJson, timestamps[N/2], &vectorRecord)
assert.NoError(t, searchErr)
fmt.Println(searchRes)

View File

@ -13,10 +13,16 @@ func (node *QueryNode) GetKey2Segments() (*[]int64, *[]uint64, *[]int64) {
var key2SegMsg = node.messageClient.Key2SegMsg
for _, msg := range key2SegMsg {
for _, segmentID := range msg.SegmentId {
if msg.SegmentId == nil {
segmentIDs = append(segmentIDs, -1)
entityIDs = append(entityIDs, msg.Uid)
timestamps = append(timestamps, msg.Timestamp)
segmentIDs = append(segmentIDs, segmentID)
} else {
for _, segmentID := range msg.SegmentId {
segmentIDs = append(segmentIDs, segmentID)
entityIDs = append(entityIDs, msg.Uid)
timestamps = append(timestamps, msg.Timestamp)
}
}
}