// Repository-browser residue (not part of the program): blob: babed5614656d00424a476d3e50f64291cd2c9f3 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package org.apache.storm.localizer;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.JarURLConnection;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Enumeration;
import java.util.Map;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.storm.blobstore.ClientBlobStore;
import org.apache.storm.daemon.supervisor.AdvancedFSOps;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.KeyNotFoundException;
import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.ServerConfigUtils;
import org.apache.storm.utils.ServerUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A locally cached blob for the topology. storm.jar, stormcode.ser, or stormconf.ser.
* The version number of the blob's file will be stored in `${basename}.version`
*/
public class LocallyCachedTopologyBlob extends LocallyCachedBlob {
/** Version number reported and returned for the topology jar when running in local mode. */
public static final long LOCAL_MODE_JAR_VERSION = 1;
private static final Logger LOG = LoggerFactory.getLogger(LocallyCachedTopologyBlob.class);
// Matches file names of the form "<base>.<digits>": group 1 is the base name, group 2 the numeric suffix.
private static final Pattern EXTRACT_BASE_NAME_AND_VERSION = Pattern.compile("^(.*)\\.([0-9]+)$");
// Which of the topology blobs (jar, code or conf) this instance tracks.
private final TopologyBlobType type;
// ID of the topology the blob belongs to.
private final String topologyId;
// When true, the topology jar is not fetched from the blob store (see fetchUnzipToTemp and getRemoteVersion).
private final boolean isLocalMode;
// Directory that holds this topology's blob files, version files and extracted resources.
private final Path topologyBasicBlobsRootDir;
// File system operations used for the on-disk checks and modifications made by this class.
private final AdvancedFSOps fsOps;
// User name passed to fsOps when setting up the topology directories.
private final String owner;
// Locally committed version; NOT_DOWNLOADED_VERSION when no version file is present.
private volatile long version = NOT_DOWNLOADED_VERSION;
// Size last computed by updateSizeOnDisk(): the blob file plus the extraction directory when there is one.
private volatile long size = 0;
/**
* Create a new LocallyCachedBlob.
* @param topologyId the ID of the topology.
* @param type the type of the blob.
* @param owner the name of the user that owns this blob.
*/
protected LocallyCachedTopologyBlob(final String topologyId, final boolean isLocalMode, final Map<String, Object> conf,
final AdvancedFSOps fsOps, final TopologyBlobType type,
String owner, StormMetricsRegistry metricsRegistry) throws IOException {
super(topologyId + " " + type.getFileName(), type.getKey(topologyId), metricsRegistry);
this.topologyId = topologyId;
this.type = type;
this.isLocalMode = isLocalMode;
this.fsOps = fsOps;
this.owner = owner;
topologyBasicBlobsRootDir = Paths.get(ConfigUtils.supervisorStormDistRoot(conf, topologyId));
readVersion();
updateSizeOnDisk();
}
/**
 * Look through the current classpath for the first jar that contains the resources directory.
 *
 * @return the classpath entry of that jar, or null when there is no classpath or no jar on it matches.
 * @throws IOException if checking a jar's contents fails.
 */
private static String resourcesJar() throws IOException {
    final String classpath = ServerUtils.currentClasspath();
    if (classpath != null) {
        for (String entry : classpath.split(File.pathSeparator)) {
            final boolean isJar = entry.endsWith(".jar");
            if (isJar && ServerUtils.zipDoesContainDir(entry, ServerConfigUtils.RESOURCES_SUBDIR)) {
                return entry;
            }
        }
    }
    return null;
}
/**
 * Recompute the cached size: the blob file itself plus, for blobs that get extracted, the extraction directory.
 *
 * @throws IOException if a size cannot be determined.
 */
private void updateSizeOnDisk() throws IOException {
    final Path blobFile = topologyBasicBlobsRootDir.resolve(type.getFileName());
    long onDisk = getSizeOnDisk(blobFile);
    if (type.needsExtraction()) {
        final Path extracted = topologyBasicBlobsRootDir.resolve(type.getExtractionDir());
        onDisk += getSizeOnDisk(extracted);
    }
    size = onDisk;
}
/**
 * Load the locally committed version of this blob from its version file.
 *
 * <p>The version becomes NOT_DOWNLOADED_VERSION when the version file is missing. The same happens when the file
 * does not hold a valid number, so that a damaged version file leads to a fresh download instead of an unchecked
 * exception escaping from the constructor.
 *
 * @throws IOException if the version file exists but cannot be read.
 */
private void readVersion() throws IOException {
    Path versionFile = topologyBasicBlobsRootDir.resolve(type.getVersionFileName());
    if (!fsOps.fileExists(versionFile)) {
        version = NOT_DOWNLOADED_VERSION;
        return;
    }
    String ver = FileUtils.readFileToString(versionFile.toFile(), "UTF8").trim();
    try {
        version = Long.parseLong(ver);
    } catch (NumberFormatException e) {
        // commitNewVersion() does not write the version file atomically, so an interrupted write can leave it
        // empty or truncated.
        LOG.warn("Version file {} holds an invalid version '{}', treating {} as not downloaded", versionFile, ver, type, e);
        version = NOT_DOWNLOADED_VERSION;
    }
}
/**
 * Get the version of the blob that is committed locally.
 *
 * @return the local version, as last read from disk or committed.
 */
@Override
public long getLocalVersion() {
    final long current = version;
    LOG.debug("LOCAL VERSION {}/{} is {}", type, topologyId, current);
    return current;
}
/**
 * Get the version of the blob as known to the blob store.
 *
 * @param store the blob store client to ask.
 * @return LOCAL_MODE_JAR_VERSION for the topology jar in local mode, otherwise the version from the blob's metadata.
 */
@Override
public long getRemoteVersion(ClientBlobStore store) throws KeyNotFoundException, AuthorizationException {
    final boolean localModeJar = isLocalMode && type == TopologyBlobType.TOPO_JAR;
    if (!localModeJar) {
        final String key = type.getKey(topologyId);
        return store.getBlobMeta(key).get_version();
    }
    LOG.debug("REMOTE VERSION LOCAL JAR {}", LOCAL_MODE_JAR_VERSION);
    return LOCAL_MODE_JAR_VERSION;
}
/**
 * Fetch the blob into a temporary, version-suffixed location under the topology directory and, for blobs that
 * need it, extract the resources directory into a temporary, version-suffixed directory next to it.
 *
 * <p>The topology jar in local mode is never fetched from the blob store; only its resources are gathered from
 * the classpath.
 *
 * @param store the blob store client to download from.
 * @return the version that was placed in the temporary location.
 * @throws IOException on any file system or download error.
 * @throws KeyNotFoundException if the blob is not in the blob store.
 * @throws AuthorizationException if access to the blob is not allowed.
 */
@Override
public long fetchUnzipToTemp(ClientBlobStore store)
    throws IOException, KeyNotFoundException, AuthorizationException {
    // Global lock: the directory is shared by all blobs of the topology and its setup must not race.
    synchronized (LocallyCachedTopologyBlob.class) {
        if (!Files.exists(topologyBasicBlobsRootDir)) {
            Files.createDirectories(topologyBasicBlobsRootDir);
            fsOps.setupStormCodeDir(owner, topologyBasicBlobsRootDir.toFile());
        }
    }

    if (isLocalMode && type == TopologyBlobType.TOPO_JAR) {
        return extractLocalModeResources();
    }

    DownloadMeta downloadMeta = fetch(store, type.getKey(topologyId),
        v -> {
            Path path = topologyBasicBlobsRootDir.resolve(type.getTempFileName(v));
            fsOps.forceMkdir(path.getParent());
            return path;
        }, fsOps::getOutputStream);

    Path tmpLocation = downloadMeta.getDownloadPath();
    if (type.needsExtraction()) {
        Path extractionDest = topologyBasicBlobsRootDir.resolve(type.getTempExtractionDir(downloadMeta.getVersion()));
        extractDirFromJar(tmpLocation.toAbsolutePath().toString(), ServerConfigUtils.RESOURCES_SUBDIR,
                          extractionDest);
    }
    return downloadMeta.getVersion();
}

/**
 * Local mode handling of the topology jar: the jar was not uploaded (it is already on the classpath), so only
 * the resources directory is placed in the temporary extraction directory.
 *
 * @return LOCAL_MODE_JAR_VERSION.
 * @throws IOException if the resources cannot be extracted or copied.
 */
private long extractLocalModeResources() throws IOException {
    LOG.debug("DOWNLOADING LOCAL JAR to TEMP LOCATION... {}", topologyId);
    String resourcesJar = resourcesJar();
    URL url = ServerUtils.getResourceFromClassloader(ServerConfigUtils.RESOURCES_SUBDIR);
    Path extractionDest = topologyBasicBlobsRootDir.resolve(type.getTempExtractionDir(LOCAL_MODE_JAR_VERSION));
    if (resourcesJar != null) {
        LOG.info("Extracting resources from jar at {} to {}", resourcesJar, extractionDest);
        extractDirFromJar(resourcesJar, ServerConfigUtils.RESOURCES_SUBDIR, extractionDest);
    } else if (url != null) {
        LOG.info("Copying resources at {} to {}", url, extractionDest);
        if ("jar".equals(url.getProtocol())) {
            JarURLConnection urlConnection = (JarURLConnection) url.openConnection();
            File jar = urlToLocalFile(urlConnection.getJarFileURL());
            extractDirFromJar(jar.getPath(), ServerConfigUtils.RESOURCES_SUBDIR, extractionDest);
        } else {
            fsOps.copyDirectory(urlToLocalFile(url), extractionDest.toFile());
        }
    } else if (!fsOps.fileExists(extractionDest)) {
        // if we can't find the resources directory in a resources jar or in the classpath just create an empty
        // resources directory. This way we can check later that the topology jar was fully downloaded.
        fsOps.forceMkdir(extractionDest);
    }
    return LOCAL_MODE_JAR_VERSION;
}

/**
 * Convert a URL that points into the local file system to a File.
 *
 * <p>URL.getFile() keeps percent-escapes (a space stays "%20"), so a path taken from it does not exist on disk
 * when the location contains such characters. Converting through a URI decodes them.
 *
 * @param url the URL to convert.
 * @return the file the URL refers to.
 */
private static File urlToLocalFile(URL url) {
    try {
        return new File(url.toURI());
    } catch (URISyntaxException | IllegalArgumentException e) {
        // Not something File can take as a URI (e.g. not a plain file: URL); use the raw path as before.
        return new File(url.getFile());
    }
}
/**
 * Extract the entries under one directory of a jar into a destination directory.
 *
 * @param jarpath path of the jar to read.
 * @param dir the directory inside the jar to extract.
 * @param dest where to place the extracted entries; created if it does not exist.
 * @throws IOException if the destination cannot be created or the jar cannot be read.
 */
protected void extractDirFromJar(String jarpath, String dir, Path dest) throws IOException {
    LOG.debug("EXTRACTING {} from {} and placing it at {}", dir, jarpath, dest);
    final boolean destMissing = !Files.exists(dest);
    if (destMissing) {
        // Always leave a directory behind, even an empty one: its presence is what later marks the
        // extraction as done.
        Files.createDirectories(dest);
    }
    final String entryPrefix = dir + '/';
    try (JarFile archive = new JarFile(jarpath)) {
        ServerUtils.extractZipFile(archive, dest.toFile(), entryPrefix);
    }
}
/**
 * Check whether everything this blob needs is present on disk: the version file, the blob file (except for
 * the topology jar in local mode) and, for blobs that get extracted, the extraction directory.
 *
 * @return true if all required pieces exist.
 */
@Override
public boolean isFullyDownloaded() {
    if (!Files.exists(topologyBasicBlobsRootDir.resolve(type.getVersionFileName()))) {
        return false;
    }

    final boolean localModeJar = isLocalMode && type == TopologyBlobType.TOPO_JAR;
    if (!localModeJar && !Files.exists(topologyBasicBlobsRootDir.resolve(type.getFileName()))) {
        return false;
    }

    if (type.needsExtraction()) {
        return Files.exists(topologyBasicBlobsRootDir.resolve(type.getExtractionDir()));
    }
    return true;
}
/**
 * Promote the temporary, version-suffixed download (and extraction directory, if any) to the final locations
 * and record the new version in the version file.
 *
 * <p>The version file is removed first and written again last, so a failure part-way through leaves no version
 * file behind.
 *
 * @param newVersion the version whose temporary files are being committed.
 * @throws IOException if any of the file system operations fail.
 */
@Override
protected void commitNewVersion(long newVersion) throws IOException {
//This is not atomic, so if something bad happens in the middle we need to be able to recover.
Path tempLoc = topologyBasicBlobsRootDir.resolve(type.getTempFileName(newVersion));
Path dest = topologyBasicBlobsRootDir.resolve(type.getFileName());
Path versionFile = topologyBasicBlobsRootDir.resolve(type.getVersionFileName());
LOG.debug("Removing version file {} to force download on failure", versionFile);
fsOps.deleteIfExists(versionFile.toFile()); //So if we fail we are forced to try again
LOG.debug("Removing destination file {} in preparation for move", dest);
fsOps.deleteIfExists(dest.toFile());
if (type.needsExtraction()) {
Path extractionTemp = topologyBasicBlobsRootDir.resolve(type.getTempExtractionDir(newVersion));
Path extractionDest = topologyBasicBlobsRootDir.resolve(type.getExtractionDir());
LOG.debug("Removing extraction dest {} in preparation for extraction", extractionDest);
fsOps.deleteIfExists(extractionDest.toFile());
// The temporary extraction directory is only moved when it exists; otherwise no extraction dir is created here.
if (fsOps.fileExists(extractionTemp)) {
fsOps.moveDirectoryPreferAtomic(extractionTemp.toFile(), extractionDest.toFile());
}
}
if (!(isLocalMode && type == TopologyBlobType.TOPO_JAR)) {
//Don't try to move the JAR file in local mode, it does not exist because it was not uploaded
fsOps.moveFile(tempLoc.toFile(), dest.toFile());
}
synchronized (LocallyCachedTopologyBlob.class) {
//This is a bit ugly, but it works. In order to maintain the same directory structure that existed before
// we need to have storm conf, storm jar, and storm code in a shared directory, and we need to set the
// permissions for that entire directory, but the tracking is on a per item basis, so we are going to end
// up running the permission modification code once for each blob that is downloaded (3 times in this case).
// Because the permission modification code runs in a separate process we are doing a global lock to avoid
// any races between multiple versions running at the same time. Ideally this would be on a per topology
// basis, but that is a lot harder and the changes run fairly quickly so it should not be a big deal.
fsOps.setupStormCodeDir(owner, topologyBasicBlobsRootDir.toFile());
File sharedMemoryDirFinalLocation = new File(topologyBasicBlobsRootDir.toFile(), "shared_by_topology");
// NOTE(review): the result of mkdirs() is ignored, so a failure to create the directory is not reported here.
sharedMemoryDirFinalLocation.mkdirs();
fsOps.setupWorkerArtifactsDir(owner, sharedMemoryDirFinalLocation);
}
// Written last: the version file is what marks the commit as complete.
LOG.debug("Writing out version file {} with version {}", versionFile, newVersion);
FileUtils.write(versionFile.toFile(), Long.toString(newVersion), "UTF8");
this.version = newVersion;
updateSizeOnDisk();
LOG.debug("New version of {} - {} committed {}", topologyId, type, newVersion);
}
/**
 * Delete leftover temporary, version-suffixed copies of the blob file and, for blobs that get extracted, of the
 * extraction directory.
 *
 * @throws IOException if the clean up fails.
 */
@Override
public void cleanupOrphanedData() throws IOException {
    cleanUpTemp(type.getFileName());
    if (!type.needsExtraction()) {
        return;
    }
    cleanUpTemp(type.getExtractionDir());
}
/**
 * Delete every entry of the topology directory whose name is {@code baseName} followed by a dot and a number.
 *
 * @param baseName the name the temporary entries are derived from.
 * @throws IOException if listing or deleting fails.
 */
private void cleanUpTemp(String baseName) throws IOException {
    LOG.debug("Cleaning up temporary data in {}", topologyBasicBlobsRootDir);
    try (DirectoryStream<Path> leftovers = fsOps.newDirectoryStream(topologyBasicBlobsRootDir,
        (candidate) -> {
            Matcher parts = EXTRACT_BASE_NAME_AND_VERSION.matcher(candidate.getFileName().toString());
            if (!parts.matches()) {
                return false;
            }
            return baseName.equals(parts.group(1));
        })
    ) {
        // The stream is only ever null if topologyBasicBlobsRootDir does not exist, which happens during unit
        // tests. A non-existent directory is by definition clean, so there is nothing to do.
        if (leftovers == null) {
            return;
        }
        for (Path leftover : leftovers) {
            LOG.debug("Cleaning up {}", leftover);
            fsOps.deleteIfExists(leftover.toFile());
        }
    }
}
/**
 * Delete everything stored on disk for this blob: all entries derived from the blob file name and, for blobs
 * that get extracted, all entries derived from the extraction directory name. Finishes by calling touch().
 *
 * @throws IOException if listing or deleting fails.
 */
@Override
public void completelyRemove() throws IOException {
    removeAll(type.getFileName());
    if (type.needsExtraction()) {
        final String extractionDir = type.getExtractionDir();
        removeAll(extractionDir);
    }
    touch();
}
/**
 * Delete every entry of the topology directory whose name starts with {@code baseName}.
 *
 * @param baseName prefix of the entry names to delete.
 * @throws IOException if listing or deleting fails.
 */
private void removeAll(String baseName) throws IOException {
    try (DirectoryStream<Path> children = fsOps.newDirectoryStream(topologyBasicBlobsRootDir)) {
        // Same guard as cleanUpTemp(): the stream can be null when the directory does not exist (seen in unit
        // tests), and then there is nothing to remove.
        if (children == null) {
            return;
        }
        for (Path p : children) {
            String fileName = p.getFileName().toString();
            if (fileName.startsWith(baseName)) {
                fsOps.deleteIfExists(p.toFile());
            }
        }
    }
}
/**
 * Get the size of this blob on disk.
 *
 * @return the size as of the last time it was recomputed by this class.
 */
@Override
public long getSizeOnDisk() {
    return this.size;
}
/**
 * Two topology blobs are equal when they have the same topology ID, blob type and topology directory.
 */
@Override
public boolean equals(Object other) {
    if (!(other instanceof LocallyCachedTopologyBlob)) {
        return false;
    }
    final LocallyCachedTopologyBlob that = (LocallyCachedTopologyBlob) other;
    if (!topologyId.equals(that.topologyId)) {
        return false;
    }
    if (type != that.type) {
        return false;
    }
    return topologyBasicBlobsRootDir.equals(that.topologyBasicBlobsRootDir);
}
/**
 * Hash code built from the same three fields that equals() compares.
 */
@Override
public int hashCode() {
    int hash = topologyId.hashCode();
    hash += type.hashCode();
    hash += topologyBasicBlobsRootDir.hashCode();
    return hash;
}
/**
 * Short description made of the blob type and the topology ID.
 */
@Override
public String toString() {
    return new StringBuilder("LOCAL TOPO BLOB ")
        .append(type)
        .append(' ')
        .append(topologyId)
        .toString();
}
/**
 * The blobs that make up a topology, with the file name each one is stored under, the suffix that turns a
 * topology ID into its blob key, and the directory that is extracted from it (null when nothing is extracted).
 */
public enum TopologyBlobType {
    TOPO_JAR("stormjar.jar", "-stormjar.jar", "resources"),
    TOPO_CODE("stormcode.ser", "-stormcode.ser", null),
    TOPO_CONF("stormconf.ser", "-stormconf.ser", null);

    private final String fileName;
    private final String keySuffix;
    private final String extractionDir;

    TopologyBlobType(String baseFileName, String blobKeySuffix, String dirToExtract) {
        fileName = baseFileName;
        keySuffix = blobKeySuffix;
        extractionDir = dirToExtract;
    }

    /** Builds the temporary name for a given version: the base name, a dot, then the version. */
    private static String versioned(String base, long version) {
        return base + "." + version;
    }

    /** Returns the name of the file the blob is stored under. */
    public String getFileName() {
        return fileName;
    }

    /** Returns the name of the temporary file used for the given version of the blob. */
    public String getTempFileName(long version) {
        return versioned(fileName, version);
    }

    /** Returns the name of the file that records the blob's version. */
    public String getVersionFileName() {
        return fileName.concat(".version");
    }

    /** Returns the blob key for the given topology: the topology ID followed by the key suffix. */
    public String getKey(String topologyId) {
        return topologyId + keySuffix;
    }

    /** Returns true when this type has a directory to extract. */
    public boolean needsExtraction() {
        return null != extractionDir;
    }

    /** Returns the name of the directory to extract, or null when there is none. */
    public String getExtractionDir() {
        return extractionDir;
    }

    /** Returns the name of the temporary extraction directory used for the given version. */
    public String getTempExtractionDir(long version) {
        return versioned(extractionDir, version);
    }
}
}