package org.apache.eagle.persist.test;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import com.typesafe.config.Config;
import org.apache.eagle.dataproc.impl.storm.StormSpoutProvider;
import org.apache.eagle.datastream.ExecutionEnvironments;
import org.apache.eagle.datastream.core.StorageType;
import org.apache.eagle.datastream.core.StreamProducer;
import org.apache.eagle.datastream.storm.StormExecutionEnvironment;
import org.apache.eagle.partition.PartitionStrategy;
import java.util.Arrays;
import java.util.Map;
import java.util.Random;
* Created on 1/4/16.
* This test demonstrates how user could use the new aggregate and persist feature for case like metrics processing&storage.
public class PersistTopoTestMain {
public static void main(String[] args) {
// System.setProperty("config.resource", "application.conf");
StormExecutionEnvironment env = ExecutionEnvironments.getStorm();
StormSpoutProvider provider = createProvider(env.getConfig());
execWithDefaultPartition(env, provider);
public static void execWithDefaultPartition(StormExecutionEnvironment env, StormSpoutProvider provider) {
StreamProducer source = env.fromSpout(provider).withOutputFields(2).nameAs("kafkaMsgConsumer");
StreamProducer filter = source;
// required : persistTestEventStream schema be created in metadata manager
// required : policy for aggregateExecutor1 be created in metadata manager
StreamProducer aggregate = filter.aggregate(Arrays.asList("persistTestEventStream"), "aggregateExecutor1", new PartitionStrategy() {
public int balance(String key, int buckNum) {
return 0;
StreamProducer persist = aggregate.persist("persistExecutor1", StorageType.KAFKA());
public static StormSpoutProvider createProvider(Config config) {
return new StormSpoutProvider(){
public BaseRichSpout getSpout(Config context) {
return new StaticMetricSpout();
public static class StaticMetricSpout extends BaseRichSpout {
private long base;
private SpoutOutputCollector collector;
public StaticMetricSpout() {
base = System.currentTimeMillis();
private Random cpuRandom = new Random();
private Random memRandom = new Random();
private static final long FULL_MEM_SIZE_BYTES = 512 * 1024 * 1024 * 1024;// 16g memory upbound limit
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("timestamp", "host", "cpu", "mem"));
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
public void nextTuple() {
base = base + 100;// with fix steps..
long mem = Double.valueOf(memRandom.nextGaussian() * FULL_MEM_SIZE_BYTES).longValue();
collector.emit(new Values(base, "host", cpuRandom.nextInt(100), mem));