blob: 5f4d35fd123636e544504daf51dd563d35da0e09 [file] [log] [blame]
package org.apache.s4.ft.pecache;
import java.io.IOException;
import org.apache.log4j.Logger;
import org.apache.s4.ft.KeyValue;
import org.apache.s4.ft.TestUtils;
import org.apache.s4.processor.AbstractPE;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
public class CacheTestPE extends AbstractPE implements Watcher {
String value = "";
transient ZooKeeper zk = null;
public void processEvent(KeyValue event) {
if (zk == null) {
Logger.getLogger(getClass()).info("Creating ZK connection");
try {
zk = new ZooKeeper("localhost:" + TestUtils.ZK_PORT, 4000, this);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
if ("key".equals(event.getKey())) {
setValue(this.value + event.getValue());
try {
Logger.getLogger(getClass()).info("setting ZK /value");
zk.create("/value", value.getBytes(), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
} catch (Exception e) {
throw new RuntimeException(e);
}
} else {
throw new RuntimeException("unknown event " + event);
}
}
public void setValue(String value) {
this.value = value;
}
@Override
public void output() {
// TODO Auto-generated method stub
}
@Override
public void process(WatchedEvent arg0) {
// TODO Auto-generated method stub
}
}