/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.pirk.responder.wideskies.storm;

import org.apache.pirk.query.wideskies.Query;
import org.apache.pirk.utils.SystemConfiguration;
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.kafka.*;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.BoltDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Storm topology class for the wideskies Pirk implementation.
 * <p>
 * Reads its configuration from {@link SystemConfiguration}, wires the topology
 * (KafkaSpout -> PartitionDataBolt -> EncRowCalcBolt -> EncColMultBolt -> OutputBolt),
 * and submits it to a Storm cluster.
 */
public class PirkTopology
{
  private static final Logger logger = LoggerFactory.getLogger(PirkTopology.class);

  private static final String kafkaClientId = SystemConfiguration.getProperty("kafka.clientId", "KafkaSpout");
  private static final String brokerZk = SystemConfiguration.getProperty("kafka.zk", "localhost:2181");
  private static final String kafkaTopic = SystemConfiguration.getProperty("kafka.topic", "pirkTopic");
  private static final Boolean forceFromStart = Boolean.parseBoolean(SystemConfiguration.getProperty("kafka.forceFromStart", "false"));

  private static final Boolean useHdfs = Boolean.parseBoolean(SystemConfiguration.getProperty("hdfs.use", "true"));
  private static final String hdfsUri = SystemConfiguration.getProperty("hdfs.uri", "localhost");

  private static final String topologyName = SystemConfiguration.getProperty("storm.topoName", "PirkTopology");
  private static final Integer numWorkers = Integer.parseInt(SystemConfiguration.getProperty("storm.workers", "1"));
  private static final Integer spoutParallelism = Integer.parseInt(SystemConfiguration.getProperty("storm.spout.parallelism", "1"));
  private static final Integer partitionDataBoltParallelism = Integer.parseInt(SystemConfiguration.getProperty("storm.partitiondata.parallelism", "1"));
  private static final Integer encrowcalcboltParallelism = Integer.parseInt(SystemConfiguration.getProperty("storm.encrowcalcbolt.parallelism", "1"));
  private static final Integer enccolmultboltParallelism = Integer.parseInt(SystemConfiguration.getProperty("storm.enccolmultbolt.parallelism", "1"));
  private static final Boolean saltColumns = Boolean.parseBoolean(SystemConfiguration.getProperty("storm.saltColumns", "false"));
  private static final Boolean splitPartitions = Boolean.parseBoolean(SystemConfiguration.getProperty("storm.splitPartitions", "false"));

  private static final String queryFile = SystemConfiguration.getProperty("pir.queryInput");
  private static final String outputPath = SystemConfiguration.getProperty("pir.outputFile");
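  /*
   * A minimal sketch of the properties consumed above, as they might appear in a properties
   * file loaded into SystemConfiguration. The property names come directly from this class;
   * the values are illustrative placeholders, not defaults shipped with Pirk.
   *
   *   kafka.clientId=KafkaSpout
   *   kafka.zk=zk-host:2181
   *   kafka.topic=pirkTopic
   *   storm.topoName=PirkTopology
   *   storm.workers=2
   *   pir.queryInput=hdfs://namenode:9000/pirk/query
   *   pir.outputFile=hdfs://namenode:9000/pirk/output
   */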
  public static void runPirkTopology() throws Exception
  {
    // Set up Kafka parameters
    logger.info("Configuring Kafka.");
    String zkRoot = "/" + kafkaTopic + "_pirk_storm";
    BrokerHosts zkHosts = new ZkHosts(brokerZk);
    SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, kafkaTopic, zkRoot, kafkaClientId);
    // If forceFromStart is set, ignore any offsets stored in ZooKeeper and replay the topic from the beginning.
    kafkaConfig.ignoreZkOffsets = forceFromStart;

    // Create conf
    logger.info("Retrieving Query and generating Storm conf.");
    Config conf = createStormConf();
    Query query = StormUtils.getQuery(useHdfs, hdfsUri, queryFile);
    conf.put(StormConstants.N_SQUARED_KEY, query.getNSquared().toString());
    conf.put(StormConstants.QUERY_INFO_KEY, query.getQueryInfo().toMap());

    // Configure this for different types of input data on Kafka.
    kafkaConfig.scheme = new SchemeAsMultiScheme(new PirkHashScheme(conf));

    // Create topology
    StormTopology topology = getPirkTopology(kafkaConfig);

    // Run topology
    logger.info("Submitting Pirk topology to Storm...");
    StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, topology);
  }
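  /**
   * A minimal sketch of an entry point, shown for illustration only; in Pirk the topology is
   * normally launched through the responder driver. Assumes the relevant kafka.*, storm.*, and
   * pir.* properties have already been loaded into SystemConfiguration before invocation.
   */
  public static void main(String[] args) throws Exception
  {
    runPirkTopology();
  }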
  /**
   * Creates the Pirk topology: KafkaSpout -> PartitionDataBolt -> EncRowCalcBolt -> EncColMultBolt -> OutputBolt.
   * Requires a SpoutConfig to initialize the KafkaSpout.
   *
   * @param kafkaConfig
   *          spout configuration used to initialize the KafkaSpout
   * @return the wired Storm topology
   */
  public static StormTopology getPirkTopology(SpoutConfig kafkaConfig)
  {
    // Create spout and bolts
    KafkaSpout spout = new KafkaSpout(kafkaConfig);
    PartitionDataBolt partitionDataBolt = new PartitionDataBolt();
    EncRowCalcBolt ercbolt = new EncRowCalcBolt();
    EncColMultBolt ecmbolt = new EncColMultBolt();
    OutputBolt outputBolt = new OutputBolt();

    // Build Storm topology
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout(StormConstants.SPOUT_ID, spout, spoutParallelism);

    // Group on the selector hash so that all records with the same hash land on the same PartitionDataBolt task.
    builder.setBolt(StormConstants.PARTITION_DATA_BOLT_ID, partitionDataBolt, partitionDataBoltParallelism).fieldsGrouping(StormConstants.SPOUT_ID,
        new Fields(StormConstants.HASH_FIELD));

    // TODO: Decide whether to use Resource Aware Scheduler. (If not, get rid of b2 and b3.)
    BoltDeclarer b2 = builder.setBolt(StormConstants.ENCROWCALCBOLT_ID, ercbolt, encrowcalcboltParallelism)
        .fieldsGrouping(StormConstants.PARTITION_DATA_BOLT_ID, new Fields(StormConstants.HASH_FIELD))
        .allGrouping(StormConstants.ENCCOLMULTBOLT_ID, StormConstants.ENCCOLMULTBOLT_SESSION_END)
        .addConfiguration(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, Integer.parseInt(SystemConfiguration.getProperty("storm.encrowcalcbolt.ticktuple")));
    // b2.setMemoryLoad(5000);
    // b2.setCPULoad(150.0);

    BoltDeclarer b3 = builder.setBolt(StormConstants.ENCCOLMULTBOLT_ID, ecmbolt, enccolmultboltParallelism)
        .fieldsGrouping(StormConstants.ENCROWCALCBOLT_ID, StormConstants.ENCROWCALCBOLT_DATASTREAM_ID,
            new Fields(StormConstants.COLUMN_INDEX_ERC_FIELD, StormConstants.SALT))
        .allGrouping(StormConstants.ENCROWCALCBOLT_ID, StormConstants.ENCROWCALCBOLT_FLUSH_SIG);
    // b3.setMemoryLoad(5000);
    // b3.setCPULoad(500.0);

    builder.setBolt(StormConstants.OUTPUTBOLT_ID, outputBolt, 1).globalGrouping(StormConstants.ENCCOLMULTBOLT_ID, StormConstants.ENCCOLMULTBOLT_ID);

    return builder.createTopology();
  }
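  /**
   * A minimal sketch, not part of the original class, of running the same topology in Storm
   * local mode for testing rather than submitting it to a cluster. Assumes the SpoutConfig and
   * Config are built exactly as in runPirkTopology() above.
   */
  public static void runPirkTopologyLocally(SpoutConfig kafkaConfig, Config conf) throws Exception
  {
    // Runs the topology in-process; suitable for tests, not production.
    org.apache.storm.LocalCluster localCluster = new org.apache.storm.LocalCluster();
    localCluster.submitTopology(topologyName, conf, getPirkTopology(kafkaConfig));
  }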
  public static Config createStormConf()
  {
    Boolean limitHitsPerSelector = Boolean.parseBoolean(SystemConfiguration.getProperty("pir.limitHitsPerSelector"));
    Integer maxHitsPerSelector = Integer.parseInt(SystemConfiguration.getProperty("pir.maxHitsPerSelector"));
    Integer rowDivisions = Integer.parseInt(SystemConfiguration.getProperty("storm.rowDivs", "1"));

    Config conf = new Config();
    conf.setNumAckers(Integer.parseInt(SystemConfiguration.getProperty("storm.numAckers", numWorkers.toString())));
    conf.setMaxSpoutPending(Integer.parseInt(SystemConfiguration.getProperty("storm.maxSpoutPending", "300")));
    conf.setNumWorkers(numWorkers);
    conf.setDebug(false);
    // conf.setNumEventLoggers(2);

    conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, SystemConfiguration.getIntProperty("storm.executor.receiveBufferSize", 1024));
    conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, SystemConfiguration.getIntProperty("storm.executor.sendBufferSize", 1024));
    conf.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, SystemConfiguration.getIntProperty("storm.transferBufferSize", 32));
    conf.put(Config.WORKER_HEAP_MEMORY_MB, SystemConfiguration.getIntProperty("storm.worker.heapMemory", 750));
    conf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, Double.parseDouble(SystemConfiguration.getProperty("storm.componentOnheapMem", "128")));

    // Pirk parameters to send to bolts
    conf.put(StormConstants.ALLOW_ADHOC_QSCHEMAS_KEY, SystemConfiguration.getProperty("pir.allowAdHocQuerySchemas", "false").equals("true"));
    conf.put(StormConstants.QSCHEMA_KEY, SystemConfiguration.getProperty("query.schemas"));
    conf.put(StormConstants.DSCHEMA_KEY, SystemConfiguration.getProperty("data.schemas"));
    conf.put(StormConstants.HDFS_URI_KEY, hdfsUri);
    conf.put(StormConstants.QUERY_FILE_KEY, queryFile);
    conf.put(StormConstants.USE_HDFS, useHdfs);
    conf.put(StormConstants.OUTPUT_FILE_KEY, outputPath);
    conf.put(StormConstants.LIMIT_HITS_PER_SEL_KEY, limitHitsPerSelector);
    conf.put(StormConstants.MAX_HITS_PER_SEL_KEY, maxHitsPerSelector);
    conf.put(StormConstants.SPLIT_PARTITIONS_KEY, splitPartitions);
    conf.put(StormConstants.SALT_COLUMNS_KEY, saltColumns);
    conf.put(StormConstants.ROW_DIVISIONS_KEY, rowDivisions);
    conf.put(StormConstants.ENCROWCALCBOLT_PARALLELISM_KEY, encrowcalcboltParallelism);
    conf.put(StormConstants.ENCCOLMULTBOLT_PARALLELISM_KEY, enccolmultboltParallelism);

    return conf;
  }
}