blob: 833efbefb56092fcfb6c03cae3e38315b910d50f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.backup;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.commons.io.FileUtils.listFiles;
import static org.apache.commons.io.filefilter.DirectoryFileFilter.DIRECTORY;
import static org.apache.geode.cache.RegionShortcut.PARTITION_PERSISTENT;
import static org.apache.geode.distributed.ConfigurationProperties.DEPLOY_WORKING_DIR;
import static org.apache.geode.internal.util.TransformUtils.getFileNameTransformer;
import static org.apache.geode.internal.util.TransformUtils.transform;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.apache.geode.test.dunit.VM.getController;
import static org.apache.geode.test.dunit.VM.getVM;
import static org.apache.geode.test.dunit.VM.toArray;
import static org.apache.geode.test.dunit.internal.DUnitLauncher.getDistributedSystemProperties;
import static org.assertj.core.api.Assertions.assertThat;
import java.io.File;
import java.io.FileFilter;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.io.filefilter.IOFileFilter;
import org.apache.commons.io.filefilter.RegexFileFilter;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.apache.geode.admin.internal.AdminDistributedSystemImpl;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.DiskStore;
import org.apache.geode.cache.DiskStoreFactory;
import org.apache.geode.cache.PartitionAttributesFactory;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionFactory;
import org.apache.geode.cache.persistence.PersistentID;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.MembershipListener;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.DiskStoreImpl;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.management.BackupStatus;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.rules.CacheRule;
import org.apache.geode.test.dunit.rules.DistributedDiskDirRule;
import org.apache.geode.test.dunit.rules.DistributedRule;
import org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder;
import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
/**
* Distributed tests for incremental backup.
*/
@SuppressWarnings("serial")
public class IncrementalBackupDistributedTest implements Serializable {
private static final int DATA_INCREMENT = 10_000;
private static final IOFileFilter OPLOG_FILTER = new RegexFileFilter(".*\\.[kdc]rf$");
private static volatile BackupMembershipListener backupMembershipListener;
private int dataStart;
private int dataEnd = dataStart + DATA_INCREMENT;
private String diskStoreName1;
private String diskStoreName2;
private String regionName1;
private String regionName2;
private File baselineDir;
private File incrementalDir1;
private File userDirInVM0;
private File userDirInVM1;
private VM vm0;
private VM vm1;
@Rule
public DistributedRule distributedRule = new DistributedRule();
@Rule
public CacheRule cacheRule = new CacheRule();
@Rule
public DistributedDiskDirRule diskDirRule = new DistributedDiskDirRule();
@Rule
public SerializableTemporaryFolder temporaryFolder = new SerializableTemporaryFolder();
@Rule
public SerializableTestName testName = new SerializableTestName();
@Before
public void setUp() throws Exception {
vm0 = getVM(0);
vm1 = getVM(1);
String uniqueName = getClass().getSimpleName() + "_" + testName.getMethodName();
diskStoreName1 = uniqueName + "_diskStore-1";
diskStoreName2 = uniqueName + "_diskStore-2";
regionName1 = uniqueName + "_region-1";
regionName2 = uniqueName + "_region-2";
baselineDir = temporaryFolder.newFolder("baseline");
incrementalDir1 = temporaryFolder.newFolder("incremental");
userDirInVM0 = temporaryFolder.newFolder("vm0");
userDirInVM1 = temporaryFolder.newFolder("vm1");
File userDirInController = temporaryFolder.newFolder("controller");
vm0.invoke(() -> createCache(diskDirRule.getDiskDirFor(vm0), userDirInVM0));
vm1.invoke(() -> createCache(diskDirRule.getDiskDirFor(vm1), userDirInVM1));
createCache(diskDirRule.getDiskDirFor(getController()), userDirInController);
performPuts();
}
@After
public void tearDown() throws Exception {
for (VM vm : toArray(vm0, vm1, getController())) {
vm.invoke(() -> {
backupMembershipListener = null;
});
}
}
/**
* Successful if a member performs a full backup when its backup data is not present in the
* performBackupBaseline (for whatever reason). This also tests what happens when a member is
* offline during the performBackupBaseline backup.
*
* <p>
* The test is regarded as successful when all of the missing members oplog files are backed up
* during an performBackupIncremental backup. This means that the member performed a full backup
* because its oplogs were missing in the performBackupBaseline.
*/
@Test
public void testMissingMemberInBaseline() {
vm1.invoke(() -> installNewBackupMembershipListener());
// Simulate the missing member by forcing a persistent member to go offline.
PersistentID missingMember = vm0.invoke(() -> getPersistentID(diskStoreName1));
vm0.invoke(() -> getCache().close());
vm1.invoke(() -> {
await().atMost(30, SECONDS).untilAsserted(() -> {
assertThat(getMissingPersistentMembers()).contains(missingMember);
assertThat(backupMembershipListener.hasMemberDeparted()).isTrue();
});
});
// Perform performBackupBaseline and make sure that list of offline disk stores contains our
// missing member.
vm1.invoke(() -> {
BackupStatus baselineStatus = performBackup(getBaselineDir().getAbsolutePath());
validateBackupStatus(baselineStatus);
assertThat(baselineStatus.getOfflineDiskStores()).hasSize(2);
});
// Find all of the member's oplogs in the missing member's diskstore directory structure
// (*.crf,*.krf,*.drf)
Collection<File> missingMemberOplogFiles =
listFiles(new File(missingMember.getDirectory()), OPLOG_FILTER, DIRECTORY);
assertThat(missingMemberOplogFiles).isNotEmpty();
// Restart our missing member and make sure it is back online and part of the cluster
vm0.invoke(() -> createCache(diskDirRule.getDiskDirFor(vm0), userDirInVM0));
// After reconnecting make sure the other members agree that the missing member is back online.
await().atMost(30, SECONDS).untilAsserted(() -> {
assertThat(getMissingPersistentMembers()).doesNotContain(missingMember);
});
// Perform performBackupIncremental and make sure we have no offline disk stores.
vm1.invoke(() -> {
BackupStatus incrementalStatus =
performBackup(getIncrementalDir().getAbsolutePath(), getBaselineBackupPath());
validateBackupStatus(incrementalStatus);
assertThat(incrementalStatus.getOfflineDiskStores()).isNotNull().isEmpty();
});
// Get the missing member's member id which is different from the PersistentID
String memberId = vm0.invoke(() -> getModifiedMemberId());
// Get list of backed up oplog files in the performBackupIncremental backup for the missing
// member
File incrementalMemberDir = getBackupDirForMember(getIncrementalDir(), memberId);
Collection<File> backupOplogFiles = listFiles(incrementalMemberDir, OPLOG_FILTER, DIRECTORY);
assertThat(backupOplogFiles).isNotEmpty();
// Transform missing member oplogs to just their file names.
Collection<String> missingMemberOplogNames = new ArrayList<>();
transform(missingMemberOplogFiles, missingMemberOplogNames, getFileNameTransformer());
// Transform missing member's performBackupIncremental backup oplogs to just their file names.
List<String> backupOplogNames = new ArrayList<>();
transform(backupOplogFiles, backupOplogNames, getFileNameTransformer());
// Make sure that the performBackupIncremental backup for the missing member contains all of the
// operation logs for that member. This proves that a full backup was performed for that member.
assertThat(backupOplogNames).containsAll(missingMemberOplogNames);
}
private void createCache(final File diskDir, final File userDir) {
CacheFactory cacheFactory = new CacheFactory(getDistributedSystemProperties())
.set(DEPLOY_WORKING_DIR, userDir.getAbsolutePath());
cacheRule.getOrCreateCache(cacheFactory);
createDiskStore(diskStoreName1, diskDir);
createDiskStore(diskStoreName2, diskDir);
createRegion(regionName1, diskStoreName1);
createRegion(regionName2, diskStoreName2);
}
private void createDiskStore(final String diskStoreName, final File diskDir) {
DiskStoreFactory diskStoreFactory = getCache().createDiskStoreFactory();
diskStoreFactory.setDiskDirs(new File[] {diskDir});
diskStoreFactory.create(diskStoreName);
}
private void createRegion(final String regionName, final String diskStoreName) {
PartitionAttributesFactory<Integer, String> partitionAttributesFactory =
new PartitionAttributesFactory<>();
partitionAttributesFactory.setTotalNumBuckets(5);
RegionFactory<Integer, String> regionFactory =
getCache().createRegionFactory(PARTITION_PERSISTENT);
regionFactory.setDiskStoreName(diskStoreName);
regionFactory.setPartitionAttributes(partitionAttributesFactory.create());
regionFactory.create(regionName);
}
private InternalCache getCache() {
return cacheRule.getCache();
}
private File getBaselineDir() {
return baselineDir;
}
private File getIncrementalDir() {
return incrementalDir1;
}
private Set<PersistentID> getMissingPersistentMembers() {
return AdminDistributedSystemImpl
.getMissingPersistentMembers(getCache().getDistributionManager());
}
private BackupStatus performBackup(final String targetDirPath) {
return performBackup(targetDirPath, null);
}
private BackupStatus performBackup(final String targetDirPath, final String baselineDirPath) {
return new BackupOperation(getCache().getDistributionManager(), getCache())
.backupAllMembers(targetDirPath, baselineDirPath);
}
private String getModifiedMemberId() {
return getCache().getDistributedSystem().getDistributedMember().toString()
.replaceAll("[^\\w]+", "_");
}
private PersistentID getPersistentID(final String diskStoreName) {
for (DiskStore diskStore : getCache().listDiskStores()) {
if (diskStore.getName().equals(diskStoreName)) {
return ((DiskStoreImpl) diskStore).getPersistentID();
}
}
throw new AssertionError("Failed to find disk store " + diskStoreName);
}
private String getBaselineBackupPath() {
File[] dirs = getBaselineDir().listFiles((FileFilter) DIRECTORY);
assertThat(dirs).hasSize(1);
return dirs[0].getAbsolutePath();
}
private File getBackupDirForMember(final File rootDir, final CharSequence memberId) {
File[] dateDirs = rootDir.listFiles((FileFilter) DIRECTORY);
assertThat(dateDirs).hasSize(1);
File[] memberDirs =
dateDirs[0].listFiles(file -> file.isDirectory() && file.getName().contains(memberId));
assertThat(memberDirs).hasSize(1);
return memberDirs[0];
}
private void performPuts() {
Region<Integer, String> region = getCache().getRegion(regionName1);
// Fill our region data
for (int i = dataStart; i < dataEnd; ++i) {
region.put(i, Integer.toString(i));
}
Region<Integer, String> barRegion = getCache().getRegion(regionName2);
// Fill our region data
for (int i = dataStart; i < dataEnd; ++i) {
barRegion.put(i, Integer.toString(i));
}
dataStart += DATA_INCREMENT;
dataEnd += DATA_INCREMENT;
}
private void validateBackupStatus(final BackupStatus backupStatus) {
Map<DistributedMember, Set<PersistentID>> backupMap = backupStatus.getBackedUpDiskStores();
assertThat(backupMap).isNotEmpty();
for (DistributedMember member : backupMap.keySet()) {
assertThat(backupMap.get(member)).isNotEmpty();
for (PersistentID id : backupMap.get(member)) {
assertThat(id.getHost()).isNotNull();
assertThat(id.getUUID()).isNotNull();
assertThat(id.getDirectory()).isNotNull();
}
}
}
private void installNewBackupMembershipListener() {
if (backupMembershipListener != null) {
getCache().getDistributionManager().removeMembershipListener(backupMembershipListener);
}
backupMembershipListener = new BackupMembershipListener();
getCache().getDistributionManager().addMembershipListener(backupMembershipListener);
}
private static class BackupMembershipListener implements MembershipListener {
private final AtomicBoolean memberDeparted = new AtomicBoolean();
@Override
public void memberDeparted(DistributionManager distributionManager,
InternalDistributedMember id, boolean crashed) {
memberDeparted.set(true);
}
boolean hasMemberDeparted() {
return memberDeparted.get();
}
}
}