milvus/shards/mishards/server.py
XuPeng-SH 5f2f8bdc8b
[skip ci] (shards) Upgrade Mishards for #1569 (#1570)
* [skip ci](shards): export MAX_WORKERS as configurable parameter

Signed-off-by: peng.xu <peng.xu@zilliz.com>

* [skip ci](shards): skip mishards .env git info

Signed-off-by: peng.xu <peng.xu@zilliz.com>

* [skip ci](shards): support more robust static discovery host configuration

Signed-off-by: peng.xu <peng.xu@zilliz.com>

* [skip ci](shards): update static provider to terminate the server if connecting to a downstream server fails during startup

Signed-off-by: peng.xu <peng.xu@zilliz.com>

* [skip ci](shards): add topology.py

Signed-off-by: peng.xu <peng.xu@zilliz.com>

* [skip ci](shards): add connection pool

Signed-off-by: peng.xu <peng.xu@zilliz.com>

* [skip ci](shards): add topology test

Signed-off-by: peng.xu <peng.xu@zilliz.com>

* [skip ci](shards): refactor using topo

Signed-off-by: peng.xu <peng.xu@zilliz.com>

* [skip ci](shards): refactor static discovery using topo

Signed-off-by: peng.xu <peng.xu@zilliz.com>

* [skip ci](shards): refactor kubernetes discovery using topo

Signed-off-by: peng.xu <peng.xu@zilliz.com>

* [skip ci](shards): add more tests for connection pool

Signed-off-by: peng.xu <peng.xu@zilliz.com>

* [skip ci](shards): export 19541 and 19542 for all_in_one demo

Signed-off-by: peng.xu <peng.xu@zilliz.com>

* [skip ci](shards): check version on new connection

Signed-off-by: peng.xu <peng.xu@zilliz.com>

* [skip ci](shards): mock connections

Signed-off-by: peng.xu <peng.xu@zilliz.com>

* [skip ci](shards): update tests

Signed-off-by: peng.xu <peng.xu@zilliz.com>
2020-03-14 13:30:21 +08:00

132 lines
4.1 KiB
Python

import logging
import sys
import grpc
import time
import socket
import inspect
from urllib.parse import urlparse
from functools import wraps
from concurrent import futures
from grpc._cython import cygrpc
from milvus.grpc_gen.milvus_pb2_grpc import add_MilvusServiceServicer_to_server
from mishards.grpc_utils import is_grpc_method
from mishards.service_handler import ServiceHandler
from mishards import settings
# Module-level logger; records are emitted under this module's import name.
logger = logging.getLogger(__name__)
class Server:
    """gRPC front-end server for mishards.

    Lifecycle: construct, call :meth:`init_app` to wire in collaborators and
    build the underlying gRPC server, then call :meth:`run` (blocking) or
    :meth:`start` (non-blocking). :meth:`stop` shuts everything down.

    Attributes set by ``__init__``:
        pre_run_handlers: set of zero-argument callables invoked by
            :meth:`on_pre_run` before the server starts.
        grpc_methods: set reserved on the instance; not used by this class.
        error_handlers: mapping of exception class -> handler callable,
            populated through :meth:`errorhandler`.
        exit_flag: when True, the wait loop in :meth:`run` terminates.
    """

    def __init__(self):
        self.pre_run_handlers = set()
        self.grpc_methods = set()
        self.error_handlers = {}
        self.exit_flag = False

    def init_app(self,
                 writable_topo,
                 readonly_topo,
                 tracer,
                 router,
                 discover,
                 port=19530,
                 max_workers=10,
                 **kwargs):
        """Attach collaborators and create the underlying gRPC server.

        Args:
            writable_topo: topology object used to register the writable
                server (see :meth:`pre_run_handler`).
            readonly_topo: topology object stored on the instance.
            tracer: object providing ``decorate(server)`` and ``close()``.
            router: passed to the service handler in :meth:`start`.
            discover: object whose ``start()`` is called by
                :meth:`on_pre_run`.
            port: listening port; coerced with ``int``.
            max_workers: size of the thread pool serving gRPC requests.
            **kwargs: accepted and ignored.
        """
        self.port = int(port)
        self.writable_topo = writable_topo
        self.readonly_topo = readonly_topo
        self.tracer = tracer
        self.router = router
        self.discover = discover

        logger.debug('Init grpc server with max_workers: {}'.format(max_workers))

        # -1 for both message-length options lifts gRPC's size limits.
        self.server_impl = grpc.server(
            thread_pool=futures.ThreadPoolExecutor(max_workers=max_workers),
            options=[(cygrpc.ChannelArgKey.max_send_message_length, -1),
                     (cygrpc.ChannelArgKey.max_receive_message_length, -1)])

        # Let the tracer wrap the server; keep whatever it hands back.
        self.server_impl = self.tracer.decorate(self.server_impl)

        self.register_pre_run_handler(self.pre_run_handler)

    def pre_run_handler(self):
        """Register the writable server from ``settings.WOSERVER``.

        The hostname of the configured URL is resolved to an IPv4 address
        and a ``'WOSERVER'`` entry is created in the ``'default'`` group of
        the writable topology. Port 80 is used when the URL has no port.

        Raises:
            Whatever ``socket.gethostbyname`` / ``socket.inet_pton`` raise
            when the host cannot be resolved or is not a valid IPv4 address.
        """
        woserver = settings.WOSERVER
        url = urlparse(woserver)
        ip = socket.gethostbyname(url.hostname)
        # Sanity check: raises if ``ip`` is not a well-formed IPv4 address.
        socket.inet_pton(socket.AF_INET, ip)
        _, group = self.writable_topo.create('default')
        group.create(name='WOSERVER',
                     uri='{}://{}:{}'.format(url.scheme, ip, url.port or 80))

    def register_pre_run_handler(self, func):
        """Add ``func`` to the pre-run handlers and return it.

        Returning ``func`` unchanged lets this method be used as a decorator.
        """
        logger.info('Regiterring {} into server pre_run_handlers'.format(func))
        self.pre_run_handlers.add(func)
        return func

    def wrap_method_with_errorhandler(self, func):
        """Return ``func`` wrapped so registered error handlers are applied.

        When ``func`` raises an ``Exception``, the exception type's MRO is
        searched from most specific to least specific and the first class
        with a registered handler wins; the handler's return value becomes
        the wrapper's return value. An exact-class registration therefore
        still takes precedence, while handlers registered for a base class
        also cover its subclasses. Exceptions with no matching handler are
        re-raised unchanged.
        """
        @wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception as e:
                for klass in type(e).__mro__:
                    if klass in self.error_handlers:
                        return self.error_handlers[klass](e)
                raise

        return wrapper

    def errorhandler(self, exception):
        """Decorator factory registering a handler for ``exception``.

        Usage::

            @server.errorhandler(SomeError)
            def handle(err): ...

        Args:
            exception: an ``Exception`` subclass to handle.

        Returns:
            A decorator that stores the decorated function in
            ``error_handlers`` and returns it unchanged. If ``exception`` is
            not an ``Exception`` subclass, nothing is registered and the
            argument itself is returned.
        """
        if inspect.isclass(exception) and issubclass(exception, Exception):
            def wrapper(func):
                self.error_handlers[exception] = func
                return func

            return wrapper
        return exception

    def on_pre_run(self):
        """Run every pre-run handler, then start discovery.

        Returns:
            The result of ``self.discover.start()``; :meth:`run` treats a
            falsy value as a startup failure.
        """
        for handler in self.pre_run_handlers:
            handler()
        return self.discover.start()

    def start(self, port=None):
        """Register the service handler and start serving (non-blocking).

        Args:
            port: port to listen on; falls back to ``self.port`` when falsy.
        """
        handler_class = self.decorate_handler(ServiceHandler)
        add_MilvusServiceServicer_to_server(
            handler_class(tracer=self.tracer,
                          router=self.router), self.server_impl)
        self.server_impl.add_insecure_port("[::]:{}".format(port or self.port))
        self.server_impl.start()

    def run(self, port=None):
        """Start the server and block until it is asked to exit.

        Args:
            port: port to listen on; falls back to ``self.port`` when falsy.

        Exits the process with status 1 if :meth:`on_pre_run` reports
        failure. A ``KeyboardInterrupt`` during the wait loop triggers
        :meth:`stop`.
        """
        logger.info('Milvus server start ......')
        port = port or self.port
        ok = self.on_pre_run()

        if not ok:
            logger.error('Terminate server due to error found in on_pre_run')
            sys.exit(1)

        self.start(port)
        logger.info('Listening on port {}'.format(port))

        try:
            # Poll the exit flag; the gRPC server runs on its own threads.
            while not self.exit_flag:
                time.sleep(5)
        except KeyboardInterrupt:
            self.stop()

    def stop(self):
        """Stop the gRPC server with a grace period of 0 and close the tracer."""
        logger.info('Server is shuting down ......')
        self.exit_flag = True
        self.server_impl.stop(0)
        self.tracer.close()
        logger.info('Server is closed')

    def decorate_handler(self, handler):
        """Wrap every gRPC method defined on ``handler`` with error handling.

        ``handler`` (a class) is modified in place and returned. Only
        attributes found directly in ``handler.__dict__`` for which
        ``is_grpc_method`` is true are wrapped.
        """
        # Iterate over a snapshot: setattr below writes to the same namespace.
        for key, attr in list(vars(handler).items()):
            if is_grpc_method(attr):
                setattr(handler, key, self.wrap_method_with_errorhandler(attr))
        return handler