blob: 4bfae2d72220c89a88cfd986f50b6b69d8322b11 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.chukwa.datacollection.adaptor;
import java.io.File;
import java.io.RandomAccessFile;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.chukwa.util.ExceptionUtil;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
class FileAdaptorTailer extends Thread {
static Logger log = Logger.getLogger(FileAdaptorTailer.class);
private List<FileAdaptor> adaptors = null;
/**
* How often to call each adaptor.
*/
int DEFAULT_SAMPLE_PERIOD_MS = 1000 * 10;
int SAMPLE_PERIOD_MS = DEFAULT_SAMPLE_PERIOD_MS;
public FileAdaptorTailer() {
ChukwaAgent agent = ChukwaAgent.getAgent();
if (agent != null) {
Configuration conf = agent.getConfiguration();
if (conf != null) {
SAMPLE_PERIOD_MS = conf.getInt(
"chukwaAgent.adaptor.context.switch.time",
DEFAULT_SAMPLE_PERIOD_MS);
}
}
// iterations are much more common than adding a new adaptor
adaptors = new CopyOnWriteArrayList<FileAdaptor>();
setDaemon(true);
start();// start the FileAdaptorTailer thread
}
@Override
public void run() {
while(true) {
try {
long startTime = System.currentTimeMillis();
for (FileAdaptor adaptor: adaptors) {
log.info("calling sendFile for " + adaptor.toWatch.getCanonicalPath());
adaptor.sendFile();
}
long timeToReadFiles = System.currentTimeMillis() - startTime;
if (timeToReadFiles < SAMPLE_PERIOD_MS) {
Thread.sleep(SAMPLE_PERIOD_MS);
}
}catch (Throwable e) {
log.warn("Exception in FileAdaptorTailer:",e);
}
}
}
public void addFileAdaptor(FileAdaptor adaptor) {
adaptors.add(adaptor);
}
public void removeFileAdaptor(FileAdaptor adaptor) {
adaptors.remove(adaptor);
}
}
/**
* File Adaptor push small size file in one chunk to collector
*/
public class FileAdaptor extends AbstractAdaptor {
static Logger log = Logger.getLogger(FileAdaptor.class);
static FileAdaptorTailer tailer = null;
static final int DEFAULT_TIMEOUT_PERIOD = 5*60*1000;
int TIMEOUT_PERIOD = DEFAULT_TIMEOUT_PERIOD;
static {
tailer = new FileAdaptorTailer();
}
private long startTime = 0;
private long timeOut = 0;
protected volatile boolean finished = false;
File toWatch;
protected RandomAccessFile reader = null;
protected long fileReadOffset;
protected boolean deleteFileOnClose = false;
protected boolean shutdownCalled = false;
/**
* The logical offset of the first byte of the file
*/
private long offsetOfFirstByte = 0;
public void start(long bytes) {
// in this case params = filename
log.info("adaptor id: " + adaptorID + " started file adaptor on file "
+ toWatch);
this.startTime = System.currentTimeMillis();
TIMEOUT_PERIOD = control.getConfiguration().getInt(
"chukwaAgent.adaptor.fileadaptor.timeoutperiod",
DEFAULT_TIMEOUT_PERIOD);
this.timeOut = startTime + TIMEOUT_PERIOD;
tailer.addFileAdaptor(this);
}
void sendFile() {
long now = System.currentTimeMillis() ;
long oneMinAgo = now - (60*1000);
if (toWatch.exists()) {
if (toWatch.lastModified() > oneMinAgo && now < timeOut) {
log.info("Last modified time less than one minute, keep waiting");
return;
} else {
try {
long bufSize = toWatch.length();
byte[] buf = new byte[(int) bufSize];
reader = new RandomAccessFile(toWatch, "r");
reader.read(buf);
reader.close();
reader = null;
long fileTime = toWatch.lastModified();
int bytesUsed = extractRecords(dest, 0, buf, fileTime);
this.fileReadOffset = bytesUsed;
finished = true;
deregisterAndStop();
cleanUp();
} catch(Exception e) {
log.warn("Exception while trying to read: " + toWatch.getAbsolutePath(),e);
} finally {
if (reader != null) {
try {
reader.close();
} catch (Exception e) {
log.debug(ExceptionUtil.getStackTrace(e));
}
reader = null;
}
}
}
} else {
if (now > timeOut) {
finished = true;
log.warn("Couldn't read this file: " + toWatch.getAbsolutePath());
deregisterAndStop();
cleanUp() ;
}
}
}
private void cleanUp() {
tailer.removeFileAdaptor(this);
if (reader != null) {
try {
reader.close();
} catch (Exception e) {
log.debug(ExceptionUtil.getStackTrace(e));
}
reader = null;
}
}
@Override
public long shutdown(AdaptorShutdownPolicy shutdownPolicy) {
log.info("Enter Shutdown:" + shutdownPolicy.name()+ " - ObjectId:" + this);
switch(shutdownPolicy) {
case GRACEFULLY : {
int retry = 0;
while (!finished && retry < 60) {
try {
log.info("GRACEFULLY Retry:" + retry);
Thread.sleep(1000);
retry++;
} catch (InterruptedException ex) {
}
}
}
break;
case WAIT_TILL_FINISHED : {
int retry = 0;
while (!finished) {
try {
if (retry%100 == 0) {
log.info("WAIT_TILL_FINISHED Retry:" + retry);
}
Thread.sleep(1000);
retry++;
} catch (InterruptedException ex) {
}
}
}
break;
default :
cleanUp();
break;
}
if (deleteFileOnClose && toWatch != null) {
if (log.isDebugEnabled()) {
log.debug("About to delete " + toWatch.getAbsolutePath());
}
if (toWatch.delete()) {
if (log.isInfoEnabled()) {
log.debug("Successfully deleted " + toWatch.getAbsolutePath());
}
} else {
if (log.isEnabledFor(Level.WARN)) {
log.warn("Could not delete " + toWatch.getAbsolutePath() + " (for unknown reason)");
}
}
}
log.info("Exist Shutdown:" + shutdownPolicy.name()+ " - ObjectId:" + this);
return fileReadOffset + offsetOfFirstByte;
}
public String parseArgs(String params) {
String[] words = params.split(" ");
if (words.length == 2) {
if (words[1].equals("delete")) {
deleteFileOnClose = true;
toWatch = new File(words[0]);
} else {
offsetOfFirstByte = Long.parseLong(words[0]);
toWatch = new File(words[1]);
}
} else if (words.length == 3) {
offsetOfFirstByte = Long.parseLong(words[0]);
toWatch = new File(words[1]);
deleteFileOnClose = words[2].equals("delete");
} else {
toWatch = new File(params);
}
return toWatch.getAbsolutePath();
}
/**
* Extract records from a byte sequence
*
* @param eq
* the queue to stick the new chunk[s] in
* @param buffOffsetInFile
* the byte offset in the stream at which buf[] begins
* @param buf
* the byte buffer to extract records from
* @return the number of bytes processed
* @throws InterruptedException
*/
protected int extractRecords(final ChunkReceiver eq, long buffOffsetInFile,
byte[] buf, long fileTime) throws InterruptedException {
final ChunkImpl chunk = new ChunkImpl(type, toWatch.getAbsolutePath(),
buffOffsetInFile + buf.length, buf, this);
chunk.addTag("time=\"" + fileTime + "\"");
log.info("Adding " + toWatch.getAbsolutePath() + " to the queue");
eq.add(chunk);
log.info( toWatch.getAbsolutePath() + " added to the queue");
return buf.length;
}
@Override
public String getCurrentStatus() {
return type.trim() + " " + offsetOfFirstByte + " " + toWatch.getPath();
}
}