blob: d8a696c7172edf7fca7f6d03c533b526984bdcb4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalLong;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
/**
 * Gateway to Replication. Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}.
 * <p>
 * Lifecycle as implemented here: {@code initialize(...)} wires up the source side (queue storage,
 * peers, tracker, source manager, peer procedure handler); {@link #startReplicationService()}
 * initializes the source manager, creates the sink and schedules the periodic statistics task;
 * {@link #stopReplicationService()} stops all of the above.
 */
@InterfaceAudience.Private
public class Replication implements ReplicationSourceService, ReplicationSinkService {
  private static final Logger LOG = LoggerFactory.getLogger(Replication.class);

  private boolean isReplicationForBulkLoadDataEnabled;
  private ReplicationSourceManager replicationManager;
  private ReplicationQueueStorage queueStorage;
  private ReplicationPeers replicationPeers;
  private ReplicationTracker replicationTracker;
  private Configuration conf;
  // Created in startReplicationService(); null until then.
  private ReplicationSink replicationSink;
  private SyncReplicationPeerInfoProvider syncReplicationPeerInfoProvider;
  // Hosting server
  private Server server;
  /** Statistics thread schedule pool */
  private ScheduledExecutorService scheduleThreadPool;
  // Period, in seconds, between two runs of the statistics task.
  private int statsThreadPeriod;
  // ReplicationLoad to access replication metrics
  private ReplicationLoad replicationLoad;
  private MetricsReplicationGlobalSourceSource globalMetricsSource;
  private PeerProcedureHandler peerProcedureHandler;

  /**
   * Empty constructor. All state is set up later by {@code initialize(...)}.
   */
  public Replication() {
  }

  /**
   * Builds the replication source machinery for the hosting server.
   * @param server the hosting server; supplies the configuration, ZooKeeper handle and server name
   * @param fs file system handed to the {@link ReplicationSourceManager}
   * @param logDir WAL directory handed to the {@link ReplicationSourceManager}
   * @param oldLogDir archived WAL directory handed to the {@link ReplicationSourceManager}
   * @param walFactory factory whose WAL provider (if any) gets the replication WAL listener
   * @throws IOException if the replication storage/peers/tracker cannot be created or the cluster
   *           id cannot be read
   * @throws IllegalArgumentException if bulk load replication is enabled but
   *           {@link HConstants#REPLICATION_CLUSTER_ID} is null or empty
   */
  @Override
  public void initialize(Server server, FileSystem fs, Path logDir, Path oldLogDir,
    WALFactory walFactory) throws IOException {
    this.server = server;
    this.conf = this.server.getConfiguration();
    this.isReplicationForBulkLoadDataEnabled =
      ReplicationUtils.isReplicationForBulkLoadDataEnabled(this.conf);
    this.scheduleThreadPool = Executors.newScheduledThreadPool(1,
      new ThreadFactoryBuilder()
        .setNameFormat(server.getServerName().toShortString() + "Replication Statistics #%d")
        .setDaemon(true)
        .build());
    if (this.isReplicationForBulkLoadDataEnabled) {
      // Read the setting once instead of looking it up for each half of the check.
      String replicationClusterId = conf.get(HConstants.REPLICATION_CLUSTER_ID);
      if (replicationClusterId == null || replicationClusterId.isEmpty()) {
        throw new IllegalArgumentException(HConstants.REPLICATION_CLUSTER_ID
          + " cannot be null/empty when " + HConstants.REPLICATION_BULKLOAD_ENABLE_KEY
          + " is set to true.");
      }
    }

    try {
      this.queueStorage =
        ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf);
      this.replicationPeers =
        ReplicationFactory.getReplicationPeers(server.getZooKeeper(), this.conf);
      this.replicationPeers.init();
      this.replicationTracker =
        ReplicationFactory.getReplicationTracker(server.getZooKeeper(), this.server, this.server);
    } catch (Exception e) {
      // Any failure while wiring the storage/peers/tracker is surfaced as an IOException.
      throw new IOException("Failed replication handler create", e);
    }

    UUID clusterId;
    try {
      clusterId = ZKClusterId.getUUIDForCluster(this.server.getZooKeeper());
    } catch (KeeperException ke) {
      throw new IOException("Could not read cluster id", ke);
    }

    SyncReplicationPeerMappingManager mapping = new SyncReplicationPeerMappingManager();
    this.globalMetricsSource = CompatibilitySingletonFactory
      .getInstance(MetricsReplicationSourceFactory.class).getGlobalSource();
    WALProvider walProvider = walFactory.getWALProvider();
    // Without a WAL provider there is no file length information; report "unknown" for every path.
    this.replicationManager = new ReplicationSourceManager(queueStorage, replicationPeers,
      replicationTracker, conf, this.server, fs, logDir, oldLogDir, clusterId,
      walProvider != null ? walProvider.getWALFileLengthProvider() : p -> OptionalLong.empty(),
      mapping, globalMetricsSource);
    this.syncReplicationPeerInfoProvider =
      new SyncReplicationPeerInfoProviderImpl(replicationPeers, mapping);

    PeerActionListener peerActionListener = PeerActionListener.DUMMY;
    if (walProvider != null) {
      walProvider
        .addWALActionsListener(new ReplicationSourceWALActionListener(conf, replicationManager));
      if (walProvider instanceof SyncReplicationWALProvider) {
        SyncReplicationWALProvider syncWALProvider = (SyncReplicationWALProvider) walProvider;
        peerActionListener = syncWALProvider;
        syncWALProvider.setPeerInfoProvider(syncReplicationPeerInfoProvider);
        // For a sync replication state change we need to reload the state twice (see the code in
        // PeerProcedureHandlerImpl). So here we go over the sync replication peers to see if any
        // of them is in the middle of the two refreshes; if so, we manually repeat the action
        // done in the first refresh, otherwise we will be in trouble (such as an NPE) when the
        // second refresh comes.
        replicationPeers.getAllPeerIds().stream().map(replicationPeers::getPeer)
          .filter(p -> p.getPeerConfig().isSyncReplication())
          .filter(p -> p.getNewSyncReplicationState() != SyncReplicationState.NONE)
          .forEach(p -> syncWALProvider.peerSyncReplicationStateChange(p.getId(),
            p.getSyncReplicationState(), p.getNewSyncReplicationState(), 0));
      }
    }

    // Defaults to five minutes when the setting is absent.
    this.statsThreadPeriod =
      this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60);
    LOG.debug("Replication stats-in-log period={} seconds", this.statsThreadPeriod);
    this.replicationLoad = new ReplicationLoad();

    this.peerProcedureHandler =
      new PeerProcedureHandlerImpl(replicationManager, peerActionListener);
  }

  /**
   * @return the handler created by {@code initialize(...)}; {@code null} before that
   */
  @Override
  public PeerProcedureHandler getPeerProcedureHandler() {
    return peerProcedureHandler;
  }

  /**
   * Stops replication service. Delegates to {@link #join()}.
   */
  @Override
  public void stopReplicationService() {
    join();
  }

  /**
   * Join with the replication threads: joins the source manager, stops the sink services and shuts
   * down the statistics executor.
   * <p>
   * Components that were never created (for example because {@code initialize(...)} failed part
   * way, or {@link #startReplicationService()} was never called) are skipped, and each later step
   * still runs if an earlier one throws, so a failure in one component does not leave the others
   * running.
   */
  public void join() {
    try {
      if (this.replicationManager != null) {
        this.replicationManager.join();
      }
    } finally {
      try {
        if (this.replicationSink != null) {
          this.replicationSink.stopReplicationSinkServices();
        }
      } finally {
        if (this.scheduleThreadPool != null) {
          this.scheduleThreadPool.shutdown();
        }
      }
    }
  }

  /**
   * Carry on the list of log entries down to the sink
   * @param entries list of entries to replicate
   * @param cells The data -- the cells -- that <code>entries</code> describes (the entries do not
   *          contain the Cells we are replicating; they are passed here on the side in this
   *          CellScanner).
   * @param replicationClusterId Id which will uniquely identify source cluster FS client
   *          configurations in the replication configuration directory
   * @param sourceBaseNamespaceDirPath Path that point to the source cluster base namespace
   *          directory required for replicating hfiles
   * @param sourceHFileArchiveDirPath Path that point to the source cluster hfile archive directory
   * @throws IOException if the sink fails to replicate the entries
   */
  @Override
  public void replicateLogEntries(List<WALEntry> entries, CellScanner cells,
    String replicationClusterId, String sourceBaseNamespaceDirPath,
    String sourceHFileArchiveDirPath) throws IOException {
    this.replicationSink.replicateEntries(entries, cells, replicationClusterId,
      sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath);
  }

  /**
   * Starts the replication service: initializes the source manager, creates the sink and schedules
   * the periodic statistics task. Must be called after {@code initialize(...)}.
   * @throws IOException if the source manager or the sink cannot be started
   */
  @Override
  public void startReplicationService() throws IOException {
    this.replicationManager.init();
    this.replicationSink = new ReplicationSink(this.conf);
    // The first run is delayed by one full period, then the task repeats at that period.
    this.scheduleThreadPool.scheduleAtFixedRate(
      new ReplicationStatisticsTask(this.replicationSink, this.replicationManager),
      statsThreadPeriod, statsThreadPeriod, TimeUnit.SECONDS);
    LOG.info("{} started", this.server.toString());
  }

  /**
   * Get the replication sources manager
   * @return the manager created by {@code initialize(...)}; {@code null} before that
   */
  public ReplicationSourceManager getReplicationManager() {
    return this.replicationManager;
  }

  /**
   * Hands hfile references over to the source manager so they are added to the replication queue.
   * A failure is logged here and then rethrown unchanged.
   * @param tableName table the hfiles belong to
   * @param family column family the hfiles belong to
   * @param pairs path pairs forwarded as-is to {@link ReplicationSourceManager#addHFileRefs}
   * @throws IOException if the source manager fails to record the references
   */
  void addHFileRefsToQueue(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
    throws IOException {
    try {
      this.replicationManager.addHFileRefs(tableName, family, pairs);
    } catch (IOException e) {
      LOG.error("Failed to add hfile references in the replication queue.", e);
      throw e;
    }
  }

  /**
   * Statistics task. Periodically prints the source manager and sink statistics to the log.
   */
  private static final class ReplicationStatisticsTask implements Runnable {

    private final ReplicationSink replicationSink;
    private final ReplicationSourceManager replicationManager;

    public ReplicationStatisticsTask(ReplicationSink replicationSink,
      ReplicationSourceManager replicationManager) {
      this.replicationManager = replicationManager;
      this.replicationSink = replicationSink;
    }

    @Override
    public void run() {
      // A task scheduled with scheduleAtFixedRate is never run again once one execution throws,
      // so contain failures here to keep the periodic statistics alive.
      try {
        printStats(this.replicationManager.getStats());
        printStats(this.replicationSink.getStats());
      } catch (RuntimeException e) {
        LOG.warn("Failed to log replication statistics, will retry at the next period", e);
      }
    }

    /** Logs {@code stats} at INFO level unless it is null or empty. */
    private void printStats(String stats) {
      if (stats != null && !stats.isEmpty()) {
        LOG.info(stats);
      }
    }
  }

  /**
   * Rebuilds the replication load from the current sources and sink metrics and returns it.
   * @return the refreshed load, or {@code null} if the load holder or the sink has not been created
   *         yet (i.e. before {@code initialize(...)} / {@link #startReplicationService()})
   */
  @Override
  public ReplicationLoad refreshAndGetReplicationLoad() {
    // The sink only exists after startReplicationService(); without it there are no sink metrics
    // to report, so behave the same way as when the load holder itself is missing.
    if (this.replicationLoad == null || this.replicationSink == null) {
      return null;
    }
    // always build for latest data
    buildReplicationLoad();
    return this.replicationLoad;
  }

  /**
   * Collects the current and old sources from the source manager together with the sink metrics
   * and feeds them into {@link #replicationLoad}.
   */
  private void buildReplicationLoad() {
    List<ReplicationSourceInterface> allSources = new ArrayList<>();
    allSources.addAll(this.replicationManager.getSources());
    allSources.addAll(this.replicationManager.getOldSources());

    // get sink
    MetricsSink sinkMetrics = this.replicationSink.getSinkMetrics();
    this.replicationLoad.buildReplicationLoad(allSources, sinkMetrics);
  }

  /**
   * @return the provider created by {@code initialize(...)}; {@code null} before that
   */
  @Override
  public SyncReplicationPeerInfoProvider getSyncReplicationPeerInfoProvider() {
    return syncReplicationPeerInfoProvider;
  }

  /**
   * @return the replication peers created by {@code initialize(...)}; {@code null} before that
   */
  @Override
  public ReplicationPeers getReplicationPeers() {
    return replicationPeers;
  }
}