| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.pirk.storm; |
| |
| import java.io.File; |
| import java.math.BigInteger; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Properties; |
| |
| import kafka.admin.AdminUtils; |
| import kafka.server.KafkaConfig; |
| import kafka.server.KafkaServer; |
| import kafka.utils.ZKStringSerializer$; |
| import kafka.utils.ZkUtils; |
| |
| import org.I0Itec.zkclient.ZkClient; |
| import org.I0Itec.zkclient.ZkConnection; |
| |
| import org.apache.commons.io.FileUtils; |
| import org.apache.curator.test.TestingServer; |
| import org.apache.kafka.clients.producer.KafkaProducer; |
| import org.apache.kafka.clients.producer.ProducerConfig; |
| import org.apache.kafka.clients.producer.ProducerRecord; |
// Assumed source of the SystemTime used for the embedded broker below (kafka-clients utils);
// the original file omitted this import
import org.apache.kafka.common.utils.SystemTime;
| import org.apache.pirk.encryption.Paillier; |
| import org.apache.pirk.querier.wideskies.Querier; |
| import org.apache.pirk.querier.wideskies.QuerierConst; |
| import org.apache.pirk.querier.wideskies.decrypt.DecryptResponse; |
| import org.apache.pirk.querier.wideskies.encrypt.EncryptQuery; |
| import org.apache.pirk.query.wideskies.QueryInfo; |
| import org.apache.pirk.responder.wideskies.storm.OutputBolt; |
| import org.apache.pirk.responder.wideskies.storm.PirkHashScheme; |
| import org.apache.pirk.responder.wideskies.storm.PirkTopology; |
| import org.apache.pirk.responder.wideskies.storm.StormConstants; |
| import org.apache.pirk.response.wideskies.Response; |
| import org.apache.pirk.schema.query.filter.StopListFilter; |
| import org.apache.pirk.schema.response.QueryResponseJSON; |
| import org.apache.pirk.serialization.LocalFileSystemStore; |
| import org.apache.pirk.test.utils.BaseTests; |
| import org.apache.pirk.test.utils.Inputs; |
| import org.apache.pirk.test.utils.TestUtils; |
| import org.apache.pirk.utils.QueryResultsWriter; |
| import org.apache.pirk.utils.SystemConfiguration; |
| import org.apache.storm.Config; |
| import org.apache.storm.ILocalCluster; |
| import org.apache.storm.Testing; |
| import org.apache.storm.generated.StormTopology; |
| import org.apache.storm.kafka.SpoutConfig; |
| import org.apache.storm.kafka.ZkHosts; |
| import org.apache.storm.spout.SchemeAsMultiScheme; |
| import org.apache.storm.testing.IntegrationTest; |
| import org.apache.storm.testing.MkClusterParam; |
| import org.apache.storm.testing.TestJob; |
| import org.json.simple.JSONObject; |
| |
| import org.junit.AfterClass; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.experimental.categories.Category; |
| import org.junit.rules.TemporaryFolder; |
| import org.slf4j.LoggerFactory; |
| |
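/**
 * Integration test for the Pirk Storm topology over Kafka.
 *
 * Stands up an in-process ZooKeeper server and Kafka broker, encrypts a test query, publishes
 * test records to a Kafka topic, and runs the topology on a local Storm test cluster for all
 * four combinations of the splitPartitions and saltColumns options, verifying the decrypted
 * results after each run.
 */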
| @Category(IntegrationTest.class) |
| public class KafkaStormIntegrationTest |
| { |
| private static final org.slf4j.Logger logger = LoggerFactory.getLogger(KafkaStormIntegrationTest.class); |
| |
| private static final LocalFileSystemStore localStore = new LocalFileSystemStore(); |
| |
| private static TestingServer zookeeperLocalCluster; |
| private static KafkaServer kafkaLocalBroker; |
| private static ZkClient zkClient; |
| |
| private static final String topic = "pirk_test_topic"; |
| private static final String kafkaTmpDir = "/tmp/kafka"; |
| |
| @Rule |
| public TemporaryFolder folder = new TemporaryFolder(); |
| |
| private static File fileQuery; |
| private static File fileQuerier; |
| |
| private QueryInfo queryInfo; |
| private BigInteger nSquared; |
| |
  // Tracks the number of remaining topology runs; compared against OutputBolt's latch count to
  // detect when the current run has finished (see createPirkTestJob)
  private static int testCountDown = 4;
| |
| @Test |
| public void testKafkaStormIntegration() throws Exception |
| { |
| SystemConfiguration.setProperty("pir.limitHitsPerSelector", "true"); |
    SystemConfiguration.setProperty("pir.maxHitsPerSelector", "10");
| SystemConfiguration.setProperty("storm.spout.parallelism", "1"); |
| SystemConfiguration.setProperty("storm.hashbolt.parallelism", "1"); |
| SystemConfiguration.setProperty("storm.encrowcalcbolt.parallelism", "2"); |
| SystemConfiguration.setProperty("storm.enccolmultbolt.parallelism", "2"); |
| SystemConfiguration.setProperty("storm.encrowcalcbolt.ticktuple", "8"); |
| SystemConfiguration.setProperty("storm.rowDivs", "2"); |
| SystemConfiguration.setProperty("hdfs.use", "false"); |
| |
| startZookeeper(); |
| startKafka(); |
| |
| SystemConfiguration.setProperty("kafka.topic", topic); |
| SystemConfiguration.setProperty("storm.topoName", "pirTest"); |
| |
    // Create the data and query schema files (stoplist filter configured with no stoplist file)
| SystemConfiguration.setProperty("pir.stopListFile", "none"); |
| Inputs.createSchemaFiles(StopListFilter.class.getName()); |
| |
    // Perform the encryption; sets queryInfo, nSquared, fileQuery, and fileQuerier
| performEncryption(); |
| SystemConfiguration.setProperty("pir.queryInput", fileQuery.getAbsolutePath()); |
| |
| KafkaProducer<String,String> producer = new KafkaProducer<>(createKafkaProducerConfig()); |
| loadTestData(producer); |
| |
| logger.info("Test (splitPartitions,saltColumns) = (true,true)"); |
| SystemConfiguration.setProperty("storm.splitPartitions", "true"); |
| SystemConfiguration.setProperty("storm.saltColumns", "true"); |
| runTest(); |
| |
| logger.info("Test (splitPartitions,saltColumns) = (true,false)"); |
| SystemConfiguration.setProperty("storm.splitPartitions", "true"); |
| SystemConfiguration.setProperty("storm.saltColumns", "false"); |
| runTest(); |
| |
| logger.info("Test (splitPartitions,saltColumns) = (false,true)"); |
| SystemConfiguration.setProperty("storm.splitPartitions", "false"); |
| SystemConfiguration.setProperty("storm.saltColumns", "true"); |
| runTest(); |
| |
| logger.info("Test (splitPartitions,saltColumns) = (false,false)"); |
| SystemConfiguration.setProperty("storm.splitPartitions", "false"); |
| SystemConfiguration.setProperty("storm.saltColumns", "false"); |
| runTest(); |
| } |
| |
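  /**
   * Runs the topology once with the current configuration, decrypts its response, and checks
   * the results against the expected DNS hostname query output.
   */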
| private void runTest() throws Exception |
| { |
| File responderFile = File.createTempFile("responderFile", ".txt"); |
| logger.info("Starting topology."); |
| runTopology(responderFile); |
| |
    // Decrypt the response written by the topology
    logger.info("Decrypting results; response file size = " + responderFile.length() + " bytes.");
| File fileFinalResults = performDecryption(responderFile); |
| |
    // Check the results against the expected values
| List<QueryResponseJSON> results = TestUtils.readResultsFile(fileFinalResults); |
| BaseTests.checkDNSHostnameQueryResults(results, false, 7, false, Inputs.createJSONDataElements()); |
| |
| responderFile.deleteOnExit(); |
| fileFinalResults.deleteOnExit(); |
| } |
| |
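  /**
   * Configures and submits the Pirk topology to a local Storm test cluster, writing the
   * encrypted response to responderFile.
   */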
| private void runTopology(File responderFile) throws Exception |
| { |
| MkClusterParam mkClusterParam = new MkClusterParam(); |
    // The test sometimes fails because of timing issues when more than one supervisor is set.
| mkClusterParam.setSupervisors(1); |
| |
    // Using "withSimulatedTimeLocalCluster" might be a better way to avoid the timing issues.
| Config conf = PirkTopology.createStormConf(); |
| conf.put(StormConstants.OUTPUT_FILE_KEY, responderFile.getAbsolutePath()); |
| conf.put(StormConstants.N_SQUARED_KEY, nSquared.toString()); |
| conf.put(StormConstants.QUERY_INFO_KEY, queryInfo.toMap()); |
| // conf.setDebug(true); |
| mkClusterParam.setDaemonConf(conf); |
| |
| TestJob testJob = createPirkTestJob(conf); |
| Testing.withLocalCluster(mkClusterParam, testJob); |
| // Testing.withSimulatedTimeLocalCluster(mkClusterParam, testJob); |
| } |
| |
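  /**
   * Creates the TestJob that submits the topology to the local cluster and blocks until
   * OutputBolt signals that this run's results have been written.
   */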
| private TestJob createPirkTestJob(final Config config) |
| { |
| final SpoutConfig kafkaConfig = setUpTestKafkaSpout(config); |
| return new TestJob() |
| { |
| StormTopology topology = PirkTopology.getPirkTopology(kafkaConfig); |
| |
| @Override |
| public void run(ILocalCluster iLocalCluster) throws Exception |
| { |
| iLocalCluster.submitTopology("pirk_integration_test", config, topology); |
        logger.info("Waiting for the topology to process the test data.");
        // OutputBolt counts its latch down once per completed run; poll until it drops below
        // the current countdown value, which signals that this run's results have been written.
        while (OutputBolt.latch.getCount() == testCountDown)
        {
          Thread.sleep(1000);
        }
        testCountDown -= 1;
| |
| logger.info("Finished..."); |
| } |
| }; |
| } |
| |
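  /**
   * Builds the Kafka spout configuration against the embedded broker, using PirkHashScheme to
   * hash each record's selector as it is deserialized.
   */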
| private SpoutConfig setUpTestKafkaSpout(Config conf) |
| { |
| ZkHosts zkHost = new ZkHosts(zookeeperLocalCluster.getConnectString()); |
| |
| SpoutConfig kafkaConfig = new SpoutConfig(zkHost, topic, "/pirk_test_root", "pirk_integr_test_spout"); |
| kafkaConfig.scheme = new SchemeAsMultiScheme(new PirkHashScheme(conf)); |
    logger.info("Kafka spout config initialized.");
| |
| return kafkaConfig; |
| } |
| |
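  /** Starts an in-process ZooKeeper server via Curator's TestingServer. */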
| private void startZookeeper() throws Exception |
| { |
    logger.info("Starting ZooKeeper.");
    zookeeperLocalCluster = new TestingServer();
    zookeeperLocalCluster.start();
    logger.info("ZooKeeper initialized.");
  }
| |
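  /**
   * Starts an embedded Kafka broker on localhost:11111, backed by the test ZooKeeper server,
   * and creates the test topic.
   */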
| private void startKafka() throws Exception |
| { |
| FileUtils.deleteDirectory(new File(kafkaTmpDir)); |
| |
    Properties props = new Properties();
    props.setProperty("zookeeper.session.timeout.ms", "100000");
    props.setProperty("advertised.host.name", "localhost");
    // The broker port must match the bootstrap.servers setting in createKafkaProducerConfig()
    props.setProperty("port", "11111");
    // props.setProperty("broker.id", "0");
    props.setProperty("log.dir", kafkaTmpDir);
    props.setProperty("enable.zookeeper", "true");
    props.setProperty("zookeeper.connect", zookeeperLocalCluster.getConnectString());

    // Start the embedded Kafka broker
    KafkaConfig kafkaConfig = KafkaConfig.fromProps(props);
    kafkaLocalBroker = new KafkaServer(kafkaConfig, new SystemTime(), scala.Option.apply("kafkaThread"));
    kafkaLocalBroker.startup();
| |
    // Create the test topic (1 partition, replication factor 1) via the ZooKeeper admin utils
    zkClient = new ZkClient(zookeeperLocalCluster.getConnectString(), 60000, 60000, ZKStringSerializer$.MODULE$);
    ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperLocalCluster.getConnectString()), false);
    AdminUtils.createTopic(zkUtils, topic, 1, 1, new Properties());
| } |
| |
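  /** Shuts down the embedded Kafka broker and ZooKeeper server and removes the Kafka log directory. */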
| @AfterClass |
| public static void tearDown() throws Exception |
| { |
| zkClient.close(); |
| kafkaLocalBroker.shutdown(); |
| zookeeperLocalCluster.stop(); |
| |
| FileUtils.deleteDirectory(new File(kafkaTmpDir)); |
| } |
| |
| private Map<String,Object> createKafkaProducerConfig() |
| { |
    // Must match the embedded broker started in startKafka()
    String kafkaHostName = "localhost";
    int kafkaPort = 11111;
    Map<String,Object> config = new HashMap<>();
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHostName + ":" + kafkaPort);
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
| |
| return config; |
| } |
| |
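  /** Publishes each JSON test record to the Kafka test topic. */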
| private void loadTestData(KafkaProducer<String,String> producer) |
| { |
| for (JSONObject dataRecord : Inputs.createJSONDataElements()) |
| { |
      logger.info("Sending record to Kafka: " + dataRecord.toString());
| producer.send(new ProducerRecord<>(topic, dataRecord.toString())); |
| } |
| } |
| |
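  /**
   * Generates a Paillier key pair and encrypts the test selectors, writing the querier (with
   * decryption data) and query (encrypted vectors) files to the temporary folder.
   */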
| private void performEncryption() throws Exception |
| { |
| List<String> selectors = new ArrayList<>(Arrays.asList("s.t.u.net", "d.e.com", "r.r.r.r", "a.b.c.com", "something.else", "x.y.net")); |
| String queryType = Inputs.DNS_HOSTNAME_QUERY; |
| |
| Paillier paillier = new Paillier(BaseTests.paillierBitSize, BaseTests.certainty); |
| |
| nSquared = paillier.getNSquared(); |
| |
| queryInfo = new QueryInfo(BaseTests.queryIdentifier, selectors.size(), BaseTests.hashBitSize, BaseTests.hashKey, BaseTests.dataPartitionBitSize, queryType, |
| false, true, false); |
| |
    // Perform the encryption
    logger.info("Performing encryption of the selectors - forming the encrypted query vectors.");
    EncryptQuery encryptQuery = new EncryptQuery(queryInfo, selectors, paillier);
    Querier querier = encryptQuery.encrypt(1);
    logger.info("Completed encryption of the selectors - formed the encrypted query vectors.");
| |
    // Write the querier and query files into the temporary folder
    fileQuerier = folder.newFile("pir_integrationTest-" + QuerierConst.QUERIER_FILETAG + ".txt");
    fileQuery = folder.newFile("pir_integrationTest-" + QuerierConst.QUERY_FILETAG + ".txt");

    localStore.store(fileQuerier, querier);
    localStore.store(fileQuery, querier.getQuery());
| } |
| |
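  /**
   * Decrypts the topology's response using the stored querier and writes the cleartext results
   * to a temporary file, which is returned.
   */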
| private File performDecryption(File responseFile) throws Exception |
| { |
| File finalResults = File.createTempFile("finalFileResults", ".txt"); |
| String querierFilePath = fileQuerier.getAbsolutePath(); |
| String responseFilePath = responseFile.getAbsolutePath(); |
| String outputFile = finalResults.getAbsolutePath(); |
| int numThreads = 1; |
| |
| Response response = localStore.recall(responseFilePath, Response.class); |
| Querier querier = localStore.recall(querierFilePath, Querier.class); |
| |
    // Perform the decryption once and write out the result file
    DecryptResponse decryptResponse = new DecryptResponse(response, querier);
    QueryResultsWriter.writeResultFile(outputFile, decryptResponse.decrypt(numThreads));
| return finalResults; |
| } |
| |
| } |