/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
import org.apache.hadoop.chukwa.datacollection.adaptor.AbstractAdaptor;
import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException;
import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorShutdownPolicy;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
/**
* A base class for file tailing adaptors.
* Intended to mandate as little policy as possible, and to use as
* few system resources as possible.
*
* If the file does not exist, this class will continue to retry quietly
* forever and will start tailing if it's eventually created.
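*
* For illustration, a sketch of registering an adaptor of this type through
* the Chukwa agent's "add" command, assuming the usual agent syntax; the
* datatype and log path below are hypothetical:
* <pre>
*   add filetailer.LWFTAdaptor MyDataType /var/log/myapp.log 0
* </pre>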
*/
public class LWFTAdaptor extends AbstractAdaptor {
/**
* This is the maximum amount we'll read from any one file before moving on to
* the next. This way, we get quick response time for other files if one file
* is growing rapidly.
*/
public static final int DEFAULT_MAX_READ_SIZE = 128 * 1024;
public static final String MAX_READ_SIZE_OPT =
"chukwaAgent.fileTailingAdaptor.maxReadSize";
int MAX_READ_SIZE = DEFAULT_MAX_READ_SIZE;
static Logger log;
static FileTailer tailer;
static {
tailer = null;
log = Logger.getLogger(FileTailingAdaptor.class);
}
/**
* next PHYSICAL offset to read
*/
protected long fileReadOffset;
/**
* The logical offset of the first byte of the file
*/
protected long offsetOfFirstByte = 0;
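// Illustrative (hypothetical numbers): with offsetOfFirstByte = 100 and
// fileReadOffset = 40, the adaptor reports a stream offset of 140 on shutdown
// but reads from physical byte 40 of the file.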
protected Configuration conf = null;
/**
* The timestamp of last slurping.
*/
protected long lastSlurpTime = 0L;
File toWatch;
@Override
public void start(long offset) {
synchronized(LWFTAdaptor.class) {
if(tailer == null)
tailer = new FileTailer(control.getConfiguration());
}
this.fileReadOffset = offset - offsetOfFirstByte;
tailer.startWatchingFile(this);
}
/**
* @see org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor#getCurrentStatus()
*/
public String getCurrentStatus() {
return type.trim() + " " + offsetOfFirstByte + " " + toWatch.getPath();
}
public String toString() {
return "Lightweight Tailer on " + toWatch;
}
public String getStreamName() {
return toWatch.getPath();
}
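/**
 * Parse the adaptor parameters: an optional logical offset of the file's
 * first byte, followed by the path of the file to tail. For example (with a
 * hypothetical path), "100 /var/log/app.log" sets offsetOfFirstByte to 100,
 * while "/var/log/app.log" alone leaves it at zero.
 *
 * @param params the adaptor-specific parameter string
 * @return the absolute path of the file to watch
 */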
@Override
public String parseArgs(String params) {
conf = control.getConfiguration();
MAX_READ_SIZE = conf.getInt(MAX_READ_SIZE_OPT, DEFAULT_MAX_READ_SIZE);
Pattern cmd = Pattern.compile("(\\d+)\\s+(.+)\\s?");
Matcher m = cmd.matcher(params);
if (m.matches()) { //check for first-byte offset. If absent, assume we just got a path.
offsetOfFirstByte = Long.parseLong(m.group(1));
toWatch = new File(m.group(2));
} else {
toWatch = new File(params.trim());
}
return toWatch.getAbsolutePath();
}
@Override
public long shutdown(AdaptorShutdownPolicy shutdownPolicy)
throws AdaptorException {
tailer.stopWatchingFile(this);
return fileReadOffset + offsetOfFirstByte;
}
/**
* Extract records from a byte sequence
*
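* The base implementation emits the whole buffer as one chunk; subclasses
* can override this to split the buffer at record boundaries.
*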
* @param eq the queue to stick the new chunk[s] in
* @param buffOffsetInFile the byte offset in the stream at which buf[] begins
* @param buf the byte buffer to extract records from
* @return the number of bytes processed
* @throws InterruptedException
*/
protected int extractRecords(ChunkReceiver eq, long buffOffsetInFile,
byte[] buf) throws InterruptedException {
if(buf.length == 0)
return 0;
ChunkImpl chunk = new ChunkImpl(type, toWatch.getAbsolutePath(),
buffOffsetInFile + buf.length, buf, this);
eq.add(chunk);
return buf.length;
}
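/**
 * Read up to MAX_READ_SIZE bytes from the watched file, starting at
 * fileReadOffset, and pass the buffer to extractRecords. Advances
 * fileReadOffset by the number of bytes consumed.
 *
 * @param len the current length of the file
 * @param reader an open RandomAccessFile on the watched file
 * @return true if unread data remains in the file after this pass
 */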
protected boolean slurp(long len, RandomAccessFile reader) throws IOException,
InterruptedException{
boolean hasMoreData = false;
log.debug("Adaptor|" + adaptorID + "|seeking|" + fileReadOffset);
reader.seek(fileReadOffset);
long bufSize = len - fileReadOffset;
if (bufSize > MAX_READ_SIZE) {
bufSize = MAX_READ_SIZE;
hasMoreData = true;
}
byte[] buf = new byte[(int) bufSize];
long curOffset = fileReadOffset;
lastSlurpTime = System.currentTimeMillis();
int bufferRead = reader.read(buf);
assert reader.getFilePointer() == fileReadOffset + bufSize : " event size arithmetic is broken: "
+ " pointer is "
+ reader.getFilePointer()
+ " but offset is "
+ (fileReadOffset + bufSize);
int bytesUsed = extractRecords(dest,
fileReadOffset + offsetOfFirstByte, buf);
// === WARNING ===
// If we couldn't find a complete record AND we cannot read any more
// (i.e. bufferRead == MAX_READ_SIZE), the record is too big to fit in
// one buffer. Log a warning and drop the current buffer so we keep
// moving instead of being stuck at this point forever.
if (bytesUsed == 0 && bufferRead == MAX_READ_SIZE) {
log.warn("bufferRead == MAX_READ_SIZE AND bytesUsed == 0, dropping current buffer: startOffset="
+ curOffset
+ ", MAX_READ_SIZE="
+ MAX_READ_SIZE
+ ", for "
+ toWatch.getPath());
bytesUsed = buf.length;
}
fileReadOffset = fileReadOffset + bytesUsed;
log.debug("Adaptor|" + adaptorID + "|start|" + curOffset + "|end|"
+ fileReadOffset);
return hasMoreData;
}
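/**
 * Check the watched file once: if it has grown, slurp the newly appended
 * bytes; if it has shrunk, treat it as truncated or rotated. Invoked
 * repeatedly by the shared FileTailer thread.
 *
 * @return true if the file still has unread data
 */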
public boolean tailFile()
throws InterruptedException {
boolean hasMoreData = false;
try {
//if file doesn't exist, length = 0 and we just keep waiting for it.
//if(!toWatch.exists())
// deregisterAndStop(false);
long len = toWatch.length();
if(len < fileReadOffset) {
//file shrank; probably some data went missing.
handleShrunkenFile(len);
} else if(len > fileReadOffset) {
RandomAccessFile reader = new RandomAccessFile(toWatch, "r");
try {
hasMoreData = slurp(len, reader);
} finally {
reader.close();
}
}
} catch(IOException e) {
log.warn("IOException in tailer", e);
deregisterAndStop();
}
return hasMoreData;
}
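/**
 * Called when the file is shorter than the last read offset, typically after
 * truncation or rotation: offsetOfFirstByte is set to the measured length and
 * fileReadOffset is reset to zero, so reading resumes at the physical start
 * of the file.
 *
 * @param measuredLen the newly measured (smaller) file length
 */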
private void handleShrunkenFile(long measuredLen) {
log.info("file "+ toWatch +"shrank from " + fileReadOffset + " to " + measuredLen);
offsetOfFirstByte = measuredLen;
fileReadOffset = 0;
}
}