blob: f984def0beb8e479e408fd55effa2a6885150a20 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.localizer;
import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.file.DirectoryStream;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.storm.Config;
import org.apache.storm.blobstore.ClientBlobStore;
import org.apache.storm.daemon.supervisor.IAdvancedFSOps;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.KeyNotFoundException;
import org.apache.storm.generated.ReadableBlobMeta;
import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.ServerUtils;
import org.apache.storm.utils.ShellUtils;
import org.apache.storm.utils.WrappedAuthorizationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Represents a resource that is localized on the supervisor. A localized resource has a .current symlink to the current version file which
* is named filename.{current version}. There is also a filename.version which contains the latest version.
*/
public class LocalizedResource extends LocallyCachedBlob {
@VisibleForTesting
static final String CURRENT_BLOB_SUFFIX = ".current";
@VisibleForTesting
static final String BLOB_VERSION_SUFFIX = ".version";
@VisibleForTesting
static final String FILECACHE = "filecache";
@VisibleForTesting
static final String USERCACHE = "usercache";
// sub directories to store either files or uncompressed archives respectively
@VisibleForTesting
static final String FILESDIR = "files";
@VisibleForTesting
static final String ARCHIVESDIR = "archives";
private static final Logger LOG = LoggerFactory.getLogger(LocalizedResource.class);
private static final String TO_UNCOMPRESS = "_tmp_";
private static final Pattern VERSION_FILE_PATTERN = Pattern.compile("^(.+)\\.(\\d+)$");
// filesystem path to the resource
private final Path baseDir;
private final Path versionFilePath;
private final Path symlinkPath;
private final boolean shouldUncompress;
private final IAdvancedFSOps fsOps;
private final String user;
private final Map<String, Object> conf;
private final boolean symLinksDisabled;
// size of the resource
private long size = -1;
LocalizedResource(String key, Path localBaseDir, boolean shouldUncompress, IAdvancedFSOps fsOps, Map<String, Object> conf,
String user, StormMetricsRegistry metricRegistry) {
super(key + (shouldUncompress ? " archive" : " file"), key, metricRegistry);
Path base = getLocalUserFileCacheDir(localBaseDir, user);
this.baseDir = shouldUncompress ? getCacheDirForArchives(base) : getCacheDirForFiles(base);
this.conf = conf;
this.symLinksDisabled = (boolean)conf.getOrDefault(Config.DISABLE_SYMLINKS, false);
this.user = user;
this.fsOps = fsOps;
versionFilePath = constructVersionFileName(baseDir, key);
symlinkPath = constructBlobCurrentSymlinkName(baseDir, key);
this.shouldUncompress = shouldUncompress;
//Set the size in case we are recovering an already downloaded object
setSize();
}
private static Path constructVersionFileName(Path baseDir, String key) {
return baseDir.resolve(key + BLOB_VERSION_SUFFIX);
}
@VisibleForTesting
static long localVersionOfBlob(Path versionFile) {
long currentVersion = -1;
if (Files.exists(versionFile) && !(Files.isDirectory(versionFile))) {
try (BufferedReader br = new BufferedReader(new FileReader(versionFile.toFile()))) {
String line = br.readLine();
currentVersion = Long.parseLong(line);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
return currentVersion;
}
private static Path constructBlobCurrentSymlinkName(Path baseDir, String key) {
return baseDir.resolve(key + CURRENT_BLOB_SUFFIX);
}
private static Path constructBlobWithVersionFileName(Path baseDir, String key, long version) {
return baseDir.resolve(key + "." + version);
}
static Collection<String> getLocalizedUsers(Path localBaseDir) throws IOException {
Path userCacheDir = getUserCacheDir(localBaseDir);
if (!Files.exists(userCacheDir)) {
return Collections.emptyList();
}
return Files.list(userCacheDir).map((p) -> p.getFileName().toString()).collect(Collectors.toList());
}
static void completelyRemoveUnusedUser(Path localBaseDir, String user) throws IOException {
Path localUserDir = getLocalUserDir(localBaseDir, user);
LOG.info("completelyRemoveUnusedUser {} for directory {}", user, localUserDir);
Path userFileCacheDir = getLocalUserFileCacheDir(localBaseDir, user);
// baseDir/supervisor/usercache/user1/filecache/files
Files.deleteIfExists(getCacheDirForFiles(userFileCacheDir));
// baseDir/supervisor/usercache/user1/filecache/archives
Files.deleteIfExists(getCacheDirForArchives(userFileCacheDir));
// baseDir/supervisor/usercache/user1/filecache
Files.deleteIfExists(userFileCacheDir);
// baseDir/supervisor/usercache/user1
Files.deleteIfExists(localUserDir);
}
static List<String> getLocalizedArchiveKeys(Path localBaseDir, String user) throws IOException {
Path dir = getCacheDirForArchives(getLocalUserFileCacheDir(localBaseDir, user));
return readKeysFromDir(dir);
}
static List<String> getLocalizedFileKeys(Path localBaseDir, String user) throws IOException {
Path dir = getCacheDirForFiles(getLocalUserFileCacheDir(localBaseDir, user));
return readKeysFromDir(dir);
}
// Looks for files in the directory with .current suffix
private static List<String> readKeysFromDir(Path dir) throws IOException {
if (!Files.exists(dir)) {
return Collections.emptyList();
}
return Files.list(dir)
.map((p) -> p.getFileName().toString())
.filter((name) -> name.toLowerCase().endsWith(CURRENT_BLOB_SUFFIX))
.map((key) -> {
int p = key.lastIndexOf('.');
if (p > 0) {
key = key.substring(0, p);
}
return key;
})
.collect(Collectors.toList());
}
// baseDir/supervisor/usercache/
private static Path getUserCacheDir(Path localBaseDir) {
return localBaseDir.resolve(USERCACHE);
}
// baseDir/supervisor/usercache/user1/
static Path getLocalUserDir(Path localBaseDir, String userName) {
return getUserCacheDir(localBaseDir).resolve(userName);
}
// baseDir/supervisor/usercache/user1/filecache
static Path getLocalUserFileCacheDir(Path localBaseDir, String userName) {
return getLocalUserDir(localBaseDir, userName).resolve(FILECACHE);
}
// baseDir/supervisor/usercache/user1/filecache/files
private static Path getCacheDirForFiles(Path dir) {
return dir.resolve(FILESDIR);
}
// get the directory to put uncompressed archives in
// baseDir/supervisor/usercache/user1/filecache/archives
private static Path getCacheDirForArchives(Path dir) {
return dir.resolve(ARCHIVESDIR);
}
Path getCurrentSymlinkPath() {
return symlinkPath;
}
@VisibleForTesting
Path getFilePathWithVersion() {
return constructBlobWithVersionFileName(baseDir, getKey(), getLocalVersion());
}
private void setSize() {
// we trust that the file exists
Path withVersion = getFilePathWithVersion();
size = ServerUtils.getDiskUsage(withVersion.toFile());
LOG.debug("size of {} is: {}", withVersion, size);
}
@VisibleForTesting
protected void setSize(long size) {
this.size = size;
}
@Override
public long getLocalVersion() {
return localVersionOfBlob(versionFilePath);
}
@Override
public long getRemoteVersion(ClientBlobStore store) throws KeyNotFoundException, AuthorizationException {
return ServerUtils.nimbusVersionOfBlob(getKey(), store);
}
@Override
public long fetchUnzipToTemp(ClientBlobStore store) throws IOException, KeyNotFoundException, AuthorizationException {
String key = getKey();
ReadableBlobMeta meta = store.getBlobMeta(key);
if (!ServerUtils.canUserReadBlob(meta, user, conf)) {
throw new WrappedAuthorizationException(user + " does not have READ access to " + key);
}
DownloadMeta downloadMeta = fetch(store, key, v -> {
Path path = shouldUncompress ? tmpOutputLocation() : constructBlobWithVersionFileName(baseDir, getKey(), v);
// we need to download to temp file and then unpack into the one requested
Path parent = path.getParent();
if (!Files.exists(parent)) {
//There is a race here that we can still lose
try {
Files.createDirectories(parent);
} catch (FileAlreadyExistsException e) {
//Ignored
} catch (IOException e) {
LOG.error("Failed to create parent directory {}", parent, e);
throw e;
}
}
return path;
},
FileOutputStream::new
);
Path finalLocation = downloadMeta.getDownloadPath();
if (shouldUncompress) {
Path downloadFile = finalLocation;
finalLocation = constructBlobWithVersionFileName(baseDir, getKey(), downloadMeta.getVersion());
ServerUtils.unpack(downloadFile.toFile(), finalLocation.toFile(), symLinksDisabled);
LOG.debug("Uncompressed {} to: {}", downloadFile, finalLocation);
}
setBlobPermissions(conf, user, finalLocation);
return downloadMeta.getVersion();
}
@Override
protected void commitNewVersion(long version) throws IOException {
String key = getKey();
LOG.info("Blob: {} updated to version {} from version {}", key, version, getLocalVersion());
Path localVersionFile = versionFilePath;
// The false parameter ensures overwriting the version file, not appending
try (PrintWriter writer = new PrintWriter(
new BufferedWriter(new FileWriter(localVersionFile.toFile(), false)))) {
writer.println(version);
}
setBlobPermissions(conf, user, localVersionFile);
// Update the key.current symlink. First create tmp symlink and do
// move of tmp to current so that the operation is atomic.
Path tmpSymlink = tmpSymlinkLocation();
Path targetOfSymlink = constructBlobWithVersionFileName(baseDir, getKey(), version);
LOG.debug("Creating a symlink @{} linking to: {}", tmpSymlink, targetOfSymlink);
Files.createSymbolicLink(tmpSymlink, targetOfSymlink);
Path currentSymLink = getCurrentSymlinkPath();
Files.move(tmpSymlink, currentSymLink, ATOMIC_MOVE);
//Update the size of the objects
setSize();
}
private void setBlobPermissions(Map<String, Object> conf, String user, Path path)
throws IOException {
if (!ObjectReader.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
return;
}
String wlCommand = ObjectReader.getString(conf.get(Config.SUPERVISOR_WORKER_LAUNCHER), "");
if (wlCommand.isEmpty()) {
String stormHome = System.getProperty(ConfigUtils.STORM_HOME);
wlCommand = stormHome + "/bin/worker-launcher";
}
List<String> command = new ArrayList<>(Arrays.asList(wlCommand, user, "blob", path.toString()));
String[] commandArray = command.toArray(new String[command.size()]);
ShellUtils.ShellCommandExecutor shExec = new ShellUtils.ShellCommandExecutor(commandArray);
LOG.debug("Setting blob permissions, command: {}", Arrays.toString(commandArray));
try {
shExec.execute();
LOG.debug("output: {}", shExec.getOutput());
} catch (ShellUtils.ExitCodeException e) {
int exitCode = shExec.getExitCode();
LOG.warn("Exit code from worker-launcher is: {}", exitCode, e);
LOG.debug("output: {}", shExec.getOutput());
throw new IOException("Setting blob permissions failed"
+ " (exitCode=" + exitCode + ") with output: " + shExec.getOutput(), e);
}
}
private Path tmpOutputLocation() {
return baseDir.resolve(Paths.get(LocalizedResource.TO_UNCOMPRESS + getKey()));
}
private Path tmpSymlinkLocation() {
return baseDir.resolve(Paths.get(LocalizedResource.TO_UNCOMPRESS + getKey() + CURRENT_BLOB_SUFFIX));
}
@Override
public void cleanupOrphanedData() throws IOException {
//There are a few possible files that we would want to clean up
//baseDir + "/" + "_tmp_" + baseName
//baseDir + "/" + "_tmp_" + baseName + ".current"
//baseDir + "/" + baseName.<VERSION>
//baseDir + "/" + baseName.current
//baseDir + "/" + baseName.version
//In general we always want to delete the _tmp_ files if they are there.
Path tmpOutput = tmpOutputLocation();
Files.deleteIfExists(tmpOutput);
Path tmpSym = tmpSymlinkLocation();
Files.deleteIfExists(tmpSym);
try {
String baseName = getKey();
long version = getLocalVersion();
Path current = getCurrentSymlinkPath();
//If .current and .version do not match, we roll back the .version file to match
// what .current is pointing to.
if (Files.exists(current) && Files.isSymbolicLink(current)) {
Path versionFile = Files.readSymbolicLink(current);
Matcher m = VERSION_FILE_PATTERN.matcher(versionFile.getFileName().toString());
if (m.matches()) {
long foundVersion = Long.valueOf(m.group(2));
if (foundVersion != version) {
LOG.error("{} does not match the version file so fix the version file", current);
//The versions are different so roll back to whatever current is
try (PrintWriter restoreWriter = new PrintWriter(
new BufferedWriter(new FileWriter(versionFilePath.toFile(), false)))) {
restoreWriter.println(foundVersion);
}
version = foundVersion;
}
}
}
// Finally delete any baseName.<VERSION> files that are not pointed to by the current version
final long finalVersion = version;
LOG.debug("Looking to clean up after {} in {}", getKey(), baseDir);
try (DirectoryStream<Path> ds = fsOps.newDirectoryStream(baseDir, (path) -> {
Matcher m = VERSION_FILE_PATTERN.matcher(path.getFileName().toString());
if (m.matches()) {
long foundVersion = Long.valueOf(m.group(2));
return m.group(1).equals(baseName) && foundVersion != finalVersion;
}
return false;
})) {
for (Path p : ds) {
LOG.info("Cleaning up old localized resource file {}", p);
if (Files.isDirectory(p)) {
FileUtils.deleteDirectory(p.toFile());
} else {
fsOps.deleteIfExists(p.toFile());
}
}
}
} catch (NoSuchFileException e) {
LOG.warn("Nothing to cleanup with baseDir {} even though we expected there to be something there", baseDir);
}
}
@Override
public void completelyRemove() throws IOException {
Path fileWithVersion = getFilePathWithVersion();
Path currentSymLink = getCurrentSymlinkPath();
if (shouldUncompress) {
if (Files.exists(fileWithVersion)) {
// this doesn't follow symlinks, which is what we want
FileUtils.deleteDirectory(fileWithVersion.toFile());
}
} else {
Files.deleteIfExists(fileWithVersion);
}
Files.deleteIfExists(currentSymLink);
Files.deleteIfExists(versionFilePath);
}
@Override
public long getSizeOnDisk() {
return size;
}
@Override
public boolean isFullyDownloaded() {
return Files.exists(getFilePathWithVersion())
&& Files.exists(getCurrentSymlinkPath())
&& Files.exists(versionFilePath);
}
@Override
public boolean equals(Object other) {
if (other instanceof LocalizedResource) {
LocalizedResource l = (LocalizedResource) other;
return getKey().equals(l.getKey()) && shouldUncompress == l.shouldUncompress && baseDir.equals(l.baseDir);
}
return false;
}
@Override
public int hashCode() {
return getKey().hashCode() + Boolean.hashCode(shouldUncompress) + baseDir.hashCode();
}
@Override
public String toString() {
return this.user + ":" + getKey();
}
}