blob: cfff1bdcbd2b17a42c10f4f52944006c37496a1d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.registry.client.impl;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
import org.apache.hadoop.fs.PathNotFoundException;
import org.apache.hadoop.registry.client.api.BindFlags;
import org.apache.hadoop.registry.client.api.RegistryOperations;
import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
import org.apache.hadoop.registry.client.binding.RegistryUtils;
import org.apache.hadoop.registry.client.exceptions.InvalidPathnameException;
import org.apache.hadoop.registry.client.exceptions.InvalidRecordException;
import org.apache.hadoop.registry.client.exceptions.NoRecordException;
import org.apache.hadoop.registry.client.types.RegistryPathStatus;
import org.apache.hadoop.registry.client.types.ServiceRecord;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
/**
* Filesystem-based implementation of RegistryOperations. This class relies
* entirely on the configured FS for security and does no extra checks.
*/
public class FSRegistryOperationsService extends CompositeService
implements RegistryOperations {
private FileSystem fs;
private static final Logger LOG =
LoggerFactory.getLogger(FSRegistryOperationsService.class);
private final RegistryUtils.ServiceRecordMarshal serviceRecordMarshal =
new RegistryUtils.ServiceRecordMarshal();
public FSRegistryOperationsService() {
super(FSRegistryOperationsService.class.getName());
}
@VisibleForTesting
public FileSystem getFs() {
return this.fs;
}
@Override
protected void serviceInit(Configuration conf) {
try {
this.fs = FileSystem.get(conf);
LOG.info("Initialized Yarn-registry with Filesystem "
+ fs.getClass().getCanonicalName());
} catch (IOException e) {
LOG.error("Failed to get FileSystem for registry", e);
throw new YarnRuntimeException(e);
}
}
private Path makePath(String path) {
return new Path(path);
}
private Path formatDataPath(String basePath) {
return Path.mergePaths(new Path(basePath), new Path("/_record"));
}
private String relativize(String basePath, String childPath) {
String relative = new File(basePath).toURI()
.relativize(new File(childPath).toURI()).getPath();
return relative;
}
@Override
public boolean mknode(String path, boolean createParents)
throws PathNotFoundException, InvalidPathnameException, IOException {
Path registryPath = makePath(path);
// getFileStatus throws FileNotFound if the path doesn't exist. If the
// file already exists, return.
try {
fs.getFileStatus(registryPath);
return false;
} catch (FileNotFoundException e) {
}
if (createParents) {
// By default, mkdirs creates any parent dirs it needs
fs.mkdirs(registryPath);
} else {
FileStatus parentStatus = null;
if (registryPath.getParent() != null) {
parentStatus = fs.getFileStatus(registryPath.getParent());
}
if (registryPath.getParent() == null || parentStatus.isDirectory()) {
fs.mkdirs(registryPath);
} else {
throw new PathNotFoundException("no parent for " + path);
}
}
return true;
}
@Override
public void bind(String path, ServiceRecord record, int flags)
throws PathNotFoundException, FileAlreadyExistsException,
InvalidPathnameException, IOException {
// Preserve same overwrite semantics as ZK implementation
Preconditions.checkArgument(record != null, "null record");
RegistryTypeUtils.validateServiceRecord(path, record);
Path dataPath = formatDataPath(path);
Boolean overwrite = ((flags & BindFlags.OVERWRITE) != 0);
if (fs.exists(dataPath) && !overwrite) {
throw new FileAlreadyExistsException();
} else {
// Either the file doesn't exist, or it exists and we're
// overwriting. Create overwrites by default and creates parent dirs if
// needed.
FSDataOutputStream stream = fs.create(dataPath);
byte[] bytes = serviceRecordMarshal.toBytes(record);
stream.write(bytes);
stream.close();
LOG.info("Bound record to path " + dataPath);
}
}
@Override
public ServiceRecord resolve(String path) throws PathNotFoundException,
NoRecordException, InvalidRecordException, IOException {
// Read the entire file into byte array, should be small metadata
Long size = fs.getFileStatus(formatDataPath(path)).getLen();
byte[] bytes = new byte[size.intValue()];
FSDataInputStream instream = fs.open(formatDataPath(path));
int bytesRead = instream.read(bytes);
instream.close();
if (bytesRead < size) {
throw new InvalidRecordException(path,
"Expected " + size + " bytes, but read " + bytesRead);
}
// Unmarshal, check, and return
ServiceRecord record = serviceRecordMarshal.fromBytes(path, bytes);
RegistryTypeUtils.validateServiceRecord(path, record);
return record;
}
@Override
public RegistryPathStatus stat(String path)
throws PathNotFoundException, InvalidPathnameException, IOException {
FileStatus fstat = fs.getFileStatus(formatDataPath(path));
int numChildren = fs.listStatus(makePath(path)).length;
RegistryPathStatus regstat =
new RegistryPathStatus(fstat.getPath().toString(),
fstat.getModificationTime(), fstat.getLen(), numChildren);
return regstat;
}
@Override
public boolean exists(String path) throws IOException {
return fs.exists(makePath(path));
}
@Override
public List<String> list(String path)
throws PathNotFoundException, InvalidPathnameException, IOException {
FileStatus[] statArray = fs.listStatus(makePath(path));
String basePath = fs.getFileStatus(makePath(path)).getPath().toString();
List<String> paths = new ArrayList<String>();
FileStatus stat;
// Only count dirs; the _record files are hidden.
for (int i = 0; i < statArray.length; i++) {
stat = statArray[i];
if (stat.isDirectory()) {
String relativePath = relativize(basePath, stat.getPath().toString());
paths.add(relativePath);
}
}
return paths;
}
@Override
public void delete(String path, boolean recursive)
throws PathNotFoundException, PathIsNotEmptyDirectoryException,
InvalidPathnameException, IOException {
Path dirPath = makePath(path);
if (!fs.exists(dirPath)) {
throw new PathNotFoundException(path);
}
// If recursive == true, or dir is empty, delete.
if (recursive || list(path).isEmpty()) {
fs.delete(makePath(path), true);
return;
}
throw new PathIsNotEmptyDirectoryException(path);
}
@Override
public boolean addWriteAccessor(String id, String pass) throws IOException {
throw new NotImplementedException();
}
@Override
public void clearWriteAccessors() {
throw new NotImplementedException();
}
}