/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.falcon.entity;

import org.apache.falcon.FalconException;
import org.apache.falcon.Pair;
import org.apache.falcon.catalog.AbstractCatalogService;
import org.apache.falcon.catalog.CatalogPartition;
import org.apache.falcon.catalog.CatalogServiceFactory;
import org.apache.falcon.catalog.HiveCatalogService;
import org.apache.falcon.entity.common.FeedDataPath;
import org.apache.falcon.entity.v0.AccessControlList;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.cluster.Interfacetype;
import org.apache.falcon.entity.v0.feed.CatalogTable;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.feed.LocationType;
import org.apache.falcon.expression.ExpressionHelper;
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.retention.EvictedInstanceSerDe;
import org.apache.falcon.retention.EvictionHelper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.servlet.jsp.el.ELException;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;

/**
* A catalog registry implementation of a feed storage.
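 * Table URIs of the form catalog:$database:$table#$partitions are parsed into
 * a catalog URL, database, table and an ordered partition map; the reverse
 * mapping is exposed through {@link #getUriTemplate()}.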
*/
public class CatalogStorage extends Configured implements Storage {
private static final Logger LOG = LoggerFactory.getLogger(CatalogStorage.class);
// constants to be used while preparing HCatalog partition filter query
private static final String FILTER_ST_BRACKET = "(";
private static final String FILTER_END_BRACKET = ")";
private static final String FILTER_QUOTE = "'";
private static final String FILTER_AND = " and ";
private static final String FILTER_OR = " or ";
private static final String FILTER_LESS_THAN = " < ";
private static final String FILTER_EQUALS = " = ";
private final StringBuffer instancePaths = new StringBuffer();
private final StringBuilder instanceDates = new StringBuilder();
public static final String PARTITION_SEPARATOR = ";";
public static final String PARTITION_KEYVAL_SEPARATOR = "=";
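    // ':' separates database and table when parsing feed table URIs;
    // '/' separates segments when building URI templates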
public static final String INPUT_PATH_SEPARATOR = ":";
public static final String OUTPUT_PATH_SEPARATOR = "/";
public static final String PARTITION_VALUE_QUOTE = "'";
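    // placeholder endpoint used when no concrete cluster is in scope (compare the
    // Cluster-based constructor, which uses the cluster's registry endpoint)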
public static final String CATALOG_URL = "${hcatNode}";
private final String catalogUrl;
private String database;
private String table;
private Map<String, String> partitions;
protected CatalogStorage(Feed feed) throws URISyntaxException {
this(CATALOG_URL, feed.getTable());
}
public CatalogStorage(Cluster cluster, CatalogTable table) throws URISyntaxException {
this(ClusterHelper.getInterface(cluster, Interfacetype.REGISTRY).getEndpoint(), table);
}
protected CatalogStorage(String catalogUrl, CatalogTable table) throws URISyntaxException {
this(catalogUrl, table.getUri());
}
protected CatalogStorage(String catalogUrl, String tableUri) throws URISyntaxException {
if (catalogUrl == null || catalogUrl.length() == 0) {
throw new IllegalArgumentException("Catalog Registry URL cannot be null or empty");
}
verifyAndUpdateConfiguration(getConf());
this.catalogUrl = catalogUrl;
parseFeedUri(tableUri);
}
/**
* Validate URI to conform to catalog:$database:$table#$partitions.
* scheme=catalog:database=$database:table=$table#$partitions
* partitions=key=value;key=value
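 *
 * For illustration (hypothetical names):
 *   catalog:falcon_db:summary_table#ds=${YEAR}-${MONTH}-${DAY};region=us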
*
* @param catalogTableUri table URI to parse and validate
 * @throws URISyntaxException if the URI does not conform to the expected format
*/
private void parseFeedUri(String catalogTableUri) throws URISyntaxException {
final String processed = catalogTableUri.replaceAll(DOLLAR_EXPR_START_REGEX, DOLLAR_EXPR_START_NORMALIZED)
.replaceAll("}", EXPR_CLOSE_NORMALIZED);
URI tableUri = new URI(processed);
if (!"catalog".equals(tableUri.getScheme())) {
throw new URISyntaxException(tableUri.toString(), "catalog scheme is missing");
}
final String schemeSpecificPart = tableUri.getSchemeSpecificPart();
if (schemeSpecificPart == null) {
throw new URISyntaxException(tableUri.toString(), "Database and Table are missing");
}
String[] paths = schemeSpecificPart.split(INPUT_PATH_SEPARATOR);
if (paths.length != 2) {
throw new URISyntaxException(tableUri.toString(), "URI path is not in expected format: database:table");
}
database = paths[0];
table = paths[1];
if (database == null || database.length() == 0) {
throw new URISyntaxException(tableUri.toString(), "DB name is missing");
}
if (table == null || table.length() == 0) {
throw new URISyntaxException(tableUri.toString(), "Table name is missing");
}
String partRaw = tableUri.getFragment();
if (partRaw == null || partRaw.length() == 0) {
throw new URISyntaxException(tableUri.toString(), "Partition details are missing");
}
final String rawPartition = partRaw.replaceAll(DOLLAR_EXPR_START_NORMALIZED, DOLLAR_EXPR_START_REGEX)
.replaceAll(EXPR_CLOSE_NORMALIZED, EXPR_CLOSE_REGEX);
partitions = new LinkedHashMap<String, String>(); // preserve insertion order
String[] parts = rawPartition.split(PARTITION_SEPARATOR);
for (String part : parts) {
if (part == null || part.length() == 0) {
continue;
}
String[] keyVal = part.split(PARTITION_KEYVAL_SEPARATOR);
if (keyVal.length != 2) {
throw new URISyntaxException(tableUri.toString(),
"Partition key value pair is not specified properly in (" + part + ")");
}
partitions.put(keyVal[0], keyVal[1]);
}
}
/**
* Create an instance from the URI Template that was generated using
* the getUriTemplate() method.
*
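 * For illustration, a generated template might look like (hypothetical
 * endpoint and names):
 *   thrift://metastore:9083/falcon_db/summary_table/ds=${YEAR}-${MONTH}-${DAY};region=us
 *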
 * @param uriTemplate the URI template produced by {@link #getUriTemplate()}
 * @throws URISyntaxException if the template is malformed
*/
protected CatalogStorage(String uriTemplate) throws URISyntaxException {
this(uriTemplate, new Configuration());
}
protected CatalogStorage(String uriTemplate, Configuration conf) throws URISyntaxException {
if (uriTemplate == null || uriTemplate.length() == 0) {
throw new IllegalArgumentException("URI template cannot be null or empty");
}
final String processed = uriTemplate.replaceAll(DOLLAR_EXPR_START_REGEX, DOLLAR_EXPR_START_NORMALIZED)
.replaceAll("}", EXPR_CLOSE_NORMALIZED);
URI uri = new URI(processed);
this.catalogUrl = uri.getScheme() + "://" + uri.getAuthority();
verifyAndUpdateConfiguration(conf);
parseUriTemplate(uri);
}
private void parseUriTemplate(URI uriTemplate) throws URISyntaxException {
String path = uriTemplate.getPath();
String[] paths = path.split(OUTPUT_PATH_SEPARATOR);
if (paths.length != 4) {
            throw new URISyntaxException(uriTemplate.toString(),
                    "URI path is not in expected format: /database/table/partitions");
}
database = paths[1];
table = paths[2];
String partRaw = paths[3];
if (database == null || database.length() == 0) {
throw new URISyntaxException(uriTemplate.toString(), "DB name is missing");
}
if (table == null || table.length() == 0) {
throw new URISyntaxException(uriTemplate.toString(), "Table name is missing");
}
if (partRaw == null || partRaw.length() == 0) {
throw new URISyntaxException(uriTemplate.toString(), "Partition details are missing");
}
String rawPartition = partRaw.replaceAll(DOLLAR_EXPR_START_NORMALIZED, DOLLAR_EXPR_START_REGEX)
.replaceAll(EXPR_CLOSE_NORMALIZED, EXPR_CLOSE_REGEX);
partitions = new LinkedHashMap<String, String>();
String[] parts = rawPartition.split(PARTITION_SEPARATOR);
for (String part : parts) {
if (part == null || part.length() == 0) {
continue;
}
String[] keyVal = part.split(PARTITION_KEYVAL_SEPARATOR);
if (keyVal.length != 2) {
throw new URISyntaxException(uriTemplate.toString(),
"Partition key value pair is not specified properly in (" + part + ")");
}
partitions.put(keyVal[0], keyVal[1]);
}
}
public String getCatalogUrl() {
return catalogUrl;
}
public String getDatabase() {
return database;
}
public String getTable() {
return table;
}
public Map<String, String> getPartitions() {
return partitions;
}
/**
* @param key partition key
* @return partition value
*/
public String getPartitionValue(String key) {
return partitions.get(key);
}
/**
* @param key partition key
 * @return true if the partitions map contains the key, false otherwise
*/
public boolean hasPartition(String key) {
return partitions.containsKey(key);
}
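
    /**
     * Returns the partition keys whose values contain dated expressions
     * such as ${YEAR}; statically-valued partitions are skipped.
     */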
public List<String> getDatedPartitionKeys() {
List<String> keys = new ArrayList<String>();
for (Map.Entry<String, String> entry : getPartitions().entrySet()) {
Matcher matcher = FeedDataPath.PATTERN.matcher(entry.getValue());
if (matcher.find()) {
keys.add(entry.getKey());
}
}
return keys;
}
/**
* Convert the partition map to filter string.
* Each key value pair is separated by ';'.
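 * e.g. partitions {ds=2014-02-24, region=us} yield (ds='2014-02-24';region='us').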
*
* @return filter string
*/
public String toPartitionFilter() {
StringBuilder filter = new StringBuilder();
filter.append("(");
for (Map.Entry<String, String> entry : partitions.entrySet()) {
if (filter.length() > 1) {
filter.append(PARTITION_SEPARATOR);
}
filter.append(entry.getKey());
filter.append(PARTITION_KEYVAL_SEPARATOR);
filter.append(PARTITION_VALUE_QUOTE);
filter.append(entry.getValue());
filter.append(PARTITION_VALUE_QUOTE);
}
filter.append(")");
return filter.toString();
}
/**
* Convert the partition map to path string.
* Each key value pair is separated by '/'.
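 * e.g. partitions {ds=2014-02-24, region=us} yield ds=2014-02-24/region=us.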
*
* @return path string
*/
public String toPartitionAsPath() {
StringBuilder partitionFilter = new StringBuilder();
for (Map.Entry<String, String> entry : getPartitions().entrySet()) {
partitionFilter.append(entry.getKey())
.append(PARTITION_KEYVAL_SEPARATOR)
.append(entry.getValue())
.append(OUTPUT_PATH_SEPARATOR);
}
partitionFilter.setLength(partitionFilter.length() - 1);
return partitionFilter.toString();
}
@Override
public TYPE getType() {
return TYPE.TABLE;
}
/**
* LocationType does NOT matter here.
*/
@Override
public String getUriTemplate() {
return getUriTemplate(LocationType.DATA);
}
/**
* LocationType does NOT matter here.
*/
@Override
public String getUriTemplate(LocationType locationType) {
StringBuilder uriTemplate = new StringBuilder();
uriTemplate.append(catalogUrl);
uriTemplate.append(OUTPUT_PATH_SEPARATOR);
uriTemplate.append(database);
uriTemplate.append(OUTPUT_PATH_SEPARATOR);
uriTemplate.append(table);
uriTemplate.append(OUTPUT_PATH_SEPARATOR);
for (Map.Entry<String, String> entry : partitions.entrySet()) {
uriTemplate.append(entry.getKey());
uriTemplate.append(PARTITION_KEYVAL_SEPARATOR);
uriTemplate.append(entry.getValue());
uriTemplate.append(PARTITION_SEPARATOR);
}
uriTemplate.setLength(uriTemplate.length() - 1);
return uriTemplate.toString();
}
@Override
public boolean isIdentical(Storage toCompareAgainst) throws FalconException {
if (!(toCompareAgainst instanceof CatalogStorage)) {
return false;
}
CatalogStorage catalogStorage = (CatalogStorage) toCompareAgainst;
        return (getCatalogUrl() == null || getCatalogUrl().equals(catalogStorage.getCatalogUrl()))
                && getDatabase().equals(catalogStorage.getDatabase())
                && getTable().equals(catalogStorage.getTable())
                && getPartitions().equals(catalogStorage.getPartitions());
}
@Override
public void validateACL(AccessControlList acl) throws FalconException {
// This is not supported in Hive today as authorization is not enforced on table and
// partition listing
}
@Override
public List<FeedInstanceStatus> getListing(Feed feed, String clusterName, LocationType locationType,
Date start, Date end) throws FalconException {
try {
List<FeedInstanceStatus> instances = new ArrayList<FeedInstanceStatus>();
Date feedStart = FeedHelper.getFeedValidityStart(feed, clusterName);
Date alignedDate = EntityUtil.getNextStartTime(feedStart, feed.getFrequency(),
feed.getTimezone(), start);
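            // walk instance times from the aligned start up to 'end',
            // resolving each instance to its catalog partition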
while (!end.before(alignedDate)) {
List<String> partitionValues = getCatalogPartitionValues(alignedDate);
try {
CatalogPartition partition = CatalogServiceFactory.getCatalogService().getPartition(
getConf(), getCatalogUrl(), getDatabase(), getTable(), partitionValues);
instances.add(getFeedInstanceFromCatalogPartition(partition));
} catch (FalconException e) {
if (e.getMessage().startsWith(HiveCatalogService.PARTITION_DOES_NOT_EXIST)) {
// Partition missing
FeedInstanceStatus instanceStatus = new FeedInstanceStatus(null);
instanceStatus.setInstance(partitionValues.toString());
instances.add(instanceStatus);
} else {
throw e;
}
}
alignedDate = FeedHelper.getNextFeedInstanceDate(alignedDate, feed);
}
return instances;
} catch (Exception e) {
LOG.error("Unable to retrieve listing for {}:{} -- {}", locationType, catalogUrl, e.getMessage());
throw new FalconException("Unable to retrieve listing for (URI " + catalogUrl + ")", e);
}
}
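
    /**
     * Resolves the partition values for the given instance time: values containing
     * dated expressions (e.g. ${YEAR}) are evaluated with the instance time as the
     * EL reference date, while static values are passed through unchanged.
     */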
private List<String> getCatalogPartitionValues(Date alignedDate) throws FalconException {
List<String> partitionValues = new ArrayList<String>();
for (Map.Entry<String, String> entry : getPartitions().entrySet()) {
if (FeedDataPath.PATTERN.matcher(entry.getValue()).find()) {
ExpressionHelper.setReferenceDate(alignedDate);
ExpressionHelper expressionHelper = ExpressionHelper.get();
String instanceValue = expressionHelper.evaluateFullExpression(entry.getValue(), String.class);
partitionValues.add(instanceValue);
} else {
partitionValues.add(entry.getValue());
}
}
return partitionValues;
}
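
    /**
     * Maps a catalog partition to a feed instance status: a size of zero is
     * reported as EMPTY, a positive size as AVAILABLE, and a negative
     * (unknown) size as MISSING.
     */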
private FeedInstanceStatus getFeedInstanceFromCatalogPartition(CatalogPartition partition) {
FeedInstanceStatus feedInstanceStatus = new FeedInstanceStatus(partition.getLocation());
feedInstanceStatus.setCreationTime(partition.getCreateTime());
feedInstanceStatus.setInstance(partition.getValues().toString());
FeedInstanceStatus.AvailabilityStatus availabilityStatus = FeedInstanceStatus.AvailabilityStatus.MISSING;
long size = partition.getSize();
if (size == 0) {
availabilityStatus = FeedInstanceStatus.AvailabilityStatus.EMPTY;
} else if (size > 0) {
availabilityStatus = FeedInstanceStatus.AvailabilityStatus.AVAILABLE;
}
feedInstanceStatus.setSize(size);
feedInstanceStatus.setStatus(availabilityStatus);
return feedInstanceStatus;
}
@Override
public FeedInstanceStatus.AvailabilityStatus getInstanceAvailabilityStatus(Feed feed, String clusterName,
LocationType locationType, Date instanceTime) throws FalconException {
List<FeedInstanceStatus> result = getListing(feed, clusterName, locationType, instanceTime, instanceTime);
if (result.isEmpty()) {
return FeedInstanceStatus.AvailabilityStatus.MISSING;
} else {
return result.get(0).getStatus();
}
}
@Override
public StringBuilder evict(String retentionLimit, String timeZone, Path logFilePath) throws FalconException {
LOG.info("Applying retention on {}, Limit: {}, timezone: {}",
getTable(), retentionLimit, timeZone);
List<CatalogPartition> toBeDeleted;
try {
// get sorted date partition keys and values
toBeDeleted = discoverPartitionsToDelete(retentionLimit, timeZone);
} catch (ELException e) {
throw new FalconException("Couldn't find partitions to be deleted", e);
}
if (toBeDeleted.isEmpty()) {
LOG.info("No partitions to delete.");
} else {
final boolean isTableExternal = CatalogServiceFactory.getCatalogService().isTableExternal(
getConf(), getCatalogUrl(), getDatabase(), getTable());
try {
dropPartitions(toBeDeleted, isTableExternal);
} catch (IOException e) {
throw new FalconException("Couldn't drop partitions", e);
}
}
try {
EvictedInstanceSerDe.serializeEvictedInstancePaths(
HadoopClientFactory.get().createProxiedFileSystem(logFilePath.toUri(), new Configuration()),
logFilePath, instancePaths);
} catch (IOException e) {
throw new FalconException("Couldn't record dropped partitions", e);
}
return instanceDates;
}
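
    /**
     * Evaluates the dated partition expressions at the retention cut-off date
     * and lists the catalog partitions that fall before that boundary, using
     * the filter built by createFilter().
     */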
private List<CatalogPartition> discoverPartitionsToDelete(String retentionLimit, String timezone)
throws FalconException, ELException {
Pair<Date, Date> range = EvictionHelper.getDateRange(retentionLimit);
ExpressionHelper.setReferenceDate(range.first);
Map<String, String> partitionsToDelete = new LinkedHashMap<String, String>();
ExpressionHelper expressionHelper = ExpressionHelper.get();
for (Map.Entry<String, String> entry : getPartitions().entrySet()) {
if (FeedDataPath.PATTERN.matcher(entry.getValue()).find()) {
partitionsToDelete.put(entry.getKey(),
expressionHelper.evaluateFullExpression(entry.getValue(), String.class));
}
}
final String filter = createFilter(partitionsToDelete);
return CatalogServiceFactory.getCatalogService().listPartitionsByFilter(
getConf(), getCatalogUrl(), getDatabase(), getTable(), filter);
}
/**
     * Creates a Hive partition filter from the input partition map.
* @param partitionsMap - ordered map of partition keys and values
* @return partition filter
     * @throws ELException if a partition expression cannot be evaluated
*/
private String createFilter(Map<String, String> partitionsMap) throws ELException {
/* Construct filter query string. As an example, suppose the dated partition keys
* are: [year, month, day, hour] and dated partition values are [2014, 02, 24, 10].
* Then the filter query generated is of the format:
* "(year < '2014') or (year = '2014' and month < '02') or
* (year = '2014' and month = '02' and day < '24') or
         * (year = '2014' and month = '02' and day = '24' and hour < '10')"
*/
StringBuilder filterBuffer = new StringBuilder();
List<String> keys = new ArrayList<String>(partitionsMap.keySet());
for (int curr = 0; curr < partitionsMap.size(); curr++) {
if (curr > 0) {
filterBuffer.append(FILTER_OR);
}
filterBuffer.append(FILTER_ST_BRACKET);
for (int prev = 0; prev < curr; prev++) {
String key = keys.get(prev);
filterBuffer.append(key)
.append(FILTER_EQUALS)
.append(FILTER_QUOTE)
.append(partitionsMap.get(key))
.append(FILTER_QUOTE)
.append(FILTER_AND);
}
String key = keys.get(curr);
filterBuffer.append(key)
.append(FILTER_LESS_THAN)
.append(FILTER_QUOTE)
.append(partitionsMap.get(key))
.append(FILTER_QUOTE)
.append(FILTER_END_BRACKET);
}
return filterBuffer.toString();
}
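
    /**
     * Drops each candidate partition from the catalog and, for external tables,
     * deletes the backing directory as well; dropped partition values and
     * locations are accumulated for the retention record written by evict().
     */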
private void dropPartitions(List<CatalogPartition> partitionsToDelete, boolean isTableExternal)
throws FalconException, IOException {
AbstractCatalogService catalogService = CatalogServiceFactory.getCatalogService();
for (CatalogPartition partition : partitionsToDelete) {
boolean deleted = catalogService.dropPartition(getConf(), getCatalogUrl(), getDatabase(), getTable(),
partition.getValues(), true);
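            // stop evicting if the catalog reports the drop failed, so only
            // partitions actually removed are recorded below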
if (!deleted) {
return;
}
            if (isTableExternal) { // dropping an external partition leaves data behind, so delete the directory too
final Path path = new Path(partition.getLocation());
if (!HadoopClientFactory.get().createProxiedFileSystem(path.toUri()).delete(path, true)) {
throw new FalconException("Failed to delete location " + path + " for partition "
+ partition.getValues());
}
}
// replace ',' with ';' since message producer splits instancePaths string by ','
String partitionInfo = partition.getValues().toString().replace(",", ";");
LOG.info("Deleted partition: " + partitionInfo);
instanceDates.append(partitionInfo).append(',');
instancePaths.append(partition.getLocation()).append(EvictedInstanceSerDe.INSTANCEPATH_SEPARATOR);
}
}
@Override
public String toString() {
return "CatalogStorage{"
+ "catalogUrl='" + catalogUrl + '\''
+ ", database='" + database + '\''
+ ", table='" + table + '\''
+ ", partitions=" + partitions
+ '}';
}
private void verifyAndUpdateConfiguration(Configuration conf) {
if (conf == null) {
setConf(new Configuration());
} else {
setConf(conf);
}
}
}