blob: fec7a00a923aea770f97e0f84a6eed16fc6a57f8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.server.fs;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import org.apache.accumulo.core.volume.Volume;
import org.apache.accumulo.core.volume.VolumeConfiguration;
import org.apache.accumulo.server.ServerConstants;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A wrapper around multiple hadoop FileSystem objects, which are assumed to be different volumes.
* This also concentrates a bunch of meta-operations like waiting for SAFE_MODE, and closing WALs.
* N.B. implementations must be thread safe.
*/
public interface VolumeManager extends AutoCloseable {

  /**
   * The top-level directory types that live directly under a volume root. Each constant knows its
   * directory name and can split a full path into its volume and non-volume parts.
   */
  enum FileType {
    TABLE(ServerConstants.TABLE_DIR),
    WAL(ServerConstants.WAL_DIR),
    RECOVERY(ServerConstants.RECOVERY_DIR);

    // Set once at construction; enum constants are shared JVM-wide, so keep this immutable.
    private final String dir;

    FileType(String dir) {
      this.dir = dir;
    }

    public String getDirectory() {
      return dir;
    }

    /**
     * Returns the index in {@code path} where the volume ends and the {@code /dir} suffix begins,
     * or -1 when the path is relative and does not contain the directory.
     *
     * @throws IllegalArgumentException if the path is absolute (contains a scheme separator
     *         {@code :}) but does not contain {@code dir}
     */
    private static int endOfVolumeIndex(String path, String dir) {
      // Strip off the suffix that starts with the FileType (e.g. tables, wal, etc)
      int dirIndex = path.indexOf('/' + dir);
      if (dirIndex != -1) {
        return dirIndex;
      }
      if (path.contains(":")) {
        throw new IllegalArgumentException(path + " is absolute, but does not contain " + dir);
      }
      return -1;
    }

    /**
     * Extracts the volume portion of the given path: everything up to and including the slash that
     * precedes this FileType's directory. Returns null for a relative path.
     */
    public Path getVolume(Path path) {
      String pathString = path.toString();
      int eopi = endOfVolumeIndex(pathString, dir);
      if (eopi != -1) {
        return new Path(pathString.substring(0, eopi + 1));
      }
      return null;
    }

    /**
     * Strips the volume portion from the given path, leaving this FileType's directory and
     * everything below it. Returns null for a relative path.
     */
    public Path removeVolume(Path path) {
      String pathString = path.toString();
      int eopi = endOfVolumeIndex(pathString, dir);
      if (eopi != -1) {
        return new Path(pathString.substring(eopi + 1));
      }
      return null;
    }
  }

  // close the underlying FileSystems
  @Override
  void close() throws IOException;

  // forward to the appropriate FileSystem object
  FSDataOutputStream create(Path dest) throws IOException;

  // forward to the appropriate FileSystem object's create method with overwrite flag set to true
  FSDataOutputStream overwrite(Path path) throws IOException;

  // forward to the appropriate FileSystem object
  FSDataOutputStream create(Path path, boolean b, int int1, short int2, long long1)
      throws IOException;

  // create a file, but only if it doesn't exist
  boolean createNewFile(Path writable) throws IOException;

  // create a file which can be sync'd to disk
  FSDataOutputStream createSyncable(Path logPath, int buffersize, short replication, long blockSize)
      throws IOException;

  // delete a file
  boolean delete(Path path) throws IOException;

  // delete a directory and anything under it
  boolean deleteRecursively(Path path) throws IOException;

  // forward to the appropriate FileSystem object
  boolean exists(Path path) throws IOException;

  // forward to the appropriate FileSystem object
  FileStatus getFileStatus(Path path) throws IOException;

  // find the appropriate FileSystem object given a path
  FileSystem getFileSystemByPath(Path path);

  // return the item in options that is in the same file system as source
  Path matchingFileSystem(Path source, Set<String> options);

  // forward to the appropriate FileSystem object
  FileStatus[] listStatus(Path path) throws IOException;

  // forward to the appropriate FileSystem object
  boolean mkdirs(Path directory) throws IOException;

  // forward to the appropriate FileSystem object
  boolean mkdirs(Path path, FsPermission permission) throws IOException;

  // forward to the appropriate FileSystem object
  FSDataInputStream open(Path path) throws IOException;

  // forward to the appropriate FileSystem object, throws an exception if the paths are in different
  // volumes
  boolean rename(Path path, Path newPath) throws IOException;

  /**
   * Rename lots of files at once in a thread pool and return once all the threads have completed.
   * This operation should be idempotent to allow calling multiple times in the case of a partial
   * completion.
   */
  void bulkRename(Map<Path,Path> oldToNewPathMap, int poolSize, String poolName,
      String transactionId) throws IOException;

  // forward to the appropriate FileSystem object
  boolean moveToTrash(Path sourcePath) throws IOException;

  // forward to the appropriate FileSystem object
  short getDefaultReplication(Path logPath);

  // all volumes are ready to provide service (not in SafeMode, for example)
  boolean isReady() throws IOException;

  // forward to the appropriate FileSystem object
  FileStatus[] globStatus(Path path) throws IOException;

  // decide on which of the given locations to create a new file
  String choose(org.apache.accumulo.core.spi.fs.VolumeChooserEnvironment env, Set<String> options);

  // return all valid locations to create a new file
  Set<String> choosable(org.apache.accumulo.core.spi.fs.VolumeChooserEnvironment env,
      Set<String> options);

  // are sync and flush supported for the given path
  boolean canSyncAndFlush(Path path);

  /**
   * Fetch the first configured instance Volume
   */
  default Volume getFirst() {
    return getVolumes().iterator().next();
  }

  /**
   * Fetch the configured instance Volumes
   */
  Collection<Volume> getVolumes();

  Logger log = LoggerFactory.getLogger(VolumeManager.class);

  /**
   * Reads the instance id from the given directory in HDFS. The instance id is stored as the name
   * of the single file inside {@code instanceDirectory}.
   *
   * @throws RuntimeException if the directory is missing/empty (Accumulo not initialized), holds
   *         more than one candidate file, or cannot be read
   */
  static String getInstanceIDFromHdfs(Path instanceDirectory, Configuration hadoopConf) {
    try {
      FileSystem fs =
          VolumeConfiguration.fileSystemForPath(instanceDirectory.toString(), hadoopConf);
      FileStatus[] files = null;
      try {
        files = fs.listStatus(instanceDirectory);
      } catch (FileNotFoundException ex) {
        // ignored - a missing directory is reported below as "not initialized"
      }
      log.debug("Trying to read instance id from {}", instanceDirectory);
      if (files == null || files.length == 0) {
        log.error("unable to obtain instance id at {}", instanceDirectory);
        throw new RuntimeException(
            "Accumulo not initialized, there is no instance id at " + instanceDirectory);
      } else if (files.length != 1) {
        log.error("multiple potential instances in {}", instanceDirectory);
        throw new RuntimeException(
            "Accumulo found multiple possible instance ids in " + instanceDirectory);
      } else {
        return files[0].getPath().getName();
      }
    } catch (IOException e) {
      // parameterized logging; SLF4J treats a trailing Throwable as the exception to log
      log.error("Problem reading instance id out of hdfs at {}", instanceDirectory, e);
      throw new RuntimeException(
          "Can't tell if Accumulo is initialized; can't read instance id at " + instanceDirectory,
          e);
    } catch (IllegalArgumentException exception) {
      /* HDFS throws this when there's a UnknownHostException due to DNS troubles. */
      if (exception.getCause() instanceof UnknownHostException) {
        log.error("Problem reading instance id out of hdfs at {}", instanceDirectory, exception);
      }
      throw exception;
    }
  }
}