blob: 53e63040cc694c7775102a18694758798245a74b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability;
import static org.apache.helix.model.IdealState.RebalanceMode.FULL_AUTO;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import com.google.common.base.Joiner;

import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricsSystemInitializationException;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.NotificationContext;
import org.apache.helix.controller.GenericHelixController;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.OnlineOfflineSMD;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.tools.StateModelConfigGenerator;
/**
 * Coordinates High Availability for metric collector instances via Apache
 * Helix over ZooKeeper: registers this host as a participant in the
 * "ambari-metrics-cluster" Helix cluster, starts the aggregation task runner,
 * and runs a controller that tracks live collector instances.
 */
public class MetricCollectorHAController {
  private static final Log LOG = LogFactory.getLog(MetricCollectorHAController.class);

  static final String CLUSTER_NAME = "ambari-metrics-cluster";
  static final String METRIC_AGGREGATORS = "METRIC_AGGREGATORS";
  static final String DEFAULT_STATE_MODEL = OnlineOfflineSMD.name;
  static final String INSTANCE_NAME_DELIMITER = "_";

  final String zkConnectUrl;
  final String instanceHostname;
  final InstanceConfig instanceConfig;
  final AggregationTaskRunner aggregationTaskRunner;
  final TimelineMetricConfiguration configuration;

  // Cache of known live instance names. Rewritten by the Helix callback
  // thread (HelixController.onLiveInstanceChange) and read by
  // getLiveInstanceHostNames(), so all access is synchronized on the list.
  final List<String> liveInstanceNames = new ArrayList<>();

  // Helix Admin
  HelixAdmin admin;
  // Helix Manager
  HelixManager manager;

  // Set only after initializeHAController() completes successfully.
  private volatile boolean isInitialized = false;

  /**
   * Builds the participant instance config ("{hostname}_{port}") and the
   * aggregation task runner from the supplied configuration.
   *
   * @param configuration timeline metric service configuration
   * @throws MetricsSystemInitializationException if the hostname/port or the
   *           zookeeper quorum cannot be resolved
   */
  public MetricCollectorHAController(TimelineMetricConfiguration configuration) {
    this.configuration = configuration;
    String instancePort;
    try {
      instanceHostname = configuration.getInstanceHostnameFromEnv();
      instancePort = configuration.getInstancePort();
    } catch (Exception e) {
      LOG.error("Error reading configs from classpath, will resort to defaults.", e);
      // Full cause is logged above; the wrapper carries only the message.
      throw new MetricsSystemInitializationException(e.getMessage());
    }

    try {
      String zkClientPort = configuration.getClusterZKClientPort();
      String zkQuorum = configuration.getClusterZKQuorum();

      if (StringUtils.isEmpty(zkClientPort) || StringUtils.isEmpty(zkQuorum)) {
        throw new Exception("Unable to parse zookeeper quorum. clientPort = "
          + zkClientPort + ", quorum = " + zkQuorum);
      }

      zkConnectUrl = getZkConnectionUrl(zkClientPort, zkQuorum);
    } catch (Exception e) {
      LOG.error("Unable to load hbase-site from classpath.", e);
      throw new MetricsSystemInitializationException(e.getMessage());
    }

    instanceConfig = new InstanceConfig(instanceHostname + INSTANCE_NAME_DELIMITER + instancePort);
    instanceConfig.setHostName(instanceHostname);
    instanceConfig.setPort(instancePort);
    instanceConfig.setInstanceEnabled(true);
    aggregationTaskRunner = new AggregationTaskRunner(
      instanceConfig.getInstanceName(), zkConnectUrl, getClusterName());
  }

  /**
   * Name of Helix znode under which this cluster's state is kept.
   */
  public String getClusterName() {
    return CLUSTER_NAME;
  }

  /**
   * Initialize the instance with zookeeper via Helix: create the cluster
   * znode if needed, register this host as a participant, install the
   * ONLINE-OFFLINE state model, create the aggregator resource, then start
   * the participant (aggregators) and the controller.
   *
   * @throws Exception on any zookeeper / Helix failure
   */
  public void initializeHAController() throws Exception {
    String clusterName = getClusterName();
    admin = new ZKHelixAdmin(zkConnectUrl);
    // create cluster (overwrite == false, so a no-op if it already exists)
    LOG.info("Creating zookeeper cluster node: " + clusterName);
    admin.addCluster(clusterName, false);

    // Adding host to the cluster
    List<String> nodes = Collections.emptyList();
    try {
      nodes = admin.getInstancesInCluster(clusterName);
    } catch (ZkNoNodeException ex) {
      // Cluster znode exists but expected children are missing: recreate it.
      LOG.warn("Child znode under /" + CLUSTER_NAME + " not found. Recreating the cluster.");
      admin.addCluster(clusterName, true);
    }

    if (CollectionUtils.isEmpty(nodes) || !nodes.contains(instanceConfig.getInstanceName())) {
      LOG.info("Adding participant instance " + instanceConfig);
      admin.addInstance(clusterName, instanceConfig);
    }

    // Add a state model
    if (admin.getStateModelDef(clusterName, DEFAULT_STATE_MODEL) == null) {
      LOG.info("Adding ONLINE-OFFLINE state model to the cluster");
      admin.addStateModelDef(clusterName, DEFAULT_STATE_MODEL, new StateModelDefinition(
        StateModelConfigGenerator.generateConfigForOnlineOffline()));
    }

    // Add resources with 1 cluster-wide replica
    // Since our aggregators are unbalanced in terms of work distribution we
    // only need to distribute writes to METRIC_AGGREGATE and
    // METRIC_RECORD_MINUTE
    List<String> resources = admin.getResourcesInCluster(clusterName);
    if (!resources.contains(METRIC_AGGREGATORS)) {
      LOG.info("Adding resource " + METRIC_AGGREGATORS + " with 2 partitions and 1 replicas");
      admin.addResource(clusterName, METRIC_AGGREGATORS, 2, DEFAULT_STATE_MODEL, FULL_AUTO.toString());
    }
    // this will set up the ideal state, it calculates the preference list for
    // each partition similar to consistent hashing
    admin.rebalance(clusterName, METRIC_AGGREGATORS, 1);

    // Start participant
    startAggregators();
    // Start controller
    startController();

    // Best-effort cleanup of the participant and controller connections on
    // JVM shutdown. Both references are non-null by this point.
    Runtime.getRuntime().addShutdownHook(new Thread() {
      @Override
      public void run() {
        aggregationTaskRunner.stop();
        manager.disconnect();
      }
    });

    isInitialized = true;
  }

  /**
   * Return true if HA controller initialization has completed.
   */
  public boolean isInitialized() {
    return isInitialized;
  }

  // Start this host as a Helix participant running the aggregation tasks.
  private void startAggregators() {
    try {
      aggregationTaskRunner.initialize();
    } catch (Exception e) {
      LOG.error("Unable to start aggregators.", e);
      // Full cause is logged above; the wrapper carries only the message.
      throw new MetricsSystemInitializationException(e.getMessage());
    }
  }

  // Connect a Helix controller for this cluster and subscribe it to
  // live-instance change notifications.
  private void startController() throws Exception {
    manager = HelixManagerFactory.getZKHelixManager(
      getClusterName(),
      instanceHostname,
      InstanceType.CONTROLLER,
      zkConnectUrl
    );
    manager.connect();
    HelixController controller = new HelixController();
    manager.addLiveInstanceChangeListener(controller);
  }

  /**
   * Build a zookeeper connect string from a comma-separated quorum,
   * appending the client port to every quorum member that does not already
   * carry an explicit port.
   */
  private String getZkConnectionUrl(String zkClientPort, String zkQuorum) {
    StringBuilder sb = new StringBuilder();
    String[] quorumParts = zkQuorum.split(",");
    String prefix = "";
    for (String part : quorumParts) {
      sb.append(prefix);
      sb.append(part.trim());
      if (!part.contains(":")) {
        sb.append(":");
        sb.append(zkClientPort);
      }
      prefix = ",";
    }
    return sb.toString();
  }

  public AggregationTaskRunner getAggregationTaskRunner() {
    return aggregationTaskRunner;
  }

  /**
   * Hostnames of the currently live collector instances. Instance names are
   * "{hostname}_{port}"; this strips the port suffix.
   */
  public List<String> getLiveInstanceHostNames() {
    List<String> liveInstanceHostNames = new ArrayList<>();
    // Synchronized: the list is rewritten by the Helix callback thread.
    synchronized (liveInstanceNames) {
      for (String instance : liveInstanceNames) {
        liveInstanceHostNames.add(instance.split(INSTANCE_NAME_DELIMITER)[0]);
      }
    }
    return liveInstanceHostNames;
  }

  /**
   * Controller callback that keeps {@link #liveInstanceNames} in sync with
   * cluster membership and logs the HA cluster state shortly after each
   * membership change.
   */
  public class HelixController extends GenericHelixController {
    ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
    Joiner joiner = Joiner.on(", ").skipNulls();

    @Override
    public void onLiveInstanceChange(List<LiveInstance> liveInstances, NotificationContext changeContext) {
      super.onLiveInstanceChange(liveInstances, changeContext);

      String joinedNames;
      // Synchronized with readers in getLiveInstanceHostNames().
      synchronized (liveInstanceNames) {
        liveInstanceNames.clear();
        for (LiveInstance instance : liveInstances) {
          liveInstanceNames.add(instance.getInstanceName());
        }
        joinedNames = joiner.join(liveInstanceNames);
      }
      LOG.info("Detected change in liveliness of Collector instances. " +
        "LiveInstances = " + joinedNames);
      // Print HA state - after some delay
      executorService.schedule(new Runnable() {
        @Override
        public void run() {
          printClusterState();
        }
      }, 30, TimeUnit.SECONDS);
    }
  }

  /**
   * Log the external view of the aggregator resource, partition by partition.
   */
  public void printClusterState() {
    StringBuilder sb = new StringBuilder("\n######################### Cluster HA state ########################");
    ExternalView resourceExternalView = admin.getResourceExternalView(getClusterName(), METRIC_AGGREGATORS);
    if (resourceExternalView != null) {
      getPrintableResourceState(resourceExternalView, METRIC_AGGREGATORS, sb);
    }
    sb.append("\n##################################################");
    LOG.info(sb.toString());
  }

  // Append a human-readable dump of the resource's partition -> (instance,
  // state) mapping to the supplied builder.
  private void getPrintableResourceState(ExternalView resourceExternalView,
                                         String resourceName,
                                         StringBuilder sb) {
    TreeSet<String> sortedSet = new TreeSet<>(resourceExternalView.getPartitionSet());
    sb.append("\nCLUSTER: ");
    sb.append(getClusterName());
    sb.append("\nRESOURCE: ");
    sb.append(resourceName);
    for (String partitionName : sortedSet) {
      sb.append("\nPARTITION: ");
      sb.append(partitionName).append("\t");
      Map<String, String> states = resourceExternalView.getStateMap(partitionName);
      for (Map.Entry<String, String> stateEntry : states.entrySet()) {
        sb.append("\t");
        sb.append(stateEntry.getKey());
        sb.append("\t");
        sb.append(stateEntry.getValue());
      }
    }
  }
}