blob: 99ef2b8cdd084524b3f032daffc3057ac5ee0c0c [file] [log] [blame]
package org.apache.s4.wordcount;
import org.apache.s4.ft.EventGenerator;
import org.apache.s4.ft.KeyValue;
import org.apache.s4.ft.S4App;
import org.apache.s4.ft.S4TestCase;
import org.apache.s4.ft.TestUtils;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import junit.framework.Assert;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.server.NIOServerCnxn.Factory;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
/**
 * Word-count test for an S4 application.
 *
 * <p>Each test run starts a ZooKeeper server, initializes an {@link S4App}, injects three
 * sentences on the "Sentences" stream, waits until the "/textProcessed" ZooKeeper node is
 * signalled, and then compares the content of the "wordcount" output file with the expected
 * per-word totals.
 */
public class WordCountTest extends S4TestCase {

    /** First injected sentence. */
    public static final String SENTENCE_1 = "to be or not to be doobie doobie da";
    /** Number of space-separated words in {@link #SENTENCE_1}. */
    public static final int SENTENCE_1_TOTAL_WORDS = SENTENCE_1.split(" ").length;

    /** Second injected sentence. */
    public static final String SENTENCE_2 = "doobie doobie da";
    /** Number of space-separated words in {@link #SENTENCE_2}. */
    public static final int SENTENCE_2_TOTAL_WORDS = SENTENCE_2.split(" ").length;

    /** Third injected sentence. */
    public static final String SENTENCE_3 = "doobie";
    /** Number of space-separated words in {@link #SENTENCE_3}. */
    public static final int SENTENCE_3_TOTAL_WORDS = SENTENCE_3.split(" ").length;

    /**
     * Not referenced inside this class. NOTE(review): presumably the separator used by the
     * application under test when writing results - confirm against its users.
     */
    public static final String FLAG = ";";

    /**
     * Sum of the word counts of the three sentences. Kept non-final, as in the original
     * declaration, so that any external code assigning to it keeps compiling.
     */
    public static int TOTAL_WORDS = SENTENCE_1_TOTAL_WORDS
            + SENTENCE_2_TOTAL_WORDS + SENTENCE_3_TOTAL_WORDS;

    /** Handle returned by the ZooKeeper server start-up, needed to stop the server again. */
    private static Factory zookeeperServerConnectionFactory;

    /**
     * Runs before each test: calls {@code TestUtils.cleanupTmpDirs()} and starts a ZooKeeper
     * server, remembering the returned connection factory for {@link #cleanup()}.
     */
    @Before
    public void prepare() throws IOException, InterruptedException, KeeperException {
        TestUtils.cleanupTmpDirs();
        zookeeperServerConnectionFactory = TestUtils.startZookeeperServer();
    }

    /**
     * Injects the three sentences and verifies the aggregated word counts.
     *
     * <p>The ZooKeeper client opened for the test is always closed before the method returns,
     * whether the test passes, fails an assertion, or throws.
     *
     * @throws Exception if application start-up, ZooKeeper access, waiting, or reading the
     *         result file fails
     */
    @Test
    public void testSimple() throws Exception {
        S4App app = new S4App(getClass(), "s4_core_conf.xml");
        app.initializeS4App("app_conf.xml");

        final ZooKeeper zk = TestUtils.createZkClient();
        try {
            // Register the watch before any event is injected so the signal cannot be missed.
            CountDownLatch signalTextProcessed = new CountDownLatch(1);
            TestUtils.watchAndSignalCreation("/textProcessed", signalTextProcessed, zk);
            EventGenerator gen = new EventGenerator();

            // add authorizations for processing: one ephemeral "/continue_<i>" node per step.
            // NOTE(review): the trailing "+ 1" presumably accounts for the single word of
            // SENTENCE_3 - confirm against the application under test.
            final int authorizationCount = SENTENCE_1_TOTAL_WORDS + SENTENCE_2_TOTAL_WORDS + 1;
            for (int i = 1; i <= authorizationCount; i++) {
                zk.create("/continue_" + i, new byte[0], Ids.OPEN_ACL_UNSAFE,
                        CreateMode.EPHEMERAL);
            }

            gen.injectValueEvent(new KeyValue("sentence", SENTENCE_1), "Sentences", 0);
            gen.injectValueEvent(new KeyValue("sentence", SENTENCE_2), "Sentences", 0);
            gen.injectValueEvent(new KeyValue("sentence", SENTENCE_3), "Sentences", 0);

            // NOTE(review): this wait has no timeout; if the node is never created the test
            // blocks indefinitely.
            signalTextProcessed.await();

            File results = new File(S4TestCase.DEFAULT_TEST_OUTPUT_DIR
                    + File.separator + "wordcount");
            String s = TestUtils.readFile(results);
            Assert.assertEquals("be=2;da=2;doobie=5;not=1;or=1;to=2;", s);
        } finally {
            // Release the client session; previously it was left open after the test.
            zk.close();
        }
    }

    /** Runs after each test: stops the ZooKeeper server started in {@link #prepare()}. */
    @After
    public void cleanup() throws IOException, InterruptedException {
        TestUtils.stopZookeeperServer(zookeeperServerConnectionFactory);
    }
}