| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.dubbo.registry.zookeeper.util; |
| |
| import org.apache.dubbo.common.URL; |
| import org.apache.dubbo.common.logger.ErrorTypeAwareLogger; |
| import org.apache.dubbo.common.logger.LoggerFactory; |
| import org.apache.dubbo.common.utils.StringUtils; |
| import org.apache.dubbo.registry.client.DefaultServiceInstance; |
| import org.apache.dubbo.registry.client.ServiceInstance; |
| import org.apache.dubbo.registry.zookeeper.ZookeeperInstance; |
| import org.apache.dubbo.registry.zookeeper.ZookeeperServiceDiscovery; |
| import org.apache.dubbo.rpc.model.ScopeModelUtil; |
| |
| import org.apache.curator.RetryPolicy; |
| import org.apache.curator.framework.CuratorFramework; |
| import org.apache.curator.framework.CuratorFrameworkFactory; |
| import org.apache.curator.framework.api.ACLProvider; |
| import org.apache.curator.framework.imps.CuratorFrameworkState; |
| import org.apache.curator.framework.state.ConnectionState; |
| import org.apache.curator.framework.state.ConnectionStateListener; |
| import org.apache.curator.retry.ExponentialBackoffRetry; |
| import org.apache.curator.x.discovery.ServiceDiscovery; |
| import org.apache.curator.x.discovery.ServiceDiscoveryBuilder; |
| import org.apache.curator.x.discovery.ServiceInstanceBuilder; |
| import org.apache.zookeeper.ZooDefs; |
| import org.apache.zookeeper.data.ACL; |
| |
| import java.util.Collection; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.stream.Collectors; |
| |
| import static org.apache.curator.x.discovery.ServiceInstance.builder; |
| import static org.apache.dubbo.common.constants.CommonConstants.PATH_SEPARATOR; |
| import static org.apache.dubbo.common.constants.CommonConstants.SESSION_KEY; |
| import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY; |
| import static org.apache.dubbo.registry.zookeeper.ZookeeperServiceDiscovery.DEFAULT_GROUP; |
| import static org.apache.dubbo.registry.zookeeper.util.CuratorFrameworkParams.BASE_SLEEP_TIME; |
| import static org.apache.dubbo.registry.zookeeper.util.CuratorFrameworkParams.BLOCK_UNTIL_CONNECTED_UNIT; |
| import static org.apache.dubbo.registry.zookeeper.util.CuratorFrameworkParams.BLOCK_UNTIL_CONNECTED_WAIT; |
| import static org.apache.dubbo.registry.zookeeper.util.CuratorFrameworkParams.GROUP_PATH; |
| import static org.apache.dubbo.registry.zookeeper.util.CuratorFrameworkParams.MAX_RETRIES; |
| import static org.apache.dubbo.registry.zookeeper.util.CuratorFrameworkParams.MAX_SLEEP; |
| import static org.apache.dubbo.registry.zookeeper.util.CuratorFrameworkParams.ROOT_PATH; |
| |
| /** |
| * Curator Framework Utilities Class |
| * |
| * @since 2.7.5 |
| */ |
| public abstract class CuratorFrameworkUtils { |
| |
| public static ServiceDiscovery<ZookeeperInstance> buildServiceDiscovery(CuratorFramework curatorFramework, |
| String basePath) { |
| return ServiceDiscoveryBuilder.builder(ZookeeperInstance.class) |
| .client(curatorFramework) |
| .basePath(basePath) |
| .build(); |
| } |
| |
| public static CuratorFramework buildCuratorFramework(URL connectionURL, ZookeeperServiceDiscovery serviceDiscovery) throws Exception { |
| CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder() |
| .connectString(connectionURL.getBackupAddress()) |
| .retryPolicy(buildRetryPolicy(connectionURL)); |
| String userInformation = connectionURL.getUserInformation(); |
| if (StringUtils.isNotEmpty(userInformation)) { |
| builder = builder.authorization("digest", userInformation.getBytes()); |
| builder.aclProvider(new ACLProvider() { |
| @Override |
| public List<ACL> getDefaultAcl() { |
| return ZooDefs.Ids.CREATOR_ALL_ACL; |
| } |
| |
| @Override |
| public List<ACL> getAclForPath(String path) { |
| return ZooDefs.Ids.CREATOR_ALL_ACL; |
| } |
| }); |
| } |
| CuratorFramework curatorFramework = builder.build(); |
| |
| curatorFramework.getConnectionStateListenable().addListener(new CuratorConnectionStateListener(connectionURL, serviceDiscovery)); |
| |
| curatorFramework.start(); |
| curatorFramework.blockUntilConnected(BLOCK_UNTIL_CONNECTED_WAIT.getParameterValue(connectionURL), |
| BLOCK_UNTIL_CONNECTED_UNIT.getParameterValue(connectionURL)); |
| |
| if (!curatorFramework.getState().equals(CuratorFrameworkState.STARTED)) { |
| throw new IllegalStateException("zookeeper client initialization failed"); |
| } |
| |
| if (!curatorFramework.getZookeeperClient().isConnected()) { |
| throw new IllegalStateException("failed to connect to zookeeper server"); |
| } |
| |
| return curatorFramework; |
| } |
| |
| public static RetryPolicy buildRetryPolicy(URL connectionURL) { |
| int baseSleepTimeMs = BASE_SLEEP_TIME.getParameterValue(connectionURL); |
| int maxRetries = MAX_RETRIES.getParameterValue(connectionURL); |
| int getMaxSleepMs = MAX_SLEEP.getParameterValue(connectionURL); |
| return new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries, getMaxSleepMs); |
| } |
| |
| |
| public static List<ServiceInstance> build(URL registryUrl, Collection<org.apache.curator.x.discovery.ServiceInstance<ZookeeperInstance>> instances) { |
| return instances.stream().map((i)->build(registryUrl, i)).collect(Collectors.toList()); |
| } |
| |
| public static ServiceInstance build(URL registryUrl, org.apache.curator.x.discovery.ServiceInstance<ZookeeperInstance> instance) { |
| String name = instance.getName(); |
| String host = instance.getAddress(); |
| int port = instance.getPort(); |
| ZookeeperInstance zookeeperInstance = instance.getPayload(); |
| DefaultServiceInstance serviceInstance = new DefaultServiceInstance(name, host, port, ScopeModelUtil.getApplicationModel(registryUrl.getScopeModel())); |
| serviceInstance.setMetadata(zookeeperInstance.getMetadata()); |
| return serviceInstance; |
| } |
| |
| public static org.apache.curator.x.discovery.ServiceInstance<ZookeeperInstance> build(ServiceInstance serviceInstance) { |
| ServiceInstanceBuilder builder; |
| String serviceName = serviceInstance.getServiceName(); |
| String host = serviceInstance.getHost(); |
| int port = serviceInstance.getPort(); |
| Map<String, String> metadata = serviceInstance.getSortedMetadata(); |
| String id = generateId(host, port); |
| ZookeeperInstance zookeeperInstance = new ZookeeperInstance(id, serviceName, metadata); |
| try { |
| builder = builder() |
| .id(id) |
| .name(serviceName) |
| .address(host) |
| .port(port) |
| .payload(zookeeperInstance); |
| } catch (Exception e) { |
| throw new RuntimeException(e); |
| } |
| return builder.build(); |
| } |
| |
| public static String generateId(String host, int port) { |
| return host + ":" + port; |
| } |
| |
| public static String getRootPath(URL registryURL) { |
| String group = ROOT_PATH.getParameterValue(registryURL); |
| if (group.equalsIgnoreCase(DEFAULT_GROUP)) { |
| group = GROUP_PATH.getParameterValue(registryURL); |
| if (!group.startsWith(PATH_SEPARATOR)) { |
| group = PATH_SEPARATOR + group; |
| } |
| } |
| return group; |
| } |
| |
| private static class CuratorConnectionStateListener implements ConnectionStateListener { |
| private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(CuratorConnectionStateListener.class); |
| private final long UNKNOWN_SESSION_ID = -1L; |
| protected final int DEFAULT_CONNECTION_TIMEOUT_MS = 30 * 1000; |
| protected final int DEFAULT_SESSION_TIMEOUT_MS = 60 * 1000; |
| |
| private long lastSessionId; |
| private final int timeout; |
| private final int sessionExpireMs; |
| private final ZookeeperServiceDiscovery serviceDiscovery; |
| |
| public CuratorConnectionStateListener(URL url, ZookeeperServiceDiscovery serviceDiscovery) { |
| this.timeout = url.getParameter(TIMEOUT_KEY, DEFAULT_CONNECTION_TIMEOUT_MS); |
| this.sessionExpireMs = url.getParameter(SESSION_KEY, DEFAULT_SESSION_TIMEOUT_MS); |
| this.serviceDiscovery = serviceDiscovery; |
| } |
| |
| @Override |
| public void stateChanged(CuratorFramework client, ConnectionState state) { |
| long sessionId = UNKNOWN_SESSION_ID; |
| try { |
| sessionId = client.getZookeeperClient().getZooKeeper().getSessionId(); |
| } catch (Exception e) { |
| logger.warn("Curator client state changed, but failed to get the related zk session instance."); |
| } |
| |
| if (state == ConnectionState.LOST) { |
| logger.warn("Curator zookeeper session " + Long.toHexString(lastSessionId) + " expired."); |
| } else if (state == ConnectionState.SUSPENDED) { |
| logger.warn("Curator zookeeper connection of session " + Long.toHexString(sessionId) + " timed out. " + |
| "connection timeout value is " + timeout + ", session expire timeout value is " + sessionExpireMs); |
| } else if (state == ConnectionState.CONNECTED) { |
| lastSessionId = sessionId; |
| logger.info("Curator zookeeper client instance initiated successfully, session id is " + Long.toHexString(sessionId)); |
| } else if (state == ConnectionState.RECONNECTED) { |
| if (lastSessionId == sessionId && sessionId != UNKNOWN_SESSION_ID) { |
| logger.warn("Curator zookeeper connection recovered from connection lose, " + |
| "reuse the old session " + Long.toHexString(sessionId)); |
| serviceDiscovery.recover(); |
| } else { |
| logger.warn("New session created after old session lost, " + |
| "old session " + Long.toHexString(lastSessionId) + ", new session " + Long.toHexString(sessionId)); |
| lastSessionId = sessionId; |
| serviceDiscovery.recover(); |
| } |
| } |
| } |
| |
| } |
| } |