blob: 4d3f1f1ec085e14b0d688eeb7df40b3bd42e3f96 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.wal;
import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager;
import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufWALStreamReader;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufWALTailingReader;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
/**
* Entry point for users of the Write Ahead Log. Acts as the shim between internal use and the
* particular WALProvider we use to handle wal requests. Configure which provider gets used with the
* configuration setting "hbase.wal.provider". Available implementations:
* <ul>
* <li><em>defaultProvider</em> : whatever provider is standard for the hbase version. Currently
* "asyncfs"</li>
* <li><em>asyncfs</em> : a provider that will run on top of an implementation of the Hadoop
* FileSystem interface via an asynchronous client.</li>
* <li><em>filesystem</em> : a provider that will run on top of an implementation of the Hadoop
* FileSystem interface via HDFS's synchronous DFSClient.</li>
* <li><em>multiwal</em> : a provider that will use multiple "filesystem" wal instances per region
* server.</li>
* </ul>
* Alternatively, you may provide a custom implementation of {@link WALProvider} by class name.
*/
@InterfaceAudience.Private
public class WALFactory {
/**
* Used in tests for injecting customized stream reader implementation, for example, inject fault
* when reading, etc.
* <p/>
* After removing the sequence file based WAL, we always use the protobuf based WAL reader, and
* we also determine whether the WAL file is encrypted, and thus whether we should use
* {@link org.apache.hadoop.hbase.regionserver.wal.SecureWALCellCodec} to decode it, by checking
* the header of the WAL file, so we do not need to specify a special reader to read the WAL file
* either.
* <p/>
* So typically you should not use this config in production.
*/
public static final String WAL_STREAM_READER_CLASS_IMPL =
"hbase.regionserver.wal.stream.reader.impl";
private static final Logger LOG = LoggerFactory.getLogger(WALFactory.class);
/**
* Maps between configuration names for providers and implementation classes. The constant names
* are the values looked up with {@code Providers.valueOf} when a provider configuration key is
* resolved.
*/
enum Providers {
// "defaultProvider" and "asyncfs" currently map to the same implementation class.
defaultProvider(AsyncFSWALProvider.class),
filesystem(FSHLogProvider.class),
multiwal(RegionGroupingProvider.class),
asyncfs(AsyncFSWALProvider.class);
// The WALProvider implementation selected by this configuration name.
final Class<? extends WALProvider> clazz;
Providers(Class<? extends WALProvider> clazz) {
this.clazz = clazz;
}
}
public static final String WAL_PROVIDER = "hbase.wal.provider";
static final String DEFAULT_WAL_PROVIDER = Providers.defaultProvider.name();
public static final String META_WAL_PROVIDER = "hbase.wal.meta_provider";
public static final String REPLICATION_WAL_PROVIDER = "hbase.wal.replication_provider";
public static final String WAL_ENABLED = "hbase.regionserver.hlog.enabled";
static final String REPLICATION_WAL_PROVIDER_ID = "rep";
final String factoryId;
final Abortable abortable;
private final WALProvider provider;
// The meta updates are written to a different wal. If this
// regionserver holds meta regions, then this ref will be non-null.
// Lazily initialized; most RegionServers don't deal with META
private final LazyInitializedWALProvider metaProvider;
// This is to avoid hbase:replication itself repeatedly triggering unnecessary updates to the WAL
// file and generating a lot of useless data, see HBASE-27775 for more details.
private final LazyInitializedWALProvider replicationProvider;
/**
* Configuration-specified WAL Reader used when a custom reader is requested
*/
private final Class<? extends WALStreamReader> walStreamReaderClass;
/**
* How long to attempt opening in-recovery wals
*/
private final int timeoutMillis;
private final Configuration conf;
private final ExcludeDatanodeManager excludeDatanodeManager;
// Used for the singleton WALFactory, see below.
private WALFactory(Configuration conf) {
// this code is duplicated here so we can keep our members final.
// until we've moved reader/writer construction down into providers, this initialization must
// happen prior to provider initialization, in case they need to instantiate a reader/writer.
timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000);
/* TODO Both of these are probably specific to the fs wal provider */
walStreamReaderClass = conf.getClass(WAL_STREAM_READER_CLASS_IMPL,
ProtobufWALStreamReader.class, WALStreamReader.class);
Preconditions.checkArgument(
AbstractFSWALProvider.Initializer.class.isAssignableFrom(walStreamReaderClass),
"The wal stream reader class %s is not a sub class of %s", walStreamReaderClass.getName(),
AbstractFSWALProvider.Initializer.class.getName());
this.conf = conf;
// end required early initialization
// this instance can't create wals, just reader/writers.
provider = null;
factoryId = SINGLETON_ID;
this.abortable = null;
this.excludeDatanodeManager = new ExcludeDatanodeManager(conf);
this.metaProvider = null;
this.replicationProvider = null;
}
/**
* Returns the {@link Providers} constant that {@link #getProviderClass} treats as the default
* when deciding whether falling back from AsyncFSWALProvider is allowed.
* NOTE(review): package-private and non-final, presumably so tests can override it -- confirm.
*/
Providers getDefaultProvider() {
return Providers.defaultProvider;
}
/**
 * Resolves the {@link WALProvider} implementation configured under {@code key}.
 * <p/>
 * The configured value (or {@code defaultValue} when the key is unset) is first looked up as a
 * {@link Providers} name. If it is not one, the value stored under {@code key} is loaded as a
 * class name instead.
 * @param key          the configuration key to read
 * @param defaultValue the provider name to use when {@code key} is not set
 * @return the provider class to instantiate
 */
Class<? extends WALProvider> getProviderClass(String key, String defaultValue) {
  try {
    Providers selected = Providers.valueOf(conf.get(key, defaultValue));
    boolean asyncPickedAsDefault =
      selected == getDefaultProvider() && selected.clazz == AsyncFSWALProvider.class;
    // N.b. If the user specifically requested AsyncFSWALProvider but their environment doesn't
    // support using it (e.g. AsyncFSWALProvider.load() == false), we let this fail and do not
    // fall back to FSHLogProvider; load() is only consulted for the default choice.
    if (!asyncPickedAsDefault || AsyncFSWALProvider.load()) {
      return selected.clazz;
    }
    // AsyncFSWAL has better performance in most cases, and also uses less resources, so it is
    // preferred when possible. It deeply hacks into the internals of DFSClient though, so it is
    // not guaranteed to work on all Hadoop versions; FSHLog is known to work on all of them.
    LOG.warn("Failed to load AsyncFSWALProvider, falling back to FSHLogProvider");
    return FSHLogProvider.class;
  } catch (IllegalArgumentException notAProviderName) {
    // Fall back to them specifying a class name.
    // Note that the passed default class shouldn't actually be used, since the above only fails
    // when there is a config value present.
    return conf.getClass(key, Providers.defaultProvider.clazz, WALProvider.class);
  }
}
/**
 * Instantiates the given {@link WALProvider} class through its no-argument constructor. The
 * returned provider has not had {@code init} called on it; that is left to the caller.
 * @param clazz the provider implementation to instantiate
 * @return a new provider instance
 * @throws IOException if the class cannot be instantiated; the original failure is kept as the
 *                     cause
 */
static WALProvider createProvider(Class<? extends WALProvider> clazz) throws IOException {
  LOG.info("Instantiating WALProvider of type {}", clazz);
  try {
    return clazz.getDeclaredConstructor().newInstance();
  } catch (Exception e) {
    // Parameterized like the info message above; the stack trace is only logged at debug level
    // because the thrown IOException already carries the cause.
    LOG.error("couldn't set up WALProvider, the configured class is {}", clazz);
    LOG.debug("Exception details for failure to load WALProvider.", e);
    throw new IOException("couldn't set up WALProvider", e);
  }
}
/**
* Create a WALFactory identified by an arbitrary {@code factoryId}, without an
* {@link Abortable}.
* @param conf must not be null, will keep a reference to read params in later reader/writer
* instances.
* @param factoryId a unique identifier for this factory
* @throws IOException if setting up the WAL provider fails
*/
@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*/src/test/.*|.*/HBaseTestingUtility.java")
public WALFactory(Configuration conf, String factoryId) throws IOException {
// default enableSyncReplicationWALProvider is true, only disable SyncReplicationWALProvider
// for HMaster or HRegionServer which take system table only. See HBASE-19999
// NOTE(review): no enableSyncReplicationWALProvider parameter is visible in this file, so the
// comment above looks stale -- confirm and remove. This call only supplies a null abortable.
this(conf, factoryId, null);
}
/**
* Create a WALFactory.
* <p/>
* This is the constructor you should use when creating a WALFactory in normal code, to make sure
* that the {@code factoryId} is the server name. We need this assumption in some places for
* parsing the server name out from the wal file name.
* @param conf must not be null, will keep a reference to read params in later reader/writer
* instances.
* @param serverName used to generate the factoryId (its {@code toString()} value), which will be
* placed at the start of the final file name; must not be null
* @param abortable the server associated with this WAL file
* @throws IOException if setting up the WAL provider fails
*/
public WALFactory(Configuration conf, ServerName serverName, Abortable abortable)
throws IOException {
this(conf, serverName.toString(), abortable);
}
/**
* @param conf must not be null, will keep a reference to read params in later reader/writer
* instances.
* @param factoryId a unique identifier for this factory. used i.e. by filesystem implementations
* to make a directory
* @param abortable the server associated with this WAL file
*/
private WALFactory(Configuration conf, String factoryId, Abortable abortable) throws IOException {
// until we've moved reader/writer construction down into providers, this initialization must
// happen prior to provider initialization, in case they need to instantiate a reader/writer.
timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000);
/* TODO Both of these are probably specific to the fs wal provider */
walStreamReaderClass = conf.getClass(WAL_STREAM_READER_CLASS_IMPL,
ProtobufWALStreamReader.class, WALStreamReader.class);
Preconditions.checkArgument(
AbstractFSWALProvider.Initializer.class.isAssignableFrom(walStreamReaderClass),
"The wal stream reader class %s is not a sub class of %s", walStreamReaderClass.getName(),
AbstractFSWALProvider.Initializer.class.getName());
this.conf = conf;
this.factoryId = factoryId;
this.excludeDatanodeManager = new ExcludeDatanodeManager(conf);
this.abortable = abortable;
this.metaProvider = new LazyInitializedWALProvider(this,
AbstractFSWALProvider.META_WAL_PROVIDER_ID, META_WAL_PROVIDER, this.abortable);
this.replicationProvider = new LazyInitializedWALProvider(this, REPLICATION_WAL_PROVIDER_ID,
REPLICATION_WAL_PROVIDER, this.abortable);
// end required early initialization
if (conf.getBoolean(WAL_ENABLED, true)) {
WALProvider provider = createProvider(getProviderClass(WAL_PROVIDER, DEFAULT_WAL_PROVIDER));
provider.init(this, conf, null, this.abortable);
provider.addWALActionsListener(new MetricsWAL());
this.provider = provider;
} else {
// special handling of existing configuration behavior.
LOG.warn("Running with WAL disabled.");
provider = new DisabledWALProvider();
provider.init(this, conf, factoryId, null);
}
}
/** Returns the {@link Configuration} this factory was constructed with. */
public Configuration getConf() {
return conf;
}
/**
 * Shutdown all WALs and clean up any underlying storage. Use only when you will not need to
 * replay any edits that have gone to any wals from this factory.
 * <p/>
 * The meta, replication and main providers are closed in that order. An {@link IOException}
 * from one of them does not stop the remaining ones from being closed.
 * @throws IOException if at least one provider failed to close; the individual failures are
 *                     attached as suppressed exceptions
 */
public void close() throws IOException {
  List<IOException> failures = new ArrayList<>();
  // Any of the provider fields can be null when this WALFactory was created only to back the
  // getInstance method.
  if (metaProvider != null) {
    try {
      metaProvider.close();
    } catch (IOException metaFailure) {
      failures.add(metaFailure);
    }
  }
  if (replicationProvider != null) {
    try {
      replicationProvider.close();
    } catch (IOException replicationFailure) {
      failures.add(replicationFailure);
    }
  }
  if (provider != null) {
    try {
      provider.close();
    } catch (IOException mainFailure) {
      failures.add(mainFailure);
    }
  }
  if (failures.isEmpty()) {
    return;
  }
  IOException combined = new IOException("Failed to close WALFactory");
  for (IOException failure : failures) {
    combined.addSuppressed(failure);
  }
  throw combined;
}
/**
 * Tell the underlying WAL providers to shut down, but do not clean up underlying storage. If you
 * are not ending cleanly and will need to replay edits from this factory's wals, use this method
 * if you can as it will try to leave things as tidy as possible.
 * <p/>
 * The meta, replication and main providers are shut down in that order. An {@link IOException}
 * from one of them does not stop the remaining ones from being shut down.
 * @throws IOException if at least one provider failed to shut down; the individual failures are
 *                     attached as suppressed exceptions
 */
public void shutdown() throws IOException {
  List<IOException> failures = new ArrayList<>();
  // Any of the provider fields can be null when this WALFactory was created only to back the
  // getInstance method.
  if (metaProvider != null) {
    try {
      metaProvider.shutdown();
    } catch (IOException metaFailure) {
      failures.add(metaFailure);
    }
  }
  if (replicationProvider != null) {
    try {
      replicationProvider.shutdown();
    } catch (IOException replicationFailure) {
      failures.add(replicationFailure);
    }
  }
  if (provider != null) {
    try {
      provider.shutdown();
    } catch (IOException mainFailure) {
      failures.add(mainFailure);
    }
  }
  if (failures.isEmpty()) {
    return;
  }
  IOException combined = new IOException("Failed to shutdown WALFactory");
  for (IOException failure : failures) {
    combined.addSuppressed(failure);
  }
  throw combined;
}
/**
* Returns the WALs reported by the main provider only; the hbase:meta and hbase:replication
* providers are not consulted.
* NOTE(review): {@code provider} is null on the instance created for {@link #getInstance}, so
* calling this on that instance would throw a NullPointerException -- confirm it is never used
* that way.
*/
public List<WAL> getWALs() {
return provider.getWALs();
}
/**
* Returns the provider used for hbase:meta, as handed out by the lazy holder's
* {@code getProvider()}. Restricted to test code.
* NOTE(review): presumably this creates the provider on first use -- confirm against
* LazyInitializedWALProvider.
*/
@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*/src/test/.*")
WALProvider getMetaProvider() throws IOException {
return metaProvider.getProvider();
}
/**
* Returns the provider used for the replication queue table, as handed out by the lazy holder's
* {@code getProvider()}. Restricted to test code.
* NOTE(review): presumably this creates the provider on first use -- confirm against
* LazyInitializedWALProvider.
*/
@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*/src/test/.*")
WALProvider getReplicationProvider() throws IOException {
return replicationProvider.getProvider();
}
/**
 * Returns the WAL that edits for the given region should go to. Default replicas of hbase:meta
 * and of the replication queue table are served by their own dedicated providers; everything
 * else, including a null region, goes to the main provider.
 * @param region the region which we want to get a WAL for. Could be null.
 */
public WAL getWAL(RegionInfo region) throws IOException {
  boolean dedicatedWalCandidate = region != null && RegionReplicaUtil.isDefaultReplica(region);
  if (!dedicatedWalCandidate) {
    return provider.getWAL(region);
  }
  // Use different WAL for hbase:meta. Instantiates the meta WALProvider if not already up.
  if (region.isMetaRegion()) {
    return metaProvider.getProvider().getWAL(region);
  }
  if (ReplicationStorageFactory.isReplicationQueueTable(conf, region.getTable())) {
    return replicationProvider.getProvider().getWAL(region);
  }
  return provider.getWAL(region);
}
/**
* Create a one-way stream reader for the WAL without a progress reporter. Ends up calling the
* four argument overload with a null reporter and a start position of {@code -1}.
* @return A WAL reader. Close when done with it.
*/
public WALStreamReader createStreamReader(FileSystem fs, Path path) throws IOException {
// The cast selects the (FileSystem, Path, CancelableProgressable) overload for the null argument.
return createStreamReader(fs, path, (CancelableProgressable) null);
}
/**
* Create a one-way stream reader for the WAL. Delegates to the four argument overload with a
* start position of {@code -1}.
* @param fs the file system holding the WAL file
* @param path the WAL file to read
* @param reporter may be null; when present it is polled between open retries and can cancel
* the operation
* @return A WAL reader. Close when done with it.
*/
public WALStreamReader createStreamReader(FileSystem fs, Path path,
CancelableProgressable reporter) throws IOException {
return createStreamReader(fs, path, reporter, -1);
}
/**
* Create a one-way stream reader for the WAL, and start reading from the given
* {@code startPosition}.
* <p/>
* A new instance of the configured reader class is created and initialized on every attempt. If
* initialization fails with an {@link IOException} whose message matches one of the known
* "file is still being recovered" patterns, the open is retried after a short sleep until the
* open timeout has elapsed; any other failure is propagated immediately.
* @param fs the file system holding the WAL file
* @param path the WAL file to read
* @param reporter may be null; when present, {@code progress()} is checked before every retry
* and a {@code false} result aborts with an {@link InterruptedIOException}
* @param startPosition passed through to the reader's {@code init}
* @return A WAL reader. Close when done with it.
* @throws IOException if the reader cannot be opened; failures that are not IOExceptions are
* wrapped
*/
public WALStreamReader createStreamReader(FileSystem fs, Path path,
CancelableProgressable reporter, long startPosition) throws IOException {
try {
// A wal file could be under recovery, so it may take several
// tries to get it open. Instead of claiming it is corrupted, retry
// to open it up to 5 minutes by default.
long startWaiting = EnvironmentEdgeManager.currentTime();
// Absolute deadline (in EnvironmentEdgeManager time) after which we stop retrying.
long openTimeout = timeoutMillis + startWaiting;
int nbAttempt = 0;
// Declared outside the loop, so it keeps the previous attempt's reader until reassigned.
WALStreamReader reader = null;
while (true) {
try {
reader = walStreamReaderClass.getDeclaredConstructor().newInstance();
// Safe cast: both constructors verify that the reader class implements Initializer.
((AbstractFSWALProvider.Initializer) reader).init(fs, path, conf, startPosition);
return reader;
} catch (Exception e) {
// catch Exception so that we close reader for all exceptions. If we don't
// close the reader, we leak a socket.
if (reader != null) {
reader.close();
}
// Only inspect the Exception to consider retry when it's an IOException
if (e instanceof IOException) {
String msg = e.getMessage();
// These message fragments are what we treat as "the file is not readable yet".
if (
msg != null && (msg.contains("Cannot obtain block length")
|| msg.contains("Could not obtain the last block")
|| msg.matches("Blocklist for [^ ]* has changed.*"))
) {
// Warn with the stack trace on the first failed attempt only.
if (++nbAttempt == 1) {
LOG.warn("Lease should have recovered. This is not expected. Will retry", e);
}
// Give the caller a chance to cancel before we wait again.
if (reporter != null && !reporter.progress()) {
throw new InterruptedIOException("Operation is cancelled");
}
// Give up only after at least three attempts AND once the deadline has passed.
if (nbAttempt > 2 && openTimeout < EnvironmentEdgeManager.currentTime()) {
LOG.error("Can't open after " + nbAttempt + " attempts and "
+ (EnvironmentEdgeManager.currentTime() - startWaiting) + "ms " + " for " + path);
} else {
try {
// Back off 500ms after the first two failures, 1000ms afterwards.
Thread.sleep(nbAttempt < 3 ? 500 : 1000);
continue; // retry
} catch (InterruptedException ie) {
// Surface the interruption as an InterruptedIOException that carries the cause.
InterruptedIOException iioe = new InterruptedIOException();
iioe.initCause(ie);
throw iioe;
}
}
// Only reached from the "give up" branch above.
throw new LeaseNotRecoveredException(e);
}
}
// Rethrow the original exception if we are not retrying due to HDFS-isms.
throw e;
}
}
} catch (IOException ie) {
// IOExceptions go to the caller unchanged.
throw ie;
} catch (Exception e) {
// Anything else (for example a failure to instantiate the reader class) gets wrapped.
throw new IOException("Cannot get log reader", e);
}
}
/**
* Create a writer for the WAL. Uses defaults.
* <p>
* Should be package-private. public only for tests and
* {@link org.apache.hadoop.hbase.regionserver.wal.Compressor}
* <p>
* Delegates to {@code FSHLogProvider.createWriter} with this factory's configuration, passing
* {@code false} as the last argument.
* @param fs the file system to create the file on
* @param path the file to write
* @return A WAL writer. Close when done with it.
*/
public Writer createWALWriter(final FileSystem fs, final Path path) throws IOException {
return FSHLogProvider.createWriter(conf, fs, path, false);
}
/**
* Should be package-private, visible for recovery testing. Uses defaults.
* <p>
* Delegates to {@code FSHLogProvider.createWriter} with this factory's configuration, passing
* {@code true} as the last argument.
* @param fs the file system to create the file on
* @param path the recovered edits file to write
* @return an overwritable writer for recovered edits. caller should close.
*/
public Writer createRecoveredEditsWriter(final FileSystem fs, final Path path)
throws IOException {
return FSHLogProvider.createWriter(conf, fs, path, true);
}
// These static methods are currently used where it's impractical to
// untangle the reliance on state in the filesystem. They rely on singleton
// WALFactory that just provides Reader / Writers.
// For now, first Configuration object wins. Practically this just impacts the reader/writer class
private static final AtomicReference<WALFactory> singleton = new AtomicReference<>();
private static final String SINGLETON_ID = WALFactory.class.getName();
/**
 * Returns the process wide WALFactory that can only create readers and writers, creating it
 * from {@code configuration} on first use. Once an instance exists, the configuration passed
 * by later callers is ignored.
 */
// Public only for FSHLog
public static WALFactory getInstance(Configuration configuration) {
  WALFactory existing = singleton.get();
  if (existing != null) {
    return existing;
  }
  WALFactory candidate = new WALFactory(configuration);
  if (singleton.compareAndSet(null, candidate)) {
    return candidate;
  }
  // someone else beat us to initializing, so discard our candidate and use theirs
  try {
    candidate.close();
  } catch (IOException exception) {
    LOG.debug("failed to close temporary singleton. ignoring.", exception);
  }
  return singleton.get();
}
/**
* Create a tailing reader for the given path. Mainly used in replication.
* <p/>
* Always creates a {@link ProtobufWALTailingReader}; unlike the stream reader, the class is not
* configurable here.
* @param fs the file system holding the WAL file
* @param path the WAL file to read
* @param conf the configuration passed to the reader's {@code init}
* @param startPosition passed through to the reader's {@code init}
* @return the initialized reader
* @throws IOException if initializing the reader fails
*/
public static WALTailingReader createTailingReader(FileSystem fs, Path path, Configuration conf,
long startPosition) throws IOException {
ProtobufWALTailingReader reader = new ProtobufWALTailingReader();
// NOTE(review): the reader is not closed here if init throws -- confirm init cleans up after
// itself on failure.
reader.init(fs, path, conf, startPosition);
return reader;
}
/**
* Create a one-way stream reader for a given path. Delegates to the static overload taking a
* start position, with {@code -1}.
* @param fs the file system holding the WAL file
* @param path the WAL file to read
* @param conf used to obtain the shared factory, see {@link #getInstance(Configuration)}
* @return A WAL reader. Close when done with it.
*/
public static WALStreamReader createStreamReader(FileSystem fs, Path path, Configuration conf)
throws IOException {
return createStreamReader(fs, path, conf, -1);
}
/**
* Create a one-way stream reader for a given path.
* <p/>
* The reader is created by the shared factory from {@link #getInstance(Configuration)}. That
* factory uses the configuration it was first created with, so {@code conf} only takes effect
* if this call is the one that creates the shared factory.
* @param fs the file system holding the WAL file
* @param path the WAL file to read
* @param conf used to obtain the shared factory
* @param startPosition passed through to the reader's {@code init}
* @return A WAL reader. Close when done with it.
*/
public static WALStreamReader createStreamReader(FileSystem fs, Path path, Configuration conf,
long startPosition) throws IOException {
// The cast selects the instance overload taking a CancelableProgressable for the null argument.
return getInstance(conf).createStreamReader(fs, path, (CancelableProgressable) null,
startPosition);
}
/**
* If you already have a WALFactory, you should favor the instance method. Uses defaults.
* <p>
* Delegates to {@code FSHLogProvider.createWriter} with the given configuration, passing
* {@code true} as the last argument.
* @param fs the file system to create the file on
* @param path the recovered edits file to write
* @param configuration the configuration handed to the writer
* @return a Writer that will overwrite files. Caller must close.
*/
static Writer createRecoveredEditsWriter(final FileSystem fs, final Path path,
final Configuration configuration) throws IOException {
return FSHLogProvider.createWriter(configuration, fs, path, true);
}
/**
* If you already have a WALFactory, you should favor the instance method. Uses defaults.
* <p>
* Delegates to {@code FSHLogProvider.createWriter} with the given configuration, passing
* {@code false} as the last argument.
* @param fs the file system to create the file on
* @param path the file to write
* @param configuration the configuration handed to the writer
* @return a writer that won't overwrite files. Caller must close.
*/
public static Writer createWALWriter(final FileSystem fs, final Path path,
final Configuration configuration) throws IOException {
return FSHLogProvider.createWriter(configuration, fs, path, false);
}
/**
* Returns the main WAL provider of this factory. This is null on the reader/writer-only instance
* created for {@link #getInstance(Configuration)}.
*/
public WALProvider getWALProvider() {
return this.provider;
}
/**
 * Returns all the wal providers, for example, the default one, the one for hbase:meta and the one
 * for hbase:replication.
 * <p/>
 * Only providers that already exist are returned: the meta and replication providers are asked
 * through {@code getProviderNoCreate()}. On the reader/writer-only instance created for
 * {@link #getInstance(Configuration)} none of the provider fields is set, so the result is an
 * empty list.
 * @return a new, possibly empty, list of the providers currently held by this factory
 */
public List<WALProvider> getAllWALProviders() {
  List<WALProvider> providers = new ArrayList<>();
  if (provider != null) {
    providers.add(provider);
  }
  // The lazy holders are null on the singleton used by getInstance; guard them the same way
  // close() and shutdown() do instead of failing with a NullPointerException.
  if (metaProvider != null) {
    WALProvider meta = metaProvider.getProviderNoCreate();
    if (meta != null) {
      providers.add(meta);
    }
  }
  if (replicationProvider != null) {
    WALProvider replication = replicationProvider.getProviderNoCreate();
    if (replication != null) {
      providers.add(replication);
    }
  }
  return providers;
}
/**
* Returns the {@link ExcludeDatanodeManager} created from this factory's configuration at
* construction time.
*/
public ExcludeDatanodeManager getExcludeDatanodeManager() {
return excludeDatanodeManager;
}
/**
* Returns the identifier this factory was created with. For the instance backing
* {@link #getInstance(Configuration)} this is the WALFactory class name. Restricted to test
* code.
*/
@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*/src/test/.*")
public String getFactoryId() {
return this.factoryId;
}
}