blob: 8f7588f2b91ef4269241cfe7f358583c8a56bc76 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.common;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.security.DigestInputStream;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.server.namenode.ImageServlet;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
@InterfaceAudience.Private
public final class Util {
private final static Log LOG = LogFactory.getLog(Util.class.getName());
/* Required headers for FSImage transfer. */
public final static String FILE_LENGTH = "File-Length";
public final static String CONTENT_LENGTH = "Content-Length";
public final static String MD5_HEADER = "X-MD5-Digest";
public final static String CONTENT_TYPE = "Content-Type";
public final static String CONTENT_TRANSFER_ENCODING =
"Content-Transfer-Encoding";
public final static int IO_FILE_BUFFER_SIZE;
private static final boolean isSpnegoEnabled;
public static final URLConnectionFactory connectionFactory;
static {
Configuration conf = new Configuration();
connectionFactory = URLConnectionFactory
.newDefaultURLConnectionFactory(conf);
isSpnegoEnabled = UserGroupInformation.isSecurityEnabled();
IO_FILE_BUFFER_SIZE = DFSUtilClient.getIoFileBufferSize(conf);
}
/**
* Interprets the passed string as a URI. In case of error it
* assumes the specified string is a file.
*
* @param s the string to interpret
* @return the resulting URI
*/
static URI stringAsURI(String s) throws IOException {
URI u = null;
// try to make a URI
try {
u = new URI(s);
} catch (URISyntaxException e){
LOG.error("Syntax error in URI " + s
+ ". Please check hdfs configuration.", e);
}
// if URI is null or scheme is undefined, then assume it's file://
if(u == null || u.getScheme() == null){
LOG.info("Assuming 'file' scheme for path " + s + " in configuration.");
u = fileAsURI(new File(s));
}
return u;
}
/**
* Converts the passed File to a URI. This method trims the trailing slash if
* one is appended because the underlying file is in fact a directory that
* exists.
*
* @param f the file to convert
* @return the resulting URI
*/
public static URI fileAsURI(File f) throws IOException {
URI u = f.getCanonicalFile().toURI();
// trim the trailing slash, if it's present
if (u.getPath().endsWith("/")) {
String uriAsString = u.toString();
try {
u = new URI(uriAsString.substring(0, uriAsString.length() - 1));
} catch (URISyntaxException e) {
throw new IOException(e);
}
}
return u;
}
/**
* Converts a collection of strings into a collection of URIs.
* @param names collection of strings to convert to URIs
* @return collection of URIs
*/
public static List<URI> stringCollectionAsURIs(
Collection<String> names) {
List<URI> uris = new ArrayList<>(names.size());
for(String name : names) {
try {
uris.add(stringAsURI(name));
} catch (IOException e) {
LOG.error("Error while processing URI: " + name, e);
}
}
return uris;
}
public static boolean isDiskStatsEnabled(int fileIOSamplingPercentage) {
final boolean isEnabled;
if (fileIOSamplingPercentage <= 0) {
LOG.info(DFSConfigKeys
.DFS_DATANODE_FILEIO_PROFILING_SAMPLING_PERCENTAGE_KEY + " set to "
+ fileIOSamplingPercentage + ". Disabling file IO profiling");
isEnabled = false;
} else {
LOG.info(DFSConfigKeys
.DFS_DATANODE_FILEIO_PROFILING_SAMPLING_PERCENTAGE_KEY + " set to "
+ fileIOSamplingPercentage + ". Enabling file IO profiling");
isEnabled = true;
}
return isEnabled;
}
/**
* Downloads the files at the specified url location into destination
* storage.
*/
public static MD5Hash doGetUrl(URL url, List<File> localPaths,
Storage dstStorage, boolean getChecksum, int timeout) throws IOException {
HttpURLConnection connection;
try {
connection = (HttpURLConnection)
connectionFactory.openConnection(url, isSpnegoEnabled);
} catch (AuthenticationException e) {
throw new IOException(e);
}
setTimeout(connection, timeout);
if (connection.getResponseCode() != HttpURLConnection.HTTP_OK) {
throw new HttpGetFailedException("Image transfer servlet at " + url +
" failed with status code " + connection.getResponseCode() +
"\nResponse message:\n" + connection.getResponseMessage(),
connection);
}
long advertisedSize;
String contentLength = connection.getHeaderField(CONTENT_LENGTH);
if (contentLength != null) {
advertisedSize = Long.parseLong(contentLength);
} else {
throw new IOException(CONTENT_LENGTH + " header is not provided " +
"by the namenode when trying to fetch " + url);
}
MD5Hash advertisedDigest = parseMD5Header(connection);
String fsImageName = connection
.getHeaderField(ImageServlet.HADOOP_IMAGE_EDITS_HEADER);
InputStream stream = connection.getInputStream();
return receiveFile(url.toExternalForm(), localPaths, dstStorage,
getChecksum, advertisedSize, advertisedDigest, fsImageName, stream,
null);
}
/**
* Receives file at the url location from the input stream and puts them in
* the specified destination storage location.
*/
public static MD5Hash receiveFile(String url, List<File> localPaths,
Storage dstStorage, boolean getChecksum, long advertisedSize,
MD5Hash advertisedDigest, String fsImageName, InputStream stream,
DataTransferThrottler throttler) throws
IOException {
long startTime = Time.monotonicNow();
Map<FileOutputStream, File> streamPathMap = new HashMap<>();
StringBuilder xferStats = new StringBuilder();
double xferCombined = 0;
if (localPaths != null) {
// If the local paths refer to directories, use the server-provided header
// as the filename within that directory
List<File> newLocalPaths = new ArrayList<>();
for (File localPath : localPaths) {
if (localPath.isDirectory()) {
if (fsImageName == null) {
throw new IOException("No filename header provided by server");
}
newLocalPaths.add(new File(localPath, fsImageName));
} else {
newLocalPaths.add(localPath);
}
}
localPaths = newLocalPaths;
}
long received = 0;
MessageDigest digester = null;
if (getChecksum) {
digester = MD5Hash.getDigester();
stream = new DigestInputStream(stream, digester);
}
boolean finishedReceiving = false;
List<FileOutputStream> outputStreams = Lists.newArrayList();
try {
if (localPaths != null) {
for (File f : localPaths) {
try {
if (f.exists()) {
LOG.warn("Overwriting existing file " + f
+ " with file downloaded from " + url);
}
FileOutputStream fos = new FileOutputStream(f);
outputStreams.add(fos);
streamPathMap.put(fos, f);
} catch (IOException ioe) {
LOG.warn("Unable to download file " + f, ioe);
// This will be null if we're downloading the fsimage to a file
// outside of an NNStorage directory.
if (dstStorage != null &&
(dstStorage instanceof StorageErrorReporter)) {
((StorageErrorReporter)dstStorage).reportErrorOnFile(f);
}
}
}
if (outputStreams.isEmpty()) {
throw new IOException(
"Unable to download to any storage directory");
}
}
int num = 1;
byte[] buf = new byte[IO_FILE_BUFFER_SIZE];
while (num > 0) {
num = stream.read(buf);
if (num > 0) {
received += num;
for (FileOutputStream fos : outputStreams) {
fos.write(buf, 0, num);
}
if (throttler != null) {
throttler.throttle(num);
}
}
}
finishedReceiving = true;
double xferSec = Math.max(
((float)(Time.monotonicNow() - startTime)) / 1000.0, 0.001);
long xferKb = received / 1024;
xferCombined += xferSec;
xferStats.append(
String.format(" The fsimage download took %.2fs at %.2f KB/s.",
xferSec, xferKb / xferSec));
} finally {
stream.close();
for (FileOutputStream fos : outputStreams) {
long flushStartTime = Time.monotonicNow();
fos.getChannel().force(true);
fos.close();
double writeSec = Math.max(((float)
(flushStartTime - Time.monotonicNow())) / 1000.0, 0.001);
xferCombined += writeSec;
xferStats.append(String
.format(" Synchronous (fsync) write to disk of " +
streamPathMap.get(fos).getAbsolutePath() +
" took %.2fs.", writeSec));
}
// Something went wrong and did not finish reading.
// Remove the temporary files.
if (!finishedReceiving) {
deleteTmpFiles(localPaths);
}
if (finishedReceiving && received != advertisedSize) {
// only throw this exception if we think we read all of it on our end
// -- otherwise a client-side IOException would be masked by this
// exception that makes it look like a server-side problem!
deleteTmpFiles(localPaths);
throw new IOException("File " + url + " received length " + received +
" is not of the advertised size " +
advertisedSize);
}
}
xferStats.insert(0, String.format("Combined time for fsimage download and" +
" fsync to all disks took %.2fs.", xferCombined));
LOG.info(xferStats.toString());
if (digester != null) {
MD5Hash computedDigest = new MD5Hash(digester.digest());
if (advertisedDigest != null &&
!computedDigest.equals(advertisedDigest)) {
deleteTmpFiles(localPaths);
throw new IOException("File " + url + " computed digest " +
computedDigest + " does not match advertised digest " +
advertisedDigest);
}
return computedDigest;
} else {
return null;
}
}
private static void deleteTmpFiles(List<File> files) {
if (files == null) {
return;
}
LOG.info("Deleting temporary files: " + files);
for (File file : files) {
if (!file.delete()) {
LOG.warn("Deleting " + file + " has failed");
}
}
}
/**
* Sets a timeout value in millisecods for the Http connection.
* @param connection the Http connection for which timeout needs to be set
* @param timeout value to be set as timeout in milliseconds
*/
public static void setTimeout(HttpURLConnection connection, int timeout) {
if (timeout > 0) {
connection.setConnectTimeout(timeout);
connection.setReadTimeout(timeout);
}
}
private static MD5Hash parseMD5Header(HttpURLConnection connection) {
String header = connection.getHeaderField(MD5_HEADER);
return (header != null) ? new MD5Hash(header) : null;
}
}