blob: 08c7e99056c81b1a0153e0a6934d8ade41b54efe [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package org.apache.storm.nimbus;
import com.codahale.metrics.Meter;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import javax.security.auth.Subject;
import org.apache.commons.io.IOUtils;
import org.apache.storm.Config;
import org.apache.storm.DaemonConfig;
import org.apache.storm.blobstore.BlobStore;
import org.apache.storm.blobstore.InputStreamWithMeta;
import org.apache.storm.cluster.ClusterUtils;
import org.apache.storm.cluster.IStormClusterState;
import org.apache.storm.daemon.nimbus.TopoCache;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.KeyNotFoundException;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.security.auth.ReqContext;
import org.apache.storm.shade.com.google.common.base.Joiner;
import org.apache.storm.shade.com.google.common.collect.Sets;
import org.apache.storm.shade.org.apache.curator.framework.CuratorFramework;
import org.apache.storm.shade.org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.storm.shade.org.apache.zookeeper.CreateMode;
import org.apache.storm.shade.org.apache.zookeeper.data.ACL;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.Utils;
import org.apache.storm.zookeeper.ClientZookeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A callback function when nimbus gains leadership.
*/
public class LeaderListenerCallback {
private static final Logger LOG = LoggerFactory.getLogger(LeaderListenerCallback.class);
private static final String STORM_JAR_SUFFIX = "-stormjar.jar";
private static final String STORM_CODE_SUFFIX = "-stormcode.ser";
private static final String STORM_CONF_SUFFIX = "-stormconf.ser";
private final Meter numGainedLeader;
private final Meter numLostLeader;
private final BlobStore blobStore;
private final TopoCache tc;
private final IStormClusterState clusterState;
private final CuratorFramework zk;
private final ILeaderElector leaderElector;
private final Map conf;
private final List<ACL> acls;
private final int requeueDelayMs;
/**
* Constructor for {@LeaderListenerCallback}.
* @param conf config
* @param zk zookeeper CuratorFramework client
* @param blobStore BlobStore
* @param leaderElector Leader elector
* @param tc TopoCache
* @param clusterState IStormClusterState
* @param acls zookeeper acls
*/
public LeaderListenerCallback(Map conf, CuratorFramework zk, BlobStore blobStore, ILeaderElector leaderElector,
TopoCache tc, IStormClusterState clusterState, List<ACL> acls, StormMetricsRegistry metricsRegistry) {
this.blobStore = blobStore;
this.tc = tc;
this.clusterState = clusterState;
this.zk = zk;
this.leaderElector = leaderElector;
this.conf = conf;
this.acls = acls;
this.numGainedLeader = metricsRegistry.registerMeter("nimbus:num-gained-leadership");
this.numLostLeader = metricsRegistry.registerMeter("nimbus:num-lost-leadership");
//Since we only give up leadership if we're waiting for blobs to sync,
//it makes sense to wait a full sync cycle before trying for leadership again.
this.requeueDelayMs = ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_CODE_SYNC_FREQ_SECS)) * 1000;
}
/**
* Invoke when gains leadership.
*/
public void leaderCallBack() {
numGainedLeader.mark();
//set up nimbus-info to zk
setUpNimbusInfo(acls);
//sync zk assignments/id-info to local
LOG.info("Sync remote assignments and id-info to local");
clusterState.syncRemoteAssignments(null);
clusterState.syncRemoteIds(null);
clusterState.setAssignmentsBackendSynchronized();
Set<String> activeTopologyIds = new TreeSet<>(ClientZookeeper.getChildren(zk,
ClusterUtils.STORMS_SUBTREE, false));
Set<String> activeTopologyBlobKeys = populateTopologyBlobKeys(activeTopologyIds);
Set<String> activeTopologyCodeKeys = filterTopologyCodeKeys(activeTopologyBlobKeys);
Set<String> allLocalBlobKeys = Sets.newHashSet(blobStore.listKeys());
Set<String> allLocalTopologyBlobKeys = filterTopologyBlobKeys(allLocalBlobKeys);
// this finds all active topologies blob keys from all local topology blob keys
Sets.SetView<String> diffTopology = Sets.difference(activeTopologyBlobKeys, allLocalTopologyBlobKeys);
LOG.info("active-topology-blobs [{}] local-topology-blobs [{}] diff-topology-blobs [{}]",
generateJoinedString(activeTopologyIds), generateJoinedString(allLocalTopologyBlobKeys),
generateJoinedString(diffTopology));
if (diffTopology.isEmpty()) {
Set<String> activeTopologyDependencies = getTopologyDependencyKeys(activeTopologyCodeKeys);
// this finds all dependency blob keys from active topologies from all local blob keys
Sets.SetView<String> diffDependencies = Sets.difference(activeTopologyDependencies, allLocalBlobKeys);
LOG.info("active-topology-dependencies [{}] local-blobs [{}] diff-topology-dependencies [{}]",
generateJoinedString(activeTopologyDependencies), generateJoinedString(allLocalBlobKeys),
generateJoinedString(diffDependencies));
if (diffDependencies.isEmpty()) {
LOG.info("Accepting leadership, all active topologies and corresponding dependencies found locally.");
tc.clear();
} else {
LOG.info("Code for all active topologies is available locally, but some dependencies are not found locally, "
+ "giving up leadership.");
surrenderLeadership();
}
} else {
LOG.info("code for all active topologies not available locally, giving up leadership.");
surrenderLeadership();
}
}
/**
* Invoke when lost leadership.
*/
public void notLeaderCallback() {
numLostLeader.mark();
tc.clear();
}
private void setUpNimbusInfo(List<ACL> acls) {
String leaderInfoPath = conf.get(Config.STORM_ZOOKEEPER_ROOT) + ClusterUtils.LEADERINFO_SUBTREE;
NimbusInfo nimbusInfo = NimbusInfo.fromConf(conf);
if (ClientZookeeper.existsNode(zk, leaderInfoPath, false)) {
ClientZookeeper.setData(zk, leaderInfoPath, Utils.javaSerialize(nimbusInfo));
} else {
ClientZookeeper.createNode(zk, leaderInfoPath, Utils.javaSerialize(nimbusInfo), CreateMode.PERSISTENT, acls);
}
}
private String generateJoinedString(Set<String> activeTopologyIds) {
return Joiner.on(",").join(activeTopologyIds);
}
private Set<String> populateTopologyBlobKeys(Set<String> activeTopologyIds) {
Set<String> activeTopologyBlobKeys = new TreeSet<>();
for (String activeTopologyId : activeTopologyIds) {
activeTopologyBlobKeys.add(activeTopologyId + STORM_JAR_SUFFIX);
activeTopologyBlobKeys.add(activeTopologyId + STORM_CODE_SUFFIX);
activeTopologyBlobKeys.add(activeTopologyId + STORM_CONF_SUFFIX);
}
return activeTopologyBlobKeys;
}
private Set<String> filterTopologyBlobKeys(Set<String> blobKeys) {
Set<String> topologyBlobKeys = new HashSet<>();
for (String blobKey : blobKeys) {
if (blobKey.endsWith(STORM_JAR_SUFFIX)
|| blobKey.endsWith(STORM_CODE_SUFFIX)
|| blobKey.endsWith(STORM_CONF_SUFFIX)) {
topologyBlobKeys.add(blobKey);
}
}
return topologyBlobKeys;
}
private Set<String> filterTopologyCodeKeys(Set<String> blobKeys) {
Set<String> topologyCodeKeys = new HashSet<>();
for (String blobKey : blobKeys) {
if (blobKey.endsWith(STORM_CODE_SUFFIX)) {
topologyCodeKeys.add(blobKey);
}
}
return topologyCodeKeys;
}
private Set<String> getTopologyDependencyKeys(Set<String> activeTopologyCodeKeys) {
Set<String> activeTopologyDependencies = new TreeSet<>();
Subject subject = ReqContext.context().subject();
for (String activeTopologyCodeKey : activeTopologyCodeKeys) {
try (InputStreamWithMeta blob = blobStore.getBlob(activeTopologyCodeKey, subject)) {
byte[] blobContent = IOUtils.readFully(blob, new Long(blob.getFileLength()).intValue());
StormTopology stormCode = Utils.deserialize(blobContent, StormTopology.class);
if (stormCode.is_set_dependency_jars()) {
activeTopologyDependencies.addAll(stormCode.get_dependency_jars());
}
if (stormCode.is_set_dependency_artifacts()) {
activeTopologyDependencies.addAll(stormCode.get_dependency_artifacts());
}
} catch (AuthorizationException | KeyNotFoundException | IOException e) {
LOG.error("Exception occurs while reading blob for key: "
+ activeTopologyCodeKey
+ ", exception: "
+ e, e);
throw new RuntimeException("Exception occurs while reading blob for key: "
+ activeTopologyCodeKey
+ ", exception: " + e, e);
}
}
return activeTopologyDependencies;
}
private void surrenderLeadership() {
try {
leaderElector.quitElectionFor(requeueDelayMs);
} catch (Exception e) {
throw Utils.wrapInRuntime(e);
}
}
}