blob: fd24eb3c69544153b39ba45aee7057e303008bec [file] [log] [blame]
package org.apache.helix.manager.zk;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import org.apache.helix.HelixException;
import org.apache.helix.msdcommon.datamodel.MetadataStoreRoutingData;
import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
import org.apache.helix.zookeeper.api.client.HelixZkClient;
import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
import org.apache.helix.zookeeper.impl.client.FederatedZkClient;
import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* GenericZkHelixApiBuilder serves as the abstract parent class for Builders used by Helix Java APIs
* that create ZK connections. By having this class, we reduce duplicate code as much as possible.
* @param <B>
*/
public abstract class GenericZkHelixApiBuilder<B extends GenericZkHelixApiBuilder<B>> {
private static final Logger LOG =
LoggerFactory.getLogger(GenericZkHelixApiBuilder.class.getName());
protected String _zkAddress;
protected RealmAwareZkClient.RealmMode _realmMode;
protected RealmAwareZkClient.RealmAwareZkConnectionConfig _realmAwareZkConnectionConfig;
protected RealmAwareZkClient.RealmAwareZkClientConfig _realmAwareZkClientConfig;
/**
* Note: If you set the ZK address explicitly, this setting will take priority over the ZK path
* sharding key set in RealmAwareZkConnectionConfig.
* If this field is null, and the realm mode is single-realm, then it will try to look up the
* ZK address based on the ZK path sharding key.
* @param zkAddress
* @return
*/
public B setZkAddress(String zkAddress) {
_zkAddress = zkAddress;
return self();
}
public B setRealmMode(RealmAwareZkClient.RealmMode realmMode) {
_realmMode = realmMode;
return self();
}
public B setRealmAwareZkConnectionConfig(
RealmAwareZkClient.RealmAwareZkConnectionConfig realmAwareZkConnectionConfig) {
_realmAwareZkConnectionConfig = realmAwareZkConnectionConfig;
return self();
}
public B setRealmAwareZkClientConfig(
RealmAwareZkClient.RealmAwareZkClientConfig realmAwareZkClientConfig) {
_realmAwareZkClientConfig = realmAwareZkClientConfig;
return self();
}
/**
* Validates the given Builder parameters using a generic validation logic.
*
* This validation function checks whether realm mode has been set correctly, and
* resolves them if not set at all.
*
* If realm mode is null, we look at zkAddress field or RealmAwareZkClient's connection config to
* see if we could deduce the Zk address based on the MetadataStoreRoutingData.
*
* Note: If you set the ZK address explicitly, this setting will take priority over the ZK path
* sharding key set in RealmAwareZkConnectionConfig.
* If this field is null, and the realm mode is single-realm, then it will try to look up the
* ZK address based on the ZK path sharding key.
*/
protected void validate() {
initializeConfigsIfNull();
// Resolve RealmMode based on whether ZK address has been set
boolean isZkAddressSet = _zkAddress != null && !_zkAddress.isEmpty();
// If realmMode is single-realm (in other words, ZkAddress is needed) and zk address is not
// given, then try to look up ZK address if ZK realm sharding key is set
// If realmMode is multi-realm, make sure it's not tied to a single sharding key
if (!isZkAddressSet && _realmAwareZkConnectionConfig.getZkRealmShardingKey() != null
&& !_realmAwareZkConnectionConfig.getZkRealmShardingKey().isEmpty()) {
if (_realmMode == RealmAwareZkClient.RealmMode.SINGLE_REALM) {
// Try to resolve the zk address using the zk path sharding key if given
try {
_zkAddress = resolveZkAddressWithShardingKey(_realmAwareZkConnectionConfig);
isZkAddressSet = true;
} catch (InvalidRoutingDataException e) {
LOG.warn(
"GenericZkHelixApiBuilder: ZkAddress is not set and failed to resolve ZkAddress with ZK path sharding key!",
e);
}
} else if (_realmMode == RealmAwareZkClient.RealmMode.MULTI_REALM) {
// Multi-realm and a single sharding key cannot coexist (by definition, multi-realm can access multiple sharding keys)
throw new HelixException(
"GenericZkHelixApiBuilder: Cannot have a ZK path sharding key in ConnectionConfig on multi-realm mode! Multi-realm accesses multiple sharding keys.");
}
}
if (_realmMode == RealmAwareZkClient.RealmMode.SINGLE_REALM && !isZkAddressSet) {
throw new HelixException(
"GenericZkHelixApiBuilder: RealmMode cannot be single-realm without a valid ZkAddress set!");
}
if (_realmMode == RealmAwareZkClient.RealmMode.MULTI_REALM && isZkAddressSet) {
throw new HelixException(
"GenericZkHelixApiBuilder: ZkAddress cannot be set on multi-realm mode!");
}
if (_realmMode == null) {
_realmMode = isZkAddressSet ? RealmAwareZkClient.RealmMode.SINGLE_REALM
: RealmAwareZkClient.RealmMode.MULTI_REALM;
}
}
/**
* Initializes Realm-aware ZkConnection and ZkClient configs if they haven't been set.
*/
protected void initializeConfigsIfNull() {
// Resolve all default values
if (_realmAwareZkConnectionConfig == null) {
_realmAwareZkConnectionConfig =
new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder().build();
}
// For Helix APIs, ZNRecord should be the default data model
if (_realmAwareZkClientConfig == null) {
_realmAwareZkClientConfig = new RealmAwareZkClient.RealmAwareZkClientConfig()
.setZkSerializer(new ZNRecordSerializer());
}
}
/**
* Creates a RealmAwareZkClient based on the parameters set.
* To be used in Helix ZK APIs' constructors: ConfigAccessor, ClusterSetup, ZKHelixAdmin
* @return
*/
protected RealmAwareZkClient createZkClient(RealmAwareZkClient.RealmMode realmMode,
RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig,
RealmAwareZkClient.RealmAwareZkClientConfig clientConfig, String zkAddress) {
switch (realmMode) {
case MULTI_REALM:
try {
return new FederatedZkClient(connectionConfig,
clientConfig.setZkSerializer(new ZNRecordSerializer()));
} catch (InvalidRoutingDataException | IllegalStateException e) {
throw new HelixException("GenericZkHelixApiBuilder: Failed to create FederatedZkClient!",
e);
}
case SINGLE_REALM:
// Create a HelixZkClient: Use a SharedZkClient because ClusterSetup does not need to do
// ephemeral operations
return SharedZkClientFactory.getInstance().buildZkClient(
new HelixZkClient.ZkConnectionConfig(zkAddress)
.setSessionTimeout(connectionConfig.getSessionTimeout()),
clientConfig.createHelixZkClientConfig().setZkSerializer(new ZNRecordSerializer()));
default:
throw new HelixException("GenericZkHelixApiBuilder: Invalid RealmMode given: " + realmMode);
}
}
/**
* Returns an instance of a subclass-Builder in order to reduce duplicate code.
* SuppressWarnings is used to rid of IDE warnings.
* @return an instance of a subclass-Builder. E.g.) ConfigAccessor.Builder
*/
@SuppressWarnings("unchecked")
final B self() {
return (B) this;
}
/**
* Resolve Zk address based on the zk realm sharding key. This method is only used if the
* ZK address is not given in this Builder.
* @param connectionConfig
* @return
* @throws InvalidRoutingDataException
*/
private String resolveZkAddressWithShardingKey(
RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig)
throws InvalidRoutingDataException {
MetadataStoreRoutingData routingData =
RealmAwareZkClient.getMetadataStoreRoutingData(connectionConfig);
return routingData.getMetadataStoreRealm(connectionConfig.getZkRealmShardingKey());
}
}