Prevent consuming after loading collection

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
bigsheeper 2021-02-25 15:08:50 +08:00 committed by yefu.chen
parent 4c3486c770
commit 7cb28428ce
4 changed files with 46 additions and 13 deletions

View File

@ -128,7 +128,7 @@ func (ms *PulsarMsgStream) AsConsumer(channels []string,
Topic: channels[i], Topic: channels[i],
SubscriptionName: subName, SubscriptionName: subName,
Type: pulsar.KeyShared, Type: pulsar.KeyShared,
SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest, SubscriptionInitialPosition: pulsar.SubscriptionPositionLatest,
MessageChannel: receiveChannel, MessageChannel: receiveChannel,
}) })
if err != nil { if err != nil {
@ -521,7 +521,7 @@ func (ms *PulsarTtMsgStream) AsConsumer(channels []string,
Topic: channels[i], Topic: channels[i],
SubscriptionName: subName, SubscriptionName: subName,
Type: pulsar.KeyShared, Type: pulsar.KeyShared,
SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest, SubscriptionInitialPosition: pulsar.SubscriptionPositionLatest,
MessageChannel: receiveChannel, MessageChannel: receiveChannel,
}) })
if err != nil { if err != nil {

View File

@ -4,10 +4,8 @@ import (
"context" "context"
"fmt" "fmt"
"log" "log"
"strconv"
"sync" "sync"
"time"
"github.com/zilliztech/milvus-distributed/internal/errors"
) )
type Node interface { type Node interface {
@ -115,14 +113,32 @@ func (nodeCtx *nodeCtx) collectInputMessages() {
// timeTick alignment check // timeTick alignment check
if len(nodeCtx.inputMessages) > 1 { if len(nodeCtx.inputMessages) > 1 {
time := (*nodeCtx.inputMessages[0]).TimeTick() t := (*nodeCtx.inputMessages[0]).TimeTick()
latestTime := t
for i := 1; i < len(nodeCtx.inputMessages); i++ { for i := 1; i < len(nodeCtx.inputMessages); i++ {
if time != (*nodeCtx.inputMessages[i]).TimeTick() { if t < (*nodeCtx.inputMessages[i]).TimeTick() {
err := errors.New("Fatal, misaligned time tick," + latestTime = (*nodeCtx.inputMessages[i]).TimeTick()
"t1=" + strconv.FormatUint(time, 10) + //err := errors.New("Fatal, misaligned time tick," +
", t2=" + strconv.FormatUint((*nodeCtx.inputMessages[i]).TimeTick(), 10) + // "t1=" + strconv.FormatUint(time, 10) +
", please restart pulsar") // ", t2=" + strconv.FormatUint((*nodeCtx.inputMessages[i]).TimeTick(), 10) +
panic(err) // ", please restart pulsar")
//panic(err)
}
}
// wait for time tick
for i := 0; i < len(nodeCtx.inputMessages); i++ {
for (*nodeCtx.inputMessages[i]).TimeTick() != latestTime {
channel := nodeCtx.inputChannels[i]
select {
case <-time.After(10 * time.Second):
panic("cannot find time tick in flow graph")
case msg, ok := <-channel:
if !ok {
log.Println("input channel closed")
return
}
nodeCtx.inputMessages[i] = msg
}
} }
} }
} }

View File

@ -181,6 +181,22 @@ def collection(request, connect):
assert connect.has_collection(collection_name) assert connect.has_collection(collection_name)
return collection_name return collection_name
@pytest.fixture(scope="function")
def collection_without_loading(request, connect):
ori_collection_name = getattr(request.module, "collection_id", "test")
collection_name = gen_unique_str(ori_collection_name)
try:
default_fields = gen_default_fields()
connect.create_collection(collection_name, default_fields)
except Exception as e:
pytest.exit(str(e))
def teardown():
if connect.has_collection(collection_name):
connect.drop_collection(collection_name, timeout=delete_timeout)
request.addfinalizer(teardown)
assert connect.has_collection(collection_name)
return collection_name
# customised id # customised id
@pytest.fixture(scope="function") @pytest.fixture(scope="function")

View File

@ -9,12 +9,13 @@ class TestLoadCollection:
The following cases are used to test `load_collection` function The following cases are used to test `load_collection` function
****************************************************************** ******************************************************************
""" """
def test_load_collection(self, connect, collection): def test_load_collection(self, connect, collection_without_loading):
''' '''
target: test load collection and wait for loading collection target: test load collection and wait for loading collection
method: insert then flush, when flushed, try load collection method: insert then flush, when flushed, try load collection
expected: no errors expected: no errors
''' '''
collection = collection_without_loading
ids = connect.insert(collection, default_entities) ids = connect.insert(collection, default_entities)
ids = connect.insert(collection, default_entity) ids = connect.insert(collection, default_entity)
connect.flush([collection]) connect.flush([collection])