| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hbase.wal; |
| |
| |
| import java.io.FileNotFoundException; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.List; |
| import java.util.Objects; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.locks.ReadWriteLock; |
| import java.util.concurrent.locks.ReentrantReadWriteLock; |
| import java.util.regex.Matcher; |
| import java.util.regex.Pattern; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FSDataInputStream; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.hbase.Abortable; |
| import org.apache.hadoop.hbase.HConstants; |
| import org.apache.hadoop.hbase.ServerName; |
| import org.apache.hadoop.hbase.client.RegionInfo; |
| import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; |
| import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; |
| import org.apache.hadoop.hbase.util.CancelableProgressable; |
| import org.apache.hadoop.hbase.util.CommonFSUtils; |
| import org.apache.hadoop.hbase.util.LeaseNotRecoveredException; |
| import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils; |
| import org.apache.yetus.audience.InterfaceAudience; |
| import org.apache.yetus.audience.InterfaceStability; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.hbase.thirdparty.com.google.common.collect.Lists; |
| |
| /** |
| * Base class of a WAL Provider that returns a single thread safe WAL that writes to Hadoop FS. By |
| * default, this implementation picks a directory in Hadoop FS based on a combination of |
| * <ul> |
| * <li>the HBase root directory |
| * <li>HConstants.HREGION_LOGDIR_NAME |
| * <li>the given factory's factoryId (usually identifying the regionserver by host:port) |
| * </ul> |
| * It also uses the providerId to differentiate among files. |
| */ |
| @InterfaceAudience.Private |
| @InterfaceStability.Evolving |
| public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implements WALProvider { |
| |
/** Logger shared by the instance methods and the static helpers of this class. */
private static final Logger LOG = LoggerFactory.getLogger(AbstractFSWALProvider.class);

/** Separate old log into different dir by regionserver name **/
public static final String SEPARATE_OLDLOGDIR = "hbase.separate.oldlogdir.by.regionserver";
/** Value used for {@link #SEPARATE_OLDLOGDIR} when the configuration does not set it. */
public static final boolean DEFAULT_SEPARATE_OLDLOGDIR = false;
| |
// Only public so classes back in regionserver.wal can access
/**
 * A {@link WAL.Reader} that can additionally be initialized against a file on a file system.
 */
public interface Reader extends WAL.Reader {
  /**
   * Initializes this reader for the given file.
   * @param fs File system.
   * @param path Path.
   * @param c Configuration.
   * @param s Input stream that may have been pre-opened by the caller; may be null.
   * @throws IOException if the reader cannot be initialized
   */
  void init(FileSystem fs, Path path, Configuration c, FSDataInputStream s) throws IOException;
}
| |
/** The single WAL of this provider; stays null until the first successful getWAL call. */
protected volatile T wal;
/** Factory passed to init; its factoryId forms the first part of {@link #logPrefix}. */
protected WALFactory factory;
/** Configuration passed to init. */
protected Configuration conf;
/** Listeners registered through addWALActionsListener. */
protected List<WALActionsListener> listeners = new ArrayList<>();
/** Provider id passed to init; may be null. */
protected String providerId;
/** Flipped to true by the first init call so that a second call can be rejected. */
protected AtomicBoolean initialized = new AtomicBoolean(false);
// for default wal provider, logPrefix won't change
protected String logPrefix;
/** Abortable passed to init. */
protected Abortable abortable;

/**
 * We use walCreateLock to prevent wal recreation in different threads, and also prevent getWALs
 * missing the newly created WAL, see HBASE-21503 for more details.
 */
private final ReadWriteLock walCreateLock = new ReentrantReadWriteLock();
| |
| /** |
| * @param factory factory that made us, identity used for FS layout. may not be null |
| * @param conf may not be null |
| * @param providerId differentiate between providers from one factory, used for FS layout. may be |
| * null |
| */ |
| @Override |
| public void init(WALFactory factory, Configuration conf, String providerId, Abortable abortable) |
| throws IOException { |
| if (!initialized.compareAndSet(false, true)) { |
| throw new IllegalStateException("WALProvider.init should only be called once."); |
| } |
| this.factory = factory; |
| this.conf = conf; |
| this.providerId = providerId; |
| // get log prefix |
| StringBuilder sb = new StringBuilder().append(factory.factoryId); |
| if (providerId != null) { |
| if (providerId.startsWith(WAL_FILE_NAME_DELIMITER)) { |
| sb.append(providerId); |
| } else { |
| sb.append(WAL_FILE_NAME_DELIMITER).append(providerId); |
| } |
| } |
| logPrefix = sb.toString(); |
| this.abortable = abortable; |
| doInit(conf); |
| } |
| |
| @Override |
| public List<WAL> getWALs() { |
| if (wal != null) { |
| return Lists.newArrayList(wal); |
| } |
| walCreateLock.readLock().lock(); |
| try { |
| if (wal == null) { |
| return Collections.emptyList(); |
| } else { |
| return Lists.newArrayList(wal); |
| } |
| } finally { |
| walCreateLock.readLock().unlock(); |
| } |
| } |
| |
| @Override |
| public T getWAL(RegionInfo region) throws IOException { |
| T walCopy = wal; |
| if (walCopy != null) { |
| return walCopy; |
| } |
| walCreateLock.writeLock().lock(); |
| try { |
| walCopy = wal; |
| if (walCopy != null) { |
| return walCopy; |
| } |
| walCopy = createWAL(); |
| boolean succ = false; |
| try { |
| walCopy.init(); |
| succ = true; |
| } finally { |
| if (!succ) { |
| walCopy.close(); |
| } |
| } |
| wal = walCopy; |
| return walCopy; |
| } finally { |
| walCreateLock.writeLock().unlock(); |
| } |
| } |
| |
/**
 * Creates a new WAL instance. getWAL calls this while holding the creation write lock and
 * invokes {@code init()} on the returned instance itself.
 * @return the newly created WAL
 * @throws IOException if the WAL cannot be created
 */
protected abstract T createWAL() throws IOException;

/**
 * Subclass hook invoked as the last step of init, after the fields of this class have been set.
 * @param conf the configuration that was passed to init
 * @throws IOException if subclass initialization fails
 */
protected abstract void doInit(Configuration conf) throws IOException;
| |
| @Override |
| public void shutdown() throws IOException { |
| T log = this.wal; |
| if (log != null) { |
| log.shutdown(); |
| } |
| } |
| |
| @Override |
| public void close() throws IOException { |
| T log = this.wal; |
| if (log != null) { |
| log.close(); |
| } |
| } |
| |
| /** |
| * iff the given WALFactory is using the DefaultWALProvider for meta and/or non-meta, count the |
| * number of files (rolled and active). if either of them aren't, count 0 for that provider. |
| */ |
| @Override |
| public long getNumLogFiles() { |
| T log = this.wal; |
| return log == null ? 0 : log.getNumLogFiles(); |
| } |
| |
| /** |
| * iff the given WALFactory is using the DefaultWALProvider for meta and/or non-meta, count the |
| * size of files (only rolled). if either of them aren't, count 0 for that provider. |
| */ |
| @Override |
| public long getLogFileSize() { |
| T log = this.wal; |
| return log == null ? 0 : log.getLogFileSize(); |
| } |
| |
| /** |
| * returns the number of rolled WAL files. |
| */ |
| public static int getNumRolledLogFiles(WAL wal) { |
| return ((AbstractFSWAL<?>) wal).getNumRolledLogFiles(); |
| } |
| |
| /** |
| * returns the size of rolled WAL files. |
| */ |
| public static long getLogFileSize(WAL wal) { |
| return ((AbstractFSWAL<?>) wal).getLogFileSize(); |
| } |
| |
| /** |
| * return the current filename from the current wal. |
| */ |
| public static Path getCurrentFileName(final WAL wal) { |
| return ((AbstractFSWAL<?>) wal).getCurrentFileName(); |
| } |
| |
| /** |
| * request a log roll, but don't actually do it. |
| */ |
| static void requestLogRoll(final WAL wal) { |
| ((AbstractFSWAL<?>) wal).requestLogRoll(); |
| } |
| |
// should be package private; more visible for use in AbstractFSWAL
/** Separator placed between the components of a WAL file name. */
public static final String WAL_FILE_NAME_DELIMITER = ".";
/** The hbase:meta region's WAL filename extension */
public static final String META_WAL_PROVIDER_ID = ".meta";
/** Provider id literal; not referenced anywhere else in this file. */
static final String DEFAULT_PROVIDER_ID = "default";

// Implementation details that currently leak in tests or elsewhere follow
/** File Extension used while splitting an WAL into regions (HBASE-2312) */
public static final String SPLITTING_EXT = "-splitting";

/**
 * Pattern used to validate a WAL file name see {@link #validateWALFilename(String)} for
 * description. Group 1 is the prefix, group 2 the run of digits parsed as the timestamp, and the
 * optional group 3 an alphanumeric suffix including its leading dot.
 */
private static final Pattern WAL_FILE_NAME_PATTERN =
  Pattern.compile("(.+)\\.(\\d+)(\\.[0-9A-Za-z]+)?");

/**
 * Define for when no timestamp found.
 */
private static final long NO_TIMESTAMP = -1L;
| |
| /** |
| * It returns the file create timestamp (the 'FileNum') from the file name. For name format see |
| * {@link #validateWALFilename(String)} public until remaining tests move to o.a.h.h.wal |
| * @param wal must not be null |
| * @return the file number that is part of the WAL file name |
| */ |
| public static long extractFileNumFromWAL(final WAL wal) { |
| final Path walPath = ((AbstractFSWAL<?>) wal).getCurrentFileName(); |
| if (walPath == null) { |
| throw new IllegalArgumentException("The WAL path couldn't be null"); |
| } |
| String name = walPath.getName(); |
| long timestamp = getTimestamp(name); |
| if (timestamp == NO_TIMESTAMP) { |
| throw new IllegalArgumentException(name + " is not a valid wal file name"); |
| } |
| return timestamp; |
| } |
| |
| /** |
| * A WAL file name is of the format: <wal-name>{@link #WAL_FILE_NAME_DELIMITER} |
| * <file-creation-timestamp>[.<suffix>]. provider-name is usually made up of a |
| * server-name and a provider-id |
| * @param filename name of the file to validate |
| * @return <tt>true</tt> if the filename matches an WAL, <tt>false</tt> otherwise |
| */ |
| public static boolean validateWALFilename(String filename) { |
| return WAL_FILE_NAME_PATTERN.matcher(filename).matches(); |
| } |
| |
| /** |
| * Split a WAL filename to get a start time. WALs usually have the time we start writing to them |
| * with as part of their name, usually the suffix. Sometimes there will be an extra suffix as when |
| * it is a WAL for the meta table. For example, WALs might look like this |
| * <code>10.20.20.171%3A60020.1277499063250</code> where <code>1277499063250</code> is the |
| * timestamp. Could also be a meta WAL which adds a '.meta' suffix or a |
| * synchronous replication WAL which adds a '.syncrep' suffix. Check for these. File also may have |
| * no timestamp on it. For example the recovered.edits files are WALs but are named in ascending |
| * order. Here is an example: 0000000000000016310. Allow for this. |
| * @param name Name of the WAL file. |
| * @return Timestamp or {@link #NO_TIMESTAMP}. |
| */ |
| public static long getTimestamp(String name) { |
| Matcher matcher = WAL_FILE_NAME_PATTERN.matcher(name); |
| return matcher.matches() ? Long.parseLong(matcher.group(2)): NO_TIMESTAMP; |
| } |
| |
| /** |
| * Construct the directory name for all WALs on a given server. Dir names currently look like this |
| * for WALs: <code>hbase//WALs/kalashnikov.att.net,61634,1486865297088</code>. |
| * @param serverName Server name formatted as described in {@link ServerName} |
| * @return the relative WAL directory name, e.g. <code>.logs/1.example.org,60030,12345</code> if |
| * <code>serverName</code> passed is <code>1.example.org,60030,12345</code> |
| */ |
| public static String getWALDirectoryName(final String serverName) { |
| StringBuilder dirName = new StringBuilder(HConstants.HREGION_LOGDIR_NAME); |
| dirName.append("/"); |
| dirName.append(serverName); |
| return dirName.toString(); |
| } |
| |
| /** |
| * Construct the directory name for all old WALs on a given server. The default old WALs dir looks |
| * like: <code>hbase/oldWALs</code>. If you config hbase.separate.oldlogdir.by.regionserver to |
| * true, it looks like <code>hbase//oldWALs/kalashnikov.att.net,61634,1486865297088</code>. |
| * @param serverName Server name formatted as described in {@link ServerName} |
| * @return the relative WAL directory name |
| */ |
| public static String getWALArchiveDirectoryName(Configuration conf, final String serverName) { |
| StringBuilder dirName = new StringBuilder(HConstants.HREGION_OLDLOGDIR_NAME); |
| if (conf.getBoolean(SEPARATE_OLDLOGDIR, DEFAULT_SEPARATE_OLDLOGDIR)) { |
| dirName.append(Path.SEPARATOR); |
| dirName.append(serverName); |
| } |
| return dirName.toString(); |
| } |
| |
| /** |
| * Pulls a ServerName out of a Path generated according to our layout rules. In the below layouts, |
| * this method ignores the format of the logfile component. Current format: [base directory for |
| * hbase]/hbase/.logs/ServerName/logfile or [base directory for |
| * hbase]/hbase/.logs/ServerName-splitting/logfile Expected to work for individual log files and |
| * server-specific directories. |
| * @return null if it's not a log file. Returns the ServerName of the region server that created |
| * this log file otherwise. |
| */ |
| public static ServerName getServerNameFromWALDirectoryName(Configuration conf, String path) |
| throws IOException { |
| if (path == null || path.length() <= HConstants.HREGION_LOGDIR_NAME.length()) { |
| return null; |
| } |
| |
| if (conf == null) { |
| throw new IllegalArgumentException("parameter conf must be set"); |
| } |
| |
| final String rootDir = conf.get(HConstants.HBASE_DIR); |
| if (rootDir == null || rootDir.isEmpty()) { |
| throw new IllegalArgumentException(HConstants.HBASE_DIR + " key not found in conf."); |
| } |
| |
| final StringBuilder startPathSB = new StringBuilder(rootDir); |
| if (!rootDir.endsWith("/")) { |
| startPathSB.append('/'); |
| } |
| startPathSB.append(HConstants.HREGION_LOGDIR_NAME); |
| if (!HConstants.HREGION_LOGDIR_NAME.endsWith("/")) { |
| startPathSB.append('/'); |
| } |
| final String startPath = startPathSB.toString(); |
| |
| String fullPath; |
| try { |
| fullPath = FileSystem.get(conf).makeQualified(new Path(path)).toString(); |
| } catch (IllegalArgumentException e) { |
| LOG.info("Call to makeQualified failed on " + path + " " + e.getMessage()); |
| return null; |
| } |
| |
| if (!fullPath.startsWith(startPath)) { |
| return null; |
| } |
| |
| final String serverNameAndFile = fullPath.substring(startPath.length()); |
| |
| if (serverNameAndFile.indexOf('/') < "a,0,0".length()) { |
| // Either it's a file (not a directory) or it's not a ServerName format |
| return null; |
| } |
| |
| Path p = new Path(path); |
| return getServerNameFromWALDirectoryName(p); |
| } |
| |
| /** |
| * This function returns region server name from a log file name which is in one of the following |
| * formats: |
| * <ul> |
| * <li>hdfs://<name node>/hbase/.logs/<server name>-splitting/...</li> |
| * <li>hdfs://<name node>/hbase/.logs/<server name>/...</li> |
| * </ul> |
| * @return null if the passed in logFile isn't a valid WAL file path |
| */ |
| public static ServerName getServerNameFromWALDirectoryName(Path logFile) { |
| String logDirName = logFile.getParent().getName(); |
| // We were passed the directory and not a file in it. |
| if (logDirName.equals(HConstants.HREGION_LOGDIR_NAME)) { |
| logDirName = logFile.getName(); |
| } |
| ServerName serverName = null; |
| if (logDirName.endsWith(SPLITTING_EXT)) { |
| logDirName = logDirName.substring(0, logDirName.length() - SPLITTING_EXT.length()); |
| } |
| try { |
| serverName = ServerName.parseServerName(logDirName); |
| } catch (IllegalArgumentException | IllegalStateException ex) { |
| serverName = null; |
| LOG.warn("Cannot parse a server name from path=" + logFile + "; " + ex.getMessage()); |
| } |
| if (serverName != null && serverName.getStartcode() < 0) { |
| LOG.warn("Invalid log file path=" + logFile); |
| serverName = null; |
| } |
| return serverName; |
| } |
| |
| public static boolean isMetaFile(Path p) { |
| return isMetaFile(p.getName()); |
| } |
| |
| /** |
| * @return True if String ends in {@link #META_WAL_PROVIDER_ID} |
| */ |
| public static boolean isMetaFile(String p) { |
| return p != null && p.endsWith(META_WAL_PROVIDER_ID); |
| } |
| |
| /** |
| * Comparator used to compare WAL files together based on their start time. |
| * Just compares start times and nothing else. |
| */ |
| public static class WALStartTimeComparator implements Comparator<Path> { |
| @Override |
| public int compare(Path o1, Path o2) { |
| return Long.compare(getTS(o1), getTS(o2)); |
| } |
| |
| /** |
| * Split a path to get the start time |
| * For example: 10.20.20.171%3A60020.1277499063250 |
| * Could also be a meta WAL which adds a '.meta' suffix or a synchronous replication WAL |
| * which adds a '.syncrep' suffix. Check. |
| * @param p path to split |
| * @return start time |
| */ |
| private static long getTS(Path p) { |
| return getTimestamp(p.getName()); |
| } |
| } |
| |
| |
| |
| public static boolean isArchivedLogFile(Path p) { |
| String oldLog = Path.SEPARATOR + HConstants.HREGION_OLDLOGDIR_NAME + Path.SEPARATOR; |
| return p.toString().contains(oldLog); |
| } |
| |
| /** |
| * Get the archived WAL file path |
| * @param path - active WAL file path |
| * @param conf - configuration |
| * @return archived path if exists, path - otherwise |
| * @throws IOException exception |
| */ |
| public static Path getArchivedLogPath(Path path, Configuration conf) throws IOException { |
| Path rootDir = CommonFSUtils.getWALRootDir(conf); |
| Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME); |
| if (conf.getBoolean(SEPARATE_OLDLOGDIR, DEFAULT_SEPARATE_OLDLOGDIR)) { |
| ServerName serverName = getServerNameFromWALDirectoryName(path); |
| if (serverName == null) { |
| LOG.error("Couldn't locate log: " + path); |
| return path; |
| } |
| oldLogDir = new Path(oldLogDir, serverName.getServerName()); |
| } |
| Path archivedLogLocation = new Path(oldLogDir, path.getName()); |
| final FileSystem fs = CommonFSUtils.getWALFileSystem(conf); |
| |
| if (fs.exists(archivedLogLocation)) { |
| LOG.info("Log " + path + " was moved to " + archivedLogLocation); |
| return archivedLogLocation; |
| } else { |
| LOG.error("Couldn't locate log: " + path); |
| return path; |
| } |
| } |
| |
| /** |
| * Opens WAL reader with retries and additional exception handling |
| * @param path path to WAL file |
| * @param conf configuration |
| * @return WAL Reader instance |
| */ |
| public static org.apache.hadoop.hbase.wal.WAL.Reader openReader(Path path, Configuration conf) |
| throws IOException { |
| long retryInterval = 2000; // 2 sec |
| int maxAttempts = 30; |
| int attempt = 0; |
| Exception ee = null; |
| org.apache.hadoop.hbase.wal.WAL.Reader reader = null; |
| while (reader == null && attempt++ < maxAttempts) { |
| try { |
| // Detect if this is a new file, if so get a new reader else |
| // reset the current reader so that we see the new data |
| reader = WALFactory.createReader(path.getFileSystem(conf), path, conf); |
| return reader; |
| } catch (FileNotFoundException fnfe) { |
| // If the log was archived, continue reading from there |
| Path archivedLog = AbstractFSWALProvider.getArchivedLogPath(path, conf); |
| if (!Objects.equals(path, archivedLog)) { |
| return openReader(archivedLog, conf); |
| } else { |
| throw fnfe; |
| } |
| } catch (LeaseNotRecoveredException lnre) { |
| // HBASE-15019 the WAL was not closed due to some hiccup. |
| LOG.warn("Try to recover the WAL lease " + path, lnre); |
| recoverLease(conf, path); |
| reader = null; |
| ee = lnre; |
| } catch (NullPointerException npe) { |
| // Workaround for race condition in HDFS-4380 |
| // which throws a NPE if we open a file before any data node has the most recent block |
| // Just sleep and retry. Will require re-reading compressed WALs for compressionContext. |
| LOG.warn("Got NPE opening reader, will retry."); |
| reader = null; |
| ee = npe; |
| } |
| if (reader == null) { |
| // sleep before next attempt |
| try { |
| Thread.sleep(retryInterval); |
| } catch (InterruptedException e) { |
| } |
| } |
| } |
| throw new IOException("Could not open reader", ee); |
| } |
| |
| // For HBASE-15019 |
| private static void recoverLease(final Configuration conf, final Path path) { |
| try { |
| final FileSystem dfs = CommonFSUtils.getCurrentFileSystem(conf); |
| RecoverLeaseFSUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() { |
| @Override |
| public boolean progress() { |
| LOG.debug("Still trying to recover WAL lease: " + path); |
| return true; |
| } |
| }); |
| } catch (IOException e) { |
| LOG.warn("unable to recover lease for WAL: " + path, e); |
| } |
| } |
| |
/**
 * Registers a listener by appending it to {@link #listeners}.
 * @param listener the listener to add
 */
@Override
public void addWALActionsListener(WALActionsListener listener) {
  listeners.add(listener);
}
| |
| private static String getWALNameGroupFromWALName(String name, int group) { |
| Matcher matcher = WAL_FILE_NAME_PATTERN.matcher(name); |
| if (matcher.matches()) { |
| return matcher.group(group); |
| } else { |
| throw new IllegalArgumentException(name + " is not a valid wal file name"); |
| } |
| } |
| /** |
| * Get prefix of the log from its name, assuming WAL name in format of |
| * log_prefix.filenumber.log_suffix |
| * @param name Name of the WAL to parse |
| * @return prefix of the log |
| * @throws IllegalArgumentException if the name passed in is not a valid wal file name |
| * @see AbstractFSWAL#getCurrentFileName() |
| */ |
| public static String getWALPrefixFromWALName(String name) { |
| return getWALNameGroupFromWALName(name, 1); |
| } |
| } |