blob: eb1558572209a7018e80c410bc5216192937b070 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.falcon.entity;
import org.apache.commons.lang3.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.Pair;
import org.apache.falcon.entity.common.FeedDataPath;
import org.apache.falcon.entity.v0.AccessControlList;
import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.feed.Location;
import org.apache.falcon.entity.v0.feed.LocationType;
import org.apache.falcon.entity.v0.feed.Locations;
import org.apache.falcon.expression.ExpressionHelper;
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.retention.EvictedInstanceSerDe;
import org.apache.falcon.retention.EvictionHelper;
import org.apache.falcon.security.CurrentUser;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.servlet.jsp.el.ELException;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.TimeZone;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
 * A file system implementation of a feed storage.
 *
 * <p>A feed's storage is described by a set of typed {@link Location}s (DATA, META,
 * STATS, TMP) rooted at a storage URL (typically {@code ${nameNode}} until resolved).
 * This class serializes those locations to/from a single URI-template string, validates
 * ACLs against the actual HDFS file ownership, performs retention-based eviction and
 * produces per-instance availability listings.
 */
public class FileSystemStorage extends Configured implements Storage {
    private static final Logger LOG = LoggerFactory.getLogger(FileSystemStorage.class);
    // StringBuffer (not StringBuilder) because EvictedInstanceSerDe's API takes a StringBuffer.
    private final StringBuffer instancePaths = new StringBuffer();
    private final StringBuilder instanceDates = new StringBuilder();
    /** Separator between per-location entries in the serialized URI template. */
    public static final String FEED_PATH_SEP = "#";
    /** Separator between a location type name and its path in the URI template. */
    public static final String LOCATION_TYPE_SEP = "=";
    /** Placeholder storage URL, substituted with the cluster's name node at runtime. */
    public static final String FILE_SYSTEM_URL = "${nameNode}";
    private final String storageUrl;
    private final List<Location> locations;
    public FileSystemStorage(Feed feed) {
        this(FILE_SYSTEM_URL, feed.getLocations());
    }
    protected FileSystemStorage(String storageUrl, Locations locations) {
        this(storageUrl, locations.getLocations());
    }
    /**
     * @param storageUrl file system URL (or the {@code ${nameNode}} template), non-empty
     * @param locations  typed feed locations, non-empty
     * @throws IllegalArgumentException if either argument is null or empty
     */
    protected FileSystemStorage(String storageUrl, List<Location> locations) {
        if (storageUrl == null || storageUrl.length() == 0) {
            throw new IllegalArgumentException("FileSystem URL cannot be null or empty");
        }
        if (locations == null || locations.size() == 0) {
            throw new IllegalArgumentException("FileSystem Locations cannot be null or empty");
        }
        this.storageUrl = storageUrl;
        this.locations = locations;
    }
    /**
     * Create an instance from the URI Template that was generated using
     * the getUriTemplate() method.
     *
     * @param uriTemplate the uri template from org.apache.falcon.entity.FileSystemStorage#getUriTemplate
     * @throws URISyntaxException if a path in the template is not a valid URI
     */
    protected FileSystemStorage(String uriTemplate) throws URISyntaxException {
        if (uriTemplate == null || uriTemplate.length() == 0) {
            throw new IllegalArgumentException("URI template cannot be null or empty");
        }
        String rawStorageUrl = null;
        List<Location> rawLocations = new ArrayList<Location>();
        String[] feedLocs = uriTemplate.split(FEED_PATH_SEP);
        for (String rawPath : feedLocs) {
            String[] typeAndPath = rawPath.split(LOCATION_TYPE_SEP);
            // EL-style markers (${...}) are not valid URI characters; normalize them
            // before parsing and restore them afterwards so the Location keeps the template.
            final String processed = typeAndPath[1].replaceAll(DOLLAR_EXPR_START_REGEX, DOLLAR_EXPR_START_NORMALIZED)
                                                   .replaceAll("}", EXPR_CLOSE_NORMALIZED);
            URI uri = new URI(processed);
            if (rawStorageUrl == null) {
                // The first entry's scheme/authority defines the storage URL for all locations.
                rawStorageUrl = uri.getScheme() + "://" + uri.getAuthority();
            }
            String path = uri.getPath();
            final String finalPath = path.replaceAll(DOLLAR_EXPR_START_NORMALIZED, DOLLAR_EXPR_START_REGEX)
                                         .replaceAll(EXPR_CLOSE_NORMALIZED, EXPR_CLOSE_REGEX);
            Location location = new Location();
            location.setPath(finalPath);
            location.setType(LocationType.valueOf(typeAndPath[0]));
            rawLocations.add(location);
        }
        this.storageUrl = rawStorageUrl;
        this.locations = rawLocations;
    }
    @Override
    public TYPE getType() {
        return TYPE.FILESYSTEM;
    }
    public String getStorageUrl() {
        return storageUrl;
    }
    public List<Location> getLocations() {
        return locations;
    }
    /**
     * Serializes all configured locations into a single string of the form
     * {@code DATA=uri[#META=uri][#STATS=uri][#TMP=uri]}, omitting absent types.
     */
    @Override
    public String getUriTemplate() {
        String feedPathMask = getUriTemplate(LocationType.DATA);
        String metaPathMask = getUriTemplate(LocationType.META);
        String statsPathMask = getUriTemplate(LocationType.STATS);
        String tmpPathMask = getUriTemplate(LocationType.TMP);
        StringBuilder feedBasePaths = new StringBuilder();
        feedBasePaths.append(LocationType.DATA.name())
                .append(LOCATION_TYPE_SEP)
                .append(feedPathMask);
        if (metaPathMask != null) {
            feedBasePaths.append(FEED_PATH_SEP)
                    .append(LocationType.META.name())
                    .append(LOCATION_TYPE_SEP)
                    .append(metaPathMask);
        }
        if (statsPathMask != null) {
            feedBasePaths.append(FEED_PATH_SEP)
                    .append(LocationType.STATS.name())
                    .append(LOCATION_TYPE_SEP)
                    .append(statsPathMask);
        }
        if (tmpPathMask != null) {
            feedBasePaths.append(FEED_PATH_SEP)
                    .append(LocationType.TMP.name())
                    .append(LOCATION_TYPE_SEP)
                    .append(tmpPathMask);
        }
        return feedBasePaths.toString();
    }
    @Override
    public String getUriTemplate(LocationType locationType) {
        return getUriTemplate(locationType, locations);
    }
    /**
     * Returns the fully-qualified URI for the given location type, or null if that
     * type has no location (or an empty path) in {@code locationList}.
     */
    public String getUriTemplate(LocationType locationType, List<Location> locationList) {
        Location locationForType = null;
        for (Location location : locationList) {
            if (location.getType() == locationType) {
                locationForType = location;
                break;
            }
        }
        if (locationForType == null || StringUtils.isEmpty(locationForType.getPath())) {
            return null;
        }
        // normalize the path so trailing and double '/' are removed
        Path locationPath = new Path(locationForType.getPath());
        locationPath = locationPath.makeQualified(getDefaultUri(), getWorkingDir());
        if (isRelativePath(locationPath)) {
            locationPath = new Path(storageUrl + locationPath);
        }
        return locationPath.toString();
    }
    private boolean isRelativePath(Path locationPath) {
        return locationPath.toUri().getAuthority() == null && isStorageUrlATemplate();
    }
    private boolean isStorageUrlATemplate() {
        return storageUrl.startsWith(FILE_SYSTEM_URL);
    }
    private URI getDefaultUri() {
        return new Path(isStorageUrlATemplate() ? "/" : storageUrl).toUri();
    }
    public Path getWorkingDir() {
        return new Path(CurrentUser.isAuthenticated() ? "/user/" + CurrentUser.getUser() : "/");
    }
    /**
     * Two file system storages are identical iff they have the same number of
     * locations and equal URI templates for each of the four location types.
     */
    @Override
    public boolean isIdentical(Storage toCompareAgainst) throws FalconException {
        if (!(toCompareAgainst instanceof FileSystemStorage)) {
            return false;
        }
        FileSystemStorage fsStorage = (FileSystemStorage) toCompareAgainst;
        final List<Location> fsStorageLocations = fsStorage.getLocations();
        return getLocations().size() == fsStorageLocations.size()
                && StringUtils.equals(getUriTemplate(LocationType.DATA, getLocations()),
                        getUriTemplate(LocationType.DATA, fsStorageLocations))
                && StringUtils.equals(getUriTemplate(LocationType.STATS, getLocations()),
                        getUriTemplate(LocationType.STATS, fsStorageLocations))
                && StringUtils.equals(getUriTemplate(LocationType.META, getLocations()),
                        getUriTemplate(LocationType.META, fsStorageLocations))
                && StringUtils.equals(getUriTemplate(LocationType.TMP, getLocations()),
                        getUriTemplate(LocationType.TMP, fsStorageLocations));
    }
    /** Returns the first location of the given type, or null if none exists. */
    public static Location getLocation(List<Location> locations, LocationType type) {
        for (Location loc : locations) {
            if (loc.getType() == type) {
                return loc;
            }
        }
        return null;
    }
    /**
     * Validates that the feed ACL matches the actual ownership of the feed's paths.
     * Succeeds as soon as one existing location's owner or group matches the ACL.
     *
     * @throws FalconException if ownership does not match, or the check fails with an I/O error
     */
    @Override
    public void validateACL(AccessControlList acl) throws FalconException {
        try {
            for (Location location : getLocations()) {
                // Variables in the path can't exist on disk; check the static parent instead.
                String pathString = getRelativePath(location);
                Path path = new Path(pathString);
                FileSystem fileSystem =
                        HadoopClientFactory.get().createProxiedFileSystem(path.toUri(), getConf());
                if (fileSystem.exists(path)) {
                    FileStatus fileStatus = fileSystem.getFileStatus(path);
                    Set<String> groups = CurrentUser.getGroupNames();
                    if (fileStatus.getOwner().equals(acl.getOwner())
                            || groups.contains(acl.getGroup())) {
                        return;
                    }
                    LOG.error("Permission denied: Either Feed ACL owner {} or group {} doesn't "
                            + "match the actual file owner {} or group {} for file {}",
                            acl.getOwner(), acl.getGroup(), fileStatus.getOwner(), fileStatus.getGroup(), path);
                    throw new FalconException("Permission denied: Either Feed ACL owner "
                            + acl.getOwner() + " or group " + acl.getGroup() + " doesn't match the actual "
                            + "file owner " + fileStatus.getOwner() + " or group "
                            + fileStatus.getGroup() + " for file " + path);
                }
            }
        } catch (IOException e) {
            LOG.error("Can't validate ACL on storage {}", getStorageUrl(), e);
            throw new FalconException("Can't validate storage ACL (URI " + getStorageUrl() + ")", e);
        }
    }
    /**
     * Evicts feed instances older than the retention limit from every location,
     * records the evicted paths into {@code logFilePath}, and returns the
     * comma-separated dates of the evicted instances.
     */
    @Override
    public StringBuilder evict(String retentionLimit, String timeZone, Path logFilePath) throws FalconException {
        TimeZone tz = TimeZone.getTimeZone(timeZone);
        try{
            for (Location location : getLocations()) {
                fileSystemEvictor(getUriTemplate(location.getType()), retentionLimit, tz, logFilePath);
            }
            EvictedInstanceSerDe.serializeEvictedInstancePaths(
                    HadoopClientFactory.get().createProxiedFileSystem(logFilePath.toUri(), getConf()),
                    logFilePath, instancePaths);
        }catch (IOException e){
            throw new FalconException("Couldn't evict feed from fileSystem", e);
        }catch (ELException e){
            throw new FalconException("Couldn't evict feed from fileSystem", e);
        }
        return instanceDates;
    }
    /** Evicts out-of-retention instances under one feed path, accumulating dates and paths. */
    private void fileSystemEvictor(String feedPath, String retentionLimit, TimeZone timeZone,
                                   Path logFilePath) throws IOException, ELException, FalconException {
        Path normalizedPath = new Path(feedPath);
        FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(normalizedPath.toUri());
        // Strip scheme/authority; date extraction and globbing work on the bare path.
        feedPath = normalizedPath.toUri().getPath();
        LOG.info("Normalized path: {}", feedPath);
        Pair<Date, Date> range = EvictionHelper.getDateRange(retentionLimit);
        List<Path> toBeDeleted = discoverInstanceToDelete(feedPath, timeZone, range.first, fs);
        if (toBeDeleted.isEmpty()) {
            LOG.info("No instances to delete.");
            return;
        }
        DateFormat dateFormat = new SimpleDateFormat(FeedHelper.FORMAT);
        dateFormat.setTimeZone(timeZone);
        Path feedBasePath = fs.makeQualified(FeedHelper.getFeedBasePath(feedPath));
        for (Path path : toBeDeleted) {
            deleteInstance(fs, path, feedBasePath);
            Date date = FeedHelper.getDate(feedPath, new Path(path.toUri().getPath()), timeZone);
            instanceDates.append(dateFormat.format(date)).append(',');
            instancePaths.append(path).append(EvictedInstanceSerDe.INSTANCEPATH_SEPARATOR);
        }
    }
    /**
     * Finds instance paths whose embedded date falls before {@code start}
     * (i.e. outside the retention window). Paths with no parseable date are skipped.
     */
    private List<Path> discoverInstanceToDelete(String inPath, TimeZone timeZone, Date start, FileSystem fs)
        throws IOException {
        FileStatus[] files = findFilesForFeed(fs, inPath);
        if (files == null || files.length == 0) {
            return Collections.emptyList();
        }
        List<Path> toBeDeleted = new ArrayList<Path>();
        for (FileStatus file : files) {
            Date date = FeedHelper.getDate(inPath, new Path(file.getPath().toUri().getPath()), timeZone);
            LOG.debug("Considering {}", file.getPath().toUri().getPath());
            LOG.debug("Date: {}", date);
            if (date != null && !isDateInRange(date, start)) {
                toBeDeleted.add(file.getPath());
            }
        }
        return toBeDeleted;
    }
    /**
     * Globs the file system for instances of the feed by replacing every
     * date-pattern variable in the path with {@code *}. Returns null for a
     * fully static path (no variables means nothing to evict by date).
     */
    private FileStatus[] findFilesForFeed(FileSystem fs, String feedBasePath) throws IOException {
        Matcher matcher = FeedDataPath.PATTERN.matcher(feedBasePath);
        boolean regexMatchFound = false;
        while (matcher.find()) {
            regexMatchFound = true;
            String var = feedBasePath.substring(matcher.start(), matcher.end());
            // Pattern.quote: the variable text contains regex metacharacters (${...}).
            feedBasePath = feedBasePath.replaceAll(Pattern.quote(var), "*");
            matcher = FeedDataPath.PATTERN.matcher(feedBasePath);
        }
        if (regexMatchFound) {
            LOG.info("Searching for {}", feedBasePath);
            return fs.globStatus(new Path(feedBasePath));
        } else {
            LOG.info("Ignoring static path {}", feedBasePath);
            return null;
        }
    }
    private boolean isDateInRange(Date date, Date start) {
        //ignore end ( && date.compareTo(end) <= 0 )
        return date.compareTo(start) >= 0;
    }
    /** Deletes one instance recursively, then prunes any now-empty parent dirs. */
    private void deleteInstance(FileSystem fs, Path path, Path feedBasePath) throws IOException {
        if (fs.delete(path, true)) {
            LOG.info("Deleted instance: {}", path);
        }else{
            throw new IOException("Unable to delete instance: " + path);
        }
        deleteParentIfEmpty(fs, path.getParent(), feedBasePath);
    }
    /** Recursively removes empty ancestors of a deleted instance, stopping at the feed base path. */
    private void deleteParentIfEmpty(FileSystem fs, Path parent, Path feedBasePath) throws IOException {
        if (feedBasePath.equals(parent)) {
            LOG.info("Not deleting feed base path: {}", parent);
        } else {
            FileStatus[] files = fs.listStatus(parent);
            if (files != null && files.length == 0) {
                LOG.info("Parent path: {} is empty, deleting path", parent);
                if (fs.delete(parent, true)) {
                    LOG.info("Deleted empty dir: {}", parent);
                } else {
                    throw new IOException("Unable to delete parent path:" + parent);
                }
                deleteParentIfEmpty(fs, parent.getParent(), feedBasePath);
            }
        }
    }
    /**
     * Lists the status of every feed instance between {@code start} and {@code end}
     * (inclusive), walking instance times aligned to the feed's frequency. Each
     * instance is reported as AVAILABLE, PARTIAL, EMPTY or (by default) MISSING
     * based on the availability flag and the on-disk content size.
     */
    @Override
    @SuppressWarnings("MagicConstant")
    public List<FeedInstanceStatus> getListing(Feed feed, String clusterName, LocationType locationType,
                                               Date start, Date end) throws FalconException {
        Calendar calendar = Calendar.getInstance();
        List<Location> clusterSpecificLocation = FeedHelper.
                getLocations(FeedHelper.getCluster(feed, clusterName), feed);
        Location location = getLocation(clusterSpecificLocation, locationType);
        try {
            FileSystem fileSystem = HadoopClientFactory.get().createProxiedFileSystem(getConf());
            Cluster cluster = ClusterHelper.getCluster(clusterName);
            Properties baseProperties = FeedHelper.getClusterProperties(cluster);
            baseProperties.putAll(FeedHelper.getFeedProperties(feed));
            List<FeedInstanceStatus> instances = new ArrayList<FeedInstanceStatus>();
            Date feedStart = FeedHelper.getCluster(feed, clusterName).getValidity().getStart();
            TimeZone tz = feed.getTimezone();
            // Snap the requested start forward to the first valid instance time.
            Date alignedStart = EntityUtil.getNextStartTime(feedStart, feed.getFrequency(), tz, start);
            String basePath = location.getPath();
            while (!end.before(alignedStart)) {
                // Resolve ${YEAR}/${MONTH}/... variables for this instance time.
                Properties allProperties = ExpressionHelper.getTimeVariables(alignedStart, tz);
                allProperties.putAll(baseProperties);
                String feedInstancePath = ExpressionHelper.substitute(basePath, allProperties);
                FileStatus fileStatus = getFileStatus(fileSystem, new Path(feedInstancePath));
                FeedInstanceStatus instance = new FeedInstanceStatus(feedInstancePath);
                Date date = FeedHelper.getDate(basePath, new Path(feedInstancePath), tz);
                instance.setInstance(SchemaHelper.formatDateUTC(date));
                if (fileStatus != null) {
                    instance.setCreationTime(fileStatus.getModificationTime());
                    ContentSummary contentSummary = fileSystem.getContentSummary(fileStatus.getPath());
                    if (contentSummary != null) {
                        long size = contentSummary.getSpaceConsumed();
                        instance.setSize(size);
                        if (!StringUtils.isEmpty(feed.getAvailabilityFlag())) {
                            // An availability flag file marks a fully-written instance.
                            FileStatus doneFile = getFileStatus(fileSystem,
                                    new Path(fileStatus.getPath(), feed.getAvailabilityFlag()));
                            if (doneFile != null) {
                                instance.setStatus(FeedInstanceStatus.AvailabilityStatus.AVAILABLE);
                            } else {
                                instance.setStatus(FeedInstanceStatus.AvailabilityStatus.PARTIAL);
                            }
                        } else {
                            instance.setStatus(size > 0 ? FeedInstanceStatus.AvailabilityStatus.AVAILABLE
                                    : FeedInstanceStatus.AvailabilityStatus.EMPTY);
                        }
                    }
                }
                instances.add(instance);
                // Advance to the next instance time by the feed's frequency.
                calendar.setTime(alignedStart);
                calendar.add(feed.getFrequency().getTimeUnit().getCalendarUnit(),
                        feed.getFrequency().getFrequencyAsInt());
                alignedStart = calendar.getTime();
            }
            return instances;
        } catch (IOException e) {
            LOG.error("Unable to retrieve listing for {}:{}", locationType, getStorageUrl(), e);
            throw new FalconException("Unable to retrieve listing for (URI " + getStorageUrl() + ")", e);
        }
    }
    @Override
    public FeedInstanceStatus.AvailabilityStatus getInstanceAvailabilityStatus(Feed feed, String clusterName,
                                                                               LocationType locationType,
                                                                               Date instanceTime) throws FalconException {
        List<FeedInstanceStatus> result = getListing(feed, clusterName, locationType, instanceTime, instanceTime);
        if (result.isEmpty()) {
            return FeedInstanceStatus.AvailabilityStatus.MISSING;
        } else {
            return result.get(0).getStatus();
        }
    }
    /**
     * Best-effort status lookup: returns the file's status, or null if it does not
     * exist or the lookup fails (callers treat null as "instance missing").
     */
    public FileStatus getFileStatus(FileSystem fileSystem, Path feedInstancePath) {
        FileStatus fileStatus = null;
        try {
            fileStatus = fileSystem.getFileStatus(feedInstancePath);
        } catch (IOException ignore) {
            //ignore - missing paths are an expected, non-fatal condition here
        }
        return fileStatus;
    }
    public Configuration getConf() {
        Configuration conf = new Configuration();
        conf.set(HadoopClientFactory.FS_DEFAULT_NAME_KEY, storageUrl);
        return conf;
    }
    private String getRelativePath(Location location) {
        // if the path contains variables, locate on the "parent" path (just before first variable usage)
        Matcher matcher = FeedDataPath.PATTERN.matcher(location.getPath());
        boolean timedPath = matcher.find();
        if (timedPath) {
            return location.getPath().substring(0, matcher.start());
        }
        return location.getPath();
    }
    @Override
    public String toString() {
        return "FileSystemStorage{"
                + "storageUrl='" + storageUrl + '\''
                + ", locations=" + locations
                + '}';
    }
}