/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.falcon.entity.parser;
import org.apache.commons.lang.Validate;
import org.apache.commons.lang3.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.catalog.CatalogServiceFactory;
import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.cluster.ACL;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.cluster.ClusterLocationType;
import org.apache.falcon.entity.v0.cluster.Interface;
import org.apache.falcon.entity.v0.cluster.Interfacetype;
import org.apache.falcon.entity.v0.cluster.Location;
import org.apache.falcon.entity.v0.cluster.Properties;
import org.apache.falcon.entity.v0.cluster.Property;
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.security.SecurityUtil;
import org.apache.falcon.util.StartupProperties;
import org.apache.falcon.workflow.WorkflowEngineFactory;
import org.apache.falcon.workflow.util.OozieConstants;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.ConnectionFactory;
import java.io.IOException;
import java.net.URI;
import java.util.HashSet;
import java.util.List;
/**
* Parser that parses cluster entity definition.
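*
* <p>A minimal usage sketch (illustrative only; clusterDefinitionStream is a
* placeholder InputStream holding the XML definition, and the lookup assumes the
* usual Falcon EntityParserFactory flow):</p>
* <pre>{@code
* ClusterEntityParser parser =
*         (ClusterEntityParser) EntityParserFactory.getParser(EntityType.CLUSTER);
* Cluster cluster = parser.parseAndValidate(clusterDefinitionStream);
* }</pre>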
*/
public class ClusterEntityParser extends EntityParser<Cluster> {
private static final Logger LOG = LoggerFactory.getLogger(ClusterEntityParser.class);
public ClusterEntityParser() {
super(EntityType.CLUSTER);
}
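/**
* Validates a cluster entity: interface schemes are checked first (fail early),
* then the ACL, and, only if this server is responsible for the cluster's colo,
* each interface, the staging/working locations and the cluster properties.
*
* @param cluster cluster entity
* @throws ValidationException if any interface, location or property is invalid
*/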
@Override
public void validate(Cluster cluster) throws ValidationException {
// Validate interface schemes first so that obvious endpoint errors fail early
validateScheme(cluster, Interfacetype.READONLY);
validateScheme(cluster, Interfacetype.WRITE);
validateScheme(cluster, Interfacetype.WORKFLOW);
// User may choose to disable job completion notifications
if (ClusterHelper.getInterface(cluster, Interfacetype.MESSAGING) != null) {
validateScheme(cluster, Interfacetype.MESSAGING);
}
if (CatalogServiceFactory.isEnabled()
&& ClusterHelper.getInterface(cluster, Interfacetype.REGISTRY) != null) {
validateScheme(cluster, Interfacetype.REGISTRY);
}
validateACL(cluster);
if (!EntityUtil.responsibleFor(cluster.getColo())) {
return;
}
validateReadInterface(cluster);
validateWriteInterface(cluster);
validateExecuteInterface(cluster);
validateWorkflowInterface(cluster);
validateMessagingInterface(cluster);
validateRegistryInterface(cluster);
validateLocations(cluster);
validateProperties(cluster);
validateSparkMasterInterface(cluster);
}
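/**
* Verifies that the endpoint of the given interface carries a URI scheme.
* The workflow interface is exempt when it points at local Oozie.
*
* @param cluster cluster entity
* @param interfacetype interface whose endpoint is checked
* @throws ValidationException if no scheme can be derived from the endpoint
*/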
private void validateScheme(Cluster cluster, Interfacetype interfacetype)
throws ValidationException {
final String endpoint = ClusterHelper.getInterface(cluster, interfacetype).getEndpoint();
URI uri = new Path(endpoint).toUri();
if (uri.getScheme() == null) {
if (Interfacetype.WORKFLOW == interfacetype
&& uri.toString().equals(OozieConstants.LOCAL_OOZIE)) {
return;
}
throw new ValidationException("Cannot get valid scheme for interface: "
+ interfacetype + " of cluster: " + cluster.getName());
}
}
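/**
* Validates the read-only storage endpoint by probing the file system.
*/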
private void validateReadInterface(Cluster cluster) throws ValidationException {
final String readOnlyStorageUrl = ClusterHelper.getReadOnlyStorageUrl(cluster);
LOG.info("Validating read interface: {}", readOnlyStorageUrl);
validateFileSystem(cluster, readOnlyStorageUrl);
}
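/**
* Validates the write storage endpoint by probing the file system.
*/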
private void validateWriteInterface(Cluster cluster) throws ValidationException {
final String writeStorageUrl = ClusterHelper.getStorageUrl(cluster);
LOG.info("Validating write interface: {}", writeStorageUrl);
validateFileSystem(cluster, writeStorageUrl);
}
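/**
* Probes the given storage URL by creating a proxied FileSystem and issuing an
* exists("/") call; when security is enabled the namenode principal from the
* cluster definition is required and passed along in the configuration.
*
* @param cluster cluster entity
* @param storageUrl file system URL to validate
* @throws ValidationException if the file system cannot be reached
*/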
private void validateFileSystem(Cluster cluster, String storageUrl) throws ValidationException {
try {
Configuration conf = new Configuration();
conf.set(HadoopClientFactory.FS_DEFAULT_NAME_KEY, storageUrl);
conf.setInt("ipc.client.connect.max.retries", 10);
if (UserGroupInformation.isSecurityEnabled()) {
String nameNodePrincipal = ClusterHelper.getPropertyValue(cluster, SecurityUtil.NN_PRINCIPAL);
Validate.notEmpty(nameNodePrincipal,
"Cluster definition missing required namenode credential property: " + SecurityUtil.NN_PRINCIPAL);
conf.set(SecurityUtil.NN_PRINCIPAL, nameNodePrincipal);
}
FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf);
fs.exists(new Path("/"));
} catch (Exception e) {
throw new ValidationException("Invalid storage server or port: " + storageUrl
+ ", " + e.getMessage(), e);
}
}
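/**
* Validates the execute (resource manager/job tracker) endpoint by asking
* {@link HadoopClientFactory} to validate a job client against it, passing the
* resource manager principal from the cluster definition, if any.
*/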
private void validateExecuteInterface(Cluster cluster) throws ValidationException {
String executeUrl = ClusterHelper.getMREndPoint(cluster);
LOG.info("Validating execute interface: {}", executeUrl);
try {
String rmPrincipal = ClusterHelper.getPropertyValue(cluster, SecurityUtil.RM_PRINCIPAL);
HadoopClientFactory.get().validateJobClient(executeUrl, rmPrincipal);
} catch (IOException e) {
throw new ValidationException("Invalid Execute server or port: " + executeUrl, e);
}
}
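/**
* Checks that the Oozie workflow server is reachable, unless the cluster is
* configured with local Oozie.
*/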
protected void validateWorkflowInterface(Cluster cluster) throws ValidationException {
final String workflowUrl = ClusterHelper.getOozieUrl(cluster);
LOG.info("Validating workflow interface: {}", workflowUrl);
if (OozieConstants.LOCAL_OOZIE.equals(workflowUrl)) {
return;
}
try {
if (!WorkflowEngineFactory.getWorkflowEngine().isAlive(cluster)) {
throw new ValidationException("Unable to reach Workflow server:" + workflowUrl);
}
} catch (FalconException e) {
throw new ValidationException("Invalid Workflow server or port: " + workflowUrl, e);
}
}
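/**
* If a messaging interface is declared, loads the configured JMS
* {@link ConnectionFactory} implementation (broker.impl.class, defaulting to
* ActiveMQ) and attempts to open a connection to the broker.
*/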
protected void validateMessagingInterface(Cluster cluster) throws ValidationException {
// Validate only if the user has specified a messaging interface
final Interface messagingInterface = ClusterHelper.getInterface(cluster, Interfacetype.MESSAGING);
if (messagingInterface == null) {
LOG.info("Messaging service is not enabled for cluster: {}", cluster.getName());
return;
}
final String messagingUrl = ClusterHelper.getMessageBrokerUrl(cluster);
final String implementation = StartupProperties.get().getProperty("broker.impl.class",
"org.apache.activemq.ActiveMQConnectionFactory");
LOG.info("Validating messaging interface: {}, implementation: {}", messagingUrl, implementation);
try {
@SuppressWarnings("unchecked")
Class<ConnectionFactory> clazz = (Class<ConnectionFactory>)
getClass().getClassLoader().loadClass(implementation);
ConnectionFactory connectionFactory = clazz.getConstructor(
String.class, String.class, String.class).newInstance("", "", messagingUrl);
connectionFactory.createConnection();
} catch (Exception e) {
throw new ValidationException("Invalid Messaging server or port: " + messagingUrl
+ " for: " + implementation, e);
}
}
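/**
* If the catalog service is enabled and a registry interface is declared,
* verifies the Hive metastore principal in secure mode and checks that the
* catalog server is alive.
*/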
protected void validateRegistryInterface(Cluster cluster) throws ValidationException {
final boolean isCatalogRegistryEnabled = CatalogServiceFactory.isEnabled();
if (!isCatalogRegistryEnabled) {
return; // ignore the registry interface for backwards compatibility
}
// continue validation only if a catalog service is provided
final Interface catalogInterface = ClusterHelper.getInterface(cluster, Interfacetype.REGISTRY);
if (catalogInterface == null) {
LOG.info("Catalog service is not enabled for cluster: {}", cluster.getName());
return;
}
final String catalogUrl = catalogInterface.getEndpoint();
LOG.info("Validating catalog registry interface: {}", catalogUrl);
try {
Configuration clusterConf = ClusterHelper.getConfiguration(cluster);
if (UserGroupInformation.isSecurityEnabled()) {
String metaStorePrincipal = clusterConf.get(SecurityUtil.HIVE_METASTORE_KERBEROS_PRINCIPAL);
Validate.notEmpty(metaStorePrincipal,
"Cluster definition missing required metastore credential property: "
+ SecurityUtil.HIVE_METASTORE_KERBEROS_PRINCIPAL);
}
if (!CatalogServiceFactory.getCatalogService().isAlive(clusterConf, catalogUrl)) {
throw new ValidationException("Unable to reach Catalog server:" + catalogUrl);
}
} catch (FalconException e) {
throw new ValidationException("Invalid Catalog server or port: " + catalogUrl, e);
}
}
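/**
* If a Spark master endpoint is declared, it must be one of yarn-cluster,
* yarn-client or local.
*/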
protected void validateSparkMasterInterface(Cluster cluster) throws ValidationException {
final String sparkMasterEndPoint = ClusterHelper.getSparkMasterEndPoint(cluster);
LOG.info("Validating spark interface: {}", sparkMasterEndPoint);
if (StringUtils.isNotEmpty(sparkMasterEndPoint)) {
if (!("yarn-cluster".equalsIgnoreCase(sparkMasterEndPoint)
|| "yarn-client".equalsIgnoreCase(sparkMasterEndPoint)
|| "local".equalsIgnoreCase(sparkMasterEndPoint))) {
throw new ValidationException("Invalid Spark Interface End Point:" + sparkMasterEndPoint);
}
}
}
/**
* Validate ACL if authorization is enabled.
*
* @param cluster cluster entity
* @throws ValidationException
*/
private void validateACL(Cluster cluster) throws ValidationException {
if (isAuthorizationDisabled) {
return;
}
// Validate the entity owner is logged-in, authenticated user if authorization is enabled
final ACL clusterACL = cluster.getACL();
if (clusterACL == null) {
throw new ValidationException("Cluster ACL cannot be empty for: " + cluster.getName());
}
validateACLOwnerAndGroup(clusterACL);
try {
authorize(cluster.getName(), clusterACL);
} catch (AuthorizationException e) {
throw new ValidationException(e);
}
}
/**
* Validate that the locations configured on the cluster exist with the
* permissions Falcon needs to write to them.
*
* @param cluster cluster entity
* @throws ValidationException
*/
protected void validateLocations(Cluster cluster) throws ValidationException {
Configuration conf = ClusterHelper.getConfiguration(cluster);
FileSystem fs;
try {
fs = HadoopClientFactory.get().createFalconFileSystem(conf);
} catch (FalconException e) {
throw new ValidationException("Unable to get file system handle for cluster " + cluster.getName(), e);
}
Location stagingLocation = ClusterHelper.getLocation(cluster, ClusterLocationType.STAGING);
if (stagingLocation == null) {
throw new ValidationException(
"Unable to find the mandatory location of name: " + ClusterLocationType.STAGING.value()
+ " for cluster " + cluster.getName());
} else {
checkPathOwnerAndPermission(cluster.getName(), stagingLocation.getPath(), fs,
HadoopClientFactory.ALL_PERMISSION);
if (!ClusterHelper.checkWorkingLocationExists(cluster)) {
// Create the working location as a subdirectory of the staging dir with 755 permissions. FALCON-910
createWorkingDirUnderStaging(fs, cluster, stagingLocation);
} else {
Location workingLocation = ClusterHelper.getLocation(cluster, ClusterLocationType.WORKING);
if (stagingLocation.getPath().equals(workingLocation.getPath())) {
throw new ValidationException(
"Location with name: " + stagingLocation.getName().value() + " and " + workingLocation
.getName().value() + " cannot have same path: " + stagingLocation.getPath()
+ " for cluster :" + cluster.getName());
} else {
checkPathOwnerAndPermission(cluster.getName(), workingLocation.getPath(), fs,
HadoopClientFactory.READ_EXECUTE_PERMISSION);
}
}
// Create staging subdirs falcon/workflows/feed and falcon/workflows/process : Falcon-1647
createStagingSubdirs(fs, cluster, stagingLocation,
"falcon/workflows/feed", HadoopClientFactory.ALL_PERMISSION);
createStagingSubdirs(fs, cluster, stagingLocation,
"falcon/workflows/process", HadoopClientFactory.ALL_PERMISSION);
// Create empty dirs for optional input
createStagingSubdirs(fs, cluster, stagingLocation,
ClusterHelper.EMPTY_DIR_NAME, HadoopClientFactory.READ_ONLY_PERMISSION);
}
}
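/**
* Creates the default working directory under the staging directory with
* read-execute (755) permissions, or verifies that an existing one is a
* directory with exactly those permissions.
*/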
private void createWorkingDirUnderStaging(FileSystem fs, Cluster cluster,
Location stagingLocation) throws ValidationException {
Path workingDirPath = new Path(stagingLocation.getPath(), ClusterHelper.WORKINGDIR);
try {
if (!fs.exists(workingDirPath)) { // Create the working dir if the staging dir does not already contain it
HadoopClientFactory.mkdirs(fs, workingDirPath, HadoopClientFactory.READ_EXECUTE_PERMISSION);
} else {
if (fs.isDirectory(workingDirPath)) {
FsPermission workingPerms = fs.getFileStatus(workingDirPath).getPermission();
if (!workingPerms.equals(HadoopClientFactory.READ_EXECUTE_PERMISSION)) { // perms check
throw new ValidationException(
"Falcon needs the subdir " + ClusterHelper.WORKINGDIR + " inside the staging dir: "
+ stagingLocation.getPath()
+ " to have permissions " + HadoopClientFactory.READ_EXECUTE_PERMISSION.toString()
+ " when no working location is specified, but found " + workingPerms.toString());
}
} else {
throw new ValidationException(
"Falcon needs the subdir " + ClusterHelper.WORKINGDIR + " inside the staging dir: "
+ stagingLocation.getPath()
+ " when no working location is specified, but found a file at "
+ workingDirPath.toString());
}
}
} catch (IOException e) {
throw new ValidationException(
"Unable to create path for " + workingDirPath.toString() + " with path: "
+ workingDirPath.toString() + " for cluster " + cluster.getName(), e);
}
}
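/**
* Creates the given subdirectory under the staging location with the requested
* permissions.
*/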
private void createStagingSubdirs(FileSystem fs, Cluster cluster, Location stagingLocation,
String path, FsPermission permission) throws ValidationException {
Path subdirPath = new Path(stagingLocation.getPath(), path);
try {
HadoopClientFactory.mkdirs(fs, subdirPath, permission);
} catch (IOException e) {
throw new ValidationException(
"Unable to create path "
+ subdirPath.toString() + " for cluster " + cluster.getName(), e);
}
}
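/**
* Ensures that cluster property names are non-blank and unique.
*/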
protected void validateProperties(Cluster cluster) throws ValidationException {
Properties properties = cluster.getProperties();
if (properties == null) {
return; // Cluster has no properties to validate.
}
List<Property> propertyList = cluster.getProperties().getProperties();
HashSet<String> propertyKeys = new HashSet<String>();
for (Property prop : propertyList) {
if (StringUtils.isBlank(prop.getName())) {
throw new ValidationException("Property name and value cannot be empty for Cluster: "
+ cluster.getName());
}
if (!propertyKeys.add(prop.getName())) {
throw new ValidationException("Multiple properties with same name found for Cluster: "
+ cluster.getName());
}
}
}
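/**
* Verifies that the given path exists, is owned by the Falcon login user, has
* exactly the expected permissions and can be listed.
*/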
private void checkPathOwnerAndPermission(String clusterName, String location, FileSystem fs,
FsPermission expectedPermission) throws ValidationException {
Path locationPath = new Path(location);
try {
if (!fs.exists(locationPath)) {
throw new ValidationException("Location " + location + " for cluster " + clusterName + " must exist.");
}
// falcon owns this path on each cluster
final String loginUser = UserGroupInformation.getLoginUser().getShortUserName();
FileStatus fileStatus = fs.getFileStatus(locationPath);
final String locationOwner = fileStatus.getOwner();
if (!locationOwner.equals(loginUser)) {
LOG.error("Owner of the location {} is {} for cluster {}. Current user {} is not the owner of the "
+ "location.", locationPath, locationOwner, clusterName, loginUser);
throw new ValidationException("Path [" + locationPath + "] on the cluster [" + clusterName + "] has "
+ "owner [" + locationOwner + "]. Current user [" + loginUser + "] is not the owner of the "
+ "path");
}
String errorMessage = "Path " + locationPath + " has permissions: " + fileStatus.getPermission().toString()
+ ", should be " + expectedPermission;
if (fileStatus.getPermission().toShort() != expectedPermission.toShort()) {
LOG.error(errorMessage);
throw new ValidationException(errorMessage);
}
// Try to list the directory to verify that the Falcon user can access it
fs.listStatus(locationPath);
} catch (IOException e) {
throw new ValidationException(
"Unable to validate the location with path: " + location + " for cluster:" + clusterName
+ " due to transient failures ", e);
}
}
}