blob: 17cbee357ad3e062fa9796919b3eff4738b9b81a [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.pirk.responder.wideskies.storm;
import java.io.File;
import java.io.IOException;
import java.math.BigInteger;
import java.net.URI;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.pirk.query.wideskies.QueryInfo;
import org.apache.pirk.response.wideskies.Response;
import org.apache.pirk.serialization.HadoopFileSystemStore;
import org.apache.pirk.serialization.LocalFileSystemStore;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.slf4j.LoggerFactory;
/**
 * Bolt to compute and output the final Response object for a query
 * <p>
 * Receives {@code <colIndex, colProduct>} tuples, computes the final column product for each colIndex, records the results in the final Response object, and
 * outputs the final Response object for the query.
 * <p>
 * Flush signals are sent to the OutputBolt from the EncColMultBolts via a tuple of the form {@code <-1, 0>}. Once a flush signal has been received from each
 * EncColMultBolt (or a timeout is reached), the final column product is computed and the final Response is formed and emitted.
 * <p>
 * Currently, the Responses are written to HDFS at the location specified by the outputFile, with the timestamp appended.
 * <p>
 * TODO: -- Enable other Response output locations
 */
public class OutputBolt extends BaseRichBolt
private static final long serialVersionUID = 1L;
private static final org.slf4j.Logger logger = LoggerFactory.getLogger(OutputBolt.class);

// Test hook only: a shared latch exposed so that tests can observe this bolt's progress.
public static CountDownLatch latch = new CountDownLatch(4);

// ---- Collaborators ----

// Collector handed to the bolt in prepare().
private OutputCollector outputCollector;
// Exactly one of the two stores is created in prepare(), depending on the hdfs flag.
private LocalFileSystemStore localStore;
private HadoopFileSystemStore hadoopStore;

// ---- Output configuration ----

// Location the Response is written to.
private String outputFile;
// true -> write through hadoopStore; false -> write through localStore.
private boolean hdfs;

// ---- Flush bookkeeping ----

// Number of flush signals that must be seen before the Response is written.
private Integer totalFlushSigs;
// Flush signals counted since the last reset.
private Integer flushCounter = 0;
private List<Tuple> tuplesToAck = new ArrayList<>();

// ---- Query state ----

// Modulus applied when two products for the same column are multiplied together.
private BigInteger nSquared;
// Response object that receives one element per column before being written out.
private Response response;
// The main object of this bolt: column id -> (running) column product.
private Map<Long,BigInteger> resultsMap = new HashMap<>();
public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector)
outputCollector = collector;
totalFlushSigs = ((Long) map.get(StormConstants.ENCCOLMULTBOLT_PARALLELISM_KEY)).intValue();
outputFile = (String) map.get(StormConstants.OUTPUT_FILE_KEY);
hdfs = (boolean) map.get(StormConstants.USE_HDFS);
if (hdfs)
String hdfsUri = (String) map.get(StormConstants.HDFS_URI_KEY);
FileSystem fs = FileSystem.get(URI.create(hdfsUri), new Configuration());
hadoopStore = new HadoopFileSystemStore(fs);
} catch (IOException e)
logger.error("Failed to initialize Hadoop file system for output.");
throw new RuntimeException(e);
localStore = new LocalFileSystemStore();
nSquared = new BigInteger((String) map.get(StormConstants.N_SQUARED_KEY));
QueryInfo queryInfo = new QueryInfo((Map) map.get(StormConstants.QUERY_INFO_KEY));
response = new Response(queryInfo);"Intitialized OutputBolt.");
public void execute(Tuple tuple)
long colIndex = tuple.getLongByField(StormConstants.COLUMN_INDEX_ECM_FIELD);
BigInteger colVal = (BigInteger) tuple.getValueByField(StormConstants.COLUMN_PRODUCT_FIELD);
// colIndex == -1 is just the signal sent by EncColMultBolt to notify that it flushed it's values.
// Could have created a new stream for such signals, but that seemed like overkill.
if (colIndex == -1)
logger.debug("Received " + flushCounter + " output flush signals out of " + totalFlushSigs);
// Wait till all EncColMultBolts have been flushed
if (Objects.equals(flushCounter, totalFlushSigs))
{"TimeToFlush reached - outputting response to " + outputFile + " with columns.size = " + resultsMap.keySet().size());
String timestamp = (new SimpleDateFormat("yyyyMMddHHmmss").format(new java.util.Date()));
for (long cv : resultsMap.keySet())
response.addElement((int) cv, resultsMap.get(cv));
if (hdfs)
{ Path(outputFile + "_" + timestamp), response);
{ // In order to accommodate testing, this does not currently include timestamp.
// Should probably be fixed, but this will not likely be used outside of testing. File(outputFile), response);
for (long cv : resultsMap.keySet())
response.addElement((int) cv, resultsMap.get(cv));
logger.debug("column = " + cv + ", value = " + resultsMap.get(cv).toString());
} catch (IOException e)
logger.warn("Unable to write output file.");
// Reset
flushCounter = 0;
for (Tuple t : tuplesToAck)
// Used for integration test
// Process data values: add them to map. The column multiplication is only done in the case where saltColumns==true,
// in which case a small number of multiplications still need to be done per column.
if (resultsMap.containsKey(colIndex))
BigInteger colMult = colVal.multiply(resultsMap.get(colIndex)).mod(nSquared);
resultsMap.put(colIndex, colMult);
resultsMap.put(colIndex, colVal);
logger.debug("column = " + colIndex + ", value = " + resultsMap.get(colIndex).toString());
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer)