Merge remote-tracking branch 'main/master' into AllenYu1987-patch-1

This commit is contained in:
JinHai-CN 2020-04-23 14:08:28 +08:00
commit a41949f075
18 changed files with 70 additions and 255 deletions

View File

@ -2289,12 +2289,12 @@ DBImpl::ExecWalRecord(const wal::MXLogRecord& record) {
return status; return status;
} }
std::set<std::string> flushed_tables; std::set<std::string> flushed_collections;
status = mem_mgr_->InsertEntities(target_collection_name, record.length, record.ids, status = mem_mgr_->InsertEntities(target_collection_name, record.length, record.ids,
(record.data_size / record.length / sizeof(float)), (record.data_size / record.length / sizeof(float)),
(const float*)record.data, record.attr_nbytes, record.attr_data_size, (const float*)record.data, record.attr_nbytes, record.attr_data_size,
record.attr_data, record.lsn, flushed_tables); record.attr_data, record.lsn, flushed_collections);
collections_flushed(flushed_tables); collections_flushed(flushed_collections);
milvus::server::CollectInsertMetrics metrics(record.length, status); milvus::server::CollectInsertMetrics metrics(record.length, status);
break; break;

View File

@ -11,8 +11,8 @@
#include "scheduler/JobMgr.h" #include "scheduler/JobMgr.h"
#include <src/db/Utils.h> #include "src/db/Utils.h"
#include <src/segment/SegmentReader.h> #include "src/segment/SegmentReader.h"
#include <limits> #include <limits>
#include <utility> #include <utility>

View File

@ -62,14 +62,14 @@ SearchRequest::OnPreExecute() {
// step 1: check collection name // step 1: check collection name
auto status = ValidationUtil::ValidateCollectionName(collection_name_); auto status = ValidationUtil::ValidateCollectionName(collection_name_);
if (!status.ok()) { if (!status.ok()) {
LOG_SERVER_ERROR_ << LogOut("[%s][%d] %s", "search", 0, status.message().c_str()); LOG_SERVER_ERROR_ << LogOut("[%s][%ld] %s", "search", 0, status.message().c_str());
return status; return status;
} }
// step 2: check search topk // step 2: check search topk
status = ValidationUtil::ValidateSearchTopk(topk_); status = ValidationUtil::ValidateSearchTopk(topk_);
if (!status.ok()) { if (!status.ok()) {
LOG_SERVER_ERROR_ << LogOut("[%s][%d] %s", "search", 0, status.message().c_str()); LOG_SERVER_ERROR_ << LogOut("[%s][%ld] %s", "search", 0, status.message().c_str());
return status; return status;
} }
@ -77,7 +77,7 @@ SearchRequest::OnPreExecute() {
status = ValidationUtil::ValidatePartitionTags(partition_list_); status = ValidationUtil::ValidatePartitionTags(partition_list_);
fiu_do_on("SearchRequest.OnExecute.invalid_partition_tags", status = Status(milvus::SERVER_UNEXPECTED_ERROR, "")); fiu_do_on("SearchRequest.OnExecute.invalid_partition_tags", status = Status(milvus::SERVER_UNEXPECTED_ERROR, ""));
if (!status.ok()) { if (!status.ok()) {
LOG_SERVER_ERROR_ << LogOut("[%s][%d] %s", "search", 0, status.message().c_str()); LOG_SERVER_ERROR_ << LogOut("[%s][%ld] %s", "search", 0, status.message().c_str());
return status; return status;
} }
@ -92,7 +92,7 @@ SearchRequest::OnExecute() {
fiu_do_on("SearchRequest.OnExecute.throw_std_exception", throw std::exception()); fiu_do_on("SearchRequest.OnExecute.throw_std_exception", throw std::exception());
std::string hdr = "SearchRequest execute(collection=" + collection_name_ + std::string hdr = "SearchRequest execute(collection=" + collection_name_ +
", nq=" + std::to_string(vector_count) + ", k=" + std::to_string(topk_) + ")"; ", nq=" + std::to_string(vector_count) + ", k=" + std::to_string(topk_) + ")";
TimeRecorderAuto rc(LogOut("[%s][%d] %s", "search", 0, hdr.c_str())); TimeRecorderAuto rc(LogOut("[%s][%ld] %s", "search", 0, hdr.c_str()));
// step 4: check collection existence // step 4: check collection existence
// only process root collection, ignore partition collection // only process root collection, ignore partition collection
@ -103,17 +103,17 @@ SearchRequest::OnExecute() {
status = Status(milvus::SERVER_UNEXPECTED_ERROR, "")); status = Status(milvus::SERVER_UNEXPECTED_ERROR, ""));
if (!status.ok()) { if (!status.ok()) {
if (status.code() == DB_NOT_FOUND) { if (status.code() == DB_NOT_FOUND) {
LOG_SERVER_ERROR_ << LogOut("[%s][%d] Collection %s not found: %s", "search", 0, LOG_SERVER_ERROR_ << LogOut("[%s][%ld] Collection %s not found: %s", "search", 0,
collection_name_.c_str(), status.message().c_str()); collection_name_.c_str(), status.message().c_str());
return Status(SERVER_COLLECTION_NOT_EXIST, CollectionNotExistMsg(collection_name_)); return Status(SERVER_COLLECTION_NOT_EXIST, CollectionNotExistMsg(collection_name_));
} else { } else {
LOG_SERVER_ERROR_ << LogOut("[%s][%d] Error occurred when describing collection %s: %s", "search", 0, LOG_SERVER_ERROR_ << LogOut("[%s][%ld] Error occurred when describing collection %s: %s", "search", 0,
collection_name_.c_str(), status.message().c_str()); collection_name_.c_str(), status.message().c_str());
return status; return status;
} }
} else { } else {
if (!collection_schema_.owner_collection_.empty()) { if (!collection_schema_.owner_collection_.empty()) {
LOG_SERVER_ERROR_ << LogOut("[%s][%d] %s", "search", 0, LOG_SERVER_ERROR_ << LogOut("[%s][%ld] %s", "search", 0,
CollectionNotExistMsg(collection_name_).c_str()); CollectionNotExistMsg(collection_name_).c_str());
return Status(SERVER_INVALID_COLLECTION_NAME, CollectionNotExistMsg(collection_name_)); return Status(SERVER_INVALID_COLLECTION_NAME, CollectionNotExistMsg(collection_name_));
} }
@ -122,14 +122,14 @@ SearchRequest::OnExecute() {
// step 5: check search parameters // step 5: check search parameters
status = ValidationUtil::ValidateSearchParams(extra_params_, collection_schema_, topk_); status = ValidationUtil::ValidateSearchParams(extra_params_, collection_schema_, topk_);
if (!status.ok()) { if (!status.ok()) {
LOG_SERVER_ERROR_ << LogOut("[%s][%d] Invalid search params: %s", "search", 0, status.message().c_str()); LOG_SERVER_ERROR_ << LogOut("[%s][%ld] Invalid search params: %s", "search", 0, status.message().c_str());
return status; return status;
} }
// step 6: check vector data according to metric type // step 6: check vector data according to metric type
status = ValidationUtil::ValidateVectorData(vectors_data_, collection_schema_); status = ValidationUtil::ValidateVectorData(vectors_data_, collection_schema_);
if (!status.ok()) { if (!status.ok()) {
LOG_SERVER_ERROR_ << LogOut("[%s][%d] Invalid vector data: %s", "search", 0, status.message().c_str()); LOG_SERVER_ERROR_ << LogOut("[%s][%ld] Invalid vector data: %s", "search", 0, status.message().c_str());
return status; return status;
} }
@ -159,7 +159,7 @@ SearchRequest::OnExecute() {
#endif #endif
fiu_do_on("SearchRequest.OnExecute.query_fail", status = Status(milvus::SERVER_UNEXPECTED_ERROR, "")); fiu_do_on("SearchRequest.OnExecute.query_fail", status = Status(milvus::SERVER_UNEXPECTED_ERROR, ""));
if (!status.ok()) { if (!status.ok()) {
LOG_SERVER_ERROR_ << LogOut("[%s][%d] Query fail: %s", "search", 0, status.message().c_str()); LOG_SERVER_ERROR_ << LogOut("[%s][%ld] Query fail: %s", "search", 0, status.message().c_str());
return status; return status;
} }
fiu_do_on("SearchRequest.OnExecute.empty_result_ids", result_ids.clear()); fiu_do_on("SearchRequest.OnExecute.empty_result_ids", result_ids.clear());
@ -174,7 +174,7 @@ SearchRequest::OnExecute() {
result_.distance_list_.swap(result_distances); result_.distance_list_.swap(result_distances);
rc.RecordSection("construct result"); rc.RecordSection("construct result");
} catch (std::exception& ex) { } catch (std::exception& ex) {
LOG_SERVER_ERROR_ << LogOut("[%s][%d] Encounter exception: %s", "search", 0, ex.what()); LOG_SERVER_ERROR_ << LogOut("[%s][%ld] Encounter exception: %s", "search", 0, ex.what());
return Status(SERVER_UNEXPECTED_ERROR, ex.what()); return Status(SERVER_UNEXPECTED_ERROR, ex.what());
} }

View File

@ -23,7 +23,6 @@
#include "utils/TimeRecorder.h" #include "utils/TimeRecorder.h"
#include "server/web_impl/Constants.h" #include "server/web_impl/Constants.h"
#include "server/web_impl/dto/CmdDto.hpp"
#include "server/web_impl/dto/ConfigDto.hpp" #include "server/web_impl/dto/ConfigDto.hpp"
#include "server/web_impl/dto/IndexDto.hpp" #include "server/web_impl/dto/IndexDto.hpp"
#include "server/web_impl/dto/PartitionDto.hpp" #include "server/web_impl/dto/PartitionDto.hpp"

View File

@ -1,33 +0,0 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include "server/web_impl/dto/Dto.h"
namespace milvus {
namespace server {
namespace web {
#include OATPP_CODEGEN_BEGIN(DTO)
class CommandDto: public oatpp::data::mapping::type::Object {
DTO_INIT(CommandDto, Object)
DTO_FIELD(String, reply, "reply");
};
#include OATPP_CODEGEN_END(DTO)
} // namespace web
} // namespace server
} // namespace milvus

View File

@ -26,7 +26,6 @@
#include "server/context/Context.h" #include "server/context/Context.h"
#include "server/delivery/RequestHandler.h" #include "server/delivery/RequestHandler.h"
#include "server/web_impl/Types.h" #include "server/web_impl/Types.h"
#include "server/web_impl/dto/CmdDto.hpp"
#include "server/web_impl/dto/ConfigDto.hpp" #include "server/web_impl/dto/ConfigDto.hpp"
#include "server/web_impl/dto/DevicesDto.hpp" #include "server/web_impl/dto/DevicesDto.hpp"
#include "server/web_impl/dto/IndexDto.hpp" #include "server/web_impl/dto/IndexDto.hpp"

View File

@ -256,7 +256,7 @@ CommonUtil::EraseFromCache(const std::string& item_key) {
#ifdef MILVUS_GPU_VERSION #ifdef MILVUS_GPU_VERSION
server::Config& config = server::Config::GetInstance(); server::Config& config = server::Config::GetInstance();
std::vector<int64_t> gpus; std::vector<int64_t> gpus;
Status s = config.GetGpuResourceConfigSearchResources(gpus); config.GetGpuResourceConfigSearchResources(gpus);
for (auto& gpu : gpus) { for (auto& gpu : gpus) {
cache::GpuCacheMgr::GetInstance(gpu)->EraseItem(item_key); cache::GpuCacheMgr::GetInstance(gpu)->EraseItem(item_key);
} }

View File

@ -54,27 +54,31 @@ RolloutHandler(const char* filename, std::size_t size, el::Level level) {
int ret; int ret;
std::string m(std::string(dir) + "/" + s); std::string m(std::string(dir) + "/" + s);
s = m; s = m;
if (level == el::Level::Global) { switch (level) {
s.append("." + std::to_string(++global_idx)); case el::Level::Debug:
ret = rename(m.c_str(), s.c_str()); s.append("." + std::to_string(++debug_idx));
} else if (level == el::Level::Debug) { ret = rename(m.c_str(), s.c_str());
s.append("." + std::to_string(++debug_idx)); break;
ret = rename(m.c_str(), s.c_str()); case el::Level::Warning:
} else if (level == el::Level::Warning) { s.append("." + std::to_string(++warning_idx));
s.append("." + std::to_string(++warning_idx)); ret = rename(m.c_str(), s.c_str());
ret = rename(m.c_str(), s.c_str()); break;
} else if (level == el::Level::Trace) { case el::Level::Trace:
s.append("." + std::to_string(++trace_idx)); s.append("." + std::to_string(++trace_idx));
ret = rename(m.c_str(), s.c_str()); ret = rename(m.c_str(), s.c_str());
} else if (level == el::Level::Error) { break;
s.append("." + std::to_string(++error_idx)); case el::Level::Error:
ret = rename(m.c_str(), s.c_str()); s.append("." + std::to_string(++error_idx));
} else if (level == el::Level::Fatal) { ret = rename(m.c_str(), s.c_str());
s.append("." + std::to_string(++fatal_idx)); break;
ret = rename(m.c_str(), s.c_str()); case el::Level::Fatal:
} else { s.append("." + std::to_string(++fatal_idx));
s.append("." + std::to_string(++global_idx)); ret = rename(m.c_str(), s.c_str());
ret = rename(m.c_str(), s.c_str()); break;
default:
s.append("." + std::to_string(++global_idx));
ret = rename(m.c_str(), s.c_str());
break;
} }
} }

View File

@ -935,19 +935,25 @@ TEST_F(RpcHandlerTest, CMD_TEST) {
command.set_cmd("tasktable"); command.set_cmd("tasktable");
handler->Cmd(&context, &command, &reply); handler->Cmd(&context, &command, &reply);
ASSERT_EQ(reply.status().error_code(), ::grpc::Status::OK.error_code());
command.set_cmd("test"); command.set_cmd("test");
handler->Cmd(&context, &command, &reply); handler->Cmd(&context, &command, &reply);
ASSERT_EQ(reply.status().error_code(), ::grpc::Status::OK.error_code());
command.set_cmd("status"); command.set_cmd("status");
handler->Cmd(&context, &command, &reply); handler->Cmd(&context, &command, &reply);
ASSERT_EQ(reply.status().error_code(), ::grpc::Status::OK.error_code());
command.set_cmd("mode"); command.set_cmd("mode");
handler->Cmd(&context, &command, &reply); handler->Cmd(&context, &command, &reply);
ASSERT_EQ(reply.status().error_code(), ::grpc::Status::OK.error_code());
command.set_cmd("build_commit_id"); command.set_cmd("build_commit_id");
handler->Cmd(&context, &command, &reply); handler->Cmd(&context, &command, &reply);
ASSERT_EQ(reply.status().error_code(), ::grpc::Status::OK.error_code());
command.set_cmd("set_config"); command.set_cmd("set_config");
handler->Cmd(&context, &command, &reply); handler->Cmd(&context, &command, &reply);
command.set_cmd("get_config"); command.set_cmd("get_config");
handler->Cmd(&context, &command, &reply); handler->Cmd(&context, &command, &reply);
} }

View File

@ -15,7 +15,7 @@ def create_app(testing_config=None):
pool_recycle=config.SQL_POOL_RECYCLE, pool_timeout=config.SQL_POOL_TIMEOUT, pool_recycle=config.SQL_POOL_RECYCLE, pool_timeout=config.SQL_POOL_TIMEOUT,
pool_pre_ping=config.SQL_POOL_PRE_PING, max_overflow=config.SQL_MAX_OVERFLOW) pool_pre_ping=config.SQL_POOL_PRE_PING, max_overflow=config.SQL_MAX_OVERFLOW)
from mishards.connections import ConnectionMgr, ConnectionTopology from mishards.connections import ConnectionTopology
readonly_topo = ConnectionTopology() readonly_topo = ConnectionTopology()
writable_topo = ConnectionTopology() writable_topo = ConnectionTopology()

View File

@ -279,96 +279,3 @@ class ConnectionTopology(topology.Topology):
if status == topology.StatusType.DUPLICATED: if status == topology.StatusType.DUPLICATED:
group = None group = None
return status, group return status, group
@singleton
class ConnectionMgr:
def __init__(self):
self.metas = {}
self.conns = {}
@property
def conn_names(self):
return set(self.metas.keys()) - set(['WOSERVER'])
def conn(self, name, metadata, throw=False):
c = self.conns.get(name, None)
if not c:
url = self.metas.get(name, None)
if not url:
if not throw:
return None
raise exceptions.ConnectionNotFoundError(message='Connection {} not found'.format(name),
metadata=metadata)
this_conn = Connection(name=name, uri=url, max_retry=settings.MAX_RETRY)
threaded = {
threading.get_ident(): this_conn
}
self.conns[name] = threaded
return this_conn
tid = threading.get_ident()
rconn = c.get(tid, None)
if not rconn:
url = self.metas.get(name, None)
if not url:
if not throw:
return None
raise exceptions.ConnectionNotFoundError('Connection {} not found'.format(name),
metadata=metadata)
this_conn = Connection(name=name, uri=url, max_retry=settings.MAX_RETRY)
c[tid] = this_conn
return this_conn
return rconn
def on_new_meta(self, name, url):
logger.info('Register Connection: name={};url={}'.format(name, url))
self.metas[name] = url
conn = self.conn(name, metadata=None)
conn.on_connect(metadata=None)
status, _ = conn.conn.server_version()
if not status.OK():
logger.error('Cannot connect to newly added address: {}. Remove it now'.format(name))
self.unregister(name)
return False
return True
def on_duplicate_meta(self, name, url):
if self.metas[name] == url:
return self.on_same_meta(name, url)
return self.on_diff_meta(name, url)
def on_same_meta(self, name, url):
# logger.warning('Register same meta: {}:{}'.format(name, url))
return True
def on_diff_meta(self, name, url):
logger.warning('Received {} with diff url={}'.format(name, url))
self.metas[name] = url
self.conns[name] = {}
return True
def on_unregister_meta(self, name, url):
logger.info('Unregister name={};url={}'.format(name, url))
self.conns.pop(name, None)
return True
def on_nonexisted_meta(self, name):
logger.warning('Non-existed meta: {}'.format(name))
return False
def register(self, name, url):
meta = self.metas.get(name)
if not meta:
return self.on_new_meta(name, url)
else:
return self.on_duplicate_meta(name, url)
def unregister(self, name):
logger.info('Unregister Connection: name={}'.format(name))
url = self.metas.pop(name, None)
if url is None:
return self.on_nonexisted_meta(name)
return self.on_unregister_meta(name, url)

View File

@ -5,7 +5,7 @@ import random
import threading import threading
from milvus import Milvus from milvus import Milvus
from mishards.connections import (ConnectionMgr, Connection, from mishards.connections import (Connection,
ConnectionPool, ConnectionTopology, ConnectionGroup) ConnectionPool, ConnectionTopology, ConnectionGroup)
from mishards.topology import StatusType from mishards.topology import StatusType
from mishards import exceptions from mishards import exceptions
@ -15,31 +15,6 @@ logger = logging.getLogger(__name__)
@pytest.mark.usefixtures('app') @pytest.mark.usefixtures('app')
class TestConnection: class TestConnection:
@pytest.mark.skip
def test_manager(self):
mgr = ConnectionMgr()
mgr.register('pod1', '111')
mgr.register('pod2', '222')
mgr.register('pod2', '222')
mgr.register('pod2', '2222')
assert len(mgr.conn_names) == 2
mgr.unregister('pod1')
assert len(mgr.conn_names) == 1
mgr.unregister('pod2')
assert len(mgr.conn_names) == 0
mgr.register('WOSERVER', 'xxxx')
assert len(mgr.conn_names) == 0
assert not mgr.conn('XXXX', None)
with pytest.raises(exceptions.ConnectionNotFoundError):
mgr.conn('XXXX', None, True)
mgr.conn('WOSERVER', None)
def test_connection(self): def test_connection(self):
class Conn: class Conn:
def __init__(self, state): def __init__(self, state):

View File

@ -14,7 +14,7 @@ from mishards.service_handler import ServiceHandler
from mishards.grpc_utils.grpc_args_parser import GrpcArgsParser as Parser from mishards.grpc_utils.grpc_args_parser import GrpcArgsParser as Parser
from mishards.factories import TableFilesFactory, TablesFactory, TableFiles, Tables from mishards.factories import TableFilesFactory, TablesFactory, TableFiles, Tables
from mishards.router import RouterMixin from mishards.router import RouterMixin
from mishards.connections import (ConnectionMgr, Connection, from mishards.connections import (Connection,
ConnectionPool, ConnectionTopology, ConnectionGroup) ConnectionPool, ConnectionTopology, ConnectionGroup)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)

View File

@ -589,20 +589,18 @@ class TestAddBase:
expected: status ok and result length is equal to the length off added vectors expected: status ok and result length is equal to the length off added vectors
''' '''
collection = gen_unique_str() collection = gen_unique_str()
uri = "tcp://%s:%s" % (args["ip"], args["port"])
param = {'collection_name': collection, param = {'collection_name': collection,
'dimension': dim, 'dimension': dim,
'index_file_size': index_file_size, 'index_file_size': index_file_size,
'metric_type': MetricType.L2} 'metric_type': MetricType.L2}
milvus = get_milvus(args["handler"]) milvus = get_milvus(host=args["ip"], port=args["port"], handler=args["handler"])
milvus.connect(uri=uri)
milvus.create_collection(param) milvus.create_collection(param)
vector = gen_single_vector(dim) vector = gen_single_vector(dim)
process_num = 4 process_num = 4
loop_num = 5 loop_num = 5
processes = [] processes = []
def add(): def add():
milvus = get_milvus(args["handler"]) milvus = get_milvus(host=args["ip"], port=args["port"], handler=args["handler"])
milvus.connect(uri=uri) milvus.connect(uri=uri)
i = 0 i = 0
while i < loop_num: while i < loop_num:
@ -634,19 +632,16 @@ class TestAddBase:
thread_num = 8 thread_num = 8
threads = [] threads = []
collection = gen_unique_str() collection = gen_unique_str()
uri = "tcp://%s:%s" % (args["ip"], args["port"])
param = {'collection_name': collection, param = {'collection_name': collection,
'dimension': dim, 'dimension': dim,
'index_file_size': index_file_size, 'index_file_size': index_file_size,
'metric_type': MetricType.L2} 'metric_type': MetricType.L2}
milvus = get_milvus(args["handler"]) milvus = get_milvus(host=args["ip"], port=args["port"], handler=args["handler"])
milvus.connect(uri=uri)
milvus.create_collection(param) milvus.create_collection(param)
vectors = gen_vectors(nb, dim) vectors = gen_vectors(nb, dim)
def add(thread_i): def add(thread_i):
logging.getLogger().info("In thread-%d" % thread_i) logging.getLogger().info("In thread-%d" % thread_i)
milvus = get_milvus(args["handler"]) milvus = get_milvus(host=args["ip"], port=args["port"], handler=args["handler"])
milvus.connect(uri=uri)
status, result = milvus.add_vectors(collection, records=vectors) status, result = milvus.add_vectors(collection, records=vectors)
assert status.OK() assert status.OK()
status = milvus.flush([collection]) status = milvus.flush([collection])

View File

@ -322,7 +322,6 @@ class TestCollection:
expected: collection_name equals with the collection name created expected: collection_name equals with the collection name created
''' '''
collection_name = gen_unique_str("test_collection") collection_name = gen_unique_str("test_collection")
uri = "tcp://%s:%s" % (args["ip"], args["port"])
param = {'collection_name': collection_name, param = {'collection_name': collection_name,
'dimension': dim, 'dimension': dim,
'index_file_size': index_file_size, 'index_file_size': index_file_size,
@ -336,8 +335,7 @@ class TestCollection:
process_num = 4 process_num = 4
processes = [] processes = []
for i in range(process_num): for i in range(process_num):
milvus = get_milvus(args["handler"]) milvus = get_milvus(args["ip"], args["port"], handler=args["handler"])
milvus.connect(uri=uri)
p = Process(target=describecollection, args=(milvus,)) p = Process(target=describecollection, args=(milvus,))
processes.append(p) processes.append(p)
p.start() p.start()
@ -507,8 +505,6 @@ class TestCollection:
''' '''
process_num = 6 process_num = 6
processes = [] processes = []
uri = "tcp://%s:%s" % (args["ip"], args["port"])
def deletecollection(milvus): def deletecollection(milvus):
status = milvus.drop_collection(collection) status = milvus.drop_collection(collection)
# assert not status.code==0 # assert not status.code==0
@ -516,8 +512,7 @@ class TestCollection:
assert status.OK() assert status.OK()
for i in range(process_num): for i in range(process_num):
milvus = get_milvus(args["handler"]) milvus = get_milvus(args["ip"], args["port"], handler=args["handler"])
milvus.connect(uri=uri)
p = Process(target=deletecollection, args=(milvus,)) p = Process(target=deletecollection, args=(milvus,))
processes.append(p) processes.append(p)
p.start() p.start()
@ -786,13 +781,11 @@ class TestCollection:
expected: collection_name in show collections expected: collection_name in show collections
''' '''
collection_name = gen_unique_str("test_collection") collection_name = gen_unique_str("test_collection")
uri = "tcp://%s:%s" % (args["ip"], args["port"])
param = {'collection_name': collection_name, param = {'collection_name': collection_name,
'dimension': dim, 'dimension': dim,
'index_file_size': index_file_size, 'index_file_size': index_file_size,
'metric_type': MetricType.L2} 'metric_type': MetricType.L2}
connect.create_collection(param) connect.create_collection(param)
def showcollections(milvus): def showcollections(milvus):
status, result = milvus.show_collections() status, result = milvus.show_collections()
assert status.OK() assert status.OK()
@ -802,8 +795,7 @@ class TestCollection:
processes = [] processes = []
for i in range(process_num): for i in range(process_num):
milvus = get_milvus(args["handler"]) milvus = get_milvus(args["ip"], args["port"], handler=args["handler"])
milvus.connect(uri=uri)
p = Process(target=showcollections, args=(milvus,)) p = Process(target=showcollections, args=(milvus,))
processes.append(p) processes.append(p)
p.start() p.start()

View File

@ -181,7 +181,6 @@ class TestCollectionCount:
expected: the count is equal to the length of vectors expected: the count is equal to the length of vectors
''' '''
nq = 2 nq = 2
uri = "tcp://%s:%s" % (args["ip"], args["port"])
vectors = gen_vectors(nq, dim) vectors = gen_vectors(nq, dim)
res = connect.add_vectors(collection_name=collection, records=vectors) res = connect.add_vectors(collection_name=collection, records=vectors)
time.sleep(add_time_interval) time.sleep(add_time_interval)
@ -194,8 +193,7 @@ class TestCollectionCount:
process_num = 8 process_num = 8
processes = [] processes = []
for i in range(process_num): for i in range(process_num):
milvus = get_milvus(args["handler"]) milvus = get_milvus(args["ip"], args["port"], handler=args["handler"])
milvus.connect(uri=uri)
p = Process(target=rows_count, args=(milvus, )) p = Process(target=rows_count, args=(milvus, ))
processes.append(p) processes.append(p)
p.start() p.start()
@ -326,7 +324,6 @@ class TestCollectionCountIP:
expected: the count is equal to the length of vectors expected: the count is equal to the length of vectors
''' '''
nq = 2 nq = 2
uri = "tcp://%s:%s" % (args["ip"], args["port"])
vectors = gen_vectors(nq, dim) vectors = gen_vectors(nq, dim)
res = connect.add_vectors(collection_name=ip_collection, records=vectors) res = connect.add_vectors(collection_name=ip_collection, records=vectors)
time.sleep(add_time_interval) time.sleep(add_time_interval)
@ -339,8 +336,7 @@ class TestCollectionCountIP:
process_num = 8 process_num = 8
processes = [] processes = []
for i in range(process_num): for i in range(process_num):
milvus = get_milvus(args["handler"]) milvus = get_milvus(args["ip"], args["port"], handler=args["handler"])
milvus.connect(uri=uri)
p = Process(target=rows_count, args=(milvus,)) p = Process(target=rows_count, args=(milvus,))
processes.append(p) processes.append(p)
p.start() p.start()

View File

@ -203,19 +203,16 @@ class TestFlushBase:
expected: status ok expected: status ok
''' '''
collection = gen_unique_str() collection = gen_unique_str()
uri = "tcp://%s:%s" % (args["ip"], args["port"])
param = {'collection_name': collection, param = {'collection_name': collection,
'dimension': dim, 'dimension': dim,
'index_file_size': index_file_size, 'index_file_size': index_file_size,
'metric_type': MetricType.L2} 'metric_type': MetricType.L2}
milvus = get_milvus(args["handler"]) milvus = get_milvus(args["ip"], args["port"], handler=args["handler"])
milvus.connect(uri=uri)
milvus.create_collection(param) milvus.create_collection(param)
vectors = gen_vector(nb, dim) vectors = gen_vector(nb, dim)
status, ids = milvus.add_vectors(collection, vectors, ids=[i for i in range(nb)]) status, ids = milvus.add_vectors(collection, vectors, ids=[i for i in range(nb)])
def flush(collection_name): def flush(collection_name):
milvus = get_milvus(args["handler"]) milvus = get_milvus(args["ip"], args["port"], handler=args["handler"])
milvus.connect(uri=uri)
status = milvus.delete_by_id(collection_name, [i for i in range(nb)]) status = milvus.delete_by_id(collection_name, [i for i in range(nb)])
assert status.OK() assert status.OK()
status = milvus.flush([collection_name]) status = milvus.flush([collection_name])

View File

@ -166,11 +166,8 @@ class TestIndexBase:
threads_num = 8 threads_num = 8
threads = [] threads = []
uri = "tcp://%s:%s" % (args["ip"], args["port"])
for i in range(threads_num): for i in range(threads_num):
m = get_milvus(args["handler"]) m = get_milvus(host=args["ip"], port=args["port"], handler=args["handler"])
m.connect(uri=uri)
t = threading.Thread(target=build, args=(m,)) t = threading.Thread(target=build, args=(m,))
threads.append(t) threads.append(t)
t.start() t.start()
@ -197,7 +194,6 @@ class TestIndexBase:
threads_num = 8 threads_num = 8
loop_num = 8 loop_num = 8
threads = [] threads = []
collection = [] collection = []
j = 0 j = 0
while j < (threads_num*loop_num): while j < (threads_num*loop_num):
@ -215,7 +211,6 @@ class TestIndexBase:
while i < loop_num: while i < loop_num:
# assert connect.has_collection(collection[ids*process_num+i]) # assert connect.has_collection(collection[ids*process_num+i])
status, ids = connect.add_vectors(collection[ids*threads_num+i], vectors) status, ids = connect.add_vectors(collection[ids*threads_num+i], vectors)
status = connect.create_index(collection[ids*threads_num+i], IndexType.IVFLAT, {"nlist": NLIST}) status = connect.create_index(collection[ids*threads_num+i], IndexType.IVFLAT, {"nlist": NLIST})
assert status.OK() assert status.OK()
query_vec = [vectors[0]] query_vec = [vectors[0]]
@ -226,14 +221,10 @@ class TestIndexBase:
assert len(result[0]) == top_k assert len(result[0]) == top_k
assert result[0][0].distance == 0.0 assert result[0][0].distance == 0.0
i = i + 1 i = i + 1
uri = "tcp://%s:%s" % (args["ip"], args["port"])
for i in range(threads_num): for i in range(threads_num):
m = get_milvus(args["handler"]) m = get_milvus(host=args["ip"], port=args["port"], handler=args["handler"])
m.connect(uri=uri)
ids = i ids = i
t = threading.Thread(target=create_index, args=(m,ids)) t = threading.Thread(target=create_index, args=(m, ids))
threads.append(t) threads.append(t)
t.start() t.start()
time.sleep(0.2) time.sleep(0.2)
@ -256,8 +247,7 @@ class TestIndexBase:
threads = [] threads = []
uri = "tcp://%s:%s" % (args["ip"], args["port"]) uri = "tcp://%s:%s" % (args["ip"], args["port"])
for i in range(threads_num): for i in range(threads_num):
m = get_milvus(args["handler"]) m = get_milvus(host=args["ip"], port=args["port"], handler=args["handler"])
m.connect(uri=uri)
if(i % 2 == 0): if(i % 2 == 0):
p = threading.Thread(target=build, args=(m,)) p = threading.Thread(target=build, args=(m,))
else: else:
@ -286,11 +276,8 @@ class TestIndexBase:
process_num = 8 process_num = 8
processes = [] processes = []
uri = "tcp://%s:%s" % (args["ip"], args["port"])
for i in range(process_num): for i in range(process_num):
m = get_milvus(args["handler"]) m = get_milvus(host=args["ip"], port=args["port"], handler=args["handler"])
m.connect(uri=uri)
p = Process(target=build, args=(m,)) p = Process(target=build, args=(m,))
processes.append(p) processes.append(p)
p.start() p.start()
@ -347,11 +334,8 @@ class TestIndexBase:
assert result[0][0].distance == 0.0 assert result[0][0].distance == 0.0
i = i + 1 i = i + 1
uri = "tcp://%s:%s" % (args["ip"], args["port"])
for i in range(process_num): for i in range(process_num):
m = get_milvus(args["handler"]) m = get_milvus(host=args["ip"], port=args["port"], handler=args["handler"])
m.connect(uri=uri)
ids = i ids = i
p = Process(target=create_index, args=(m,ids)) p = Process(target=create_index, args=(m,ids))
processes.append(p) processes.append(p)
@ -792,18 +776,15 @@ class TestIndexIP:
expected: return code equals to 0, and search success expected: return code equals to 0, and search success
''' '''
status, ids = connect.add_vectors(ip_collection, vectors) status, ids = connect.add_vectors(ip_collection, vectors)
def build(connect): def build(connect):
status = connect.create_index(ip_collection, IndexType.IVFLAT, {"nlist": NLIST}) status = connect.create_index(ip_collection, IndexType.IVFLAT, {"nlist": NLIST})
assert status.OK() assert status.OK()
process_num = 8 process_num = 8
processes = [] processes = []
uri = "tcp://%s:%s" % (args["ip"], args["port"])
for i in range(process_num): for i in range(process_num):
m = get_milvus(args["handler"]) m = get_milvus(args["ip"], args["port"], handler=args["handler"])
m.connect(uri=uri)
p = Process(target=build, args=(m,)) p = Process(target=build, args=(m,))
processes.append(p) processes.append(p)
p.start() p.start()
@ -858,11 +839,8 @@ class TestIndexIP:
assert result[0][0].distance == 0.0 assert result[0][0].distance == 0.0
i = i + 1 i = i + 1
uri = "tcp://%s:%s" % (args["ip"], args["port"])
for i in range(process_num): for i in range(process_num):
m = get_milvus(args["handler"]) m = get_milvus(args["ip"], args["port"], handler=args["handler"])
m.connect(uri=uri)
ids = i ids = i
p = Process(target=create_index, args=(m,ids)) p = Process(target=create_index, args=(m,ids))
processes.append(p) processes.append(p)