blob: cc8dccaf1aeefb90fba9240a98d0cfaff55d047a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import java.io.*;
import java.net.*;
import java.security.DigestInputStream;
import java.security.MessageDigest;
import java.util.List;
import java.lang.Math;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.hdfs.DFSUtil.ErrorSimulator;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.security.UserGroupInformation;
import com.google.common.collect.Lists;
/**
* This class provides fetching a specified file from the NameNode.
*/
class TransferFsImage {
public final static String CONTENT_LENGTH = "Content-Length";
public final static String MD5_HEADER = "X-MD5-Digest";
private static final Log LOG = LogFactory.getLog(TransferFsImage.class);
static MD5Hash downloadImageToStorage(
String fsName, long imageTxId, NNStorage dstStorage, boolean needDigest)
throws IOException {
String fileid = GetImageServlet.getParamStringForImage(
imageTxId, dstStorage);
String fileName = NNStorage.getCheckpointImageFileName(imageTxId);
List<File> dstFiles = dstStorage.getFiles(
NameNodeDirType.IMAGE, fileName);
if (dstFiles.isEmpty()) {
throw new IOException("No targets in destination storage!");
}
MD5Hash hash = getFileClient(fsName, fileid, dstFiles, dstStorage, needDigest);
LOG.info("Downloaded file " + dstFiles.get(0).getName() + " size " +
dstFiles.get(0).length() + " bytes.");
return hash;
}
static void downloadEditsToStorage(String fsName, RemoteEditLog log,
NNStorage dstStorage) throws IOException {
assert log.getStartTxId() > 0 && log.getEndTxId() > 0 :
"bad log: " + log;
String fileid = GetImageServlet.getParamStringForLog(
log, dstStorage);
String fileName = NNStorage.getFinalizedEditsFileName(
log.getStartTxId(), log.getEndTxId());
List<File> dstFiles = dstStorage.getFiles(NameNodeDirType.EDITS, fileName);
assert !dstFiles.isEmpty() : "No checkpoint targets.";
for (File f : dstFiles) {
if (f.exists() && f.canRead()) {
LOG.info("Skipping download of remote edit log " +
log + " since it already is stored locally at " + f);
return;
} else {
LOG.debug("Dest file: " + f);
}
}
getFileClient(fsName, fileid, dstFiles, dstStorage, false);
LOG.info("Downloaded file " + dstFiles.get(0).getName() + " size " +
dstFiles.get(0).length() + " bytes.");
}
/**
* Requests that the NameNode download an image from this node.
*
* @param fsName the http address for the remote NN
* @param imageListenAddress the host/port where the local node is running an
* HTTPServer hosting GetImageServlet
* @param storage the storage directory to transfer the image from
* @param txid the transaction ID of the image to be uploaded
*/
static void uploadImageFromStorage(String fsName,
InetSocketAddress imageListenAddress,
NNStorage storage, long txid) throws IOException {
String fileid = GetImageServlet.getParamStringToPutImage(
txid, imageListenAddress, storage);
// this doesn't directly upload an image, but rather asks the NN
// to connect back to the 2NN to download the specified image.
TransferFsImage.getFileClient(fsName, fileid, null, null, false);
LOG.info("Uploaded image with txid " + txid + " to namenode at " +
fsName);
}
/**
* A server-side method to respond to a getfile http request
* Copies the contents of the local file into the output stream.
*/
static void getFileServer(OutputStream outstream, File localfile,
DataTransferThrottler throttler)
throws IOException {
byte buf[] = new byte[HdfsConstants.IO_FILE_BUFFER_SIZE];
FileInputStream infile = null;
try {
infile = new FileInputStream(localfile);
if (ErrorSimulator.getErrorSimulation(2)
&& localfile.getAbsolutePath().contains("secondary")) {
// throw exception only when the secondary sends its image
throw new IOException("If this exception is not caught by the " +
"name-node fs image will be truncated.");
}
if (ErrorSimulator.getErrorSimulation(3)
&& localfile.getAbsolutePath().contains("fsimage")) {
// Test sending image shorter than localfile
long len = localfile.length();
buf = new byte[(int)Math.min(len/2, HdfsConstants.IO_FILE_BUFFER_SIZE)];
// This will read at most half of the image
// and the rest of the image will be sent over the wire
infile.read(buf);
}
int num = 1;
while (num > 0) {
num = infile.read(buf);
if (num <= 0) {
break;
}
if (ErrorSimulator.getErrorSimulation(4)) {
// Simulate a corrupted byte on the wire
LOG.warn("SIMULATING A CORRUPT BYTE IN IMAGE TRANSFER!");
buf[0]++;
}
outstream.write(buf, 0, num);
if (throttler != null) {
throttler.throttle(num);
}
}
} finally {
if (infile != null) {
infile.close();
}
}
}
/**
* Client-side Method to fetch file from a server
* Copies the response from the URL to a list of local files.
* @param dstStorage if an error occurs writing to one of the files,
* this storage object will be notified.
* @Return a digest of the received file if getChecksum is true
*/
static MD5Hash getFileClient(String nnHostPort,
String queryString, List<File> localPaths,
NNStorage dstStorage, boolean getChecksum) throws IOException {
byte[] buf = new byte[HdfsConstants.IO_FILE_BUFFER_SIZE];
String proto = UserGroupInformation.isSecurityEnabled() ? "https://" : "http://";
StringBuilder str = new StringBuilder(proto+nnHostPort+"/getimage?");
str.append(queryString);
//
// open connection to remote server
//
URL url = new URL(str.toString());
// Avoid Krb bug with cross-realm hosts
SecurityUtil.fetchServiceTicket(url);
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
if (connection.getResponseCode() != HttpURLConnection.HTTP_OK) {
throw new IOException(
"Image transfer servlet at " + url +
" failed with status code " + connection.getResponseCode() +
"\nResponse message:\n" + connection.getResponseMessage());
}
long advertisedSize;
String contentLength = connection.getHeaderField(CONTENT_LENGTH);
if (contentLength != null) {
advertisedSize = Long.parseLong(contentLength);
} else {
throw new IOException(CONTENT_LENGTH + " header is not provided " +
"by the namenode when trying to fetch " + str);
}
MD5Hash advertisedDigest = parseMD5Header(connection);
long received = 0;
InputStream stream = connection.getInputStream();
MessageDigest digester = null;
if (getChecksum) {
digester = MD5Hash.getDigester();
stream = new DigestInputStream(stream, digester);
}
boolean finishedReceiving = false;
List<FileOutputStream> outputStreams = Lists.newArrayList();
try {
if (localPaths != null) {
for (File f : localPaths) {
try {
if (f.exists()) {
LOG.warn("Overwriting existing file " + f
+ " with file downloaded from " + str);
}
outputStreams.add(new FileOutputStream(f));
} catch (IOException ioe) {
LOG.warn("Unable to download file " + f, ioe);
dstStorage.reportErrorOnFile(f);
}
}
if (outputStreams.isEmpty()) {
throw new IOException(
"Unable to download to any storage directory");
}
}
int num = 1;
while (num > 0) {
num = stream.read(buf);
if (num > 0) {
received += num;
for (FileOutputStream fos : outputStreams) {
fos.write(buf, 0, num);
}
}
}
finishedReceiving = true;
} finally {
stream.close();
for (FileOutputStream fos : outputStreams) {
fos.getChannel().force(true);
fos.close();
}
if (finishedReceiving && received != advertisedSize) {
// only throw this exception if we think we read all of it on our end
// -- otherwise a client-side IOException would be masked by this
// exception that makes it look like a server-side problem!
throw new IOException("File " + str + " received length " + received +
" is not of the advertised size " +
advertisedSize);
}
}
if (digester != null) {
MD5Hash computedDigest = new MD5Hash(digester.digest());
if (advertisedDigest != null &&
!computedDigest.equals(advertisedDigest)) {
throw new IOException("File " + str + " computed digest " +
computedDigest + " does not match advertised digest " +
advertisedDigest);
}
return computedDigest;
} else {
return null;
}
}
private static MD5Hash parseMD5Header(HttpURLConnection connection) {
String header = connection.getHeaderField(MD5_HEADER);
return (header != null) ? new MD5Hash(header) : null;
}
}