| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.storm.localizer; |
| |
| import static java.nio.file.StandardCopyOption.ATOMIC_MOVE; |
| |
| import java.io.BufferedReader; |
| import java.io.BufferedWriter; |
| import java.io.FileOutputStream; |
| import java.io.FileReader; |
| import java.io.FileWriter; |
| import java.io.IOException; |
| import java.io.PrintWriter; |
| import java.nio.file.DirectoryStream; |
| import java.nio.file.FileAlreadyExistsException; |
| import java.nio.file.Files; |
| import java.nio.file.NoSuchFileException; |
| import java.nio.file.Path; |
| import java.nio.file.Paths; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.regex.Matcher; |
| import java.util.regex.Pattern; |
| import java.util.stream.Collectors; |
| import org.apache.commons.io.FileUtils; |
| import org.apache.storm.Config; |
| import org.apache.storm.blobstore.ClientBlobStore; |
| import org.apache.storm.daemon.supervisor.IAdvancedFSOps; |
| import org.apache.storm.generated.AuthorizationException; |
| import org.apache.storm.generated.KeyNotFoundException; |
| import org.apache.storm.generated.ReadableBlobMeta; |
| import org.apache.storm.metric.StormMetricsRegistry; |
| import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting; |
| import org.apache.storm.utils.ConfigUtils; |
| import org.apache.storm.utils.ObjectReader; |
| import org.apache.storm.utils.ServerUtils; |
| import org.apache.storm.utils.ShellUtils; |
| import org.apache.storm.utils.WrappedAuthorizationException; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * Represents a resource that is localized on the supervisor. A localized resource has a .current symlink to the current version file which |
| * is named filename.{current version}. There is also a filename.version which contains the latest version. |
| */ |
| public class LocalizedResource extends LocallyCachedBlob { |
| @VisibleForTesting |
| static final String CURRENT_BLOB_SUFFIX = ".current"; |
| @VisibleForTesting |
| static final String BLOB_VERSION_SUFFIX = ".version"; |
| @VisibleForTesting |
| static final String FILECACHE = "filecache"; |
| @VisibleForTesting |
| static final String USERCACHE = "usercache"; |
| // sub directories to store either files or uncompressed archives respectively |
| @VisibleForTesting |
| static final String FILESDIR = "files"; |
| @VisibleForTesting |
| static final String ARCHIVESDIR = "archives"; |
| private static final Logger LOG = LoggerFactory.getLogger(LocalizedResource.class); |
| private static final String TO_UNCOMPRESS = "_tmp_"; |
| private static final Pattern VERSION_FILE_PATTERN = Pattern.compile("^(.+)\\.(\\d+)$"); |
| // filesystem path to the resource |
| private final Path baseDir; |
| private final Path versionFilePath; |
| private final Path symlinkPath; |
| private final boolean shouldUncompress; |
| private final IAdvancedFSOps fsOps; |
| private final String user; |
| private final Map<String, Object> conf; |
| private final boolean symLinksDisabled; |
| // size of the resource |
| private long size = -1; |
| |
| LocalizedResource(String key, Path localBaseDir, boolean shouldUncompress, IAdvancedFSOps fsOps, Map<String, Object> conf, |
| String user, StormMetricsRegistry metricRegistry) { |
| super(key + (shouldUncompress ? " archive" : " file"), key, metricRegistry); |
| Path base = getLocalUserFileCacheDir(localBaseDir, user); |
| this.baseDir = shouldUncompress ? getCacheDirForArchives(base) : getCacheDirForFiles(base); |
| this.conf = conf; |
| this.symLinksDisabled = (boolean)conf.getOrDefault(Config.DISABLE_SYMLINKS, false); |
| this.user = user; |
| this.fsOps = fsOps; |
| versionFilePath = constructVersionFileName(baseDir, key); |
| symlinkPath = constructBlobCurrentSymlinkName(baseDir, key); |
| this.shouldUncompress = shouldUncompress; |
| //Set the size in case we are recovering an already downloaded object |
| setSize(); |
| } |
| |
| private static Path constructVersionFileName(Path baseDir, String key) { |
| return baseDir.resolve(key + BLOB_VERSION_SUFFIX); |
| } |
| |
| @VisibleForTesting |
| static long localVersionOfBlob(Path versionFile) { |
| long currentVersion = -1; |
| if (Files.exists(versionFile) && !(Files.isDirectory(versionFile))) { |
| try (BufferedReader br = new BufferedReader(new FileReader(versionFile.toFile()))) { |
| String line = br.readLine(); |
| currentVersion = Long.parseLong(line); |
| } catch (IOException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| return currentVersion; |
| } |
| |
| private static Path constructBlobCurrentSymlinkName(Path baseDir, String key) { |
| return baseDir.resolve(key + CURRENT_BLOB_SUFFIX); |
| } |
| |
| private static Path constructBlobWithVersionFileName(Path baseDir, String key, long version) { |
| return baseDir.resolve(key + "." + version); |
| } |
| |
| static Collection<String> getLocalizedUsers(Path localBaseDir) throws IOException { |
| Path userCacheDir = getUserCacheDir(localBaseDir); |
| if (!Files.exists(userCacheDir)) { |
| return Collections.emptyList(); |
| } |
| return Files.list(userCacheDir).map((p) -> p.getFileName().toString()).collect(Collectors.toList()); |
| } |
| |
| static void completelyRemoveUnusedUser(Path localBaseDir, String user) throws IOException { |
| Path localUserDir = getLocalUserDir(localBaseDir, user); |
| LOG.info("completelyRemoveUnusedUser {} for directory {}", user, localUserDir); |
| |
| Path userFileCacheDir = getLocalUserFileCacheDir(localBaseDir, user); |
| // baseDir/supervisor/usercache/user1/filecache/files |
| Files.deleteIfExists(getCacheDirForFiles(userFileCacheDir)); |
| // baseDir/supervisor/usercache/user1/filecache/archives |
| Files.deleteIfExists(getCacheDirForArchives(userFileCacheDir)); |
| // baseDir/supervisor/usercache/user1/filecache |
| Files.deleteIfExists(userFileCacheDir); |
| // baseDir/supervisor/usercache/user1 |
| Files.deleteIfExists(localUserDir); |
| } |
| |
| static List<String> getLocalizedArchiveKeys(Path localBaseDir, String user) throws IOException { |
| Path dir = getCacheDirForArchives(getLocalUserFileCacheDir(localBaseDir, user)); |
| return readKeysFromDir(dir); |
| } |
| |
| static List<String> getLocalizedFileKeys(Path localBaseDir, String user) throws IOException { |
| Path dir = getCacheDirForFiles(getLocalUserFileCacheDir(localBaseDir, user)); |
| return readKeysFromDir(dir); |
| } |
| |
| // Looks for files in the directory with .current suffix |
| private static List<String> readKeysFromDir(Path dir) throws IOException { |
| if (!Files.exists(dir)) { |
| return Collections.emptyList(); |
| } |
| return Files.list(dir) |
| .map((p) -> p.getFileName().toString()) |
| .filter((name) -> name.toLowerCase().endsWith(CURRENT_BLOB_SUFFIX)) |
| .map((key) -> { |
| int p = key.lastIndexOf('.'); |
| if (p > 0) { |
| key = key.substring(0, p); |
| } |
| return key; |
| }) |
| .collect(Collectors.toList()); |
| } |
| |
| // baseDir/supervisor/usercache/ |
| private static Path getUserCacheDir(Path localBaseDir) { |
| return localBaseDir.resolve(USERCACHE); |
| } |
| |
| // baseDir/supervisor/usercache/user1/ |
| static Path getLocalUserDir(Path localBaseDir, String userName) { |
| return getUserCacheDir(localBaseDir).resolve(userName); |
| } |
| |
| // baseDir/supervisor/usercache/user1/filecache |
| static Path getLocalUserFileCacheDir(Path localBaseDir, String userName) { |
| return getLocalUserDir(localBaseDir, userName).resolve(FILECACHE); |
| } |
| |
| // baseDir/supervisor/usercache/user1/filecache/files |
| private static Path getCacheDirForFiles(Path dir) { |
| return dir.resolve(FILESDIR); |
| } |
| |
| // get the directory to put uncompressed archives in |
| // baseDir/supervisor/usercache/user1/filecache/archives |
| private static Path getCacheDirForArchives(Path dir) { |
| return dir.resolve(ARCHIVESDIR); |
| } |
| |
| Path getCurrentSymlinkPath() { |
| return symlinkPath; |
| } |
| |
| @VisibleForTesting |
| Path getFilePathWithVersion() { |
| return constructBlobWithVersionFileName(baseDir, getKey(), getLocalVersion()); |
| } |
| |
| private void setSize() { |
| // we trust that the file exists |
| Path withVersion = getFilePathWithVersion(); |
| size = ServerUtils.getDiskUsage(withVersion.toFile()); |
| LOG.debug("size of {} is: {}", withVersion, size); |
| } |
| |
| @VisibleForTesting |
| protected void setSize(long size) { |
| this.size = size; |
| } |
| |
| @Override |
| public long getLocalVersion() { |
| return localVersionOfBlob(versionFilePath); |
| } |
| |
| @Override |
| public long getRemoteVersion(ClientBlobStore store) throws KeyNotFoundException, AuthorizationException { |
| return ServerUtils.nimbusVersionOfBlob(getKey(), store); |
| } |
| |
| @Override |
| public long fetchUnzipToTemp(ClientBlobStore store) throws IOException, KeyNotFoundException, AuthorizationException { |
| String key = getKey(); |
| ReadableBlobMeta meta = store.getBlobMeta(key); |
| if (!ServerUtils.canUserReadBlob(meta, user, conf)) { |
| throw new WrappedAuthorizationException(user + " does not have READ access to " + key); |
| } |
| |
| DownloadMeta downloadMeta = fetch(store, key, v -> { |
| Path path = shouldUncompress ? tmpOutputLocation() : constructBlobWithVersionFileName(baseDir, getKey(), v); |
| // we need to download to temp file and then unpack into the one requested |
| Path parent = path.getParent(); |
| if (!Files.exists(parent)) { |
| //There is a race here that we can still lose |
| try { |
| Files.createDirectories(parent); |
| } catch (FileAlreadyExistsException e) { |
| //Ignored |
| } catch (IOException e) { |
| LOG.error("Failed to create parent directory {}", parent, e); |
| throw e; |
| } |
| } |
| return path; |
| }, |
| FileOutputStream::new |
| ); |
| |
| Path finalLocation = downloadMeta.getDownloadPath(); |
| if (shouldUncompress) { |
| Path downloadFile = finalLocation; |
| finalLocation = constructBlobWithVersionFileName(baseDir, getKey(), downloadMeta.getVersion()); |
| ServerUtils.unpack(downloadFile.toFile(), finalLocation.toFile(), symLinksDisabled); |
| LOG.debug("Uncompressed {} to: {}", downloadFile, finalLocation); |
| } |
| setBlobPermissions(conf, user, finalLocation); |
| return downloadMeta.getVersion(); |
| } |
| |
| @Override |
| protected void commitNewVersion(long version) throws IOException { |
| String key = getKey(); |
| LOG.info("Blob: {} updated to version {} from version {}", key, version, getLocalVersion()); |
| Path localVersionFile = versionFilePath; |
| // The false parameter ensures overwriting the version file, not appending |
| try (PrintWriter writer = new PrintWriter( |
| new BufferedWriter(new FileWriter(localVersionFile.toFile(), false)))) { |
| writer.println(version); |
| } |
| setBlobPermissions(conf, user, localVersionFile); |
| |
| // Update the key.current symlink. First create tmp symlink and do |
| // move of tmp to current so that the operation is atomic. |
| Path tmpSymlink = tmpSymlinkLocation(); |
| Path targetOfSymlink = constructBlobWithVersionFileName(baseDir, getKey(), version); |
| LOG.debug("Creating a symlink @{} linking to: {}", tmpSymlink, targetOfSymlink); |
| Files.createSymbolicLink(tmpSymlink, targetOfSymlink); |
| |
| Path currentSymLink = getCurrentSymlinkPath(); |
| Files.move(tmpSymlink, currentSymLink, ATOMIC_MOVE); |
| //Update the size of the objects |
| setSize(); |
| } |
| |
| private void setBlobPermissions(Map<String, Object> conf, String user, Path path) |
| throws IOException { |
| |
| if (!ObjectReader.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) { |
| return; |
| } |
| String wlCommand = ObjectReader.getString(conf.get(Config.SUPERVISOR_WORKER_LAUNCHER), ""); |
| if (wlCommand.isEmpty()) { |
| String stormHome = System.getProperty(ConfigUtils.STORM_HOME); |
| wlCommand = stormHome + "/bin/worker-launcher"; |
| } |
| List<String> command = new ArrayList<>(Arrays.asList(wlCommand, user, "blob", path.toString())); |
| |
| String[] commandArray = command.toArray(new String[command.size()]); |
| ShellUtils.ShellCommandExecutor shExec = new ShellUtils.ShellCommandExecutor(commandArray); |
| LOG.debug("Setting blob permissions, command: {}", Arrays.toString(commandArray)); |
| |
| try { |
| shExec.execute(); |
| LOG.debug("output: {}", shExec.getOutput()); |
| } catch (ShellUtils.ExitCodeException e) { |
| int exitCode = shExec.getExitCode(); |
| LOG.warn("Exit code from worker-launcher is: {}", exitCode, e); |
| LOG.debug("output: {}", shExec.getOutput()); |
| throw new IOException("Setting blob permissions failed" |
| + " (exitCode=" + exitCode + ") with output: " + shExec.getOutput(), e); |
| } |
| } |
| |
| private Path tmpOutputLocation() { |
| return baseDir.resolve(Paths.get(LocalizedResource.TO_UNCOMPRESS + getKey())); |
| } |
| |
| private Path tmpSymlinkLocation() { |
| return baseDir.resolve(Paths.get(LocalizedResource.TO_UNCOMPRESS + getKey() + CURRENT_BLOB_SUFFIX)); |
| } |
| |
| @Override |
| public void cleanupOrphanedData() throws IOException { |
| //There are a few possible files that we would want to clean up |
| //baseDir + "/" + "_tmp_" + baseName |
| //baseDir + "/" + "_tmp_" + baseName + ".current" |
| //baseDir + "/" + baseName.<VERSION> |
| //baseDir + "/" + baseName.current |
| //baseDir + "/" + baseName.version |
| //In general we always want to delete the _tmp_ files if they are there. |
| |
| Path tmpOutput = tmpOutputLocation(); |
| Files.deleteIfExists(tmpOutput); |
| Path tmpSym = tmpSymlinkLocation(); |
| Files.deleteIfExists(tmpSym); |
| |
| try { |
| String baseName = getKey(); |
| long version = getLocalVersion(); |
| Path current = getCurrentSymlinkPath(); |
| |
| //If .current and .version do not match, we roll back the .version file to match |
| // what .current is pointing to. |
| if (Files.exists(current) && Files.isSymbolicLink(current)) { |
| Path versionFile = Files.readSymbolicLink(current); |
| Matcher m = VERSION_FILE_PATTERN.matcher(versionFile.getFileName().toString()); |
| if (m.matches()) { |
| long foundVersion = Long.valueOf(m.group(2)); |
| if (foundVersion != version) { |
| LOG.error("{} does not match the version file so fix the version file", current); |
| //The versions are different so roll back to whatever current is |
| try (PrintWriter restoreWriter = new PrintWriter( |
| new BufferedWriter(new FileWriter(versionFilePath.toFile(), false)))) { |
| restoreWriter.println(foundVersion); |
| } |
| version = foundVersion; |
| } |
| } |
| } |
| |
| // Finally delete any baseName.<VERSION> files that are not pointed to by the current version |
| final long finalVersion = version; |
| LOG.debug("Looking to clean up after {} in {}", getKey(), baseDir); |
| try (DirectoryStream<Path> ds = fsOps.newDirectoryStream(baseDir, (path) -> { |
| Matcher m = VERSION_FILE_PATTERN.matcher(path.getFileName().toString()); |
| if (m.matches()) { |
| long foundVersion = Long.valueOf(m.group(2)); |
| return m.group(1).equals(baseName) && foundVersion != finalVersion; |
| } |
| return false; |
| })) { |
| for (Path p : ds) { |
| LOG.info("Cleaning up old localized resource file {}", p); |
| if (Files.isDirectory(p)) { |
| FileUtils.deleteDirectory(p.toFile()); |
| } else { |
| fsOps.deleteIfExists(p.toFile()); |
| } |
| } |
| } |
| } catch (NoSuchFileException e) { |
| LOG.warn("Nothing to cleanup with baseDir {} even though we expected there to be something there", baseDir); |
| } |
| } |
| |
| @Override |
| public void completelyRemove() throws IOException { |
| Path fileWithVersion = getFilePathWithVersion(); |
| Path currentSymLink = getCurrentSymlinkPath(); |
| |
| if (shouldUncompress) { |
| if (Files.exists(fileWithVersion)) { |
| // this doesn't follow symlinks, which is what we want |
| FileUtils.deleteDirectory(fileWithVersion.toFile()); |
| } |
| } else { |
| Files.deleteIfExists(fileWithVersion); |
| } |
| Files.deleteIfExists(currentSymLink); |
| Files.deleteIfExists(versionFilePath); |
| } |
| |
| @Override |
| public long getSizeOnDisk() { |
| return size; |
| } |
| |
| @Override |
| public boolean isFullyDownloaded() { |
| return Files.exists(getFilePathWithVersion()) |
| && Files.exists(getCurrentSymlinkPath()) |
| && Files.exists(versionFilePath); |
| } |
| |
| @Override |
| public boolean equals(Object other) { |
| if (other instanceof LocalizedResource) { |
| LocalizedResource l = (LocalizedResource) other; |
| return getKey().equals(l.getKey()) && shouldUncompress == l.shouldUncompress && baseDir.equals(l.baseDir); |
| } |
| return false; |
| } |
| |
| @Override |
| public int hashCode() { |
| return getKey().hashCode() + Boolean.hashCode(shouldUncompress) + baseDir.hashCode(); |
| } |
| |
| @Override |
| public String toString() { |
| return this.user + ":" + getKey(); |
| } |
| } |