diff --git a/internal/msgstream/pulsarms/pulsar_msgstream.go b/internal/msgstream/pulsarms/pulsar_msgstream.go index 34b7084aba..c93cbf479a 100644 --- a/internal/msgstream/pulsarms/pulsar_msgstream.go +++ b/internal/msgstream/pulsarms/pulsar_msgstream.go @@ -128,7 +128,7 @@ func (ms *PulsarMsgStream) AsConsumer(channels []string, Topic: channels[i], SubscriptionName: subName, Type: pulsar.KeyShared, - SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest, + SubscriptionInitialPosition: pulsar.SubscriptionPositionLatest, MessageChannel: receiveChannel, }) if err != nil { @@ -521,7 +521,7 @@ func (ms *PulsarTtMsgStream) AsConsumer(channels []string, Topic: channels[i], SubscriptionName: subName, Type: pulsar.KeyShared, - SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest, + SubscriptionInitialPosition: pulsar.SubscriptionPositionLatest, MessageChannel: receiveChannel, }) if err != nil { diff --git a/internal/util/flowgraph/node.go b/internal/util/flowgraph/node.go index a3750461e5..c12da812fd 100644 --- a/internal/util/flowgraph/node.go +++ b/internal/util/flowgraph/node.go @@ -4,10 +4,8 @@ import ( "context" "fmt" "log" - "strconv" "sync" - - "github.com/zilliztech/milvus-distributed/internal/errors" + "time" ) type Node interface { @@ -115,14 +113,32 @@ func (nodeCtx *nodeCtx) collectInputMessages() { // timeTick alignment check if len(nodeCtx.inputMessages) > 1 { - time := (*nodeCtx.inputMessages[0]).TimeTick() + t := (*nodeCtx.inputMessages[0]).TimeTick() + latestTime := t for i := 1; i < len(nodeCtx.inputMessages); i++ { - if time != (*nodeCtx.inputMessages[i]).TimeTick() { - err := errors.New("Fatal, misaligned time tick," + - "t1=" + strconv.FormatUint(time, 10) + - ", t2=" + strconv.FormatUint((*nodeCtx.inputMessages[i]).TimeTick(), 10) + - ", please restart pulsar") - panic(err) + if t < (*nodeCtx.inputMessages[i]).TimeTick() { + latestTime = (*nodeCtx.inputMessages[i]).TimeTick() + //err := errors.New("Fatal, misaligned time tick," + + // "t1=" + strconv.FormatUint(time, 10) + + // ", t2=" + strconv.FormatUint((*nodeCtx.inputMessages[i]).TimeTick(), 10) + + // ", please restart pulsar") + //panic(err) + } + } + // wait for time tick + for i := 0; i < len(nodeCtx.inputMessages); i++ { + for (*nodeCtx.inputMessages[i]).TimeTick() != latestTime { + channel := nodeCtx.inputChannels[i] + select { + case <-time.After(10 * time.Second): + panic("cannot find time tick in flow graph") + case msg, ok := <-channel: + if !ok { + log.Println("input channel closed") + return + } + nodeCtx.inputMessages[i] = msg + } } } } diff --git a/tests/python/conftest.py b/tests/python/conftest.py index 75328740dc..7f1fafb582 100644 --- a/tests/python/conftest.py +++ b/tests/python/conftest.py @@ -181,6 +181,22 @@ def collection(request, connect): assert connect.has_collection(collection_name) return collection_name +@pytest.fixture(scope="function") +def collection_without_loading(request, connect): + ori_collection_name = getattr(request.module, "collection_id", "test") + collection_name = gen_unique_str(ori_collection_name) + try: + default_fields = gen_default_fields() + connect.create_collection(collection_name, default_fields) + except Exception as e: + pytest.exit(str(e)) + def teardown(): + if connect.has_collection(collection_name): + connect.drop_collection(collection_name, timeout=delete_timeout) + request.addfinalizer(teardown) + assert connect.has_collection(collection_name) + return collection_name + # customised id @pytest.fixture(scope="function") diff --git a/tests/python/test_load_collection.py b/tests/python/test_load_collection.py index 2d5e638595..97f07ba054 100644 --- a/tests/python/test_load_collection.py +++ b/tests/python/test_load_collection.py @@ -9,12 +9,13 @@ class TestLoadCollection: The following cases are used to test `load_collection` function ****************************************************************** """ - def test_load_collection(self, connect, collection): + def test_load_collection(self, connect, collection_without_loading): ''' target: test load collection and wait for loading collection method: insert then flush, when flushed, try load collection expected: no errors ''' + collection = collection_without_loading ids = connect.insert(collection, default_entities) ids = connect.insert(collection, default_entity) connect.flush([collection])