| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.metron.pcap.integration; |
| |
| import static org.junit.jupiter.api.Assertions.assertEquals; |
| import static org.junit.jupiter.api.Assertions.assertNotNull; |
| import static org.junit.jupiter.api.Assertions.assertTrue; |
| |
| import com.google.common.base.Function; |
| import com.google.common.base.Predicate; |
| import com.google.common.collect.Collections2; |
| import com.google.common.collect.Iterables; |
| import com.google.common.collect.Lists; |
| import java.io.ByteArrayOutputStream; |
| import java.io.File; |
| import java.io.FilenameFilter; |
| import java.io.IOException; |
| import java.util.AbstractMap; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Properties; |
| import javax.annotation.Nullable; |
| import kafka.consumer.ConsumerIterator; |
| import org.apache.commons.io.FileUtils; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.hbase.util.Bytes; |
| import org.apache.hadoop.io.BytesWritable; |
| import org.apache.hadoop.io.IntWritable; |
| import org.apache.hadoop.io.SequenceFile; |
| import org.apache.kafka.clients.producer.Producer; |
| import org.apache.metron.common.Constants; |
| import org.apache.metron.common.utils.HDFSUtils; |
| import org.apache.metron.integration.BaseIntegrationTest; |
| import org.apache.metron.integration.ComponentRunner; |
| import org.apache.metron.integration.Processor; |
| import org.apache.metron.integration.ProcessorResult; |
| import org.apache.metron.integration.ReadinessState; |
| import org.apache.metron.integration.components.FluxTopologyComponent; |
| import org.apache.metron.integration.components.KafkaComponent; |
| import org.apache.metron.integration.components.MRComponent; |
| import org.apache.metron.integration.components.ZKServerComponent; |
| import org.apache.metron.integration.utils.KafkaUtil; |
| import org.apache.metron.job.JobStatus; |
| import org.apache.metron.job.Pageable; |
| import org.apache.metron.job.Statusable; |
| import org.apache.metron.pcap.PacketInfo; |
| import org.apache.metron.pcap.PcapHelper; |
| import org.apache.metron.pcap.PcapMerger; |
| import org.apache.metron.pcap.config.FixedPcapConfig; |
| import org.apache.metron.pcap.config.PcapOptions; |
| import org.apache.metron.pcap.filter.fixed.FixedPcapFilter; |
| import org.apache.metron.pcap.filter.query.QueryPcapFilter; |
| import org.apache.metron.pcap.finalizer.PcapFinalizerStrategies; |
| import org.apache.metron.pcap.mr.PcapJob; |
| import org.apache.metron.pcap.query.PcapCli; |
| import org.apache.metron.spout.pcap.Endianness; |
| import org.apache.metron.spout.pcap.deserializer.Deserializers; |
| import org.apache.metron.test.utils.UnitTestHelper; |
| import org.json.simple.JSONObject; |
| import org.junit.jupiter.api.AfterAll; |
| import org.junit.jupiter.api.BeforeAll; |
| import org.junit.jupiter.api.BeforeEach; |
| import org.junit.jupiter.api.Disabled; |
| import org.junit.jupiter.api.Test; |
| |
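| /** |
| * Integration test for the pcap topology: sample packets are written to Kafka, the topology |
| * lands them in HDFS, and the tests then query them back out with MapReduce jobs using both |
| * the fixed and query filters. |
| * |
| * <p>Each query test follows the same pattern: configure a filter implementation, a time |
| * range, and filter fields on the {@link FixedPcapConfig}; submit a {@link PcapJob}; wait for |
| * it to finish; and assert on the resulting pages of pcap files. For example: |
| * <pre>{@code |
| * PcapOptions.FILTER_IMPL.put(configuration, new FixedPcapFilter.Configurator()); |
| * PcapOptions.START_TIME_NS.put(configuration, getTimestamp(0, pcapEntries)); |
| * PcapOptions.END_TIME_NS.put(configuration, getTimestamp(1, pcapEntries)); |
| * PcapOptions.FIELDS.put(configuration, new HashMap<>()); |
| * Statusable<Path> results = new PcapJob<Map<String, String>>().submit(PcapFinalizerStrategies.CLI, configuration); |
| * waitForJob(results); |
| * }</pre> |
| */ |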
| public class PcapTopologyIntegrationTest extends BaseIntegrationTest { |
| static final String KAFKA_TOPIC = "pcap"; |
| private static final String BASE_DIR = "pcap"; |
| private static final String DATA_DIR = BASE_DIR + "/data_dir"; |
| private static final String INTERIM_RESULT = BASE_DIR + "/query"; |
| private static final String OUTPUT_DIR = BASE_DIR + "/output"; |
| private static final int MAX_RETRIES = 30; |
| private static final int SLEEP_MS = 500; |
| private static String topologiesDir = "src/main/flux"; |
| private static String targetDir = "target"; |
| private static ComponentRunner runner; |
| private static File inputDir; |
| private static File interimResultDir; |
| private static File outputDir; |
| private static List<Map.Entry<byte[], byte[]>> pcapEntries; |
| private static boolean withHeaders; |
| private FixedPcapConfig configuration; |
| |
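| /** |
| * Deletes the contents of the given directories, leaving the directories themselves in place. |
| */ |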
| private static void clearOutDirs(File... dirs) throws IOException { |
| for (File dir : dirs) { |
| for (File f : dir.listFiles()) { |
| if (f.isDirectory()) { |
| FileUtils.deleteDirectory(f); |
| } else { |
| f.delete(); |
| } |
| } |
| } |
| } |
| |
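| /** |
| * Counts the files the topology has written to the given directory, ignoring the .crc |
| * checksum files created by the local filesystem. |
| */ |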
| private static int numFiles(File outDir, Configuration config) { |
| return outDir.list((dir, name) -> !name.endsWith(".crc")).length; |
| } |
| |
| // This timestamp scheme will eventually be completely deprecated. |
| // Because it adds a significant amount of testing time, the test is disabled. |
| @Disabled |
| @Test |
| public void testTimestampInPacket() throws Exception { |
| setupTopology(new Function<Properties, Void>() { |
| @Nullable |
| @Override |
| public Void apply(@Nullable Properties input) { |
| input.setProperty("kafka.pcap.ts_scheme", Deserializers.FROM_PACKET.toString()); |
| return null; |
| } |
| }, (kafkaComponent, pcapEntries) -> kafkaComponent.writeMessages( KAFKA_TOPIC |
| , Collections2.transform(pcapEntries |
| , input -> input.getValue() |
| ) |
| ) |
| , true |
| ); |
| } |
| |
| /** |
| * Sets up component infrastructure once for all tests. |
| */ |
| @BeforeAll |
| public static void setupAll() throws Exception { |
| System.out.println("Setting up test components"); |
| withHeaders = false; |
| setupTopology(new Function<Properties, Void>() { |
| @Nullable |
| @Override |
| public Void apply(@Nullable Properties input) { |
| input.setProperty("kafka.pcap.ts_scheme", Deserializers.FROM_KEY.toString()); |
| return null; |
| } |
| }, new SendEntries() { |
| @Override |
| public void send(KafkaComponent kafkaComponent, List<Map.Entry<byte[], byte[]>> pcapEntries) throws Exception { |
| Producer<byte[], byte[]> producer = kafkaComponent.createProducer(byte[].class, byte[].class); |
| KafkaUtil.send(producer, pcapEntries, KAFKA_TOPIC, 2); |
| System.out.println("Sent pcap data: " + pcapEntries.size()); |
| { |
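| // Sanity check: consume the messages back from the topic to confirm they all arrived. |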
| int numMessages = 0; |
| ConsumerIterator<?, ?> it = kafkaComponent.getStreamIterator(KAFKA_TOPIC); |
| for (int i = 0; i < pcapEntries.size(); ++i, it.next()) { |
| numMessages++; |
| } |
| assertEquals(pcapEntries.size(), numMessages); |
| System.out.println("Wrote " + pcapEntries.size() + " to kafka"); |
| } |
| } |
| }, withHeaders); |
| System.out.println("Done with setup."); |
| } |
| |
| private static File getDir(String targetDir, String childDir) { |
| File directory = new File(new File(targetDir), childDir); |
| if (!directory.exists()) { |
| directory.mkdirs(); |
| } |
| return directory; |
| } |
| |
| /** |
| * Cleans up component infrastructure after all tests finish running. |
| */ |
| @AfterAll |
| public static void teardownAll() throws Exception { |
| System.out.println("Tearing down test infrastructure"); |
| System.out.println("Stopping runner"); |
| runner.stop(); |
| System.out.println("Done stopping runner"); |
| System.out.println("Clearing output directories"); |
| clearOutDirs(inputDir, interimResultDir, outputDir); |
| System.out.println("Finished"); |
| } |
| |
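| /** |
| * Extracts the nanosecond timestamp from the Kafka key of the pcap entry at the given offset. |
| */ |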
| private static long getTimestamp(int offset, List<Map.Entry<byte[], byte[]>> entries) { |
| return Bytes.toLong(entries.get(offset).getKey()); |
| } |
| |
| private interface SendEntries { |
| void send(KafkaComponent kafkaComponent, List<Map.Entry<byte[], byte[]>> entries) throws Exception; |
| } |
| |
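| /** |
| * Brings up ZooKeeper, Kafka, a mini MapReduce cluster, and the pcap Flux topology; sends the |
| * sample pcap entries to Kafka via the supplied callback; and blocks until the topology has |
| * landed the expected number of files in HDFS (two packets per file, per the |
| * "kafka.pcap.numPackets" topology property). |
| */ |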
| public static void setupTopology(Function<Properties, Void> updatePropertiesCallback |
| ,SendEntries sendPcapEntriesCallback |
| ,boolean withHeaders |
| ) |
| throws Exception { |
| if (!new File(topologiesDir).exists()) { |
| topologiesDir = UnitTestHelper.findDir("topologies"); |
| } |
| targetDir = UnitTestHelper.findDir("target"); |
| inputDir = getDir(targetDir, DATA_DIR); |
| interimResultDir = getDir(targetDir, INTERIM_RESULT); |
| outputDir = getDir(targetDir, OUTPUT_DIR); |
| clearOutDirs(inputDir, interimResultDir, outputDir); |
| |
| File baseDir = new File(new File(targetDir), BASE_DIR); |
| assertNotNull(topologiesDir); |
| assertNotNull(targetDir); |
| Path pcapFile = new Path( |
| "../metron-integration-test/src/main/sample/data/SampleInput/PCAPExampleOutput"); |
| pcapEntries = Lists.newArrayList(readPcaps(pcapFile, withHeaders)); |
| assertTrue(Iterables.size(pcapEntries) > 0); |
| final Properties topologyProperties = new Properties() {{ |
| setProperty("topology.workers", "1"); |
| setProperty("topology.worker.childopts", ""); |
| setProperty("spout.kafka.topic.pcap", KAFKA_TOPIC); |
| setProperty("kafka.pcap.start", "EARLIEST"); |
| setProperty("kafka.pcap.out", inputDir.getAbsolutePath()); |
| setProperty("kafka.pcap.numPackets", "2"); |
| setProperty("kafka.pcap.maxTimeMS", "200000000"); |
| setProperty("kafka.pcap.ts_granularity", "NANOSECONDS"); |
| setProperty("kafka.spout.parallelism", "1"); |
| setProperty("topology.auto-credentials", "[]"); |
| setProperty("kafka.security.protocol", "PLAINTEXT"); |
| setProperty("hdfs.sync.every", "1"); |
| setProperty("hdfs.replication.factor", "-1"); |
| }}; |
| updatePropertiesCallback.apply(topologyProperties); |
| |
| final ZKServerComponent zkServerComponent = getZKServerComponent(topologyProperties); |
| |
| final KafkaComponent kafkaComponent = getKafkaComponent(topologyProperties, |
| Collections.singletonList( |
| new KafkaComponent.Topic(KAFKA_TOPIC, 1))); |
| |
| final MRComponent mr = new MRComponent().withBasePath(baseDir.getAbsolutePath()); |
| |
| FluxTopologyComponent fluxComponent = new FluxTopologyComponent.Builder() |
| .withTopologyLocation(new File(topologiesDir + "/pcap/remote.yaml")) |
| .withTopologyName("pcap") |
| .withTopologyProperties(topologyProperties) |
| .build(); |
| //UnitTestHelper.verboseLogging(); |
| runner = new ComponentRunner.Builder() |
| .withComponent("mr", mr) |
| .withComponent("zk", zkServerComponent) |
| .withComponent("kafka", kafkaComponent) |
| .withComponent("storm", fluxComponent) |
| .withMaxTimeMS(-1) |
| .withMillisecondsBetweenAttempts(2000) |
| .withNumRetries(10) |
| .withCustomShutdownOrder(new String[]{"storm", "kafka", "zk", "mr"}) |
| .build(); |
| runner.start(); |
| |
| fluxComponent.submitTopology(); |
| sendPcapEntriesCallback.send(kafkaComponent, pcapEntries); |
| runner.process(new Processor<Void>() { |
| @Override |
| public ReadinessState process(ComponentRunner runner) { |
| int numFiles = numFiles(inputDir, mr.getConfiguration()); |
| int expectedNumFiles = pcapEntries.size() / 2; |
| if (numFiles == expectedNumFiles) { |
| return ReadinessState.READY; |
| } else { |
| return ReadinessState.NOT_READY; |
| } |
| } |
| |
| @Override |
| public ProcessorResult<Void> getResult() { |
| return null; |
| } |
| }); |
| } |
| |
| /** |
| * Builds a fresh job configuration pointing at the input, interim, and output directories; |
| * executed before each individual test. |
| */ |
| @BeforeEach |
| public void setup() throws IOException { |
| configuration = new FixedPcapConfig(PcapCli.PREFIX_STRATEGY); |
| Configuration hadoopConf = new Configuration(); |
| PcapOptions.JOB_NAME.put(configuration, "jobName"); |
| PcapOptions.HADOOP_CONF.put(configuration, hadoopConf); |
| PcapOptions.FILESYSTEM.put(configuration, FileSystem.get(hadoopConf)); |
| PcapOptions.BASE_PATH.put(configuration, new Path(inputDir.getAbsolutePath())); |
| PcapOptions.BASE_INTERIM_RESULT_PATH.put(configuration, new Path(interimResultDir.getAbsolutePath())); |
| PcapOptions.NUM_REDUCERS.put(configuration, 10); |
| PcapOptions.NUM_RECORDS_PER_FILE.put(configuration, 1); |
| PcapOptions.FINAL_OUTPUT_PATH.put(configuration, new Path(outputDir.getAbsolutePath())); |
| PcapOptions.FINALIZER_THREADPOOL_SIZE.put(configuration, 4); |
| } |
| |
| @Test |
| public void filters_pcaps_by_start_end_ns_with_fixed_filter() throws Exception { |
| PcapOptions.FILTER_IMPL.put(configuration, new FixedPcapFilter.Configurator()); |
| PcapOptions.START_TIME_NS.put(configuration, getTimestamp(4, pcapEntries)); |
| PcapOptions.END_TIME_NS.put(configuration, getTimestamp(5, pcapEntries)); |
| PcapOptions.FIELDS.put(configuration, new HashMap<>()); |
| PcapJob<Map<String, String>> job = new PcapJob<>(); |
| Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration); |
| assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType()); |
| waitForJob(results); |
| |
| assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState()); |
| Pageable<Path> resultPages = results.get(); |
| Iterable<byte[]> bytes = Iterables.transform(resultPages, path -> { |
| try { |
| return HDFSUtils.readBytes(path); |
| } catch (IOException e) { |
| throw new IllegalStateException(e); |
| } |
| }); |
| assertInOrder(bytes); |
| assertEquals(2, resultPages.getSize(), "Expected 2 records returned."); |
| assertEquals(1, PcapHelper.toPacketInfo(Iterables.get(bytes, 0)).size(), "Expected 1 record in first file."); |
| assertEquals(1, PcapHelper.toPacketInfo(Iterables.get(bytes, 1)).size(), "Expected 1 record in second file."); |
| } |
| |
| @Test |
| public void filters_pcaps_by_start_end_ns_with_empty_query_filter() throws Exception { |
| PcapOptions.FILTER_IMPL.put(configuration, new QueryPcapFilter.Configurator()); |
| PcapOptions.START_TIME_NS.put(configuration, getTimestamp(4, pcapEntries)); |
| PcapOptions.END_TIME_NS.put(configuration, getTimestamp(5, pcapEntries)); |
| PcapOptions.FIELDS.put(configuration, ""); |
| PcapJob<String> job = new PcapJob<>(); |
| Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration); |
| assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType()); |
| waitForJob(results); |
| |
| assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState()); |
| Pageable<Path> resultPages = results.get(); |
| Iterable<byte[]> bytes = Iterables.transform(resultPages, path -> { |
| try { |
| return HDFSUtils.readBytes(path); |
| } catch (IOException e) { |
| throw new IllegalStateException(e); |
| } |
| }); |
| assertInOrder(bytes); |
| assertEquals(2, resultPages.getSize(), "Expected 2 records returned."); |
| assertEquals(1, PcapHelper.toPacketInfo(Iterables.get(bytes, 0)).size(), "Expected 1 record in first file."); |
| assertEquals(1, PcapHelper.toPacketInfo(Iterables.get(bytes, 1)).size(), "Expected 1 record in second file."); |
| } |
| |
| @Test |
| public void date_range_filters_out_all_results() throws Exception { |
| PcapOptions.FILTER_IMPL.put(configuration, new FixedPcapFilter.Configurator()); |
| PcapOptions.FIELDS.put(configuration, new HashMap<>()); |
| PcapOptions.START_TIME_NS.put(configuration, 0); |
| PcapOptions.END_TIME_NS.put(configuration, 1); |
| PcapJob<Map<String, String>> job = new PcapJob<>(); |
| Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration); |
| assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType()); |
| waitForJob(results); |
| |
| assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState()); |
| assertEquals(100.0, results.getStatus().getPercentComplete(), 0.0); |
| assertEquals("No results in specified date range.", |
| results.getStatus().getDescription()); |
| assertEquals(0, results.get().getSize()); |
| } |
| |
| @Test |
| public void ip_address_filters_out_all_results_with_fixed_filter() throws Exception { |
| PcapOptions.FILTER_IMPL.put(configuration, new FixedPcapFilter.Configurator()); |
| PcapOptions.START_TIME_NS.put(configuration, getTimestamp(0, pcapEntries)); |
| PcapOptions.END_TIME_NS.put(configuration, getTimestamp(1, pcapEntries)); |
| PcapOptions.FIELDS.put(configuration, new HashMap<String, String>() {{ |
| put(Constants.Fields.DST_ADDR.getName(), "207.28.210.1"); |
| }}); |
| PcapJob<Map<String, String>> job = new PcapJob<>(); |
| Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration); |
| assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType()); |
| waitForJob(results); |
| |
| assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState()); |
| assertEquals(0, results.get().getSize()); |
| } |
| |
| @Test |
| public void ip_address_filters_out_all_results_with_query_filter() throws Exception { |
| PcapOptions.FILTER_IMPL.put(configuration, new QueryPcapFilter.Configurator()); |
| PcapOptions.START_TIME_NS.put(configuration, getTimestamp(0, pcapEntries)); |
| PcapOptions.END_TIME_NS.put(configuration, getTimestamp(1, pcapEntries)); |
| PcapOptions.FIELDS.put(configuration, "ip_dst_addr == '207.28.210.1'"); |
| PcapJob<String> job = new PcapJob<>(); |
| Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration); |
| assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType()); |
| waitForJob(results); |
| |
| assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState()); |
| assertEquals(0, results.get().getSize()); |
| } |
| |
| @Test |
| public void protocol_filters_out_all_results_with_fixed_filter() throws Exception { |
| PcapOptions.FILTER_IMPL.put(configuration, new FixedPcapFilter.Configurator()); |
| PcapOptions.START_TIME_NS.put(configuration, getTimestamp(0, pcapEntries)); |
| PcapOptions.END_TIME_NS.put(configuration, getTimestamp(1, pcapEntries)); |
| PcapOptions.FIELDS.put(configuration, new HashMap<String, String>() {{ |
| put(Constants.Fields.PROTOCOL.getName(), "foo"); |
| }}); |
| PcapJob<Map<String, String>> job = new PcapJob<>(); |
| Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration); |
| assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType()); |
| waitForJob(results); |
| |
| assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState()); |
| assertEquals(0, results.get().getSize()); |
| } |
| |
| @Test |
| public void protocol_filters_out_all_results_with_query_filter() throws Exception { |
| PcapOptions.FILTER_IMPL.put(configuration, new QueryPcapFilter.Configurator()); |
| PcapOptions.START_TIME_NS.put(configuration, getTimestamp(0, pcapEntries)); |
| PcapOptions.END_TIME_NS.put(configuration, getTimestamp(1, pcapEntries)); |
| PcapOptions.FIELDS.put(configuration, "protocol == 'foo'"); |
| PcapJob<String> job = new PcapJob<>(); |
| Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration); |
| assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType()); |
| waitForJob(results); |
| |
| assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState()); |
| assertEquals(0, results.get().getSize()); |
| } |
| |
| @Test |
| public void fixed_filter_returns_all_results_for_full_date_range() throws Exception { |
| PcapOptions.FILTER_IMPL.put(configuration, new FixedPcapFilter.Configurator()); |
| PcapOptions.START_TIME_NS.put(configuration, getTimestamp(0, pcapEntries)); |
| PcapOptions.END_TIME_NS |
| .put(configuration, getTimestamp(pcapEntries.size() - 1, pcapEntries) + 1); |
| PcapOptions.FIELDS.put(configuration, new HashMap<>()); |
| PcapJob<Map<String, String>> job = new PcapJob<>(); |
| Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration); |
| assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType()); |
| waitForJob(results); |
| |
| assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState()); |
| Pageable<Path> resultPages = results.get(); |
| Iterable<byte[]> bytes = Iterables.transform(resultPages, path -> { |
| try { |
| return HDFSUtils.readBytes(path); |
| } catch (IOException e) { |
| throw new IllegalStateException(e); |
| } |
| }); |
| assertInOrder(bytes); |
| assertEquals(pcapEntries.size(), resultPages.getSize()); |
| } |
| |
| @Test |
| public void query_filter_returns_all_results_for_full_date_range() throws Exception { |
| PcapOptions.FILTER_IMPL.put(configuration, new QueryPcapFilter.Configurator()); |
| PcapOptions.START_TIME_NS.put(configuration, getTimestamp(0, pcapEntries)); |
| PcapOptions.END_TIME_NS |
| .put(configuration, getTimestamp(pcapEntries.size() - 1, pcapEntries) + 1); |
| PcapOptions.FIELDS.put(configuration, ""); |
| PcapJob<String> job = new PcapJob<>(); |
| Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration); |
| assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType()); |
| waitForJob(results); |
| |
| assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState()); |
| Pageable<Path> resultPages = results.get(); |
| Iterable<byte[]> bytes = Iterables.transform(resultPages, path -> { |
| try { |
| return HDFSUtils.readBytes(path); |
| } catch (IOException e) { |
| throw new IllegalStateException(e); |
| } |
| }); |
| assertInOrder(bytes); |
| assertEquals(pcapEntries.size(), resultPages.getSize()); |
| } |
| |
| @Test |
| public void filters_results_by_dst_port_with_fixed_filter() throws Exception { |
| PcapOptions.FILTER_IMPL.put(configuration, new FixedPcapFilter.Configurator()); |
| PcapOptions.START_TIME_NS.put(configuration, getTimestamp(0, pcapEntries)); |
| PcapOptions.END_TIME_NS |
| .put(configuration, getTimestamp(pcapEntries.size() - 1, pcapEntries) + 1); |
| PcapOptions.FIELDS.put(configuration, new HashMap<String, String>() {{ |
| put(Constants.Fields.DST_PORT.getName(), "22"); |
| }}); |
| PcapOptions.NUM_RECORDS_PER_FILE.put(configuration, 1); |
| PcapJob<Map<String, String>> job = new PcapJob<>(); |
| Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration); |
| assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType()); |
| waitForJob(results); |
| |
| assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState()); |
| Pageable<Path> resultPages = results.get(); |
| Iterable<byte[]> bytes = Iterables.transform(resultPages, path -> { |
| try { |
| return HDFSUtils.readBytes(path); |
| } catch (IOException e) { |
| throw new IllegalStateException(e); |
| } |
| }); |
| assertInOrder(bytes); |
| assertTrue(resultPages.getSize() > 0); |
| assertEquals(Iterables.size(filterPcaps(pcapEntries, new Predicate<JSONObject>() { |
| @Override |
| public boolean apply(@Nullable JSONObject input) { |
| Object prt = input.get(Constants.Fields.DST_PORT.getName()); |
| return prt != null && prt.toString().equals("22"); |
| } |
| }, withHeaders) |
| ), resultPages.getSize() |
| ); |
| ByteArrayOutputStream baos = new ByteArrayOutputStream(); |
| PcapMerger.merge(baos, HDFSUtils.readBytes(resultPages.getPage(0))); |
| assertTrue(baos.toByteArray().length > 0); |
| } |
| |
| @Test |
| public void filters_results_by_dst_port_with_query_filter() throws Exception { |
| PcapOptions.FILTER_IMPL.put(configuration, new QueryPcapFilter.Configurator()); |
| PcapOptions.START_TIME_NS.put(configuration, getTimestamp(0, pcapEntries)); |
| PcapOptions.END_TIME_NS |
| .put(configuration, getTimestamp(pcapEntries.size() - 1, pcapEntries) + 1); |
| PcapOptions.FIELDS.put(configuration, "ip_dst_port == 22"); |
| PcapJob<String> job = new PcapJob<>(); |
| Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration); |
| assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType()); |
| waitForJob(results); |
| |
| assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState()); |
| Pageable<Path> resultPages = results.get(); |
| Iterable<byte[]> bytes = Iterables.transform(resultPages, path -> { |
| try { |
| return HDFSUtils.readBytes(path); |
| } catch (IOException e) { |
| throw new IllegalStateException(e); |
| } |
| }); |
| assertInOrder(bytes); |
| assertEquals(Iterables.size(filterPcaps(pcapEntries, new Predicate<JSONObject>() { |
| @Override |
| public boolean apply(@Nullable JSONObject input) { |
| Object prt = input.get(Constants.Fields.DST_PORT.getName()); |
| return prt != null && prt.toString().equals("22"); |
| } |
| }, withHeaders) |
| ), resultPages.getSize() |
| ); |
| ByteArrayOutputStream baos = new ByteArrayOutputStream(); |
| PcapMerger.merge(baos, HDFSUtils.readBytes(resultPages.getPage(0))); |
| assertTrue(baos.toByteArray().length > 0); |
| } |
| |
| @Test |
| public void filters_results_by_dst_port_range_with_query_filter() throws Exception { |
| PcapOptions.FILTER_IMPL.put(configuration, new QueryPcapFilter.Configurator()); |
| PcapOptions.START_TIME_NS.put(configuration, getTimestamp(0, pcapEntries)); |
| PcapOptions.END_TIME_NS |
| .put(configuration, getTimestamp(pcapEntries.size() - 1, pcapEntries) + 1); |
| PcapOptions.FIELDS.put(configuration, "ip_dst_port > 20 and ip_dst_port < 55792"); |
| PcapJob<String> job = new PcapJob<>(); |
| Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration); |
| assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType()); |
| waitForJob(results); |
| |
| assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState()); |
| Pageable<Path> resultPages = results.get(); |
| Iterable<byte[]> bytes = Iterables.transform(resultPages, path -> { |
| try { |
| return HDFSUtils.readBytes(path); |
| } catch (IOException e) { |
| throw new IllegalStateException(e); |
| } |
| }); |
| assertInOrder(bytes); |
| assertEquals(Iterables.size(filterPcaps(pcapEntries, new Predicate<JSONObject>() { |
| @Override |
| public boolean apply(@Nullable JSONObject input) { |
| Object prt = input.get(Constants.Fields.DST_PORT.getName()); |
| return prt != null && ((Long) prt > 20 && (Long) prt < 55792); |
| } |
| }, withHeaders) |
| ), resultPages.getSize() |
| ); |
| ByteArrayOutputStream baos = new ByteArrayOutputStream(); |
| PcapMerger.merge(baos, HDFSUtils.readBytes(resultPages.getPage(0))); |
| assertTrue(baos.toByteArray().length > 0); |
| } |
| |
| @Test |
| public void filters_results_by_dst_port_greater_than_value_with_query_filter() throws Exception { |
| PcapOptions.FILTER_IMPL.put(configuration, new QueryPcapFilter.Configurator()); |
| PcapOptions.START_TIME_NS.put(configuration, getTimestamp(0, pcapEntries)); |
| PcapOptions.END_TIME_NS |
| .put(configuration, getTimestamp(pcapEntries.size() - 1, pcapEntries) + 1); |
| PcapOptions.FIELDS.put(configuration, "ip_dst_port > 55790"); |
| PcapJob<String> job = new PcapJob<>(); |
| Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration); |
| assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType()); |
| waitForJob(results); |
| |
| assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState()); |
| Pageable<Path> resultPages = results.get(); |
| Iterable<byte[]> bytes = Iterables.transform(resultPages, path -> { |
| try { |
| return HDFSUtils.readBytes(path); |
| } catch (IOException e) { |
| throw new IllegalStateException(e); |
| } |
| }); |
| assertInOrder(bytes); |
| assertEquals(Iterables.size(filterPcaps(pcapEntries, new Predicate<JSONObject>() { |
| @Override |
| public boolean apply(@Nullable JSONObject input) { |
| Object prt = input.get(Constants.Fields.DST_PORT.getName()); |
| return prt != null && (Long) prt > 55790; |
| } |
| }, withHeaders) |
| ), resultPages.getSize() |
| ); |
| ByteArrayOutputStream baos = new ByteArrayOutputStream(); |
| PcapMerger.merge(baos, HDFSUtils.readBytes(resultPages.getPage(0))); |
| assertTrue(baos.toByteArray().length > 0); |
| } |
| |
| @Test |
| public void filters_results_by_BYTEARRAY_MATCHER_with_query_filter() throws Exception { |
| PcapOptions.FILTER_IMPL.put(configuration, new QueryPcapFilter.Configurator()); |
| PcapOptions.FIELDS.put(configuration, "BYTEARRAY_MATCHER('2f56abd814bc56420489ca38e7faf8cec3d4', packet)"); |
| PcapOptions.START_TIME_NS.put(configuration, getTimestamp(0, pcapEntries)); |
| PcapOptions.END_TIME_NS |
| .put(configuration, getTimestamp(pcapEntries.size() - 1, pcapEntries) + 1); |
| PcapJob<String> job = new PcapJob<>(); |
| Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration); |
| assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType()); |
| waitForJob(results); |
| |
| assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState()); |
| Pageable<Path> resultPages = results.get(); |
| Iterable<byte[]> bytes = Iterables.transform(resultPages, path -> { |
| try { |
| return HDFSUtils.readBytes(path); |
| } catch (IOException e) { |
| throw new IllegalStateException(e); |
| } |
| }); |
| assertInOrder(bytes); |
| assertEquals(1, resultPages.getSize()); |
| ByteArrayOutputStream baos = new ByteArrayOutputStream(); |
| PcapMerger.merge(baos, HDFSUtils.readBytes(resultPages.getPage(0))); |
| assertTrue(baos.toByteArray().length > 0); |
| } |
| |
| private void waitForJob(Statusable<Path> statusable) throws Exception { |
| // Poll the job status, sleeping between attempts, until the job reports that it is done. |
| for (int t = 0; t < MAX_RETRIES; ++t, Thread.sleep(SLEEP_MS)) { |
| if (!statusable.getStatus().getState().equals(JobStatus.State.RUNNING)) { |
| if (statusable.isDone()) { |
| return; |
| } |
| } |
| } |
| throw new Exception("Job did not complete within " + (MAX_RETRIES * SLEEP_MS) + " ms"); |
| } |
| |
| private static Iterable<Map.Entry<byte[], byte[]>> readPcaps(Path pcapFile, boolean withHeaders) throws IOException { |
| List<Map.Entry<byte[], byte[]>> ret = new ArrayList<>(); |
| try (SequenceFile.Reader reader = new SequenceFile.Reader(new Configuration(), |
| SequenceFile.Reader.file(pcapFile) |
| )) { |
| IntWritable key = new IntWritable(); |
| BytesWritable value = new BytesWritable(); |
| while (reader.next(key, value)) { |
| byte[] pcapWithHeader = value.copyBytes(); |
| //if you are debugging and want the hex dump of the packets, uncomment the following: |
| //for(byte b : pcapWithHeader) { |
| // System.out.print(String.format("%02x", b)); |
| //} |
| //System.out.println(""); |
| long calculatedTs = PcapHelper.getTimestamp(pcapWithHeader); |
| { |
| List<PacketInfo> info = PcapHelper.toPacketInfo(pcapWithHeader); |
| for (PacketInfo pi : info) { |
| assertEquals(calculatedTs, pi.getPacketTimeInNanos()); |
| //If you are debugging and want to see the packets, uncomment the following: |
| //System.out.println( Long.toUnsignedString(calculatedTs) + " => " + pi.getJsonDoc()); |
| } |
| } |
| if (withHeaders) { |
| ret.add(new AbstractMap.SimpleImmutableEntry<>(Bytes.toBytes(calculatedTs), pcapWithHeader)); |
| } else { |
| // Strip the global and packet headers so only the raw packet bytes are stored. |
| byte[] pcapRaw = new byte[pcapWithHeader.length - PcapHelper.GLOBAL_HEADER_SIZE - PcapHelper.PACKET_HEADER_SIZE]; |
| System.arraycopy(pcapWithHeader, PcapHelper.GLOBAL_HEADER_SIZE + PcapHelper.PACKET_HEADER_SIZE, pcapRaw, 0, pcapRaw.length); |
| ret.add(new AbstractMap.SimpleImmutableEntry<>(Bytes.toBytes(calculatedTs), pcapRaw)); |
| } |
| } |
| } |
| // Trim to an even number of entries so the two-packets-per-file topology setting divides evenly. |
| return Iterables.limit(ret, 2 * (ret.size() / 2)); |
| } |
| |
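| /** |
| * Asserts that the packets across all result files appear in non-decreasing timestamp order. |
| */ |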
| public static void assertInOrder(Iterable<byte[]> packets) { |
| long previous = 0; |
| for (byte[] packet : packets) { |
| for (JSONObject json : TO_JSONS.apply(packet)) { |
| Long current = Long.parseLong(json.get("ts_micro").toString()); |
| assertNotNull(current); |
| assertTrue(Long.compareUnsigned(current, previous) >= 0); |
| previous = current; |
| } |
| } |
| } |
| |
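| /** |
| * Converts a raw pcap byte array into one JSON document per packet it contains. |
| */ |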
| public static final Function<byte[], Iterable<JSONObject>> TO_JSONS = new Function<byte[], Iterable<JSONObject>>() { |
| @Nullable |
| @Override |
| public Iterable<JSONObject> apply(@Nullable byte[] input) { |
| try { |
| return PcapHelper.toJSON(PcapHelper.toPacketInfo(input)); |
| } catch (IOException e) { |
| throw new RuntimeException(e.getMessage(), e); |
| } |
| } |
| }; |
| |
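| /** |
| * Computes the expected result set for a test by running the in-memory pcap entries through |
| * the same JSON conversion the filters see and keeping only the packets matching the |
| * predicate. Entries stored without headers are first re-wrapped with global and packet |
| * headers in native endianness so they parse as complete pcap files. |
| */ |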
| private Iterable<JSONObject> filterPcaps(Iterable<Map.Entry<byte[], byte[]>> pcaps |
| ,Predicate<JSONObject> predicate |
| ,boolean withHeaders |
| ) |
| { |
| Function<Map.Entry<byte[], byte[]>, byte[]> pcapTransform = null; |
| if(!withHeaders) { |
| final Endianness endianness = Endianness.getNativeEndianness(); |
| pcapTransform = kv -> PcapHelper.addGlobalHeader(PcapHelper.addPacketHeader(Bytes.toLong(kv.getKey()) |
| , kv.getValue() |
| , endianness |
| ) |
| , endianness |
| ); |
| } |
| else { |
| pcapTransform = kv -> kv.getValue(); |
| } |
| return Iterables.filter( |
| Iterables.concat( |
| Iterables.transform( |
| Iterables.transform(pcaps, pcapTransform) |
| , TO_JSONS |
| ) |
| ) |
| , predicate |
| ); |
| } |
| } |