| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.accumulo.test.replication; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertTrue; |
| |
| import java.io.BufferedOutputStream; |
| import java.io.File; |
| import java.io.FileOutputStream; |
| import java.io.OutputStream; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Set; |
| import java.util.concurrent.TimeUnit; |
| |
| import org.apache.accumulo.core.client.AccumuloClient; |
| import org.apache.accumulo.core.client.BatchWriter; |
| import org.apache.accumulo.core.client.IteratorSetting; |
| import org.apache.accumulo.core.client.Scanner; |
| import org.apache.accumulo.core.client.admin.NewTableConfiguration; |
| import org.apache.accumulo.core.client.security.tokens.PasswordToken; |
| import org.apache.accumulo.core.conf.Property; |
| import org.apache.accumulo.core.data.Key; |
| import org.apache.accumulo.core.data.Mutation; |
| import org.apache.accumulo.core.data.Value; |
| import org.apache.accumulo.core.iterators.LongCombiner.Type; |
| import org.apache.accumulo.core.iterators.user.SummingCombiner; |
| import org.apache.accumulo.core.security.Authorizations; |
| import org.apache.accumulo.core.security.TablePermission; |
| import org.apache.accumulo.minicluster.ServerType; |
| import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl; |
| import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; |
| import org.apache.accumulo.miniclusterImpl.ProcessReference; |
| import org.apache.accumulo.miniclusterImpl.ZooKeeperBindException; |
| import org.apache.accumulo.server.replication.ReplicaSystemFactory; |
| import org.apache.accumulo.test.categories.MiniClusterOnlyTests; |
| import org.apache.accumulo.test.functional.ConfigurableMacBase; |
| import org.apache.accumulo.tserver.TabletServer; |
| import org.apache.accumulo.tserver.replication.AccumuloReplicaSystem; |
| import org.apache.commons.io.FileUtils; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.RawLocalFileSystem; |
| import org.junit.Ignore; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.experimental.categories.Category; |
| import org.junit.rules.TestName; |
| import org.junit.rules.Timeout; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.google.common.collect.Iterables; |
| |
| import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; |
| |
| @Ignore("Replication ITs are not stable and not currently maintained") |
| @Category(MiniClusterOnlyTests.class) |
| public class CyclicReplicationIT { |
| private static final Logger log = LoggerFactory.getLogger(CyclicReplicationIT.class); |
| |
| @Rule |
| public Timeout getTimeout() { |
| int scalingFactor = 1; |
| try { |
| scalingFactor = Integer.parseInt(System.getProperty("timeout.factor")); |
| } catch (NumberFormatException exception) { |
| log.warn("Could not parse timeout.factor, not scaling timeout"); |
| } |
| |
| return new Timeout(scalingFactor * 10, TimeUnit.MINUTES); |
| } |
| |
| @Rule |
| public TestName testName = new TestName(); |
| |
| @SuppressFBWarnings(value = "PATH_TRAVERSAL_IN", justification = "path provided by test") |
| private File createTestDir(String name) { |
| File baseDir = new File(System.getProperty("user.dir") + "/target/mini-tests"); |
| assertTrue(baseDir.mkdirs() || baseDir.isDirectory()); |
| File testDir = |
| new File(baseDir, this.getClass().getName() + "_" + testName.getMethodName() + "_" + name); |
| FileUtils.deleteQuietly(testDir); |
| assertTrue(testDir.mkdir()); |
| return testDir; |
| } |
| |
| private void setCoreSite(MiniAccumuloClusterImpl cluster) throws Exception { |
| File csFile = new File(cluster.getConfig().getConfDir(), "core-site.xml"); |
| if (csFile.exists()) |
| throw new RuntimeException(csFile + " already exist"); |
| |
| Configuration coreSite = new Configuration(false); |
| coreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); |
| OutputStream out = new BufferedOutputStream( |
| new FileOutputStream(new File(cluster.getConfig().getConfDir(), "core-site.xml"))); |
| coreSite.writeXml(out); |
| out.close(); |
| } |
| |
| /** |
| * Use the same SSL and credential provider configuration that is set up by AbstractMacIT for the |
| * other MAC used for replication |
| */ |
| private void updatePeerConfigFromPrimary(MiniAccumuloConfigImpl primaryCfg, |
| MiniAccumuloConfigImpl peerCfg) { |
| // Set the same SSL information from the primary when present |
| Map<String,String> primarySiteConfig = primaryCfg.getSiteConfig(); |
| if ("true".equals(primarySiteConfig.get(Property.INSTANCE_RPC_SSL_ENABLED.getKey()))) { |
| Map<String,String> peerSiteConfig = new HashMap<>(); |
| peerSiteConfig.put(Property.INSTANCE_RPC_SSL_ENABLED.getKey(), "true"); |
| String keystorePath = primarySiteConfig.get(Property.RPC_SSL_KEYSTORE_PATH.getKey()); |
| assertNotNull("Keystore Path was null", keystorePath); |
| peerSiteConfig.put(Property.RPC_SSL_KEYSTORE_PATH.getKey(), keystorePath); |
| String truststorePath = primarySiteConfig.get(Property.RPC_SSL_TRUSTSTORE_PATH.getKey()); |
| assertNotNull("Truststore Path was null", truststorePath); |
| peerSiteConfig.put(Property.RPC_SSL_TRUSTSTORE_PATH.getKey(), truststorePath); |
| |
| // Passwords might be stored in CredentialProvider |
| String keystorePassword = primarySiteConfig.get(Property.RPC_SSL_KEYSTORE_PASSWORD.getKey()); |
| if (keystorePassword != null) { |
| peerSiteConfig.put(Property.RPC_SSL_KEYSTORE_PASSWORD.getKey(), keystorePassword); |
| } |
| String truststorePassword = |
| primarySiteConfig.get(Property.RPC_SSL_TRUSTSTORE_PASSWORD.getKey()); |
| if (truststorePassword != null) { |
| peerSiteConfig.put(Property.RPC_SSL_TRUSTSTORE_PASSWORD.getKey(), truststorePassword); |
| } |
| |
| System.out.println("Setting site configuration for peer " + peerSiteConfig); |
| peerCfg.setSiteConfig(peerSiteConfig); |
| } |
| |
| // Use the CredentialProvider if the primary also uses one |
| String credProvider = |
| primarySiteConfig.get(Property.GENERAL_SECURITY_CREDENTIAL_PROVIDER_PATHS.getKey()); |
| if (credProvider != null) { |
| Map<String,String> peerSiteConfig = peerCfg.getSiteConfig(); |
| peerSiteConfig.put(Property.GENERAL_SECURITY_CREDENTIAL_PROVIDER_PATHS.getKey(), |
| credProvider); |
| peerCfg.setSiteConfig(peerSiteConfig); |
| } |
| } |
| |
| @Test |
| public void dataIsNotOverReplicated() throws Exception { |
| File master1Dir = createTestDir("master1"), master2Dir = createTestDir("master2"); |
| String password = "password"; |
| |
| MiniAccumuloConfigImpl master1Cfg; |
| MiniAccumuloClusterImpl master1Cluster; |
| while (true) { |
| master1Cfg = new MiniAccumuloConfigImpl(master1Dir, password); |
| master1Cfg.setNumTservers(1); |
| master1Cfg.setInstanceName("master1"); |
| |
| // Set up SSL if needed |
| ConfigurableMacBase.configureForEnvironment(master1Cfg, |
| ConfigurableMacBase.getSslDir(master1Dir)); |
| |
| master1Cfg.setProperty(Property.REPLICATION_NAME, master1Cfg.getInstanceName()); |
| master1Cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "5M"); |
| master1Cfg.setProperty(Property.REPLICATION_THREADCHECK, "5m"); |
| master1Cfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s"); |
| master1Cfg.setProperty(Property.MANAGER_REPLICATION_SCAN_INTERVAL, "1s"); |
| master1Cluster = new MiniAccumuloClusterImpl(master1Cfg); |
| setCoreSite(master1Cluster); |
| |
| try { |
| master1Cluster.start(); |
| break; |
| } catch (ZooKeeperBindException e) { |
| log.warn("Failed to start ZooKeeper on {}, will retry", master1Cfg.getZooKeeperPort()); |
| } |
| } |
| |
| MiniAccumuloConfigImpl master2Cfg; |
| MiniAccumuloClusterImpl master2Cluster; |
| while (true) { |
| master2Cfg = new MiniAccumuloConfigImpl(master2Dir, password); |
| master2Cfg.setNumTservers(1); |
| master2Cfg.setInstanceName("master2"); |
| |
| // Set up SSL if needed. Need to share the same SSL truststore as master1 |
| this.updatePeerConfigFromPrimary(master1Cfg, master2Cfg); |
| |
| master2Cfg.setProperty(Property.REPLICATION_NAME, master2Cfg.getInstanceName()); |
| master2Cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "5M"); |
| master2Cfg.setProperty(Property.REPLICATION_THREADCHECK, "5m"); |
| master2Cfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s"); |
| master2Cfg.setProperty(Property.MANAGER_REPLICATION_SCAN_INTERVAL, "1s"); |
| master2Cluster = new MiniAccumuloClusterImpl(master2Cfg); |
| setCoreSite(master2Cluster); |
| |
| try { |
| master2Cluster.start(); |
| break; |
| } catch (ZooKeeperBindException e) { |
| log.warn("Failed to start ZooKeeper on {}, will retry", master2Cfg.getZooKeeperPort()); |
| } |
| } |
| |
| try { |
| AccumuloClient clientMaster1 = |
| master1Cluster.createAccumuloClient("root", new PasswordToken(password)), |
| clientMaster2 = master2Cluster.createAccumuloClient("root", new PasswordToken(password)); |
| |
| String master1UserName = "master1", master1Password = "foo"; |
| String master2UserName = "master2", master2Password = "bar"; |
| String master1Table = master1Cluster.getInstanceName(), |
| master2Table = master2Cluster.getInstanceName(); |
| |
| clientMaster1.securityOperations().createLocalUser(master1UserName, |
| new PasswordToken(master1Password)); |
| clientMaster2.securityOperations().createLocalUser(master2UserName, |
| new PasswordToken(master2Password)); |
| |
| // Configure the credentials we should use to authenticate ourselves to the peer for |
| // replication |
| clientMaster1.instanceOperations().setProperty( |
| Property.REPLICATION_PEER_USER.getKey() + master2Cluster.getInstanceName(), |
| master2UserName); |
| clientMaster1.instanceOperations().setProperty( |
| Property.REPLICATION_PEER_PASSWORD.getKey() + master2Cluster.getInstanceName(), |
| master2Password); |
| |
| clientMaster2.instanceOperations().setProperty( |
| Property.REPLICATION_PEER_USER.getKey() + master1Cluster.getInstanceName(), |
| master1UserName); |
| clientMaster2.instanceOperations().setProperty( |
| Property.REPLICATION_PEER_PASSWORD.getKey() + master1Cluster.getInstanceName(), |
| master1Password); |
| |
| clientMaster1.instanceOperations().setProperty( |
| Property.REPLICATION_PEERS.getKey() + master2Cluster.getInstanceName(), |
| ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class, |
| AccumuloReplicaSystem.buildConfiguration(master2Cluster.getInstanceName(), |
| master2Cluster.getZooKeepers()))); |
| |
| clientMaster2.instanceOperations().setProperty( |
| Property.REPLICATION_PEERS.getKey() + master1Cluster.getInstanceName(), |
| ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class, |
| AccumuloReplicaSystem.buildConfiguration(master1Cluster.getInstanceName(), |
| master1Cluster.getZooKeepers()))); |
| |
| clientMaster1.tableOperations().create(master1Table, |
| new NewTableConfiguration().withoutDefaultIterators()); |
| String master1TableId = clientMaster1.tableOperations().tableIdMap().get(master1Table); |
| assertNotNull(master1TableId); |
| |
| clientMaster2.tableOperations().create(master2Table, |
| new NewTableConfiguration().withoutDefaultIterators()); |
| String master2TableId = clientMaster2.tableOperations().tableIdMap().get(master2Table); |
| assertNotNull(master2TableId); |
| |
| // Replicate master1 in the master1 cluster to master2 in the master2 cluster |
| clientMaster1.tableOperations().setProperty(master1Table, Property.TABLE_REPLICATION.getKey(), |
| "true"); |
| clientMaster1.tableOperations().setProperty(master1Table, |
| Property.TABLE_REPLICATION_TARGET.getKey() + master2Cluster.getInstanceName(), |
| master2TableId); |
| |
| // Replicate master2 in the master2 cluster to master1 in the master2 cluster |
| clientMaster2.tableOperations().setProperty(master2Table, Property.TABLE_REPLICATION.getKey(), |
| "true"); |
| clientMaster2.tableOperations().setProperty(master2Table, |
| Property.TABLE_REPLICATION_TARGET.getKey() + master1Cluster.getInstanceName(), |
| master1TableId); |
| |
| // Give our replication user the ability to write to the respective table |
| clientMaster1.securityOperations().grantTablePermission(master1UserName, master1Table, |
| TablePermission.WRITE); |
| clientMaster2.securityOperations().grantTablePermission(master2UserName, master2Table, |
| TablePermission.WRITE); |
| |
| IteratorSetting summingCombiner = new IteratorSetting(50, SummingCombiner.class); |
| SummingCombiner.setEncodingType(summingCombiner, Type.STRING); |
| SummingCombiner.setCombineAllColumns(summingCombiner, true); |
| |
| // Set a combiner on both instances that will sum multiple values |
| // We can use this to verify that the mutation was not sent multiple times |
| clientMaster1.tableOperations().attachIterator(master1Table, summingCombiner); |
| clientMaster2.tableOperations().attachIterator(master2Table, summingCombiner); |
| |
| // Write a single entry |
| try (BatchWriter bw = clientMaster1.createBatchWriter(master1Table)) { |
| Mutation m = new Mutation("row"); |
| m.put("count", "", "1"); |
| bw.addMutation(m); |
| } |
| |
| Set<String> files = clientMaster1.replicationOperations().referencedFiles(master1Table); |
| |
| log.info("Found {} that need replication from master1", files); |
| |
| // Kill and restart the tserver to close the WAL on master1 |
| for (ProcessReference proc : master1Cluster.getProcesses().get(ServerType.TABLET_SERVER)) { |
| master1Cluster.killProcess(ServerType.TABLET_SERVER, proc); |
| } |
| |
| master1Cluster.exec(TabletServer.class); |
| |
| log.info("Restarted tserver on master1"); |
| |
| // Try to avoid ACCUMULO-2964 |
| Thread.sleep(1000); |
| |
| // Sanity check that the element is there on master1 |
| Entry<Key,Value> entry; |
| try (Scanner s = clientMaster1.createScanner(master1Table, Authorizations.EMPTY)) { |
| entry = Iterables.getOnlyElement(s); |
| assertEquals("1", entry.getValue().toString()); |
| |
| // Wait for this table to replicate |
| clientMaster1.replicationOperations().drain(master1Table, files); |
| |
| Thread.sleep(5000); |
| } |
| |
| // Check that the element made it to master2 only once |
| try (Scanner s = clientMaster2.createScanner(master2Table, Authorizations.EMPTY)) { |
| entry = Iterables.getOnlyElement(s); |
| assertEquals("1", entry.getValue().toString()); |
| |
| // Wait for master2 to finish replicating it back |
| files = clientMaster2.replicationOperations().referencedFiles(master2Table); |
| |
| // Kill and restart the tserver to close the WAL on master2 |
| for (ProcessReference proc : master2Cluster.getProcesses().get(ServerType.TABLET_SERVER)) { |
| master2Cluster.killProcess(ServerType.TABLET_SERVER, proc); |
| } |
| |
| master2Cluster.exec(TabletServer.class); |
| |
| // Try to avoid ACCUMULO-2964 |
| Thread.sleep(1000); |
| } |
| |
| // Check that the element made it to master2 only once |
| try (Scanner s = clientMaster2.createScanner(master2Table, Authorizations.EMPTY)) { |
| entry = Iterables.getOnlyElement(s); |
| assertEquals("1", entry.getValue().toString()); |
| |
| clientMaster2.replicationOperations().drain(master2Table, files); |
| |
| Thread.sleep(5000); |
| } |
| |
| // Verify that the entry wasn't sent back to master1 |
| try (Scanner s = clientMaster1.createScanner(master1Table, Authorizations.EMPTY)) { |
| entry = Iterables.getOnlyElement(s); |
| assertEquals("1", entry.getValue().toString()); |
| } |
| } finally { |
| master1Cluster.stop(); |
| master2Cluster.stop(); |
| } |
| } |
| |
| } |