blob: fe7ced59ca9b89159ea53cb34e35e31908d21dd3 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.falcon.snapshots.replication;
import org.apache.falcon.FalconException;
import org.apache.falcon.cluster.util.MiniHdfsClusterUtil;
import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.extensions.mirroring.hdfsSnapshot.HdfsSnapshotMirrorProperties;
import org.apache.falcon.snapshots.util.HdfsSnapshotUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.io.File;
import java.io.InputStream;
import java.nio.file.Files;
/**
* Hdfs Snapshot replicator unit tests.
*/
public class HdfsSnapshotReplicatorTest extends HdfsSnapshotReplicator {
private MiniDFSCluster miniDFSCluster;
private DistributedFileSystem miniDfs;
private File baseDir;
private Cluster sourceCluster;
private Cluster targetCluster;
private String sourceStorageUrl;
private String targetStorageUrl;
private Path sourceDir = new Path("/apps/falcon/snapshot-replication/sourceDir/");
private Path targetDir = new Path("/apps/falcon/snapshot-replication/targetDir/");
private FsPermission fsPermission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
private String[] args = {"--" + HdfsSnapshotMirrorProperties.DISTCP_MAX_MAPS.getName(), "1",
"--" + HdfsSnapshotMirrorProperties.MAP_BANDWIDTH_IN_MB.getName(), "100",
"--" + HdfsSnapshotMirrorProperties.SOURCE_NN.getName(), "hdfs://localhost:54136",
"--" + HdfsSnapshotMirrorProperties.SOURCE_EXEC_URL.getName(), "localhost:8021",
"--" + HdfsSnapshotMirrorProperties.TARGET_EXEC_URL.getName(), "localhost:8021",
"--" + HdfsSnapshotMirrorProperties.TARGET_NN.getName(), "hdfs://localhost:54136",
"--" + HdfsSnapshotMirrorProperties.SOURCE_SNAPSHOT_DIR.getName(),
"/apps/falcon/snapshot-replication/sourceDir/",
"--" + HdfsSnapshotMirrorProperties.TARGET_SNAPSHOT_DIR.getName(),
"/apps/falcon/snapshot-replication/targetDir/",
"--" + HdfsSnapshotMirrorProperties.TDE_ENCRYPTION_ENABLED.getName(), "false",
"--" + HdfsSnapshotMirrorProperties.SNAPSHOT_JOB_NAME.getName(), "snapshotJobName", };
@BeforeClass
public void init() throws Exception {
this.setConf(new Configuration());
baseDir = Files.createTempDirectory("test_snapshot-replication").toFile().getAbsoluteFile();
miniDFSCluster = MiniHdfsClusterUtil.initMiniDfs(MiniHdfsClusterUtil.SNAPSHOT_REPL_TEST_PORT, baseDir);
miniDfs = miniDFSCluster.getFileSystem();
sourceCluster = initCluster("/primary-cluster-0.1.xml");
targetCluster = initCluster("/backup-cluster-0.1.xml");
miniDfs.mkdirs(sourceDir, fsPermission);
miniDfs.mkdirs(targetDir, fsPermission);
miniDfs.allowSnapshot(sourceDir);
miniDfs.allowSnapshot(targetDir);
cmd = getCommand(args);
Assert.assertEquals(cmd.getOptionValue(HdfsSnapshotMirrorProperties.DISTCP_MAX_MAPS.getName()), "1");
Assert.assertEquals(cmd.getOptionValue(HdfsSnapshotMirrorProperties.MAP_BANDWIDTH_IN_MB.getName()), "100");
}
private Cluster initCluster(String clusterName) throws Exception {
InputStream inputStream = getClass().getResourceAsStream(clusterName);
Cluster cluster = (Cluster) EntityType.CLUSTER.getUnmarshaller().unmarshal(inputStream);
ConfigurationStore.get().publish(EntityType.CLUSTER, cluster);
return cluster;
}
@Test
public void replicationTest() throws Exception {
sourceStorageUrl = ClusterHelper.getStorageUrl(sourceCluster);
targetStorageUrl = ClusterHelper.getStorageUrl(targetCluster);
DistributedFileSystem sourceFs = HdfsSnapshotUtil.getSourceFileSystem(cmd,
new Configuration(getConf()));
DistributedFileSystem targetFs = HdfsSnapshotUtil.getTargetFileSystem(cmd,
new Configuration(getConf()));
// create dir1, create snapshot, invoke copy, check file in target, create snapshot on target
Path dir1 = new Path(sourceDir, "dir1");
miniDfs.mkdir(dir1, fsPermission);
miniDfs.createSnapshot(sourceDir, "snapshot1");
invokeCopy(sourceStorageUrl, targetStorageUrl, sourceFs, targetFs,
sourceDir.toString(), targetDir.toString(), "snapshot1");
miniDfs.createSnapshot(targetDir, "snapshot1");
Assert.assertTrue(miniDfs.exists(new Path(targetDir, "dir1")));
// create dir2, create snapshot, invoke copy, check dir in target, create snapshot on target
Path dir2 = new Path(sourceDir, "dir2");
miniDfs.mkdir(dir2, fsPermission);
miniDfs.createSnapshot(sourceDir, "snapshot2");
invokeCopy(sourceStorageUrl, targetStorageUrl, sourceFs, targetFs,
sourceDir.toString(), targetDir.toString(), "snapshot2");
miniDfs.createSnapshot(targetDir, "snapshot2");
Assert.assertTrue(miniDfs.exists(new Path(targetDir, "dir1")));
Assert.assertTrue(miniDfs.exists(new Path(targetDir, "dir2")));
// delete dir1, create snapshot, invoke copy, check file not in target
miniDfs.delete(dir1, true);
miniDfs.createSnapshot(sourceDir, "snapshot3");
invokeCopy(sourceStorageUrl, targetStorageUrl, sourceFs, targetFs,
sourceDir.toString(), targetDir.toString(), "snapshot3");
miniDfs.createSnapshot(targetDir, "snapshot3");
Assert.assertFalse(miniDfs.exists(new Path(targetDir, "dir1")));
Assert.assertTrue(miniDfs.exists(new Path(targetDir, "dir2")));
}
@Test(dependsOnMethods = "replicationTest",
expectedExceptions = FalconException.class,
expectedExceptionsMessageRegExp = "Unable to find latest snapshot on targetDir "
+ "/apps/falcon/snapshot-replication/targetDir")
public void removeSnapshotabilityOnTargetTest() throws Exception {
// remove snapshotability on target, create snapshot on source, invoke copy should fail
miniDfs.deleteSnapshot(targetDir, "snapshot1");
miniDfs.deleteSnapshot(targetDir, "snapshot2");
miniDfs.deleteSnapshot(targetDir, "snapshot3");
miniDfs.disallowSnapshot(targetDir);
Path dir1 = new Path(sourceDir, "dir4");
miniDfs.mkdir(dir1, fsPermission);
miniDfs.createSnapshot(sourceDir, "snapshot4");
invokeCopy(sourceStorageUrl, targetStorageUrl, miniDfs, miniDfs,
sourceDir.toString(), targetDir.toString(), "snapshot4");
}
@AfterClass
public void cleanup() throws Exception {
MiniHdfsClusterUtil.cleanupDfs(miniDFSCluster, baseDir);
}
}