mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-11-30 10:59:32 +08:00
Add execution time test
Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
parent
13ebbdfcca
commit
dc916ec10c
@ -48,7 +48,7 @@ Insert(CSegmentBase c_segment,
|
||||
auto res = segment->Insert(reserved_offset, size, primary_keys, timestamps, dataChunk);
|
||||
|
||||
// TODO: delete print
|
||||
std::cout << "do segment insert, sizeof_per_row = " << sizeof_per_row << std::endl;
|
||||
// std::cout << "do segment insert, sizeof_per_row = " << sizeof_per_row << std::endl;
|
||||
return res.code();
|
||||
}
|
||||
|
||||
@ -58,7 +58,7 @@ PreInsert(CSegmentBase c_segment, long int size) {
|
||||
auto segment = (milvus::dog_segment::SegmentBase*)c_segment;
|
||||
|
||||
// TODO: delete print
|
||||
std::cout << "PreInsert segment " << std::endl;
|
||||
// std::cout << "PreInsert segment " << std::endl;
|
||||
return segment->PreInsert(size);
|
||||
}
|
||||
|
||||
@ -81,7 +81,7 @@ PreDelete(CSegmentBase c_segment, long int size) {
|
||||
auto segment = (milvus::dog_segment::SegmentBase*)c_segment;
|
||||
|
||||
// TODO: delete print
|
||||
std::cout << "PreDelete segment " << std::endl;
|
||||
// std::cout << "PreDelete segment " << std::endl;
|
||||
return segment->PreDelete(size);
|
||||
}
|
||||
|
||||
|
@ -2,10 +2,12 @@ package message_client
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/apache/pulsar-client-go/pulsar"
|
||||
msgpb "github.com/czs007/suvlim/pkg/master/grpc/message"
|
||||
"github.com/golang/protobuf/proto"
|
||||
"log"
|
||||
"time"
|
||||
)
|
||||
|
||||
type MessageClient struct {
|
||||
@ -45,6 +47,8 @@ func (mc *MessageClient) GetSearchChan() chan *msgpb.SearchMsg {
|
||||
}
|
||||
|
||||
func (mc *MessageClient) ReceiveInsertOrDeleteMsg() {
|
||||
var count = 0
|
||||
var start time.Time
|
||||
for {
|
||||
insetOrDeleteMsg := msgpb.InsertOrDeleteMsg{}
|
||||
msg, err := mc.insertOrDeleteConsumer.Receive(context.Background())
|
||||
@ -52,8 +56,16 @@ func (mc *MessageClient) ReceiveInsertOrDeleteMsg() {
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
if count == 0 {
|
||||
start = time.Now()
|
||||
}
|
||||
count++
|
||||
mc.insertOrDeleteChan <- &insetOrDeleteMsg
|
||||
mc.insertOrDeleteConsumer.Ack(msg)
|
||||
if count == 100000 - 1 {
|
||||
elapsed := time.Since(start)
|
||||
fmt.Println("Query node ReceiveInsertOrDeleteMsg time:", elapsed)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -165,34 +165,47 @@ func (node *QueryNode) InitQueryNodeCollection() {
|
||||
////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
func (node *QueryNode) RunInsertDelete() {
|
||||
var count = 0
|
||||
var start time.Time
|
||||
for {
|
||||
time.Sleep(2 * 1000 * time.Millisecond)
|
||||
//time.Sleep(2 * 1000 * time.Millisecond)
|
||||
node.QueryNodeDataInit()
|
||||
// TODO: get timeRange from message client
|
||||
var timeRange = TimeRange{0, 0}
|
||||
var msgLen = node.PrepareBatchMsg()
|
||||
fmt.Println("PrepareBatchMsg Done, Insert len = ", msgLen[0])
|
||||
//fmt.Println("PrepareBatchMsg Done, Insert len = ", msgLen[0])
|
||||
if msgLen[0] == 0 {
|
||||
fmt.Println("0 msg found")
|
||||
//fmt.Println("0 msg found")
|
||||
continue
|
||||
}
|
||||
if count == 0 {
|
||||
start = time.Now()
|
||||
}
|
||||
count+=msgLen[0]
|
||||
node.MessagesPreprocess(node.messageClient.InsertOrDeleteMsg, timeRange)
|
||||
fmt.Println("MessagesPreprocess Done")
|
||||
//fmt.Println("MessagesPreprocess Done")
|
||||
node.WriterDelete()
|
||||
node.PreInsertAndDelete()
|
||||
fmt.Println("PreInsertAndDelete Done")
|
||||
//fmt.Println("PreInsertAndDelete Done")
|
||||
node.DoInsertAndDelete()
|
||||
fmt.Println("DoInsertAndDelete Done")
|
||||
//fmt.Println("DoInsertAndDelete Done")
|
||||
node.queryNodeTimeSync.UpdateSearchTimeSync(timeRange)
|
||||
fmt.Print("UpdateSearchTimeSync Done\n\n\n")
|
||||
//fmt.Print("UpdateSearchTimeSync Done\n\n\n")
|
||||
if count == 100000 - 1 {
|
||||
elapsed := time.Since(start)
|
||||
fmt.Println("Query node insert 10 × 10000 time:", elapsed)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (node *QueryNode) RunSearch() {
|
||||
for {
|
||||
time.Sleep(2 * 1000 * time.Millisecond)
|
||||
//time.Sleep(2 * 1000 * time.Millisecond)
|
||||
|
||||
start := time.Now()
|
||||
|
||||
if len(node.messageClient.GetSearchChan()) <= 0 {
|
||||
fmt.Println("null Search")
|
||||
//fmt.Println("null Search")
|
||||
continue
|
||||
}
|
||||
node.messageClient.SearchMsg = node.messageClient.SearchMsg[:0]
|
||||
@ -200,6 +213,9 @@ func (node *QueryNode) RunSearch() {
|
||||
node.messageClient.SearchMsg = append(node.messageClient.SearchMsg, msg)
|
||||
fmt.Println("Do Search...")
|
||||
node.Search(node.messageClient.SearchMsg)
|
||||
|
||||
elapsed := time.Since(start)
|
||||
fmt.Println("Query node search time:", elapsed)
|
||||
}
|
||||
}
|
||||
|
||||
@ -459,8 +475,11 @@ func (node *QueryNode) Search(searchMessages []*msgPb.SearchMsg) msgPb.Status {
|
||||
for _, res := range resultsTmp {
|
||||
results.Entities.Ids = append(results.Entities.Ids, res.ResultId)
|
||||
results.Distances = append(results.Distances, res.ResultDistance)
|
||||
results.Scores = append(results.Distances, float32(0))
|
||||
}
|
||||
|
||||
results.RowNum = int64(len(results.Distances))
|
||||
|
||||
// 3. publish result to pulsar
|
||||
node.PublishSearchResult(&results, clientId)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user