HBASE-25068 Pass WALFactory to Replication so it knows of all WALProviders, not just default/user-space
Pass WALFactory to Replication instead of WALProvider. WALFactory has all
WALProviders in it, not just the user-space WALProvider. Do this so
ReplicationService has access to all WALProviders in the Server (To be
exploited by the follow-on patch in HBASE-25055)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index f14da2f..8abede5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -1911,8 +1911,7 @@
throw new IOException("Can not create wal directory " + logDir);
}
// Instantiate replication if replication enabled. Pass it the log directories.
- createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir,
- factory.getWALProvider());
+ createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir, factory);
}
this.walFactory = factory;
}
@@ -3063,7 +3062,7 @@
* Load the replication executorService objects, if any
*/
private static void createNewReplicationInstance(Configuration conf, HRegionServer server,
- FileSystem walFs, Path walDir, Path oldWALDir, WALProvider walProvider) throws IOException {
+ FileSystem walFs, Path walDir, Path oldWALDir, WALFactory walFactory) throws IOException {
// read in the name of the source replication class from the config file.
String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
@@ -3076,19 +3075,19 @@
// only one object.
if (sourceClassname.equals(sinkClassname)) {
server.replicationSourceHandler = newReplicationInstance(sourceClassname,
- ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walProvider);
+ ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walFactory);
server.replicationSinkHandler = (ReplicationSinkService) server.replicationSourceHandler;
} else {
server.replicationSourceHandler = newReplicationInstance(sourceClassname,
- ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walProvider);
+ ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walFactory);
server.replicationSinkHandler = newReplicationInstance(sinkClassname,
- ReplicationSinkService.class, conf, server, walFs, walDir, oldWALDir, walProvider);
+ ReplicationSinkService.class, conf, server, walFs, walDir, oldWALDir, walFactory);
}
}
private static <T extends ReplicationService> T newReplicationInstance(String classname,
Class<T> xface, Configuration conf, HRegionServer server, FileSystem walFs, Path logDir,
- Path oldLogDir, WALProvider walProvider) throws IOException {
+ Path oldLogDir, WALFactory walFactory) throws IOException {
final Class<? extends T> clazz;
try {
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
@@ -3097,7 +3096,7 @@
throw new IOException("Could not find class for " + classname);
}
T service = ReflectionUtils.newInstance(clazz, conf);
- service.initialize(server, walFs, logDir, oldLogDir, walProvider);
+ service.initialize(server, walFs, logDir, oldLogDir, walFactory);
return service;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
index e9bbaea..33b3321 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -22,7 +22,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
-import org.apache.hadoop.hbase.wal.WALProvider;
+import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.yetus.audience.InterfaceAudience;
/**
@@ -32,14 +32,11 @@
*/
@InterfaceAudience.Private
public interface ReplicationService {
-
/**
* Initializes the replication service object.
- * @param walProvider can be null if not initialized inside a live region server environment, for
- * example, {@code ReplicationSyncUp}.
*/
- void initialize(Server rs, FileSystem fs, Path logdir, Path oldLogDir, WALProvider walProvider)
- throws IOException;
+ void initialize(Server rs, FileSystem fs, Path logdir, Path oldLogDir, WALFactory walFactory)
+ throws IOException;
/**
* Start replication services.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index 195877b..d8a696c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -44,6 +44,7 @@
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
+import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.yetus.audience.InterfaceAudience;
@@ -89,7 +90,7 @@
@Override
public void initialize(Server server, FileSystem fs, Path logDir, Path oldLogDir,
- WALProvider walProvider) throws IOException {
+ WALFactory walFactory) throws IOException {
this.server = server;
this.conf = this.server.getConfiguration();
this.isReplicationForBulkLoadDataEnabled =
@@ -128,6 +129,7 @@
SyncReplicationPeerMappingManager mapping = new SyncReplicationPeerMappingManager();
this.globalMetricsSource = CompatibilitySingletonFactory
.getInstance(MetricsReplicationSourceFactory.class).getGlobalSource();
+ WALProvider walProvider = walFactory.getWALProvider();
this.replicationManager = new ReplicationSourceManager(queueStorage, replicationPeers,
replicationTracker, conf, this.server, fs, logDir, oldLogDir, clusterId,
walProvider != null ? walProvider.getWALFileLengthProvider() : p -> OptionalLong.empty(),
@@ -198,7 +200,6 @@
* @param sourceBaseNamespaceDirPath Path that point to the source cluster base namespace
* directory required for replicating hfiles
* @param sourceHFileArchiveDirPath Path that point to the source cluster hfile archive directory
- * @throws IOException
*/
@Override
public void replicateLogEntries(List<WALEntry> entries, CellScanner cells,
@@ -211,7 +212,6 @@
/**
* If replication is enabled and this cluster is a master,
* it starts
- * @throws IOException
*/
@Override
public void startReplicationService() throws IOException {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
index 98490f1..b04c7eb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -32,6 +32,7 @@
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@@ -82,7 +83,8 @@
System.out.println("Start Replication Server start");
Replication replication = new Replication();
- replication.initialize(new DummyServer(zkw), fs, logDir, oldLogDir, null);
+ replication.initialize(new DummyServer(zkw), fs, logDir, oldLogDir,
+ new WALFactory(conf, "test", false));
ReplicationSourceManager manager = replication.getReplicationManager();
manager.init().get();
while (manager.activeFailoverTaskCount() > 0) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
index 6e1692a..455b272 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 8e38114..4abb00f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -194,7 +194,8 @@
logDir = utility.getDataTestDir(HConstants.HREGION_LOGDIR_NAME);
remoteLogDir = utility.getDataTestDir(ReplicationUtils.REMOTE_WAL_DIR_NAME);
replication = new Replication();
- replication.initialize(new DummyServer(), fs, logDir, oldLogDir, null);
+ replication.initialize(new DummyServer(), fs, logDir, oldLogDir,
+ new WALFactory(conf, "test", false));
managerOfCluster = getManagerFromCluster();
if (managerOfCluster != null) {
// After replication procedure, we need to add peer by hand (other than by receiving