blob: 754fb065f7cc3914741ea1ba5d85e76597f75477 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.falcon.entity.parser;
import org.apache.falcon.FalconException;
import org.apache.falcon.cluster.util.EmbeddedCluster;
import org.apache.falcon.entity.AbstractTestBase;
import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.FeedHelper;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.Frequency;
import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.cluster.Interfacetype;
import org.apache.falcon.entity.v0.feed.ActionType;
import org.apache.falcon.entity.v0.feed.ClusterType;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.feed.Location;
import org.apache.falcon.entity.v0.feed.LocationType;
import org.apache.falcon.entity.v0.feed.Locations;
import org.apache.falcon.entity.v0.feed.Partition;
import org.apache.falcon.entity.v0.feed.Partitions;
import org.apache.falcon.entity.v0.feed.Validity;
import org.apache.falcon.group.FeedGroupMapTest;
import org.apache.falcon.security.CurrentUser;
import org.apache.falcon.util.FalconTestUtil;
import org.apache.falcon.util.StartupProperties;
import org.apache.hadoop.fs.Path;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Marshaller;
import javax.xml.bind.Unmarshaller;
import java.io.IOException;
import java.io.InputStream;
import java.io.StringWriter;
import static org.testng.AssertJUnit.assertEquals;
/**
* Test Cases for Feed entity parser.
*/
public class FeedEntityParserTest extends AbstractTestBase {
// Parser under test, obtained from the factory for FEED entities.
private final FeedEntityParser parser = (FeedEntityParser) EntityParserFactory
.getParser(EntityType.FEED);
// Feed parsed and validated from FEED_XML by setUp() before every test method.
private Feed modifiableFeed;
/**
 * Runs before every test method: cleans the store, starts the embedded cluster, publishes the
 * "testCluster" and "backupCluster" cluster entities, authenticates the test user and parses
 * the reference feed into {@code modifiableFeed}.
 */
@BeforeMethod
public void setUp() throws Exception {
    cleanupStore();
    ConfigurationStore configStore = ConfigurationStore.get();

    this.dfsCluster = EmbeddedCluster.newCluster("testCluster");
    this.conf = dfsCluster.getConf();

    // Both cluster entities are built from the same cluster definition, differing only in name.
    Unmarshaller clusterUnmarshaller = EntityType.CLUSTER.getUnmarshaller();
    for (String clusterName : new String[] {"testCluster", "backupCluster"}) {
        InputStream clusterXml = this.getClass().getResourceAsStream(CLUSTER_XML);
        Cluster clusterEntity = (Cluster) clusterUnmarshaller.unmarshal(clusterXml);
        clusterEntity.setName(clusterName);
        configStore.publish(EntityType.CLUSTER, clusterEntity);
    }

    CurrentUser.authenticate(FalconTestUtil.TEST_USER_2);
    InputStream feedXml = this.getClass().getResourceAsStream(FEED_XML);
    modifiableFeed = parser.parseAndValidate(feedXml);
}
/**
 * Removes "backupCluster" from the store and expects parsing and validating the reference
 * feed to raise a ValidationException.
 */
@Test(expectedExceptions = ValidationException.class)
public void testValidations() throws Exception {
    ConfigurationStore.get().remove(EntityType.CLUSTER, "backupCluster");
    InputStream feedXml = this.getClass().getResourceAsStream(FEED_XML);
    parser.parseAndValidate(feedXml);
}
/**
 * Parses and validates the reference feed, checks the parsed attributes field by field and
 * finally marshals the feed back to XML and prints it.
 *
 * <p>The statically imported {@code AssertJUnit.assertEquals} takes its arguments in
 * (expected, actual) order, so every call below passes the expected value first; this keeps
 * failure messages pointing at the right side.
 */
@Test
public void testParse() throws IOException, FalconException, JAXBException {
    Feed feed = parser.parseAndValidate(this.getClass()
            .getResourceAsStream(FEED_XML));
    Assert.assertNotNull(feed);

    // Feed-level attributes.
    assertEquals("clicks", feed.getName());
    assertEquals("clicks log", feed.getDescription());
    assertEquals("hours(1)", feed.getFrequency().toString());
    assertEquals("hours(3)", feed.getSla().getSlaHigh().toString());
    assertEquals("hours(2)", feed.getSla().getSlaLow().toString());
    assertEquals("online,bi", feed.getGroups());

    // First cluster entry of the feed.
    org.apache.falcon.entity.v0.feed.Cluster sourceCluster = feed.getClusters().getClusters().get(0);
    assertEquals("testCluster", sourceCluster.getName());
    assertEquals("hours(3)", sourceCluster.getSla().getSlaLow().toString());
    assertEquals("hours(4)", sourceCluster.getSla().getSlaHigh().toString());
    assertEquals(ClusterType.SOURCE, sourceCluster.getType());
    assertEquals("2011-11-01T00:00Z", SchemaHelper.formatDateUTC(sourceCluster.getValidity().getStart()));
    assertEquals("2011-12-31T00:00Z", SchemaHelper.formatDateUTC(sourceCluster.getValidity().getEnd()));
    assertEquals("UTC", feed.getTimezone().getID());
    assertEquals(ActionType.DELETE, sourceCluster.getRetention().getAction());
    assertEquals("hours(48)", sourceCluster.getRetention().getLimit().toString());

    // Second cluster entry of the feed.
    org.apache.falcon.entity.v0.feed.Cluster targetCluster = feed.getClusters().getClusters().get(1);
    assertEquals("backupCluster", targetCluster.getName());
    assertEquals(ClusterType.TARGET, targetCluster.getType());
    assertEquals("2011-11-01T00:00Z", SchemaHelper.formatDateUTC(targetCluster.getValidity().getStart()));
    assertEquals("2011-12-31T00:00Z", SchemaHelper.formatDateUTC(targetCluster.getValidity().getEnd()));
    assertEquals(ActionType.ARCHIVE, targetCluster.getRetention().getAction());
    assertEquals("hours(6)", targetCluster.getRetention().getLimit().toString());

    // Storage URI templates per location type.
    assertEquals("${nameNode}/projects/falcon/clicks",
            FeedHelper.createStorage(feed).getUriTemplate(LocationType.DATA));
    assertEquals("${nameNode}/projects/falcon/clicksMetaData",
            FeedHelper.createStorage(feed).getUriTemplate(LocationType.META));
    assertEquals("${nameNode}/projects/falcon/clicksStats",
            FeedHelper.createStorage(feed).getUriTemplate(LocationType.STATS));

    // ACL and schema.
    assertEquals("group", feed.getACL().getGroup());
    assertEquals(FalconTestUtil.TEST_USER_2, feed.getACL().getOwner());
    assertEquals("0x755", feed.getACL().getPermission());
    assertEquals("/schema/clicks", feed.getSchema().getLocation());
    assertEquals("protobuf", feed.getSchema().getProvider());

    // Marshalling the parsed feed back to XML must not throw.
    StringWriter stringWriter = new StringWriter();
    Marshaller marshaller = EntityType.FEED.getMarshaller();
    marshaller.marshal(feed, stringWriter);
    System.out.println(stringWriter.toString());
}
/**
 * Renames the first cluster of a parsed feed to "invalid cluster" and expects validation to
 * raise a ValidationException.
 */
@Test(expectedExceptions = ValidationException.class)
public void applyValidationInvalidFeed() throws Exception {
    InputStream feedXml = ProcessEntityParserTest.class.getResourceAsStream(FEED_XML);
    Feed parsedFeed = parser.parseAndValidate(feedXml);
    org.apache.falcon.entity.v0.feed.Cluster firstCluster = parsedFeed.getClusters().getClusters().get(0);
    firstCluster.setName("invalid cluster");
    parser.validate(parsedFeed);
}
/**
 * Walks through a series of partition-expression configurations on a feed with an extra
 * cluster, expecting each to be rejected, and finally checks that the feed validates again
 * once the extra cluster, the partition expressions and the partitions are removed.
 */
@Test
public void testPartitionExpression() throws FalconException {
    Feed feed = parser.parseAndValidate(ProcessEntityParserTest.class
            .getResourceAsStream(FEED_XML));
    org.apache.falcon.entity.v0.feed.Cluster firstCluster = feed.getClusters().getClusters().get(0);
    org.apache.falcon.entity.v0.feed.Cluster secondCluster = feed.getClusters().getClusters().get(1);

    //When there are more than 1 src clusters, there should be partition expression
    org.apache.falcon.entity.v0.feed.Cluster extraCluster = new org.apache.falcon.entity.v0.feed.Cluster();
    extraCluster.setName("newCluster");
    extraCluster.setType(ClusterType.SOURCE);
    extraCluster.setPartition("${cluster.colo}");
    feed.getClusters().getClusters().add(extraCluster);
    expectPartitionValidationFailure(feed);

    //When there are more than 1 src clusters, the partition expression should contain cluster variable
    firstCluster.setPartition("*");
    expectPartitionValidationFailure(feed);

    //When there are more than 1 target clusters, there should be partition expression
    extraCluster.setType(ClusterType.TARGET);
    expectPartitionValidationFailure(feed);

    //When there are more than 1 target clusters, the partition expression should contain cluster variable
    secondCluster.setPartition("*");
    expectPartitionValidationFailure(feed);

    //Number of parts in partition expression < number of partitions defined for feed
    secondCluster.setPartition("*/*");
    expectPartitionValidationFailure(feed);

    // Undo every modification; the feed is expected to validate cleanly again.
    firstCluster.setPartition(null);
    secondCluster.setPartition(null);
    feed.getClusters().getClusters().remove(2);
    feed.setPartitions(null);
    parser.validate(feed);
}

/**
 * Validates the feed and fails the test unless a ValidationException is raised.
 */
private void expectPartitionValidationFailure(Feed feed) throws FalconException {
    try {
        parser.validate(feed);
        Assert.fail("Expected ValidationException");
    } catch (ValidationException ignore) {
        //ignore
    }
}
/**
 * Sets the validity start, then the validity end, of the first cluster of
 * {@code modifiableFeed} from a date string and expects parsing and validating the marshalled
 * feed to raise an exception each time; the original value is restored in the catch block.
 */
@Test
public void testInvalidClusterValidityTime() {
    org.apache.falcon.entity.v0.feed.Cluster firstCluster = modifiableFeed.getClusters().getClusters().get(0);
    Validity clusterValidity = firstCluster.getValidity();

    // Start date taken from "2007-02-29".
    try {
        clusterValidity.setStart(SchemaHelper.parseDateUTC("2007-02-29T00:00Z"));
        firstCluster.setValidity(clusterValidity);
        parser.parseAndValidate(marshallEntity(modifiableFeed));
        Assert.fail("Cluster validity failed");
    } catch (Exception startFailure) {
        System.out.println(startFailure.getMessage());
        clusterValidity.setStart(SchemaHelper.parseDateUTC("2011-11-01T00:00Z"));
        firstCluster.setValidity(clusterValidity);
    }

    // End date taken from "2010-04-31".
    try {
        clusterValidity.setEnd(SchemaHelper.parseDateUTC("2010-04-31T00:00Z"));
        firstCluster.setValidity(clusterValidity);
        parser.parseAndValidate(marshallEntity(modifiableFeed));
        Assert.fail("Cluster validity failed");
    } catch (Exception endFailure) {
        System.out.println(endFailure.getMessage());
        clusterValidity.setEnd(SchemaHelper.parseDateUTC("2011-12-31T00:00Z"));
        firstCluster.setValidity(clusterValidity);
    }
}
/**
 * Moves the validity start of the first cluster to 2012-11-01 and expects validation to raise
 * a ValidationException.
 */
@Test(expectedExceptions = ValidationException.class)
public void testInvalidProcessValidity() throws Exception {
    InputStream feedXml = FeedEntityParserTest.class.getResourceAsStream(FEED_XML);
    Feed parsedFeed = parser.parseAndValidate(feedXml);

    Validity firstClusterValidity = parsedFeed.getClusters().getClusters().get(0).getValidity();
    firstClusterValidity.setStart(SchemaHelper.parseDateUTC("2012-11-01T00:00Z"));

    parser.validate(parsedFeed);
}
/**
 * Sets slaLow to hours(4) and slaHigh to hours(2) and expects a ValidationException whose
 * message starts with "slaLow of Feed:".
 */
@Test(expectedExceptions = ValidationException.class, expectedExceptionsMessageRegExp = "slaLow of Feed:.*")
public void testInvalidSlaLow() throws Exception {
    InputStream feedXml = FeedEntityParserTest.class.getResourceAsStream(FEED_XML);
    Feed parsedFeed = parser.parseAndValidate(feedXml);

    Frequency slaLow = new Frequency("hours(4)");
    Frequency slaHigh = new Frequency("hours(2)");
    parsedFeed.getSla().setSlaLow(slaLow);
    parsedFeed.getSla().setSlaHigh(slaHigh);

    parser.validate(parsedFeed);
}
/**
 * Sets slaHigh to hours(10) while the first cluster's retention limit is hours(9) and expects
 * a ValidationException whose message starts with "slaHigh of Feed:".
 */
@Test(expectedExceptions = ValidationException.class, expectedExceptionsMessageRegExp = "slaHigh of Feed:.*")
public void testInvalidSlaHigh() throws Exception {
    InputStream feedXml = FeedEntityParserTest.class.getResourceAsStream(FEED_XML);
    Feed parsedFeed = parser.parseAndValidate(feedXml);

    Frequency slaLow = new Frequency("hours(2)");
    Frequency slaHigh = new Frequency("hours(10)");
    Frequency retentionLimit = new Frequency("hours(9)");
    parsedFeed.getSla().setSlaLow(slaLow);
    parsedFeed.getSla().setSlaHigh(slaHigh);
    parsedFeed.getClusters().getClusters().get(0).getRetention().setLimit(retentionLimit);

    parser.validate(parsedFeed);
}
/**
 * Publishes a first feed in groups group1,group2,group3 and then validates a second feed that
 * shares group1 and group2 and uses the same data path; no exception is expected.
 *
 * <p>The second feed's cluster-level location is set to its own {@code location2} instance
 * rather than to the {@code Location} object that belongs to the already published first
 * feed. Both carry the same path and type, so the validated content is unchanged.
 */
@Test
public void testValidFeedGroup() throws FalconException, JAXBException {
    Feed feed1 = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(
            (FeedEntityParserTest.class.getResourceAsStream(FEED_XML)));
    feed1.setName("f1" + System.currentTimeMillis());
    feed1.setGroups("group1,group2,group3");
    feed1.setLocations(new Locations());
    Location location = new Location();
    location.setPath("/projects/bi/rmc/daily/ad/${YEAR}/fraud/${MONTH}-${DAY}/ad");
    location.setType(LocationType.DATA);
    feed1.getLocations().getLocations().add(location);
    // Keep the cluster-level data location in line with the feed-level one.
    feed1.getClusters().getClusters().get(0).getLocations().getLocations().set(0, location);
    parser.parseAndValidate(feed1.toString());
    ConfigurationStore.get().publish(EntityType.FEED, feed1);

    Feed feed2 = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(
            (FeedEntityParserTest.class.getResourceAsStream(FEED_XML)));
    feed2.setName("f2" + System.currentTimeMillis());
    feed2.setGroups("group1,group2,group5");
    feed2.setLocations(new Locations());
    Location location2 = new Location();
    location2.setPath("/projects/bi/rmc/daily/ad/${YEAR}/fraud/${MONTH}-${DAY}/ad");
    location2.setType(LocationType.DATA);
    feed2.getLocations().getLocations().add(location2);
    // Use feed2's own location object instead of aliasing feed1's.
    feed2.getClusters().getClusters().get(0).getLocations().getLocations().set(0, location2);
    parser.parseAndValidate(feed2.toString());
}
// TODO Disabled the test since I do not see anything invalid in here.
/**
 * Disabled. Replaces the feed-level locations with a single DATA location and would expect a
 * ValidationException from parsing and validating the result.
 */
@Test(enabled = false, expectedExceptions = ValidationException.class)
public void testInvalidFeedClusterDataLocation() throws JAXBException, FalconException {
    Unmarshaller feedUnmarshaller = EntityType.FEED.getUnmarshaller();
    InputStream feedXml = FeedEntityParserTest.class.getResourceAsStream(FEED_XML);
    Feed candidate = (Feed) feedUnmarshaller.unmarshal(feedXml);
    candidate.setName("f1" + System.currentTimeMillis());
    candidate.setGroups("group1,group2,group3");

    Location dataLocation = new Location();
    dataLocation.setPath("/projects/bi/rmc/daily/ad/${YEAR}/fraud/${MONTH}-${DAY}/ad");
    dataLocation.setType(LocationType.DATA);
    candidate.setLocations(new Locations());
    candidate.getLocations().getLocations().add(dataLocation);

    parser.parseAndValidate(candidate.toString());
}
/**
 * Publishes a first feed in groups group1,group2,group3 and then parses and validates a second
 * feed that shares group1 and group2 but whose feed-level data path uses a different date
 * pattern. A ValidationException is expected somewhere in the method.
 */
@Test(expectedExceptions = ValidationException.class)
public void testInvalidFeedGroup() throws FalconException, JAXBException {
// First feed: data path with a ${MONTH}-${DAY} pattern.
Feed feed1 = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(
(FeedEntityParserTest.class.getResourceAsStream(FEED_XML)));
feed1.setName("f1" + System.currentTimeMillis());
feed1.setGroups("group1,group2,group3");
feed1.setLocations(new Locations());
Location location = new Location();
location.setPath("/projects/bi/rmc/daily/ad/${YEAR}/fraud/${MONTH}-${DAY}/ad");
location.setType(LocationType.DATA);
feed1.getLocations().getLocations().add(location);
// NOTE(review): feed1 is validated before its cluster-level location is replaced below; a
// ValidationException thrown here would also satisfy expectedExceptions - confirm this
// call is meant to succeed.
parser.parseAndValidate(feed1.toString());
feed1.getClusters().getClusters().get(0).getLocations().getLocations().set(0, location);
ConfigurationStore.get().publish(EntityType.FEED, feed1);
// Second feed: overlapping groups, data path with a ${MONTH}/${HOUR} pattern.
Feed feed2 = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(
(FeedEntityParserTest.class.getResourceAsStream(FEED_XML)));
feed2.setName("f2" + System.currentTimeMillis());
feed2.setGroups("group1,group2,group5");
feed2.setLocations(new Locations());
Location location2 = new Location();
location2
.setPath("/projects/bi/rmc/daily/ad/${YEAR}/fraud/${MONTH}/${HOUR}/ad");
location2.setType(LocationType.DATA);
feed2.getLocations().getLocations().add(location2);
// NOTE(review): the cluster-level location is set to feed1's `location`, not `location2` -
// looks like a copy-paste slip; confirm which mismatch is meant to trigger the exception
// before changing it.
feed2.getClusters().getClusters().get(0).getLocations().getLocations().set(0, location);
parser.parseAndValidate(feed2.toString());
}
/**
 * Checks that the group values "group7,group8", "group7" and {@code null} all pass parsing
 * and validation, then publishes the feed.
 */
@Test
public void testValidGroupNames() throws FalconException, JAXBException {
    Unmarshaller feedUnmarshaller = EntityType.FEED.getUnmarshaller();
    InputStream feedXml = FeedGroupMapTest.class.getResourceAsStream("/config/feed/feed-0.1.xml");
    Feed groupedFeed = (Feed) feedUnmarshaller.unmarshal(feedXml);
    groupedFeed.setName("f1" + System.currentTimeMillis());

    for (String groups : new String[] {"group7,group8", "group7", null}) {
        groupedFeed.setGroups(groups);
        parser.parseAndValidate(groupedFeed.toString());
    }

    ConfigurationStore.get().publish(EntityType.FEED, groupedFeed);
}
/**
 * Checks that each malformed group value (trailing comma, space after the separator, space
 * inside a group name) makes parsing and validation raise a FalconException.
 */
@Test
public void testInvalidGroupNames() throws FalconException, JAXBException {
    Unmarshaller feedUnmarshaller = EntityType.FEED.getUnmarshaller();
    InputStream feedXml = FeedGroupMapTest.class.getResourceAsStream("/config/feed/feed-0.1.xml");
    Feed groupedFeed = (Feed) feedUnmarshaller.unmarshal(feedXml);
    groupedFeed.setName("f1" + System.currentTimeMillis());

    String[] invalidGroupValues = {"commaend,", "group8, group9", "space in group,group9"};
    for (String invalidGroups : invalidGroupValues) {
        try {
            groupedFeed.setGroups(invalidGroups);
            parser.parseAndValidate(groupedFeed.toString());
            Assert.fail("Expected exception");
        } catch (FalconException ignore) {
            //ignore
        }
    }
}
/**
 * Checks that cluster expressions referring to the colo, the name and the "field1" property
 * of "testCluster" are substituted by FeedHelper.evaluateClusterExp.
 */
@Test
public void testClusterPartitionExp() throws FalconException {
    Cluster testCluster = ConfigurationStore.get().get(EntityType.CLUSTER, "testCluster");

    String coloResult = FeedHelper.evaluateClusterExp(testCluster, "/*/${cluster.colo}");
    Assert.assertEquals(coloResult, "/*/" + testCluster.getColo());

    String nameResult = FeedHelper.evaluateClusterExp(testCluster, "/*/${cluster.name}/Local");
    Assert.assertEquals(nameResult, "/*/" + testCluster.getName() + "/Local");

    String propertyResult = FeedHelper.evaluateClusterExp(testCluster, "/*/${cluster.field1}/Local");
    Assert.assertEquals(propertyResult, "/*/value1/Local");
}
/**
 * Expects a FalconException when the feed is named "Feed_name".
 */
@Test(expectedExceptions = FalconException.class)
public void testInvalidFeedName() throws JAXBException, FalconException {
    Unmarshaller feedUnmarshaller = EntityType.FEED.getUnmarshaller();
    InputStream feedXml = FeedGroupMapTest.class.getResourceAsStream("/config/feed/feed-0.1.xml");
    Feed badlyNamedFeed = (Feed) feedUnmarshaller.unmarshal(feedXml);
    badlyNamedFeed.setName("Feed_name");
    parser.parseAndValidate(badlyNamedFeed.toString());
}
/**
 * Publishes "feed1" and then validates "feed2", which uses the same data paths. With
 * frequency hours(1) no exception is expected; after switching to hours(2) the final
 * parseAndValidate call is expected to raise a FalconException.
 *
 * <p>An unexpected failure in the first validation is reported through
 * {@code Assert.fail(String, Throwable)} so that the cause appears in the test report instead
 * of only being printed to stderr.
 */
@Test(expectedExceptions = FalconException.class)
public void testInvalidFeedGroupName() throws JAXBException, FalconException {
    Feed feed1 = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(
            FeedGroupMapTest.class
                    .getResourceAsStream("/config/feed/feed-0.1.xml"));
    feed1.setName("feed1");
    feed1.getLocations().getLocations().get(0)
            .setPath("/data/clicks/${YEAR}/${MONTH}/${DAY}/${HOUR}");
    feed1.getClusters().getClusters().get(0).getLocations().getLocations()
            .get(0).setPath("/data/clicks/${YEAR}/${MONTH}/${DAY}/${HOUR}");
    ConfigurationStore.get().publish(EntityType.FEED, feed1);

    Feed feed2 = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(
            FeedGroupMapTest.class
                    .getResourceAsStream("/config/feed/feed-0.1.xml"));
    feed2.setName("feed2");
    feed2.getLocations().getLocations().get(0).setPath("/data/clicks/${YEAR}/${MONTH}/${DAY}/${HOUR}");
    feed2.getClusters().getClusters().get(0).getLocations().getLocations()
            .get(0).setPath("/data/clicks/${YEAR}/${MONTH}/${DAY}/${HOUR}");
    feed2.setFrequency(new Frequency("hours(1)"));
    try {
        parser.parseAndValidate(feed2.toString());
    } catch (FalconException e) {
        // AssertionError is not a FalconException, so this is not mistaken for the expected one.
        Assert.fail("Not expecting exception for same frequency", e);
    }

    feed2.setFrequency(new Frequency("hours(2)"));
    //expecting exception
    parser.parseAndValidate(feed2.toString());
}
/**
 * Checks that a feed whose late-arrival element has been set to {@code null} still parses and
 * validates.
 */
@Test
public void testNullFeedLateArrival() throws JAXBException, FalconException {
    Unmarshaller feedUnmarshaller = EntityType.FEED.getUnmarshaller();
    InputStream feedXml = FeedGroupMapTest.class.getResourceAsStream("/config/feed/feed-0.1.xml");
    Feed feedWithoutLateArrival = (Feed) feedUnmarshaller.unmarshal(feedXml);
    feedWithoutLateArrival.setLateArrival(null);
    parser.parseAndValidate(feedWithoutLateArrival.toString());
}
/**
 * A negative test for validating tags key value pair regex: key=value, key=value.
 *
 * <p>Parsing the tags feed definition must fail with a FalconException caused by an
 * UnmarshalException, which in turn is caused by a SAXParseException. TestNG's
 * {@code Assert.assertEquals} takes (actual, expected), so the observed class is passed first.
 *
 * @throws FalconException declared by the signature; the FalconException raised by
 *                         {@code parse} itself is caught and inspected inside the test
 */
@Test
public void testFeedTags() throws FalconException {
    try {
        InputStream stream = this.getClass().getResourceAsStream("/config/feed/feed-tags-0.1.xml");
        parser.parse(stream);
        Assert.fail("org.xml.sax.SAXParseException should have been thrown.");
    } catch (FalconException e) {
        Assert.assertEquals(e.getCause().getClass(), javax.xml.bind.UnmarshalException.class);
        Assert.assertEquals(e.getCause().getCause().getClass(), org.xml.sax.SAXParseException.class);
    }
}
/**
 * Parses the hive table feed definition and checks the table URI.
 */
@Test
public void testParseFeedWithTable() throws FalconException {
    final InputStream tableFeedXml = getClass().getResourceAsStream("/config/feed/hive-table-feed.xml");
    Feed tableFeed = parser.parse(tableFeedXml);
    String tableUri = tableFeed.getTable().getUri();
    Assert.assertEquals(tableUri, "catalog:default:clicks#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}");
}
/**
 * Expects a FalconException when parsing the invalid feed definition.
 */
@Test (expectedExceptions = FalconException.class)
public void testParseInvalidFeedWithTable() throws FalconException {
    InputStream invalidFeedXml = FeedEntityParserTest.class.getResourceAsStream("/config/feed/invalid-feed.xml");
    parser.parse(invalidFeedXml);
}
/**
 * Expects a FalconException when parsing and validating the table feed definition that
 * declares multiple sources.
 */
@Test (expectedExceptions = FalconException.class)
public void testValidateFeedWithTableAndMultipleSources() throws FalconException {
    InputStream multiSourceFeedXml = FeedEntityParserTest.class.getResourceAsStream(
            "/config/feed/table-with-multiple-sources-feed.xml");
    parser.parseAndValidate(multiSourceFeedXml);
    Assert.fail("Should have thrown an exception:Multiple sources are not supported for feed with table storage");
}
/**
 * Adds a "colo" partition to the hive table feed, which has none after parsing, and expects
 * validation to raise a ValidationException.
 */
@Test(expectedExceptions = ValidationException.class)
public void testValidatePartitionsForTable() throws Exception {
    InputStream tableFeedXml = FeedEntityParserTest.class.getResourceAsStream("/config/feed/hive-table-feed.xml");
    Feed tableFeed = parser.parse(tableFeedXml);
    Assert.assertNull(tableFeed.getPartitions());

    Partitions addedPartitions = new Partitions();
    Partition coloPartition = new Partition();
    coloPartition.setName("colo");
    addedPartitions.getPartitions().add(coloPartition);
    tableFeed.setPartitions(addedPartitions);

    parser.validate(tableFeed);
    Assert.fail("An exception should have been thrown:Partitions are not supported for feeds with table storage");
}
/**
 * Nulls the registry interface endpoint of the cluster used by the hive table feed and
 * expects validation of that feed to raise a ValidationException.
 */
@Test(expectedExceptions = ValidationException.class)
public void testValidateClusterHasRegistryWithNoRegistryInterfaceEndPoint() throws Exception {
    final InputStream tableFeedXml = getClass().getResourceAsStream("/config/feed/hive-table-feed.xml");
    Feed tableFeed = parser.parse(tableFeedXml);

    String firstClusterName = tableFeed.getClusters().getClusters().get(0).getName();
    Cluster storedCluster = EntityUtil.getEntity(EntityType.CLUSTER, firstClusterName);
    ClusterHelper.getInterface(storedCluster, Interfacetype.REGISTRY).setEndpoint(null);

    parser.validate(tableFeed);
    Assert.fail("An exception should have been thrown: Cluster should have registry interface defined with table "
            + "storage");
}
/**
 * Publishes a cluster built from the no-registry cluster definition as "badTestCluster",
 * makes it the only cluster of the hive table feed and expects validation to raise a
 * ValidationException.
 */
@Test(expectedExceptions = ValidationException.class)
public void testValidateClusterHasRegistryWithNoRegistryInterface() throws Exception {
    Unmarshaller clusterUnmarshaller = EntityType.CLUSTER.getUnmarshaller();
    InputStream clusterXml = this.getClass().getResourceAsStream(("/config/cluster/cluster-no-registry.xml"));
    Cluster noRegistryCluster = (Cluster) clusterUnmarshaller.unmarshal(clusterXml);
    noRegistryCluster.setName("badTestCluster");
    ConfigurationStore.get().publish(EntityType.CLUSTER, noRegistryCluster);

    final InputStream tableFeedXml = getClass().getResourceAsStream("/config/feed/hive-table-feed.xml");
    Feed tableFeed = parser.parse(tableFeedXml);

    // Reuse the validity window of the reference feed's first cluster for the new entry.
    Validity borrowedValidity = modifiableFeed.getClusters().getClusters().get(0).getValidity();
    tableFeed.getClusters().getClusters().clear();

    org.apache.falcon.entity.v0.feed.Cluster replacement = new org.apache.falcon.entity.v0.feed.Cluster();
    replacement.setName(noRegistryCluster.getName());
    replacement.setValidity(borrowedValidity);
    tableFeed.getClusters().getClusters().add(replacement);

    parser.validate(tableFeed);
    Assert.fail("An exception should have been thrown: Cluster should have registry interface defined with table"
            + " storage");
}
/**
 * Authenticates as "unknown", enables authorization and expects parsing and validating the
 * reference feed to raise a ValidationException. Authorization is switched off again in the
 * finally block.
 */
@Test(expectedExceptions = ValidationException.class)
public void testValidateOwner() throws Exception {
    final String authorizationKey = "falcon.security.authorization.enabled";
    CurrentUser.authenticate("unknown");
    StartupProperties.get().setProperty(authorizationKey, "true");
    Assert.assertTrue(Boolean.valueOf(StartupProperties.get().getProperty(authorizationKey)));
    try {
        // need a new parser since it caches authorization enabled flag
        FeedEntityParser freshParser = (FeedEntityParser) EntityParserFactory.getParser(EntityType.FEED);
        InputStream feedXml = this.getClass().getResourceAsStream(FEED_XML);
        freshParser.parseAndValidate(feedXml);
    } finally {
        StartupProperties.get().setProperty(authorizationKey, "false");
    }
}
/**
 * Parses the reference feed with the shared parser, checks that its ACL owner, group and
 * permission are all present and validates it without expecting an exception.
 */
@Test
public void testValidateACLWithACLAndAuthorizationDisabled() throws Exception {
    InputStream feedXml = this.getClass().getResourceAsStream(FEED_XML);
    Feed parsedFeed = parser.parse(feedXml);

    Assert.assertNotNull(parsedFeed);
    Assert.assertNotNull(parsedFeed.getACL());
    Assert.assertNotNull(parsedFeed.getACL().getOwner());
    Assert.assertNotNull(parsedFeed.getACL().getGroup());
    Assert.assertNotNull(parsedFeed.getACL().getPermission());

    parser.validate(parsedFeed);
}
/**
 * With authorization enabled and USER authenticated, sets the feed ACL owner to USER and the
 * group to the primary group name and expects validation to pass. Authorization is switched
 * off again in the finally block.
 */
@Test
public void testValidateACLOwner() throws Exception {
    final String authorizationKey = "falcon.security.authorization.enabled";
    StartupProperties.get().setProperty(authorizationKey, "true");
    Assert.assertTrue(Boolean.valueOf(StartupProperties.get().getProperty(authorizationKey)));
    CurrentUser.authenticate(USER);
    try {
        InputStream feedXml = this.getClass().getResourceAsStream(FEED_XML);
        // need a new parser since it caches authorization enabled flag
        FeedEntityParser freshParser = (FeedEntityParser) EntityParserFactory.getParser(EntityType.FEED);
        Feed parsedFeed = freshParser.parse(feedXml);

        Assert.assertNotNull(parsedFeed);
        Assert.assertNotNull(parsedFeed.getACL());
        parsedFeed.getACL().setOwner(USER);
        parsedFeed.getACL().setGroup(getPrimaryGroupName());

        freshParser.validate(parsedFeed);
    } finally {
        StartupProperties.get().setProperty(authorizationKey, "false");
    }
}
/**
 * With authorization enabled and "blah" authenticated, expects validation of the unmodified
 * reference feed to raise a ValidationException. Authorization is switched off again in the
 * finally block.
 */
@Test (expectedExceptions = ValidationException.class)
public void testValidateACLBadOwner() throws Exception {
    final String authorizationKey = "falcon.security.authorization.enabled";
    StartupProperties.get().setProperty(authorizationKey, "true");
    Assert.assertTrue(Boolean.valueOf(StartupProperties.get().getProperty(authorizationKey)));
    CurrentUser.authenticate("blah");
    try {
        InputStream feedXml = this.getClass().getResourceAsStream(FEED_XML);
        // need a new parser since it caches authorization enabled flag
        FeedEntityParser freshParser = (FeedEntityParser) EntityParserFactory.getParser(EntityType.FEED);
        Feed parsedFeed = freshParser.parse(feedXml);

        Assert.assertNotNull(parsedFeed);
        Assert.assertNotNull(parsedFeed.getACL());
        Assert.assertNotNull(parsedFeed.getACL().getOwner());
        Assert.assertNotNull(parsedFeed.getACL().getGroup());
        Assert.assertNotNull(parsedFeed.getACL().getPermission());

        freshParser.validate(parsedFeed);
        Assert.fail("Validation exception should have been thrown for invalid owner");
    } finally {
        StartupProperties.get().setProperty(authorizationKey, "false");
    }
}
/**
 * With authorization enabled and "blah" authenticated, parses the reference feed, checks that
 * its ACL is fully populated and expects validation to raise a ValidationException.
 * Authorization is switched off again in the finally block.
 */
@Test (expectedExceptions = ValidationException.class)
public void testValidateACLBadOwnerAndGroup() throws Exception {
    final String authorizationKey = "falcon.security.authorization.enabled";
    StartupProperties.get().setProperty(authorizationKey, "true");
    Assert.assertTrue(Boolean.valueOf(StartupProperties.get().getProperty(authorizationKey)));
    CurrentUser.authenticate("blah");
    try {
        InputStream feedXml = this.getClass().getResourceAsStream(FEED_XML);
        // need a new parser since it caches authorization enabled flag
        FeedEntityParser authorizingParser =
                (FeedEntityParser) EntityParserFactory.getParser(EntityType.FEED);
        Feed feedUnderTest = authorizingParser.parse(feedXml);

        Assert.assertNotNull(feedUnderTest);
        Assert.assertNotNull(feedUnderTest.getACL());
        Assert.assertNotNull(feedUnderTest.getACL().getOwner());
        Assert.assertNotNull(feedUnderTest.getACL().getGroup());
        Assert.assertNotNull(feedUnderTest.getACL().getPermission());

        authorizingParser.validate(feedUnderTest);
        Assert.fail("Validation exception should have been thrown for invalid owner");
    } finally {
        StartupProperties.get().setProperty(authorizationKey, "false");
    }
}
/**
 * With authorization enabled, creates the feed's data location and expects validation of the
 * unmodified reference feed to raise a ValidationException. The location is deleted and
 * authorization switched off again in the finally block.
 */
@Test (expectedExceptions = ValidationException.class)
public void testValidateACLAndStorageBadOwner() throws Exception {
    final String authorizationKey = "falcon.security.authorization.enabled";
    StartupProperties.get().setProperty(authorizationKey, "true");
    Assert.assertTrue(Boolean.valueOf(StartupProperties.get().getProperty(authorizationKey)));

    Feed parsedFeed = null;
    try {
        InputStream feedXml = this.getClass().getResourceAsStream(FEED_XML);
        // need a new parser since it caches authorization enabled flag
        FeedEntityParser freshParser = (FeedEntityParser) EntityParserFactory.getParser(EntityType.FEED);
        parsedFeed = freshParser.parse(feedXml);

        Assert.assertNotNull(parsedFeed);
        Assert.assertNotNull(parsedFeed.getACL());
        Assert.assertNotNull(parsedFeed.getACL().getOwner());
        Assert.assertNotNull(parsedFeed.getACL().getGroup());
        Assert.assertNotNull(parsedFeed.getACL().getPermission());

        // create locations
        createLocations(parsedFeed);
        freshParser.validate(parsedFeed);
    } finally {
        if (parsedFeed != null) {
            deleteLocations(parsedFeed);
        }
        StartupProperties.get().setProperty(authorizationKey, "false");
    }
}
/**
 * With authorization enabled, parses the reference feed, creates its data location and
 * expects validation to raise a ValidationException. The location is deleted and
 * authorization switched off again in the finally block.
 */
@Test (expectedExceptions = ValidationException.class)
public void testValidateACLAndStorageBadOwnerAndGroup() throws Exception {
    final String authorizationKey = "falcon.security.authorization.enabled";
    StartupProperties.get().setProperty(authorizationKey, "true");
    Assert.assertTrue(Boolean.valueOf(StartupProperties.get().getProperty(authorizationKey)));

    Feed feedUnderTest = null;
    try {
        InputStream feedXml = this.getClass().getResourceAsStream(FEED_XML);
        // need a new parser since it caches authorization enabled flag
        FeedEntityParser authorizingParser =
                (FeedEntityParser) EntityParserFactory.getParser(EntityType.FEED);
        feedUnderTest = authorizingParser.parse(feedXml);

        Assert.assertNotNull(feedUnderTest);
        Assert.assertNotNull(feedUnderTest.getACL());
        Assert.assertNotNull(feedUnderTest.getACL().getOwner());
        Assert.assertNotNull(feedUnderTest.getACL().getGroup());
        Assert.assertNotNull(feedUnderTest.getACL().getPermission());

        // create locations
        createLocations(feedUnderTest);
        authorizingParser.validate(feedUnderTest);
    } finally {
        if (feedUnderTest != null) {
            deleteLocations(feedUnderTest);
        }
        StartupProperties.get().setProperty(authorizationKey, "false");
    }
}
/**
 * With USER authenticated and authorization enabled, sets only the ACL owner to USER (the
 * group is left as parsed), creates the data location and expects validation to raise a
 * ValidationException. The location is deleted and authorization switched off again in the
 * finally block.
 */
@Test (expectedExceptions = ValidationException.class)
public void testValidateACLAndStorageForValidOwnerBadGroup() throws Exception {
    final String authorizationKey = "falcon.security.authorization.enabled";
    CurrentUser.authenticate(USER);
    StartupProperties.get().setProperty(authorizationKey, "true");
    Assert.assertTrue(Boolean.valueOf(StartupProperties.get().getProperty(authorizationKey)));

    Feed parsedFeed = null;
    try {
        InputStream feedXml = this.getClass().getResourceAsStream(FEED_XML);
        // need a new parser since it caches authorization enabled flag
        FeedEntityParser freshParser = (FeedEntityParser) EntityParserFactory.getParser(EntityType.FEED);
        parsedFeed = freshParser.parse(feedXml);

        Assert.assertNotNull(parsedFeed);
        Assert.assertNotNull(parsedFeed.getACL());
        Assert.assertNotNull(parsedFeed.getACL().getOwner());
        Assert.assertNotNull(parsedFeed.getACL().getGroup());
        Assert.assertNotNull(parsedFeed.getACL().getPermission());
        parsedFeed.getACL().setOwner(USER);

        // create locations
        createLocations(parsedFeed);
        freshParser.validate(parsedFeed);
    } finally {
        if (parsedFeed != null) {
            deleteLocations(parsedFeed);
        }
        StartupProperties.get().setProperty(authorizationKey, "false");
    }
}
/**
 * With USER authenticated and authorization enabled, sets only the ACL group to the primary
 * group name (the owner is left as parsed) and expects validation to raise a
 * ValidationException. Authorization is switched off again in the finally block.
 */
@Test (expectedExceptions = ValidationException.class)
public void testValidateACLValidGroupBadOwner() throws Exception {
    final String authorizationKey = "falcon.security.authorization.enabled";
    CurrentUser.authenticate(USER);
    StartupProperties.get().setProperty(authorizationKey, "true");
    Assert.assertTrue(Boolean.valueOf(StartupProperties.get().getProperty(authorizationKey)));
    try {
        InputStream feedXml = this.getClass().getResourceAsStream(FEED_XML);
        // need a new parser since it caches authorization enabled flag
        FeedEntityParser freshParser = (FeedEntityParser) EntityParserFactory.getParser(EntityType.FEED);
        Feed parsedFeed = freshParser.parse(feedXml);

        Assert.assertNotNull(parsedFeed);
        Assert.assertNotNull(parsedFeed.getACL());
        Assert.assertNotNull(parsedFeed.getACL().getOwner());
        Assert.assertNotNull(parsedFeed.getACL().getGroup());
        Assert.assertNotNull(parsedFeed.getACL().getPermission());
        parsedFeed.getACL().setGroup(getPrimaryGroupName());

        freshParser.validate(parsedFeed);
    } finally {
        StartupProperties.get().setProperty(authorizationKey, "false");
    }
}
/**
 * With authorization enabled and the ACL left exactly as parsed, creates the data location
 * and expects validation to raise a ValidationException. The location is deleted and
 * authorization switched off again in the finally block.
 */
@Test (expectedExceptions = ValidationException.class)
public void testValidateACLAndStorageForInvalidOwnerAndGroup() throws Exception {
    final String authorizationKey = "falcon.security.authorization.enabled";
    StartupProperties.get().setProperty(authorizationKey, "true");
    Assert.assertTrue(Boolean.valueOf(StartupProperties.get().getProperty(authorizationKey)));

    Feed feedUnderTest = null;
    try {
        InputStream feedXml = this.getClass().getResourceAsStream(FEED_XML);
        // need a new parser since it caches authorization enabled flag
        FeedEntityParser authorizingParser =
                (FeedEntityParser) EntityParserFactory.getParser(EntityType.FEED);
        feedUnderTest = authorizingParser.parse(feedXml);

        Assert.assertNotNull(feedUnderTest);
        Assert.assertNotNull(feedUnderTest.getACL());
        Assert.assertNotNull(feedUnderTest.getACL().getOwner());
        Assert.assertNotNull(feedUnderTest.getACL().getGroup());
        Assert.assertNotNull(feedUnderTest.getACL().getPermission());

        // create locations
        createLocations(feedUnderTest);
        authorizingParser.validate(feedUnderTest);
    } finally {
        if (feedUnderTest != null) {
            deleteLocations(feedUnderTest);
        }
        StartupProperties.get().setProperty(authorizationKey, "false");
    }
}
/**
 * With USER authenticated and authorization enabled, sets only the ACL group to the primary
 * group name, creates the data location and expects validation to raise a
 * ValidationException. The location is deleted and authorization switched off again in the
 * finally block.
 */
@Test (expectedExceptions = ValidationException.class)
public void testValidateACLAndStorageForValidGroupBadOwner() throws Exception {
    final String authorizationKey = "falcon.security.authorization.enabled";
    CurrentUser.authenticate(USER);
    StartupProperties.get().setProperty(authorizationKey, "true");
    Assert.assertTrue(Boolean.valueOf(StartupProperties.get().getProperty(authorizationKey)));

    Feed parsedFeed = null;
    try {
        InputStream feedXml = this.getClass().getResourceAsStream(FEED_XML);
        // need a new parser since it caches authorization enabled flag
        FeedEntityParser freshParser = (FeedEntityParser) EntityParserFactory.getParser(EntityType.FEED);
        parsedFeed = freshParser.parse(feedXml);

        Assert.assertNotNull(parsedFeed);
        Assert.assertNotNull(parsedFeed.getACL());
        Assert.assertNotNull(parsedFeed.getACL().getOwner());
        Assert.assertNotNull(parsedFeed.getACL().getGroup());
        Assert.assertNotNull(parsedFeed.getACL().getPermission());
        parsedFeed.getACL().setGroup(getPrimaryGroupName());

        // create locations
        createLocations(parsedFeed);
        freshParser.validate(parsedFeed);
    } finally {
        if (parsedFeed != null) {
            deleteLocations(parsedFeed);
        }
        StartupProperties.get().setProperty(authorizationKey, "false");
    }
}
/**
 * Creates a file at the path of the feed's first DATA location on the embedded cluster's file
 * system. Only the first DATA location is handled.
 *
 * @param feed feed whose feed-level locations are scanned
 * @throws IOException if the file cannot be created or the stream cannot be closed
 */
private void createLocations(Feed feed) throws IOException {
    for (Location location : feed.getLocations().getLocations()) {
        if (location.getType() != LocationType.DATA) {
            continue;
        }
        // FileSystem#create hands back an open output stream; close it immediately so the
        // test does not leak the handle.
        dfsCluster.getFileSystem().create(new Path(location.getPath())).close();
        break;
    }
}
/**
 * Recursively deletes the path of the feed's first DATA location from the embedded cluster's
 * file system. Only the first DATA location is handled.
 *
 * @param feed feed whose feed-level locations are scanned
 * @throws IOException if the delete call fails
 */
private void deleteLocations(Feed feed) throws IOException {
    for (Location candidate : feed.getLocations().getLocations()) {
        if (candidate.getType() != LocationType.DATA) {
            continue;
        }
        Path dataPath = new Path(candidate.getPath());
        dfsCluster.getFileSystem().delete(dataPath, true);
        break;
    }
}
/**
 * With authorization enabled and USER authenticated, points the "backupCluster" entry of the
 * feed at an s3 data location and validates the feed with owner USER and the primary group.
 * An IllegalArgumentException from validation is tolerated. Authorization is switched off
 * again in the finally block.
 */
@Test
public void testValidateACLForArchiveReplication() throws Exception {
    final String authorizationKey = "falcon.security.authorization.enabled";
    StartupProperties.get().setProperty(authorizationKey, "true");
    Assert.assertTrue(Boolean.valueOf(StartupProperties.get().getProperty(authorizationKey)));
    CurrentUser.authenticate(USER);
    try {
        InputStream feedXml = this.getClass().getResourceAsStream(FEED_XML);
        // need a new parser since it caches authorization enabled flag
        FeedEntityParser freshParser = (FeedEntityParser) EntityParserFactory.getParser(EntityType.FEED);
        Feed parsedFeed = freshParser.parse(feedXml);

        org.apache.falcon.entity.v0.feed.Cluster backupCluster =
                FeedHelper.getCluster(parsedFeed, "backupCluster");
        Location archiveLocation = new Location();
        archiveLocation.setType(LocationType.DATA);
        archiveLocation.setPath(
                "s3://falcontesting@hwxasvtesting.blob.core.windows.net/${YEAR}-${MONTH}-${DAY}-${HOUR}-${MINUTE}");
        Locations archiveLocations = new Locations();
        archiveLocations.getLocations().add(archiveLocation);
        backupCluster.setLocations(archiveLocations);

        Assert.assertNotNull(parsedFeed);
        Assert.assertNotNull(parsedFeed.getACL());
        parsedFeed.getACL().setOwner(USER);
        parsedFeed.getACL().setGroup(getPrimaryGroupName());

        try {
            freshParser.validate(parsedFeed);
        } catch (IllegalArgumentException ignored) {
            // this is normal since AWS Secret Access Key is not specified as the password of a s3 URL
        }
    } finally {
        StartupProperties.get().setProperty(authorizationKey, "false");
    }
}
}