Enable proxynode micro-batch Pulsar messages and enable parallel tests

Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>
zhenshan.cao 2021-01-23 20:58:46 +08:00 committed by yefu.chen
parent 82d6fb18b8
commit aef4e41e91
17 changed files with 92 additions and 45 deletions

View File

@ -12,7 +12,7 @@ services:
- ../../..:/milvus-distributed:delegated
working_dir: "/milvus-distributed/tests/python"
command: >
/bin/bash -c "pytest --ip proxyservice"
/bin/bash -c "pytest --ip proxyservice -n 4"
networks:
- milvus

View File

@ -60,7 +60,9 @@ func (b *Builder) GetIndexStates(ctx context.Context, request *indexpb.IndexStat
var indexStates []*indexpb.IndexInfo
for _, indexID := range request.IndexID {
indexState, err := b.metaTable.GetIndexStates(indexID)
log.Println("GetIndexStates error, err=", err)
if err != nil {
log.Println("GetIndexStates error, err=", err)
}
indexStates = append(indexStates, indexState)
}
ret := &indexpb.IndexStatesResponse{

View File

@ -173,12 +173,11 @@ func (it *IndexBuildTask) Execute() error {
indexParams[key] = value
}
fmt.Println("before NewCIndex ..........................")
it.index, err = NewCIndex(typeParams, indexParams)
if err != nil {
fmt.Println("NewCIndex err:", err.Error())
return err
}
fmt.Println("after NewCIndex ..........................")
getKeyByPathNaive := func(path string) string {
// splitElements := strings.Split(path, "/")
@ -221,6 +220,7 @@ func (it *IndexBuildTask) Execute() error {
storageBlobs := getStorageBlobs(blobs)
var insertCodec storage.InsertCodec
partitionID, segmentID, insertData, err2 := insertCodec.Deserialize(storageBlobs)
//fmt.Println("IndexBuilder for segmentID,", segmentID)
if err2 != nil {
return err2
}
@ -230,11 +230,11 @@ func (it *IndexBuildTask) Execute() error {
for _, value := range insertData.Data {
// TODO: BinaryVectorFieldData
fmt.Println("before build index ..................................")
floatVectorFieldData, fOk := value.(*storage.FloatVectorFieldData)
if fOk {
err = it.index.BuildFloatVecIndexWithoutIds(floatVectorFieldData.Data)
if err != nil {
fmt.Println("BuildFloatVecIndexWithoutIds, error:", err.Error())
return err
}
}
@ -243,19 +243,19 @@ func (it *IndexBuildTask) Execute() error {
if bOk {
err = it.index.BuildBinaryVecIndexWithoutIds(binaryVectorFieldData.Data)
if err != nil {
fmt.Println("BuildBinaryVecIndexWithoutIds, err:", err.Error())
return err
}
}
fmt.Println("after build index ..................................")
if !fOk && !bOk {
return errors.New("we expect FloatVectorFieldData or BinaryVectorFieldData")
}
fmt.Println("before serialize .............................................")
indexBlobs, err := it.index.Serialize()
fmt.Println("after serialize .............................................")
if err != nil {
fmt.Println("serialize ... err:", err.Error())
return err
}
@ -284,8 +284,8 @@ func (it *IndexBuildTask) Execute() error {
it.savePaths = append(it.savePaths, savePath)
}
}
return it.index.Delete()
it.index.Delete()
return nil
}
func (it *IndexBuildTask) PostExecute() error {
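The change above stops propagating the error from it.index.Delete and returns nil instead, so a failed cleanup can no longer fail an otherwise successful build. A hypothetical middle ground, shown purely as an illustration and not part of this commit, is to log the cleanup failure while still reporting success:

package main

import (
	"errors"
	"log"
)

// fakeIndex stands in for the built index; its Delete simulates a cleanup failure.
type fakeIndex struct{}

func (fakeIndex) Delete() error { return errors.New("release failed") }

// finish logs a cleanup error instead of returning it, since the build itself succeeded.
func finish(idx fakeIndex) error {
	if err := idx.Delete(); err != nil {
		log.Println("index cleanup failed (build result kept):", err)
	}
	return nil
}

func main() {
	_ = finish(fakeIndex{})
}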

View File

@ -116,7 +116,7 @@ func (queue *BaseTaskQueue) PopActiveTask(tID UniqueID) task {
func (queue *BaseTaskQueue) Enqueue(t task) error {
tID, _ := queue.sched.idAllocator.AllocOne()
log.Printf("[Builder] allocate reqID: %v", tID)
// log.Printf("[Builder] allocate reqID: %v", tID)
t.SetID(tID)
err := t.OnEnqueue()
if err != nil {
@ -209,17 +209,17 @@ func (sched *TaskScheduler) processTask(t task, q TaskQueue) {
defer func() {
t.Notify(err)
log.Printf("notify with error: %v", err)
// log.Printf("notify with error: %v", err)
}()
if err != nil {
return
}
q.AddActiveTask(t)
log.Printf("task add to active list ...")
// log.Printf("task add to active list ...")
defer func() {
q.PopActiveTask(t.ID())
log.Printf("pop from active list ...")
// log.Printf("pop from active list ...")
}()
err = t.Execute()
@ -227,9 +227,9 @@ func (sched *TaskScheduler) processTask(t task, q TaskQueue) {
log.Printf("execute definition task failed, error = %v", err)
return
}
log.Printf("task execution done ...")
// log.Printf("task execution done ...")
err = t.PostExecute()
log.Printf("post execute task done ...")
// log.Printf("post execute task done ...")
}
func (sched *TaskScheduler) indexBuildLoop() {

View File

@ -50,11 +50,11 @@ func (scheduler *FlushScheduler) schedule(id interface{}) error {
}
// todo set correct timestamp
err = scheduler.client.FlushSegment(segmentID, segmentMeta.CollectionID, segmentMeta.PartitionTag, ts)
log.Printf("flush segment %d", segmentID)
if err != nil {
log.Println("flushsegment: ", segmentID, " error :", err.Error())
return err
}
//log.Printf("flush segment %d", segmentID)
scheduler.segmentDescribeChan <- segmentID
return nil
@ -78,6 +78,7 @@ func (scheduler *FlushScheduler) describe() error {
continue
}
if !description.IsClosed {
//log.Println("describe segment ", singleSegmentID, " IsClosed :False")
continue
}

View File

@ -2,6 +2,7 @@ package master
import (
"context"
"fmt"
"log"
"time"
@ -60,6 +61,7 @@ func (scheduler *IndexBuildScheduler) schedule(info interface{}) error {
return err
}
indexParams, err := scheduler.metaTable.GetFieldIndexParams(segMeta.CollectionID, indexBuildInfo.fieldID)
if err != nil {
return err
}
@ -73,8 +75,8 @@ func (scheduler *IndexBuildScheduler) schedule(info interface{}) error {
}
indexID, err := scheduler.client.BuildIndex(indexBuildInfo.binlogFilePath, typeParamsMap, indexParamsMap)
log.Printf("build index for segment %d field %d", indexBuildInfo.segmentID, indexBuildInfo.fieldID)
if err != nil {
log.Printf("build index for segment %d field %d, failed:%s", indexBuildInfo.segmentID, indexBuildInfo.fieldID, err.Error())
return err
}
@ -151,6 +153,7 @@ func (scheduler *IndexBuildScheduler) describe() error {
IndexFilePaths: filePaths,
})
if err != nil {
fmt.Println("indexbuilder scheduler updateFiledIndexMetaFailed", indexBuildInfo.segmentID)
return err
}

View File

@ -195,6 +195,7 @@ func (task *getIndexStateTask) Execute() error {
ErrorCode: commonpb.ErrorCode_SUCCESS,
},
}
if int64(totalSegmentNums) == relatedSegments {
task.resp.State = commonpb.IndexState_FINISHED
} else {

View File

@ -499,7 +499,7 @@ func (node *NodeImpl) DescribeIndex(ctx context.Context, request *milvuspb.Descr
}
func (node *NodeImpl) GetIndexState(ctx context.Context, request *milvuspb.IndexStateRequest) (*milvuspb.IndexStateResponse, error) {
log.Println("Describe index progress for: ", request)
// log.Println("Describe index progress for: ", request)
dipt := &GetIndexStateTask{
Condition: NewTaskCondition(ctx),
IndexStateRequest: request,

View File

@ -121,7 +121,7 @@ func (node *NodeImpl) Init() error {
node.manipulationMsgStream.SetPulsarClient(pulsarAddress)
node.manipulationMsgStream.CreatePulsarProducers(Params.InsertChannelNames())
repackFuncImpl := func(tsMsgs []msgstream.TsMsg, hashKeys [][]int32) (map[int32]*msgstream.MsgPack, error) {
return insertRepackFunc(tsMsgs, hashKeys, node.segAssigner, false)
return insertRepackFunc(tsMsgs, hashKeys, node.segAssigner, true)
}
node.manipulationMsgStream.SetRepackFunc(repackFuncImpl)
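For context on the flag flipped above: the repack function registered on the manipulation stream maps incoming messages to per-channel MsgPacks. Below is a minimal sketch of a function matching that signature which simply groups messages by their first hash key; it is illustrative only, since the real insertRepackFunc also consults the segment assigner, and the boolean changed to true here presumably selects the micro-batch packing named in the commit title. The msgstream import path is an assumption.

package proxynode

import (
	"errors"

	// assumed import path for the msgstream package used in this repository
	"github.com/zilliztech/milvus-distributed/internal/msgstream"
)

// repackByFirstHashKey is a hypothetical helper, not the repository's insertRepackFunc.
// It builds one MsgPack per hash key so that messages destined for the same channel
// end up batched together.
func repackByFirstHashKey(tsMsgs []msgstream.TsMsg, hashKeys [][]int32) (map[int32]*msgstream.MsgPack, error) {
	result := make(map[int32]*msgstream.MsgPack)
	for i, msg := range tsMsgs {
		if i >= len(hashKeys) || len(hashKeys[i]) == 0 {
			return nil, errors.New("missing hash key for message")
		}
		key := hashKeys[i][0]
		if _, ok := result[key]; !ok {
			result[key] = &msgstream.MsgPack{}
		}
		result[key].Msgs = append(result[key].Msgs, msg)
	}
	return result, nil
}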

View File

@ -3,7 +3,6 @@ package proxynode
import (
"context"
"errors"
"fmt"
"log"
"math"
"strconv"
@ -491,11 +490,11 @@ func (st *SearchTask) PostExecute() error {
for {
select {
case <-st.ctx.Done():
log.Print("wait to finish failed, timeout!")
log.Print("SearchTask: wait to finish failed, timeout!, taskID:", st.ID())
span.LogFields(oplog.String("wait to finish failed, timeout", "wait to finish failed, timeout"))
return errors.New("wait to finish failed, timeout")
return errors.New("SearchTask:wait to finish failed, timeout:" + strconv.FormatInt(st.ID(), 10))
case searchResults := <-st.resultBuf:
fmt.Println("searchResults: ", searchResults)
// fmt.Println("searchResults: ", searchResults)
span.LogFields(oplog.String("receive result", "receive result"))
filterSearchResult := make([]*internalpb2.SearchResults, 0)
var filterReason string

View File

@ -4,7 +4,9 @@ import (
"container/list"
"context"
"errors"
"fmt"
"log"
"strconv"
"sync"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
@ -170,11 +172,11 @@ func (queue *BaseTaskQueue) Enqueue(t task) error {
}
ts, _ := queue.sched.tsoAllocator.AllocOne()
log.Printf("[NodeImpl] allocate timestamp: %v", ts)
// log.Printf("[NodeImpl] allocate timestamp: %v", ts)
t.SetTs(ts)
reqID, _ := queue.sched.idAllocator.AllocOne()
log.Printf("[NodeImpl] allocate reqID: %v", reqID)
// log.Printf("[NodeImpl] allocate reqID: %v", reqID)
t.SetID(reqID)
return queue.addUnissuedTask(t)
@ -296,17 +298,17 @@ func (sched *TaskScheduler) processTask(t task, q TaskQueue) {
defer func() {
t.Notify(err)
log.Printf("notify with error: %v", err)
// log.Printf("notify with error: %v", err)
}()
if err != nil {
return
}
q.AddActiveTask(t)
log.Printf("task add to active list ...")
// log.Printf("task add to active list ...")
defer func() {
q.PopActiveTask(t.EndTs())
log.Printf("pop from active list ...")
// log.Printf("pop from active list ...")
}()
err = t.Execute()
@ -314,9 +316,9 @@ func (sched *TaskScheduler) processTask(t task, q TaskQueue) {
log.Printf("execute definition task failed, error = %v", err)
return
}
log.Printf("task execution done ...")
// log.Printf("task execution done ...")
err = t.PostExecute()
log.Printf("post execute task done ...")
// log.Printf("post execute task done ...")
}
func (sched *TaskScheduler) definitionLoop() {
@ -357,7 +359,7 @@ func (sched *TaskScheduler) queryLoop() {
case <-sched.ctx.Done():
return
case <-sched.DqQueue.utChan():
log.Print("scheduler receive query request ...")
// log.Print("scheduler receive query request ...")
if !sched.DqQueue.UTEmpty() {
t := sched.scheduleDqTask()
go sched.processTask(t, sched.DqQueue)
@ -398,11 +400,25 @@ func (sched *TaskScheduler) queryResultLoop() {
for _, tsMsg := range msgPack.Msgs {
searchResultMsg, _ := tsMsg.(*msgstream.SearchResultMsg)
reqID := searchResultMsg.Base.MsgID
reqIDStr := strconv.FormatInt(reqID, 10)
t := sched.getTaskByReqID(reqID)
if t == nil {
log.Println(fmt.Sprint("QueryResult:czs:GetTaskByReqID failed, reqID:", reqIDStr))
delete(queryResultBuf, reqID)
continue
}
_, ok = queryResultBuf[reqID]
if !ok {
queryResultBuf[reqID] = make([]*internalpb2.SearchResults, 0)
}
queryResultBuf[reqID] = append(queryResultBuf[reqID], &searchResultMsg.SearchResults)
//t := sched.getTaskByReqID(reqID)
{
colName := t.(*SearchTask).query.CollectionName
fmt.Println("ljq getCollection: ", colName, " reqID: ", reqIDStr, " answer cnt:", len(queryResultBuf[reqID]))
}
if len(queryResultBuf[reqID]) == queryNodeNum {
t := sched.getTaskByReqID(reqID)
if t != nil {
@ -413,7 +429,8 @@ func (sched *TaskScheduler) queryResultLoop() {
delete(queryResultBuf, reqID)
}
} else {
log.Printf("task with reqID %v is nil", reqID)
// log.Printf("task with reqID %v is nil", reqID)
}
}
}
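The buffering added above follows a common fan-in pattern: partial SearchResults are accumulated per request ID, and the task is notified only after one reply per query node has arrived. A simplified, self-contained sketch of the pattern using hypothetical names (reply, collect), not the scheduler's actual types:

package main

import "fmt"

// reply is a stand-in for a partial search result tagged with its request ID.
type reply struct {
	reqID  int64
	result string
}

// collect buffers replies per request ID and invokes onComplete once the expected
// number of replies (one per query node) has been received for that request.
func collect(replies <-chan reply, expectedPerReq int, onComplete func(int64, []string)) {
	buf := make(map[int64][]string)
	for r := range replies {
		buf[r.reqID] = append(buf[r.reqID], r.result)
		if len(buf[r.reqID]) == expectedPerReq {
			onComplete(r.reqID, buf[r.reqID])
			delete(buf, r.reqID)
		}
	}
}

func main() {
	ch := make(chan reply, 4)
	for i := 0; i < 2; i++ {
		ch <- reply{reqID: 7, result: fmt.Sprintf("partial-%d", i)}
	}
	close(ch)
	collect(ch, 2, func(id int64, rs []string) {
		fmt.Println("reqID", id, "complete with", len(rs), "results")
	})
}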

View File

@ -4,7 +4,6 @@ import "C"
import (
"context"
"errors"
"fmt"
"log"
"regexp"
"strconv"
@ -240,9 +239,7 @@ func (ss *searchService) search(msg msgstream.TsMsg) error {
return errors.New("unmarshal query failed")
}
collectionName := query.CollectionName
fmt.Println("[ljq collection name]: ", collectionName)
partitionTagsInQuery := query.PartitionNames
fmt.Println("[search service ljq] query: ", query)
collection, err := ss.replica.getCollectionByName(collectionName)
if err != nil {
span.LogFields(oplog.Error(err))
@ -267,7 +264,7 @@ func (ss *searchService) search(msg msgstream.TsMsg) error {
searchResults := make([]*SearchResult, 0)
matchedSegments := make([]*Segment, 0)
fmt.Println("search msg's partitionTag = ", partitionTagsInQuery)
//fmt.Println("search msg's partitionTag = ", partitionTagsInQuery)
var partitionTagsInCol []string
for _, partition := range collection.partitions {
@ -411,7 +408,6 @@ func (ss *searchService) search(msg msgstream.TsMsg) error {
// fmt.Println(testHits.IDs)
// fmt.Println(testHits.Scores)
//}
err = ss.publishSearchResult(searchResultMsg)
if err != nil {
span.LogFields(oplog.Error(err))
@ -430,7 +426,6 @@ func (ss *searchService) publishSearchResult(msg msgstream.TsMsg) error {
// span, ctx := opentracing.StartSpanFromContext(msg.GetMsgContext(), "publish search result")
// defer span.Finish()
// msg.SetMsgContext(ctx)
fmt.Println("Public SearchResult", msg.HashKeys())
msgPack := msgstream.MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, msg)
err := ss.searchResultMsgStream.Produce(&msgPack)

View File

@ -95,7 +95,6 @@ func (c *Client) DescribeSegment(segmentID UniqueID) (*SegmentDescription, error
if err != nil {
return nil, err
}
if count <= 0 {
ret.IsClosed = false
return ret, nil
@ -103,10 +102,11 @@ func (c *Client) DescribeSegment(segmentID UniqueID) (*SegmentDescription, error
value, err := c.kvClient.Load(key)
if err != nil {
return ret, err
return nil, err
}
flushMeta := pb.SegmentFlushMeta{}
err = proto.UnmarshalText(value, &flushMeta)
if err != nil {
return ret, err

View File

@ -63,6 +63,22 @@ func (fService *flushSyncService) completeInsertFlush(segID UniqueID) {
fService.insertFlushed[segID] = true
}
func (fService *flushSyncService) InsertFlushCompleted(segID UniqueID) bool {
isinsertFlushed, ok := fService.insertFlushed[segID]
if !ok {
return false
}
return isinsertFlushed
}
func (fService *flushSyncService) DDFlushCompleted(segID UniqueID) bool {
isddFlushed, ok := fService.ddFlushed[segID]
if !ok {
return false
}
return isddFlushed
}
func (fService *flushSyncService) FlushCompleted(segID UniqueID) bool {
isddFlushed, ok := fService.ddFlushed[segID]
if !ok {
@ -95,12 +111,18 @@ func (fService *flushSyncService) start() {
continue
}
fService.completeDDFlush(ddFlushMsg.segID)
if fService.FlushCompleted(ddFlushMsg.segID) {
//log.Printf("DD:Seg(%d) flush completed.", ddFlushMsg.segID)
fService.metaTable.CompleteFlush(Timestamp(0), ddFlushMsg.segID)
}
case insertFlushMsg := <-fService.insertChan:
if insertFlushMsg == nil {
continue
}
//log.Println("FlushSyncService insertFlushMsg ", insertFlushMsg.segID)
if !insertFlushMsg.flushCompleted {
//log.Println("FlushSyncService", insertFlushMsg.segID, " not flushCompleted")
err := fService.metaTable.AppendSegBinlogPaths(insertFlushMsg.ts, insertFlushMsg.segID, insertFlushMsg.fieldID,
insertFlushMsg.paths)
if err != nil {
@ -109,6 +131,7 @@ func (fService *flushSyncService) start() {
}
continue
}
fService.completeInsertFlush(insertFlushMsg.segID)
if fService.FlushCompleted(insertFlushMsg.segID) {
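The new InsertFlushCompleted and DDFlushCompleted helpers make the completion rule explicit: a segment counts as fully flushed only when both its DDL flush and its insert-binlog flush have been recorded. A simplified sketch of that two-part check, with hypothetical names rather than the service's actual fields:

package main

import "fmt"

// flushState tracks, per segment ID, whether the DDL flush and the insert flush are done.
type flushState struct {
	ddFlushed     map[int64]bool
	insertFlushed map[int64]bool
}

// flushCompleted reports true only when both halves have been recorded; missing map
// entries read as false, matching the ok-check in the helpers above.
func (s *flushState) flushCompleted(segID int64) bool {
	return s.ddFlushed[segID] && s.insertFlushed[segID]
}

func main() {
	s := &flushState{
		ddFlushed:     map[int64]bool{42: true},
		insertFlushed: map[int64]bool{},
	}
	fmt.Println(s.flushCompleted(42)) // false: insert flush not yet recorded
	s.insertFlushed[42] = true
	fmt.Println(s.flushCompleted(42)) // true: both halves flushed
}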

View File

@ -131,7 +131,6 @@ func (mt *metaTable) saveSegFlushMeta(meta *pb.SegmentFlushMeta) error {
value := proto.MarshalTextString(meta)
mt.segID2FlushMeta[meta.SegmentID] = *meta
return mt.client.Save(Params.WriteNodeSegKvSubPath+strconv.FormatInt(meta.SegmentID, 10), value)
}

View File

@ -1,8 +1,13 @@
grpcio==1.26.0
grpcio-tools==1.26.0
numpy==1.18.1
pytest==5.3.4
pytest-cov==2.8.1
pytest-timeout==1.3.4
pymilvus-distributed==0.0.17
sklearn==0.0
pytest==4.5.0
pytest-timeout==1.3.3
pytest-repeat==0.8.0
allure-pytest==2.7.0
pytest-print==0.1.2
pytest-level==0.1.1
pytest-xdist==1.23.2

View File

@ -50,7 +50,9 @@ class TestListCollections:
collection_name = gen_unique_str(uid)
assert collection_name not in connect.list_collections()
@pytest.mark.level(2)
@pytest.mark.skip("can't run in parallel")
def test_list_collections_no_collection(self, connect):
'''
target: test show collections is correct or not, if no collection in db