/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.bigtop.itest.flumesmoke

import org.junit.AfterClass
import org.junit.BeforeClass
import org.junit.Ignore
import org.junit.Test
import static junit.framework.Assert.assertEquals

import org.apache.hadoop.conf.Configuration

import org.apache.bigtop.itest.JarContent
import org.apache.bigtop.itest.shell.Shell

// TODO: stub DFSConfigKeys for the Hadoop 0.20.2 release; once we move to
// 0.21+ this stub can go away in favor of the real class:
// import org.apache.hadoop.hdfs.DFSConfigKeys
class DFSConfigKeys {
  static public final FS_DEFAULT_NAME_KEY = "fs.default.name";
}

class TestFlumeSmoke {
  // Unique per-run scratch directory name so repeated runs don't collide
  private static String tmp = "TestFlumeSmoke-${(new Date().getTime())}";
  // NameNode URI taken from the local Hadoop configuration
  private static String nn = (new Configuration()).get(DFSConfigKeys.FS_DEFAULT_NAME_KEY);
  // HDFS directory that the Flume collector sink writes into
  private static String hdfs_sink_dir = "${nn}/user/${System.properties['user.name']}/$tmp";
  // Shell helper that runs the commands below through bash
  private static Shell sh = new Shell('/bin/bash -s');
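  // For illustration of hdfs_sink_dir above (hypothetical values): with
  // fs.default.name = hdfs://localhost:8020 and user "jenkins", it expands to
  // hdfs://localhost:8020/user/jenkins/TestFlumeSmoke-1336000000000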

  @BeforeClass
  static void setUp() {
    // Unpack the test resources bundled in the jar (the per-codec Flume conf
    // directories and the events.txt input) into the current working directory
    JarContent.unpackJarContainer(TestFlumeSmoke.class, '.', null);
  }

  @AfterClass
  static void tearDown() {
    // Remove the HDFS sink directory created by the tests
    // (hadoop fs -rmr is the pre-Hadoop-0.21 recursive delete)
    sh.exec("hadoop fs -rmr $hdfs_sink_dir");
  }

  /**
   * Common driver: run a one-shot Flume node that pipes events.txt through a
   * text source into the HDFS collector sink, then decompress the output
   * with the codec-specific command and verify the line count.
   */
  private void compressionCommonTest(String id, String decompress, String glob) {
    // Dataflow spec: text source reading events.txt into a collector sink
    String node_config = "node:text(\"events.txt\")|collectorSink(\"${hdfs_sink_dir}\",\"data\");";
    // Select the per-codec config dir and run the logical node once
    sh.exec("export FLUME_CONF_DIR=./${id}",
            "flume node_nowatch -s -1 -n node -c '${node_config}'");
    assertEquals("Flume failed to accept events",
                 0, sh.ret);
    sh.exec("hadoop fs -cat ${hdfs_sink_dir}/${glob} | ${decompress} | wc -l");
    assertEquals("Wrong # of lines in output found at ${hdfs_sink_dir}",
                 "10000", sh.out[0]);
  }
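
  /*
   * For reference, each per-codec directory named by ${id} above is expected
   * to carry a flume-site.xml that turns on the codec under test. A minimal
   * sketch, assuming Flume OG's flume.collector.dfs.compress.codec property
   * (the property name and value are assumptions, not read from the actual
   * resource files):
   *
   *   <configuration>
   *     <property>
   *       <name>flume.collector.dfs.compress.codec</name>
   *       <value>GzipCodec</value>
   *     </property>
   *   </configuration>
   */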

  @Test(timeout = 300000L)
  public void testBzip2() {
    compressionCommonTest("FlumeSmokeBzip2", "bzip2 -d", "*.bz2");
  }

  @Ignore("BIGTOP-218")
  @Test(timeout = 300000L)
  public void testDeflate() {
    compressionCommonTest("FlumeSmokeDeflate",
                          "perl -MCompress::Zlib -e 'undef \$/; print uncompress(<>)'",
                          "*.deflate");
  }

  @Test(timeout = 300000L)
  public void testGzip() {
    compressionCommonTest("FlumeSmokeGzip", "gzip -d", "*.gz");
  }
}
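
// To run only this smoke test under a standard Maven Surefire setup (an
// assumption about the local build, not something this file configures):
//   mvn test -Dtest=TestFlumeSmoke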