/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pirk.responder.wideskies.storm;
import java.io.IOException;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.pirk.query.wideskies.Query;
import org.apache.pirk.responder.wideskies.common.ComputeEncryptedRow;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
/**
* Bolt class to perform the encrypted row calculation
* <p>
* Receives a {@code <hash(selector), dataPartitions>} tuple as input.
* <p>
* Encrypts the row data and emits a (column index, encrypted row-value) tuple for each encrypted block.
* <p>
* Every FLUSH_FREQUENCY seconds, it sends a signal to EncColMultBolt to flush its output and resets all counters. At that point, all outgoing (column index,
* encrypted row-value) tuples are buffered until a SESSION_END signal is received back from each EncColMultBolt.
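 * <p>
 * A minimal sketch of how this bolt might be wired into the topology; the component ids, parallelism hints, and
 * flush frequency below are illustrative assumptions, not taken from the actual topology builder:
 *
 * <pre>{@code
 * TopologyBuilder builder = new TopologyBuilder();
 * builder.setBolt("encRowCalcBolt", new EncRowCalcBolt(), 2)
 *     // group the <hash(selector), dataPartitions> tuples by the selector hash
 *     .fieldsGrouping("partitionDataBolt", new Fields(StormConstants.HASH_FIELD))
 *     // each EncColMultBolt reports SESSION_END back to every EncRowCalcBolt
 *     .allGrouping("encColMultBolt", StormConstants.ENCCOLMULTBOLT_SESSION_END)
 *     // tick tuples drive the periodic flush signal
 *     .addConfiguration(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 60);
 * }</pre>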
*/
public class EncRowCalcBolt extends BaseRichBolt
{
private static final long serialVersionUID = 1L;
  private static final Logger logger = LoggerFactory.getLogger(EncRowCalcBolt.class);
private OutputCollector outputCollector;
private static Query query;
private static boolean querySet = false;
  private Boolean limitHitsPerSelector; // whether to cap the number of records processed per selector
  private Long maxHitsPerSelector; // cap on records (or partition elements when splitPartitions is set) per selector
  private Long totalEndSigs; // number of EncColMultBolt instances, i.e. SESSION_END signals expected per flush
  private int rowDivisions; // upper bound for the random column salt
  private Boolean saltColumns; // whether to salt outgoing tuples to spread work across the EncColMultBolts
  private Boolean splitPartitions; // true if input tuples carry a single partition element rather than a full record
  private Random rand;
  // These are the main data structures used here.
  private Map<Integer,Integer> hitsByRow = new HashMap<>(); // row index (selector hash) -> number of records processed
  private Map<Integer,Integer> colIndexByRow = new HashMap<>(); // row index -> next column index to assign
  private List<Tuple2<Long,BigInteger>> matrixElements = new ArrayList<>(); // (column index, encrypted value) pairs for the current tuple
  private List<BigInteger> dataArray = new ArrayList<>(); // partition elements extracted from the current tuple
  private int numEndSigs = 0; // SESSION_END signals received so far in the current flush cycle
  // These values are buffered while a flush is in progress, i.e. after the flush signal has been sent
  // but before the SESSION_END signal has come back from every EncColMultBolt.
private boolean buffering = false;
private List<Tuple2<Long,BigInteger>> bufferedValues = new ArrayList<>();
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector coll)
{
outputCollector = coll;
setQuery(map);
logger.info("partition databolt hdfs = " + map.get(StormConstants.USE_HDFS));
maxHitsPerSelector = (Long) map.get(StormConstants.MAX_HITS_PER_SEL_KEY);
limitHitsPerSelector = (Boolean) map.get(StormConstants.LIMIT_HITS_PER_SEL_KEY);
totalEndSigs = (Long) map.get(StormConstants.ENCCOLMULTBOLT_PARALLELISM_KEY);
splitPartitions = (Boolean) map.get(StormConstants.SPLIT_PARTITIONS_KEY);
saltColumns = (Boolean) map.get(StormConstants.SALT_COLUMNS_KEY);
rowDivisions = ((Long) map.get(StormConstants.ROW_DIVISIONS_KEY)).intValue();
    // If splitPartitions==true, the data arrives partition element by partition element rather than
    // record by record. In that case numRecords (below) is incremented once per partition element and
    // would exceed the maxHitsPerSelector param far too soon unless the latter is scaled up accordingly.
    if (splitPartitions)
    {
      maxHitsPerSelector *= query.getQueryInfo().getNumPartitionsPerDataElement();
    }
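    // e.g., with 10 partition elements per data element and maxHitsPerSelector = 100, the effective
    // cap becomes 1000 partition elements (illustrative numbers only).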
rand = new Random();
logger.info("Initialized EncRowCalcBolt.");
}
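
  /**
   * Handles three kinds of input: data tuples from the partition data bolt, which are encrypted and either
   * emitted or buffered; tick tuples, which trigger the flush signal to the EncColMultBolts; and SESSION_END
   * signals, which end the buffering window once one has arrived from every EncColMultBolt.
   */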
@Override
public void execute(Tuple tuple)
{
if (tuple.getSourceStreamId().equals(StormConstants.DEFAULT))
{
matrixElements = processTupleFromPartitionDataBolt(tuple); // tuple: <hash,partitions>
if (buffering)
{
logger.debug("Buffering tuple.");
bufferedValues.addAll(matrixElements);
}
else
{
emitTuples(matrixElements);
}
}
else if (StormUtils.isTickTuple(tuple) && !buffering)
{
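      // Tick tuples arrive every FLUSH_FREQUENCY seconds; each one sends the flush signal to the
      // EncColMultBolts and starts a buffering window that lasts until every SESSION_END signal comes back.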
logger.debug("Sending flush signal to EncColMultBolt.");
outputCollector.emit(StormConstants.ENCROWCALCBOLT_FLUSH_SIG, new Values(1));
colIndexByRow.clear();
hitsByRow.clear();
buffering = true;
}
else if (tuple.getSourceStreamId().equals(StormConstants.ENCCOLMULTBOLT_SESSION_END))
{
numEndSigs += 1;
logger.debug("SessionEnd signal {} of {} received", numEndSigs, totalEndSigs);
// Need to receive signal from all EncColMultBolt instances before stopping buffering.
if (numEndSigs == totalEndSigs)
{
logger.debug("Buffering completed, emitting {} tuples.", bufferedValues.size());
emitTuples(bufferedValues);
bufferedValues.clear();
buffering = false;
numEndSigs = 0;
}
}
outputCollector.ack(tuple);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer)
{
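    // Data stream consumed by EncColMultBolt: (column index, encrypted row value, salt).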
outputFieldsDeclarer.declareStream(StormConstants.ENCROWCALCBOLT_DATASTREAM_ID,
new Fields(StormConstants.COLUMN_INDEX_ERC_FIELD, StormConstants.ENCRYPTED_VALUE_FIELD, StormConstants.SALT));
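    // Signal stream telling each EncColMultBolt to flush its output.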
outputFieldsDeclarer.declareStream(StormConstants.ENCROWCALCBOLT_FLUSH_SIG, new Fields(StormConstants.FLUSH));
}
  /**
   * Extracts (hash, data partitions) from the tuple, encrypts the data partitions, and returns the resulting
   * (column index, encrypted value) pairs. Also advances colIndexByRow and hitsByRow for the row.
   *
   * @param tuple
   *          input tuple containing the selector hash and the partitioned data
   * @return the list of (column index, encrypted value) pairs computed for this row
   */
private List<Tuple2<Long,BigInteger>> processTupleFromPartitionDataBolt(Tuple tuple)
{
matrixElements.clear();
int rowIndex = tuple.getIntegerByField(StormConstants.HASH_FIELD);
if (!colIndexByRow.containsKey(rowIndex))
{
colIndexByRow.put(rowIndex, 0);
hitsByRow.put(rowIndex, 0);
}
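    // Depending on splitPartitions, the tuple carries either a single partition element or the full list of partitions.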
if (splitPartitions)
{
dataArray.add((BigInteger) tuple.getValueByField(StormConstants.PARTIONED_DATA_FIELD));
}
else
{
dataArray = (ArrayList<BigInteger>) tuple.getValueByField(StormConstants.PARTIONED_DATA_FIELD);
}
logger.debug("Retrieving {} elements in EncRowCalcBolt.", dataArray.size());
try
{
int colIndex = colIndexByRow.get(rowIndex);
int numRecords = hitsByRow.get(rowIndex);
if (limitHitsPerSelector && numRecords < maxHitsPerSelector)
{
logger.debug("computing matrix elements.");
matrixElements = ComputeEncryptedRow.computeEncRow(dataArray, query, rowIndex, colIndex);
colIndexByRow.put(rowIndex, colIndex + matrixElements.size());
hitsByRow.put(rowIndex, numRecords + 1);
}
else if (limitHitsPerSelector)
{
logger.info("maxHits: rowIndex = " + rowIndex + " elementCounter = " + numRecords);
}
} catch (IOException e)
{
logger.warn("Caught IOException while encrypting row. ", e);
}
dataArray.clear();
return matrixElements;
}
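
  /**
   * Emits each (column index, encrypted value) pair on the data stream. When saltColumns is enabled, a random
   * salt in [0, rowDivisions) is attached so that the column multiplications in EncColMultBolt are spread out
   * to avoid hotspotting; otherwise the salt is always 0.
   */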
private void emitTuples(List<Tuple2<Long,BigInteger>> matrixElements)
{
// saltColumns distributes the column multiplication done in the next bolt EncColMultBolt to avoid hotspotting.
if (saltColumns)
{
for (Tuple2<Long,BigInteger> sTuple : matrixElements)
{
outputCollector.emit(StormConstants.ENCROWCALCBOLT_DATASTREAM_ID, new Values(sTuple._1(), sTuple._2(), rand.nextInt(rowDivisions)));
}
}
else
{
for (Tuple2<Long,BigInteger> sTuple : matrixElements)
{
outputCollector.emit(StormConstants.ENCROWCALCBOLT_DATASTREAM_ID, new Values(sTuple._1(), sTuple._2(), 0));
}
}
}
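
  /**
   * Sets the static {@code Query} exactly once per worker JVM; the query object is shared by all
   * EncRowCalcBolt executors in the JVM, and the synchronized static method keeps concurrent
   * prepare() calls from loading it more than once.
   */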
  private static synchronized void setQuery(Map map)
{
if (!querySet)
{
query = StormUtils.prepareQuery(map);
querySet = true;
}
}
}