/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS;
import static org.apache.hadoop.hbase.HConstants.REPLICATION_ENABLE_KEY;
import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner;
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.zookeeper.KeeperException;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* Gateway to Replication. Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}.
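*
* <p>A minimal lifecycle sketch (the {@code server}, {@code fs}, {@code logDir} and
* {@code oldLogDir} values are hypothetical; in practice HRegionServer wires them in at
* startup):
* <pre>{@code
* Replication replication = new Replication(server, fs, logDir, oldLogDir);
* // Register the listener so WAL writes get replication scopes attached.
* WALActionsListener listener = replication.getWALActionsListener();
* replication.startReplicationService(); // starts the sources and the sink
* // ... the region server runs ...
* replication.stopReplicationService(); // joins the replication threads on shutdown
* }</pre>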
*/
@InterfaceAudience.Private
public class Replication extends WALActionsListener.Base implements
ReplicationSourceService, ReplicationSinkService {
private static final Log LOG =
LogFactory.getLog(Replication.class);
private boolean replication;
private boolean replicationForBulkLoadData;
private ReplicationSourceManager replicationManager;
private ReplicationQueues replicationQueues;
private ReplicationPeers replicationPeers;
private ReplicationTracker replicationTracker;
private Configuration conf;
private ReplicationSink replicationSink;
// Hosting server
private Server server;
/** Statistics thread schedule pool */
private ScheduledExecutorService scheduleThreadPool;
private int statsThreadPeriod;
// ReplicationLoad to access replication metrics
private ReplicationLoad replicationLoad;
/**
* Instantiate the replication management (if replication is enabled).
* @param server Hosting server
* @param fs handle to the filesystem
* @param logDir directory where the region server's WALs are written
* @param oldLogDir directory where logs are archived
* @throws IOException if the replication services cannot be set up
*/
public Replication(final Server server, final FileSystem fs,
final Path logDir, final Path oldLogDir) throws IOException {
initialize(server, fs, logDir, oldLogDir);
}
/**
* Empty constructor. {@link #initialize} must be called before this instance is used.
*/
public Replication() {
}
public void initialize(final Server server, final FileSystem fs,
final Path logDir, final Path oldLogDir) throws IOException {
this.server = server;
this.conf = this.server.getConfiguration();
this.replication = isReplication(this.conf);
this.replicationForBulkLoadData = isReplicationForBulkLoadDataEnabled(this.conf);
this.scheduleThreadPool = Executors.newScheduledThreadPool(1,
new ThreadFactoryBuilder()
.setNameFormat(server.getServerName().toShortString() + " Replication Statistics #%d")
.setDaemon(true)
.build());
if (this.replicationForBulkLoadData) {
if (conf.get(HConstants.REPLICATION_CLUSTER_ID) == null
|| conf.get(HConstants.REPLICATION_CLUSTER_ID).isEmpty()) {
throw new IllegalArgumentException(HConstants.REPLICATION_CLUSTER_ID
+ " cannot be null/empty when " + HConstants.REPLICATION_BULKLOAD_ENABLE_KEY
+ " is set to true.");
}
}
if (replication) {
try {
this.replicationQueues =
ReplicationFactory.getReplicationQueues(server.getZooKeeper(), this.conf, this.server);
this.replicationQueues.init(this.server.getServerName().toString());
this.replicationPeers =
ReplicationFactory.getReplicationPeers(server.getZooKeeper(), this.conf, this.server);
this.replicationPeers.init();
this.replicationTracker =
ReplicationFactory.getReplicationTracker(server.getZooKeeper(), this.replicationPeers,
this.conf, this.server, this.server);
} catch (ReplicationException e) {
throw new IOException("Failed replication handler create", e);
}
UUID clusterId = null;
try {
clusterId = ZKClusterId.getUUIDForCluster(this.server.getZooKeeper());
} catch (KeeperException ke) {
throw new IOException("Could not read cluster id", ke);
}
this.replicationManager =
new ReplicationSourceManager(replicationQueues, replicationPeers, replicationTracker,
conf, this.server, fs, logDir, oldLogDir, clusterId);
this.statsThreadPeriod =
this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60);
LOG.debug("ReplicationStatisticsThread " + this.statsThreadPeriod);
this.replicationLoad = new ReplicationLoad();
} else {
this.replicationManager = null;
this.replicationQueues = null;
this.replicationPeers = null;
this.replicationTracker = null;
this.replicationLoad = null;
}
}
/**
* @param c Configuration to look at
* @return True if replication is enabled.
*/
public static boolean isReplication(final Configuration c) {
return c.getBoolean(REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT);
}
/**
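* Checks {@link HConstants#REPLICATION_BULKLOAD_ENABLE_KEY}. A configuration sketch (the
* cluster id value below is illustrative; {@link #initialize} rejects bulk load replication
* without one):
* <pre>{@code
* Configuration conf = HBaseConfiguration.create();
* conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
* conf.set(HConstants.REPLICATION_CLUSTER_ID, "source-cluster"); // required with the flag
* }</pre>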
* @param c Configuration to look at
* @return True if replication for bulk load data is enabled.
*/
public static boolean isReplicationForBulkLoadDataEnabled(final Configuration c) {
return c.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
}
/**
* Returns an object that listens for new WAL changes.
*/
public WALActionsListener getWALActionsListener() {
return this;
}
/**
* Stops replication service.
*/
public void stopReplicationService() {
join();
}
/**
* Join with the replication threads
*/
public void join() {
if (this.replication) {
this.replicationManager.join();
if (this.replicationSink != null) {
this.replicationSink.stopReplicationSinkServices();
}
}
scheduleThreadPool.shutdown();
}
/**
* Carry the list of log entries down to the replication sink.
* @param entries list of entries to replicate
* @param cells The data -- the cells -- that <code>entries</code> describes (the entries do not
* contain the Cells we are replicating; they are passed here on the side in this
* CellScanner).
* @param replicationClusterId Id which will uniquely identify source cluster FS client
* configurations in the replication configuration directory
* @param sourceBaseNamespaceDirPath Path that points to the source cluster base namespace
* directory, required for replicating hfiles
* @param sourceHFileArchiveDirPath Path that points to the source cluster hfile archive
* directory
* @throws IOException if the sink fails to apply the entries
*/
public void replicateLogEntries(List<WALEntry> entries, CellScanner cells,
String replicationClusterId, String sourceBaseNamespaceDirPath,
String sourceHFileArchiveDirPath) throws IOException {
if (this.replication) {
this.replicationSink.replicateEntries(entries, cells, replicationClusterId,
sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath);
}
}
/**
* If replication is enabled, starts the replication source manager, creates the replication
* sink, and schedules the statistics thread.
* @throws IOException if the replication source manager fails to initialize
*/
public void startReplicationService() throws IOException {
if (this.replication) {
try {
this.replicationManager.init();
} catch (ReplicationException e) {
throw new IOException(e);
}
this.replicationSink = new ReplicationSink(this.conf);
this.scheduleThreadPool.scheduleAtFixedRate(
new ReplicationStatisticsThread(this.replicationSink, this.replicationManager),
statsThreadPeriod, statsThreadPeriod, TimeUnit.SECONDS);
}
}
/**
* Get the replication sources manager
* @return the manager if replication is enabled, else null
*/
public ReplicationSourceManager getReplicationManager() {
return this.replicationManager;
}
@Override
public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit)
throws IOException {
scopeWALEdits(htd, logKey, logEdit, this.conf, this.getReplicationManager());
}
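/**
* Returns true if at least one of the given column families has a replication scope other
* than {@code REPLICATION_SCOPE_LOCAL}.
*/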
private static boolean hasReplication(Collection<HColumnDescriptor> families) {
for (HColumnDescriptor col : families) {
if (col.getScope() != REPLICATION_SCOPE_LOCAL) {
return true;
}
}
return false;
}
/**
* Utility method used to set the correct scopes on each log key. Sets no scope for compaction
* WAL edits or for families whose scope is local.
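*
* <p>A sketch of the outcome for a hypothetical table with family {@code a} scoped
* {@link HConstants#REPLICATION_SCOPE_GLOBAL} and family {@code b} scoped
* {@link HConstants#REPLICATION_SCOPE_LOCAL}:
* <pre>{@code
* // An edit touching both families ends up with scopes = {a -> REPLICATION_SCOPE_GLOBAL};
* // b is skipped because its scope is local, so only a's cells are replicated.
* Replication.scopeWALEdits(htd, logKey, logEdit, conf, replicationManager);
* }</pre>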
* @param htd Descriptor used to find the scope to use
* @param logKey Key that may get scoped according to its edits
* @param logEdit Edits used to look up the scopes
* @param conf Configuration used to check whether bulk load replication is enabled
* @param replicationManager Manager used to add the hfile references of bulk load events
* @throws IOException If failed to parse the WALEdit
*/
public static void scopeWALEdits(HTableDescriptor htd, WALKey logKey, WALEdit logEdit,
Configuration conf, ReplicationSourceManager replicationManager) throws IOException {
NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
boolean replicationForBulkLoadEnabled = isReplicationForBulkLoadDataEnabled(conf);
Collection<HColumnDescriptor> families = htd.getFamilies();
boolean hasReplication = hasReplication(families);
for (Cell cell : logEdit.getCells()) {
if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
if (replicationForBulkLoadEnabled && CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
scopeBulkLoadEdits(htd, replicationManager, scopes, logKey.getTablename(), cell);
} else {
// Skip the flush/compaction/region events
continue;
}
} else if (hasReplication) {
byte[] family = CellUtil.cloneFamily(cell);
// A cell whose family is not in the descriptor is unexpected, though it can happen in unit tests
assert htd.getFamily(family) != null;
if (!scopes.containsKey(family)) {
int scope = htd.getFamily(family).getScope();
if (scope != REPLICATION_SCOPE_LOCAL) {
scopes.put(family, scope);
}
}
}
}
if (!scopes.isEmpty()) {
logKey.setScopes(scopes);
}
}
private static void scopeBulkLoadEdits(HTableDescriptor htd,
ReplicationSourceManager replicationManager, NavigableMap<byte[], Integer> scopes,
TableName tableName, Cell cell) throws IOException {
byte[] family;
try {
BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
for (StoreDescriptor s : bld.getStoresList()) {
family = s.getFamilyName().toByteArray();
if (!scopes.containsKey(family)) {
int scope = htd.getFamily(family).getScope();
if (scope != REPLICATION_SCOPE_LOCAL) {
scopes.put(family, scope);
}
}
}
} catch (IOException e) {
LOG.error("Failed to get bulk load events information from the wal file.", e);
throw e;
}
}
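/**
* Add the given hfile references for the given table and family to the replication queue so
* that the bulk loaded files are shipped to the peer clusters.
*/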
void addHFileRefsToQueue(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
throws IOException {
try {
this.replicationManager.addHFileRefs(tableName, family, pairs);
} catch (ReplicationException e) {
LOG.error("Failed to add hfile references in the replication queue.", e);
throw new IOException(e);
}
}
@Override
public void preLogRoll(Path oldPath, Path newPath) throws IOException {
getReplicationManager().preLogRoll(newPath);
}
@Override
public void postLogRoll(Path oldPath, Path newPath) throws IOException {
getReplicationManager().postLogRoll(newPath);
}
/**
* This method modifies the master's configuration in order to inject replication-related features
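*
* <p>A sketch of the effect, assuming the stock cleaners are configured (class names
* abbreviated):
* <pre>{@code
* // before: hbase.master.logcleaner.plugins = ...TimeToLiveLogCleaner
* // after:  hbase.master.logcleaner.plugins = ...TimeToLiveLogCleaner,...ReplicationLogCleaner
* // With bulk load replication on, ReplicationHFileCleaner is appended to
* // hbase.master.hfilecleaner.plugins the same way.
* }</pre>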
* @param conf the master configuration to decorate
*/
public static void decorateMasterConfiguration(Configuration conf) {
if (!isReplication(conf)) {
return;
}
String plugins = conf.get(HBASE_MASTER_LOGCLEANER_PLUGINS);
String cleanerClass = ReplicationLogCleaner.class.getCanonicalName();
if (!plugins.contains(cleanerClass)) {
conf.set(HBASE_MASTER_LOGCLEANER_PLUGINS, plugins + "," + cleanerClass);
}
if (isReplicationForBulkLoadDataEnabled(conf)) {
plugins = conf.get(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS);
cleanerClass = ReplicationHFileCleaner.class.getCanonicalName();
if (!plugins.contains(cleanerClass)) {
conf.set(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS, plugins + "," + cleanerClass);
}
}
}
/**
* This method modifies the region server's configuration in order to inject replication-related
* features
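*
* <p>A sketch of the effect when bulk load replication is enabled:
* <pre>{@code
* // ReplicationObserver is appended to hbase.coprocessor.regionserver.classes so that the
* // region server intercepts bulk loads and queues the hfiles for replication.
* }</pre>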
* @param conf the region server configuration to decorate
*/
public static void decorateRegionServerConfiguration(Configuration conf) {
if (isReplicationForBulkLoadDataEnabled(conf)) {
String plugins = conf.get(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY, "");
String rsCoprocessorClass = ReplicationObserver.class.getCanonicalName();
if (!plugins.contains(rsCoprocessorClass)) {
conf.set(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY, plugins + ","
+ rsCoprocessorClass);
}
}
}
/*
* Statistics thread. Periodically prints the replication statistics to the log.
*/
static class ReplicationStatisticsThread extends Thread {
private final ReplicationSink replicationSink;
private final ReplicationSourceManager replicationManager;
public ReplicationStatisticsThread(final ReplicationSink replicationSink,
final ReplicationSourceManager replicationManager) {
super("ReplicationStatisticsThread");
this.replicationManager = replicationManager;
this.replicationSink = replicationSink;
}
@Override
public void run() {
printStats(this.replicationManager.getStats());
printStats(this.replicationSink.getStats());
}
private void printStats(String stats) {
if (!stats.isEmpty()) {
LOG.info(stats);
}
}
}
@Override
public ReplicationLoad refreshAndGetReplicationLoad() {
if (this.replicationLoad == null) {
return null;
}
// always build for latest data
buildReplicationLoad();
return this.replicationLoad;
}
private void buildReplicationLoad() {
List<MetricsSource> sourceMetricsList = new ArrayList<MetricsSource>();
// get source
List<ReplicationSourceInterface> sources = this.replicationManager.getSources();
for (ReplicationSourceInterface source : sources) {
sourceMetricsList.add(source.getSourceMetrics());
}
// get old source
List<ReplicationSourceInterface> oldSources = this.replicationManager.getOldSources();
for (ReplicationSourceInterface source : oldSources) {
if (source instanceof ReplicationSource) {
sourceMetricsList.add(((ReplicationSource) source).getSourceMetrics());
}
}
// get sink
MetricsSink sinkMetrics = this.replicationSink.getSinkMetrics();
this.replicationLoad.buildReplicationLoad(sourceMetricsList, sinkMetrics);
}
}