blob: e12853d77f626a333cfb7c97ac54d92a94dc4b47 [file] [log] [blame]
package org.apache.s4.wordcount;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import junit.framework.Assert;
import org.apache.s4.core.App;
import org.apache.s4.fixtures.CommTestUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.server.NIOServerCnxn.Factory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
 * Integration test for the word count sample application.
 *
 * <p>Each test starts a ZooKeeper server through {@link CommTestUtils}, launches the application with
 * {@link App#main(String[])}, injects three sentences through the string socket adapter and compares the
 * content of the produced {@code wordcount} output file with the expected per-word counts.
 */
public class WordCountTest {

    /** First injected sentence. */
    public static final String SENTENCE_1 = "to be or not to be doobie doobie da";
    /** Number of space-separated words in {@link #SENTENCE_1}. */
    public static final int SENTENCE_1_TOTAL_WORDS = SENTENCE_1.split(" ").length;

    /** Second injected sentence. */
    public static final String SENTENCE_2 = "doobie doobie da";
    /** Number of space-separated words in {@link #SENTENCE_2}. */
    public static final int SENTENCE_2_TOTAL_WORDS = SENTENCE_2.split(" ").length;

    /** Third injected sentence. */
    public static final String SENTENCE_3 = "doobie";
    /** Number of space-separated words in {@link #SENTENCE_3}. */
    public static final int SENTENCE_3_TOTAL_WORDS = SENTENCE_3.split(" ").length;

    // Not used in this class; presumably the separator the application writes between "word=count" entries
    // (the expected output asserted below uses the same character) -- TODO confirm against the PEs.
    public static final String FLAG = ";";

    /** Total number of words across the three injected sentences. */
    public static final int TOTAL_WORDS = SENTENCE_1_TOTAL_WORDS + SENTENCE_2_TOTAL_WORDS + SENTENCE_3_TOTAL_WORDS;

    /**
     * Upper bound, in seconds, on the wait for the "/textProcessed" node. Deliberately generous: its only purpose
     * is to turn a stalled pipeline into a test failure instead of a build that hangs forever.
     */
    private static final long PROCESSING_TIMEOUT_SECONDS = 60;

    /** Connection factory of the ZooKeeper server started in {@link #prepare()} and stopped in {@link #cleanup()}. */
    private static Factory zookeeperServerConnectionFactory;

    /**
     * Cleans the temporary directories and starts a ZooKeeper server before each test.
     *
     * @throws IOException declared by the {@link CommTestUtils} helpers
     * @throws InterruptedException declared by the {@link CommTestUtils} helpers
     * @throws KeeperException declared by the {@link CommTestUtils} helpers
     */
    @Before
    public void prepare() throws IOException, InterruptedException, KeeperException {
        CommTestUtils.cleanupTmpDirs();
        zookeeperServerConnectionFactory = CommTestUtils.startZookeeperServer();
    }

    /**
     * A simple word count application:
     *
     * <pre>{@code
     *           sentences                      words                    word counts
     * Adapter ------------> WordSplitterPE -----------> WordCounterPE -------------> WordClassifierPE
     *                       key = "sentence"            key = word                   key="classifier"
     *                                                                                (should be *)
     * }</pre>
     *
     * The test consists in checking that words are correctly counted.
     *
     * @throws Exception if the application cannot be started, ZooKeeper cannot be reached, or the wait is
     *         interrupted
     */
    @Test
    public void testSimple() throws Exception {
        final ZooKeeper zk = CommTestUtils.createZkClient();
        try {
            App.main(new String[] { WordCountModule.class.getName(), WordCountApp.class.getName() });

            // Register the watch before injecting anything so the creation of "/textProcessed" cannot be missed.
            CountDownLatch signalTextProcessed = new CountDownLatch(1);
            CommTestUtils.watchAndSignalCreation("/textProcessed", signalTextProcessed, zk);

            // add authorizations for processing: nodes "/continue_1" .. "/continue_<TOTAL_WORDS>"
            // (TOTAL_WORDS has the same value as the former SENTENCE_1 + SENTENCE_2 + 1 bound)
            for (int i = 1; i <= TOTAL_WORDS; i++) {
                zk.create("/continue_" + i, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            }

            CommTestUtils.injectIntoStringSocketAdapter(SENTENCE_1);
            CommTestUtils.injectIntoStringSocketAdapter(SENTENCE_2);
            CommTestUtils.injectIntoStringSocketAdapter(SENTENCE_3);

            // Bounded wait: fail the test rather than block indefinitely if the signal never arrives.
            Assert.assertTrue("/textProcessed was not created within " + PROCESSING_TIMEOUT_SECONDS + " seconds",
                    signalTextProcessed.await(PROCESSING_TIMEOUT_SECONDS, TimeUnit.SECONDS));

            File results = new File(CommTestUtils.DEFAULT_TEST_OUTPUT_DIR + File.separator + "wordcount");
            String s = CommTestUtils.readFile(results);
            Assert.assertEquals("be=2;da=2;doobie=5;not=1;or=1;to=2;", s);
        } finally {
            // Release the client session (and its ephemeral "/continue_*" nodes) whatever the outcome.
            zk.close();
        }
    }

    /**
     * Stops the ZooKeeper server started in {@link #prepare()} after each test.
     *
     * @throws IOException declared by the {@link CommTestUtils} helper
     * @throws InterruptedException declared by the {@link CommTestUtils} helper
     */
    @After
    public void cleanup() throws IOException, InterruptedException {
        CommTestUtils.stopZookeeperServer(zookeeperServerConnectionFactory);
    }
}