| package org.apache.helix.rest.server; |
| |
| |
| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.ConcurrentHashMap; |
| |
| import org.apache.helix.BaseDataAccessor; |
| import org.apache.helix.ConfigAccessor; |
| import org.apache.helix.HelixAdmin; |
| import org.apache.helix.HelixDataAccessor; |
| import org.apache.helix.HelixException; |
| import org.apache.helix.SystemPropertyKeys; |
| import org.apache.helix.manager.zk.ZKHelixAdmin; |
| import org.apache.helix.manager.zk.ZKHelixDataAccessor; |
| import org.apache.helix.manager.zk.ZkBaseDataAccessor; |
| import org.apache.helix.manager.zk.ZkBucketDataAccessor; |
| import org.apache.helix.msdcommon.exception.InvalidRoutingDataException; |
| import org.apache.helix.rest.metadatastore.ZkMetadataStoreDirectory; |
| import org.apache.helix.task.TaskDriver; |
| import org.apache.helix.tools.ClusterSetup; |
| import org.apache.helix.zookeeper.api.client.HelixZkClient; |
| import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; |
| import org.apache.helix.zookeeper.constant.RoutingDataReaderType; |
| import org.apache.helix.zookeeper.datamodel.ZNRecord; |
| import org.apache.helix.zookeeper.datamodel.serializer.ByteArraySerializer; |
| import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer; |
| import org.apache.helix.zookeeper.impl.client.FederatedZkClient; |
| import org.apache.helix.zookeeper.impl.client.ZkClient; |
| import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory; |
| import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory; |
| import org.apache.helix.zookeeper.routing.RoutingDataManager; |
| import org.apache.helix.zookeeper.zkclient.IZkChildListener; |
| import org.apache.helix.zookeeper.zkclient.IZkDataListener; |
| import org.apache.helix.zookeeper.zkclient.IZkStateListener; |
| import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer; |
| import org.apache.zookeeper.Watcher; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| |
| public class ServerContext implements IZkDataListener, IZkChildListener, IZkStateListener { |
| private static final Logger LOG = LoggerFactory.getLogger(ServerContext.class); |
| |
| private final String _zkAddr; |
| private final String _msdsEndpoint; |
| private final boolean _isMultiZkEnabled; |
| private volatile RealmAwareZkClient _zkClient; |
| private volatile RealmAwareZkClient _byteArrayZkClient; |
| |
| private volatile ZKHelixAdmin _zkHelixAdmin; |
| private volatile ClusterSetup _clusterSetup; |
| private volatile ConfigAccessor _configAccessor; |
| // A lazily-initialized base data accessor that reads/writes byte array to ZK |
| // TODO: Only read (deserialize) is supported at this time. This baseDataAccessor should support write (serialize) as needs arise |
| private volatile ZkBaseDataAccessor<byte[]> _byteArrayZkBaseDataAccessor; |
| // 1 Cluster name will correspond to 1 helix data accessor |
| private final Map<String, HelixDataAccessor> _helixDataAccessorPool; |
| // 1 Cluster name will correspond to 1 task driver |
| private final Map<String, TaskDriver> _taskDriverPool; |
| // Create ZkBucketDataAccessor for ReadOnlyWagedRebalancer. |
| private volatile ZkBucketDataAccessor _zkBucketDataAccessor; |
| |
| /** |
| * Multi-ZK support |
| */ |
| private final ZkMetadataStoreDirectory _zkMetadataStoreDirectory; |
| // Create a dedicated ZkClient for listening to data changes in routing data |
| private RealmAwareZkClient _zkClientForRoutingDataListener; |
| |
| public ServerContext(String zkAddr) { |
| this(zkAddr, false, null); |
| } |
| |
| /** |
| * Initializes a ServerContext for this namespace. |
| * @param zkAddr routing ZK address (on multi-zk mode) |
| * @param isMultiZkEnabled boolean flag for whether multi-zk mode is enabled |
| * @param msdsEndpoint if given, this server context will try to read routing data from this MSDS. |
| */ |
| public ServerContext(String zkAddr, boolean isMultiZkEnabled, String msdsEndpoint) { |
| _zkAddr = zkAddr; |
| _isMultiZkEnabled = isMultiZkEnabled; |
| _msdsEndpoint = msdsEndpoint; // only applicable on multi-zk mode |
| |
| // We should NOT initiate _zkClient and anything that depends on _zkClient in |
| // constructor, as it is reasonable to start up HelixRestServer first and then |
| // ZooKeeper. In this case, initializing _zkClient will fail and HelixRestServer |
| // cannot be started correctly. |
| _helixDataAccessorPool = new ConcurrentHashMap<>(); |
| _taskDriverPool = new ConcurrentHashMap<>(); |
| |
| // Initialize the singleton ZkMetadataStoreDirectory instance to allow it to be closed later |
| _zkMetadataStoreDirectory = ZkMetadataStoreDirectory.getInstance(); |
| } |
| |
| /** |
| * Lazy initialization of RealmAwareZkClient used throughout the REST server. |
| * @return |
| */ |
| public RealmAwareZkClient getRealmAwareZkClient() { |
| if (_zkClient == null) { |
| synchronized (this) { |
| if (_zkClient == null) { |
| _zkClient = createRealmAwareZkClient(new ZNRecordSerializer()); |
| } |
| } |
| } |
| return _zkClient; |
| } |
| |
| /** |
| * Returns a RealmAWareZkClient with ByteArraySerializer with double-checked locking. |
| * @return |
| */ |
| public RealmAwareZkClient getByteArrayRealmAwareZkClient() { |
| if (_byteArrayZkClient == null) { |
| synchronized (this) { |
| if (_byteArrayZkClient == null) { |
| _byteArrayZkClient = createRealmAwareZkClient(new ByteArraySerializer()); |
| } |
| } |
| } |
| return _byteArrayZkClient; |
| } |
| |
| /** |
| * Main creation logic for RealmAwareZkClient. |
| * @param zkSerializer the type of ZkSerializer to use |
| * @return |
| */ |
| private RealmAwareZkClient createRealmAwareZkClient(ZkSerializer zkSerializer) { |
| // If the multi ZK config is enabled, use FederatedZkClient on multi-realm mode |
| RealmAwareZkClient realmAwareZkClient; |
| if (_isMultiZkEnabled || Boolean |
| .parseBoolean(System.getProperty(SystemPropertyKeys.MULTI_ZK_ENABLED))) { |
| try { |
| initializeZkClientForRoutingData(); |
| RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder connectionConfigBuilder = |
| new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder(); |
| // If MSDS endpoint is set for this namespace, use that instead. |
| if (_msdsEndpoint != null && !_msdsEndpoint.isEmpty()) { |
| connectionConfigBuilder.setRoutingDataSourceEndpoint(_msdsEndpoint) |
| .setRoutingDataSourceType(RoutingDataReaderType.HTTP.name()); |
| } |
| realmAwareZkClient = new FederatedZkClient(connectionConfigBuilder.build(), |
| new RealmAwareZkClient.RealmAwareZkClientConfig().setZkSerializer(zkSerializer)); |
| LOG.info("ServerContext: FederatedZkClient created successfully!"); |
| } catch (InvalidRoutingDataException | IllegalStateException e) { |
| throw new HelixException("Failed to create FederatedZkClient!", e); |
| } |
| } else { |
| // If multi ZK config is not set, just connect to the ZK address given |
| HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig(); |
| clientConfig.setZkSerializer(zkSerializer); |
| realmAwareZkClient = SharedZkClientFactory.getInstance() |
| .buildZkClient(new HelixZkClient.ZkConnectionConfig(_zkAddr), clientConfig); |
| } |
| return realmAwareZkClient; |
| } |
| |
| /** |
| * Initialization logic for ZkClient for routing data listener. |
| * NOTE: The initialization lifecycle of zkClientForRoutingDataListener is tied to the private |
| * volatile zkClient. |
| */ |
| private void initializeZkClientForRoutingData() { |
| // Make sure the ServerContext is subscribed to routing data change so that it knows |
| // when to reset ZkClients and Helix APIs |
| if (_zkClientForRoutingDataListener == null) { |
| // Routing data is always in the ZNRecord format |
| _zkClientForRoutingDataListener = DedicatedZkClientFactory.getInstance() |
| .buildZkClient(new HelixZkClient.ZkConnectionConfig(_zkAddr), |
| new HelixZkClient.ZkClientConfig().setZkSerializer(new ZNRecordSerializer())); |
| } |
| // Refresh data subscription |
| _zkClientForRoutingDataListener.unsubscribeAll(); |
| _zkClientForRoutingDataListener.subscribeRoutingDataChanges(this, this); |
| LOG.info("ServerContext: subscribed to routing data in routing ZK at {}!", _zkAddr); |
| } |
| |
| @Deprecated |
| public ZkClient getZkClient() { |
| return (ZkClient) getRealmAwareZkClient(); |
| } |
| |
| public HelixAdmin getHelixAdmin() { |
| if (_zkHelixAdmin == null) { |
| synchronized (this) { |
| if (_zkHelixAdmin == null) { |
| _zkHelixAdmin = new ZKHelixAdmin(getRealmAwareZkClient()); |
| } |
| } |
| } |
| return _zkHelixAdmin; |
| } |
| |
| public ClusterSetup getClusterSetup() { |
| if (_clusterSetup == null) { |
| synchronized (this) { |
| if (_clusterSetup == null) { |
| _clusterSetup = new ClusterSetup(getRealmAwareZkClient(), getHelixAdmin()); |
| } |
| } |
| } |
| return _clusterSetup; |
| } |
| |
| public TaskDriver getTaskDriver(String clusterName) { |
| TaskDriver taskDriver = _taskDriverPool.get(clusterName); |
| if (taskDriver == null) { |
| synchronized (this) { |
| if (!_taskDriverPool.containsKey(clusterName)) { |
| _taskDriverPool.put(clusterName, new TaskDriver(getRealmAwareZkClient(), clusterName)); |
| } |
| taskDriver = _taskDriverPool.get(clusterName); |
| } |
| } |
| return taskDriver; |
| } |
| |
| public ConfigAccessor getConfigAccessor() { |
| if (_configAccessor == null) { |
| synchronized (this) { |
| if (_configAccessor == null) { |
| _configAccessor = new ConfigAccessor(getRealmAwareZkClient()); |
| } |
| } |
| } |
| return _configAccessor; |
| } |
| |
| public HelixDataAccessor getDataAccessor(String clusterName) { |
| HelixDataAccessor dataAccessor = _helixDataAccessorPool.get(clusterName); |
| if (dataAccessor == null) { |
| synchronized (this) { |
| if (!_helixDataAccessorPool.containsKey(clusterName)) { |
| ZkBaseDataAccessor<ZNRecord> baseDataAccessor = |
| new ZkBaseDataAccessor<>(getRealmAwareZkClient()); |
| _helixDataAccessorPool.put(clusterName, |
| new ZKHelixDataAccessor(clusterName, baseDataAccessor)); |
| } |
| dataAccessor = _helixDataAccessorPool.get(clusterName); |
| } |
| } |
| return dataAccessor; |
| } |
| |
| /** |
| * Returns a lazily-instantiated ZkBaseDataAccessor for the byte array type. |
| * @return |
| */ |
| public BaseDataAccessor<byte[]> getByteArrayZkBaseDataAccessor() { |
| if (_byteArrayZkBaseDataAccessor == null) { |
| synchronized (this) { |
| if (_byteArrayZkBaseDataAccessor == null) { |
| _byteArrayZkBaseDataAccessor = new ZkBaseDataAccessor<>(getByteArrayRealmAwareZkClient()); |
| } |
| } |
| } |
| return _byteArrayZkBaseDataAccessor; |
| } |
| |
| public ZkBucketDataAccessor getZkBucketDataAccessor() { |
| // ZkBucketDataAccessor constructor will handle realmZK case (when _zkAddr is null) |
| if (_zkBucketDataAccessor == null) { |
| synchronized (this) { |
| if (_zkBucketDataAccessor == null) { |
| _zkBucketDataAccessor = new ZkBucketDataAccessor(_zkAddr); |
| } |
| } |
| } |
| return _zkBucketDataAccessor; |
| } |
| |
| public void close() { |
| if (_zkClient != null) { |
| _zkClient.close(); |
| } |
| if (_zkMetadataStoreDirectory != null) { |
| _zkMetadataStoreDirectory.close(); |
| } |
| if (_zkClientForRoutingDataListener != null) { |
| _zkClientForRoutingDataListener.close(); |
| } |
| } |
| |
| @Override |
| public void handleChildChange(String parentPath, List<String> currentChilds) { |
| if (_zkClientForRoutingDataListener == null || _zkClientForRoutingDataListener.isClosed()) { |
| return; |
| } |
| // Resubscribe |
| _zkClientForRoutingDataListener.unsubscribeAll(); |
| _zkClientForRoutingDataListener.subscribeRoutingDataChanges(this, this); |
| resetZkResources(); |
| } |
| |
| @Override |
| public void handleDataChange(String dataPath, Object data) { |
| if (_zkClientForRoutingDataListener == null || _zkClientForRoutingDataListener.isClosed()) { |
| return; |
| } |
| resetZkResources(); |
| } |
| |
| @Override |
| public void handleDataDeleted(String dataPath) { |
| if (_zkClientForRoutingDataListener == null || _zkClientForRoutingDataListener.isClosed()) { |
| return; |
| } |
| // Resubscribe |
| _zkClientForRoutingDataListener.unsubscribeAll(); |
| _zkClientForRoutingDataListener.subscribeRoutingDataChanges(this, this); |
| resetZkResources(); |
| } |
| |
| @Override |
| public void handleStateChanged(Watcher.Event.KeeperState state) { |
| if (_zkClientForRoutingDataListener == null || _zkClientForRoutingDataListener.isClosed()) { |
| return; |
| } |
| // Resubscribe |
| _zkClientForRoutingDataListener.unsubscribeAll(); |
| _zkClientForRoutingDataListener.subscribeRoutingDataChanges(this, this); |
| resetZkResources(); |
| } |
| |
| @Override |
| public void handleNewSession(String sessionId) { |
| if (_zkClientForRoutingDataListener == null || _zkClientForRoutingDataListener.isClosed()) { |
| return; |
| } |
| // Resubscribe |
| _zkClientForRoutingDataListener.unsubscribeAll(); |
| _zkClientForRoutingDataListener.subscribeRoutingDataChanges(this, this); |
| resetZkResources(); |
| } |
| |
| @Override |
| public void handleSessionEstablishmentError(Throwable error) { |
| if (_zkClientForRoutingDataListener == null || _zkClientForRoutingDataListener.isClosed()) { |
| return; |
| } |
| // Resubscribe |
| _zkClientForRoutingDataListener.unsubscribeAll(); |
| _zkClientForRoutingDataListener.subscribeRoutingDataChanges(this, this); |
| resetZkResources(); |
| } |
| |
| /** |
| * Resets all internally cached routing data by closing and nullifying both zkClient and |
| * byteArrayZkClient and Helix APIs. |
| * This operation is considered costly since it triggers reconnection of all I/O resources, but |
| * this is okay because routing data update should be infrequent. |
| */ |
| private void resetZkResources() { |
| synchronized (this) { |
| LOG.info("ServerContext: Resetting ZK resources due to routing data change! Routing ZK: {}", |
| _zkAddr); |
| try { |
| // Reset RoutingDataManager's cache |
| RoutingDataManager.getInstance().reset(); |
| |
| // Close all ZkClients |
| if (_zkClient != null && !_zkClient.isClosed()) { |
| _zkClient.close(); |
| } |
| if (_byteArrayZkClient != null && !_byteArrayZkClient.isClosed()) { |
| _byteArrayZkClient.close(); |
| } |
| _zkClient = null; |
| _byteArrayZkClient = null; |
| |
| // Close all Helix APIs |
| if (_zkHelixAdmin != null) { |
| _zkHelixAdmin.close(); |
| _zkHelixAdmin = null; |
| } |
| if (_clusterSetup != null) { |
| _clusterSetup.close(); |
| _clusterSetup = null; |
| } |
| if (_configAccessor != null) { |
| _configAccessor.close(); |
| _configAccessor = null; |
| } |
| if (_byteArrayZkBaseDataAccessor != null) { |
| _byteArrayZkBaseDataAccessor.close(); |
| _byteArrayZkBaseDataAccessor = null; |
| } |
| if (_zkBucketDataAccessor != null) { |
| _zkBucketDataAccessor.close(); |
| _zkBucketDataAccessor = null; |
| } |
| _helixDataAccessorPool.clear(); |
| _taskDriverPool.clear(); |
| } catch (Exception e) { |
| LOG.error("Failed to reset ZkClient and Helix APIs in ServerContext!", e); |
| } |
| } |
| } |
| } |