blob: 402381d702a21d61218cfc78dfa18329b91d606f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.falcon.validation;
import com.sun.jersey.api.client.ClientResponse;
import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.parser.ClusterEntityParser;
import org.apache.falcon.entity.parser.EntityParserFactory;
import org.apache.falcon.entity.parser.ValidationException;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.cluster.ACL;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.cluster.ClusterLocationType;
import org.apache.falcon.entity.v0.cluster.Interface;
import org.apache.falcon.entity.v0.cluster.Interfacetype;
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.resource.TestContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
/**
* Tests cluster entity validation to verify if each of the specified
* interface endpoints are valid.
*/
public class ClusterEntityValidationIT {

    /** Permissions granting ALL to the owner and NONE to group/others, used to
     * provoke ownership/permission validation failures. */
    private static final FsPermission OWNER_ONLY_PERMISSION =
            new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);

    private final TestContext context = new TestContext();
    private Map<String, String> overlay;
    private final ClusterEntityParser parser =
            (ClusterEntityParser) EntityParserFactory.getParser(EntityType.CLUSTER);
    private Cluster cluster;
    private FileSystem fs;

    /**
     * Prepares the test context, materializes the cluster template with a unique
     * overlay, unmarshals it into a {@link Cluster} entity and opens a
     * {@link FileSystem} handle against the cluster's configuration.
     *
     * @throws Exception if template overlay, unmarshalling or FS access fails
     */
    @BeforeClass
    public void setup() throws Exception {
        TestContext.prepare();
        overlay = context.getUniqueOverlay();
        String filePath = TestContext.overlayParametersOverTemplate(
                TestContext.CLUSTER_TEMPLATE, overlay);
        context.setCluster(filePath);
        // try-with-resources: the original code leaked this stream
        try (InputStream stream = new FileInputStream(filePath)) {
            cluster = (Cluster) EntityType.CLUSTER.getUnmarshaller().unmarshal(stream);
        }
        Assert.assertNotNull(cluster);
        fs = FileSystem.get(ClusterHelper.getConfiguration(cluster));
    }

    /**
     * Removes entities submitted during the tests from the config store.
     *
     * @throws Exception on store cleanup failure
     */
    @AfterClass
    public void tearDown() throws Exception {
        TestContext.deleteEntitiesFromStore();
    }

    /**
     * Positive test: a cluster whose interfaces all point at valid endpoints
     * must be accepted on submission.
     *
     * @throws Exception on submission failure
     */
    @Test
    public void testClusterEntityWithValidInterfaces() throws Exception {
        overlay = context.getUniqueOverlay();
        overlay.put("colo", "default");
        ClientResponse response = context.submitToFalcon(TestContext.CLUSTER_TEMPLATE, overlay, EntityType.CLUSTER);
        context.assertSuccessful(response);
    }

    /**
     * Supplies (interface type, invalid endpoint) pairs; each pair must cause
     * cluster submission to be rejected.
     */
    @DataProvider(name = "interfaceToInvalidURLs")
    public Object[][] createInterfaceToInvalidURLData() {
        return new Object[][] {
            // TODO FileSystem validates invalid hftp url, does NOT fail
            // {Interfacetype.READONLY, "hftp://localhost:41119"},
            {Interfacetype.READONLY, ""},
            {Interfacetype.READONLY, "localhost:41119"},
            {Interfacetype.WRITE, "write-interface:9999"},
            {Interfacetype.WRITE, "hdfs://write-interface:9999"},
            {Interfacetype.EXECUTE, "execute-interface:9999"},
            {Interfacetype.WORKFLOW, "workflow-interface:9999/oozie/"},
            {Interfacetype.WORKFLOW, "http://workflow-interface:9999/oozie/"},
            {Interfacetype.MESSAGING, "messaging-interface:9999"},
            {Interfacetype.MESSAGING, "tcp://messaging-interface:9999"},
            {Interfacetype.REGISTRY, "catalog-interface:9999"},
            {Interfacetype.REGISTRY, "http://catalog-interface:9999"},
        };
    }

    /**
     * Negative test: rewrites one interface of an otherwise valid cluster to an
     * invalid endpoint and verifies the submission is rejected.
     *
     * @param interfacetype the interface whose endpoint is replaced
     * @param endpoint      an endpoint expected to fail validation
     * @throws Exception on marshalling or submission failure
     */
    @Test (dataProvider = "interfaceToInvalidURLs")
    public void testClusterEntityWithInvalidInterfaces(Interfacetype interfacetype, String endpoint)
        throws Exception {
        overlay = context.getUniqueOverlay();
        String filePath = TestContext.overlayParametersOverTemplate(TestContext.CLUSTER_TEMPLATE, overlay);
        Cluster clusterEntity;
        // try-with-resources: the original code leaked this stream
        try (InputStream stream = new FileInputStream(filePath)) {
            clusterEntity = (Cluster) EntityType.CLUSTER.getUnmarshaller().unmarshal(stream);
        }
        Assert.assertNotNull(clusterEntity);
        clusterEntity.setColo("default"); // validations will be ignored if not default & tests fail

        Interface anInterface = ClusterHelper.getInterface(clusterEntity, interfacetype);
        anInterface.setEndpoint(endpoint);

        File tmpFile = TestContext.getTempFile();
        EntityType.CLUSTER.getMarshaller().marshal(clusterEntity, tmpFile);
        // fixed log message: original ran interface type and "Endpoint" together
        System.out.println("Starting Interface type " + interfacetype + ", Endpoint " + endpoint);
        ClientResponse response = context.submitFileToFalcon(EntityType.CLUSTER, tmpFile.getAbsolutePath());
        context.assertFailure(response);
        System.out.println("Completed Interface type " + interfacetype + ", Endpoint " + endpoint);
    }

    /**
     * Verifies that attaching an ACL to a cluster while authorization is
     * disabled does not cause submission to fail.
     *
     * @throws Exception on marshalling or submission failure
     */
    @Test
    public void testValidateACL() throws Exception {
        overlay = context.getUniqueOverlay();
        String filePath = TestContext.overlayParametersOverTemplate(TestContext.CLUSTER_TEMPLATE, overlay);
        Cluster clusterEntity;
        // try-with-resources: the original code leaked this stream
        try (InputStream stream = new FileInputStream(filePath)) {
            clusterEntity = (Cluster) EntityType.CLUSTER.getUnmarshaller().unmarshal(stream);
        }
        Assert.assertNotNull(clusterEntity);

        // Adding ACL with authorization disabled must not hurt
        ACL clusterACL = new ACL();
        clusterACL.setOwner(TestContext.REMOTE_USER);
        clusterACL.setGroup(TestContext.REMOTE_USER);
        clusterEntity.setACL(clusterACL);
        clusterEntity.setColo("default"); // validations will be ignored if not default & tests fail

        File tmpFile = TestContext.getTempFile();
        EntityType.CLUSTER.getMarshaller().marshal(clusterEntity, tmpFile);
        ClientResponse response = context.submitFileToFalcon(EntityType.CLUSTER, tmpFile.getAbsolutePath());
        context.assertSuccessful(response);
    }

    /**
     * Positive test: a cluster whose staging/working locations exist with the
     * expected layout passes parser validation.
     *
     * @throws Exception on FS or validation failure
     */
    @Test
    public void testValidateClusterLocations() throws Exception {
        TestContext.createClusterLocations(cluster, fs);
        parser.validate(cluster);
    }

    /**
     * Verifies that when the working location is omitted, validation derives it
     * as {@code <staging>/working}, creates the directory, and applies
     * {@link HadoopClientFactory#READ_EXECUTE_PERMISSION}.
     *
     * @throws Exception on FS or validation failure
     */
    @Test
    public void testValidateClusterLocationsWithoutWorking() throws Exception {
        overlay = context.getUniqueOverlay();
        String filePath = TestContext.overlayParametersOverTemplate(TestContext.CLUSTER_TEMPLATE, overlay);
        Cluster clusterEntity;
        // try-with-resources: the original code leaked this stream
        try (InputStream stream = new FileInputStream(filePath)) {
            clusterEntity = (Cluster) EntityType.CLUSTER.getUnmarshaller().unmarshal(stream);
        }
        // drop the working location from the template — assumes it is the third
        // entry in the template's locations list; TODO confirm against template
        clusterEntity.getLocations().getLocations().remove(2);
        FileSystem clusterFileSystem = FileSystem.get(ClusterHelper.getConfiguration(cluster));
        TestContext.createClusterLocations(clusterEntity, clusterFileSystem, false);
        parser.validate(clusterEntity);

        String expectedPath =
                ClusterHelper.getLocation(clusterEntity, ClusterLocationType.STAGING).getPath() + "/working";
        Assert.assertEquals(ClusterHelper.getLocation(clusterEntity, ClusterLocationType.WORKING).getPath(),
                expectedPath);
        Assert.assertTrue(clusterFileSystem.getFileLinkStatus(new Path(expectedPath)).isDirectory());
        Assert.assertEquals(clusterFileSystem.getFileLinkStatus(new Path(expectedPath)).getPermission(),
                HadoopClientFactory.READ_EXECUTE_PERMISSION);
    }

    /**
     * Negative test: validation must fail when the cluster locations do not
     * exist on the file system.
     *
     * @throws Exception expected {@link ValidationException}
     */
    @Test(expectedExceptions = ValidationException.class)
    public void testValidateClusterLocationsThatDontExist() throws Exception {
        TestContext.deleteClusterLocations(cluster, fs);
        parser.validate(cluster);
        Assert.fail("Should have thrown a validation exception");
    }

    /**
     * Negative test: validation must fail when the cluster locations exist but
     * carry owner-only permissions.
     *
     * @throws Exception expected {@link ValidationException}
     */
    @Test(expectedExceptions = ValidationException.class)
    public void testValidateClusterLocationsThatExistWithBadOwner() throws Exception {
        createClusterLocationsBadPermissions(cluster);
        parser.validate(cluster);
        Assert.fail("Should have thrown a validation exception");
    }

    /**
     * Recreates the cluster's staging and working directories with owner-only
     * permissions so that permission validation fails.
     *
     * @param clusterEntity the cluster whose locations are recreated
     * @throws IOException on FS failure
     */
    private void createClusterLocationsBadPermissions(Cluster clusterEntity) throws IOException {
        FileSystem clusterFileSystem = FileSystem.get(ClusterHelper.getConfiguration(clusterEntity));
        TestContext.deleteClusterLocations(clusterEntity, clusterFileSystem);

        String stagingLocation = ClusterHelper.getLocation(clusterEntity, ClusterLocationType.STAGING).getPath();
        Path stagingPath = new Path(stagingLocation);
        FileSystem.mkdirs(clusterFileSystem, stagingPath, OWNER_ONLY_PERMISSION);

        String workingLocation = ClusterHelper.getLocation(clusterEntity, ClusterLocationType.WORKING).getPath();
        Path workingPath = new Path(workingLocation);
        // BUG FIX: original passed stagingPath here a second time, so the
        // working directory was never created and workingPath was unused.
        FileSystem.mkdirs(clusterFileSystem, workingPath, OWNER_ONLY_PERMISSION);
    }
}