// blob: 6cb0dc1b302ef9c21126643321269e6deda536d3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.chukwa.extraction.archive;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.text.SimpleDateFormat;
import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
import org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;
/**
 * Long-running manager that turns raw data sink directories into final
 * archives.
 *
 * <p>For every day directory found under the raw archive directory, the data
 * sink directories are moved into a per-day MapReduce input directory,
 * {@link ChukwaArchiveBuilder} is run over them, and the job output is moved to
 * <code>&lt;final archives&gt;/&lt;day&gt;/archive_&lt;timestamp&gt;</code>.
 */
public class ChukwaArchiveManager implements CHUKWA_CONSTANT {
  static Logger log = Logger.getLogger(ChukwaArchiveManager.class);

  /** Formats a timestamp as a numeric yyyyMMdd day identifier. */
  SimpleDateFormat day = new java.text.SimpleDateFormat("yyyyMMdd");

  static final int ONE_HOUR = 60 * 60 * 1000;
  static final int ONE_DAY = 24*ONE_HOUR;
  /** Number of moved files after which an archive job is triggered. */
  static final int MAX_FILES = 500;
  private static final int DEFAULT_MAX_ERROR_COUNT = 4;
  private static final long ONE_MINUTE = 60 * 1000L;

  protected ChukwaConfiguration conf = null;
  protected FileSystem fs = null;
  // volatile: written by shutdown() and read by the loop in start(), which
  // may run on different threads.
  protected volatile boolean isRunning = true;

  /**
   * Creates a manager backed by a fresh {@link ChukwaConfiguration} and
   * connects it to the configured file system.
   *
   * @throws Exception if the file system cannot be initialized
   */
  public ChukwaArchiveManager() throws Exception {
    conf = new ChukwaConfiguration();
    init();
  }

  /**
   * Opens the file system named by {@code HDFS_DEFAULT_NAME_FIELD}.
   *
   * @throws IOException if the setting is missing or the file system cannot
   *         be obtained
   * @throws URISyntaxException if the configured name is not a valid URI
   */
  protected void init() throws IOException, URISyntaxException {
    String fsName = conf.get(HDFS_DEFAULT_NAME_FIELD);
    if (fsName == null) {
      // new URI(null) would fail with an uninformative NullPointerException.
      throw new IOException("Missing configuration value for "
          + HDFS_DEFAULT_NAME_FIELD);
    }
    fs = FileSystem.get(new URI(fsName), conf);
  }

  /**
   * Command-line entry point: creates a manager and runs its processing loop.
   *
   * @param args ignored
   * @throws Exception if the manager cannot be created or started
   */
  public static void main(String[] args) throws Exception {
    ChukwaArchiveManager manager = new ChukwaArchiveManager();
    manager.start();
  }

  /** Asks the loop in {@link #start()} to stop before its next iteration. */
  public void shutdown() {
    this.isRunning = false;
  }

  /**
   * Runs the archive loop until {@link #shutdown()} is called, the thread is
   * interrupted, or the number of consecutive failed iterations reaches the
   * configured maximum (a maximum of -1 disables that limit).
   *
   * <p>Each iteration first re-runs the archive job on any day directory left
   * in the MapReduce input directory; otherwise it processes every day found
   * in the raw archive directory. When the raw archive directory is empty the
   * loop sleeps for one minute; when it only holds the current day and the
   * last run is less than about two hours old it sleeps for thirty minutes.
   *
   * @throws Exception if the working directories cannot be created
   */
  public void start() throws Exception {
    String chukwaRootDir = withTrailingSlash(
        conf.get(CHUKWA_ROOT_DIR_FIELD, DEFAULT_CHUKWA_ROOT_DIR_NAME));
    log.info("chukwaRootDir:" + chukwaRootDir);

    String archiveRootDir = withTrailingSlash(
        conf.get(CHUKWA_ARCHIVE_DIR_FIELD, chukwaRootDir + DEFAULT_CHUKWA_DATASINK_DIR_NAME));
    log.info("archiveDir:" + archiveRootDir);

    String archivesRootProcessingDir = chukwaRootDir + ARCHIVES_PROCESSING_DIR_NAME;
    String archivesMROutputDir = archivesRootProcessingDir + ARCHIVES_MR_OUTPUT_DIR_NAME;
    String finalArchiveOutput = chukwaRootDir + DEFAULT_FINAL_ARCHIVES;
    String archivesMRInputDir =
        withTrailingSlash(archivesRootProcessingDir + ARCHIVES_MR_INPUT_DIR_NAME);

    int maxPermittedErrorCount = conf.getInt(CHUKWA_ARCHIVE_MAX_ERROR_COUNT_FIELD,
        DEFAULT_MAX_ERROR_COUNT);

    Path pDailyRawArchivesInput = new Path(archiveRootDir);
    Path pArchivesMRInputDir = new Path(archivesMRInputDir);

    setup(pDailyRawArchivesInput);
    setup(new Path(archivesRootProcessingDir));
    setup(new Path(finalArchiveOutput));

    int errorCount = 0;
    long lastRun = 0L;

    while (isRunning) {
      try {
        if (maxPermittedErrorCount != -1 && errorCount >= maxPermittedErrorCount) {
          log.warn("==================\nToo many errors (" + errorCount +
              "), Bail out!\n==================");
          break;
        }

        // Anything still sitting in the MR input directory was moved there by
        // an earlier pass (below capacity, or a failed job): archive it first,
        // one day per iteration.
        if (fs.exists(pArchivesMRInputDir)) {
          FileStatus[] days = fs.listStatus(pArchivesMRInputDir);
          if (days.length > 0) {
            log.info("reprocessing current Archive input" + days[0].getPath());
            runArchive(archivesMRInputDir + days[0].getPath().getName() + "/",
                archivesMROutputDir, finalArchiveOutput);
            errorCount = 0;
            continue;
          }
        }

        log.info("Raw Archive dir:" + pDailyRawArchivesInput);
        // Single clock reading so "now" and "currentDay" cannot disagree
        // around midnight.
        long now = System.currentTimeMillis();
        int currentDay = Integer.parseInt(day.format(now));
        FileStatus[] daysInRawArchiveDir = fs.listStatus(pDailyRawArchivesInput);

        if (daysInRawArchiveDir.length == 0) {
          log.debug( pDailyRawArchivesInput + " is empty, going to sleep for 1 minute");
          Thread.sleep(ONE_MINUTE);
          continue;
        }

        // We don't want to process DataSink file more than once every 2 hours
        // for current day
        if (daysInRawArchiveDir.length == 1) {
          int workingDay = Integer.parseInt(daysInRawArchiveDir[0].getPath().getName());
          long nextRun = lastRun + (2L * ONE_HOUR) - ONE_MINUTE; // 2h - 1min
          if (workingDay == currentDay && now < nextRun) {
            log.info("lastRun < 2 hours so skip archive for now, going to sleep for 30 minutes, currentDate is:" + new java.util.Date());
            Thread.sleep(30 * ONE_MINUTE);
            continue;
          }
        }

        for (FileStatus fsDay : daysInRawArchiveDir) {
          String dayArchivesMRInputDir = archivesMRInputDir + fsDay.getPath().getName() + "/";
          processDay(fsDay, dayArchivesMRInputDir, archivesMROutputDir, finalArchiveOutput);
          lastRun = now;
        }
      } catch (InterruptedException e) {
        // An interrupt is a request to stop: keep the flag set for the caller
        // and leave the loop instead of counting it as a processing error.
        log.warn("Interrupted, stopping archive manager");
        Thread.currentThread().interrupt();
        break;
      } catch (Throwable e) {
        errorCount++;
        // Pass the throwable to the logger so the stack trace ends up in the log.
        log.warn("Error in archive loop (errorCount=" + errorCount + ")", e);
      }
    }
  }

  /**
   * Moves the eligible data sink directories of one day into the MapReduce
   * input directory and runs the archive job each time {@link #MAX_FILES}
   * files have been accumulated.
   *
   * <p>A data sink directory is eligible when it was last modified more than
   * one hour ago or when the day is before the current day. An empty
   * directory of a past day is deleted. Files moved without reaching
   * {@link #MAX_FILES} stay in the input directory; {@link #start()} archives
   * them on its next iteration.
   *
   * @param fsDay day directory, whose name must be a yyyyMMdd number
   * @param archivesMRInputDir per-day MapReduce input directory
   * @param archivesMROutputDir MapReduce output directory
   * @param finalArchiveOutput root directory of the final archives
   * @throws Exception if a file system operation or the archive job fails
   */
  public void processDay(FileStatus fsDay, String archivesMRInputDir,
      String archivesMROutputDir, String finalArchiveOutput) throws Exception {
    Path dayPath = fsDay.getPath();
    FileStatus[] dataSinkDirsInRawArchiveDir = fs.listStatus(dayPath);
    long now = System.currentTimeMillis();
    int currentDay = Integer.parseInt(day.format(now));
    int workingDay = Integer.parseInt(dayPath.getName());
    long oneHourAgo = now - ONE_HOUR;
    boolean pastDay = workingDay < currentDay;

    if (dataSinkDirsInRawArchiveDir.length == 0 && pastDay) {
      fs.delete(dayPath, false);
      log.info("deleting raw dataSink dir for day:" + dayPath.getName());
      return;
    }

    int fileCount = 0;
    for (FileStatus fsDataSinkDir : dataSinkDirsInRawArchiveDir) {
      long modificationDate = fsDataSinkDir.getModificationTime();
      if (modificationDate >= oneHourAgo && !pastDay) {
        // Modified within the last hour on the current day: leave it alone.
        log.info("processDay,modificationDate:" + modificationDate +", skipping: " + fsDataSinkDir.getPath() );
        continue;
      }

      log.info("processDay,modificationDate:" + modificationDate +", adding: " + fsDataSinkDir.getPath() );
      fileCount += fs.listStatus(fsDataSinkDir.getPath()).length;
      moveDataSinkFilesToArchiveMrInput(fsDataSinkDir, archivesMRInputDir);

      // process no more than MAX_FILES files per archive job
      if (fileCount >= MAX_FILES) {
        log.info("processDay, reach capacity");
        runArchive(archivesMRInputDir, archivesMROutputDir, finalArchiveOutput);
        fileCount = 0;
      }
    }
  }

  /**
   * Runs {@link ChukwaArchiveBuilder} over the <code>*&#47;*.done</code> files
   * of one day's input directory and moves the job output to
   * <code>&lt;finalArchiveOutput&gt;/&lt;day&gt;/archive_&lt;timestamp&gt;</code>.
   *
   * <p>An existing output directory is deleted before the job starts. The
   * input directory is deleted only after the output was moved successfully.
   *
   * @param archivesMRInputDir per-day MapReduce input directory; its last path
   *        component is used as the day name
   * @param archivesMROutputDir MapReduce output directory
   * @param finalArchiveOutput root directory of the final archives
   * @throws Exception if the job returns a non-zero result or a file system
   *         operation fails
   */
  public void runArchive(String archivesMRInputDir, String archivesMROutputDir,
      String finalArchiveOutput) throws Exception {
    String[] args = new String[3];
    args[0] = conf.get("archive.grouper","Stream");
    args[1] = archivesMRInputDir + "*/*.done" ;
    args[2] = archivesMROutputDir;

    Path pArchivesMRInputDir = new Path(archivesMRInputDir);
    Path pArchivesMROutputDir = new Path(archivesMROutputDir);

    if (fs.exists(pArchivesMROutputDir)) {
      log.warn("Deleteing mroutput dir for archive ...");
      fs.delete(pArchivesMROutputDir, true);
    }

    log.info("ChukwaArchiveManager processing :" + args[1] + " going to output to " + args[2] );
    int res = ToolRunner.run(this.conf, new ChukwaArchiveBuilder(), args);
    log.info("Archive result: " + res);
    if (res != 0) {
      throw new Exception("Archive result != 0");
    }

    // Named dayName so it does not shadow the date formatter field "day".
    String dayName = pArchivesMRInputDir.getName();
    String dayOutputDir = withTrailingSlash(finalArchiveOutput) + dayName;
    setup(new Path(dayOutputDir));

    Path pFinalArchiveOutput =
        new Path(dayOutputDir + "/archive_" + System.currentTimeMillis());
    log.info("Final move: moving " + pArchivesMROutputDir + " to " + pFinalArchiveOutput);

    if (fs.rename(pArchivesMROutputDir, pFinalArchiveOutput ) ) {
      log.info("deleting " + pArchivesMRInputDir);
      fs.delete(pArchivesMRInputDir, true);
    } else {
      log.warn("move to final archive folder failed!");
    }
  }

  /**
   * Moves one data sink directory into the MapReduce input directory,
   * creating the input directory first when needed.
   *
   * <p>A failed rename is logged as a warning and not thrown; the directory
   * then stays where it was.
   *
   * @param fsDataSinkDir data sink directory to move
   * @param archivesMRInputDir destination MapReduce input directory
   * @throws IOException if a file system operation fails
   */
  public void moveDataSinkFilesToArchiveMrInput(FileStatus fsDataSinkDir,
      String archivesMRInputDir) throws IOException {
    Path pArchivesMRInputDir = new Path(withTrailingSlash(archivesMRInputDir));
    setup(pArchivesMRInputDir);

    Path source = fsDataSinkDir.getPath();
    if (fs.rename(source, pArchivesMRInputDir)) {
      log.info("moving " + source + " to " + pArchivesMRInputDir);
    } else {
      log.warn("failed to move " + source + " to " + pArchivesMRInputDir);
    }
  }

  /**
   * Create directory if !exists
   * @param directory directory to create
   * @throws IOException if the file system cannot be queried or modified
   */
  protected void setup(Path directory) throws IOException {
    if ( ! fs.exists(directory)) {
      fs.mkdirs(directory);
    }
  }

  /** Returns {@code dir} unchanged if it ends with "/", else with "/" appended. */
  private static String withTrailingSlash(String dir) {
    return dir.endsWith("/") ? dir : dir + "/";
  }
}