milvus/tests/python_client/base/connections_wrapper.py
XuanYang-cn b2a2ef1820
Update pymilvus version to 2.0.0rc10.dev5 (#15197)
Signed-off-by: yangxuan <xuan.yang@zilliz.com>
2022-01-14 12:55:34 +08:00

61 lines
3.1 KiB
Python

from pymilvus import Connections
from pymilvus import DefaultConfig
import sys
sys.path.append("..")
from check.func_check import ResponseChecker
from utils.api_request import api_request
class ApiConnectionsWrapper:
    """Test-side wrapper around a pymilvus ``Connections`` object.

    Each public method follows the same three steps:

    1. record its own name (used by the checker to identify the call),
    2. invoke the same-named method of ``self.connection`` via ``api_request``,
       which yields a ``(response, success_flag)`` pair,
    3. pass that pair, together with ``check_task`` / ``check_items`` and the
       call arguments, to ``ResponseChecker`` and call ``run()`` on it.

    Every method returns ``(response, check_result)``.
    """

    def __init__(self):
        """Create the ``Connections`` instance that all methods delegate to."""
        self.connection = Connections()

    def add_connection(self, check_task=None, check_items=None, **kwargs):
        """Invoke ``self.connection.add_connection(**kwargs)`` and check it.

        Args:
            check_task: Forwarded unchanged to ``ResponseChecker``.
            check_items: Forwarded unchanged to ``ResponseChecker``.
            **kwargs: Forwarded to both the request and the checker.

        Returns:
            Tuple ``(response, check_result)``.
        """
        caller = sys._getframe().f_code.co_name
        request = [self.connection.add_connection]
        outcome, succeeded = api_request(request, **kwargs)
        checker = ResponseChecker(
            outcome, caller, check_task, check_items, succeeded, **kwargs
        )
        return outcome, checker.run()

    def disconnect(self, alias, check_task=None, check_items=None):
        """Invoke ``self.connection.disconnect(alias)`` and check it.

        Args:
            alias: Connection alias passed to the request and the checker.
            check_task: Forwarded unchanged to ``ResponseChecker``.
            check_items: Forwarded unchanged to ``ResponseChecker``.

        Returns:
            Tuple ``(response, check_result)``.
        """
        caller = sys._getframe().f_code.co_name
        request = [self.connection.disconnect, alias]
        outcome, succeeded = api_request(request)
        checker = ResponseChecker(
            outcome, caller, check_task, check_items, succeeded, alias=alias
        )
        return outcome, checker.run()

    def remove_connection(self, alias, check_task=None, check_items=None):
        """Invoke ``self.connection.remove_connection(alias)`` and check it.

        Args:
            alias: Connection alias passed to the request and the checker.
            check_task: Forwarded unchanged to ``ResponseChecker``.
            check_items: Forwarded unchanged to ``ResponseChecker``.

        Returns:
            Tuple ``(response, check_result)``.
        """
        caller = sys._getframe().f_code.co_name
        request = [self.connection.remove_connection, alias]
        outcome, succeeded = api_request(request)
        checker = ResponseChecker(
            outcome, caller, check_task, check_items, succeeded, alias=alias
        )
        return outcome, checker.run()

    def connect(self, alias=DefaultConfig.DEFAULT_USING, check_task=None, check_items=None, **kwargs):
        """Invoke ``self.connection.connect(alias, **kwargs)`` and check it.

        Args:
            alias: Connection alias; defaults to ``DefaultConfig.DEFAULT_USING``.
            check_task: Forwarded unchanged to ``ResponseChecker``.
            check_items: Forwarded unchanged to ``ResponseChecker``.
            **kwargs: Forwarded to both the request and the checker.

        Returns:
            Tuple ``(response, check_result)``.
        """
        caller = sys._getframe().f_code.co_name
        request = [self.connection.connect, alias]
        outcome, succeeded = api_request(request, **kwargs)
        checker = ResponseChecker(
            outcome, caller, check_task, check_items, succeeded, alias=alias, **kwargs
        )
        return outcome, checker.run()

    def has_connection(self, alias=DefaultConfig.DEFAULT_USING, check_task=None, check_items=None):
        """Invoke ``self.connection.has_connection(alias)`` and check it.

        Args:
            alias: Connection alias; defaults to ``DefaultConfig.DEFAULT_USING``.
            check_task: Forwarded unchanged to ``ResponseChecker``.
            check_items: Forwarded unchanged to ``ResponseChecker``.

        Returns:
            Tuple ``(response, check_result)``.
        """
        caller = sys._getframe().f_code.co_name
        request = [self.connection.has_connection, alias]
        outcome, succeeded = api_request(request)
        checker = ResponseChecker(
            outcome, caller, check_task, check_items, succeeded, alias=alias
        )
        return outcome, checker.run()

    # NOTE(review): the get_connection wrapper below is disabled; presumably
    # the client no longer exposes that call -- confirm before deleting it.
    # def get_connection(self, alias=DefaultConfig.DEFAULT_USING, check_task=None, check_items=None):
    #     func_name = sys._getframe().f_code.co_name
    #     response, is_succ = api_request([self.connection.get_connection, alias])
    #     check_result = ResponseChecker(response, func_name, check_task, check_items, is_succ, alias=alias).run()
    #     return response, check_result

    def list_connections(self, check_task=None, check_items=None):
        """Invoke ``self.connection.list_connections()`` and check it.

        Args:
            check_task: Forwarded unchanged to ``ResponseChecker``.
            check_items: Forwarded unchanged to ``ResponseChecker``.

        Returns:
            Tuple ``(response, check_result)``.
        """
        caller = sys._getframe().f_code.co_name
        request = [self.connection.list_connections]
        outcome, succeeded = api_request(request)
        checker = ResponseChecker(
            outcome, caller, check_task, check_items, succeeded
        )
        return outcome, checker.run()

    def get_connection_addr(self, alias, check_task=None, check_items=None):
        """Invoke ``self.connection.get_connection_addr(alias)`` and check it.

        Args:
            alias: Connection alias passed to the request and the checker.
            check_task: Forwarded unchanged to ``ResponseChecker``.
            check_items: Forwarded unchanged to ``ResponseChecker``.

        Returns:
            Tuple ``(response, check_result)``.
        """
        caller = sys._getframe().f_code.co_name
        request = [self.connection.get_connection_addr, alias]
        outcome, succeeded = api_request(request)
        checker = ResponseChecker(
            outcome, caller, check_task, check_items, succeeded, alias=alias
        )
        return outcome, checker.run()