blob: e0ffdc4287f83e60c54cab6d8348d938e1136c34 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.chukwa.datacollection.writer.localfs;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.chukwa.datacollection.writer.WriterException;
import org.apache.hadoop.chukwa.util.CopySequenceFile;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;
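/**
 * Moves dataSink files from the local file system to the remote HDFS on
 * behalf of LocalWriter.java.
 *
 * This is the only role of this class. The two classes exchange information
 * through a shared BlockingQueue: this thread takes local file paths from the
 * queue and copies each file to the remote output directory.
 *
 * Besides the queued files, this class also cleans up the local output
 * directory. It moves every existing completed dataSink file (.done). It also
 * recovers any .chukwa dataSink file that has not been modified for at least
 * (rotateInterval + 2 min), so that the file can be moved as well.
 */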
public class LocalToRemoteHdfsMover extends Thread {
  static final Logger log = Logger.getLogger(LocalToRemoteHdfsMover.class);

  // File-name suffixes of the local/remote dataSink lifecycle handled here.
  private static final String CHUKWA_SUFFIX = ".chukwa";
  private static final String DONE_SUFFIX = ".done";
  private static final String RECOVER_SUFFIX = ".recover";
  private static final String RECOVER_DONE_SUFFIX = ".recoverDone";

  /** Extra idle time, beyond one rotate interval, before a .chukwa file counts as stale. */
  private static final long STALE_GRACE_MS = 2 * 60 * 1000L;
  /** How long one queue poll waits, bounding how long shutdown() can go unnoticed when idle. */
  private static final long QUEUE_POLL_TIMEOUT_MS = 1000L;
  /** Length of the back-off after an error, slept in one-second slices. */
  private static final int ERROR_BACKOFF_SECONDS = 60;

  private FileSystem remoteFs = null;
  private FileSystem localFs = null;
  private final Configuration conf;
  private String fsname = null;
  private String localOutputDir = null;
  private String remoteOutputDir = null;
  private boolean exitIfHDFSNotavailable = false;
  private final BlockingQueue<String> fileQueue;
  private volatile boolean isRunning = true;

  /**
   * Creates the mover and immediately starts it as a daemon thread named
   * "LocalToRemoteHdfsMover".
   *
   * @param fileQueue queue from which the paths of local files to move are taken
   * @param conf configuration supplying file systems, directories and intervals
   */
  public LocalToRemoteHdfsMover(BlockingQueue<String> fileQueue, Configuration conf) {
    this.fileQueue = fileQueue;
    this.conf = conf;
    this.setDaemon(true);
    this.setName("LocalToRemoteHdfsMover");
    this.start();
  }

  /**
   * Resolves the remote and local file systems and the output directories.
   *
   * The remote file system URI comes from {@code writer.hdfs.filesystem},
   * falling back to {@code fs.defaultFS}. Both output directories are
   * normalised to end with "/".
   *
   * @throws Throwable if no file system name is configured, the URI is
   *     invalid, or a file system cannot be obtained
   */
  protected void init() throws Throwable {
    // check if they've told us the file system to use
    fsname = conf.get("writer.hdfs.filesystem");
    if (fsname == null || fsname.isEmpty()) {
      // otherwise try to get the filesystem from hadoop
      fsname = conf.get("fs.defaultFS");
    }
    if (fsname == null || fsname.isEmpty()) {
      log.error("no filesystem name");
      throw new RuntimeException("no filesystem");
    }
    log.info("remote fs name is " + fsname);
    exitIfHDFSNotavailable = conf.getBoolean(
        "localToRemoteHdfsMover.exitIfHDFSNotavailable", false);
    remoteFs = FileSystem.get(new URI(fsname), conf);
    // NOTE(review): FileSystem.get normally throws rather than returning null;
    // this check is kept as a defensive guard.
    if (remoteFs == null && exitIfHDFSNotavailable) {
      log.error("can't connect to HDFS.");
      throw new WriterException("can't connect to HDFS.");
    }
    localFs = FileSystem.getLocal(conf);
    remoteOutputDir = withTrailingSlash(
        conf.get("chukwaCollector.outputDir", "/chukwa/logs/"));
    localOutputDir = withTrailingSlash(
        conf.get("chukwaCollector.localOutputDir", "/chukwa/datasink/"));
  }

  /**
   * Copies one local dataSink file to the remote output directory, then
   * deletes the local copy.
   *
   * The file is first copied as {@code <name>.chukwa}, overwriting any earlier
   * partial copy, and then renamed to {@code <name>.done}. Here {@code <name>}
   * is the local file name without its directory and its last extension. A
   * local file that no longer exists is silently skipped.
   *
   * @param filePath path of the local file to move
   * @throws Exception if the copy or the remote rename fails
   */
  protected void moveFile(String filePath) throws Exception {
    String remoteFilePath = remoteOutputDir + stripDirAndExtension(filePath);
    try {
      Path pLocalPath = new Path(filePath);
      Path pRemoteFilePath = new Path(remoteFilePath + CHUKWA_SUFFIX);
      // delSrc=false, overwrite=true: keep the local file until the remote
      // rename below has succeeded.
      remoteFs.copyFromLocalFile(false, true, pLocalPath, pRemoteFilePath);
      Path pFinalRemoteFilePath = new Path(remoteFilePath + DONE_SUFFIX);
      if (!remoteFs.rename(pRemoteFilePath, pFinalRemoteFilePath)) {
        throw new RuntimeException("Cannot rename remote file, " + pRemoteFilePath
            + " to " + pFinalRemoteFilePath);
      }
      if (localFs.delete(pLocalPath, false)) {
        log.info("move done deleting from local: " + pLocalPath);
      } else {
        log.warn("move done but cannot delete local file: " + pLocalPath);
      }
    } catch (FileNotFoundException ex) {
      // Do nothing: if the local file is no longer there, it has already been
      // moved over by the cleanup task.
      log.debug("File not found: " + filePath);
    } catch (Exception e) {
      log.warn("Cannot copy to the remote HDFS", e);
      throw e;
    }
  }

  /**
   * Scans the local output directory once and handles leftover files:
   * <ul>
   *   <li>{@code .recover} files (a previously failed copy of a .chukwa file)
   *       are deleted;</li>
   *   <li>{@code .recoverDone} files are renamed to {@code .done}, after
   *       deleting the .chukwa file they were recovered from;</li>
   *   <li>{@code .done} files are moved to the remote file system;</li>
   *   <li>{@code .chukwa} files untouched for longer than
   *       {@code rotateInterval} + 2 minutes are copied to a valid sink file,
   *       unless a {@code .recoverDone} sibling shows they were already
   *       recovered.</li>
   * </ul>
   *
   * @throws Exception if any local or remote operation fails
   */
  protected void cleanup() throws Exception {
    try {
      int rotateInterval = conf.getInt("chukwaCollector.rotateInterval",
          1000 * 60 * 5); // defaults to 5 minutes
      long staleBefore = System.currentTimeMillis() - rotateInterval - STALE_GRACE_MS;
      FileStatus[] files = localFs.listStatus(new Path(localOutputDir));
      if (files == null) {
        // Some Hadoop versions return null instead of throwing for a missing directory.
        log.warn("Local output directory not found: " + localOutputDir);
        return;
      }
      // Names present in this snapshot, for sibling lookups.
      Set<String> fileNames = new HashSet<String>();
      for (FileStatus file : files) {
        fileNames.add(file.getPath().getName());
      }
      for (FileStatus file : files) {
        String fileName = file.getPath().getName();
        if (fileName.endsWith(RECOVER_SUFFIX)) {
          deleteRecoverFile(fileName);
        } else if (fileName.endsWith(RECOVER_DONE_SUFFIX)) {
          finishRecoveredFile(fileName, fileNames);
        } else if (fileName.endsWith(DONE_SUFFIX)) {
          moveFile(localOutputDir + fileName);
        } else if (fileName.endsWith(CHUKWA_SUFFIX)) {
          recoverIfStale(file, fileName, fileNames, staleBefore);
        }
      }
    } catch (Exception e) {
      log.warn("Cannot copy to the remote HDFS", e);
      throw e;
    }
  }

  /** Deletes a .recover file left behind by a failed copy of a .chukwa file. */
  private void deleteRecoverFile(String fileName) throws IOException {
    Path recoverPath = new Path(localOutputDir + fileName);
    localFs.delete(recoverPath, false);
    log.info("Deleted .recover file, " + localOutputDir + fileName);
  }

  /**
   * Turns a valid .recoverDone sink file into a .done file. Any .chukwa file
   * with the same base name is deleted first, because it has already been
   * recovered.
   */
  private void finishRecoveredFile(String fileName, Set<String> fileNames)
      throws IOException {
    String chukwaFileName = replaceSuffix(fileName, RECOVER_DONE_SUFFIX, CHUKWA_SUFFIX);
    if (fileNames.contains(chukwaFileName)) {
      localFs.delete(new Path(localOutputDir + chukwaFileName), false);
      log.info(".recoverDone file exists, deleted duplicate .chukwa file, "
          + localOutputDir + chukwaFileName);
    }
    String doneFileName = replaceSuffix(fileName, RECOVER_DONE_SUFFIX, DONE_SUFFIX);
    Path recoverDonePath = new Path(localOutputDir + fileName);
    Path donePath = new Path(localOutputDir + doneFileName);
    if (localFs.rename(recoverDonePath, donePath)) {
      log.info("Renamed .recoverDone file to .done, " + localOutputDir + fileName);
    } else {
      log.warn("Cannot rename .recoverDone file to .done, " + localOutputDir + fileName);
    }
  }

  /**
   * Copies a .chukwa file to a valid sink file when it has not been modified
   * since {@code staleBefore}. This may indicate that the collector previously
   * crashed.
   */
  private void recoverIfStale(FileStatus file, String fileName, Set<String> fileNames,
      long staleBefore) throws Exception {
    if (fileNames.contains(replaceSuffix(fileName, CHUKWA_SUFFIX, RECOVER_DONE_SUFFIX))) {
      // Already recovered; the .recoverDone handling deletes this file, so it
      // must not be recovered again (or after it has been deleted).
      return;
    }
    if (file.getModificationTime() < staleBefore) {
      log.info("Copying .chukwa file to valid sink file before moving, "
          + localOutputDir + fileName);
      CopySequenceFile.createValidSequenceFile(conf, localOutputDir, fileName, localFs);
    }
  }

  /**
   * Main loop: (re)initialises and cleans up after every error, then moves
   * each file path taken from the queue, followed by another cleanup pass.
   * After an error it backs off for about 60 seconds. It exits once
   * {@link #shutdown()} has been called.
   */
  @Override
  public void run() {
    boolean inError = true;
    while (isRunning) {
      try {
        if (inError) {
          init();
          cleanup();
          inError = false;
        }
        // Poll with a timeout instead of take() so that shutdown() is noticed
        // even when no new files arrive.
        String filePath = fileQueue.poll(QUEUE_POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
        if (filePath == null) {
          continue;
        }
        moveFile(filePath);
        cleanup();
      } catch (Throwable e) {
        log.warn("Error in LocalToHdfsMover", e);
        inError = true;
        backOffAfterError();
      }
    }
    log.info(Thread.currentThread().getName() + " is exiting.");
  }

  /** Waits about 60 seconds after an error, ending early if shutdown() is called. */
  private void backOffAfterError() {
    log.info("Got an exception going to sleep for 60 secs");
    try {
      for (int i = 0; i < ERROR_BACKOFF_SECONDS && isRunning; i++) {
        Thread.sleep(1000L);
      }
    } catch (InterruptedException e2) {
      // This thread owns its interruption policy: an interrupt only cuts the
      // back-off short. The flag is deliberately not restored, because the
      // blocking calls of the next iteration would then fail immediately and
      // spin through the error path.
      log.warn("Exception while sleeping", e2);
    }
  }

  /**
   * Asks the mover thread to stop. The thread finishes any move or cleanup in
   * progress. It exits within about one second when idle or backing off.
   */
  public void shutdown() {
    this.isRunning = false;
  }

  /** Returns {@code dir} with a trailing "/" appended if it lacks one. */
  private static String withTrailingSlash(String dir) {
    return dir.endsWith("/") ? dir : dir + "/";
  }

  /**
   * Returns the last "/"-separated component of {@code filePath} without its
   * last extension. A name without a dot is returned unchanged.
   */
  private static String stripDirAndExtension(String filePath) {
    String name = filePath.substring(filePath.lastIndexOf('/') + 1);
    int dot = name.lastIndexOf('.');
    return dot < 0 ? name : name.substring(0, dot);
  }

  /** Replaces the trailing {@code oldSuffix} of {@code name} (which must end with it). */
  private static String replaceSuffix(String name, String oldSuffix, String newSuffix) {
    return name.substring(0, name.length() - oldSuffix.length()) + newSuffix;
  }
}