// blob: f72c06d740d8b1941bf37d3ae316d9cacf74ecb2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.chukwa.datacollection.adaptor;
import java.io.File;
import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
import org.apache.hadoop.conf.*;
import org.apache.log4j.Level;
import junit.framework.TestCase;
public class TestDirTailingAdaptor extends TestCase {
ChukwaAgent agent;
File baseDir;
static final int SCAN_INTERVAL = 1000;
/**
* This test is exactly the same as testDirTailed except that it applies filtering and<br/>
* creates a file that should not be read inorder to test the filter.
* @throws IOException
* @throws ChukwaAgent.AlreadyRunningException
* @throws InterruptedException
*/
public void testDirTailerFiltering() throws IOException,
ChukwaAgent.AlreadyRunningException, InterruptedException {
DirTailingAdaptor.log.setLevel(Level.DEBUG);
Configuration conf = new Configuration();
baseDir = new File(System.getProperty("test.build.data", "/tmp")).getCanonicalFile();
File checkpointDir = new File(baseDir, "dirtailerTestCheckpoints");
createEmptyDir(checkpointDir);
conf.setInt("adaptor.dirscan.intervalMs", SCAN_INTERVAL);
conf.set("chukwaAgent.checkpoint.dir", checkpointDir.getCanonicalPath());
conf.set("chukwaAgent.checkpoint.name", "checkpoint_");
conf.setInt("chukwaAgent.control.port", 0);
conf.setBoolean("chukwaAgent.checkpoint.enabled", false);
agent = ChukwaAgent.getAgent(conf);
agent.start();
File emptyDir = new File(baseDir, "emptyDir2");
createEmptyDir(emptyDir);
assertEquals(0, agent.adaptorCount());
//check filtering with empty directory
agent.processAddCommand("add emptydir2= DirTailingAdaptor raw " + emptyDir + " *file filetailer.CharFileTailingAdaptorUTF8 0");
assertEquals(1, agent.adaptorCount());
File dirWithFile = new File(baseDir, "dir3");
dirWithFile.delete();
assertFalse("temp directory not empty",dirWithFile.exists());
//this file should be found by the filter
dirWithFile.mkdir();
File inDir = File.createTempFile("atemp", "file", dirWithFile);
inDir.deleteOnExit();
//This file should not be found by the filter
File noreadFile = File.createTempFile("atemp", "noread", dirWithFile);
noreadFile.deleteOnExit();
//apply filter *file
agent.processAddCommand("add dir3= DirTailingAdaptor raw " + dirWithFile + " *file filetailer.CharFileTailingAdaptorUTF8 0");
Thread.sleep(3000);
assertEquals(3, agent.adaptorCount());
agent.shutdown();
conf.setBoolean("chukwaAgent.checkpoint.enabled", true);
Thread.sleep(1500); //wait a little bit to make sure new file ts is > last checkpoint time.
File anOldFile = File.createTempFile("oldXYZ","file", dirWithFile);
File aNewFile = File.createTempFile("new", "file", dirWithFile);
anOldFile.deleteOnExit();
aNewFile.deleteOnExit();
anOldFile.setLastModified(10);//just after epoch
agent = ChukwaAgent.getAgent(conf); //restart agent.
agent.start();
Thread.sleep(3 * SCAN_INTERVAL); //wait a bit for the new file to be detected.
assertTrue(aNewFile.exists());
//make sure we started tailing the new, not the old, file.
for(Map.Entry<String, String> adaptors : agent.getAdaptorList().entrySet()) {
System.out.println(adaptors.getKey() +": " + adaptors.getValue());
assertFalse(adaptors.getValue().contains("oldXYZ"));
}
Thread.sleep(3 * SCAN_INTERVAL); //wait a bit for the new file to be detected.
//should be four adaptors: the DirTailer on emptyDir, the DirTailer on the full dir,
//and FileTailers for File inDir and file newfile and not the noread file.
assertEquals(4, agent.adaptorCount());
agent.shutdown();
Thread.sleep(1500); //wait a little bit to make sure new file ts is > last checkpoint time.
nukeDirContents(checkpointDir);//nuke dir
checkpointDir.delete();
emptyDir.delete();
nukeDirContents(dirWithFile);
dirWithFile.delete();
}
public void testDirTailer() throws IOException,
ChukwaAgent.AlreadyRunningException, InterruptedException {
DirTailingAdaptor.log.setLevel(Level.DEBUG);
Configuration conf = new Configuration();
baseDir = new File(System.getProperty("test.build.data", "/tmp")).getCanonicalFile();
File checkpointDir = new File(baseDir, "dirtailerTestCheckpoints");
createEmptyDir(checkpointDir);
conf.setInt("adaptor.dirscan.intervalMs", SCAN_INTERVAL);
conf.set("chukwaAgent.checkpoint.dir", checkpointDir.getCanonicalPath());
conf.set("chukwaAgent.checkpoint.name", "checkpoint_");
conf.setInt("chukwaAgent.control.port", 0);
conf.setBoolean("chukwaAgent.checkpoint.enabled", false);
boolean retry = true;
while(retry) {
try {
retry = false;
agent = ChukwaAgent.getAgent(conf);
agent.start();
} catch(Exception e) {
retry = true;
}
}
File emptyDir = new File(baseDir, "emptyDir");
createEmptyDir(emptyDir);
assertEquals(0, agent.adaptorCount());
agent.processAddCommand("add emptydir= DirTailingAdaptor raw " + emptyDir + " filetailer.CharFileTailingAdaptorUTF8 0");
assertEquals(1, agent.adaptorCount());
File dirWithFile = new File(baseDir, "dir2");
dirWithFile.delete();
assertFalse("temp directory not empty",dirWithFile.exists());
dirWithFile.mkdir();
File inDir = File.createTempFile("atemp", "file", dirWithFile);
inDir.deleteOnExit();
agent.processAddCommand("add dir2= DirTailingAdaptor raw " + dirWithFile + " *file filetailer.CharFileTailingAdaptorUTF8 0");
Thread.sleep(3000);
assertEquals(3, agent.adaptorCount());
System.out.println("DirTailingAdaptor looks OK before restart");
agent.shutdown();
conf.setBoolean("chukwaAgent.checkpoint.enabled", true);
Thread.sleep(1500); //wait a little bit to make sure new file ts is > last checkpoint time.
File anOldFile = File.createTempFile("oldXYZ","file", dirWithFile);
File aNewFile = File.createTempFile("new", "file", dirWithFile);
anOldFile.deleteOnExit();
aNewFile.deleteOnExit();
anOldFile.setLastModified(10);//just after epoch
agent = ChukwaAgent.getAgent(conf); //restart agent.
agent.start();
Thread.sleep(3 * SCAN_INTERVAL); //wait a bit for the new file to be detected.
assertTrue(aNewFile.exists());
//make sure we started tailing the new, not the old, file.
for(Map.Entry<String, String> adaptors : agent.getAdaptorList().entrySet()) {
System.out.println(adaptors.getKey() +": " + adaptors.getValue());
assertFalse(adaptors.getValue().contains("oldXYZ"));
}
//should be four adaptors: the DirTailer on emptyDir, the DirTailer on the full dir,
//and FileTailers for File inDir and file newfile
Thread.sleep(3 * SCAN_INTERVAL); //wait a bit for the new file to be detected.
assertEquals(4, agent.adaptorCount());
agent.shutdown();
nukeDirContents(checkpointDir);//nuke dir
checkpointDir.delete();
emptyDir.delete();
nukeDirContents(dirWithFile);
dirWithFile.delete();
}
//returns true if dir exists
public static boolean nukeDirContents(File dir) {
if(dir.exists()) {
if(dir.isDirectory()) {
for(File f: dir.listFiles()) {
nukeDirContents(f);
f.delete();
}
} else
dir.delete();
return true;
}
return false;
}
public static void createEmptyDir(File dir) {
if(!nukeDirContents(dir))
dir.mkdir();
assertTrue(dir.isDirectory() && dir.listFiles().length == 0);
}
}