blob: ff1e60b5352cb06ae3ed91956f23b4292214f2c4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.File;
import org.apache.hadoop.chukwa.datacollection.adaptor.*;
/**
* An adaptor that repeatedly tails a specified file, sending the new bytes.
* This class does not split out records, but just sends everything up to end of
* file. Subclasses can alter this behavior by overriding extractRecords().
*
*/
public class FileTailingAdaptor extends LWFTAdaptor {
public static int MAX_RETRIES = 300;
static int GRACEFUL_PERIOD = 3 * 60 * 1000; // 3 minutes
private int attempts = 0;
private long gracefulPeriodExpired = 0l;
private boolean adaptorInError = false;
protected RandomAccessFile reader = null;
public void start(long bytes) {
super.start(bytes);
log.info("chukwaAgent.fileTailingAdaptor.maxReadSize: " + MAX_READ_SIZE);
this.attempts = 0;
log.info("started file tailer " + adaptorID + " on file " + toWatch
+ " with first byte at offset " + offsetOfFirstByte);
}
@Override
public long shutdown(AdaptorShutdownPolicy shutdownPolicy) {
log.info("Enter Shutdown:" + shutdownPolicy.name() + " - ObjectId:" + this);
switch(shutdownPolicy) {
case GRACEFULLY :
case WAIT_TILL_FINISHED :{
if (toWatch.exists()) {
int retry = 0;
tailer.stopWatchingFile(this);
TerminatorThread lastTail = new TerminatorThread(this);
lastTail.setDaemon(true);
lastTail.start();
if (shutdownPolicy.ordinal() == AdaptorShutdownPolicy.GRACEFULLY.ordinal()) {
while (lastTail.isAlive() && retry < 60) {
try {
log.info("GRACEFULLY Retry:" + retry);
Thread.sleep(1000);
retry++;
} catch (InterruptedException ex) {
}
}
} else {
while (lastTail.isAlive()) {
try {
if (retry%100 == 0) {
log.info("WAIT_TILL_FINISHED Retry:" + retry);
}
Thread.sleep(1000);
retry++;
} catch (InterruptedException ex) {
}
}
}
}
}
break;
case HARD_STOP:
default:
tailer.stopWatchingFile(this);
try {
if (reader != null) {
reader.close();
}
reader = null;
} catch(Throwable e) {
log.warn("Exception while closing reader:",e);
}
break;
}
log.info("Exit Shutdown:" + shutdownPolicy.name()+ " - ObjectId:" + this);
return fileReadOffset + offsetOfFirstByte;
}
/**
* Looks at the tail of the associated file, adds some of it to event queue
* This method is not thread safe. Returns true if there's more data in the
* file
*/
@Override
public boolean tailFile()
throws InterruptedException {
boolean hasMoreData = false;
try {
if ((adaptorInError == true)
&& (System.currentTimeMillis() > gracefulPeriodExpired)) {
if (!toWatch.exists()) {
log.warn("Adaptor|" + adaptorID + "|attempts=" + attempts
+ "| File does not exist: " + toWatch.getAbsolutePath()
+ ", streaming policy expired. File removed from streaming.");
} else if (!toWatch.canRead()) {
log.warn("Adaptor|" + adaptorID + "|attempts=" + attempts
+ "| File cannot be read: " + toWatch.getAbsolutePath()
+ ", streaming policy expired. File removed from streaming.");
} else {
// Should have never been there
adaptorInError = false;
gracefulPeriodExpired = 0L;
attempts = 0;
return false;
}
deregisterAndStop();
return false;
} else if (!toWatch.exists() || !toWatch.canRead()) {
if (adaptorInError == false) {
long now = System.currentTimeMillis();
gracefulPeriodExpired = now + GRACEFUL_PERIOD;
adaptorInError = true;
attempts = 0;
log.warn("failed to stream data for: " + toWatch.getAbsolutePath()
+ ", graceful period will Expire at now:" + now + " + "
+ GRACEFUL_PERIOD + " secs, i.e:" + gracefulPeriodExpired);
} else if (attempts % 10 == 0) {
log.info("failed to stream data for: " + toWatch.getAbsolutePath()
+ ", attempt: " + attempts);
}
attempts++;
return false; // no more data
}
if (reader == null) {
reader = new RandomAccessFile(toWatch, "r");
log.info("Adaptor|" + adaptorID
+ "|Opening the file for the first time|seek|" + fileReadOffset);
}
long len = 0L;
try {
len = reader.length();
if (lastSlurpTime == 0) {
lastSlurpTime = System.currentTimeMillis();
}
if (offsetOfFirstByte > fileReadOffset) {
// If the file rotated, the recorded offsetOfFirstByte is greater than
// file size,reset the first byte position to beginning of the file.
fileReadOffset = 0;
offsetOfFirstByte = 0L;
log.warn("offsetOfFirstByte>fileReadOffset, resetting offset to 0");
}
if (len == fileReadOffset) {
File fixedNameFile = new File(toWatch.getAbsolutePath());
long fixedNameLastModified = fixedNameFile.lastModified();
if (fixedNameLastModified > lastSlurpTime) {
// If len == fileReadOffset,the file stops rolling log or the file
// has rotated.
// But fixedNameLastModified > lastSlurpTime , this means after the
// last slurping,the file has been written ,
// so the file has been rotated.
boolean hasLeftData = true;
while (hasLeftData) {// read the possiblly generated log
hasLeftData = slurp(len, reader);
}
RandomAccessFile newReader = new RandomAccessFile(toWatch, "r");
if (reader != null) {
reader.close();
}
reader = newReader;
fileReadOffset = 0L;
len = reader.length();
log.debug("Adaptor|" + adaptorID
+ "| File size mismatched, rotating: "
+ toWatch.getAbsolutePath());
hasMoreData = slurp(len, reader);
}
} else if (len < fileReadOffset) {
// file has rotated and no detection
if (reader != null) {
reader.close();
}
reader = null;
fileReadOffset = 0L;
offsetOfFirstByte = 0L;
hasMoreData = true;
log.warn("Adaptor|" + adaptorID + "| file: " + toWatch.getPath()
+ ", has rotated and no detection - reset counters to 0L");
} else {
hasMoreData = slurp(len, reader);
}
} catch (IOException e) {
// do nothing, if file doesn't exist.
}
} catch (IOException e) {
log.warn("failure reading " + toWatch, e);
}
attempts = 0;
adaptorInError = false;
return hasMoreData;
}
}