/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.falcon.extensions;

import junit.framework.Assert;
import org.apache.falcon.FalconException;
import org.apache.falcon.cluster.util.MiniHdfsClusterUtil;
import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.parser.EntityParserFactory;
import org.apache.falcon.entity.parser.ProcessEntityParser;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.process.EngineType;
import org.apache.falcon.entity.v0.process.PolicyType;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.extensions.mirroring.hdfs.HdfsMirroringExtension;
import org.apache.falcon.extensions.mirroring.hdfs.HdfsMirroringExtensionProperties;
import org.apache.falcon.extensions.mirroring.hdfsSnapshot.HdfsSnapshotMirrorProperties;
import org.apache.falcon.extensions.mirroring.hdfsSnapshot.HdfsSnapshotMirroringExtension;
import org.apache.falcon.extensions.store.AbstractTestExtensionStore;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.util.List;
import java.util.Properties;

/**
 * Tests for {@link Extension}: verifies process entity generation for the
 * HDFS mirroring and HDFS snapshot mirroring extensions.
 */
public class ExtensionTest extends AbstractTestExtensionStore {
private static final String PRIMARY_CLUSTER_XML = "/primary-cluster-0.1.xml";
private static final String BACKUP_CLUSTER_XML = "/backup-cluster-0.1.xml";
private static final String JOB_NAME = "hdfs-mirroring-monthly";
private static final String JOB_CLUSTER_NAME = "primaryCluster";
private static final String VALIDITY_START = "2016-01-02T00:00Z";
private static final String VALIDITY_END = "2018-01-02T00:00Z";
private static final String FREQUENCY = "days(1)";
private static final String SOURCEDIR = "/users/source/file1";
private static final String SOURCE_CLUSTER = "primaryCluster";
private static final String TARGETDIR = "/users/target/file1";
private static final String TARGET_CLUSTER = "backupCluster";
private static final String NN_URI = "hdfs://localhost:54314";
private static final String RETENTION_POLICY = "delete";
private static final String RETENTION_AGE = "mins(5)";
private static final String RETENTION_NUM = "7";
    private static final String TARGET_KERBEROS_PRINCIPAL = "nn/backup@REALM";

private Extension extension;
private MiniDFSCluster miniDFSCluster;
private DistributedFileSystem miniDfs;
private File baseDir;
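
    /**
     * Builds the job-level properties (job name, cluster, validity window and
     * frequency) shared by both extensions under test.
     */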
private static Properties getCommonProperties() {
Properties properties = new Properties();
properties.setProperty(ExtensionProperties.JOB_NAME.getName(),
JOB_NAME);
properties.setProperty(ExtensionProperties.CLUSTER_NAME.getName(),
JOB_CLUSTER_NAME);
properties.setProperty(ExtensionProperties.VALIDITY_START.getName(),
VALIDITY_START);
properties.setProperty(ExtensionProperties.VALIDITY_END.getName(),
VALIDITY_END);
properties.setProperty(ExtensionProperties.FREQUENCY.getName(),
FREQUENCY);
return properties;
}
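
    /**
     * Serializes the HDFS mirroring extension properties and returns a stream over them.
     */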
private static InputStream getHdfsConfigStream() throws IOException {
Properties properties = getCommonProperties();
properties.setProperty(HdfsMirroringExtensionProperties.SOURCE_DIR.getName(),
SOURCEDIR);
properties.setProperty(HdfsMirroringExtensionProperties.SOURCE_CLUSTER.getName(),
SOURCE_CLUSTER);
properties.setProperty(HdfsMirroringExtensionProperties.TARGET_DIR.getName(),
TARGETDIR);
properties.setProperty(HdfsMirroringExtensionProperties.TARGET_CLUSTER.getName(),
TARGET_CLUSTER);
return getConfigStream(properties, "target/hdfsconfig.properties");
}
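
    /**
     * Serializes the HDFS snapshot mirroring extension properties (snapshot directories,
     * retention settings, NameNode URIs and DistCp tuning) and returns a stream over them.
     */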
private static InputStream getHdfsSnapshotExtensionConfigStream() throws IOException {
Properties properties = getCommonProperties();
properties.setProperty(HdfsSnapshotMirrorProperties.SOURCE_SNAPSHOT_DIR.getName(),
SOURCEDIR);
properties.setProperty(HdfsSnapshotMirrorProperties.SOURCE_CLUSTER.getName(),
SOURCE_CLUSTER);
properties.setProperty(HdfsSnapshotMirrorProperties.SOURCE_SNAPSHOT_RETENTION_POLICY.getName(),
RETENTION_POLICY);
properties.setProperty(HdfsSnapshotMirrorProperties.SOURCE_SNAPSHOT_RETENTION_AGE_LIMIT.getName(),
RETENTION_AGE);
properties.setProperty(HdfsSnapshotMirrorProperties.SOURCE_SNAPSHOT_RETENTION_NUMBER.getName(),
RETENTION_NUM);
properties.setProperty(HdfsSnapshotMirrorProperties.SOURCE_NN.getName(),
NN_URI);
properties.setProperty(HdfsSnapshotMirrorProperties.TARGET_SNAPSHOT_DIR.getName(),
TARGETDIR);
properties.setProperty(HdfsSnapshotMirrorProperties.TARGET_CLUSTER.getName(),
TARGET_CLUSTER);
properties.setProperty(HdfsSnapshotMirrorProperties.TARGET_SNAPSHOT_RETENTION_POLICY.getName(),
RETENTION_POLICY);
properties.setProperty(HdfsSnapshotMirrorProperties.TARGET_SNAPSHOT_RETENTION_AGE_LIMIT.getName(),
RETENTION_AGE);
properties.setProperty(HdfsSnapshotMirrorProperties.TARGET_SNAPSHOT_RETENTION_NUMBER.getName(),
RETENTION_NUM);
properties.setProperty(HdfsSnapshotMirrorProperties.TARGET_NN.getName(),
NN_URI);
properties.setProperty(HdfsSnapshotMirrorProperties.MAX_MAPS.getName(),
"5");
properties.setProperty(HdfsSnapshotMirrorProperties.MAP_BANDWIDTH_IN_MB.getName(),
"100");
properties.setProperty(HdfsSnapshotMirrorProperties.TDE_ENCRYPTION_ENABLED.getName(),
"false");
return getConfigStream(properties, "target/HdfsSnapshotMirror.properties");
}
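
    /**
     * Writes the given properties to {@code pathName} and reopens the file for reading.
     */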
    private static InputStream getConfigStream(Properties properties, String pathName) throws IOException {
        File file = new File(pathName);
        if (file.exists() && !file.delete()) {
            throw new IOException("Unable to delete stale config file: " + pathName);
        }
        try (OutputStream outputStream = new FileOutputStream(file)) {
            properties.store(outputStream, null);
        }
        return new FileInputStream(file);
    }
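
    /**
     * Starts a MiniDFSCluster, publishes the primary and backup cluster entities and
     * creates the world-writable source and target directories used by the tests.
     */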
@BeforeClass
public void init() throws Exception {
initExtensionStore();
extension = new Extension();
baseDir = Files.createTempDirectory("test_extensions_hdfs").toFile().getAbsoluteFile();
miniDFSCluster = MiniHdfsClusterUtil.initMiniDfs(MiniHdfsClusterUtil.EXTENSION_TEST_PORT, baseDir);
initClusters();
miniDfs = miniDFSCluster.getFileSystem();
miniDfs.mkdirs(new Path(SOURCEDIR), new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
miniDfs.mkdirs(new Path(TARGETDIR), new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
}
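
    /**
     * Unmarshals the primary and backup cluster definitions from the classpath and
     * publishes them to the configuration store.
     */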
    private void initClusters() throws Exception {
        try (InputStream primaryStream = getClass().getResourceAsStream(PRIMARY_CLUSTER_XML);
             InputStream backupStream = getClass().getResourceAsStream(BACKUP_CLUSTER_XML)) {
            Cluster primaryCluster = (Cluster) EntityType.CLUSTER.getUnmarshaller().unmarshal(primaryStream);
            ConfigurationStore.get().publish(EntityType.CLUSTER, primaryCluster);
            Cluster backupCluster = (Cluster) EntityType.CLUSTER.getUnmarshaller().unmarshal(backupStream);
            ConfigurationStore.get().publish(EntityType.CLUSTER, backupCluster);
        }
    }
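
    /**
     * The HDFS mirroring extension should generate a single parseable process entity
     * with the expected workflow, source/target properties and retry policy.
     */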
@Test
public void testGetExtensionEntitiesForHdfsMirroring() throws FalconException, IOException {
ProcessEntityParser parser = (ProcessEntityParser) EntityParserFactory.getParser(EntityType.PROCESS);
List<Entity> entities = extension.getEntities(new HdfsMirroringExtension().getName(), getHdfsConfigStream());
if (entities == null || entities.isEmpty()) {
            Assert.fail("Entities returned must not be null or empty");
}
Assert.assertEquals(1, entities.size());
Entity entity = entities.get(0);
Assert.assertEquals(EntityType.PROCESS, entity.getEntityType());
parser.parse(new ByteArrayInputStream(entity.toString().getBytes()));
        // Validate the generated process entity
Process processEntity = (Process) entity;
Assert.assertEquals(JOB_NAME, processEntity.getName());
org.apache.falcon.entity.v0.process.Cluster jobCluster = processEntity.getClusters().
getClusters().get(0);
Assert.assertEquals(JOB_CLUSTER_NAME, jobCluster.getName());
Assert.assertEquals(VALIDITY_START, SchemaHelper.formatDateUTC(jobCluster.getValidity().getStart()));
Assert.assertEquals(VALIDITY_END, SchemaHelper.formatDateUTC(jobCluster.getValidity().getEnd()));
Assert.assertEquals(FREQUENCY, processEntity.getFrequency().toString());
Assert.assertEquals("UTC", processEntity.getTimezone().getID());
Assert.assertEquals(EngineType.OOZIE, processEntity.getWorkflow().getEngine());
Assert.assertEquals(extensionStorePath + "/hdfs-mirroring/libs",
processEntity.getWorkflow().getLib());
Assert.assertEquals(extensionStorePath
+ "/hdfs-mirroring/resources/runtime/hdfs-mirroring-workflow.xml",
processEntity.getWorkflow().getPath());
Properties props = EntityUtil.getEntityProperties(processEntity);
String srcClusterEndPoint = ClusterHelper.getReadOnlyStorageUrl(ClusterHelper.getCluster(SOURCE_CLUSTER));
Assert.assertEquals(srcClusterEndPoint + SOURCEDIR, props.getProperty("sourceDir"));
Assert.assertEquals(SOURCE_CLUSTER, props.getProperty("sourceCluster"));
String tgtClusterEndPoint = ClusterHelper.getStorageUrl(ClusterHelper.getCluster(TARGET_CLUSTER));
Assert.assertEquals(tgtClusterEndPoint + TARGETDIR, props.getProperty("targetDir"));
Assert.assertEquals(TARGET_CLUSTER, props.getProperty("targetCluster"));
        // validate default retry policy
Assert.assertEquals(3, processEntity.getRetry().getAttempts());
Assert.assertEquals(PolicyType.PERIODIC, processEntity.getRetry().getPolicy());
Assert.assertEquals("minutes(30)", processEntity.getRetry().getDelay().toString());
}
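
    /**
     * Entity generation should fail when the mandatory jobClusterName property is missing.
     */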
@Test(expectedExceptions = FalconException.class,
expectedExceptionsMessageRegExp = "Missing extension property: jobClusterName")
public void testGetExtensionEntitiesForHdfsMirroringMissingMandatoryProperties()
throws FalconException, IOException {
Properties props = new Properties();
props.load(getHdfsConfigStream());
props.remove(ExtensionProperties.CLUSTER_NAME.getName());
extension.getEntities(new HdfsMirroringExtension().getName(),
getConfigStream(props, "target/HdfsMirroringMissingMandatory.properties"));
}
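
    /**
     * The HDFS snapshot mirroring extension should generate a single parseable process
     * entity once snapshots are allowed on the source and target directories.
     */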
@Test
public void testGetExtensionEntitiesForHdfsSnapshotMirroring() throws Exception {
ProcessEntityParser parser = (ProcessEntityParser) EntityParserFactory.getParser(EntityType.PROCESS);
miniDfs.allowSnapshot(new Path(SOURCEDIR));
miniDfs.allowSnapshot(new Path(TARGETDIR));
List<Entity> entities = extension.getEntities(new HdfsSnapshotMirroringExtension().getName(),
getHdfsSnapshotExtensionConfigStream());
if (entities == null || entities.isEmpty()) {
            Assert.fail("Entities returned must not be null or empty");
}
Assert.assertEquals(1, entities.size());
Entity entity = entities.get(0);
Assert.assertEquals(EntityType.PROCESS, entity.getEntityType());
parser.parse(new ByteArrayInputStream(entity.toString().getBytes()));
        // Validate the generated process entity
Process processEntity = (Process) entity;
Assert.assertEquals(JOB_NAME, processEntity.getName());
org.apache.falcon.entity.v0.process.Cluster jobCluster = processEntity.getClusters().
getClusters().get(0);
Assert.assertEquals(JOB_CLUSTER_NAME, jobCluster.getName());
Assert.assertEquals(VALIDITY_START, SchemaHelper.formatDateUTC(jobCluster.getValidity().getStart()));
Assert.assertEquals(VALIDITY_END, SchemaHelper.formatDateUTC(jobCluster.getValidity().getEnd()));
Assert.assertEquals(FREQUENCY, processEntity.getFrequency().toString());
Assert.assertEquals("UTC", processEntity.getTimezone().getID());
Assert.assertEquals(EngineType.OOZIE, processEntity.getWorkflow().getEngine());
Assert.assertEquals(extensionStorePath + "/hdfs-snapshot-mirroring/libs",
processEntity.getWorkflow().getLib());
Assert.assertEquals(extensionStorePath
+ "/hdfs-snapshot-mirroring/resources/runtime/hdfs-snapshot-mirroring-workflow.xml",
processEntity.getWorkflow().getPath());
Properties props = EntityUtil.getEntityProperties(processEntity);
Assert.assertEquals(SOURCEDIR, props.getProperty("sourceSnapshotDir"));
Assert.assertEquals(SOURCE_CLUSTER, props.getProperty("sourceCluster"));
Assert.assertEquals(TARGETDIR, props.getProperty("targetSnapshotDir"));
Assert.assertEquals(TARGET_CLUSTER, props.getProperty("targetCluster"));
Assert.assertEquals(JOB_NAME, props.getProperty("snapshotJobName"));
Assert.assertEquals(HdfsSnapshotMirroringExtension.EMPTY_KERBEROS_PRINCIPAL,
props.getProperty("sourceNNKerberosPrincipal"));
Assert.assertEquals(TARGET_KERBEROS_PRINCIPAL, props.getProperty("targetNNKerberosPrincipal"));
        // validate default retry policy
Assert.assertEquals(3, processEntity.getRetry().getAttempts());
Assert.assertEquals(PolicyType.PERIODIC, processEntity.getRetry().getPolicy());
Assert.assertEquals("minutes(30)", processEntity.getRetry().getDelay().toString());
}
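
    /**
     * Entity generation should fail when snapshots are disallowed on the source directory.
     */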
@Test(dependsOnMethods = "testGetExtensionEntitiesForHdfsSnapshotMirroring",
expectedExceptions = FalconException.class,
expectedExceptionsMessageRegExp = "sourceSnapshotDir /users/source/file1 does not allow snapshots.")
public void testHdfsSnapshotMirroringNonSnapshotableDir() throws Exception {
miniDfs.disallowSnapshot(new Path(SOURCEDIR));
List<Entity> entities = extension.getEntities(new HdfsSnapshotMirroringExtension().getName(),
getHdfsSnapshotExtensionConfigStream());
if (entities == null || entities.isEmpty()) {
            Assert.fail("Entities returned must not be null or empty");
}
}
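
    /**
     * Entity generation should fail when the mandatory sourceCluster property is missing.
     */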
@Test(expectedExceptions = FalconException.class,
expectedExceptionsMessageRegExp = "Missing extension property: sourceCluster")
public void testGetExtensionEntitiesForHdfsSnapshotMirroringMissingProperties()
throws FalconException, IOException {
Properties props = new Properties();
props.load(getHdfsSnapshotExtensionConfigStream());
props.remove(HdfsSnapshotMirrorProperties.SOURCE_CLUSTER.getName());
extension.getEntities(new HdfsSnapshotMirroringExtension().getName(),
                getConfigStream(props, "target/HdfsSnapshotMirroringMissing.properties"));
}
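
    /**
     * Entity generation should fail when the source directory does not exist.
     */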
@Test(dependsOnMethods = "testHdfsSnapshotMirroringNonSnapshotableDir",
expectedExceptions = FalconException.class,
expectedExceptionsMessageRegExp = "sourceSnapshotDir /users/source/file1 does not exist.")
public void testHdfsSnapshotMirroringNonExistingDir() throws Exception {
if (miniDfs.exists(new Path(SOURCEDIR))) {
miniDfs.delete(new Path(SOURCEDIR), true);
}
List<Entity> entities = extension.getEntities(new HdfsSnapshotMirroringExtension().getName(),
getHdfsSnapshotExtensionConfigStream());
if (entities == null || entities.isEmpty()) {
            Assert.fail("Entities returned must not be null or empty");
}
}
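
    /**
     * Shuts down the MiniDFSCluster and cleans up its base directory.
     */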
@AfterClass
public void cleanup() throws Exception {
MiniHdfsClusterUtil.cleanupDfs(miniDFSCluster, baseDir);
}
}