| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.hive.metastore; |
| |
| import java.io.IOException; |
| import java.nio.charset.StandardCharsets; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.TimeUnit; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import org.apache.commons.lang3.concurrent.BasicThreadFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileChecksum; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.PathFilter; |
| import org.apache.hadoop.fs.Trash; |
| import org.apache.hadoop.fs.permission.FsPermission; |
| import org.apache.hadoop.hdfs.DFSConfigKeys; |
| import org.apache.hadoop.hive.metastore.api.Database; |
| import org.apache.hadoop.hive.metastore.api.MetaException; |
| import org.apache.hadoop.hive.metastore.api.Table; |
| import org.apache.hadoop.hive.metastore.utils.EncryptionZoneUtils; |
| import org.apache.hadoop.hive.metastore.utils.FileUtils; |
| import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; |
| import org.apache.hadoop.hive.metastore.utils.Retry; |
| import org.apache.hadoop.hive.metastore.utils.StringUtils; |
| import org.apache.hadoop.hive.metastore.conf.MetastoreConf; |
| import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; |
| import org.apache.hadoop.security.UserGroupInformation; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
public class ReplChangeManager {
  private static final Logger LOG = LoggerFactory.getLogger(ReplChangeManager.class);
  // Lazily-created process-wide singleton; see getInstance(Configuration).
  static private ReplChangeManager instance;

  // One-shot initialization guard: set true after the first construction
  // attempt, whether or not CM turned out to be enabled.
  private static boolean inited = false;
  // True only when ConfVars.REPLCMENABLED is set in the configuration.
  private static boolean enabled = false;
  // Maps "<fs-uri><encryption-zone-path>" (or NO_ENCRYPTION for plain paths)
  // to the cmroot directory used for files in that zone. Seeded in the
  // constructor and extended lazily by getCmRoot().
  private static Map<String, String> encryptionZoneToCmrootMapping = new HashMap<>();
  private static Configuration conf;
  private String msUser;   // short user name the metastore runs as
  private String msGroup;  // primary group of the metastore user

  // xattr recording the last known original location of a recycled file
  private static final String ORIG_LOC_TAG = "user.original-loc";
  // xattr marking a recycled file that should go to Trash (not be deleted) when aged out
  static final String REMAIN_IN_TRASH_TAG = "user.remain-in-trash";
  private static final String URI_FRAGMENT_SEPARATOR = "#";
  public static final String SOURCE_OF_REPLICATION = "repl.source.for";
  private static final String TXN_WRITE_EVENT_FILE_SEPARATOR = "]";
  static final String CM_THREAD_NAME_PREFIX = "cmclearer-";
  // Key used in encryptionZoneToCmrootMapping for paths outside any encryption zone
  private static final String NO_ENCRYPTION = "noEncryption";
  private static String cmRootDir;
  private static String encryptedCmRootDir;
  private static String fallbackNonEncryptedCmRootDir;

  // Whether recycle() should preserve the source file (COPY) or rename it away (MOVE).
  public enum RecycleType {
    MOVE,
    COPY
  }
| |
| public static class FileInfo { |
| private FileSystem srcFs; |
| private Path sourcePath; |
| private Path cmPath; |
| private String checkSum; |
| private boolean useSourcePath; |
| private String subDir; |
| private boolean copyDone; |
| |
| public FileInfo(FileSystem srcFs, Path sourcePath, String subDir) { |
| this(srcFs, sourcePath, null, null, true, subDir); |
| } |
| public FileInfo(FileSystem srcFs, Path sourcePath, Path cmPath, |
| String checkSum, boolean useSourcePath, String subDir) { |
| this.srcFs = srcFs; |
| this.sourcePath = sourcePath; |
| this.cmPath = cmPath; |
| this.checkSum = checkSum; |
| this.useSourcePath = useSourcePath; |
| this.subDir = subDir; |
| this.copyDone = false; |
| } |
| public FileSystem getSrcFs() { |
| return srcFs; |
| } |
| public Path getSourcePath() { |
| return sourcePath; |
| } |
| public Path getCmPath() { |
| return cmPath; |
| } |
| public String getCheckSum() { |
| return checkSum; |
| } |
| public boolean isUseSourcePath() { |
| return useSourcePath; |
| } |
| public void setIsUseSourcePath(boolean useSourcePath) { |
| this.useSourcePath = useSourcePath; |
| } |
| public String getSubDir() { |
| return subDir; |
| } |
| public boolean isCopyDone() { |
| return copyDone; |
| } |
| public void setCopyDone(boolean copyDone) { |
| this.copyDone = copyDone; |
| } |
| public Path getEffectivePath() { |
| if (useSourcePath) { |
| return sourcePath; |
| } else { |
| return cmPath; |
| } |
| } |
| } |
| |
| public static synchronized ReplChangeManager getInstance(Configuration conf) |
| throws MetaException { |
| if (instance == null) { |
| instance = new ReplChangeManager(conf); |
| } |
| return instance; |
| } |
| |
| public static synchronized ReplChangeManager getInstance() { |
| if (!inited) { |
| throw new IllegalStateException("Replication Change Manager is not initialized."); |
| } |
| return instance; |
| } |
| |
| private ReplChangeManager(Configuration conf) throws MetaException { |
| try { |
| if (!inited) { |
| if (MetastoreConf.getBoolVar(conf, ConfVars.REPLCMENABLED)) { |
| ReplChangeManager.enabled = true; |
| ReplChangeManager.conf = conf; |
| cmRootDir = MetastoreConf.getVar(conf, ConfVars.REPLCMDIR); |
| encryptedCmRootDir = MetastoreConf.getVar(conf, ConfVars.REPLCMENCRYPTEDDIR); |
| fallbackNonEncryptedCmRootDir = MetastoreConf.getVar(conf, ConfVars.REPLCMFALLBACKNONENCRYPTEDDIR); |
| //validate cmRootEncrypted is absolute |
| Path cmRootEncrypted = new Path(encryptedCmRootDir); |
| if (cmRootEncrypted.isAbsolute()) { |
| throw new MetaException(ConfVars.REPLCMENCRYPTEDDIR.getHiveName() + " should be a relative path"); |
| } |
| //Create default cm root |
| Path cmroot = new Path(cmRootDir); |
| createCmRoot(cmroot); |
| FileSystem cmRootFs = cmroot.getFileSystem(conf); |
| if (EncryptionZoneUtils.isPathEncrypted(cmroot, conf)) { |
| //If cm root is encrypted we keep using it for the encryption zone |
| String encryptionZonePath = cmRootFs.getUri() |
| + EncryptionZoneUtils.getEncryptionZoneForPath(cmroot, conf).getPath(); |
| encryptionZoneToCmrootMapping.put(encryptionZonePath, cmRootDir); |
| } else { |
| encryptionZoneToCmrootMapping.put(NO_ENCRYPTION, cmRootDir); |
| } |
| if (!StringUtils.isEmpty(fallbackNonEncryptedCmRootDir)) { |
| Path cmRootFallback = new Path(fallbackNonEncryptedCmRootDir); |
| if (!cmRootFallback.isAbsolute()) { |
| throw new MetaException(ConfVars.REPLCMENCRYPTEDDIR.getHiveName() + " should be absolute path"); |
| } |
| createCmRoot(cmRootFallback); |
| if (EncryptionZoneUtils.isPathEncrypted(cmRootFallback, conf)) { |
| throw new MetaException(ConfVars.REPLCMFALLBACKNONENCRYPTEDDIR.getHiveName() |
| + " should not be encrypted"); |
| } |
| } |
| UserGroupInformation usergroupInfo = UserGroupInformation.getCurrentUser(); |
| msUser = usergroupInfo.getShortUserName(); |
| msGroup = usergroupInfo.getPrimaryGroupName(); |
| } |
| inited = true; |
| } |
| } catch (IOException e) { |
| throw new MetaException(StringUtils.stringifyException(e)); |
| } |
| } |
| |
| // Filter files starts with ".". Note Hadoop consider files starts with |
| // "." or "_" as hidden file. However, we need to replicate files starts |
| // with "_". We find at least 2 use cases: |
| // 1. For har files, _index and _masterindex is required files |
| // 2. _success file is required for Oozie to indicate availability of data source |
| private static final PathFilter hiddenFileFilter = new PathFilter(){ |
| public boolean accept(Path p){ |
| return !p.getName().startsWith("."); |
| } |
| }; |
| |
  /***
   * Move a path into cmroot. If the path is a directory (of a partition, or table if nonpartitioned),
   * recursively move files inside directory to cmroot. Note the table must be managed table
   * @param path a single file or directory
   * @param type if the files to be copied or moved to cmpath.
   *             Copy is costly but preserve the source file
   * @param ifPurge if the file should skip Trash when move/delete source file.
   *                This is referred only if type is MOVE.
   * @return number of files newly placed into cmroot; files whose content was
   *         already present there are not counted. 0 when CM is disabled.
   * @throws IOException
   */
  public int recycle(Path path, RecycleType type, boolean ifPurge) throws IOException {
    if (!enabled) {
      return 0;
    }

    int count = 0;
    FileSystem fs = path.getFileSystem(conf);
    if (fs.isDirectory(path)) {
      // Recurse into the directory; hidden "." files are never replicated.
      FileStatus[] files = fs.listStatus(path, hiddenFileFilter);
      for (FileStatus file : files) {
        count += recycle(file.getPath(), type, ifPurge);
      }
    } else {
      // CM path is deterministic: cmroot/<name>_<checksum>, so the same
      // content recycled twice maps to the same backup file.
      String fileCheckSum = checksumFor(path, fs);
      Path cmPath = getCMPath(conf, path.getName(), fileCheckSum, getCmRoot(path).toString());

      // set timestamp before moving to cmroot, so we can
      // avoid race condition CM remove the file before setting
      // timestamp
      long now = System.currentTimeMillis();
      fs.setTimes(path, now, -1);

      boolean success = false;
      if (fs.exists(cmPath) && fileCheckSum.equalsIgnoreCase(checksumFor(cmPath, fs))) {
        // If already a file with same checksum exists in cmPath, just ignore the copy/move
        // Also, mark the operation is unsuccessful to notify that file with same name already
        // exist which will ensure the timestamp of cmPath is updated to avoid clean-up by
        // CM cleaner.
        success = false;
      } else {
        switch (type) {
          case MOVE: {
            LOG.info("Moving {} to {}", path.toString(), cmPath.toString());
            // Rename fails if the file with same name already exist.
            Retry<Boolean> retriable = new Retry<Boolean>(IOException.class) {
              @Override
              public Boolean execute() throws IOException {
                return fs.rename(path, cmPath);
              }
            };
            try {
              success = retriable.run();
            } catch (Exception e) {
              throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
            }
            break;
          }
          case COPY: {
            LOG.info("Copying {} to {}", path.toString(), cmPath.toString());

            // It is possible to have a file with same checksum in cmPath but the content is
            // partially copied or corrupted. In this case, just overwrite the existing file with
            // new one.
            success = FileUtils.copy(fs, path, fs, cmPath, false, true, conf);
            break;
          }
          default:
            // Operation fails as invalid input
            break;
        }
      }

      // Ignore if a file with same content already exist in cmroot
      // We might want to setXAttr for the new location in the future
      if (success) {
        // set the file owner to hive (or the id metastore run as)
        fs.setOwner(cmPath, msUser, msGroup);

        // tag the original file name so we know where the file comes from
        // Note we currently only track the last known trace as
        // xattr has limited capacity. We shall revisit and store all original
        // locations if orig-loc becomes important
        try {
          fs.setXAttr(cmPath, ORIG_LOC_TAG, path.toString().getBytes(StandardCharsets.UTF_8));
        } catch (UnsupportedOperationException e) {
          // Filesystem without xattr support: best effort, keep going.
          LOG.warn("Error setting xattr for {}", path.toString());
        }

        count++;
      } else {
        LOG.debug("A file with the same content of {} already exists, ignore", path.toString());
        // Need to extend the tenancy if we saw a newer file with the same content
        fs.setTimes(cmPath, now, -1);
      }

      // Tag if we want to remain in trash after deletion.
      // If multiple files share the same content, then
      // any file claim remain in trash would be granted
      if ((type == RecycleType.MOVE) && !ifPurge) {
        try {
          fs.setXAttr(cmPath, REMAIN_IN_TRASH_TAG, new byte[] { 0 });
        } catch (UnsupportedOperationException e) {
          LOG.warn("Error setting xattr for {}", cmPath.toString());
        }
      }
    }
    return count;
  }
| |
| // Get checksum of a file |
| static public String checksumFor(Path path, FileSystem fs) throws IOException { |
| // TODO: fs checksum only available on hdfs, need to |
| // find a solution for other fs (eg, local fs, s3, etc) |
| String checksumString = null; |
| FileChecksum checksum = fs.getFileChecksum(path); |
| if (checksum != null) { |
| checksumString = StringUtils.byteToHexString( |
| checksum.getBytes(), 0, checksum.getLength()); |
| } |
| return checksumString; |
| } |
| |
| /*** |
| * Convert a path of file inside a partition or table (if non-partitioned) |
| * to a deterministic location of cmroot. So user can retrieve the file back |
| * with the original location plus checksum. |
| * @param conf Hive configuration |
| * @param name original filename |
| * @param checkSum checksum of the file, can be retrieved by {@link #checksumFor(Path, FileSystem)} |
| * @param cmRootUri CM Root URI. (From remote source if REPL LOAD flow. From local config if recycle.) |
| * @return Path |
| */ |
| static Path getCMPath(Configuration conf, String name, String checkSum, String cmRootUri) { |
| String newFileName = name + "_" + checkSum; |
| int maxLength = conf.getInt(DFSConfigKeys.DFS_NAMENODE_MAX_COMPONENT_LENGTH_KEY, |
| DFSConfigKeys.DFS_NAMENODE_MAX_COMPONENT_LENGTH_DEFAULT); |
| |
| if (newFileName.length() > maxLength) { |
| newFileName = newFileName.substring(0, maxLength-1); |
| } |
| |
| return new Path(cmRootUri, newFileName); |
| } |
| |
| /*** |
| * Get original file specified by src and chksumString. If the file exists and checksum |
| * matches, return the file; otherwise, use chksumString to retrieve it from cmroot |
| * @param src Original file location |
| * @param checksumString Checksum of the original file |
| * @param srcCMRootURI CM root URI of the source cluster |
| * @param subDir Sub directory to which the source file belongs to |
| * @param conf Hive configuration |
| * @return Corresponding FileInfo object |
| */ |
| public static FileInfo getFileInfo(Path src, String checksumString, String srcCMRootURI, String subDir, |
| Configuration conf) throws MetaException { |
| try { |
| FileSystem srcFs = src.getFileSystem(conf); |
| if (checksumString == null) { |
| return new FileInfo(srcFs, src, subDir); |
| } |
| |
| Path cmPath = getCMPath(conf, src.getName(), checksumString, srcCMRootURI); |
| if (!srcFs.exists(src)) { |
| return new FileInfo(srcFs, src, cmPath, checksumString, false, subDir); |
| } |
| |
| String currentChecksumString; |
| try { |
| currentChecksumString = checksumFor(src, srcFs); |
| } catch (IOException ex) { |
| // If the file is missing or getting modified, then refer CM path |
| return new FileInfo(srcFs, src, cmPath, checksumString, false, subDir); |
| } |
| if ((currentChecksumString == null) || checksumString.equals(currentChecksumString)) { |
| return new FileInfo(srcFs, src, cmPath, checksumString, true, subDir); |
| } else { |
| return new FileInfo(srcFs, src, cmPath, checksumString, false, subDir); |
| } |
| } catch (IOException e) { |
| throw new MetaException(StringUtils.stringifyException(e)); |
| } |
| } |
| |
| /*** |
| * Concatenate filename, checksum, source cmroot uri and subdirectory with "#" |
| * @param fileUriStr Filename string |
| * @param fileChecksum Checksum string |
| * @param encodedSubDir sub directory path into which this file belongs to. Here encoded means, |
| * the multiple levels of subdirectories are concatenated with path separator "/" |
| * @return Concatenated Uri string |
| */ |
| // TODO: this needs to be enhanced once change management based filesystem is implemented |
| // Currently using fileuri#checksum#cmrooturi#subdirs as the format |
| public String encodeFileUri(String fileUriStr, String fileChecksum, String encodedSubDir) |
| throws IOException { |
| if (instance == null) { |
| throw new IllegalStateException("Uninitialized ReplChangeManager instance."); |
| } |
| Path cmRootPath = getCmRoot(new Path(fileUriStr)); |
| String cmRoot = null; |
| if (cmRootPath != null) { |
| cmRoot = FileUtils.makeQualified(cmRootPath, conf).toString(); |
| } |
| return ReplChangeManager.encodeFileUri(fileUriStr, fileChecksum, cmRoot, encodedSubDir); |
| } |
| |
| public static String encodeFileUri(String fileUriStr, String fileChecksum, String cmRoot, String encodedSubDir) { |
| String encodedUri = fileUriStr; |
| if ((fileChecksum != null) && (cmRoot != null)) { |
| encodedUri = encodedUri + URI_FRAGMENT_SEPARATOR + fileChecksum + URI_FRAGMENT_SEPARATOR + cmRoot; |
| } else { |
| encodedUri = encodedUri + URI_FRAGMENT_SEPARATOR + URI_FRAGMENT_SEPARATOR; |
| } |
| encodedUri = encodedUri + URI_FRAGMENT_SEPARATOR + ((encodedSubDir != null) ? encodedSubDir : ""); |
| LOG.debug("Encoded URI: " + encodedUri); |
| return encodedUri; |
| } |
| |
| /*** |
| * Split uri with fragment into file uri, subdirs, checksum and source cmroot uri. |
| * Currently using fileuri#checksum#cmrooturi#subdirs as the format. |
| * @param fileURIStr uri with fragment |
| * @return array of file name, subdirs, checksum and source CM root URI |
| */ |
| public static String[] decodeFileUri(String fileURIStr) { |
| String[] uriAndFragment = fileURIStr.split(URI_FRAGMENT_SEPARATOR); |
| String[] result = new String[4]; |
| result[0] = uriAndFragment[0]; |
| if ((uriAndFragment.length > 1) && !StringUtils.isEmpty(uriAndFragment[1])) { |
| result[1] = uriAndFragment[1]; |
| } |
| if ((uriAndFragment.length > 2) && !StringUtils.isEmpty(uriAndFragment[2])) { |
| result[2] = uriAndFragment[2]; |
| } |
| if ((uriAndFragment.length > 3) && !StringUtils.isEmpty(uriAndFragment[3])) { |
| result[3] = uriAndFragment[3]; |
| } |
| LOG.debug("Reading Encoded URI: " + result[0] + ":: " + result[1] + ":: " + result[2] + ":: " + result[3]); |
| return result; |
| } |
| |
| public static boolean isCMFileUri(Path fromPath) { |
| String[] result = decodeFileUri(fromPath.toString()); |
| return result[1] != null; |
| } |
| |
| /** |
| * Thread to clear old files of cmroot recursively |
| */ |
| static class CMClearer implements Runnable { |
| private Map<String, String> encryptionZones; |
| private long secRetain; |
| private Configuration conf; |
| |
| CMClearer(Map<String, String> encryptionZones, long secRetain, Configuration conf) { |
| this.encryptionZones = encryptionZones; |
| this.secRetain = secRetain; |
| this.conf = conf; |
| } |
| |
| @Override |
| public void run() { |
| try { |
| LOG.info("CMClearer started"); |
| for (String cmrootString : encryptionZones.values()) { |
| Path cmroot = new Path(cmrootString); |
| long now = System.currentTimeMillis(); |
| FileSystem fs = cmroot.getFileSystem(conf); |
| FileStatus[] files = fs.listStatus(cmroot); |
| |
| for (FileStatus file : files) { |
| long modifiedTime = file.getModificationTime(); |
| if (now - modifiedTime > secRetain * 1000) { |
| try { |
| if (fs.getXAttrs(file.getPath()).containsKey(REMAIN_IN_TRASH_TAG)) { |
| boolean succ = Trash.moveToAppropriateTrash(fs, file.getPath(), conf); |
| if (succ) { |
| LOG.debug("Move " + file.toString() + " to trash"); |
| } else { |
| LOG.warn("Fail to move " + file.toString() + " to trash"); |
| } |
| } else { |
| boolean succ = fs.delete(file.getPath(), false); |
| if (succ) { |
| LOG.debug("Remove " + file.toString()); |
| } else { |
| LOG.warn("Fail to remove " + file.toString()); |
| } |
| } |
| } catch (UnsupportedOperationException e) { |
| LOG.warn("Error getting xattr for " + file.getPath().toString()); |
| } |
| } |
| } |
| } |
| } catch (IOException e) { |
| LOG.error("Exception when clearing cmroot:" + StringUtils.stringifyException(e)); |
| } |
| } |
| } |
| |
| // Schedule CMClearer thread. Will be invoked by metastore |
| static void scheduleCMClearer(Configuration conf) { |
| if (MetastoreConf.getBoolVar(conf, ConfVars.REPLCMENABLED)) { |
| ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor( |
| new BasicThreadFactory.Builder() |
| .namingPattern(CM_THREAD_NAME_PREFIX + "%d") |
| .daemon(true) |
| .build()); |
| executor.scheduleAtFixedRate(new CMClearer(encryptionZoneToCmrootMapping, |
| MetastoreConf.getTimeVar(conf, ConfVars.REPLCMRETIAN, TimeUnit.SECONDS), conf), |
| 0, MetastoreConf.getTimeVar(conf, ConfVars.REPLCMINTERVAL, TimeUnit.SECONDS), TimeUnit.SECONDS); |
| } |
| } |
| |
| public static boolean shouldEnableCm(Database db, Table table) { |
| assert (table != null); |
| return isSourceOfReplication(db) && !MetaStoreUtils.isExternalTable(table); |
| } |
| |
| public static boolean isSourceOfReplication(Database db) { |
| assert (db != null); |
| String replPolicyIds = getReplPolicyIdString(db); |
| return !StringUtils.isEmpty(replPolicyIds); |
| } |
| |
| public static String getReplPolicyIdString(Database db) { |
| if (db != null) { |
| Map<String, String> m = db.getParameters(); |
| if ((m != null) && (m.containsKey(SOURCE_OF_REPLICATION))) { |
| String replPolicyId = m.get(SOURCE_OF_REPLICATION); |
| LOG.debug("repl policy for database {} is {}", db.getName(), replPolicyId); |
| return replPolicyId; |
| } |
| LOG.debug("Repl policy is not set for database: {}", db.getName()); |
| } |
| return null; |
| } |
| |
  /** Joins the given values with the txn write-event file separator ("]"). */
  public static String joinWithSeparator(Iterable<?> strings) {
    return org.apache.hadoop.util.StringUtils.join(TXN_WRITE_EVENT_FILE_SEPARATOR, strings);
  }
| |
| public static String[] getListFromSeparatedString(String commaSeparatedString) { |
| return commaSeparatedString.split("\\s*" + TXN_WRITE_EVENT_FILE_SEPARATOR + "\\s*"); |
| } |
| |
  // Resolves the cmroot directory to use for the given path. For a path inside
  // an HDFS encryption zone, the cmroot lives under that zone (at the relative
  // dir configured by hive.repl.cm.encryptionzone.rootdir); otherwise the
  // fallback non-encrypted cmroot is used. The resolved mapping is cached in
  // encryptionZoneToCmrootMapping and the directory is created on first use.
  // Returns null when CM is disabled.
  @VisibleForTesting
  Path getCmRoot(Path path) throws IOException {
    Path cmroot = null;
    //Default path if hive.repl.cm dir is encrypted
    String cmrootDir = fallbackNonEncryptedCmRootDir;
    String encryptionZonePath = NO_ENCRYPTION;
    if (enabled) {
      if (EncryptionZoneUtils.isPathEncrypted(path, conf)) {
        encryptionZonePath = path.getFileSystem(conf).getUri()
            + EncryptionZoneUtils.getEncryptionZoneForPath(path, conf).getPath();
        //For encryption zone, create cm at the relative path specified by hive.repl.cm.encryptionzone.rootdir
        //at the root of the encryption zone
        cmrootDir = encryptionZonePath + Path.SEPARATOR + encryptedCmRootDir;
      }
      if (encryptionZoneToCmrootMapping.containsKey(encryptionZonePath)) {
        cmroot = new Path(encryptionZoneToCmrootMapping.get(encryptionZonePath));
      } else {
        cmroot = new Path(cmrootDir);
        // Double-checked under the singleton's lock so the directory is
        // created and registered at most once per encryption zone.
        synchronized (instance) {
          if (!encryptionZoneToCmrootMapping.containsKey(encryptionZonePath)) {
            createCmRoot(cmroot);
            encryptionZoneToCmrootMapping.put(encryptionZonePath, cmrootDir);
          }
        }
      }
    }
    return cmroot;
  }
| |
  // Creates the cmroot directory (with permission 700) if it does not exist,
  // retrying on transient IOExceptions via the Retry helper.
  private static void createCmRoot(Path cmroot) throws IOException {
    Retry<Void> retriable = new Retry<Void>(IOException.class) {
      @Override
      public Void execute() throws IOException {
        FileSystem cmFs = cmroot.getFileSystem(conf);
        // Create cmroot with permission 700 if not exist
        if (!cmFs.exists(cmroot)) {
          cmFs.mkdirs(cmroot);
          cmFs.setPermission(cmroot, new FsPermission("700"));
        }
        return null;
      }
    };
    try {
      retriable.run();
    } catch (Exception e) {
      // Flatten the retry wrapper's exception back into an IOException.
      throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
    }
  }
| |
| @VisibleForTesting |
| static void resetReplChangeManagerInstance() { |
| inited = false; |
| enabled = false; |
| instance = null; |
| encryptionZoneToCmrootMapping.clear(); |
| } |
| |
| public static final PathFilter CMROOT_PATH_FILTER = new PathFilter() { |
| @Override |
| public boolean accept(Path p) { |
| if (enabled) { |
| String name = p.getName(); |
| return StringUtils.isEmpty(fallbackNonEncryptedCmRootDir) |
| ? (!name.contains(cmRootDir) && !name.contains(encryptedCmRootDir)) |
| : (!name.contains(cmRootDir) && !name.contains(encryptedCmRootDir) |
| && !name.contains(fallbackNonEncryptedCmRootDir)); |
| } |
| return true; |
| } |
| }; |
| } |