/*
 * Copyright The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.fs;

import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.lang.reflect.Proxy;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.util.Progressable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import edu.umd.cs.findbugs.annotations.Nullable;

/**
 * An encapsulation of the FileSystem object that HBase uses to access data.
 * This class allows the flexibility of using
 * separate filesystem objects for reading and writing hfiles and WALs.
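 * <p>
 * A minimal usage sketch; the variable names are illustrative only:
 * <pre>{@code
 * Configuration conf = HBaseConfiguration.create();
 * // Verify checksums inside HBase instead of in the filesystem client.
 * HFileSystem hfs = new HFileSystem(conf, true);
 * FileSystem readFs = hfs.getNoChecksumFs(); // used for hfile block reads
 * }</pre>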
 */
public class HFileSystem extends FilterFileSystem {
  public static final Logger LOG = LoggerFactory.getLogger(HFileSystem.class);

  private final FileSystem noChecksumFs; // read hfile data from storage
  private final boolean useHBaseChecksum;
  private static volatile byte unspecifiedStoragePolicyId = Byte.MIN_VALUE;

  /**
   * Create a FileSystem object for HBase regionservers.
   * @param conf The configuration to be used for the filesystem
   * @param useHBaseChecksum if true, verify checksums
   *        in HBase itself; otherwise delegate checksum
   *        verification to the FileSystem.
   */
  public HFileSystem(Configuration conf, boolean useHBaseChecksum)
      throws IOException {

    // Create the default filesystem with checksum verification switched on.
    // By default, any operation to this FilterFileSystem occurs on
    // the underlying filesystem that has checksums switched on.
    this.fs = FileSystem.get(conf);
    this.useHBaseChecksum = useHBaseChecksum;

    fs.initialize(getDefaultUri(conf), conf);

    // disable checksum verification for local fileSystem, see HBASE-11218
    if (fs instanceof LocalFileSystem) {
      fs.setWriteChecksum(false);
      fs.setVerifyChecksum(false);
    }

    addLocationsOrderInterceptor(conf);

    // If hbase checksum verification is switched on, then create a new
    // filesystem object that has checksum verification turned off.
    // We will avoid verifying checksums in the fs client, and instead do it
    // inside of hbase.
    // If this is the local filesystem, hadoop has a bug where seeks
    // do not go to the correct location if setVerifyChecksum(false) is called.
    // This manifests itself in incorrect data being read, and HFileBlocks won't be able to read
    // their header magic numbers. See HBASE-5885
    if (useHBaseChecksum && !(fs instanceof LocalFileSystem)) {
      conf = new Configuration(conf);
      conf.setBoolean("dfs.client.read.shortcircuit.skip.checksum", true);
      this.noChecksumFs = maybeWrapFileSystem(newInstanceFileSystem(conf), conf);
      this.noChecksumFs.setVerifyChecksum(false);
    } else {
      this.noChecksumFs = maybeWrapFileSystem(fs, conf);
    }

    this.fs = maybeWrapFileSystem(this.fs, conf);
  }

  /**
   * Wrap a FileSystem object within an HFileSystem. The noChecksumFs and
   * writeFs are both set to the same specified fs.
   * Do not verify hbase-checksums while reading data from the filesystem.
   * @param fs Set the noChecksumFs and writeFs to this specified filesystem.
   */
  public HFileSystem(FileSystem fs) {
    this.fs = fs;
    this.noChecksumFs = fs;
    this.useHBaseChecksum = false;
  }

  /**
   * Returns the filesystem that is specially setup for
   * doing reads from storage. This object avoids doing
   * checksum verifications for reads.
   * @return The FileSystem object that can be used to read data
   *         from files.
   */
  public FileSystem getNoChecksumFs() {
    return noChecksumFs;
  }

  /**
   * Returns the underlying filesystem.
   * @return The underlying FileSystem for this FilterFileSystem object.
   */
  public FileSystem getBackingFs() throws IOException {
    return fs;
  }

  /**
   * Set the source path (directory/file) to the specified storage policy.
   * @param path The source path (directory/file).
   * @param policyName The name of the storage policy: 'HOT', 'COLD', etc.
   *        See org.apache.hadoop.hdfs.protocol.HdfsConstants in Hadoop 2.6+ for the possible
   *        values, e.g. 'COLD', 'WARM', 'HOT', 'ONE_SSD', 'ALL_SSD', 'LAZY_PERSIST'.
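   * <p>
   * For example, to pin WAL files to SSD (the path here is illustrative):
   * <pre>{@code
   * hfs.setStoragePolicy(new Path("/hbase/WALs"), "ONE_SSD");
   * }</pre>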
   */
  public void setStoragePolicy(Path path, String policyName) {
    FSUtils.setStoragePolicy(this.fs, path, policyName);
  }

  /**
   * Get the storage policy of the source path (directory/file).
   * @param path The source path (directory/file).
   * @return Storage policy name, or {@code null} if we are not on a
   *         {@link DistributedFileSystem} or an exception was thrown while fetching the policy
   */
  @Nullable
  public String getStoragePolicyName(Path path) {
    try {
      Object blockStoragePolicySpi =
          ReflectionUtils.invokeMethod(this.fs, "getStoragePolicy", path);
      return (String) ReflectionUtils.invokeMethod(blockStoragePolicySpi, "getName");
    } catch (Exception e) {
      // This may fail on older HDFS versions that lack getStoragePolicy; try the old way.
      if (LOG.isTraceEnabled()) {
        LOG.trace("Failed to get policy directly", e);
      }
      return getStoragePolicyForOldHDFSVersion(path);
    }
  }

  /**
   * Before Hadoop 2.8.0 the FileSystem interface had no getStoragePolicy method, so we
   * need to stay compatible with older versions. See HADOOP-12161 for more details.
   * @param path Path to get storage policy against
   * @return the storage policy name, or {@code null} if it could not be determined
   */
  private String getStoragePolicyForOldHDFSVersion(Path path) {
    try {
      if (this.fs instanceof DistributedFileSystem) {
        DistributedFileSystem dfs = (DistributedFileSystem) this.fs;
        HdfsFileStatus status = dfs.getClient().getFileInfo(path.toUri().getPath());
        if (null != status) {
          if (unspecifiedStoragePolicyId < 0) {
            // Get the unspecified id field through reflection to avoid compilation error.
            // In later versions BlockStoragePolicySuite#ID_UNSPECIFIED is moved to
            // HdfsConstants#BLOCK_STORAGE_POLICY_ID_UNSPECIFIED
            Field idUnspecified = BlockStoragePolicySuite.class.getField("ID_UNSPECIFIED");
            unspecifiedStoragePolicyId = idUnspecified.getByte(BlockStoragePolicySuite.class);
          }
          byte storagePolicyId = status.getStoragePolicy();
          if (storagePolicyId != unspecifiedStoragePolicyId) {
            BlockStoragePolicy[] policies = dfs.getStoragePolicies();
            for (BlockStoragePolicy policy : policies) {
              if (policy.getId() == storagePolicyId) {
                return policy.getName();
              }
            }
          }
        }
      }
    } catch (Throwable e) {
      LOG.warn("Failed to get block storage policy of [" + path + "]", e);
    }

    return null;
  }

  /**
   * Are we verifying checksums in HBase?
   * @return True if HBase is configured to verify checksums,
   *         otherwise false.
   */
  public boolean useHBaseChecksum() {
    return useHBaseChecksum;
  }

  /**
   * Close this filesystem object
   */
  @Override
  public void close() throws IOException {
    super.close();
    if (this.noChecksumFs != fs) {
      this.noChecksumFs.close();
    }
  }

  /**
   * Returns a brand new instance of the FileSystem. It does not use
   * the FileSystem.Cache. In newer versions of HDFS, we can directly
   * invoke FileSystem.newInstance(Configuration).
   *
   * @param conf Configuration
   * @return A new instance of the filesystem
   */
  private static FileSystem newInstanceFileSystem(Configuration conf)
      throws IOException {
    URI uri = FileSystem.getDefaultUri(conf);
    FileSystem fs = null;
    Class<?> clazz = conf.getClass("fs." + uri.getScheme() + ".impl", null);
    if (clazz != null) {
      // This will be true for Hadoop 1.0, or 0.20.
      fs = (FileSystem) org.apache.hadoop.util.ReflectionUtils.newInstance(clazz, conf);
      fs.initialize(uri, conf);
    } else {
      // For Hadoop 2.0, we have to go through FileSystem for the filesystem
      // implementation to be loaded by the service loader in case it has not
      // been loaded yet.
      Configuration clone = new Configuration(conf);
      clone.setBoolean("fs." + uri.getScheme() + ".impl.disable.cache", true);
      fs = FileSystem.get(uri, clone);
    }
    if (fs == null) {
      throw new IOException("No FileSystem for scheme: " + uri.getScheme());
    }

    return fs;
  }

  /**
   * Returns the FileSystem wrapped in the class named by the
   * hbase.fs.wrapper property if one is set in the configuration; otherwise
   * returns the unmodified FS instance passed in as an argument.
   * @param base Filesystem instance to wrap
   * @param conf Configuration
   * @return wrapped instance of FS, or the same instance if no wrapping configured.
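   * <p>
   * A wrapper class must expose a (FileSystem, Configuration) constructor. A hypothetical
   * sketch of such a wrapper (the class name is illustrative only):
   * <pre>{@code
   * public class AuditingFileSystem extends FilterFileSystem {
   *   public AuditingFileSystem(FileSystem base, Configuration conf) {
   *     super(base); // delegate everything; add hooks as needed
   *   }
   * }
   * }</pre>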
   */
  private FileSystem maybeWrapFileSystem(FileSystem base, Configuration conf) {
    try {
      Class<?> clazz = conf.getClass("hbase.fs.wrapper", null);
      if (clazz != null) {
        return (FileSystem) clazz.getConstructor(FileSystem.class, Configuration.class)
            .newInstance(base, conf);
      }
    } catch (Exception e) {
      LOG.error("Failed to wrap filesystem", e);
    }
    return base;
  }

  public static boolean addLocationsOrderInterceptor(Configuration conf) throws IOException {
    return addLocationsOrderInterceptor(conf, new ReorderWALBlocks());
  }

  /**
   * Add an interceptor on the calls to the namenode#getBlockLocations from the DFSClient
   * linked to this FileSystem. See HBASE-6435 for the background.
   * <p/>
   * There should be no reason, except testing, to create a specific ReorderBlocks.
   *
   * @return true if the interceptor was added, false otherwise.
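   * <p>
   * This works by reflectively replacing the DFSClient's private namenode reference with a
   * dynamic proxy that reorders the results of getBlockLocations; see createReorderingProxy.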
   */
  static boolean addLocationsOrderInterceptor(Configuration conf, final ReorderBlocks lrb) {
    if (!conf.getBoolean("hbase.filesystem.reorder.blocks", true)) { // activated by default
      LOG.debug("addLocationsOrderInterceptor configured to false");
      return false;
    }

    FileSystem fs;
    try {
      fs = FileSystem.get(conf);
    } catch (IOException e) {
      LOG.warn("Can't get the file system from the conf.", e);
      return false;
    }

    if (!(fs instanceof DistributedFileSystem)) {
| LOG.debug("The file system is not a DistributedFileSystem. " + |
| "Skipping on block location reordering"); |
      return false;
    }

    DistributedFileSystem dfs = (DistributedFileSystem) fs;
    DFSClient dfsc = dfs.getClient();
    if (dfsc == null) {
      LOG.warn("The DistributedFileSystem does not contain a DFSClient. Can't add the location " +
          "block reordering interceptor. Continuing, but this is unexpected.");
      return false;
    }

    try {
      Field nf = DFSClient.class.getDeclaredField("namenode");
      nf.setAccessible(true);
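      // Clear the final modifier on the field so it can be reassigned below. Note that this
      // trick relies on the private Field#modifiers field and stops working on recent JDKs.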
      Field modifiersField = Field.class.getDeclaredField("modifiers");
      modifiersField.setAccessible(true);
      modifiersField.setInt(nf, nf.getModifiers() & ~Modifier.FINAL);

      ClientProtocol namenode = (ClientProtocol) nf.get(dfsc);
      if (namenode == null) {
        LOG.warn("The DFSClient is not linked to a namenode. Can't add the location block" +
            " reordering interceptor. Continuing, but this is unexpected.");
        return false;
      }

      ClientProtocol cp1 = createReorderingProxy(namenode, lrb, conf);
      nf.set(dfsc, cp1);
      LOG.info("Added intercepting call to namenode#getBlockLocations so can do block reordering" +
          " using class " + lrb.getClass().getName());
    } catch (NoSuchFieldException | IllegalAccessException e) {
      LOG.warn("Can't modify the DFSClient#namenode field to add the location reorder.", e);
      return false;
    }

    return true;
  }

  private static ClientProtocol createReorderingProxy(final ClientProtocol cp,
      final ReorderBlocks lrb, final Configuration conf) {
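    // The returned proxy forwards every ClientProtocol call to the real namenode unchanged,
    // except that results of getBlockLocations are run through the ReorderBlocks hook before
    // being handed back to the DFSClient, and close() also stops the underlying RPC proxy.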
    return (ClientProtocol) Proxy.newProxyInstance(cp.getClass().getClassLoader(),
        new Class[]{ClientProtocol.class, Closeable.class},
        new InvocationHandler() {
          @Override
          public Object invoke(Object proxy, Method method,
              Object[] args) throws Throwable {
            try {
              if ((args == null || args.length == 0)
                  && "close".equals(method.getName())) {
                RPC.stopProxy(cp);
                return null;
              } else {
                Object res = method.invoke(cp, args);
                if (res != null && args != null && args.length == 3
                    && "getBlockLocations".equals(method.getName())
                    && res instanceof LocatedBlocks
                    && args[0] instanceof String) {
                  // Note: instanceof already guarantees args[0] is non-null.
                  lrb.reorderBlocks(conf, (LocatedBlocks) res, (String) args[0]);
                }
                return res;
              }
            } catch (InvocationTargetException ite) {
              // We will get this for every exception, checked or not, thrown
              // by any layer, including functional exceptions.
              Throwable cause = ite.getCause();
              if (cause == null) {
                throw new RuntimeException(
                    "Proxy invocation failed and getCause is null", ite);
              }
              if (cause instanceof UndeclaredThrowableException) {
                Throwable causeCause = cause.getCause();
                if (causeCause == null) {
                  throw new RuntimeException("UndeclaredThrowableException had null cause!");
                }
                cause = causeCause;
              }
              throw cause;
            }
          }
        });
  }

  /**
   * Interface to implement to add a specific reordering logic in hdfs.
   */
  interface ReorderBlocks {
    /**
     * @param conf the conf to use
     * @param lbs the LocatedBlocks to reorder
     * @param src the name of the file currently being read
     * @throws IOException if something went wrong
     */
    void reorderBlocks(Configuration conf, LocatedBlocks lbs, String src) throws IOException;
  }

  /**
   * Puts at lowest priority the WAL file blocks that are on the same datanode
   * as the original regionserver that created these files, because we fear that
   * datanode is actually dead, and reads from it would time out.
   */
  static class ReorderWALBlocks implements ReorderBlocks {
    @Override
    public void reorderBlocks(Configuration conf, LocatedBlocks lbs, String src)
        throws IOException {

      ServerName sn = AbstractFSWALProvider.getServerNameFromWALDirectoryName(conf, src);
      if (sn == null) {
        // It's not a WAL
        return;
      }

      // Ok, so it's a WAL
      String hostName = sn.getHostname();
      if (LOG.isTraceEnabled()) {
        LOG.trace(src +
            " is a WAL file, so reordering blocks, last hostname will be: " + hostName);
      }

      // Just check for all blocks
      for (LocatedBlock lb : lbs.getLocatedBlocks()) {
        DatanodeInfo[] dnis = lb.getLocations();
        if (dnis != null && dnis.length > 1) {
          boolean found = false;
          for (int i = 0; i < dnis.length - 1 && !found; i++) {
            if (hostName.equals(dnis[i].getHostName())) {
              // advance the other locations by one and put this one at the last place.
              DatanodeInfo toLast = dnis[i];
              System.arraycopy(dnis, i + 1, dnis, i, dnis.length - i - 1);
              dnis[dnis.length - 1] = toLast;
              found = true;
            }
          }
        }
      }
    }
  }

  /**
   * Create a new HFileSystem object, similar to FileSystem.get().
   * This returns a filesystem object that avoids checksum
   * verification in the filesystem for hfileblock-reads.
   * For these blocks, checksum verification is done by HBase.
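   * <p>
   * For example: {@code FileSystem fs = HFileSystem.get(HBaseConfiguration.create());}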
   */
  static public FileSystem get(Configuration conf) throws IOException {
    return new HFileSystem(conf, true);
  }

  /**
   * Wrap a LocalFileSystem within an HFileSystem.
   */
  static public FileSystem getLocalFs(Configuration conf) throws IOException {
    return new HFileSystem(FileSystem.getLocal(conf));
  }

  /**
   * The org.apache.hadoop.fs.FilterFileSystem does not yet support
   * createNonRecursive. This is a Hadoop bug; once it is fixed in Hadoop,
   * this definition will go away.
   */
  @SuppressWarnings("deprecation")
  public FSDataOutputStream createNonRecursive(Path f,
      boolean overwrite,
      int bufferSize, short replication, long blockSize,
      Progressable progress) throws IOException {
    return fs.createNonRecursive(f, overwrite, bufferSize, replication,
        blockSize, progress);
  }
}