milvus/cmd/storage/benchmark.go
cai.zhang 5bcabffdaf Replace conf struct with ParamsTable
Signed-off-by: cai.zhang <cai.zhang@zilliz.com>
2020-11-16 21:10:43 +08:00

316 lines
9.7 KiB
Go

package main
import (
"context"
"crypto/md5"
"flag"
"fmt"
"log"
"math/rand"
"os"
"sync"
"sync/atomic"
"time"
"github.com/pivotal-golang/bytefmt"
"github.com/zilliztech/milvus-distributed/internal/storage"
storagetype "github.com/zilliztech/milvus-distributed/internal/storage/type"
)
// Package-level benchmark state shared by main and the worker goroutines.
var (
	// Settings filled from the command line in main:
	// -d (durationSecs), -t (threads), -l (loops), -v (numVersion), -b (batchOpSize).
	durationSecs, threads, loops, numVersion, batchOpSize int

	// valueSize is the payload size in bytes, parsed from the -z flag.
	valueSize uint64

	// valueData is the random payload written by single-row puts.
	valueData []byte

	// batchValueData holds one random payload per row of a batch put.
	batchValueData [][]byte

	// counter and keyNum are advanced atomically by the workers;
	// totalKeyCount is the modulus the get/delete workers use to index totalKeys.
	counter, totalKeyCount, keyNum int32

	// endTime is the deadline the workers poll; the *Finish values are
	// stamped by workers when they leave their loop.
	endTime, setFinish, getFinish, deleteFinish time.Time

	// totalKeys is the key set the get/delete workers index into.
	totalKeys [][]byte

	// logFileName is the path of the benchmark result log; logFile is its open handle.
	logFileName = "benchmark.log"
	logFile     *os.File

	// store is the storage backend under test, created in main.
	store storagetype.Store

	// wg lets main wait for all workers of one stage to finish.
	wg sync.WaitGroup
)
// runSet is a benchmark worker for single-row puts.
//
// Until endTime is reached it reserves the next sequence number from keyNum,
// builds the key "key<n>", and writes valueData under that key once for every
// version in 1..numVersion, bumping counter before each write. Any write error
// terminates the process via log.Fatalf. On exit it stamps setFinish and
// signals wg.
func runSet() {
	for {
		if !time.Now().Before(endTime) {
			break
		}
		seq := atomic.AddInt32(&keyNum, 1)
		rowKey := []byte(fmt.Sprint("key", seq))
		version := 1
		for version <= numVersion {
			atomic.AddInt32(&counter, 1)
			if err := store.PutRow(context.Background(), rowKey, valueData, "empty", uint64(version)); err != nil {
				log.Fatalf("Error setting key %s, %s", rowKey, err.Error())
			}
			version++
		}
	}
	// Record when this worker stopped so main can compute the elapsed time.
	setFinish = time.Now()
	wg.Done()
}
// runBatchSet is a benchmark worker for batched puts.
//
// Until endTime is reached it reserves batchOpSize consecutive sequence
// numbers from keyNum, builds the keys "key<n>" for that range, and writes
// batchValueData under those keys once for every version in 1..numVersion,
// bumping counter once per batch call. Any write error terminates the process
// via log.Fatalf. On exit it stamps setFinish and signals wg.
func runBatchSet() {
	for time.Now().Before(endTime) {
		// num is the exclusive upper bound of the reserved key range.
		num := atomic.AddInt32(&keyNum, int32(batchOpSize))
		keys := make([][]byte, batchOpSize)
		// NOTE(review): runSet passes "empty" as the fourth PutRow argument while
		// the batch path passes empty strings; left unchanged — confirm intent.
		batchSuffix := make([]string, batchOpSize)
		for n := batchOpSize; n > 0; n-- {
			keys[n-1] = []byte(fmt.Sprint("key", num-int32(n)))
		}
		for ver := 1; ver <= numVersion; ver++ {
			// Stamp every row of this pass with the current version, matching
			// runSet; previously the slice stayed all-zero so every pass
			// rewrote version 0. A fresh slice per call avoids mutating one
			// the store might still reference.
			versions := make([]uint64, batchOpSize)
			for i := range versions {
				versions[i] = uint64(ver)
			}
			atomic.AddInt32(&counter, 1)
			err := store.PutRows(context.Background(), keys, batchValueData, batchSuffix, versions)
			if err != nil {
				log.Fatalf("Error setting batch keys %s %s", keys, err.Error())
			}
		}
	}
	// Record when this worker stopped so main can compute the elapsed time.
	setFinish = time.Now()
	wg.Done()
}
// runGet is a benchmark worker for single-row reads.
//
// Until endTime is reached it advances counter, maps the new value onto
// totalKeys with a modulo by totalKeyCount, and reads that key at version
// numVersion. Any read error terminates the process via log.Fatalf. On exit
// it stamps getFinish and signals wg.
func runGet() {
	for now := time.Now(); now.Before(endTime); now = time.Now() {
		idx := atomic.AddInt32(&counter, 1) % totalKeyCount
		rowKey := totalKeys[idx]
		if _, err := store.GetRow(context.Background(), rowKey, uint64(numVersion)); err != nil {
			log.Fatalf("Error getting key %s, %s", rowKey, err.Error())
		}
	}
	// Record when this worker stopped so main can compute the elapsed time.
	getFinish = time.Now()
	wg.Done()
}
// runBatchGet is a benchmark worker for batched reads.
//
// Until endTime is reached it advances keyNum by batchOpSize, reduces the
// result modulo totalKeyCount to pick the exclusive end of a window into
// totalKeys (clamped so the window never starts below index 0), and reads
// those batchOpSize keys at version numVersion, bumping counter once per
// batch. Any read error terminates the process via log.Fatalf. On exit it
// stamps getFinish and signals wg.
func runBatchGet() {
	for {
		if !time.Now().Before(endTime) {
			break
		}
		size := int32(batchOpSize)
		hi := atomic.AddInt32(&keyNum, size) % totalKeyCount
		if hi < size {
			// Too close to the start of totalKeys: use the first full window.
			hi = size
		}
		lo := hi - size
		window := totalKeys[lo:hi]

		readVersions := make([]uint64, batchOpSize)
		for i := 0; i < len(readVersions); i++ {
			readVersions[i] = uint64(numVersion)
		}

		atomic.AddInt32(&counter, 1)
		if _, err := store.GetRows(context.Background(), window, readVersions); err != nil {
			log.Fatalf("Error getting key %s, %s", window, err.Error())
		}
	}
	// Record when this worker stopped so main can compute the elapsed time.
	getFinish = time.Now()
	wg.Done()
}
// runDelete is a benchmark worker for single-row deletes.
//
// Until endTime is reached it advances counter, maps the new value onto
// totalKeys with a modulo by totalKeyCount, and deletes that key at version
// numVersion. Any delete error terminates the process via log.Fatalf. On exit
// it stamps deleteFinish and signals wg.
func runDelete() {
	for time.Now().Before(endTime) {
		num := atomic.AddInt32(&counter, 1)
		num = num % totalKeyCount
		key := totalKeys[num]
		err := store.DeleteRow(context.Background(), key, uint64(numVersion))
		if err != nil {
			// The message previously said "getting", copied from runGet.
			log.Fatalf("Error deleting key %s, %s", key, err.Error())
		}
	}
	// Record when this worker stopped so main can compute the elapsed time.
	deleteFinish = time.Now()
	wg.Done()
}
// runBatchDelete is a benchmark worker for batched deletes.
//
// Until endTime is reached it advances keyNum by batchOpSize, reduces the
// result modulo totalKeyCount to pick the exclusive end of a window into
// totalKeys (clamped so the window never starts below index 0), and deletes
// those batchOpSize keys at version numVersion, bumping counter once per
// batch. Any delete error terminates the process via log.Fatalf. On exit it
// stamps deleteFinish and signals wg.
func runBatchDelete() {
	for time.Now().Before(endTime) {
		num := atomic.AddInt32(&keyNum, int32(batchOpSize))
		end := num % totalKeyCount
		if end < int32(batchOpSize) {
			// Too close to the start of totalKeys: use the first full window.
			end = int32(batchOpSize)
		}
		start := end - int32(batchOpSize)
		keys := totalKeys[start:end]
		atomic.AddInt32(&counter, 1)
		versions := make([]uint64, batchOpSize)
		for i := range versions {
			versions[i] = uint64(numVersion)
		}
		err := store.DeleteRows(context.Background(), keys, versions)
		if err != nil {
			// The message previously said "getting", copied from runBatchGet.
			log.Fatalf("Error deleting keys %s, %s", keys, err.Error())
		}
	}
	// Record the finish time in deleteFinish, as runDelete does; this
	// previously overwrote getFinish.
	deleteFinish = time.Now()
	wg.Done()
}
// main parses the command-line flags, creates the storage backend, prepares
// random payloads, and then runs the batch-put benchmark `loops` times with
// `threads` concurrent workers, appending one result line per loop to the
// benchmark log file.
//
// Flags: -d duration per test (seconds), -t worker count, -l loop count,
// -z value size (with K/M/G postfix), -s storage type, -v versions per key,
// -b rows per batch operation.
func main() {
	// Parse command line
	myflag := flag.NewFlagSet("myflag", flag.ExitOnError)
	myflag.IntVar(&durationSecs, "d", 5, "Duration of each test in seconds")
	myflag.IntVar(&threads, "t", 1, "Number of threads to run")
	myflag.IntVar(&loops, "l", 1, "Number of times to repeat test")
	var sizeArg string
	var storeType string
	myflag.StringVar(&sizeArg, "z", "1k", "Size of objects in bytes with postfix K, M, and G")
	myflag.StringVar(&storeType, "s", "s3", "Storage type, tikv or minio or s3")
	myflag.IntVar(&numVersion, "v", 1, "Max versions for each key")
	myflag.IntVar(&batchOpSize, "b", 100, "Batch operation kv pair number")
	if err := myflag.Parse(os.Args[1:]); err != nil {
		os.Exit(1)
	}

	// Check the arguments
	var err error
	if valueSize, err = bytefmt.ToBytes(sizeArg); err != nil {
		log.Fatalf("Invalid -z argument for object size: %v", err)
	}
	// With no workers setFinish is never stamped, and a negative batch size
	// makes make() panic, so reject those values up front.
	if threads < 1 || numVersion < 1 || batchOpSize < 1 {
		log.Fatalf("Invalid arguments: -t, -v and -b must be at least 1 (got t=%d, v=%d, b=%d)", threads, numVersion, batchOpSize)
	}

	option := storagetype.Option{TikvAddress: "localhost:2379", Type: storeType, BucketName: "zilliz-hz"}
	store, err = storage.NewStore(context.Background(), option)
	if err != nil {
		// Pass the error as an argument so a '%' in its text is not
		// interpreted as a format verb.
		log.Fatalf("Error when creating storage %s", err.Error())
	}

	// A log file has no reason to be executable or world-writable.
	logFile, err = os.OpenFile(logFileName, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
	if err != nil {
		log.Fatalf("Prepare log file error, %s", err.Error())
	}
	defer func() {
		if cerr := logFile.Close(); cerr != nil {
			log.Printf("Close log file error, %v", cerr)
		}
	}()

	// Echo the parameters
	log.Printf("Benchmark log will write to file %s\n", logFile.Name())
	fmt.Fprintf(logFile, "Parameters: duration=%d, threads=%d, loops=%d, valueSize=%s, batchSize=%d, versions=%d\n", durationSecs, threads, loops, sizeArg, batchOpSize, numVersion)

	// Init test data. The md5 digests are computed but not consumed; the
	// calls are kept so the file's crypto/md5 import stays in use.
	valueData = make([]byte, valueSize)
	rand.Read(valueData)
	hasher := md5.New()
	hasher.Write(valueData)

	batchValueData = make([][]byte, batchOpSize)
	for i := range batchValueData {
		batchValueData[i] = make([]byte, valueSize)
		rand.Read(batchValueData[i])
		hasher := md5.New()
		hasher.Write(batchValueData[i])
	}

	// Loop running the tests
	for loop := 1; loop <= loops; loop++ {
		// reset counters
		counter = 0
		keyNum = 0
		totalKeyCount = 0
		totalKeys = nil

		// Run the batchSet case
		startTime := time.Now()
		endTime = startTime.Add(time.Second * time.Duration(durationSecs))
		for n := 1; n <= threads; n++ {
			wg.Add(1)
			go runBatchSet()
		}
		wg.Wait()

		setTime := setFinish.Sub(startTime).Seconds()
		// Compute the row count in 64 bits: counter*batchOpSize can exceed int32.
		kvPairs := int64(counter) * int64(batchOpSize)
		bps := float64(uint64(kvPairs)*valueSize) / setTime
		fmt.Fprintf(logFile, "Loop %d: BATCH PUT time %.1f secs, batchs = %d, kv pairs = %d, speed = %sB/sec, %.1f operations/sec, %.1f kv/sec.\n",
			loop, setTime, counter, kvPairs, bytefmt.ByteSize(uint64(bps)), float64(counter)/setTime, float64(kvPairs)/setTime)

		// The remaining stages (get, batch get, delete, batch delete) are
		// currently disabled and kept below for reference.

		// Record all test keys
		//totalKeyCount = keyNum
		//totalKeys = make([][]byte, totalKeyCount)
		//for i := int32(0); i < totalKeyCount; i++ {
		// totalKeys[i] = []byte(fmt.Sprint("key", i))
		//}
		//
		//// Run the get case
		//counter = 0
		//startTime = time.Now()
		//endTime = startTime.Add(time.Second * time.Duration(durationSecs))
		//for n := 1; n <= threads; n++ {
		// wg.Add(1)
		// go runGet()
		//}
		//wg.Wait()
		//
		//getTime := getFinish.Sub(startTime).Seconds()
		//bps = float64(uint64(counter)*valueSize) / getTime
		//fmt.Fprint(logFile, fmt.Sprintf("Loop %d: GET time %.1f secs, kv pairs = %d, speed = %sB/sec, %.1f operations/sec, %.1f kv/sec.\n",
		// loop, getTime, counter, bytefmt.ByteSize(uint64(bps)), float64(counter)/getTime, float64(counter)/getTime))
		// Run the batchGet case
		//counter = 0
		//startTime = time.Now()
		//endTime = startTime.Add(time.Second * time.Duration(durationSecs))
		//for n := 1; n <= threads; n++ {
		// wg.Add(1)
		// go runBatchGet()
		//}
		//wg.Wait()
		//
		//getTime = getFinish.Sub(startTime).Seconds()
		//bps = float64(uint64(counter)*valueSize*uint64(batchOpSize)) / getTime
		//fmt.Fprint(logFile, fmt.Sprintf("Loop %d: BATCH GET time %.1f secs, batchs = %d, kv pairs = %d, speed = %sB/sec, %.1f operations/sec, %.1f kv/sec.\n",
		// loop, getTime, counter, counter*int32(batchOpSize), bytefmt.ByteSize(uint64(bps)), float64(counter)/getTime, float64(counter * int32(batchOpSize))/getTime))
		//
		//// Run the delete case
		//counter = 0
		//startTime = time.Now()
		//endTime = startTime.Add(time.Second * time.Duration(durationSecs))
		//for n := 1; n <= threads; n++ {
		// wg.Add(1)
		// go runDelete()
		//}
		//wg.Wait()
		//
		//deleteTime := deleteFinish.Sub(startTime).Seconds()
		//bps = float64(uint64(counter)*valueSize) / deleteTime
		//fmt.Fprint(logFile, fmt.Sprintf("Loop %d: Delete time %.1f secs, kv pairs = %d, %.1f operations/sec, %.1f kv/sec.\n",
		// loop, deleteTime, counter, float64(counter)/deleteTime, float64(counter)/deleteTime))
		//
		//// Run the batchDelete case
		//counter = 0
		//startTime = time.Now()
		//endTime = startTime.Add(time.Second * time.Duration(durationSecs))
		//for n := 1; n <= threads; n++ {
		// wg.Add(1)
		// go runBatchDelete()
		//}
		//wg.Wait()
		//
		//deleteTime = setFinish.Sub(startTime).Seconds()
		//bps = float64(uint64(counter)*valueSize*uint64(batchOpSize)) / setTime
		//fmt.Fprint(logFile, fmt.Sprintf("Loop %d: BATCH DELETE time %.1f secs, batchs = %d, kv pairs = %d, %.1f operations/sec, %.1f kv/sec.\n",
		// loop, setTime, counter, counter*int32(batchOpSize), float64(counter)/setTime, float64(counter * int32(batchOpSize))/setTime))

		// Print line mark
		fmt.Fprint(logFile, "\n")
	}
	log.Print("Benchmark test done.")
}