blob: 1453c3e5bdf7ab25f2acca1200bf28af3fe708f6 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.eagle.gc;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.BoltDeclarer;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.apache.eagle.app.StormApplication;
import org.apache.eagle.app.environment.impl.StormEnvironment;
import org.apache.eagle.app.messaging.StormStreamSink;
import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSpoutProvider;
import org.apache.eagle.gc.executor.GCLogAnalyzerBolt;
import org.apache.eagle.gc.executor.GCMetricGeneratorBolt;
import storm.kafka.StringScheme;
/**
* Since 8/12/16.
*/
public class GCLogApplication extends StormApplication{
public final static String SPOUT_TASK_NUM = "topology.numOfSpoutTasks";
public final static String ANALYZER_TASK_NUM = "topology.numOfAnalyzerTasks";
public final static String GENERATOR_TASK_NUM = "topology.numOfGeneratorTasks";
public final static String SINK_TASK_NUM = "topology.numOfSinkTasks";
@Override
public StormTopology execute(Config config, StormEnvironment environment) {
TopologyBuilder builder = new TopologyBuilder();
KafkaSpoutProvider provider = new KafkaSpoutProvider();
IRichSpout spout = provider.getSpout(config);
int numOfSpoutTasks = config.getInt(SPOUT_TASK_NUM);
int numOfAnalyzerTasks = config.getInt(ANALYZER_TASK_NUM);
int numOfGeneratorTasks = config.getInt(GENERATOR_TASK_NUM);
int numOfSinkTasks = config.getInt(SINK_TASK_NUM);
builder.setSpout("ingest", spout, numOfSpoutTasks);
GCLogAnalyzerBolt bolt = new GCLogAnalyzerBolt();
BoltDeclarer boltDeclarer = builder.setBolt("analyzerBolt", bolt, numOfAnalyzerTasks);
boltDeclarer.fieldsGrouping("ingest", new Fields(StringScheme.STRING_SCHEME_KEY));
GCMetricGeneratorBolt generatorBolt = new GCMetricGeneratorBolt(config);
BoltDeclarer joinBoltDeclarer = builder.setBolt("generatorBolt", generatorBolt, numOfGeneratorTasks);
joinBoltDeclarer.fieldsGrouping("analyzerBolt", new Fields("f1"));
StormStreamSink sinkBolt = environment.getStreamSink("gc_log_stream",config);
BoltDeclarer kafkaBoltDeclarer = builder.setBolt("kafkaSink", sinkBolt, numOfSinkTasks);
kafkaBoltDeclarer.fieldsGrouping("generatorBolt", new Fields("f1"));
return builder.createTopology();
}
public static void main(String[] args){
System.setProperty("config.resource", "/application-gclog.conf");
Config config = ConfigFactory.load();
GCLogApplication app = new GCLogApplication();
app.run(config);
}
}