blob: 2a9ec2426ef53e77d8f850bed53b8a4d4dbc5b54 [file] [log] [blame]
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.storm.localizer;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.net.JarURLConnection;
import java.net.URL;
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.storm.Config;
import org.apache.storm.blobstore.BlobStore;
import org.apache.storm.blobstore.ClientBlobStore;
import org.apache.storm.daemon.Shutdownable;
import org.apache.storm.daemon.supervisor.AdvancedFSOps;
import org.apache.storm.daemon.supervisor.SupervisorUtils;
import org.apache.storm.generated.LocalAssignment;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.storm.utils.Utils.OR;
/**
 * This is a wrapper around the Localizer class that provides the desired
 * async interface to Slot.
 */
public class AsyncLocalizer implements ILocalizer, Shutdownable {
* A future that has already completed.
private static class AllDoneFuture implements Future<Void> {
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
public boolean isCancelled() {
return false;
public boolean isDone() {
return true;
public Void get() {
return null;
public Void get(long timeout, TimeUnit unit) {
return null;
// Class-wide logger.
private static final Logger LOG = LoggerFactory.getLogger(AsyncLocalizer.class);
// Synchronous localizer this class wraps; used to fetch blobs and to drop blob references.
private final Localizer _localizer;
// Executor the download tasks are submitted to.
private final ExecutorService _execService;
// True when running in local mode; selects the local flavour of the base-blob download task.
private final boolean _isLocalMode;
// Daemon configuration handed to the ConfigUtils / Utils helpers.
private final Map<String, Object> _conf;
// Base-file (jar/code/conf) download state, keyed by topology id.
private final Map<String, LocalDownloadedResource> _basicPending;
// Topology blob download state, keyed by topology id.
private final Map<String, LocalDownloadedResource> _blobPending;
// File-system operations used for all local directory/file manipulation.
private final AdvancedFSOps _fsOps;
// When true, no symlinks to localized blobs are created in the topology directory.
private final boolean _symlinksDisabled;
private class DownloadBaseBlobsDistributed implements Callable<Void> {
protected final String _topologyId;
protected final File _stormRoot;
protected final String owner;
public DownloadBaseBlobsDistributed(String topologyId, String owner) throws IOException {
_topologyId = topologyId;
_stormRoot = new File(ConfigUtils.supervisorStormDistRoot(_conf, _topologyId));
this.owner = owner;
protected void downloadBaseBlobs(File tmproot) throws Exception {
String stormJarKey = ConfigUtils.masterStormJarKey(_topologyId);
String stormCodeKey = ConfigUtils.masterStormCodeKey(_topologyId);
String stormConfKey = ConfigUtils.masterStormConfKey(_topologyId);
String jarPath = ConfigUtils.supervisorStormJarPath(tmproot.getAbsolutePath());
String codePath = ConfigUtils.supervisorStormCodePath(tmproot.getAbsolutePath());
String confPath = ConfigUtils.supervisorStormConfPath(tmproot.getAbsolutePath());
ClientBlobStore blobStore = Utils.getClientBlobStoreForSupervisor(_conf);
try {
Utils.downloadResourcesAsSupervisor(stormJarKey, jarPath, blobStore);
Utils.downloadResourcesAsSupervisor(stormCodeKey, codePath, blobStore);
Utils.downloadResourcesAsSupervisor(stormConfKey, confPath, blobStore);
} finally {
Utils.extractDirFromJar(jarPath, ConfigUtils.RESOURCES_SUBDIR, tmproot);
public Void call() throws Exception {
try {
if (_fsOps.fileExists(_stormRoot)) {
if (!_fsOps.supportsAtomicDirectoryMove()) {
LOG.warn("{} may have partially downloaded blobs, recovering", _topologyId);
} else {
LOG.warn("{} already downloaded blobs, skipping", _topologyId);
return null;
boolean deleteAll = true;
String tmproot = ConfigUtils.supervisorTmpDir(_conf) + Utils.FILE_PATH_SEPARATOR + Utils.uuid();
File tr = new File(tmproot);
try {
_fsOps.moveDirectoryPreferAtomic(tr, _stormRoot);
_fsOps.setupStormCodeDir(owner, _stormRoot);
deleteAll = false;
} finally {
if (deleteAll) {
LOG.warn("Failed to download basic resources for topology-id {}", _topologyId);
return null;
} catch (Exception e) {
LOG.warn("Caught Exception While Downloading (rethrowing)... ", e);
throw e;
private class DownloadBaseBlobsLocal extends DownloadBaseBlobsDistributed {
public DownloadBaseBlobsLocal(String topologyId, String owner) throws IOException {
super(topologyId, owner);
protected void downloadBaseBlobs(File tmproot) throws Exception {
String stormCodeKey = ConfigUtils.masterStormCodeKey(_topologyId);
String stormConfKey = ConfigUtils.masterStormConfKey(_topologyId);
File codePath = new File(ConfigUtils.supervisorStormCodePath(tmproot.getAbsolutePath()));
File confPath = new File(ConfigUtils.supervisorStormConfPath(tmproot.getAbsolutePath()));
BlobStore blobStore = Utils.getNimbusBlobStore(_conf, null);
try {
try (OutputStream codeOutStream = _fsOps.getOutputStream(codePath)){
blobStore.readBlobTo(stormCodeKey, codeOutStream, null);
try (OutputStream confOutStream = _fsOps.getOutputStream(confPath)) {
blobStore.readBlobTo(stormConfKey, confOutStream, null);
} finally {
ClassLoader classloader = Thread.currentThread().getContextClassLoader();
String resourcesJar = AsyncLocalizer.resourcesJar();
URL url = classloader.getResource(ConfigUtils.RESOURCES_SUBDIR);
String targetDir = tmproot + Utils.FILE_PATH_SEPARATOR;
if (resourcesJar != null) {"Extracting resources from jar at {} to {}", resourcesJar, targetDir);
Utils.extractDirFromJar(resourcesJar, ConfigUtils.RESOURCES_SUBDIR, new File(targetDir));
} else if (url != null) {"Copying resources at {} to {}", url, targetDir);
if ("jar".equals(url.getProtocol())) {
JarURLConnection urlConnection = (JarURLConnection) url.openConnection();
Utils.extractDirFromJar(urlConnection.getJarFileURL().getFile(), ConfigUtils.RESOURCES_SUBDIR, new File(targetDir));
} else {
_fsOps.copyDirectory(new File(url.getFile()), new File(targetDir, ConfigUtils.RESOURCES_SUBDIR));
private class DownloadBlobs implements Callable<Void> {
private final String _topologyId;
private final String topoOwner;
public DownloadBlobs(String topologyId, String topoOwner) {
_topologyId = topologyId;
this.topoOwner = topoOwner;
public Void call() throws Exception {
try {
String stormroot = ConfigUtils.supervisorStormDistRoot(_conf, _topologyId);
Map<String, Object> topoConf = ConfigUtils.readSupervisorStormConf(_conf, _topologyId);
Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) topoConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
String topoName = (String) topoConf.get(Config.TOPOLOGY_NAME);
List<LocalResource> localResourceList = new ArrayList<>();
if (blobstoreMap != null) {
List<LocalResource> tmp = SupervisorUtils.blobstoreMapToLocalresources(blobstoreMap);
if (tmp != null) {
StormTopology stormCode = ConfigUtils.readSupervisorTopology(_conf, _topologyId, _fsOps);
List<String> dependencies = new ArrayList<>();
if (stormCode.is_set_dependency_jars()) {
if (stormCode.is_set_dependency_artifacts()) {
for (String dependency : dependencies) {
localResourceList.add(new LocalResource(dependency, false));
if (!localResourceList.isEmpty()) {
File userDir = _localizer.getLocalUserFileCacheDir(topoOwner);
if (!_fsOps.fileExists(userDir)) {
List<LocalizedResource> localizedResources = _localizer.getBlobs(localResourceList, topoOwner, topoName, userDir);
_fsOps.setupBlobPermissions(userDir, topoOwner);
if (!_symlinksDisabled) {
for (LocalizedResource localizedResource : localizedResources) {
String keyName = localizedResource.getKey();
//The sym link we are pointing to
File rsrcFilePath = new File(localizedResource.getCurrentSymlinkPath());
String symlinkName = null;
if (blobstoreMap != null) {
Map<String, Object> blobInfo = blobstoreMap.get(keyName);
if (blobInfo != null && blobInfo.containsKey("localname")) {
symlinkName = (String) blobInfo.get("localname");
} else {
symlinkName = keyName;
} else {
// all things are from dependencies
symlinkName = keyName;
_fsOps.createSymlink(new File(stormroot, symlinkName), rsrcFilePath);
return null;
} catch (Exception e) {
LOG.warn("Caught Exception While Downloading (rethrowing)... ", e);
throw e;
/**
 * Creates a localizer with explicit file-system operations. Visible for testing.
 *
 * @param conf daemon configuration; read for the symlink and local-mode settings
 * @param localizer synchronous localizer to delegate blob handling to
 * @param ops file-system operations to use
 */
AsyncLocalizer(Map<String, Object> conf, Localizer localizer, AdvancedFSOps ops) {
    _conf = conf;
    _symlinksDisabled = (boolean)OR(conf.get(Config.DISABLE_SYMLINKS), false);
    _isLocalMode = ConfigUtils.isLocalMode(conf);
    _localizer = localizer;
    // Single worker thread: downloads are executed one at a time.
    // NOTE(review): confirm whether further builder options (e.g. a daemon flag) belong here.
    _execService = Executors.newFixedThreadPool(1,
            new ThreadFactoryBuilder()
                    .setNameFormat("Async Localizer")
                    .build());
    _basicPending = new HashMap<>();
    _blobPending = new HashMap<>();
    _fsOps = ops;
}
/**
 * Creates a localizer whose file-system operations are derived from the configuration.
 *
 * @param conf daemon configuration
 * @param localizer synchronous localizer to delegate blob handling to
 */
public AsyncLocalizer(Map<String, Object> conf, Localizer localizer) {
    this(conf, localizer, AdvancedFSOps.make(conf));
}
public synchronized Future<Void> requestDownloadBaseTopologyBlobs(final LocalAssignment assignment, final int port) throws IOException {
final String topologyId = assignment.get_topology_id();
LocalDownloadedResource localResource = _basicPending.get(topologyId);
if (localResource == null) {
Callable<Void> c;
if (_isLocalMode) {
c = new DownloadBaseBlobsLocal(topologyId, assignment.get_owner());
} else {
c = new DownloadBaseBlobsDistributed(topologyId, assignment.get_owner());
localResource = new LocalDownloadedResource(_execService.submit(c));
_basicPending.put(topologyId, localResource);
Future<Void> ret = localResource.reserve(port, assignment);
LOG.debug("Reserved basic {} {}", topologyId, localResource);
return ret;
private static String resourcesJar() throws IOException {
String path = Utils.currentClasspath();
if (path == null) {
return null;
for (String jpath : path.split(File.pathSeparator)) {
if (jpath.endsWith(".jar")) {
if (Utils.zipDoesContainDir(jpath, ConfigUtils.RESOURCES_SUBDIR)) {
return jpath;
return null;
public synchronized void recoverRunningTopology(LocalAssignment assignment, int port) {
final String topologyId = assignment.get_topology_id();
LocalDownloadedResource localResource = _basicPending.get(topologyId);
if (localResource == null) {
localResource = new LocalDownloadedResource(new AllDoneFuture());
_basicPending.put(topologyId, localResource);
localResource.reserve(port, assignment);
LOG.debug("Recovered basic {} {}", topologyId, localResource);
localResource = _blobPending.get(topologyId);
if (localResource == null) {
localResource = new LocalDownloadedResource(new AllDoneFuture());
_blobPending.put(topologyId, localResource);
localResource.reserve(port, assignment);
LOG.debug("Recovered blobs {} {}", topologyId, localResource);
public synchronized Future<Void> requestDownloadTopologyBlobs(LocalAssignment assignment, int port) {
final String topologyId = assignment.get_topology_id();
LocalDownloadedResource localResource = _blobPending.get(topologyId);
if (localResource == null) {
Callable<Void> c = new DownloadBlobs(topologyId, assignment.get_owner());
localResource = new LocalDownloadedResource(_execService.submit(c));
_blobPending.put(topologyId, localResource);
Future<Void> ret = localResource.reserve(port, assignment);
LOG.debug("Reserved blobs {} {}", topologyId, localResource);
return ret;
public synchronized void releaseSlotFor(LocalAssignment assignment, int port) throws IOException {
final String topologyId = assignment.get_topology_id();
LOG.debug("Releasing slot for {} {}", topologyId, port);
LocalDownloadedResource localResource = _blobPending.get(topologyId);
if (localResource == null || !localResource.release(port, assignment)) {
LOG.warn("Released blob reference {} {} for something that we didn't have {}", topologyId, port, localResource);
} else if (localResource.isDone()){"Released blob reference {} {} Cleaning up BLOB references...", topologyId, port);
Map<String, Object> topoConf = ConfigUtils.readSupervisorStormConf(_conf, topologyId);
Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) topoConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
if (blobstoreMap != null) {
String user = assignment.get_owner();
String topoName = (String) topoConf.get(Config.TOPOLOGY_NAME);
for (Map.Entry<String, Map<String, Object>> entry : blobstoreMap.entrySet()) {
String key = entry.getKey();
Map<String, Object> blobInfo = entry.getValue();
try {
_localizer.removeBlobReference(key, user, topoName, SupervisorUtils.shouldUncompressBlob(blobInfo));
} catch (Exception e) {
throw new IOException(e);
} else {
LOG.debug("Released blob reference {} {} still waiting on {}", topologyId, port, localResource);
localResource = _basicPending.get(topologyId);
if (localResource == null || !localResource.release(port, assignment)) {
LOG.warn("Released basic reference {} {} for something that we didn't have {}", topologyId, port, localResource);
} else if (localResource.isDone()){"Released blob reference {} {} Cleaning up basic files...", topologyId, port);
String path = ConfigUtils.supervisorStormDistRoot(_conf, topologyId);
_fsOps.deleteIfExists(new File(path), null, "rmr "+topologyId);
} else {
LOG.debug("Released basic reference {} {} still waiting on {}", topologyId, port, localResource);
public synchronized void cleanupUnusedTopologies() throws IOException {
File distRoot = new File(ConfigUtils.supervisorStormDistRoot(_conf));"Cleaning up unused topologies in {}", distRoot);
File[] children = distRoot.listFiles();
if (children != null) {
for (File topoDir : children) {
String topoId = URLDecoder.decode(topoDir.getName(), "UTF-8");
if (_basicPending.get(topoId) == null && _blobPending.get(topoId) == null) {
_fsOps.deleteIfExists(topoDir, null, "rmr " + topoId);
public void shutdown() {