Refactor registry plugin and simplify its usage (#6712)

This commit is contained in:
kezhenxu94 2021-11-07 15:18:10 +08:00 committed by GitHub
parent 73f20b7553
commit 5400abc74f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
68 changed files with 909 additions and 1928 deletions

View File

@ -15,8 +15,6 @@
# limitations under the License.
#
# This file is only to override the production configurations in standalone server.
registry.plugin.dir=./dolphinscheduler-dist/target/dolphinscheduler-dist-2.0.0-SNAPSHOT/lib/plugin/registry/zookeeper
registry.plugin.name=zookeeper
registry.servers=127.0.0.1:2181
dolphinscheduler/dolphinscheduler-e2e @kezhenxu94
dolphinscheduler/dolphinscheduler-registry @kezhenxu94
dolphinscheduler/dolphinscheduler-standalone-server @kezhenxu94

View File

@ -15,13 +15,5 @@
# limitations under the License.
#
#registry.plugin.dir config the Registry Plugin dir.
registry.plugin.dir=${REGISTRY_PLUGIN_DIR}
registry.plugin.name=${REGISTRY_PLUGIN_NAME}
registry.servers=${REGISTRY_SERVERS}
#maven.local.repository=/usr/local/localRepository
#registry.plugin.binding config the Registry Plugin need be load when development and run in IDE
#registry.plugin.binding=./dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/pom.xml

View File

@ -37,7 +37,6 @@ export DATABASE_PARAMS=${DATABASE_PARAMS:-"characterEncoding=utf8"}
#============================================================================
# Registry
#============================================================================
export REGISTRY_PLUGIN_DIR=${REGISTRY_PLUGIN_DIR:-"lib/plugin/registry"}
export REGISTRY_PLUGIN_NAME=${REGISTRY_PLUGIN_NAME:-"zookeeper"}
export REGISTRY_SERVERS=${REGISTRY_SERVERS:-"127.0.0.1:2181"}

View File

@ -39,7 +39,6 @@ DATABASE_PARAMS=characterEncoding=utf8
#============================================================================
# Registry
#============================================================================
REGISTRY_PLUGIN_DIR=lib/plugin/registry
REGISTRY_PLUGIN_NAME=zookeeper
REGISTRY_SERVERS=dolphinscheduler-zookeeper:2181

View File

@ -140,4 +140,4 @@ volumes:
dolphinscheduler-worker-data:
dolphinscheduler-logs:
dolphinscheduler-shared-local:
dolphinscheduler-resource-local:
dolphinscheduler-resource-local:

View File

@ -166,12 +166,6 @@ Create a database environment variables.
Create a registry environment variables.
*/}}
{{- define "dolphinscheduler.registry.env_vars" -}}
- name: REGISTRY_PLUGIN_DIR
{{- if .Values.zookeeper.enabled }}
value: "lib/plugin/registry"
{{- else }}
value: {{ .Values.externalRegistry.registryPluginDir }}
{{- end }}
- name: REGISTRY_PLUGIN_NAME
{{- if .Values.zookeeper.enabled }}
value: "zookeeper"
@ -239,4 +233,4 @@ Create a fsFileResourcePersistence volumeMount.
- mountPath: {{ default "/dolphinscheduler" .Values.common.configmap.RESOURCE_UPLOAD_PATH | quote }}
name: {{ include "dolphinscheduler.fullname" . }}-fs-file
{{- end -}}
{{- end -}}
{{- end -}}

View File

@ -19,14 +19,14 @@ package org.apache.dolphinscheduler.api.service.impl;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.MonitorService;
import org.apache.dolphinscheduler.api.utils.RegistryCenterUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.model.WorkerServerModel;
import org.apache.dolphinscheduler.dao.MonitorDBDao;
import org.apache.dolphinscheduler.dao.entity.MonitorRecord;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.ZookeeperRecord;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import java.util.HashMap;
import java.util.List;
@ -48,6 +48,9 @@ public class MonitorServiceImpl extends BaseServiceImpl implements MonitorServic
@Autowired
private MonitorDBDao monitorDBDao;
@Autowired
private RegistryClient registryClient;
/**
* query database state
*
@ -55,7 +58,7 @@ public class MonitorServiceImpl extends BaseServiceImpl implements MonitorServic
* @return data base state
*/
@Override
public Map<String,Object> queryDatabaseState(User loginUser) {
public Map<String, Object> queryDatabaseState(User loginUser) {
Map<String, Object> result = new HashMap<>();
List<MonitorRecord> monitorRecordList = monitorDBDao.queryDatabaseState();
@ -74,13 +77,13 @@ public class MonitorServiceImpl extends BaseServiceImpl implements MonitorServic
* @return master information list
*/
@Override
public Map<String,Object> queryMaster(User loginUser) {
public Map<String, Object> queryMaster(User loginUser) {
Map<String, Object> result = new HashMap<>();
List<Server> masterServers = getServerListFromRegistry(true);
result.put(Constants.DATA_LIST, masterServers);
putMsg(result,Status.SUCCESS);
putMsg(result, Status.SUCCESS);
return result;
}
@ -92,12 +95,10 @@ public class MonitorServiceImpl extends BaseServiceImpl implements MonitorServic
* @return zookeeper information list
*/
@Override
public Map<String,Object> queryZookeeperState(User loginUser) {
public Map<String, Object> queryZookeeperState(User loginUser) {
Map<String, Object> result = new HashMap<>();
List<ZookeeperRecord> zookeeperRecordList = RegistryCenterUtils.zookeeperInfoList();
result.put(Constants.DATA_LIST, zookeeperRecordList);
result.put(Constants.DATA_LIST, null);
putMsg(result, Status.SUCCESS);
return result;
@ -111,46 +112,48 @@ public class MonitorServiceImpl extends BaseServiceImpl implements MonitorServic
* @return worker information list
*/
@Override
public Map<String,Object> queryWorker(User loginUser) {
public Map<String, Object> queryWorker(User loginUser) {
Map<String, Object> result = new HashMap<>();
List<WorkerServerModel> workerServers = getServerListFromRegistry(false)
.stream()
.map((Server server) -> {
WorkerServerModel model = new WorkerServerModel();
model.setId(server.getId());
model.setHost(server.getHost());
model.setPort(server.getPort());
model.setZkDirectories(Sets.newHashSet(server.getZkDirectory()));
model.setResInfo(server.getResInfo());
model.setCreateTime(server.getCreateTime());
model.setLastHeartbeatTime(server.getLastHeartbeatTime());
return model;
})
.collect(Collectors.toList());
.stream()
.map((Server server) -> {
WorkerServerModel model = new WorkerServerModel();
model.setId(server.getId());
model.setHost(server.getHost());
model.setPort(server.getPort());
model.setZkDirectories(Sets.newHashSet(server.getZkDirectory()));
model.setResInfo(server.getResInfo());
model.setCreateTime(server.getCreateTime());
model.setLastHeartbeatTime(server.getLastHeartbeatTime());
return model;
})
.collect(Collectors.toList());
Map<String, WorkerServerModel> workerHostPortServerMapping = workerServers
.stream()
.collect(Collectors.toMap(
(WorkerServerModel worker) -> {
String[] s = worker.getZkDirectories().iterator().next().split("/");
return s[s.length - 1];
}
, Function.identity()
, (WorkerServerModel oldOne, WorkerServerModel newOne) -> {
oldOne.getZkDirectories().addAll(newOne.getZkDirectories());
return oldOne;
}));
.stream()
.collect(Collectors.toMap(
(WorkerServerModel worker) -> {
String[] s = worker.getZkDirectories().iterator().next().split("/");
return s[s.length - 1];
}
, Function.identity()
, (WorkerServerModel oldOne, WorkerServerModel newOne) -> {
oldOne.getZkDirectories().addAll(newOne.getZkDirectories());
return oldOne;
}));
result.put(Constants.DATA_LIST, workerHostPortServerMapping.values());
putMsg(result,Status.SUCCESS);
putMsg(result, Status.SUCCESS);
return result;
}
@Override
public List<Server> getServerListFromRegistry(boolean isMaster) {
return isMaster ? RegistryCenterUtils.getMasterServers() : RegistryCenterUtils.getWorkerServers();
return isMaster
? registryClient.getServerList(NodeType.MASTER)
: registryClient.getServerList(NodeType.WORKER);
}
}

View File

@ -20,7 +20,6 @@ package org.apache.dolphinscheduler.api.service.impl;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.WorkerGroupService;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.RegistryCenterUtils;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.NodeType;
@ -31,14 +30,17 @@ import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.apache.commons.lang.StringUtils;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
@ -58,10 +60,13 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
private static final Logger logger = LoggerFactory.getLogger(WorkerGroupServiceImpl.class);
@Autowired
WorkerGroupMapper workerGroupMapper;
private WorkerGroupMapper workerGroupMapper;
@Autowired
ProcessInstanceMapper processInstanceMapper;
private ProcessInstanceMapper processInstanceMapper;
@Autowired
private RegistryClient registryClient;
/**
* create or update a worker group
@ -139,7 +144,7 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
}
// check zookeeper
String workerGroupPath = Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS + Constants.SINGLE_SLASH + workerGroup.getName();
return RegistryCenterUtils.isNodeExisted(workerGroupPath);
return registryClient.exists(workerGroupPath);
}
/**
@ -149,7 +154,7 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
* @return boolean
*/
private String checkWorkerGroupAddrList(WorkerGroup workerGroup) {
Map<String, String> serverMaps = RegistryCenterUtils.getServerMaps(NodeType.WORKER, true);
Map<String, String> serverMaps = registryClient.getServerMaps(NodeType.WORKER, true);
if (Strings.isNullOrEmpty(workerGroup.getAddrList())) {
return null;
}
@ -250,11 +255,11 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
List<WorkerGroup> workerGroups = workerGroupMapper.queryAllWorkerGroup();
// worker groups from zookeeper
String workerPath = Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
List<String> workerGroupList = null;
Collection<String> workerGroupList = null;
try {
workerGroupList = RegistryCenterUtils.getChildrenNodes(workerPath);
workerGroupList = registryClient.getChildrenKeys(workerPath);
} catch (Exception e) {
logger.error("getWorkerGroups exception: {}, workerPath: {}, isPaging: {}", e.getMessage(), workerPath, isPaging);
logger.error("getWorkerGroups exception, workerPath: {}, isPaging: {}", workerPath, isPaging, e);
}
if (CollectionUtils.isEmpty(workerGroupList)) {
@ -268,9 +273,9 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
for (String workerGroup : workerGroupList) {
String workerGroupPath = workerPath + Constants.SINGLE_SLASH + workerGroup;
List<String> childrenNodes = null;
Collection<String> childrenNodes = null;
try {
childrenNodes = RegistryCenterUtils.getChildrenNodes(workerGroupPath);
childrenNodes = registryClient.getChildrenKeys(workerGroupPath);
} catch (Exception e) {
logger.error("getChildrenNodes exception: {}, workerGroupPath: {}", e.getMessage(), workerGroupPath);
}
@ -281,7 +286,7 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
wg.setName(workerGroup);
if (isPaging) {
wg.setAddrList(String.join(Constants.COMMA, childrenNodes));
String registeredValue = RegistryCenterUtils.getNodeData(workerGroupPath + Constants.SINGLE_SLASH + childrenNodes.get(0));
String registeredValue = registryClient.get(workerGroupPath + Constants.SINGLE_SLASH + childrenNodes.iterator().next());
wg.setCreateTime(DateUtils.stringToDate(registeredValue.split(Constants.COMMA)[6]));
wg.setUpdateTime(DateUtils.stringToDate(registeredValue.split(Constants.COMMA)[7]));
wg.setSystemDefault(true);
@ -328,7 +333,7 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
@Override
public Map<String, Object> getWorkerAddressList() {
Map<String, Object> result = new HashMap<>();
List<String> serverNodeList = RegistryCenterUtils.getServerNodeList(NodeType.WORKER, true);
Set<String> serverNodeList = registryClient.getServerNodeSet(NodeType.WORKER, true);
result.put(Constants.DATA_LIST, serverNodeList);
putMsg(result, Status.SUCCESS);
return result;

View File

@ -1,82 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.utils;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.dao.entity.ZookeeperRecord;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* monitor zookeeper info todo registry-spi
* fixme Some of the information obtained in the api belongs to the unique information of zk.
* I am not sure whether there is a good abstraction method. This is related to whether the specific plug-in is provided.
*/
public class RegistryCenterUtils {
private static RegistryClient registryClient = RegistryClient.getInstance();
/**
* @return zookeeper info list
*/
public static List<ZookeeperRecord> zookeeperInfoList() {
return null;
}
/**
* get master servers
*
* @return master server information
*/
public static List<Server> getMasterServers() {
return registryClient.getServerList(NodeType.MASTER);
}
/**
* master construct is the same with worker, use the master instead
*
* @return worker server informations
*/
public static List<Server> getWorkerServers() {
return registryClient.getServerList(NodeType.WORKER);
}
public static Map<String, String> getServerMaps(NodeType nodeType, boolean hostOnly) {
return registryClient.getServerMaps(nodeType, hostOnly);
}
public static List<String> getServerNodeList(NodeType nodeType, boolean hostOnly) {
return registryClient.getServerNodeList(nodeType, hostOnly);
}
public static boolean isNodeExisted(String key) {
return registryClient.isExisted(key);
}
public static List<String> getChildrenNodes(final String key) {
return registryClient.getChildrenKeys(key);
}
public static String getNodeData(String key) {
return registryClient.get(key);
}
}

View File

@ -20,11 +20,9 @@ package org.apache.dolphinscheduler.api.controller;
import org.apache.dolphinscheduler.api.ApiApplicationServer;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.SessionService;
import org.apache.dolphinscheduler.api.utils.RegistryCenterUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.apache.commons.lang.StringUtils;
@ -36,11 +34,6 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.modules.junit4.PowerMockRunnerDelegate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@ -51,11 +44,8 @@ import org.springframework.web.context.WebApplicationContext;
/**
* abstract controller test
*/
@RunWith(PowerMockRunner.class)
@PowerMockRunnerDelegate(SpringRunner.class)
@RunWith(SpringRunner.class)
@SpringBootTest(classes = ApiApplicationServer.class)
@PrepareForTest({ RegistryCenterUtils.class, RegistryClient.class })
@PowerMockIgnore({"javax.management.*"})
public class AbstractControllerTest {
public static final String SESSION_ID = "sessionId";
@ -74,9 +64,6 @@ public class AbstractControllerTest {
@Before
public void setUp() {
PowerMockito.suppress(PowerMockito.constructor(RegistryClient.class));
PowerMockito.mockStatic(RegistryCenterUtils.class);
mockMvc = MockMvcBuilders.webAppContextSetup(webApplicationContext).build();
createSession();
@ -98,7 +85,7 @@ public class AbstractControllerTest {
String session = sessionService.createSession(loginUser, "127.0.0.1");
sessionId = session;
Assert.assertTrue(!StringUtils.isEmpty(session));
Assert.assertFalse(StringUtils.isEmpty(session));
}
public Map<String, Object> success() {

View File

@ -48,9 +48,6 @@ import org.springframework.test.web.servlet.MvcResult;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
/**
* scheduler controller test
*/
public class SchedulerControllerTest extends AbstractControllerTest {
private static Logger logger = LoggerFactory.getLogger(SchedulerControllerTest.class);

View File

@ -50,9 +50,6 @@ import org.springframework.test.web.servlet.MvcResult;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
/**
* task instance controller test
*/
public class TaskInstanceControllerTest extends AbstractControllerTest {
@InjectMocks

View File

@ -37,9 +37,6 @@ import org.springframework.test.web.servlet.MvcResult;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
/**
* tenant controller test
*/
public class TenantControllerTest extends AbstractControllerTest {
private static Logger logger = LoggerFactory.getLogger(TenantControllerTest.class);

View File

@ -22,7 +22,6 @@ import static org.springframework.test.web.servlet.request.MockMvcRequestBuilder
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
import org.apache.dolphinscheduler.api.utils.RegistryCenterUtils;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.NodeType;
@ -30,6 +29,7 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import java.util.HashMap;
import java.util.Map;
@ -59,12 +59,15 @@ public class WorkerGroupControllerTest extends AbstractControllerTest {
@MockBean
private ProcessInstanceMapper processInstanceMapper;
@MockBean
private RegistryClient registryClient;
@Test
public void testSaveWorkerGroup() throws Exception {
Map<String, String> serverMaps = new HashMap<>();
serverMaps.put("192.168.0.1", "192.168.0.1");
serverMaps.put("192.168.0.2", "192.168.0.2");
PowerMockito.when(RegistryCenterUtils.getServerMaps(NodeType.WORKER, true)).thenReturn(serverMaps);
PowerMockito.when(registryClient.getServerMaps(NodeType.WORKER, true)).thenReturn(serverMaps);
MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
paramsMap.add("name","cxc_work_group");

View File

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.api.ApiApplicationServer;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.impl.WorkerGroupServiceImpl;
import org.apache.dolphinscheduler.common.Constants;
@ -33,105 +34,32 @@ import java.util.List;
import java.util.Map;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
/**
* worker group service test
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({ RegistryClient.class })
@PowerMockIgnore({"javax.management.*"})
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = ApiApplicationServer.class)
public class WorkerGroupServiceTest {
@MockBean
private RegistryClient registryClient;
@InjectMocks
@Autowired
private WorkerGroupServiceImpl workerGroupService;
@Mock
@MockBean
private WorkerGroupMapper workerGroupMapper;
@Mock
@MockBean
private ProcessInstanceMapper processInstanceMapper;
private String groupName = "groupName000001";
/* @Before
public void init() {
ZookeeperConfig zookeeperConfig = new ZookeeperConfig();
zookeeperConfig.setDsRoot("/dolphinscheduler_qzw");
Mockito.when(zookeeperCachedOperator.getZookeeperConfig()).thenReturn(zookeeperConfig);
String workerPath = zookeeperCachedOperator.getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_WORKERS;
List<String> workerGroupStrList = new ArrayList<>();
workerGroupStrList.add("default");
workerGroupStrList.add("test");
Mockito.when(zookeeperCachedOperator.getChildrenNodes(workerPath)).thenReturn(workerGroupStrList);
List<String> defaultAddressList = new ArrayList<>();
defaultAddressList.add("192.168.220.188:1234");
defaultAddressList.add("192.168.220.189:1234");
Mockito.when(zookeeperCachedOperator.getChildrenNodes(workerPath + "/default")).thenReturn(defaultAddressList);
Mockito.when(zookeeperCachedOperator.get(workerPath + "/default" + "/" + defaultAddressList.get(0))).thenReturn("0.01,0.17,0.03,25.83,8.0,1.0,2020-07-21 11:17:59,2020-07-21 14:39:20,0,13238");
}
*//**
* create or update a worker group
*//*
@Test
public void testSaveWorkerGroup() {
// worker server maps
Map<String, String> serverMaps = new HashMap<>();
serverMaps.put("127.0.0.1:1234", "0.3,0.07,4.4,7.42,16.0,0.3,2021-03-19 20:17:58,2021-03-19 20:25:29,0,79214");
Mockito.when(zookeeperMonitor.getServerMaps(ZKNodeType.WORKER, true)).thenReturn(serverMaps);
User user = new User();
// general user add
user.setUserType(UserType.GENERAL_USER);
Map<String, Object> result = workerGroupService.saveWorkerGroup(user, 0, groupName, "127.0.0.1:1234");
Assert.assertEquals(Status.USER_NO_OPERATION_PERM.getMsg(), result.get(Constants.MSG));
// success
user.setUserType(UserType.ADMIN_USER);
result = workerGroupService.saveWorkerGroup(user, 0, groupName, "127.0.0.1:1234");
Assert.assertEquals(Status.SUCCESS.getMsg(), result.get(Constants.MSG));
// group name exist
Mockito.when(workerGroupMapper.selectById(2)).thenReturn(getWorkerGroup(2));
Mockito.when(workerGroupMapper.queryWorkerGroupByName(groupName)).thenReturn(getList());
result = workerGroupService.saveWorkerGroup(user, 2, groupName, "127.0.0.1:1234");
Assert.assertEquals(Status.NAME_EXIST, result.get(Constants.STATUS));
}*/
/**
* query worker group paging
*/
/* @Test
public void testQueryAllGroupPaging() {
User user = new User();
// general user add
user.setUserType(UserType.ADMIN_USER);
Map<String, Object> result = workerGroupService.queryAllGroupPaging(user, 1, 10, null);
PageInfo<WorkerGroup> pageInfo = (PageInfo) result.get(Constants.DATA_LIST);
Assert.assertEquals(pageInfo.getLists().size(), 1);
}*/
@Before
public void before() {
PowerMockito.suppress(PowerMockito.constructor(RegistryClient.class));
}
@Test
public void testQueryAllGroup() {
Map<String, Object> result = workerGroupService.queryAllGroup();

View File

@ -1,43 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.utils;
import org.apache.dolphinscheduler.common.model.Server;
import java.util.List;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
/**
* zookeeper monitor utils test
*/
@Ignore
public class RegistryCenterUtilsTest {
@Test
public void testGetMasterList(){
List<Server> masterServerList = RegistryCenterUtils.getMasterServers();
List<Server> workerServerList = RegistryCenterUtils.getWorkerServers();
Assert.assertTrue(masterServerList.size() >= 0);
Assert.assertTrue(workerServerList.size() >= 0);
}
}

View File

@ -90,8 +90,6 @@ public final class Constants {
public static final String REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS = "/lock/failover/masters";
public static final String REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS = "/lock/failover/workers";
public static final String REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS = "/lock/failover/startup-masters";
public static final String REGISTRY_PLUGIN_BINDING = "registry.plugin.binding";
public static final String REGISTRY_PLUGIN_DIR = "registry.plugin.dir";
public static final String REGISTRY_SERVERS = "registry.servers";
/**

View File

@ -16,15 +16,6 @@
*/
package org.apache.dolphinscheduler.common.enums;
/**
* zk node type
*/
public enum NodeType {
/**
* 0 master node;
* 1 worker node;
* 2 dead_server node;
*/
MASTER, WORKER, DEAD_SERVER;
MASTER, WORKER, DEAD_SERVER
}

View File

@ -75,11 +75,6 @@
<unpack/>
</artifact>
</artifactSet>
<artifactSet to="lib/plugin/registry/zookeeper">
<artifact id="${project.groupId}:dolphinscheduler-registry-zookeeper:zip:${project.version}">
<unpack/>
</artifact>
</artifactSet>
<!-- Task Plugins -->
<artifactSet to="lib/plugin/task/datax">
<artifact id="${project.groupId}:dolphinscheduler-task-datax:zip:${project.version}">

View File

@ -1,56 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.registry.zookeeper;
import org.apache.dolphinscheduler.spi.register.RegistryConnectListener;
import org.apache.dolphinscheduler.spi.register.RegistryConnectState;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ZookeeperConnectionStateListener implements ConnectionStateListener {
private static final Logger logger = LoggerFactory.getLogger(ZookeeperConnectionStateListener.class);
private RegistryConnectListener registryConnectListener;
public ZookeeperConnectionStateListener(RegistryConnectListener registryConnectListener) {
this.registryConnectListener = registryConnectListener;
}
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
if (newState == ConnectionState.LOST) {
logger.error("connection lost from zookeeper");
registryConnectListener.notify(RegistryConnectState.LOST);
} else if (newState == ConnectionState.RECONNECTED) {
logger.info("reconnected to zookeeper");
registryConnectListener.notify(RegistryConnectState.RECONNECTED);
} else if (newState == ConnectionState.SUSPENDED) {
logger.warn("zookeeper connection SUSPENDED");
registryConnectListener.notify(RegistryConnectState.SUSPENDED);
}
}
}

View File

@ -1,34 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.registry.zookeeper;
import org.apache.dolphinscheduler.spi.DolphinSchedulerPlugin;
import org.apache.dolphinscheduler.spi.register.RegistryFactory;
import com.google.common.collect.ImmutableList;
/**
* zookeeper registry plugin
*/
public class ZookeeperRegistryPlugin implements DolphinSchedulerPlugin {
@Override
public Iterable<RegistryFactory> getRegisterFactorys() {
return ImmutableList.of(new ZookeeperRegistryFactory());
}
}

View File

@ -0,0 +1,32 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to Apache Software Foundation (ASF) under one or more contributor
~ license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright
~ ownership. Apache Software Foundation (ASF) licenses this file to you under
~ the Apache License, Version 2.0 (the "License"); you may
~ not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dolphinscheduler-registry</artifactId>
<groupId>org.apache.dolphinscheduler</groupId>
<version>2.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dolphinscheduler-registry-api</artifactId>
</project>

View File

@ -0,0 +1,25 @@
/*
* Licensed to Apache Software Foundation (ASF) under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Apache Software Foundation (ASF) licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.dolphinscheduler.registry.api;
@FunctionalInterface
public interface ConnectionListener {
void onUpdate(ConnectionState newState);
}

View File

@ -0,0 +1,27 @@
/*
* Licensed to Apache Software Foundation (ASF) under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Apache Software Foundation (ASF) licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.dolphinscheduler.registry.api;
public enum ConnectionState {
CONNECTED,
RECONNECTED,
SUSPENDED,
DISCONNECTED
}

View File

@ -0,0 +1,48 @@
/*
* Licensed to Apache Software Foundation (ASF) under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Apache Software Foundation (ASF) licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.dolphinscheduler.registry.api;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import lombok.experimental.Accessors;
@Getter
@Setter
@Builder
@ToString
@NoArgsConstructor
@AllArgsConstructor
@Accessors(fluent = true)
public class Event {
private String key;
private String path;
private String data;
private Type type;
public enum Type {
ADD,
REMOVE,
UPDATE
}
}

View File

@ -0,0 +1,48 @@
/*
* Licensed to Apache Software Foundation (ASF) under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Apache Software Foundation (ASF) licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.dolphinscheduler.registry.api;
import java.io.Closeable;
import java.util.Collection;
import java.util.Map;
public interface Registry extends Closeable {
void start(Map<String, String> config);
boolean subscribe(String path, SubscribeListener listener);
void unsubscribe(String path);
void addConnectionStateListener(ConnectionListener listener);
String get(String key);
void put(String key, String value, boolean deleteOnDisconnect);
void delete(String key);
Collection<String> children(String key);
boolean exists(String key);
boolean acquireLock(String key);
boolean releaseLock(String key);
}

View File

@ -0,0 +1,31 @@
/*
* Licensed to Apache Software Foundation (ASF) under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Apache Software Foundation (ASF) licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.dolphinscheduler.registry.api;
public final class RegistryException extends RuntimeException {
public RegistryException(String message, Throwable cause) {
super(message, cause);
}
public RegistryException(String message) {
super(message);
}
}

View File

@ -0,0 +1,26 @@
/*
* Licensed to Apache Software Foundation (ASF) under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Apache Software Foundation (ASF) licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.dolphinscheduler.registry.api;
public interface RegistryFactory {
String name();
Registry create();
}

View File

@ -0,0 +1,35 @@
/*
* Licensed to Apache Software Foundation (ASF) under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Apache Software Foundation (ASF) licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.dolphinscheduler.registry.api;
import static java.util.stream.Collectors.toMap;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.function.Function;
import java.util.stream.StreamSupport;
public final class RegistryFactoryLoader {
public static Map<String, RegistryFactory> load() {
final ServiceLoader<RegistryFactory> factories = ServiceLoader.load(RegistryFactory.class);
return StreamSupport.stream(factories.spliterator(), false)
.collect(toMap(RegistryFactory::name, Function.identity()));
}
}

View File

@ -0,0 +1,24 @@
/*
* Licensed to Apache Software Foundation (ASF) under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Apache Software Foundation (ASF) licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.dolphinscheduler.registry.api;
public interface SubscribeListener {
void notify(Event event);
}

View File

@ -19,18 +19,19 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dolphinscheduler-registry-plugin</artifactId>
<artifactId>dolphinscheduler-registry-plugins</artifactId>
<groupId>org.apache.dolphinscheduler</groupId>
<version>2.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dolphinscheduler-registry-zookeeper</artifactId>
<!-- can be load as a Alert Plugin when development and run server in IDE -->
<packaging>dolphinscheduler-plugin</packaging>
<dependencies>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-registry-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
@ -57,7 +58,6 @@
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
@ -76,11 +76,5 @@
<classifier>runtime</classifier>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<finalName>dolphinscheduler-registry-zookeeper-${project.version}</finalName>
</build>
</project>
</project>

View File

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.registry.zookeeper;
import org.apache.dolphinscheduler.registry.api.ConnectionListener;
import org.apache.dolphinscheduler.registry.api.ConnectionState;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.state.ConnectionStateListener;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@RequiredArgsConstructor
public final class ZookeeperConnectionStateListener implements ConnectionStateListener {
private final ConnectionListener listener;
@Override
public void stateChanged(CuratorFramework client,
org.apache.curator.framework.state.ConnectionState newState) {
switch (newState) {
case LOST:
log.warn("Registry disconnected");
listener.onUpdate(ConnectionState.DISCONNECTED);
break;
case RECONNECTED:
log.info("Registry reconnected");
listener.onUpdate(ConnectionState.RECONNECTED);
break;
case SUSPENDED:
log.warn("Registry suspended");
listener.onUpdate(ConnectionState.SUSPENDED);
break;
default:
break;
}
}
}

View File

@ -28,21 +28,19 @@ import static org.apache.dolphinscheduler.plugin.registry.zookeeper.ZookeeperCon
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import org.apache.dolphinscheduler.spi.register.DataChangeEvent;
import org.apache.dolphinscheduler.spi.register.ListenerManager;
import org.apache.dolphinscheduler.spi.register.Registry;
import org.apache.dolphinscheduler.spi.register.RegistryConnectListener;
import org.apache.dolphinscheduler.spi.register.RegistryException;
import org.apache.dolphinscheduler.spi.register.SubscribeListener;
import org.apache.dolphinscheduler.registry.api.ConnectionListener;
import org.apache.dolphinscheduler.registry.api.Event;
import org.apache.dolphinscheduler.registry.api.Registry;
import org.apache.dolphinscheduler.registry.api.RegistryException;
import org.apache.dolphinscheduler.registry.api.SubscribeListener;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.api.transaction.TransactionOp;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
@ -56,28 +54,18 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import com.google.common.base.Strings;
public class ZookeeperRegistry implements Registry {
public final class ZookeeperRegistry implements Registry {
private CuratorFramework client;
/**
* treeCache map
* k-subscribe key
* v-listener
*/
private Map<String, TreeCache> treeCacheMap = new HashMap<>();
private final Map<String, TreeCache> treeCacheMap = new ConcurrentHashMap<>();
/**
* Distributed lock map
*/
private ThreadLocal<Map<String, InterProcessMutex>> threadLocalLockMap = new ThreadLocal<>();
private static final ThreadLocal<Map<String, InterProcessMutex>> threadLocalLockMap = new ThreadLocal<>();
/**
* build retry policy
*/
private static RetryPolicy buildRetryPolicy(Map<String, String> registerData) {
int baseSleepTimeMs = BASE_SLEEP_TIME.getParameterValue(registerData.get(BASE_SLEEP_TIME.getName()));
int maxRetries = MAX_RETRIES.getParameterValue(registerData.get(MAX_RETRIES.getName()));
@ -85,35 +73,32 @@ public class ZookeeperRegistry implements Registry {
return new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries, maxSleepMs);
}
/**
* build digest
*/
private static void buildDigest(CuratorFrameworkFactory.Builder builder, String digest) {
builder.authorization(DIGEST.getName(), digest.getBytes(StandardCharsets.UTF_8))
.aclProvider(new ACLProvider() {
@Override
public List<ACL> getDefaultAcl() {
return ZooDefs.Ids.CREATOR_ALL_ACL;
}
.aclProvider(new ACLProvider() {
@Override
public List<ACL> getDefaultAcl() {
return ZooDefs.Ids.CREATOR_ALL_ACL;
}
@Override
public List<ACL> getAclForPath(final String path) {
return ZooDefs.Ids.CREATOR_ALL_ACL;
}
});
@Override
public List<ACL> getAclForPath(final String path) {
return ZooDefs.Ids.CREATOR_ALL_ACL;
}
});
}
@Override
public void init(Map<String, String> registerData) {
public void start(Map<String, String> config) {
CuratorFrameworkFactory.Builder builder =
CuratorFrameworkFactory.builder()
.connectString(SERVERS.getParameterValue(config.get(SERVERS.getName())))
.retryPolicy(buildRetryPolicy(config))
.namespace(NAME_SPACE.getParameterValue(config.get(NAME_SPACE.getName())))
.sessionTimeoutMs(SESSION_TIMEOUT_MS.getParameterValue(config.get(SESSION_TIMEOUT_MS.getName())))
.connectionTimeoutMs(CONNECTION_TIMEOUT_MS.getParameterValue(config.get(CONNECTION_TIMEOUT_MS.getName())));
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
.connectString(SERVERS.getParameterValue(registerData.get(SERVERS.getName())))
.retryPolicy(buildRetryPolicy(registerData))
.namespace(NAME_SPACE.getParameterValue(registerData.get(NAME_SPACE.getName())))
.sessionTimeoutMs(SESSION_TIMEOUT_MS.getParameterValue(registerData.get(SESSION_TIMEOUT_MS.getName())))
.connectionTimeoutMs(CONNECTION_TIMEOUT_MS.getParameterValue(registerData.get(CONNECTION_TIMEOUT_MS.getName())));
String digest = DIGEST.getParameterValue(registerData.get(DIGEST.getName()));
String digest = DIGEST.getParameterValue(config.get(DIGEST.getName()));
if (!Strings.isNullOrEmpty(digest)) {
buildDigest(builder, digest);
}
@ -121,7 +106,7 @@ public class ZookeeperRegistry implements Registry {
client.start();
try {
if (!client.blockUntilConnected(BLOCK_UNTIL_CONNECTED_WAIT_MS.getParameterValue(registerData.get(BLOCK_UNTIL_CONNECTED_WAIT_MS.getName())), MILLISECONDS)) {
if (!client.blockUntilConnected(BLOCK_UNTIL_CONNECTED_WAIT_MS.getParameterValue(config.get(BLOCK_UNTIL_CONNECTED_WAIT_MS.getName())), MILLISECONDS)) {
client.close();
throw new RegistryException("zookeeper connect timeout");
}
@ -132,55 +117,26 @@ public class ZookeeperRegistry implements Registry {
}
@Override
public void addConnectionStateListener(RegistryConnectListener registryConnectListener) {
client.getConnectionStateListenable().addListener(new ZookeeperConnectionStateListener(registryConnectListener));
public void addConnectionStateListener(ConnectionListener listener) {
client.getConnectionStateListenable().addListener(new ZookeeperConnectionStateListener(listener));
}
@Override
public boolean subscribe(String path, SubscribeListener subscribeListener) {
if (null != treeCacheMap.get(path)) {
return false;
}
TreeCache treeCache = new TreeCache(client, path);
TreeCacheListener treeCacheListener = (client, event) -> {
TreeCacheEvent.Type type = event.getType();
DataChangeEvent eventType = null;
String dataPath = null;
switch (type) {
case NODE_ADDED:
dataPath = event.getData().getPath();
eventType = DataChangeEvent.ADD;
break;
case NODE_UPDATED:
eventType = DataChangeEvent.UPDATE;
dataPath = event.getData().getPath();
break;
case NODE_REMOVED:
eventType = DataChangeEvent.REMOVE;
dataPath = event.getData().getPath();
break;
default:
}
if (null != eventType && null != dataPath) {
ListenerManager.dataChange(path, dataPath, new String(event.getData().getData()), eventType);
}
};
treeCache.getListenable().addListener(treeCacheListener);
treeCacheMap.put(path, treeCache);
public boolean subscribe(String path, SubscribeListener listener) {
final TreeCache treeCache = treeCacheMap.computeIfAbsent(path, $ -> new TreeCache(client, path));
treeCache.getListenable().addListener(($, event) -> listener.notify(new EventAdaptor(event, path)));
try {
treeCache.start();
} catch (Exception e) {
throw new RegistryException("start zookeeper tree cache error", e);
treeCacheMap.remove(path);
throw new RegistryException("Failed to subscribe listener for key: " + path, e);
}
ListenerManager.addListener(path, subscribeListener);
return true;
}
@Override
public void unsubscribe(String path) {
TreeCache treeCache = treeCacheMap.get(path);
treeCache.close();
ListenerManager.removeListener(path);
CloseableUtils.closeQuietly(treeCacheMap.get(path));
}
@Override
@ -193,12 +149,7 @@ public class ZookeeperRegistry implements Registry {
}
@Override
public void remove(String key) {
delete(key);
}
@Override
public boolean isExisted(String key) {
public boolean exists(String key) {
try {
return null != client.checkExists().forPath(key);
} catch (Exception e) {
@ -207,47 +158,22 @@ public class ZookeeperRegistry implements Registry {
}
@Override
public void persist(String key, String value) {
try {
if (isExisted(key)) {
update(key, value);
return;
}
client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(key, value.getBytes(StandardCharsets.UTF_8));
public void put(String key, String value, boolean deleteOnDisconnect) {
final CreateMode mode = deleteOnDisconnect ? CreateMode.EPHEMERAL : CreateMode.PERSISTENT;
try {
client.create()
.orSetData()
.creatingParentsIfNeeded()
.withMode(mode)
.forPath(key, value.getBytes(StandardCharsets.UTF_8));
} catch (Exception e) {
throw new RegistryException("zookeeper persist error", e);
throw new RegistryException("Failed to put registry key: " + key, e);
}
}
@Override
public void persistEphemeral(String key, String value) {
try {
if (isExisted(key)) {
update(key, value);
return;
}
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(key, value.getBytes(StandardCharsets.UTF_8));
} catch (Exception e) {
throw new RegistryException("zookeeper persist ephemeral error", e);
}
}
@Override
public void update(String key, String value) {
try {
if (!isExisted(key)) {
return;
}
TransactionOp transactionOp = client.transactionOp();
client.transaction().forOperations(transactionOp.check().forPath(key), transactionOp.setData().forPath(key, value.getBytes(StandardCharsets.UTF_8)));
} catch (Exception e) {
throw new RegistryException("zookeeper update error", e);
}
}
@Override
public List<String> getChildren(String key) {
public List<String> children(String key) {
try {
List<String> result = client.getChildren().forPath(key);
result.sort(Comparator.reverseOrder());
@ -258,23 +184,20 @@ public class ZookeeperRegistry implements Registry {
}
@Override
public boolean delete(String nodePath) {
public void delete(String nodePath) {
try {
client.delete()
.deletingChildrenIfNeeded()
.forPath(nodePath);
} catch (KeeperException.NoNodeException ignore) {
// the node is not exist, we can believe the node has been removed
.deletingChildrenIfNeeded()
.forPath(nodePath);
} catch (KeeperException.NoNodeException ignored) {
// Is already deleted or does not exist
} catch (Exception e) {
throw new RegistryException("zookeeper delete key error", e);
throw new RegistryException("Failed to delete registry key: " + nodePath, e);
}
return true;
}
@Override
public boolean acquireLock(String key) {
InterProcessMutex interProcessMutex = new InterProcessMutex(client, key);
try {
interProcessMutex.acquire();
@ -291,7 +214,6 @@ public class ZookeeperRegistry implements Registry {
throw new RegistryException("zookeeper release lock error", e);
}
}
}
@Override
@ -311,22 +233,35 @@ public class ZookeeperRegistry implements Registry {
return true;
}
public CuratorFramework getClient() {
return client;
}
@Override
public void close() {
treeCacheMap.forEach((key, value) -> value.close());
waitForCacheClose(500);
treeCacheMap.values().forEach(CloseableUtils::closeQuietly);
CloseableUtils.closeQuietly(client);
}
private void waitForCacheClose(long millis) {
try {
Thread.sleep(millis);
} catch (final InterruptedException ex) {
Thread.currentThread().interrupt();
static final class EventAdaptor extends Event {
public EventAdaptor(TreeCacheEvent event, String key) {
key(key);
switch (event.getType()) {
case NODE_ADDED:
type(Type.ADD);
break;
case NODE_UPDATED:
type(Type.UPDATE);
break;
case NODE_REMOVED:
type(Type.REMOVE);
break;
default:
break;
}
final ChildData data = event.getData();
if (data != null) {
path(data.getPath());
data(new String(data.getData()));
}
}
}
}

View File

@ -17,16 +17,16 @@
package org.apache.dolphinscheduler.plugin.registry.zookeeper;
import org.apache.dolphinscheduler.spi.register.Registry;
import org.apache.dolphinscheduler.spi.register.RegistryFactory;
import org.apache.dolphinscheduler.registry.api.Registry;
import org.apache.dolphinscheduler.registry.api.RegistryFactory;
/**
* Zookeeper registry factory
*/
public class ZookeeperRegistryFactory implements RegistryFactory {
import com.google.auto.service.AutoService;
@AutoService(RegistryFactory.class)
public final class ZookeeperRegistryFactory implements RegistryFactory {
@Override
public String getName() {
public String name() {
return "zookeeper";
}

View File

@ -17,8 +17,8 @@
package org.apache.dolphinscheduler.plugin.registry.zookeeper;
import org.apache.dolphinscheduler.spi.register.DataChangeEvent;
import org.apache.dolphinscheduler.spi.register.SubscribeListener;
import org.apache.dolphinscheduler.registry.api.Event;
import org.apache.dolphinscheduler.registry.api.SubscribeListener;
import org.apache.curator.test.TestingServer;
@ -50,18 +50,18 @@ public class ZookeeperRegistryTest {
server = new TestingServer(true);
Map<String, String> registryConfig = new HashMap<>();
registryConfig.put(ZookeeperConfiguration.SERVERS.getName(), server.getConnectString());
registry.init(registryConfig);
registry.persist("/sub", "");
registry.start(registryConfig);
registry.put("/sub", "", false);
}
@Test
public void persistTest() {
registry.persist("/nodes/m1", "");
registry.persist("/nodes/m2", "");
Assert.assertEquals(Arrays.asList("m2", "m1"), registry.getChildren("/nodes"));
Assert.assertTrue(registry.isExisted("/nodes/m1"));
registry.put("/nodes/m1", "", false);
registry.put("/nodes/m2", "", false);
Assert.assertEquals(Arrays.asList("m2", "m1"), registry.children("/nodes"));
Assert.assertTrue(registry.exists("/nodes/m1"));
registry.delete("/nodes/m2");
Assert.assertFalse(registry.isExisted("/nodes/m2"));
Assert.assertFalse(registry.exists("/nodes/m2"));
}
@Test
@ -112,10 +112,9 @@ public class ZookeeperRegistryTest {
}
class TestListener implements SubscribeListener {
static class TestListener implements SubscribeListener {
@Override
public void notify(String path, String data, DataChangeEvent dataChangeEvent) {
public void notify(Event event) {
logger.info("I'm test listener");
}
}

View File

@ -0,0 +1,36 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to Apache Software Foundation (ASF) under one or more contributor
~ license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright
~ ownership. Apache Software Foundation (ASF) licenses this file to you under
~ the Apache License, Version 2.0 (the "License"); you may
~ not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dolphinscheduler-registry</artifactId>
<groupId>org.apache.dolphinscheduler</groupId>
<version>2.0.0-SNAPSHOT</version>
</parent>
<artifactId>dolphinscheduler-registry-plugins</artifactId>
<modelVersion>4.0.0</modelVersion>
<packaging>pom</packaging>
<modules>
<module>dolphinscheduler-registry-zookeeper</module>
</modules>
</project>

View File

@ -24,20 +24,25 @@
<version>2.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-registry-plugin</artifactId>
<artifactId>dolphinscheduler-registry</artifactId>
<packaging>pom</packaging>
<modules>
<module>dolphinscheduler-registry-api</module>
<module>dolphinscheduler-registry-plugins</module>
</modules>
<dependencies>
<!-- dolphinscheduler -->
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-spi</artifactId>
<groupId>com.google.auto.service</groupId>
<artifactId>auto-service</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.22</version>
<scope>provided</scope>
</dependency>
</dependencies>
<modules>
<module>dolphinscheduler-registry-zookeeper</module>
</modules>
</project>
</project>

View File

@ -32,6 +32,7 @@ import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.registry.api.ConnectionState;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
@ -42,11 +43,10 @@ import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.apache.dolphinscheduler.spi.register.RegistryConnectListener;
import org.apache.dolphinscheduler.spi.register.RegistryConnectState;
import org.apache.commons.lang.StringUtils;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Executors;
@ -79,6 +79,7 @@ public class MasterRegistryClient {
@Autowired
private ProcessService processService;
@Autowired
private RegistryClient registryClient;
/**
@ -104,12 +105,11 @@ public class MasterRegistryClient {
public void init() {
this.startupTime = System.currentTimeMillis();
this.registryClient = RegistryClient.getInstance();
this.heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor"));
}
public void start() {
String nodeLock = registryClient.getMasterStartUpLockPath();
String nodeLock = Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS;
try {
// create distributed lock with the root node path of the lock space as /dolphinscheduler/lock/failover/startup-masters
@ -117,7 +117,7 @@ public class MasterRegistryClient {
// master registry
registry();
String registryPath = getMasterPath();
registryClient.handleDeadServer(registryPath, NodeType.MASTER, Constants.DELETE_OP);
registryClient.handleDeadServer(Collections.singleton(registryPath), NodeType.MASTER, Constants.DELETE_OP);
// init system node
@ -143,7 +143,8 @@ public class MasterRegistryClient {
}
public void closeRegistry() {
unRegistry();
// TODO unsubscribe MasterRegistryDataListener
deregister();
}
/**
@ -167,7 +168,7 @@ public class MasterRegistryClient {
return;
}
// handle dead server
registryClient.handleDeadServer(path, nodeType, Constants.ADD_OP);
registryClient.handleDeadServer(Collections.singleton(path), nodeType, Constants.ADD_OP);
}
//failover server
if (failover) {
@ -209,9 +210,9 @@ public class MasterRegistryClient {
private String getFailoverLockPath(NodeType nodeType) {
switch (nodeType) {
case MASTER:
return registryClient.getMasterFailoverLockPath();
return Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS;
case WORKER:
return registryClient.getWorkerFailoverLockPath();
return Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS;
default:
return "";
}
@ -289,20 +290,20 @@ public class MasterRegistryClient {
ProcessInstance processInstance = processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
if (workerHost == null
|| !checkOwner
|| processInstance.getHost().equalsIgnoreCase(getLocalAddress())) {
|| !checkOwner
|| processInstance.getHost().equalsIgnoreCase(getLocalAddress())) {
// only failover the task owned myself if worker down.
if (processInstance == null) {
logger.error("failover error, the process {} of task {} do not exists.",
taskInstance.getProcessInstanceId(), taskInstance.getId());
taskInstance.getProcessInstanceId(), taskInstance.getId());
continue;
}
taskInstance.setProcessInstance(processInstance);
TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get()
.buildTaskInstanceRelatedInfo(taskInstance)
.buildProcessInstanceRelatedInfo(processInstance)
.create();
.buildTaskInstanceRelatedInfo(taskInstance)
.buildProcessInstanceRelatedInfo(processInstance)
.create();
// only kill yarn job if exists , the local thread has exited
ProcessUtils.killYarnJob(taskExecutionContext);
@ -348,14 +349,6 @@ public class MasterRegistryClient {
logger.info("master failover end");
}
public void blockAcquireMutex() {
registryClient.getLock(registryClient.getMasterLockPath());
}
public void releaseLock() {
registryClient.releaseLock(registryClient.getMasterLockPath());
}
/**
* registry
*/
@ -364,36 +357,24 @@ public class MasterRegistryClient {
localNodePath = getMasterPath();
int masterHeartbeatInterval = masterConfig.getMasterHeartbeatInterval();
HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime,
masterConfig.getMasterMaxCpuloadAvg(),
masterConfig.getMasterReservedMemory(),
Sets.newHashSet(getMasterPath()),
Constants.MASTER_TYPE,
registryClient);
masterConfig.getMasterMaxCpuloadAvg(),
masterConfig.getMasterReservedMemory(),
Sets.newHashSet(getMasterPath()),
Constants.MASTER_TYPE,
registryClient);
registryClient.persistEphemeral(localNodePath, heartBeatTask.getHeartBeatInfo());
registryClient.addConnectionStateListener(new MasterRegistryConnectStateListener());
registryClient.addConnectionStateListener(newState -> {
if (newState == ConnectionState.RECONNECTED || newState == ConnectionState.SUSPENDED) {
registryClient.persistEphemeral(localNodePath, "");
}
});
this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, masterHeartbeatInterval, masterHeartbeatInterval, TimeUnit.SECONDS);
logger.info("master node : {} registry to ZK successfully with heartBeatInterval : {}s", address, masterHeartbeatInterval);
}
class MasterRegistryConnectStateListener implements RegistryConnectListener {
@Override
public void notify(RegistryConnectState newState) {
if (RegistryConnectState.RECONNECTED == newState) {
registryClient.persistEphemeral(localNodePath, "");
}
if (RegistryConnectState.SUSPENDED == newState) {
registryClient.persistEphemeral(localNodePath, "");
}
}
}
/**
* remove registry info
*/
public void unRegistry() {
public void deregister() {
try {
String address = getLocalAddress();
String localNodePath = getMasterPath();

View File

@ -22,42 +22,43 @@ import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHED
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.registry.api.Event;
import org.apache.dolphinscheduler.registry.api.SubscribeListener;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.spi.register.DataChangeEvent;
import org.apache.dolphinscheduler.spi.register.SubscribeListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Strings;
public class MasterRegistryDataListener implements SubscribeListener {
private static final Logger logger = LoggerFactory.getLogger(MasterRegistryDataListener.class);
private MasterRegistryClient masterRegistryClient;
private final MasterRegistryClient masterRegistryClient;
public MasterRegistryDataListener() {
masterRegistryClient = SpringApplicationContext.getBean(MasterRegistryClient.class);
}
@Override
public void notify(String path, String data, DataChangeEvent event) {
public void notify(Event event) {
final String path = event.path();
if (Strings.isNullOrEmpty(path)) {
return;
}
//monitor master
if (path.startsWith(REGISTRY_DOLPHINSCHEDULER_MASTERS + Constants.SINGLE_SLASH)) {
handleMasterEvent(event, path);
handleMasterEvent(event);
} else if (path.startsWith(REGISTRY_DOLPHINSCHEDULER_WORKERS + Constants.SINGLE_SLASH)) {
//monitor worker
handleWorkerEvent(event, path);
handleWorkerEvent(event);
}
}
/**
* monitor master
*
* @param event event
* @param path path
*/
public void handleMasterEvent(DataChangeEvent event, String path) {
switch (event) {
public void handleMasterEvent(Event event) {
final String path = event.path();
switch (event.type()) {
case ADD:
logger.info("master node added : {}", path);
break;
@ -69,14 +70,9 @@ public class MasterRegistryDataListener implements SubscribeListener {
}
}
/**
* monitor worker
*
* @param event event
* @param path path
*/
public void handleWorkerEvent(DataChangeEvent event, String path) {
switch (event) {
public void handleWorkerEvent(Event event) {
final String path = event.path();
switch (event.type()) {
case ADD:
logger.info("worker node added : {}", path);
break;

View File

@ -27,16 +27,18 @@ import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
import org.apache.dolphinscheduler.registry.api.Event;
import org.apache.dolphinscheduler.registry.api.Event.Type;
import org.apache.dolphinscheduler.registry.api.SubscribeListener;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.service.queue.MasterPriorityQueue;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.apache.dolphinscheduler.spi.register.DataChangeEvent;
import org.apache.dolphinscheduler.spi.register.SubscribeListener;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@ -101,10 +103,8 @@ public class ServerNodeManager implements InitializingBean {
*/
private ScheduledExecutorService executorService;
/**
* zk client
*/
private RegistryClient registryClient = RegistryClient.getInstance();
@Autowired
private RegistryClient registryClient;
/**
* eg : /node/worker/group/127.0.0.1:xxx
@ -153,11 +153,11 @@ public class ServerNodeManager implements InitializingBean {
*/
executorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("ServerNodeManagerExecutor"));
executorService.scheduleWithFixedDelay(new WorkerNodeInfoAndGroupDbSyncTask(), 0, 10, TimeUnit.SECONDS);
/**
/*
* init MasterNodeListener listener
*/
registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_MASTERS, new MasterDataListener());
/**
/*
* init WorkerNodeListener listener
*/
registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_WORKERS, new WorkerDataListener());
@ -167,15 +167,15 @@ public class ServerNodeManager implements InitializingBean {
* load nodes from zookeeper
*/
public void load() {
/**
/*
* master nodes from zookeeper
*/
updateMasterNodes();
/**
/*
* worker group nodes from zookeeper
*/
Set<String> workerGroups = registryClient.getWorkerGroupDirectly();
Collection<String> workerGroups = registryClient.getWorkerGroupDirectly();
for (String workerGroup : workerGroups) {
syncWorkerGroupNodes(workerGroup, registryClient.getWorkerGroupNodesDirectly(workerGroup));
}
@ -218,25 +218,28 @@ public class ServerNodeManager implements InitializingBean {
class WorkerDataListener implements SubscribeListener {
@Override
public void notify(String path, String data, DataChangeEvent dataChangeEvent) {
public void notify(Event event) {
final String path = event.path();
final Type type = event.type();
final String data = event.data();
if (registryClient.isWorkerPath(path)) {
try {
if (dataChangeEvent == DataChangeEvent.ADD) {
if (type == Type.ADD) {
logger.info("worker group node : {} added.", path);
String group = parseGroup(path);
Set<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(group);
Collection<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(group);
logger.info("currentNodes : {}", currentNodes);
syncWorkerGroupNodes(group, currentNodes);
} else if (dataChangeEvent == DataChangeEvent.REMOVE) {
} else if (type == Type.REMOVE) {
logger.info("worker group node : {} down.", path);
String group = parseGroup(path);
Set<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(group);
Collection<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(group);
syncWorkerGroupNodes(group, currentNodes);
alertDao.sendServerStopedAlert(1, path, "WORKER");
} else if (dataChangeEvent == DataChangeEvent.UPDATE) {
} else if (type == Type.UPDATE) {
logger.debug("worker group node : {} update, data: {}", path, data);
String group = parseGroup(path);
Set<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(group);
Collection<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(group);
syncWorkerGroupNodes(group, currentNodes);
String node = parseNode(path);
@ -268,19 +271,18 @@ public class ServerNodeManager implements InitializingBean {
}
}
/**
* master node listener
*/
class MasterDataListener implements SubscribeListener {
@Override
public void notify(String path, String data, DataChangeEvent dataChangeEvent) {
public void notify(Event event) {
final String path = event.path();
final Type type = event.type();
if (registryClient.isMasterPath(path)) {
try {
if (dataChangeEvent.equals(DataChangeEvent.ADD)) {
if (type.equals(Type.ADD)) {
logger.info("master node : {} added.", path);
updateMasterNodes();
}
if (dataChangeEvent.equals(DataChangeEvent.REMOVE)) {
if (type.equals(Type.REMOVE)) {
logger.info("master node : {} down.", path);
updateMasterNodes();
alertDao.sendServerStopedAlert(1, path, "MASTER");
@ -295,10 +297,10 @@ public class ServerNodeManager implements InitializingBean {
private void updateMasterNodes() {
SLOT_LIST.clear();
this.masterNodes.clear();
String nodeLock = registryClient.getMasterLockPath();
String nodeLock = Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_MASTERS;
try {
registryClient.getLock(nodeLock);
Set<String> currentNodes = registryClient.getMasterNodesDirectly();
Collection<String> currentNodes = registryClient.getMasterNodesDirectly();
List<Server> masterNodes = registryClient.getServerList(NodeType.MASTER);
syncMasterNodes(currentNodes, masterNodes);
} catch (Exception e) {
@ -328,7 +330,7 @@ public class ServerNodeManager implements InitializingBean {
*
* @param nodes master nodes
*/
private void syncMasterNodes(Set<String> nodes, List<Server> masterNodes) {
private void syncMasterNodes(Collection<String> nodes, List<Server> masterNodes) {
masterLock.lock();
try {
this.masterNodes.addAll(nodes);
@ -353,7 +355,7 @@ public class ServerNodeManager implements InitializingBean {
* @param workerGroup worker group
* @param nodes worker nodes
*/
private void syncWorkerGroupNodes(String workerGroup, Set<String> nodes) {
private void syncWorkerGroupNodes(String workerGroup, Collection<String> nodes) {
workerGroupLock.lock();
try {
workerGroup = workerGroup.toLowerCase();

View File

@ -19,36 +19,26 @@ package org.apache.dolphinscheduler.server.monitor;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* zk monitor server impl
*/
@Component
public class RegistryMonitorImpl extends AbstractMonitor {
/**
* zookeeper operator
*/
private RegistryClient registryClient = RegistryClient.getInstance();
@Autowired
private RegistryClient registryClient;
/**
* get active nodes map by path
*
* @param path path
* @return active nodes map
*/
@Override
protected Map<String, String> getActiveNodesByPath(String path) {
Map<String, String> maps = new HashMap<>();
List<String> childrenList = registryClient.getChildrenKeys(path);
Collection<String> childrenList = registryClient.getChildrenKeys(path);
if (childrenList == null) {
return maps;

View File

@ -33,11 +33,11 @@ public class HeartBeatTask implements Runnable {
private final Logger logger = LoggerFactory.getLogger(HeartBeatTask.class);
private Set<String> heartBeatPaths;
private RegistryClient registryClient;
private final Set<String> heartBeatPaths;
private final RegistryClient registryClient;
private WorkerManagerThread workerManagerThread;
private String serverType;
private HeartBeat heartBeat;
private final String serverType;
private final HeartBeat heartBeat;
public HeartBeatTask(long startupTime,
double maxCpuloadAvg,
@ -89,7 +89,7 @@ public class HeartBeatTask implements Runnable {
}
for (String heartBeatPath : heartBeatPaths) {
registryClient.update(heartBeatPath, heartBeat.encodeHeartBeat());
registryClient.persistEphemeral(heartBeatPath, heartBeat.encodeHeartBeat());
}
} catch (Throwable ex) {
logger.error("error write heartbeat info", ex);

View File

@ -51,18 +51,12 @@ public class TaskCallbackService {
*/
private static final ConcurrentHashMap<Integer, NettyRemoteChannel> REMOTE_CHANNELS = new ConcurrentHashMap<>();
/**
* zookeeper registry center
*/
private RegistryClient registryClient;
/**
* netty remoting client
*/
private final NettyRemotingClient nettyRemotingClient;
public TaskCallbackService() {
this.registryClient = RegistryClient.getInstance();
final NettyClientConfig clientConfig = new NettyClientConfig();
this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
this.nettyRemotingClient.registerProcessor(CommandType.DB_TASK_ACK, new DBTaskAckProcessor());

View File

@ -33,6 +33,7 @@ import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.apache.commons.lang.StringUtils;
import java.io.IOException;
import java.util.Set;
import java.util.StringJoiner;
import java.util.concurrent.Executors;
@ -73,6 +74,7 @@ public class WorkerRegistryClient {
*/
private ScheduledExecutorService heartBeatExecutor;
@Autowired
private RegistryClient registryClient;
/**
@ -86,7 +88,6 @@ public class WorkerRegistryClient {
public void initWorkRegistry() {
this.workerGroups = workerConfig.getWorkerGroups();
this.startupTime = System.currentTimeMillis();
this.registryClient = RegistryClient.getInstance();
this.heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor"));
}
@ -121,7 +122,7 @@ public class WorkerRegistryClient {
/**
* remove registry info
*/
public void unRegistry() {
public void unRegistry() throws IOException {
try {
String address = getLocalAddress();
Set<String> workerZkPaths = getWorkerZkPaths();
@ -161,7 +162,7 @@ public class WorkerRegistryClient {
return workerPaths;
}
public void handleDeadServer(Set<String> nodeSet, NodeType nodeType, String opType) throws Exception {
public void handleDeadServer(Set<String> nodeSet, NodeType nodeType, String opType) {
registryClient.handleDeadServer(nodeSet, nodeType, opType);
}

View File

@ -93,11 +93,6 @@ dbname="dolphinscheduler"
# ---------------------------------------------------------
# Registry Server
# ---------------------------------------------------------
# Registry Server plugin dir. DolphinScheduler will find and load the registry plugin jar package from this dir.
# For now default registry server is zookeeper, so the default value is `lib/plugin/registry/zookeeper`.
# If you want to implement your own registry server, please see https://dolphinscheduler.apache.org/en-us/docs/dev/user_doc/registry_spi.html
registryPluginDir="lib/plugin/registry/zookeeper"
# Registry Server plugin name, should be a substring of `registryPluginDir`, DolphinScheduler use this for verifying configuration consistency
registryPluginName="zookeeper"

View File

@ -56,7 +56,7 @@ public class ExecutorDispatcherTest {
}
@Test
public void testDispatch() throws ExecuteException {
public void testDispatch() throws Exception {
int port = 30000;
final NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(port);

View File

@ -20,10 +20,6 @@ package org.apache.dolphinscheduler.server.master.registry;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.doNothing;
import java.util.Arrays;
import java.util.Date;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.model.Server;
@ -33,13 +29,17 @@ import org.apache.dolphinscheduler.server.master.cache.impl.ProcessInstanceExecC
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import java.util.Arrays;
import java.util.Date;
import java.util.concurrent.ScheduledExecutorService;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
@ -59,6 +59,7 @@ public class MasterRegistryClientTest {
@Mock
private MasterConfig masterConfig;
@Mock
private RegistryClient registryClient;
@Mock
@ -72,13 +73,10 @@ public class MasterRegistryClientTest {
@Before
public void before() throws Exception {
PowerMockito.suppress(PowerMockito.constructor(RegistryClient.class));
registryClient = PowerMockito.mock(RegistryClient.class);
given(registryClient.getLock(Mockito.anyString())).willReturn(true);
given(registryClient.getMasterFailoverLockPath()).willReturn("/path");
given(registryClient.releaseLock(Mockito.anyString())).willReturn(true);
given(registryClient.getHostByEventDataPath(Mockito.anyString())).willReturn("127.0.0.1:8080");
doNothing().when(registryClient).handleDeadServer(Mockito.anyString(), Mockito.any(NodeType.class), Mockito.anyString());
doNothing().when(registryClient).handleDeadServer(Mockito.anySet(), Mockito.any(NodeType.class), Mockito.anyString());
ReflectionTestUtils.setField(masterRegistryClient, "registryClient", registryClient);
ProcessInstance processInstance = new ProcessInstance();

View File

@ -42,6 +42,10 @@
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-spi</artifactId>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-registry-zookeeper</artifactId>
</dependency>
<dependency>
<groupId>org.quartz-scheduler</groupId>

View File

@ -1,243 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.registry;
import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS;
import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS;
import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.spi.plugin.DolphinPluginLoader;
import org.apache.dolphinscheduler.spi.plugin.DolphinPluginManagerConfig;
import org.apache.dolphinscheduler.spi.register.Registry;
import org.apache.dolphinscheduler.spi.register.RegistryConnectListener;
import org.apache.dolphinscheduler.spi.register.RegistryException;
import org.apache.dolphinscheduler.spi.register.RegistryPluginManager;
import org.apache.dolphinscheduler.spi.register.SubscribeListener;
import org.apache.commons.lang.StringUtils;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.ImmutableList;
/**
* All business parties use this class to access the registry
*/
public class RegistryCenter {
private static final Logger logger = LoggerFactory.getLogger(RegistryCenter.class);
private final AtomicBoolean isStarted = new AtomicBoolean(false);
private Registry registry;
private IStoppable stoppable;
/**
* nodes namespace
*/
protected static String NODES;
private RegistryPluginManager registryPluginManager;
protected static final String EMPTY = "";
private static final String REGISTRY_PREFIX = "registry";
private static final String REGISTRY_PLUGIN_BINDING = "registry.plugin.binding";
private static final String REGISTRY_PLUGIN_DIR = "registry.plugin.dir";
private static final String MAVEN_LOCAL_REPOSITORY = "maven.local.repository";
private static final String REGISTRY_PLUGIN_NAME = "plugin.name";
/**
* default registry plugin dir
*/
private static final String REGISTRY_PLUGIN_PATH = "lib/plugin/registry";
private static final String REGISTRY_CONFIG_FILE_PATH = "/registry.properties";
/**
* init node persist
*/
public void init() {
if (isStarted.compareAndSet(false, true)) {
PropertyUtils.loadPropertyFile(REGISTRY_CONFIG_FILE_PATH);
Map<String, String> registryConfig = PropertyUtils.getPropertiesByPrefix(REGISTRY_PREFIX);
if (null == registryConfig || registryConfig.isEmpty()) {
throw new RegistryException("registry config param is null");
}
if (null == registryPluginManager) {
installRegistryPlugin(registryConfig.get(REGISTRY_PLUGIN_NAME));
registry = registryPluginManager.getRegistry();
}
registry.init(registryConfig);
initNodes();
}
}
/**
* init nodes
*/
private void initNodes() {
persist(REGISTRY_DOLPHINSCHEDULER_MASTERS, EMPTY);
persist(REGISTRY_DOLPHINSCHEDULER_WORKERS, EMPTY);
persist(REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS, EMPTY);
}
/**
* install registry plugin
*/
private void installRegistryPlugin(String registryPluginName) {
DolphinPluginManagerConfig registryPluginManagerConfig = new DolphinPluginManagerConfig();
registryPluginManagerConfig.setPlugins(PropertyUtils.getString(REGISTRY_PLUGIN_BINDING));
if (StringUtils.isNotBlank(PropertyUtils.getString(REGISTRY_PLUGIN_DIR))) {
registryPluginManagerConfig.setInstalledPluginsDir(PropertyUtils.getString(REGISTRY_PLUGIN_DIR, REGISTRY_PLUGIN_PATH).trim());
}
if (StringUtils.isNotBlank(PropertyUtils.getString(MAVEN_LOCAL_REPOSITORY))) {
registryPluginManagerConfig.setMavenLocalRepository(PropertyUtils.getString(MAVEN_LOCAL_REPOSITORY).trim());
}
registryPluginManager = new RegistryPluginManager(registryPluginName);
DolphinPluginLoader registryPluginLoader = new DolphinPluginLoader(registryPluginManagerConfig, ImmutableList.of(registryPluginManager));
try {
registryPluginLoader.loadPlugins();
} catch (Exception e) {
throw new RuntimeException("Load registry Plugin Failed !", e);
}
}
/**
* close
*/
public void close() {
if (isStarted.compareAndSet(true, false) && registry != null) {
registry.close();
}
}
public void persist(String key, String value) {
registry.persist(key, value);
}
public void persistEphemeral(String key, String value) {
registry.persistEphemeral(key, value);
}
public void remove(String key) {
registry.remove(key);
}
public void update(String key, String value) {
registry.update(key, value);
}
public String get(String key) {
return registry.get(key);
}
public void subscribe(String path, SubscribeListener subscribeListener) {
registry.subscribe(path, subscribeListener);
}
public void addConnectionStateListener(RegistryConnectListener registryConnectListener) {
registry.addConnectionStateListener(registryConnectListener);
}
public boolean isExisted(String key) {
return registry.isExisted(key);
}
public boolean getLock(String key) {
return registry.acquireLock(key);
}
public boolean releaseLock(String key) {
return registry.releaseLock(key);
}
/**
* @return get dead server node parent path
*/
public String getDeadZNodeParentPath() {
return REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS;
}
public void setStoppable(IStoppable stoppable) {
this.stoppable = stoppable;
}
public IStoppable getStoppable() {
return stoppable;
}
/**
* whether master path
*
* @param path path
* @return result
*/
public boolean isMasterPath(String path) {
return path != null && path.contains(REGISTRY_DOLPHINSCHEDULER_MASTERS);
}
/**
* get worker group path
*
* @param workerGroup workerGroup
* @return worker group path
*/
public String getWorkerGroupPath(String workerGroup) {
return REGISTRY_DOLPHINSCHEDULER_WORKERS + "/" + workerGroup;
}
/**
* whether worker path
*
* @param path path
* @return result
*/
public boolean isWorkerPath(String path) {
return path != null && path.contains(REGISTRY_DOLPHINSCHEDULER_WORKERS);
}
/**
* get children nodes
*
* @param key key
* @return children nodes
*/
public List<String> getChildrenKeys(final String key) {
return registry.getChildren(key);
}
}

View File

@ -22,60 +22,72 @@ import static org.apache.dolphinscheduler.common.Constants.COLON;
import static org.apache.dolphinscheduler.common.Constants.DELETE_OP;
import static org.apache.dolphinscheduler.common.Constants.DIVISION_STRING;
import static org.apache.dolphinscheduler.common.Constants.MASTER_TYPE;
import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS;
import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS;
import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
import static org.apache.dolphinscheduler.common.Constants.UNDERLINE;
import static org.apache.dolphinscheduler.common.Constants.WORKER_TYPE;
import static com.google.common.base.Preconditions.checkArgument;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.HeartBeat;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.registry.api.ConnectionListener;
import org.apache.dolphinscheduler.registry.api.Registry;
import org.apache.dolphinscheduler.registry.api.RegistryException;
import org.apache.dolphinscheduler.registry.api.RegistryFactory;
import org.apache.dolphinscheduler.registry.api.RegistryFactoryLoader;
import org.apache.dolphinscheduler.registry.api.SubscribeListener;
import org.apache.commons.lang.StringUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
/**
* registry client singleton
*/
public class RegistryClient extends RegistryCenter {
import com.google.common.base.Strings;
@Component
public class RegistryClient {
private static final Logger logger = LoggerFactory.getLogger(RegistryClient.class);
private static RegistryClient registryClient = new RegistryClient();
private static final String EMPTY = "";
private static final String REGISTRY_PREFIX = "registry";
private static final String REGISTRY_PLUGIN_NAME = "plugin.name";
private static final String REGISTRY_CONFIG_FILE_PATH = "/registry.properties";
private final AtomicBoolean isStarted = new AtomicBoolean(false);
private Registry registry;
private IStoppable stoppable;
private RegistryClient() {
super.init();
@PostConstruct
public void afterConstruct() {
start();
initNodes();
}
public static RegistryClient getInstance() {
return registryClient;
}
/**
* get active master num
*
* @return active master number
*/
public int getActiveMasterNum() {
List<String> childrenList = new ArrayList<>();
Collection<String> childrenList = new ArrayList<>();
try {
// read master node parent path from conf
if (isExisted(getNodeParentPath(NodeType.MASTER))) {
childrenList = getChildrenKeys(getNodeParentPath(NodeType.MASTER));
if (exists(rootNodePath(NodeType.MASTER))) {
childrenList = getChildrenKeys(rootNodePath(NodeType.MASTER));
}
} catch (Exception e) {
logger.error("getActiveMasterNum error", e);
@ -83,15 +95,9 @@ public class RegistryClient extends RegistryCenter {
return childrenList.size();
}
/**
* get server list.
*
* @param nodeType zookeeper node type
* @return server list
*/
public List<Server> getServerList(NodeType nodeType) {
Map<String, String> serverMaps = getServerMaps(nodeType);
String parentPath = getNodeParentPath(nodeType);
Map<String, String> serverMaps = getServerMaps(nodeType, false);
String parentPath = rootNodePath(nodeType);
List<Server> serverList = new ArrayList<>();
for (Map.Entry<String, String> entry : serverMaps.entrySet()) {
@ -119,40 +125,11 @@ public class RegistryClient extends RegistryCenter {
return serverList;
}
/**
* get server nodes.
*
* @param nodeType registry node type
* @return result : list<node>
*/
public List<String> getServerNodes(NodeType nodeType) {
String path = getNodeParentPath(nodeType);
List<String> serverList = getChildrenKeys(path);
if (nodeType == NodeType.WORKER) {
List<String> workerList = new ArrayList<>();
for (String group : serverList) {
List<String> groupServers = getChildrenKeys(path + SINGLE_SLASH + group);
for (String groupServer : groupServers) {
workerList.add(group + SINGLE_SLASH + groupServer);
}
}
serverList = workerList;
}
return serverList;
}
/**
* get server list map.
*
* @param nodeType zookeeper node type
* @param hostOnly host only
* @return result : {host : resource info}
*/
public Map<String, String> getServerMaps(NodeType nodeType, boolean hostOnly) {
Map<String, String> serverMap = new HashMap<>();
try {
String path = getNodeParentPath(nodeType);
List<String> serverList = getServerNodes(nodeType);
String path = rootNodePath(nodeType);
Collection<String> serverList = getServerNodes(nodeType);
for (String server : serverList) {
String host = server;
if (nodeType == NodeType.WORKER && hostOnly) {
@ -167,298 +144,194 @@ public class RegistryClient extends RegistryCenter {
return serverMap;
}
/**
* get server list map.
*
* @param nodeType zookeeper node type
* @return result : {host : resource info}
*/
public Map<String, String> getServerMaps(NodeType nodeType) {
return getServerMaps(nodeType, false);
}
/**
* get server node set.
*
* @param nodeType zookeeper node type
* @param hostOnly host only
* @return result : set<host>
*/
public Set<String> getServerNodeSet(NodeType nodeType, boolean hostOnly) {
Set<String> serverSet = new HashSet<>();
try {
List<String> serverList = getServerNodes(nodeType);
for (String server : serverList) {
String host = server;
if (nodeType == NodeType.WORKER && hostOnly) {
host = server.split(SINGLE_SLASH)[1];
}
serverSet.add(host);
}
} catch (Exception e) {
logger.error("get server node set failed", e);
}
return serverSet;
}
/**
* get server node list.
*
* @param nodeType zookeeper node type
* @param hostOnly host only
* @return result : list<host>
*/
public List<String> getServerNodeList(NodeType nodeType, boolean hostOnly) {
Set<String> serverSet = getServerNodeSet(nodeType, hostOnly);
List<String> serverList = new ArrayList<>(serverSet);
Collections.sort(serverList);
return serverList;
}
/**
* check the zookeeper node already exists
*
* @param host host
* @param nodeType zookeeper node type
* @return true if exists
*/
public boolean checkNodeExists(String host, NodeType nodeType) {
String path = getNodeParentPath(nodeType);
if (StringUtils.isEmpty(path)) {
logger.error("check zk node exists error, host:{}, zk node type:{}",
host, nodeType);
return false;
}
Map<String, String> serverMaps = getServerMaps(nodeType, true);
for (String hostKey : serverMaps.keySet()) {
if (hostKey.contains(host)) {
return true;
}
}
return false;
return getServerMaps(nodeType, true).keySet()
.stream()
.anyMatch(it -> it.contains(host));
}
/**
* @return get worker node parent path
*/
protected String getWorkerNodeParentPath() {
return Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
}
public void handleDeadServer(Collection<String> nodes, NodeType nodeType, String opType) {
nodes.forEach(node -> {
final String host = getHostByEventDataPath(node);
final String type = nodeType == NodeType.MASTER ? MASTER_TYPE : WORKER_TYPE;
/**
* @return get master node parent path
*/
protected String getMasterNodeParentPath() {
return Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS;
}
/**
* @return get dead server node parent path
*/
protected String getDeadNodeParentPath() {
return Constants.REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS;
}
/**
* @return get master lock path
*/
public String getMasterLockPath() {
return Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_MASTERS;
}
/**
* @param nodeType zookeeper node type
* @return get zookeeper node parent path
*/
public String getNodeParentPath(NodeType nodeType) {
String path = "";
switch (nodeType) {
case MASTER:
return getMasterNodeParentPath();
case WORKER:
return getWorkerNodeParentPath();
case DEAD_SERVER:
return getDeadNodeParentPath();
default:
break;
}
return path;
}
/**
* @return get master start up lock path
*/
public String getMasterStartUpLockPath() {
return Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS;
}
/**
* @return get master failover lock path
*/
public String getMasterFailoverLockPath() {
return Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS;
}
/**
* @return get worker failover lock path
*/
public String getWorkerFailoverLockPath() {
return Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS;
}
/**
* opType(add): if find dead server , then add to zk deadServerPath
* opType(delete): delete path from zk
*
* @param node node path
* @param nodeType master or worker
* @param opType delete or add
* @throws Exception errors
*/
public void handleDeadServer(String node, NodeType nodeType, String opType) throws Exception {
String host = getHostByEventDataPath(node);
String type = (nodeType == NodeType.MASTER) ? MASTER_TYPE : WORKER_TYPE;
//check server restart, if restart , dead server path in zk should be delete
if (opType.equals(DELETE_OP)) {
removeDeadServerByHost(host, type);
} else if (opType.equals(ADD_OP)) {
String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + type + UNDERLINE + host;
if (!isExisted(deadServerPath)) {
//add dead server info to zk dead server path : /dead-servers/
persist(deadServerPath, (type + UNDERLINE + host));
logger.info("{} server dead , and {} added to zk dead server path success",
nodeType, node);
}
}
}
/**
* check dead server or not , if dead, stop self
*
* @param node node path
* @param serverType master or worker prefix
* @return true if not exists
* @throws Exception errors
*/
public boolean checkIsDeadServer(String node, String serverType) throws Exception {
// ip_sequence_no
String[] zNodesPath = node.split("\\/");
String ipSeqNo = zNodesPath[zNodesPath.length - 1];
String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + serverType + UNDERLINE + ipSeqNo;
return !isExisted(node) || isExisted(deadServerPath);
}
/**
* get master nodes directly
*
* @return master nodes
*/
public Set<String> getMasterNodesDirectly() {
List<String> masters = getChildrenKeys(REGISTRY_DOLPHINSCHEDULER_MASTERS);
return new HashSet<>(masters);
}
/**
* get worker nodes directly
*
* @return master nodes
*/
public Set<String> getWorkerNodesDirectly() {
List<String> workers = getChildrenKeys(REGISTRY_DOLPHINSCHEDULER_WORKERS);
return new HashSet<>(workers);
}
/**
* get worker group directly
*
* @return worker group nodes
*/
public Set<String> getWorkerGroupDirectly() {
List<String> workers = getChildrenKeys(REGISTRY_DOLPHINSCHEDULER_WORKERS);
return new HashSet<>(workers);
}
/**
* get worker group nodes
*/
public Set<String> getWorkerGroupNodesDirectly(String workerGroup) {
List<String> workers = getChildrenKeys(getWorkerGroupPath(workerGroup));
return new HashSet<>(workers);
}
/**
* opType(add): if find dead server , then add to zk deadServerPath
* opType(delete): delete path from zk
*
* @param nodeSet node path set
* @param nodeType master or worker
* @param opType delete or add
* @throws Exception errors
*/
public void handleDeadServer(Set<String> nodeSet, NodeType nodeType, String opType) throws Exception {
String type = (nodeType == NodeType.MASTER) ? MASTER_TYPE : WORKER_TYPE;
for (String node : nodeSet) {
String host = getHostByEventDataPath(node);
//check server restart, if restart , dead server path in zk should be delete
if (opType.equals(DELETE_OP)) {
removeDeadServerByHost(host, type);
} else if (opType.equals(ADD_OP)) {
String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + type + UNDERLINE + host;
if (!isExisted(deadServerPath)) {
//add dead server info to zk dead server path : /dead-servers/
persist(deadServerPath, (type + UNDERLINE + host));
logger.info("{} server dead , and {} added to registry dead server path success",
nodeType, node);
}
String deadServerPath = REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS + SINGLE_SLASH + type + UNDERLINE + host;
// Add dead server info to zk dead server path : /dead-servers/
registry.put(deadServerPath, type + UNDERLINE + host, false);
logger.info("{} server dead , and {} added to zk dead server path success", nodeType, node);
}
});
}
}
public boolean checkIsDeadServer(String node, String serverType) {
// ip_sequence_no
String[] zNodesPath = node.split("/");
String ipSeqNo = zNodesPath[zNodesPath.length - 1];
String deadServerPath = REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS + SINGLE_SLASH + serverType + UNDERLINE + ipSeqNo;
return !exists(node) || exists(deadServerPath);
}
public Collection<String> getMasterNodesDirectly() {
return getChildrenKeys(REGISTRY_DOLPHINSCHEDULER_MASTERS);
}
public Collection<String> getWorkerGroupDirectly() {
return getChildrenKeys(REGISTRY_DOLPHINSCHEDULER_WORKERS);
}
public Collection<String> getWorkerGroupNodesDirectly(String workerGroup) {
return getChildrenKeys(REGISTRY_DOLPHINSCHEDULER_WORKERS + "/" + workerGroup);
}
/**
* get host ip:port, string format: parentPath/ip:port
* get host ip:port, path format: parentPath/ip:port
*
* @param path path
* @return host ip:port, string format: parentPath/ip:port
*/
public String getHostByEventDataPath(String path) {
if (StringUtils.isEmpty(path)) {
logger.error("empty path!");
return "";
}
String[] pathArray = path.split(SINGLE_SLASH);
if (pathArray.length < 1) {
logger.error("parse ip error: {}", path);
return "";
}
return pathArray[pathArray.length - 1];
checkArgument(!Strings.isNullOrEmpty(path), "path cannot be null or empty");
final String[] pathArray = path.split(SINGLE_SLASH);
checkArgument(pathArray.length >= 1, "cannot parse path: %s", path);
return pathArray[pathArray.length - 1];
}
/**
* remove dead server by host
*
* @param host host
* @param serverType serverType
*/
public void removeDeadServerByHost(String host, String serverType) {
List<String> deadServers = getChildrenKeys(getDeadZNodeParentPath());
public void close() throws IOException {
if (isStarted.compareAndSet(true, false) && registry != null) {
registry.close();
}
}
public void persistEphemeral(String key, String value) {
registry.put(key, value, true);
}
public void remove(String key) {
registry.delete(key);
}
public String get(String key) {
return registry.get(key);
}
public void subscribe(String path, SubscribeListener listener) {
registry.subscribe(path, listener);
}
public void addConnectionStateListener(ConnectionListener listener) {
registry.addConnectionStateListener(listener);
}
public boolean exists(String key) {
return registry.exists(key);
}
public boolean getLock(String key) {
return registry.acquireLock(key);
}
public boolean releaseLock(String key) {
return registry.releaseLock(key);
}
public void setStoppable(IStoppable stoppable) {
this.stoppable = stoppable;
}
public IStoppable getStoppable() {
return stoppable;
}
public boolean isMasterPath(String path) {
return path != null && path.startsWith(REGISTRY_DOLPHINSCHEDULER_MASTERS);
}
public boolean isWorkerPath(String path) {
return path != null && path.startsWith(REGISTRY_DOLPHINSCHEDULER_WORKERS);
}
public Collection<String> getChildrenKeys(final String key) {
return registry.children(key);
}
public Set<String> getServerNodeSet(NodeType nodeType, boolean hostOnly) {
try {
return getServerNodes(nodeType).stream().map(server -> {
if (nodeType == NodeType.WORKER && hostOnly) {
return server.split(SINGLE_SLASH)[1];
}
return server;
}).collect(Collectors.toSet());
} catch (Exception e) {
throw new RegistryException("Failed to get server node: " + nodeType, e);
}
}
private void start() {
if (isStarted.compareAndSet(false, true)) {
PropertyUtils.loadPropertyFile(REGISTRY_CONFIG_FILE_PATH);
final Map<String, String> registryConfig = PropertyUtils.getPropertiesByPrefix(REGISTRY_PREFIX);
if (null == registryConfig || registryConfig.isEmpty()) {
throw new RegistryException("registry config param is null");
}
final String pluginName = registryConfig.get(REGISTRY_PLUGIN_NAME);
final Map<String, RegistryFactory> factories = RegistryFactoryLoader.load();
if (!factories.containsKey(pluginName)) {
throw new RegistryException("No such registry plugin: " + pluginName);
}
registry = factories.get(pluginName).create();
registry.start(registryConfig);
}
}
private void initNodes() {
registry.put(REGISTRY_DOLPHINSCHEDULER_MASTERS, EMPTY, false);
registry.put(REGISTRY_DOLPHINSCHEDULER_WORKERS, EMPTY, false);
registry.put(REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS, EMPTY, false);
}
private String rootNodePath(NodeType type) {
switch (type) {
case MASTER:
return Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS;
case WORKER:
return Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
case DEAD_SERVER:
return Constants.REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS;
default:
throw new IllegalStateException("Should not reach here");
}
}
private Collection<String> getServerNodes(NodeType nodeType) {
final String path = rootNodePath(nodeType);
final Collection<String> serverList = getChildrenKeys(path);
if (nodeType != NodeType.WORKER) {
return serverList;
}
return serverList.stream().flatMap(group ->
getChildrenKeys(path + SINGLE_SLASH + group)
.stream()
.map(it -> group + SINGLE_SLASH + it)
).collect(Collectors.toList());
}
private void removeDeadServerByHost(String host, String serverType) {
Collection<String> deadServers = getChildrenKeys(REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS);
for (String serverPath : deadServers) {
if (serverPath.startsWith(serverType + UNDERLINE + host)) {
String server = getDeadZNodeParentPath() + SINGLE_SLASH + serverPath;
String server = REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS + SINGLE_SLASH + serverPath;
remove(server);
logger.info("{} server {} deleted from zk dead server path success", serverType, host);
}
}
}
}

View File

@ -15,18 +15,5 @@
# limitations under the License.
#
#registry.plugin.dir config the Registry Plugin dir.
registry.plugin.dir=lib/plugin/registry
registry.plugin.name=zookeeper
registry.servers=127.0.0.1:2181
#maven.local.repository=/usr/local/localRepository
#registry.plugin.binding config the Registry Plugin need be load when development and run in IDE
#registry.plugin.binding=./dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/pom.xml
#registry timeout
#registry.session.timeout.ms=30000
#registry.connection.timeout.ms=7500
#registry.block.until.connected.wait=600

View File

@ -1,74 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.registry;
import static org.apache.dolphinscheduler.common.Constants.ADD_OP;
import static org.apache.dolphinscheduler.common.Constants.DELETE_OP;
import static org.mockito.BDDMockito.given;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.spi.register.Registry;
import java.util.Arrays;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import com.google.common.collect.Sets;
@RunWith(PowerMockRunner.class)
@PrepareForTest({ RegistryClient.class })
public class RegistryClientTest {
private RegistryClient registryClient;
@Test
public void test() throws Exception {
Registry registry = PowerMockito.mock(Registry.class);
PowerMockito.doNothing().when(registry).persist(Mockito.anyString(), Mockito.anyString());
PowerMockito.doNothing().when(registry).update(Mockito.anyString(), Mockito.anyString());
PowerMockito.when(registry.releaseLock(Mockito.anyString())).thenReturn(true);
PowerMockito.when(registry.getChildren("/dead-servers")).thenReturn(Arrays.asList("worker_127.0.0.1:8089"));
PowerMockito.suppress(PowerMockito.constructor(RegistryClient.class));
registryClient = PowerMockito.mock(RegistryClient.class);
registryClient.persist("/key", "");
registryClient.update("/key", "");
registryClient.releaseLock("/key");
registryClient.getChildrenKeys("/key");
registryClient.handleDeadServer(Sets.newHashSet("ma/127.0.0.1:8089"), NodeType.WORKER, DELETE_OP);
registryClient.handleDeadServer(Sets.newHashSet("ma/127.0.0.1:8089"), NodeType.WORKER, ADD_OP);
//registryClient.removeDeadServerByHost("127.0.0.1:8089","master");
registryClient.handleDeadServer("ma/127.0.0.1:8089", NodeType.WORKER, DELETE_OP);
registryClient.handleDeadServer("ma/127.0.0.1:8089", NodeType.WORKER, ADD_OP);
registryClient.checkIsDeadServer("master/127.0.0.1","master");
given(registry.getChildren("/nodes/worker")).willReturn(Arrays.asList("worker_127.0.0.1:8089"));
given(registry.getChildren("/nodes/worker/worker_127.0.0.1:8089")).willReturn(Arrays.asList("default"));
registryClient.checkNodeExists("127.0.0.1",NodeType.WORKER);
registryClient.getServerList(NodeType.MASTER);
}
}

View File

@ -1,45 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.registry;
import org.apache.dolphinscheduler.spi.plugin.DolphinPluginLoader;
import org.apache.dolphinscheduler.spi.plugin.DolphinPluginManagerConfig;
import org.apache.dolphinscheduler.spi.register.RegistryPluginManager;
import org.junit.Assert;
import org.junit.Test;
import com.google.common.collect.ImmutableList;
public class RegistryPluginTest {
@Test
public void testLoadPlugin() throws Exception {
DolphinPluginManagerConfig registryPluginManagerConfig = new DolphinPluginManagerConfig();
String path = DolphinPluginLoader.class.getClassLoader().getResource("").getPath();
String registryPluginZkPath = path + "../../../dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/pom.xml";
registryPluginManagerConfig.setPlugins(registryPluginZkPath);
RegistryPluginManager registryPluginManager = new RegistryPluginManager("zookeeper");
DolphinPluginLoader registryPluginLoader = new DolphinPluginLoader(registryPluginManagerConfig, ImmutableList.of(registryPluginManager));
registryPluginLoader.loadPlugins();
Assert.assertNotNull(registryPluginManager.getRegistry());
}
}

View File

@ -20,7 +20,6 @@ package org.apache.dolphinscheduler.spi;
import static java.util.Collections.emptyList;
import org.apache.dolphinscheduler.spi.alert.AlertChannelFactory;
import org.apache.dolphinscheduler.spi.register.RegistryFactory;
import org.apache.dolphinscheduler.spi.task.TaskChannelFactory;
/**
@ -42,14 +41,6 @@ public interface DolphinSchedulerPlugin {
return emptyList();
}
/**
* get registry plugin factory
* @return registry factory
*/
default Iterable<RegistryFactory> getRegisterFactorys() {
return emptyList();
}
/**
* get task plugin factory
* @return registry factory

View File

@ -1,23 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.spi.register;
public interface ConnectStateListener {
void notify(RegistryConnectState state);
}

View File

@ -1,37 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.spi.register;
/**
* Monitor the type of data changes
*/
public enum DataChangeEvent {
ADD("ADD", 1),
REMOVE("REMOVE", 2),
UPDATE("UPDATE",3);
private String type;
private int value;
DataChangeEvent(String type, int value) {
this.type = type;
this.value = value;
}
}

View File

@ -1,66 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.spi.register;
import java.util.HashMap;
/**
* The registry node monitors subscriptions
*/
public class ListenerManager {
/**
* All message subscriptions must be subscribed uniformly at startup.
* A node path only supports one listener
*/
private static HashMap<String, SubscribeListener> listeners = new HashMap<>();
/**
* Check whether the key has been monitored
*/
public static boolean checkHasListeners(String path) {
return null != listeners.get(path);
}
/**
* add listener(A node can only be monitored by one listener)
*/
public static void addListener(String path, SubscribeListener listener) {
listeners.put(path, listener);
}
/**
* remove listener
*/
public static void removeListener(String path) {
listeners.remove(path);
}
/**
*
*After the data changes, it is distributed to the corresponding listener for processing
*/
public static void dataChange(String key,String path, String data, DataChangeEvent dataChangeEvent) {
SubscribeListener notifyListener = listeners.get(key);
if (null == notifyListener) {
return;
}
notifyListener.notify(path, data, dataChangeEvent);
}
}

View File

@ -1,102 +0,0 @@
package org.apache.dolphinscheduler.spi.register;/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.util.List;
import java.util.Map;
/**
* The final display of all registry component data must follow a tree structure.
* Therefore, some registry may need to do a layer of internal conversion, such as Etcd
*/
public interface Registry {
/**
* initialize registry center.
*/
void init(Map<String, String> registerData);
/**
* close registry
*/
void close();
/**
* subscribe registry data change, a path can only be monitored by one listener
*/
boolean subscribe(String path, SubscribeListener subscribeListener);
/**
* unsubscribe
*/
void unsubscribe(String path);
/**
* Registry status monitoring, globally unique. Only one is allowed to subscribe.
*/
void addConnectionStateListener(RegistryConnectListener registryConnectListener);
/**
* get key
*/
String get(String key);
/**
* delete
*/
void remove(String key);
/**
* persist data
*/
void persist(String key, String value);
/**
*persist ephemeral data
*/
void persistEphemeral(String key, String value);
/**
* update data
*/
void update(String key, String value);
/**
* get children keys
*/
List<String> getChildren(String path);
/**
* Judge node is exist or not.
*/
boolean isExisted(String key);
/**
* delete kay
*/
boolean delete(String key);
/**
* Obtain a distributed lock
* todo It is best to add expiration time, and automatically release the lock after expiration
*/
boolean acquireLock(String key);
/**
* release key
*/
boolean releaseLock(String key);
}

View File

@ -1,23 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.spi.register;
public interface RegistryConnectListener {
void notify(RegistryConnectState newState);
}

View File

@ -1,37 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.spi.register;
/**
* All registry connection status must be converted to this
*/
public enum RegistryConnectState {
CONNECTED("connected", 1),
RECONNECTED("reconnected", 2),
SUSPENDED("suspended", 3),
LOST("lost", 4);
private String description;
private int state;
RegistryConnectState(String description, int state) {
this.description = description;
this.state = state;
}
}

View File

@ -1,32 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.spi.register;
/**
* registry exception
*/
public class RegistryException extends RuntimeException {
public RegistryException(String message, Throwable cause) {
super(message, cause);
}
public RegistryException(String message) {
super(message);
}
}

View File

@ -1,34 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.spi.register;
/**
* Registry the component factory, all registry must implement this interface
*/
public interface RegistryFactory {
/**
* get registry component name
*/
String getName();
/**
* get registry
*/
Registry create();
}

View File

@ -1,82 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.spi.register;
import org.apache.dolphinscheduler.spi.DolphinSchedulerPlugin;
import org.apache.dolphinscheduler.spi.classloader.ThreadContextClassLoader;
import org.apache.dolphinscheduler.spi.plugin.AbstractDolphinPluginManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The plug-in address of the registry needs to be configured.
* Multi-registries are not supported.
* When the plug-in directory contains multiple plug-ins, only the configured plug-in will be used.
* todo Its not good to put it here, consider creating a separate API module for each plugin
*/
public class RegistryPluginManager extends AbstractDolphinPluginManager {
private static final Logger logger = LoggerFactory.getLogger(RegistryPluginManager.class);
private RegistryFactory registryFactory;
public static Registry registry;
private String registerPluginName;
public RegistryPluginManager(String registerPluginName) {
this.registerPluginName = registerPluginName;
}
@Override
public void installPlugin(DolphinSchedulerPlugin dolphinSchedulerPlugin) {
for (RegistryFactory registryFactory : dolphinSchedulerPlugin.getRegisterFactorys()) {
logger.info("Registering Registry Plugin '{}'", registryFactory.getName());
if (registerPluginName.equals(registryFactory.getName())) {
this.registryFactory = registryFactory;
loadRegistry();
return;
}
}
if (null == registry) {
throw new RegistryException(String.format("not found %s registry plugin ", registerPluginName));
}
}
/**
* load registry
*/
private void loadRegistry() {
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(registryFactory.getClass().getClassLoader())) {
registry = registryFactory.create();
}
}
/**
* get registry
* @return registry
*/
public Registry getRegistry() {
if (null == registry) {
throw new RegistryException("not install registry");
}
return registry;
}
}

View File

@ -1,30 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.spi.register;
/**
* Registration center subscription. All listeners must implement this interface
*/
public interface SubscribeListener {
/**
* Processing logic when the subscription node changes
*/
void notify(String path, String data, DataChangeEvent dataChangeEvent);
}

View File

@ -86,15 +86,6 @@ public class StandaloneServer {
private static void startRegistry() throws Exception {
final TestingServer server = new TestingServer(true);
System.setProperty("registry.servers", server.getConnectString());
final Path registryPath = Paths.get(
StandaloneServer.class.getProtectionDomain().getCodeSource().getLocation().getPath(),
"../../../dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/pom.xml"
).toAbsolutePath();
if (Files.exists(registryPath)) {
System.setProperty("registry.plugin.binding", registryPath.toString());
System.setProperty("registry.plugin.dir", "");
}
}
private static void startDatabase() throws IOException, SQLException {

23
pom.xml
View File

@ -128,6 +128,7 @@
<byte-buddy.version>1.9.16</byte-buddy.version>
<java-websocket.version>1.5.1</java-websocket.version>
<py4j.version>0.10.9</py4j.version>
<auto-service.version>1.0.1</auto-service.version>
</properties>
<dependencyManagement>
@ -231,7 +232,12 @@
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-registry-plugin</artifactId>
<artifactId>dolphinscheduler-registry</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-registry-zookeeper</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
@ -270,6 +276,12 @@
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-registry-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
@ -690,6 +702,13 @@
<artifactId>py4j</artifactId>
<version>${py4j.version}</version>
</dependency>
<dependency>
<groupId>com.google.auto.service</groupId>
<artifactId>auto-service</artifactId>
<version>${auto-service.version}</version>
<optional>true</optional>
</dependency>
</dependencies>
</dependencyManagement>
@ -1234,7 +1253,7 @@
<modules>
<module>dolphinscheduler-spi</module>
<module>dolphinscheduler-alert-plugin</module>
<module>dolphinscheduler-registry-plugin</module>
<module>dolphinscheduler-registry</module>
<module>dolphinscheduler-task-plugin</module>
<module>dolphinscheduler-ui</module>
<module>dolphinscheduler-server</module>

View File

@ -20,7 +20,7 @@ export HADOOP_CONF_DIR=/opt/soft/hadoop/etc/hadoop
export SPARK_HOME1=/opt/soft/spark1
export SPARK_HOME2=/opt/soft/spark2
export PYTHON_HOME=/opt/soft/python
export JAVA_HOME=/opt/soft/java
export JAVA_HOME=${JAVA_HOME:-/opt/soft/java}
export HIVE_HOME=/opt/soft/hive
export FLINK_HOME=/opt/soft/flink
export DATAX_HOME=/opt/soft/datax