blob: d15fe1dbb700a790dc934210d9dee416c2ebf5d0 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.metron.pcap.query;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.metron.common.Constants;
import org.apache.metron.common.system.Clock;
import org.apache.metron.common.utils.timestamp.TimestampConverters;
import org.apache.metron.job.Finalizer;
import org.apache.metron.pcap.PcapHelper;
import org.apache.metron.pcap.config.FixedPcapConfig;
import org.apache.metron.pcap.config.PcapConfig.PrefixStrategy;
import org.apache.metron.pcap.config.PcapOptions;
import org.apache.metron.pcap.mr.PcapJob;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeMatcher;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import static org.apache.metron.pcap.config.PcapGlobalDefaults.BASE_INPUT_PATH_DEFAULT;
import static org.apache.metron.pcap.config.PcapGlobalDefaults.BASE_INTERIM_RESULT_PATH_DEFAULT;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.hamcrest.MockitoHamcrest.argThat;
public class PcapCliTest {
@Mock
private PcapJob jobRunner;
@Mock
private Clock clock;
private String execDir;
private PrefixStrategy prefixStrategy;
@BeforeEach
public void setup() throws IOException {
MockitoAnnotations.initMocks(this);
execDir = System.getProperty("user.dir");
prefixStrategy = clock -> "random_prefix";
}
@Test
public void runs_fixed_pcap_filter_job_with_default_argument_list() throws Exception {
String[] args = {
"fixed",
"-start_time", "500",
"-ip_src_addr", "192.168.1.1",
"-ip_dst_addr", "192.168.1.2",
"-ip_src_port", "8081",
"-ip_dst_port", "8082",
"-protocol", "6",
"-packet_filter", "`casey`"
};
HashMap<String, String> query = new HashMap<String, String>() {{
put(Constants.Fields.SRC_ADDR.getName(), "192.168.1.1");
put(Constants.Fields.DST_ADDR.getName(), "192.168.1.2");
put(Constants.Fields.SRC_PORT.getName(), "8081");
put(Constants.Fields.DST_PORT.getName(), "8082");
put(Constants.Fields.PROTOCOL.getName(), "6");
put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "false");
put(PcapHelper.PacketFields.PACKET_FILTER.getName(), "`casey`");
}};
FixedPcapConfig config = new FixedPcapConfig(prefixStrategy);
PcapOptions.BASE_PATH.put(config, BASE_INPUT_PATH_DEFAULT);
PcapOptions.BASE_INTERIM_RESULT_PATH.put(config, BASE_INTERIM_RESULT_PATH_DEFAULT);
PcapOptions.FIELDS.put(config, query);
PcapOptions.NUM_REDUCERS.put(config, 10);
PcapOptions.START_TIME_MS.put(config, 500L);
when(jobRunner.submit(isA(Finalizer.class), argThat(mapContaining(config)))).thenReturn(jobRunner);
PcapCli cli = new PcapCli(jobRunner, prefixStrategy);
assertThat("Expect no errors on run", cli.run(args), equalTo(0));
verify(jobRunner).get();
}
/**
* Check that "map" entries exist in the tested map "item". Note, will not work for complex
* Objects where equals() does not compare contents favorably. e.g. Configurator() did not work.
*/
private <K, V> Matcher<Map<K, V>> mapContaining(Map<K, V> map) {
return new TypeSafeMatcher<Map<K, V>>() {
@Override
protected boolean matchesSafely(Map<K, V> item) {
for(K key: map.keySet()) {
if (key.equals(PcapOptions.HADOOP_CONF.getKey())) {
Configuration itemConfiguration = (Configuration) item.get(PcapOptions.HADOOP_CONF.getKey());
Map<String, Object> mapConfiguration = (Map<String, Object>) map.get(PcapOptions.HADOOP_CONF.getKey());
for(String setting: mapConfiguration.keySet()) {
if (!mapConfiguration.get(setting).equals(itemConfiguration.get(setting, ""))) {
return false;
}
}
} else {
V itemValue = item.get(key);
V mapValue = map.get(key);
if (itemValue != null ? !itemValue.equals(mapValue) : mapValue != null) {
return false;
}
}
}
return true;
}
@Override
public void describeTo(Description description) {
description.appendText("Should contain items: ");
for (Entry<K, V> entry : map.entrySet()) {
StringBuilder sb = new StringBuilder();
sb.append("key=");
sb.append(entry.getKey());
sb.append(",value=");
sb.append(entry.getValue());
description.appendText(sb.toString());
}
}
};
}
@Test
public void runs_fixed_pcap_filter_job_with_full_argument_list_and_default_dateformat() throws Exception {
String[] args = {
"fixed",
"-start_time", "500",
"-end_time", "1000",
"-base_path", "/base/path",
"-base_output_path", "/base/output/path",
"-ip_src_addr", "192.168.1.1",
"-ip_dst_addr", "192.168.1.2",
"-ip_src_port", "8081",
"-ip_dst_port", "8082",
"-protocol", "6",
"-include_reverse",
"-num_reducers", "10",
"-records_per_file", "1000",
"-finalizer_threads", "10"
};
Map<String, String> query = new HashMap<String, String>() {{
put(Constants.Fields.SRC_ADDR.getName(), "192.168.1.1");
put(Constants.Fields.DST_ADDR.getName(), "192.168.1.2");
put(Constants.Fields.SRC_PORT.getName(), "8081");
put(Constants.Fields.DST_PORT.getName(), "8082");
put(Constants.Fields.PROTOCOL.getName(), "6");
put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "true");
}};
FixedPcapConfig config = new FixedPcapConfig(prefixStrategy);
PcapOptions.BASE_PATH.put(config, "/base/path");
PcapOptions.BASE_INTERIM_RESULT_PATH.put(config, "/base/output/path");
PcapOptions.FIELDS.put(config, query);
PcapOptions.NUM_REDUCERS.put(config, 10);
PcapOptions.START_TIME_MS.put(config, 500L);
PcapOptions.END_TIME_MS.put(config, 1000L);
PcapOptions.NUM_RECORDS_PER_FILE.put(config, 1000);
PcapOptions.PRINT_JOB_STATUS.put(config, true);
PcapOptions.FINALIZER_THREADPOOL_SIZE.put(config, "10");
when(jobRunner.submit(isA(Finalizer.class), argThat(mapContaining(config)))).thenReturn(jobRunner);
PcapCli cli = new PcapCli(jobRunner, prefixStrategy);
assertThat("Expect no errors on run", cli.run(args), equalTo(0));
verify(jobRunner).get();
}
@Test
public void runs_fixed_pcap_filter_job_with_full_argument_list() throws Exception {
String[] args = {
"fixed",
"-start_time", "2016-06-13-18:35.00",
"-end_time", "2016-06-15-18:35.00",
"-date_format", "yyyy-MM-dd-HH:mm.ss",
"-base_path", "/base/path",
"-base_output_path", "/base/output/path",
"-ip_src_addr", "192.168.1.1",
"-ip_dst_addr", "192.168.1.2",
"-ip_src_port", "8081",
"-ip_dst_port", "8082",
"-protocol", "6",
"-include_reverse",
"-num_reducers", "10",
"-records_per_file", "1000",
"-yq", "pcap",
"-finalizer_threads", "10"
};
Map<String, String> query = new HashMap<String, String>() {{
put(Constants.Fields.SRC_ADDR.getName(), "192.168.1.1");
put(Constants.Fields.DST_ADDR.getName(), "192.168.1.2");
put(Constants.Fields.SRC_PORT.getName(), "8081");
put(Constants.Fields.DST_PORT.getName(), "8082");
put(Constants.Fields.PROTOCOL.getName(), "6");
put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "true");
}};
long startAsNanos = asNanos("2016-06-13-18:35.00", "yyyy-MM-dd-HH:mm.ss");
long endAsNanos = asNanos("2016-06-15-18:35.00", "yyyy-MM-dd-HH:mm.ss");
FixedPcapConfig config = new FixedPcapConfig(prefixStrategy);
PcapOptions.BASE_PATH.put(config, "/base/path");
PcapOptions.BASE_INTERIM_RESULT_PATH.put(config, "/base/output/path");
PcapOptions.FIELDS.put(config, query);
PcapOptions.NUM_REDUCERS.put(config, 10);
PcapOptions.START_TIME_MS.put(config, startAsNanos / 1000000L); // needed bc defaults in config
PcapOptions.END_TIME_MS.put(config, endAsNanos / 1000000L); // needed bc defaults in config
PcapOptions.NUM_RECORDS_PER_FILE.put(config, 1000);
PcapOptions.PRINT_JOB_STATUS.put(config, true);
PcapOptions.HADOOP_CONF.put(config, new HashMap<String, Object>() {{
put(MRJobConfig.QUEUE_NAME, "pcap");
}});
PcapOptions.FINALIZER_THREADPOOL_SIZE.put(config, "10");
when(jobRunner.submit(isA(Finalizer.class), argThat(mapContaining(config)))).thenReturn(jobRunner);
PcapCli cli = new PcapCli(jobRunner, prefixStrategy);
assertThat("Expect no errors on run", cli.run(args), equalTo(0));
verify(jobRunner).get();
}
private long asNanos(String inDate, String format) throws ParseException {
SimpleDateFormat sdf = new SimpleDateFormat(format);
Date date = sdf.parse(inDate);
return TimestampConverters.MILLISECONDS.toNanoseconds(date.getTime());
}
private byte[] asBytes(String val) {
return val.getBytes(StandardCharsets.UTF_8);
}
@Test
public void runs_query_pcap_filter_job_with_default_argument_list() throws Exception {
String[] args = {
"query",
"-start_time", "500",
"-query", "some query string"
};
String query = "some query string";
FixedPcapConfig config = new FixedPcapConfig(prefixStrategy);
PcapOptions.BASE_PATH.put(config, BASE_INPUT_PATH_DEFAULT);
PcapOptions.BASE_INTERIM_RESULT_PATH.put(config, BASE_INTERIM_RESULT_PATH_DEFAULT);
PcapOptions.FIELDS.put(config, query);
PcapOptions.NUM_REDUCERS.put(config, 10);
PcapOptions.START_TIME_MS.put(config, 500L);
PcapOptions.FINALIZER_THREADPOOL_SIZE.put(config, "1");
when(jobRunner.submit(isA(Finalizer.class), argThat(mapContaining(config)))).thenReturn(jobRunner);
PcapCli cli = new PcapCli(jobRunner, prefixStrategy);
assertThat("Expect no errors on run", cli.run(args), equalTo(0));
verify(jobRunner).get();
}
@Test
public void runs_query_pcap_filter_job_with_full_argument_list() throws Exception {
String[] args = {
"query",
"-start_time", "500",
"-end_time", "1000",
"-num_reducers", "10",
"-base_path", "/base/path",
"-base_output_path", "/base/output/path",
"-query", "some query string",
"-records_per_file", "1000",
"-finalizer_threads", "10"
};
String query = "some query string";
FixedPcapConfig config = new FixedPcapConfig(prefixStrategy);
PcapOptions.BASE_PATH.put(config, "/base/path");
PcapOptions.BASE_INTERIM_RESULT_PATH.put(config, "/base/output/path");
PcapOptions.FIELDS.put(config, query);
PcapOptions.NUM_REDUCERS.put(config, 10);
PcapOptions.START_TIME_MS.put(config, 500L); // needed bc defaults in config
PcapOptions.END_TIME_MS.put(config, 1000L); // needed bc defaults in config
PcapOptions.NUM_RECORDS_PER_FILE.put(config, 1000);
PcapOptions.PRINT_JOB_STATUS.put(config, true);
PcapOptions.FINALIZER_THREADPOOL_SIZE.put(config, "10");
when(jobRunner.submit(isA(Finalizer.class), argThat(mapContaining(config)))).thenReturn(jobRunner);
PcapCli cli = new PcapCli(jobRunner, prefixStrategy);
assertThat("Expect no errors on run", cli.run(args), equalTo(0));
verify(jobRunner).get();
}
// INVALID OPTION CHECKS
@Test
public void invalid_fixed_filter_arg_prints_help() throws Exception {
String[] args = {
"fixed",
"-start_time", "500",
"-end_time", "1000",
"-num_reducers", "10",
"-base_path", "/base/path",
"-base_output_path", "/base/output/path",
"-query", "THIS IS AN ERROR"
};
assertCliError(args, "Fixed", "Unrecognized option: -query");
}
/**
*
* @param args PcapJob args
* @param type Fixed|Query
* @param optMsg Expected error message
*/
public void assertCliError(String[] args, String type, String optMsg)
throws UnsupportedEncodingException {
PrintStream originalOutStream = System.out;
PrintStream originalErrOutStream = System.err;
try {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
PrintStream outStream = new PrintStream(new BufferedOutputStream(bos), false, StandardCharsets.UTF_8.name());
System.setOut(outStream);
ByteArrayOutputStream ebos = new ByteArrayOutputStream();
PrintStream errOutStream = new PrintStream(new BufferedOutputStream(ebos), false, StandardCharsets.UTF_8.name());
System.setErr(errOutStream);
PcapCli cli = new PcapCli(jobRunner, clock -> "random_prefix");
assertThat("Expect errors on run", cli.run(args), equalTo(-1));
assertThat("Expect missing required option error: " + ebos.toString(), ebos.toString().contains(optMsg), equalTo(true));
assertThat("Expect usage to be printed: " + bos.toString(), bos.toString().contains("usage: " + type + " filter options"), equalTo(true));
} finally {
System.setOut(originalOutStream);
System.setErr(originalErrOutStream);
}
}
@Test
public void invalid_query_filter_arg_prints_help() throws Exception {
String[] args = {
"query",
"-start_time", "500",
"-end_time", "1000",
"-num_reducers", "10",
"-base_path", "/base/path",
"-base_output_path", "/base/output/path",
"-ip_src_addr", "THIS IS AN ERROR"
};
assertCliError(args, "Query", "");
}
@Test
public void missing_start_time_arg_prints_error_and_help() throws Exception {
String[] args = {
"fixed",
"-ip_src_addr", "192.168.1.1",
"-ip_dst_addr", "192.168.1.2",
"-ip_src_port", "8081",
"-ip_dst_port", "8082",
"-protocol", "6",
"-num_reducers", "10"
};
assertCliError(args, "Fixed", "Missing required option: st");
}
}