blob: 664813b6dba9870dd1cd46b5df64ffa6126a452b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.falcon.entity;
import org.apache.falcon.FalconException;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.feed.Location;
import org.apache.falcon.entity.v0.feed.LocationType;
import org.apache.falcon.entity.v0.feed.Locations;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
/**
* A file system implementation of a feed storage.
*/
public class FileSystemStorage implements Storage {

    /** Separates the per-location-type entries inside a URI template. */
    public static final String FEED_PATH_SEP = "#";

    /** Separates the location type name from its path inside one URI template entry. */
    public static final String LOCATION_TYPE_SEP = "=";

    /** Storage URL used when a storage is built from a feed without a cluster. */
    public static final String FILE_SYSTEM_URL = "${nameNode}";

    // Placeholders substituted for "${" and "}" while a template entry is run through
    // java.net.URI, whose parser does not accept those characters.
    private static final String DOLLAR_EXPR_START = "_D__START_";
    private static final String EXPR_CLOSE = "_CLOSE_";

    // The location types whose paths are compared by isIdentical().
    private static final LocationType[] COMPARED_LOCATION_TYPES = {
        LocationType.DATA, LocationType.META, LocationType.STATS, LocationType.TMP,
    };

    private final String storageUrl;
    private final List<Location> locations;

    /**
     * Creates a storage for the feed's locations using {@link #FILE_SYSTEM_URL}
     * as the storage URL.
     *
     * @param feed feed whose locations are used
     */
    protected FileSystemStorage(Feed feed) {
        this(FILE_SYSTEM_URL, feed.getLocations());
    }

    /**
     * Creates a storage for the feed's locations using the storage URL returned by
     * {@code ClusterHelper.getStorageUrl(cluster)}.
     *
     * @param cluster cluster supplying the storage URL
     * @param feed    feed whose locations are used
     */
    protected FileSystemStorage(Cluster cluster, Feed feed) {
        this(ClusterHelper.getStorageUrl(cluster), feed.getLocations());
    }

    /**
     * Creates a storage from a storage URL and a {@code Locations} holder.
     *
     * @param storageUrl storage URL, must not be null or empty
     * @param locations  holder of the locations, must not be null or empty
     * @throws IllegalArgumentException if either argument is null or empty
     */
    protected FileSystemStorage(String storageUrl, Locations locations) {
        // A null holder is forwarded as a null list so that it is rejected by the
        // argument validation below instead of failing here with a NullPointerException.
        this(storageUrl, locations == null ? null : locations.getLocations());
    }

    /**
     * Creates a storage from a storage URL and a list of locations.
     * The given list is stored as-is (it is not copied).
     *
     * @param storageUrl storage URL, must not be null or empty
     * @param locations  locations, must not be null or empty
     * @throws IllegalArgumentException if either argument is null or empty
     */
    protected FileSystemStorage(String storageUrl, List<Location> locations) {
        if (storageUrl == null || storageUrl.length() == 0) {
            throw new IllegalArgumentException("FileSystem URL cannot be null or empty");
        }

        if (locations == null || locations.size() == 0) {
            throw new IllegalArgumentException("FileSystem Locations cannot be null or empty");
        }

        this.storageUrl = storageUrl;
        this.locations = locations;
    }

    /**
     * Create an instance from the URI Template that was generated using
     * the getUriTemplate() method.
     *
     * The template is a list of {@code TYPE=uri} entries separated by
     * {@link #FEED_PATH_SEP}. The storage URL is taken from the scheme and authority
     * of the first entry; each entry contributes one location made of the
     * type and the path component of its URI.
     *
     * @param uriTemplate the uri template from org.apache.falcon.entity.FileSystemStorage#getUriTemplate
     * @throws URISyntaxException if an entry is not a valid URI or has no path component
     * @throws IllegalArgumentException if the template is null or empty, an entry has no
     *                                  type separator, or the type name is not a LocationType
     */
    protected FileSystemStorage(String uriTemplate) throws URISyntaxException {
        if (uriTemplate == null || uriTemplate.length() == 0) {
            throw new IllegalArgumentException("URI template cannot be null or empty");
        }

        String rawStorageUrl = null;
        List<Location> rawLocations = new ArrayList<Location>();
        for (String rawPath : uriTemplate.split(FEED_PATH_SEP)) {
            // Limit of 2: only the first separator splits type from path, so a path
            // that itself contains '=' (for example ".../dt=${YEAR}") is kept whole.
            String[] typeAndPath = rawPath.split(LOCATION_TYPE_SEP, 2);
            if (typeAndPath.length != 2) {
                throw new IllegalArgumentException("Malformed location '" + rawPath
                        + "' in URI template: " + uriTemplate);
            }

            // Mask "${" and "}" so the entry can be parsed by java.net.URI.
            final String processed = typeAndPath[1]
                    .replace("${", DOLLAR_EXPR_START)
                    .replace("}", EXPR_CLOSE);
            URI uri = new URI(processed);

            if (rawStorageUrl == null) {
                // NOTE(review): if the first entry has no scheme/authority (e.g. a bare
                // "/tmp" path), this evaluates to "null://null" - kept as in the original.
                rawStorageUrl = uri.getScheme() + "://" + uri.getAuthority();
            }

            String path = uri.getPath();
            if (path == null) {
                // URI.getPath() is null for opaque URIs such as "scheme:something".
                throw new URISyntaxException(typeAndPath[1], "Location has no path component");
            }

            // Restore the expression markers that were masked before parsing.
            final String finalPath = path
                    .replace(DOLLAR_EXPR_START, "${")
                    .replace(EXPR_CLOSE, "}");

            Location location = new Location();
            location.setPath(finalPath);
            location.setType(LocationType.valueOf(typeAndPath[0]));
            rawLocations.add(location);
        }

        this.storageUrl = rawStorageUrl;
        this.locations = rawLocations;
    }

    /**
     * @return always {@code TYPE.FILESYSTEM}
     */
    @Override
    public TYPE getType() {
        return TYPE.FILESYSTEM;
    }

    /**
     * @return the storage URL this instance was created with
     */
    public String getStorageUrl() {
        return storageUrl;
    }

    /**
     * @return the list of locations held by this instance (the internal list, not a copy)
     */
    public List<Location> getLocations() {
        return locations;
    }

    /**
     * Builds the combined URI template: a {@code DATA=...} entry followed by
     * META, STATS and TMP entries, each separated by {@link #FEED_PATH_SEP}.
     *
     * @return the combined URI template
     */
    @Override
    public String getUriTemplate() {
        StringBuilder feedBasePaths = new StringBuilder();
        // The DATA entry is always written, whatever its template evaluates to.
        feedBasePaths.append(LocationType.DATA.name())
                .append(LOCATION_TYPE_SEP)
                .append(getUriTemplate(LocationType.DATA));

        appendEntryIfPresent(feedBasePaths, LocationType.META);
        appendEntryIfPresent(feedBasePaths, LocationType.STATS);
        appendEntryIfPresent(feedBasePaths, LocationType.TMP);

        return feedBasePaths.toString();
    }

    /**
     * Appends {@code #TYPE=template} for the given type unless its template is null.
     * This class's own getUriTemplate(LocationType) never returns null, so the check
     * only matters if that method is overridden.
     */
    private void appendEntryIfPresent(StringBuilder target, LocationType type) {
        String pathMask = getUriTemplate(type);
        if (pathMask != null) {
            target.append(FEED_PATH_SEP)
                    .append(type.name())
                    .append(LOCATION_TYPE_SEP)
                    .append(pathMask);
        }
    }

    /**
     * Returns the URI template for a single location type.
     *
     * @param locationType type of the location to look up
     * @return the storage URL followed by the location's path, or the literal
     *         {@code "/tmp"} (without the storage URL) when no location of that type exists
     */
    @Override
    public String getUriTemplate(LocationType locationType) {
        Location locationForType = findLocation(locations, locationType);
        if (locationForType == null) {
            return "/tmp";
        }

        return storageUrl + locationForType.getPath();
    }

    /**
     * @return always {@code true}
     */
    @Override
    public boolean exists() throws FalconException {
        // Directories on FS will be created if they don't exist.
        return true;
    }

    /**
     * Compares this storage with another one by location count and by the paths of
     * the DATA, META, STATS and TMP locations. A location type missing on either side
     * is compared as if its path were {@code "/tmp"}. Storage URLs are not compared.
     *
     * @param toCompareAgainst storage to compare with
     * @return {@code true} if both have the same number of locations and equal paths
     *         for all four types; {@code false} otherwise, including when the argument
     *         is null or is not a FileSystemStorage
     */
    @Override
    public boolean isIdentical(Storage toCompareAgainst) throws FalconException {
        // A different Storage implementation (or null) can never be identical;
        // checking first avoids a ClassCastException / NullPointerException.
        if (!(toCompareAgainst instanceof FileSystemStorage)) {
            return false;
        }

        final List<Location> ownLocations = getLocations();
        final List<Location> otherLocations = ((FileSystemStorage) toCompareAgainst).getLocations();
        if (ownLocations.size() != otherLocations.size()) {
            return false;
        }

        for (LocationType type : COMPARED_LOCATION_TYPES) {
            String ownPath = getLocation(ownLocations, type).getPath();
            String otherPath = getLocation(otherLocations, type).getPath();
            // Null-safe comparison: a location without a path must not cause an NPE.
            boolean samePath = ownPath == null ? otherPath == null : ownPath.equals(otherPath);
            if (!samePath) {
                return false;
            }
        }

        return true;
    }

    /**
     * Returns the first location of the given type, or a new location of that
     * type with path {@code "/tmp"} when the list contains none.
     */
    private static Location getLocation(List<Location> locations, LocationType type) {
        Location found = findLocation(locations, type);
        if (found != null) {
            return found;
        }

        Location loc = new Location();
        loc.setPath("/tmp");
        loc.setType(type);
        return loc;
    }

    /**
     * Returns the first location of the given type, or null when the list contains none.
     */
    private static Location findLocation(List<Location> locations, LocationType type) {
        for (Location loc : locations) {
            if (loc.getType() == type) {
                return loc;
            }
        }

        return null;
    }

    @Override
    public String toString() {
        return "FileSystemStorage{"
                + "storageUrl='" + storageUrl + '\''
                + ", locations=" + locations
                + '}';
    }
}