blob: d6a36421cbdaff03661bdcf733acde7896874b9e [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.wal;
import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.META_WAL_PROVIDER_ID;
import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
// imports for classes still in regionserver.wal
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.KeyLocker;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A WAL Provider that returns a WAL per group of regions.
*
* This provider follows the decorator pattern and mainly holds the logic for WAL grouping.
* WAL creation/roll/close is delegated to {@link #DELEGATE_PROVIDER}
*
* Region grouping is handled via {@link RegionGroupingStrategy} and can be configured via the
* property "hbase.wal.regiongrouping.strategy". Current strategy choices are
* <ul>
* <li><em>defaultStrategy</em> : Whatever strategy this version of HBase picks. currently
* "bounded".</li>
* <li><em>identity</em> : each region belongs to its own group.</li>
* <li><em>bounded</em> : bounded number of groups and region evenly assigned to each group.</li>
* </ul>
* Optionally, a FQCN to a custom implementation may be given.
*/
@InterfaceAudience.Private
public class RegionGroupingProvider implements WALProvider {
private static final Logger LOG = LoggerFactory.getLogger(RegionGroupingProvider.class);
/**
* Map identifiers to a group number.
*/
public static interface RegionGroupingStrategy {
String GROUP_NAME_DELIMITER = ".";
/**
* Given an identifier and a namespace, pick a group.
*/
String group(final byte[] identifier, byte[] namespace);
void init(Configuration config, String providerId);
}
/**
* Maps between configuration names for strategies and implementation classes.
*/
static enum Strategies {
defaultStrategy(BoundedGroupingStrategy.class),
identity(IdentityGroupingStrategy.class),
bounded(BoundedGroupingStrategy.class),
namespace(NamespaceGroupingStrategy.class);
final Class<? extends RegionGroupingStrategy> clazz;
Strategies(Class<? extends RegionGroupingStrategy> clazz) {
this.clazz = clazz;
}
}
/**
* instantiate a strategy from a config property.
* requires conf to have already been set (as well as anything the provider might need to read).
*/
RegionGroupingStrategy getStrategy(final Configuration conf, final String key,
final String defaultValue) throws IOException {
Class<? extends RegionGroupingStrategy> clazz;
try {
clazz = Strategies.valueOf(conf.get(key, defaultValue)).clazz;
} catch (IllegalArgumentException exception) {
// Fall back to them specifying a class name
// Note that the passed default class shouldn't actually be used, since the above only fails
// when there is a config value present.
clazz = conf.getClass(key, IdentityGroupingStrategy.class, RegionGroupingStrategy.class);
}
LOG.info("Instantiating RegionGroupingStrategy of type " + clazz);
try {
final RegionGroupingStrategy result = clazz.getDeclaredConstructor().newInstance();
result.init(conf, providerId);
return result;
} catch (Exception e) {
LOG.error("couldn't set up region grouping strategy, check config key " +
REGION_GROUPING_STRATEGY);
LOG.debug("Exception details for failure to load region grouping strategy.", e);
throw new IOException("couldn't set up region grouping strategy", e);
}
}
public static final String REGION_GROUPING_STRATEGY = "hbase.wal.regiongrouping.strategy";
public static final String DEFAULT_REGION_GROUPING_STRATEGY = Strategies.defaultStrategy.name();
/** delegate provider for WAL creation/roll/close */
public static final String DELEGATE_PROVIDER = "hbase.wal.regiongrouping.delegate.provider";
public static final String DEFAULT_DELEGATE_PROVIDER = WALFactory.Providers.defaultProvider
.name();
private static final String META_WAL_GROUP_NAME = "meta";
/** A group-provider mapping, make sure one-one rather than many-one mapping */
private final ConcurrentMap<String, WALProvider> cached = new ConcurrentHashMap<>();
private final KeyLocker<String> createLock = new KeyLocker<>();
private RegionGroupingStrategy strategy;
private WALFactory factory;
private Configuration conf;
private List<WALActionsListener> listeners = new ArrayList<>();
private String providerId;
private Class<? extends WALProvider> providerClass;
@Override
public void init(WALFactory factory, Configuration conf, String providerId) throws IOException {
if (null != strategy) {
throw new IllegalStateException("WALProvider.init should only be called once.");
}
this.conf = conf;
this.factory = factory;
if (META_WAL_PROVIDER_ID.equals(providerId)) {
// do not change the provider id if it is for meta
this.providerId = providerId;
} else {
StringBuilder sb = new StringBuilder().append(factory.factoryId);
if (providerId != null) {
if (providerId.startsWith(WAL_FILE_NAME_DELIMITER)) {
sb.append(providerId);
} else {
sb.append(WAL_FILE_NAME_DELIMITER).append(providerId);
}
}
this.providerId = sb.toString();
}
this.strategy = getStrategy(conf, REGION_GROUPING_STRATEGY, DEFAULT_REGION_GROUPING_STRATEGY);
this.providerClass = factory.getProviderClass(DELEGATE_PROVIDER, DEFAULT_DELEGATE_PROVIDER);
}
private WALProvider createProvider(String group) throws IOException {
WALProvider provider = WALFactory.createProvider(providerClass);
provider.init(factory, conf,
META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : group);
provider.addWALActionsListener(new MetricsWAL());
return provider;
}
@Override
public List<WAL> getWALs() {
return cached.values().stream().flatMap(p -> p.getWALs().stream()).collect(Collectors.toList());
}
private WAL getWAL(String group) throws IOException {
WALProvider provider = cached.get(group);
if (provider == null) {
Lock lock = createLock.acquireLock(group);
try {
provider = cached.get(group);
if (provider == null) {
provider = createProvider(group);
listeners.forEach(provider::addWALActionsListener);
cached.put(group, provider);
}
} finally {
lock.unlock();
}
}
return provider.getWAL(null);
}
@Override
public WAL getWAL(RegionInfo region) throws IOException {
String group;
if (META_WAL_PROVIDER_ID.equals(this.providerId)) {
group = META_WAL_GROUP_NAME;
} else {
byte[] id;
byte[] namespace;
if (region != null) {
id = region.getEncodedNameAsBytes();
namespace = region.getTable().getNamespace();
} else {
id = HConstants.EMPTY_BYTE_ARRAY;
namespace = null;
}
group = strategy.group(id, namespace);
}
return getWAL(group);
}
@Override
public void shutdown() throws IOException {
// save the last exception and rethrow
IOException failure = null;
for (WALProvider provider: cached.values()) {
try {
provider.shutdown();
} catch (IOException e) {
LOG.error("Problem shutting down wal provider '" + provider + "': " + e.getMessage());
if (LOG.isDebugEnabled()) {
LOG.debug("Details of problem shutting down wal provider '" + provider + "'", e);
}
failure = e;
}
}
if (failure != null) {
throw failure;
}
}
@Override
public void close() throws IOException {
// save the last exception and rethrow
IOException failure = null;
for (WALProvider provider : cached.values()) {
try {
provider.close();
} catch (IOException e) {
LOG.error("Problem closing wal provider '" + provider + "': " + e.getMessage());
if (LOG.isDebugEnabled()) {
LOG.debug("Details of problem closing wal provider '" + provider + "'", e);
}
failure = e;
}
}
if (failure != null) {
throw failure;
}
}
static class IdentityGroupingStrategy implements RegionGroupingStrategy {
@Override
public void init(Configuration config, String providerId) {}
@Override
public String group(final byte[] identifier, final byte[] namespace) {
return Bytes.toString(identifier);
}
}
@Override
public long getNumLogFiles() {
long numLogFiles = 0;
for (WALProvider provider : cached.values()) {
numLogFiles += provider.getNumLogFiles();
}
return numLogFiles;
}
@Override
public long getLogFileSize() {
long logFileSize = 0;
for (WALProvider provider : cached.values()) {
logFileSize += provider.getLogFileSize();
}
return logFileSize;
}
@Override
public void addWALActionsListener(WALActionsListener listener) {
// Notice that there is an assumption that this method must be called before the getWAL above,
// so we can make sure there is no sub WALProvider yet, so we only add the listener to our
// listeners list without calling addWALActionListener for each WALProvider. Although it is no
// hurt to execute an extra loop to call addWALActionListener for each WALProvider, but if the
// extra code actually works, then we will have other big problems. So leave it as is.
listeners.add(listener);
}
}