blob: f9bee5400d5071b32bcca2be022955d39ac7222a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.ozone;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdds.annotation.InterfaceAudience;
import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ozone.OmUtils;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.OzoneKey;
import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenRenewer;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Basic Implementation of the OzoneFileSystem calls.
* <p>
* This is the minimal version which doesn't include any statistics.
* <p>
* For full featured version use OzoneClientAdapterImpl.
*/
public class BasicOzoneClientAdapterImpl implements OzoneClientAdapter {
/** Logger for this adapter; declared without an access modifier. */
static final Logger LOG =
LoggerFactory.getLogger(BasicOzoneClientAdapterImpl.class);
// RPC client created in the constructor and released by close().
private OzoneClient ozoneClient;
// Object store handle obtained from ozoneClient in the constructor.
private ObjectStore objectStore;
// Volume looked up by name in the constructor; used to resolve the bucket.
private OzoneVolume volume;
// Bucket that the file/key operations of this class are issued against.
private OzoneBucket bucket;
// Replication type parsed from OzoneConfigKeys.OZONE_REPLICATION_TYPE.
private ReplicationType replicationType;
// Replication factor parsed from OzoneConfigKeys.OZONE_REPLICATION; used by
// createFile when the requested replication is neither ONE nor THREE.
private ReplicationFactor replicationFactor;
// Set to true in the constructor when SecurityConfig reports security
// enabled; getDelegationToken returns null while this is false.
private boolean securityEnabled;
// Port read from OzoneConfigKeys.DFS_CONTAINER_IPC_PORT; substituted in
// block locations when a datanode reports STANDALONE port 0.
private int configuredDnPort;
/**
* Create new OzoneClientAdapter implementation using a newly constructed
* {@link OzoneConfiguration}.
*
* @param volumeStr Name of the volume to use.
* @param bucketStr Name of the bucket to use
* @throws IOException In case of a problem.
*/
public BasicOzoneClientAdapterImpl(String volumeStr, String bucketStr)
throws IOException {
// Delegates to the (conf, volume, bucket) constructor.
this(new OzoneConfiguration(), volumeStr, bucketStr);
}
/**
* Create new OzoneClientAdapter implementation without an explicit
* OzoneManager host or port.
*
* @param conf Configuration to use.
* @param volumeStr Name of the volume to use.
* @param bucketStr Name of the bucket to use.
* @throws IOException In case of a problem.
*/
public BasicOzoneClientAdapterImpl(OzoneConfiguration conf, String volumeStr,
String bucketStr)
throws IOException {
// A null host and a port of -1 stand for "not specified"; the delegate
// constructor reads the port from the configuration in that case.
this(null, -1, conf, volumeStr, bucketStr);
}
/**
 * Create new OzoneClientAdapter implementation for the given OzoneManager
 * address, volume and bucket.
 * <p>
 * If anything fails after the client has been created (volume or bucket
 * lookup, parsing of the replication settings), the client is closed before
 * the exception is propagated so that no connection is leaked.
 *
 * @param omHost OzoneManager host name or HA service id; may be null.
 * @param omPort OzoneManager RPC port, or -1 to read it from the
 *               configuration.
 * @param hadoopConf Configuration source to build the OzoneConfiguration
 *                   from.
 * @param volumeStr Name of the volume to use.
 * @param bucketStr Name of the bucket to use.
 * @throws IOException In case of a problem.
 * @throws IllegalArgumentException if the host is omitted while
 *         ozone.om.service.ids is defined, or if a port is given together
 *         with an HA service id.
 */
public BasicOzoneClientAdapterImpl(String omHost, int omPort,
    ConfigurationSource hadoopConf, String volumeStr, String bucketStr)
    throws IOException {
  OzoneConfiguration conf = OzoneConfiguration.of(hadoopConf);
  if (omHost == null && OmUtils.isServiceIdsDefined(conf)) {
    // When the host name or service id isn't given
    // but ozone.om.service.ids is defined, declare failure.
    // This is a safety precaution that prevents the client from
    // accidentally failing over to an unintended OM.
    throw new IllegalArgumentException("Service ID or host name must not"
        + " be omitted when ozone.om.service.ids is defined.");
  }
  if (omPort != -1) {
    // When the port number is specified, perform the following check
    if (OmUtils.isOmHAServiceId(conf, omHost)) {
      // If omHost is a service id, it shouldn't use a port
      throw new IllegalArgumentException("Port " + omPort +
          " specified in URI but host '" + omHost + "' is a "
          + "logical (HA) OzoneManager and does not use port information.");
    }
  } else {
    // When port number is not specified, read it from config
    omPort = OmUtils.getOmRpcPort(conf);
  }
  SecurityConfig secConfig = new SecurityConfig(conf);
  if (secConfig.isSecurityEnabled()) {
    this.securityEnabled = true;
  }
  String replicationTypeConf =
      conf.get(OzoneConfigKeys.OZONE_REPLICATION_TYPE,
          OzoneConfigKeys.OZONE_REPLICATION_TYPE_DEFAULT);
  int replicationCountConf = conf.getInt(OzoneConfigKeys.OZONE_REPLICATION,
      OzoneConfigKeys.OZONE_REPLICATION_DEFAULT);
  if (OmUtils.isOmHAServiceId(conf, omHost)) {
    // omHost is listed as one of the service ids in the config,
    // thus we should treat omHost as omServiceId
    this.ozoneClient =
        OzoneClientFactory.getRpcClient(omHost, conf);
  } else if (StringUtils.isNotEmpty(omHost) && omPort != -1) {
    this.ozoneClient =
        OzoneClientFactory.getRpcClient(omHost, omPort, conf);
  } else {
    this.ozoneClient =
        OzoneClientFactory.getRpcClient(conf);
  }
  try {
    objectStore = ozoneClient.getObjectStore();
    this.volume = objectStore.getVolume(volumeStr);
    this.bucket = volume.getBucket(bucketStr);
    this.replicationType = ReplicationType.valueOf(replicationTypeConf);
    this.replicationFactor = ReplicationFactor.valueOf(replicationCountConf);
    this.configuredDnPort = conf.getInt(
        OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
        OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
  } catch (IOException | RuntimeException e) {
    // The instance never becomes visible to the caller, so nobody else can
    // call close(); release the client here and keep the original failure.
    try {
      ozoneClient.close();
    } catch (Exception closeFailure) {
      e.addSuppressed(closeFailure);
    }
    throw e;
  }
}
/**
* Returns the value of the replication factor configured in the constructor,
* narrowed to a short.
*/
@Override
public short getDefaultReplication() {
return (short) replicationFactor.getValue();
}
/**
* Closes the underlying OzoneClient.
*
* @throws IOException if closing the client fails.
*/
@Override
public void close() throws IOException {
ozoneClient.close();
}
/**
 * Opens the file stored under the given key for reading.
 *
 * @param key key name of the file inside the bucket.
 * @return input stream over the file content.
 * @throws FileNotFoundException if the OzoneManager reports FILE_NOT_FOUND
 *         or NOT_A_FILE; the originating OMException is attached as cause.
 * @throws IOException for any other failure.
 */
@Override
public InputStream readFile(String key) throws IOException {
  incrementCounter(Statistic.OBJECTS_READ, 1);
  try {
    return bucket.readFile(key).getInputStream();
  } catch (OMException ex) {
    OMException.ResultCodes result = ex.getResult();
    if (result == OMException.ResultCodes.FILE_NOT_FOUND
        || result == OMException.ResultCodes.NOT_A_FILE) {
      FileNotFoundException notFound = new FileNotFoundException(
          result.name() + ": " + ex.getMessage());
      // FileNotFoundException has no cause-taking constructor.
      notFound.initCause(ex);
      throw notFound;
    }
    throw ex;
  }
}
/**
* Statistics hook invoked by the operations of this class. This basic
* implementation does nothing.
*
* @param objectsRead the statistic to increment (despite its name, callers
*                    in this class pass several different statistics).
* @param count amount to add.
*/
protected void incrementCounter(Statistic objectsRead, long count) {
//noop: Use OzoneClientAdapterImpl which supports statistics.
}
/**
 * Creates a file under the given key and returns a stream to write it.
 * <p>
 * The requested replication is honoured only when it equals the value of
 * ReplicationFactor.ONE or ReplicationFactor.THREE; any other value falls
 * back to the replication factor configured for this adapter.
 *
 * @param key key name of the file inside the bucket.
 * @param replication requested replication count.
 * @param overWrite passed through to the bucket's createFile call.
 * @param recursive passed through to the bucket's createFile call.
 * @return output stream wrapping the bucket's stream.
 * @throws FileAlreadyExistsException if the OzoneManager reports
 *         FILE_ALREADY_EXISTS or NOT_A_FILE; the originating OMException is
 *         attached as cause.
 * @throws IOException for any other failure.
 */
@Override
public OzoneFSOutputStream createFile(String key, short replication,
    boolean overWrite, boolean recursive) throws IOException {
  incrementCounter(Statistic.OBJECTS_CREATED, 1);
  try {
    boolean supportedByClient =
        replication == ReplicationFactor.ONE.getValue()
            || replication == ReplicationFactor.THREE.getValue();
    ReplicationFactor effectiveFactor = supportedByClient
        ? ReplicationFactor.valueOf(replication)
        : replicationFactor;
    OzoneOutputStream ozoneOutputStream = bucket.createFile(key, 0,
        replicationType, effectiveFactor, overWrite, recursive);
    return new OzoneFSOutputStream(ozoneOutputStream.getOutputStream());
  } catch (OMException ex) {
    OMException.ResultCodes result = ex.getResult();
    if (result == OMException.ResultCodes.FILE_ALREADY_EXISTS
        || result == OMException.ResultCodes.NOT_A_FILE) {
      FileAlreadyExistsException exists = new FileAlreadyExistsException(
          result.name() + ": " + ex.getMessage());
      // Keep the OM failure reachable for diagnostics.
      exists.initCause(ex);
      throw exists;
    }
    throw ex;
  }
}
/**
* Renames a key inside the bucket.
*
* @param key current key name.
* @param newKeyName new key name.
* @throws IOException if the bucket rename call fails.
*/
@Override
public void renameKey(String key, String newKeyName) throws IOException {
incrementCounter(Statistic.OBJECTS_RENAMED, 1);
bucket.renameKey(key, newKeyName);
}
/**
* Not supported by this adapter: always throws an IOException that directs
* the caller to renameKey.
*
* @param pathStr ignored.
* @param newPath ignored.
* @throws IOException always.
*/
@Override
public void rename(String pathStr, String newPath) throws IOException {
throw new IOException("Please use renameKey instead for o3fs.");
}
/**
 * Helper method to create a directory specified by key name in bucket.
 *
 * @param keyName key name to be created as directory
 * @return always true; every failure is reported through an exception
 * @throws FileAlreadyExistsException if the OzoneManager reports
 *         FILE_ALREADY_EXISTS; the originating OMException is attached as
 *         cause.
 * @throws IOException for any other failure.
 */
@Override
public boolean createDirectory(String keyName) throws IOException {
  LOG.trace("creating dir for key:{}", keyName);
  incrementCounter(Statistic.OBJECTS_CREATED, 1);
  try {
    bucket.createDirectory(keyName);
  } catch (OMException e) {
    if (e.getResult() == OMException.ResultCodes.FILE_ALREADY_EXISTS) {
      FileAlreadyExistsException exists =
          new FileAlreadyExistsException(e.getMessage());
      exists.initCause(e);
      throw exists;
    }
    throw e;
  }
  return true;
}
/**
 * Helper method to delete an object specified by key name in bucket.
 * <p>
 * An IOException from the bucket is not propagated: it is logged together
 * with its stack trace and reported as a false return value.
 *
 * @param keyName key name to be deleted
 * @return true if the key is deleted, false otherwise
 */
@Override
public boolean deleteObject(String keyName) {
  LOG.trace("issuing delete for key {}", keyName);
  try {
    incrementCounter(Statistic.OBJECTS_DELETED, 1);
    bucket.deleteKey(keyName);
    return true;
  } catch (IOException ioe) {
    // Trailing throwable argument makes SLF4J log the stack trace as well.
    LOG.error("delete key failed {}", ioe.getMessage(), ioe);
    return false;
  }
}
/**
 * Helper method to delete a list of objects specified by key name in
 * bucket with a single call.
 * <p>
 * An IOException from the bucket is not propagated: it is logged together
 * with its stack trace and reported as a false return value.
 *
 * @param keyNameList key name list to be deleted
 * @return true if the delete call succeeded, false otherwise
 */
@Override
public boolean deleteObjects(List<String> keyNameList) {
  try {
    incrementCounter(Statistic.OBJECTS_DELETED, keyNameList.size());
    bucket.deleteKeys(keyNameList);
    return true;
  } catch (IOException ioe) {
    // Trailing throwable argument makes SLF4J log the stack trace as well.
    LOG.error("delete key failed {}", ioe.getMessage(), ioe);
    return false;
  }
}
/**
 * Looks up the status of a key and converts it to a FileStatusAdapter.
 *
 * @param key key name inside the bucket.
 * @param uri default URI used to qualify the resulting path.
 * @param qualifiedPath passed as working directory when qualifying the
 *                      resulting path.
 * @param userName reported as both owner and group of the entry.
 * @return the converted status.
 * @throws FileNotFoundException if the OzoneManager reports FILE_NOT_FOUND;
 *         the originating OMException is attached as cause.
 * @throws IOException for any other failure.
 */
public FileStatusAdapter getFileStatus(String key, URI uri,
    Path qualifiedPath, String userName)
    throws IOException {
  try {
    incrementCounter(Statistic.OBJECTS_QUERY, 1);
    OzoneFileStatus status = bucket.getFileStatus(key);
    return toFileStatusAdapter(status, userName, uri, qualifiedPath);
  } catch (OMException e) {
    if (e.getResult() == OMException.ResultCodes.FILE_NOT_FOUND) {
      FileNotFoundException notFound =
          new FileNotFoundException(key + ": No such file or directory!");
      // FileNotFoundException has no cause-taking constructor.
      notFound.initCause(e);
      throw notFound;
    }
    throw e;
  }
}
/**
 * Lists the keys of the bucket for the given path key.
 *
 * @param pathKey passed to the bucket's listKeys call.
 * @return iterator that converts each OzoneKey to a BasicKeyInfo.
 * @throws IOException if the bucket listing fails.
 */
@Override
public Iterator<BasicKeyInfo> listKeys(String pathKey) throws IOException {
  incrementCounter(Statistic.OBJECTS_LIST, 1);
  Iterator<? extends OzoneKey> ozoneKeys = bucket.listKeys(pathKey);
  return new IteratorAdapter(ozoneKeys);
}
/**
 * Lists the statuses under a key and converts each to a FileStatusAdapter.
 *
 * @param keyName key to list.
 * @param recursive passed through to the bucket's listStatus call.
 * @param startKey passed through to the bucket's listStatus call.
 * @param numEntries passed through to the bucket's listStatus call.
 * @param uri default URI used to qualify the resulting paths.
 * @param workingDir working directory used to qualify the resulting paths.
 * @param username reported as both owner and group of every entry.
 * @return converted statuses, in the order returned by the bucket.
 * @throws FileNotFoundException if the OzoneManager reports FILE_NOT_FOUND;
 *         the originating OMException is attached as cause.
 * @throws IOException for any other failure.
 */
public List<FileStatusAdapter> listStatus(String keyName, boolean recursive,
    String startKey, long numEntries, URI uri,
    Path workingDir, String username) throws IOException {
  try {
    incrementCounter(Statistic.OBJECTS_LIST, 1);
    List<OzoneFileStatus> statuses = bucket
        .listStatus(keyName, recursive, startKey, numEntries);
    List<FileStatusAdapter> result = new ArrayList<>(statuses.size());
    for (OzoneFileStatus status : statuses) {
      result.add(toFileStatusAdapter(status, username, uri, workingDir));
    }
    return result;
  } catch (OMException e) {
    if (e.getResult() == OMException.ResultCodes.FILE_NOT_FOUND) {
      FileNotFoundException notFound =
          new FileNotFoundException(e.getMessage());
      // FileNotFoundException has no cause-taking constructor.
      notFound.initCause(e);
      throw notFound;
    }
    throw e;
  }
}
/**
 * Fetches a delegation token from the object store.
 *
 * @param renewer name of the renewer; may be null.
 * @return the token with its kind set to OzoneTokenIdentifier.KIND_NAME, or
 *         null when security is not enabled.
 * @throws IOException if the token request fails.
 */
@Override
public Token<OzoneTokenIdentifier> getDelegationToken(String renewer)
    throws IOException {
  if (securityEnabled) {
    Text renewerText = (renewer != null) ? new Text(renewer) : null;
    Token<OzoneTokenIdentifier> delegationToken =
        ozoneClient.getObjectStore().getDelegationToken(renewerText);
    delegationToken.setKind(OzoneTokenIdentifier.KIND_NAME);
    return delegationToken;
  }
  // No tokens are issued without security.
  return null;
}
/**
* Returns the key provider reported by the object store.
*
* @throws IOException if the object store call fails.
*/
@Override
public KeyProvider getKeyProvider() throws IOException {
return objectStore.getKeyProvider();
}
/**
* Returns the key provider URI reported by the object store.
*
* @throws IOException if the object store call fails.
*/
@Override
public URI getKeyProviderUri() throws IOException {
return objectStore.getKeyProviderUri();
}
/**
* Returns the canonical service name reported by the object store.
*/
@Override
public String getCanonicalServiceName() {
return objectStore.getCanonicalServiceName();
}
/**
 * Ozone Delegation Token Renewer.
 * <p>
 * Each renew/cancel call creates a short-lived OzoneClient for the token
 * and closes it again before returning.
 */
@InterfaceAudience.Private
public static class Renewer extends TokenRenewer {
  //Ensure that OzoneConfiguration files are loaded before trying to use
  // the renewer.
  static {
    OzoneConfiguration.activate();
  }

  /**
   * @return the token kind handled by this renewer.
   */
  public Text getKind() {
    return OzoneTokenIdentifier.KIND_NAME;
  }

  /**
   * @param kind token kind to test.
   * @return true if the kind equals {@link #getKind()}.
   */
  @Override
  public boolean handleKind(Text kind) {
    return getKind().equals(kind);
  }

  /**
   * @return always true.
   */
  @Override
  public boolean isManaged(Token<?> token) throws IOException {
    return true;
  }

  /**
   * Renews the given Ozone delegation token.
   *
   * @param token token to renew; cast to an Ozone delegation token.
   * @param conf configuration used to create the client.
   * @return the value returned by the object store's renew call.
   * @throws IOException if creating the client, renewing or closing fails.
   */
  @Override
  public long renew(Token<?> token, Configuration conf)
      throws IOException, InterruptedException {
    @SuppressWarnings("unchecked")
    Token<OzoneTokenIdentifier> ozoneDt =
        (Token<OzoneTokenIdentifier>) token;
    // try-with-resources: the client is only needed for this one call.
    try (OzoneClient client = OzoneClientFactory.getOzoneClient(
        OzoneConfiguration.of(conf), ozoneDt)) {
      return client.getObjectStore().renewDelegationToken(ozoneDt);
    }
  }

  /**
   * Cancels the given Ozone delegation token.
   *
   * @param token token to cancel; cast to an Ozone delegation token.
   * @param conf configuration used to create the client.
   * @throws IOException if creating the client, cancelling or closing
   *         fails.
   */
  @Override
  public void cancel(Token<?> token, Configuration conf)
      throws IOException, InterruptedException {
    @SuppressWarnings("unchecked")
    Token<OzoneTokenIdentifier> ozoneDt =
        (Token<OzoneTokenIdentifier>) token;
    // try-with-resources: the client is only needed for this one call.
    try (OzoneClient client = OzoneClientFactory.getOzoneClient(
        OzoneConfiguration.of(conf), ozoneDt)) {
      client.getObjectStore().cancelDelegationToken(ozoneDt);
    }
  }
}
/**
 * Iterator wrapper that exposes each OzoneKey of the wrapped iterator as a
 * BasicKeyInfo carrying its name, modification time (epoch millis) and data
 * size.
 */
public static class IteratorAdapter implements Iterator<BasicKeyInfo> {

  private final Iterator<? extends OzoneKey> original;

  /**
   * @param listKeys the iterator to wrap.
   */
  public IteratorAdapter(Iterator<? extends OzoneKey> listKeys) {
    original = listKeys;
  }

  @Override
  public boolean hasNext() {
    return original.hasNext();
  }

  /**
   * @return the converted next element, or null when the wrapped iterator
   *         yields null.
   */
  @Override
  public BasicKeyInfo next() {
    final OzoneKey key = original.next();
    if (key != null) {
      return new BasicKeyInfo(
          key.getName(),
          key.getModificationTime().toEpochMilli(),
          key.getDataSize());
    }
    return null;
  }
}
/**
 * Converts an OzoneFileStatus into a FileStatusAdapter.
 * <p>
 * The key's modification time is passed twice in consecutive constructor
 * positions, the permission is 0777 for directories and 0666 otherwise, and
 * the owner is passed twice in consecutive positions.
 *
 * @param status status to convert.
 * @param owner name passed for the two consecutive owner positions.
 * @param defaultUri URI used to qualify the path.
 * @param workingDir working directory used to qualify the path.
 * @return the converted status.
 */
private FileStatusAdapter toFileStatusAdapter(OzoneFileStatus status,
    String owner, URI defaultUri, Path workingDir) {
  final OmKeyInfo info = status.getKeyInfo();
  final short replicationCount = (short) info.getFactor().getNumber();
  final Path qualified = new Path(OZONE_URI_DELIMITER + info.getKeyName())
      .makeQualified(defaultUri, workingDir);
  final boolean directory = status.isDirectory();
  final short permission = directory ? (short) 00777 : (short) 00666;
  return new FileStatusAdapter(
      info.getDataSize(),
      qualified,
      directory,
      replicationCount,
      status.getBlockSize(),
      info.getModificationTime(),
      info.getModificationTime(),
      permission,
      owner,
      owner,
      null,
      getBlockLocations(status));
}
/**
 * Helper method to get List of BlockLocation from OM Key info.
 * <p>
 * One BlockLocation is produced per block of the latest location version.
 * Offsets are the running sum of the preceding block lengths. Each datanode
 * of a block's pipeline contributes its host name and a "host:port" name;
 * when the datanode reports STANDALONE port 0, the configured datanode port
 * is used instead.
 *
 * @param fileStatus Ozone key file status.
 * @return list of block locations; empty when the status, its key info or
 *         its location versions are missing.
 */
private BlockLocation[] getBlockLocations(OzoneFileStatus fileStatus) {
  if (fileStatus == null) {
    return new BlockLocation[0];
  }
  OmKeyInfo keyInfo = fileStatus.getKeyInfo();
  if (keyInfo == null
      || CollectionUtils.isEmpty(keyInfo.getKeyLocationVersions())) {
    return new BlockLocation[0];
  }
  OmKeyLocationInfoGroup latestVersion = keyInfo.getLatestVersionLocations();
  // Fetch the block list once and reuse it for sizing and iteration.
  List<OmKeyLocationInfo> blocks = latestVersion.getBlocksLatestVersionOnly();
  BlockLocation[] blockLocations = new BlockLocation[blocks.size()];
  int i = 0;
  long offsetOfBlockInFile = 0L;
  for (OmKeyLocationInfo block : blocks) {
    List<String> hostList = new ArrayList<>();
    List<String> nameList = new ArrayList<>();
    for (DatanodeDetails dn : block.getPipeline().getNodes()) {
      hostList.add(dn.getHostName());
      int port = dn.getPort(
          DatanodeDetails.Port.Name.STANDALONE).getValue();
      if (port == 0) {
        // Fall back to the port from the configuration.
        port = configuredDnPort;
      }
      nameList.add(dn.getHostName() + ":" + port);
    }
    String[] hosts = hostList.toArray(new String[hostList.size()]);
    String[] names = nameList.toArray(new String[nameList.size()]);
    blockLocations[i++] = new BlockLocation(
        names, hosts, offsetOfBlockInFile, block.getLength());
    offsetOfBlockInFile += block.getLength();
  }
  return blockLocations;
}
}