blob: d212a986b5113e6e6511dc5a2f6f514fb4f64270 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.falcon.entity;
import org.apache.commons.lang.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.cluster.Interfacetype;
import org.apache.falcon.entity.v0.cluster.Property;
import org.apache.falcon.entity.v0.feed.CatalogTable;
import org.apache.falcon.entity.v0.feed.Cluster;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.feed.Location;
import org.apache.falcon.entity.v0.feed.Locations;
import org.apache.falcon.expression.ExpressionHelper;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
/**
* Feed entity helper methods.
*/
public final class FeedHelper {
private FeedHelper() {}
public static Cluster getCluster(Feed feed, String clusterName) {
for (Cluster cluster : feed.getClusters().getClusters()) {
if (cluster.getName().equals(clusterName)) {
return cluster;
}
}
return null;
}
public static Storage createStorage(Feed feed) throws FalconException {
final List<Location> locations = feed.getLocations().getLocations();
if (locations != null) {
return new FileSystemStorage(locations);
}
try {
final CatalogTable table = feed.getTable();
if (table != null) {
return new CatalogStorage(table.getUri());
}
} catch (URISyntaxException e) {
throw new FalconException(e);
}
throw new FalconException("Both catalog and locations are not defined.");
}
public static Storage createStorage(org.apache.falcon.entity.v0.cluster.Cluster clusterEntity,
Feed feed) throws FalconException {
return createStorage(getCluster(feed, clusterEntity.getName()), feed, clusterEntity);
}
public static Storage createStorage(String clusterName, Feed feed)
throws FalconException {
return createStorage(getCluster(feed, clusterName), feed);
}
public static Storage createStorage(Cluster cluster, Feed feed)
throws FalconException {
final org.apache.falcon.entity.v0.cluster.Cluster clusterEntity =
EntityUtil.getEntity(EntityType.CLUSTER, cluster.getName());
return createStorage(cluster, feed, clusterEntity);
}
public static Storage createStorage(Cluster cluster, Feed feed,
org.apache.falcon.entity.v0.cluster.Cluster clusterEntity)
throws FalconException {
final List<Location> locations = getLocations(cluster, feed);
if (locations != null) {
return new FileSystemStorage(ClusterHelper.getStorageUrl(clusterEntity), locations);
}
try {
final CatalogTable table = getTable(cluster, feed);
if (table != null) {
return new CatalogStorage(
ClusterHelper.getInterface(clusterEntity, Interfacetype.REGISTRY).getEndpoint(),
table.getUri());
}
} catch (URISyntaxException e) {
throw new FalconException(e);
}
throw new FalconException("Both catalog and locations are not defined.");
}
private static List<Location> getLocations(Cluster cluster, Feed feed) {
// check if locations are overridden in cluster
final Locations clusterLocations = cluster.getLocations();
if (clusterLocations != null
&& clusterLocations.getLocations().size() != 0) {
return clusterLocations.getLocations();
}
final Locations feedLocations = feed.getLocations();
return feedLocations == null ? null : feedLocations.getLocations();
}
private static CatalogTable getTable(Cluster cluster, Feed feed) {
// check if table is overridden in cluster
if (cluster.getTable() != null) {
return cluster.getTable();
}
return feed.getTable();
}
public static String normalizePartitionExpression(String part1, String part2) {
String partExp = StringUtils.stripToEmpty(part1) + "/" + StringUtils.stripToEmpty(part2);
partExp = partExp.replaceAll("//+", "/");
partExp = StringUtils.stripStart(partExp, "/");
partExp = StringUtils.stripEnd(partExp, "/");
return partExp;
}
public static String normalizePartitionExpression(String partition) {
return normalizePartitionExpression(partition, null);
}
private static Properties loadClusterProperties(org.apache.falcon.entity.v0.cluster.Cluster cluster) {
Properties properties = new Properties();
Map<String, String> clusterVars = new HashMap<String, String>();
clusterVars.put("colo", cluster.getColo());
clusterVars.put("name", cluster.getName());
if (cluster.getProperties() != null) {
for (Property property : cluster.getProperties().getProperties()) {
clusterVars.put(property.getName(), property.getValue());
}
}
properties.put("cluster", clusterVars);
return properties;
}
public static String evaluateClusterExp(org.apache.falcon.entity.v0.cluster.Cluster clusterEntity, String exp)
throws FalconException {
Properties properties = loadClusterProperties(clusterEntity);
ExpressionHelper expHelp = ExpressionHelper.get();
expHelp.setPropertiesForVariable(properties);
return expHelp.evaluateFullExpression(exp, String.class);
}
}