// blob: 31cf53c98c9afe58284da8962056747a04e357cc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.accumulo.core.volume;
import static java.util.Objects.requireNonNull;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
/**
 * Static helpers that resolve the {@link Volume}s and base URIs Accumulo uses, based on {@link Property#INSTANCE_VOLUMES} when it is set and on the deprecated
 * {@code INSTANCE_DFS_URI} / {@code INSTANCE_DFS_DIR} properties otherwise.
 */
public class VolumeConfiguration {

  @SuppressWarnings("deprecation")
  private static final Property INSTANCE_DFS_DIR = Property.INSTANCE_DFS_DIR;

  @SuppressWarnings("deprecation")
  private static final Property INSTANCE_DFS_URI = Property.INSTANCE_DFS_URI;

  /**
   * Resolve the {@link Volume} for the given path.
   *
   * @param path
   *          the path to resolve; a path containing {@code ':'} is treated as absolute, anything else as relative to the default volume
   * @param conf
   *          the Hadoop configuration
   * @param acuconf
   *          the Accumulo configuration, consulted only for relative paths
   * @return a Volume created from the path itself when it is absolute, otherwise the default volume
   * @throws NullPointerException
   *           if {@code path} is null
   * @throws IOException
   *           if the volume cannot be created
   */
  public static Volume getVolume(String path, Configuration conf, AccumuloConfiguration acuconf) throws IOException {
    requireNonNull(path);

    if (path.contains(":")) {
      // An absolute path
      return create(new Path(path), conf);
    }

    // A relative path
    return getDefaultVolume(conf, acuconf);
  }

  /**
   * Create the default {@link Volume}. The FileSystem is taken from {@code INSTANCE_DFS_URI} when that property has a value, falling back to the Hadoop defined
   * default filesystem (fs.defaultFS or the deprecated fs.default.name) when it is null or empty.
   *
   * @param conf
   *          the Hadoop configuration
   * @param acuconf
   *          the Accumulo configuration
   * @return the default Volume
   * @throws IOException
   *           if the FileSystem cannot be obtained, or the configured URI is not syntactically valid
   */
  public static Volume getDefaultVolume(Configuration conf, AccumuloConfiguration acuconf) throws IOException {
    String uri = acuconf.get(INSTANCE_DFS_URI);

    // Treat null the same as empty, matching getConfiguredBaseDir; otherwise new URI(null) would throw a NullPointerException
    if (uri == null || uri.isEmpty()) {
      return create(FileSystem.get(conf), acuconf);
    }

    try {
      return create(FileSystem.get(new URI(uri), conf), acuconf);
    } catch (URISyntaxException e) {
      throw new IOException("Invalid URI for " + INSTANCE_DFS_URI.getKey() + ": " + uri, e);
    }
  }

  /**
   * This method gets the old configured base directory, using the URI and DIR. It will no longer be needed when we no longer support upgrading from non-volume
   * based Accumulo config
   *
   * @see #getVolumeUris(AccumuloConfiguration,Configuration)
   */
  private static String getConfiguredBaseDir(AccumuloConfiguration conf, Configuration hadoopConfig) {
    String singleNamespace = conf.get(INSTANCE_DFS_DIR);
    if (singleNamespace == null) {
      // Same fallback as create(fs, acuconf); avoids concatenating the literal text "null" onto the URI
      singleNamespace = INSTANCE_DFS_DIR.getDefaultValue();
    }

    String dfsUri = conf.get(INSTANCE_DFS_URI);
    if (dfsUri == null || dfsUri.isEmpty()) {
      try {
        return FileSystem.get(hadoopConfig).getUri().toString() + singleNamespace;
      } catch (IOException e) {
        throw new RuntimeException("Unable to obtain the default Hadoop FileSystem", e);
      }
    }

    if (!dfsUri.contains(":")) {
      throw new IllegalArgumentException("Expected fully qualified URI for " + INSTANCE_DFS_URI.getKey() + " got " + dfsUri);
    }
    return dfsUri + singleNamespace;
  }

  /**
   * Compute the URIs to be used by Accumulo, using the Hadoop configuration returned by {@link CachedConfiguration#getInstance()}.
   *
   * @param conf
   *          the Accumulo configuration
   * @return the configured volume URIs
   * @see #getVolumeUris(AccumuloConfiguration,Configuration)
   */
  public static String[] getVolumeUris(AccumuloConfiguration conf) {
    return getVolumeUris(conf, CachedConfiguration.getInstance());
  }

  /**
   * Compute the URIs to be used by Accumulo. When {@link Property#INSTANCE_VOLUMES} is null or empty the single base directory derived from the old
   * {@code INSTANCE_DFS_URI} / {@code INSTANCE_DFS_DIR} values is returned; otherwise the property is split on commas and each entry is returned in order.
   *
   * @param conf
   *          the Accumulo configuration
   * @param hadoopConfig
   *          the Hadoop configuration, used only for the old-config fallback
   * @return the configured volume URIs
   * @throws IllegalArgumentException
   *           if an entry does not contain {@code ':'} or is not a syntactically valid URI
   */
  public static String[] getVolumeUris(AccumuloConfiguration conf, Configuration hadoopConfig) {
    String ns = conf.get(Property.INSTANCE_VOLUMES);

    if (ns == null || ns.isEmpty()) {
      // Fall back to using the old config values
      return new String[] {getConfiguredBaseDir(conf, hadoopConfig)};
    }

    String[] namespaces = ns.split(",");
    String[] configuredBaseDirs = new String[namespaces.length];
    for (int i = 0; i < namespaces.length; i++) {
      // Tolerate whitespace around the separators, e.g. "hdfs://a/accumulo, hdfs://b/accumulo"
      String namespace = namespaces[i].trim();
      if (!namespace.contains(":")) {
        throw new IllegalArgumentException("Expected fully qualified URI for " + Property.INSTANCE_VOLUMES.getKey() + " got " + namespace);
      }

      try {
        // pass through URI to unescape hex encoded chars (e.g. convert %2C to "," char)
        configuredBaseDirs[i] = new Path(new URI(namespace)).toString();
      } catch (URISyntaxException e) {
        throw new IllegalArgumentException(Property.INSTANCE_VOLUMES.getKey() + " contains " + namespace + " which has a syntax error", e);
      }
    }
    return configuredBaseDirs;
  }

  /**
   * Append a suffix to every base, joining the two with exactly one {@code '/'} when the base ends with at most one slash and the suffix starts with at most
   * one slash.
   *
   * @param bases
   *          the base strings; the array is not modified
   * @param suffix
   *          the suffix to append; a single leading {@code '/'} is dropped before joining
   * @return a new array, the same length as {@code bases}, holding the joined strings in the same order
   */
  public static String[] prefix(String[] bases, String suffix) {
    // Work on a local copy instead of reassigning the parameter
    String tail = suffix.startsWith("/") ? suffix.substring(1) : suffix;

    String[] result = new String[bases.length];
    for (int i = 0; i < bases.length; i++) {
      String base = bases[i];
      result[i] = base.endsWith("/") ? base + tail : base + "/" + tail;
    }
    return result;
  }

  /**
   * Create a Volume with the given FileSystem that writes to the default path
   *
   * @param fs
   *          A FileSystem to write to
   * @param acuconf
   *          the Accumulo configuration supplying the default path ({@code INSTANCE_DFS_DIR}); the property's default value is used when it is unset
   * @return A Volume instance writing to the given FileSystem in the default path
   */
  public static <T extends FileSystem> Volume create(T fs, AccumuloConfiguration acuconf) {
    String dfsDir = acuconf.get(INSTANCE_DFS_DIR);
    if (dfsDir == null) {
      dfsDir = INSTANCE_DFS_DIR.getDefaultValue();
    }
    return new VolumeImpl(fs, dfsDir);
  }

  /**
   * Create a Volume with the given FileSystem and base path.
   *
   * @param fs
   *          A FileSystem to write to
   * @param basePath
   *          the base path handed to the Volume
   * @return a Volume instance built from the given FileSystem and base path
   */
  public static <T extends FileSystem> Volume create(T fs, String basePath) {
    return new VolumeImpl(fs, basePath);
  }

  /**
   * Create a Volume from a path and a Hadoop configuration.
   *
   * @param path
   *          the path handed to the Volume
   * @param conf
   *          the Hadoop configuration
   * @return a Volume instance built from the given path
   * @throws IOException
   *           if the Volume cannot be constructed
   */
  public static Volume create(Path path, Configuration conf) throws IOException {
    return new VolumeImpl(path, conf);
  }
}