| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.metron.pcap.mr; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.io.BytesWritable; |
| import org.apache.hadoop.io.LongWritable; |
| import org.apache.hadoop.mapreduce.Job; |
| import org.apache.hadoop.mapreduce.JobID; |
| import org.apache.metron.common.utils.timestamp.TimestampConverters; |
| import org.apache.metron.job.Finalizer; |
| import org.apache.metron.job.JobStatus; |
| import org.apache.metron.job.JobStatus.State; |
| import org.apache.metron.job.Pageable; |
| import org.apache.metron.job.Statusable; |
| import org.apache.metron.pcap.PcapPages; |
| import org.apache.metron.pcap.config.FixedPcapConfig; |
| import org.apache.metron.pcap.config.PcapOptions; |
| import org.apache.metron.pcap.filter.PcapFilterConfigurator; |
| import org.apache.metron.pcap.filter.fixed.FixedPcapFilter; |
| import org.junit.jupiter.api.BeforeEach; |
| import org.junit.jupiter.api.Test; |
| import org.mockito.Mock; |
| import org.mockito.MockitoAnnotations; |
| |
| import java.io.IOException; |
| import java.util.*; |
| |
| import static java.lang.Long.toUnsignedString; |
| import static org.hamcrest.CoreMatchers.equalTo; |
| import static org.hamcrest.MatcherAssert.assertThat; |
| import static org.mockito.Mockito.*; |
| |
| public class PcapJobTest { |
| |
| @Mock |
| private Job mrJob; |
| @Mock |
| private org.apache.hadoop.mapreduce.JobStatus mrStatus; |
| @Mock |
| private JobID jobId; |
| @Mock |
| private Finalizer<Path> finalizer; |
| private TestTimer timer; |
| private Pageable<Path> pageableResult; |
| private FixedPcapConfig config; |
| private Configuration hadoopConfig; |
| private FileSystem fileSystem; |
| private String jobIdVal = "job_abc_123"; |
| private Path basePath; |
| private Path baseOutPath; |
| private long startTime; |
| private long endTime; |
| private int numReducers; |
| private int numRecordsPerFile; |
| private Path finalOutputPath; |
| private Map<String, String> fixedFields; |
| private PcapJob<Map<String, String>> testJob; |
| |
| @BeforeEach |
| public void setup() throws IOException { |
| MockitoAnnotations.initMocks(this); |
| basePath = new Path("basepath"); |
| baseOutPath = new Path("outpath"); |
| startTime = 100; |
| endTime = 200; |
| numReducers = 5; |
| numRecordsPerFile = 5; |
| fixedFields = new HashMap<>(); |
| fixedFields.put("ip_src_addr", "192.168.1.1"); |
| hadoopConfig = new Configuration(); |
| fileSystem = FileSystem.get(hadoopConfig); |
| finalOutputPath = new Path("finaloutpath"); |
| when(jobId.toString()).thenReturn(jobIdVal); |
| when(mrStatus.getJobID()).thenReturn(jobId); |
| when(mrJob.getJobID()).thenReturn(jobId); |
| pageableResult = new PcapPages(); |
| timer = new TestTimer(); |
| // handles setting the file name prefix under the hood |
| config = new FixedPcapConfig(clock -> "clockprefix"); |
| PcapOptions.HADOOP_CONF.put(config, hadoopConfig); |
| PcapOptions.FILESYSTEM.put(config, FileSystem.get(hadoopConfig)); |
| PcapOptions.BASE_PATH.put(config, basePath); |
| PcapOptions.BASE_INTERIM_RESULT_PATH.put(config, baseOutPath); |
| PcapOptions.START_TIME_NS.put(config, startTime); |
| PcapOptions.END_TIME_NS.put(config, endTime); |
| PcapOptions.NUM_REDUCERS.put(config, numReducers); |
| PcapOptions.FIELDS.put(config, fixedFields); |
| PcapOptions.FILTER_IMPL.put(config, new FixedPcapFilter.Configurator()); |
| PcapOptions.NUM_RECORDS_PER_FILE.put(config, numRecordsPerFile); |
| PcapOptions.FINAL_OUTPUT_PATH.put(config, finalOutputPath); |
| testJob = new TestJob<>(mrJob); |
| testJob.setStatusInterval(1); |
| testJob.setCompleteCheckInterval(1); |
| testJob.setTimer(timer); |
| } |
| |
| private class TestJob<T> extends PcapJob<T> { |
| |
| private final Job mrJob; |
| |
| public TestJob(Job mrJob) { |
| this.mrJob = mrJob; |
| } |
| |
| @Override |
| public Job createJob(Optional<String> jobName, |
| Path basePath, |
| Path outputPath, |
| long beginNS, |
| long endNS, |
| int numReducers, |
| T fields, |
| Configuration conf, |
| FileSystem fs, |
| PcapFilterConfigurator<T> filterImpl) { |
| return mrJob; |
| } |
| } |
| |
| private class TestTimer extends Timer { |
| |
| private TimerTask task; |
| |
| @Override |
| public void scheduleAtFixedRate(TimerTask task, long delay, long period) { |
| this.task = task; |
| } |
| |
| public void updateJobStatus() { |
| task.run(); |
| } |
| |
| } |
| |
| @Test |
| public void partition_gives_value_in_range() { |
| long start = 1473897600000000000L; |
| long end = TimestampConverters.MILLISECONDS.toNanoseconds(1473995927455L); |
| Configuration conf = new Configuration(); |
| conf.set(PcapJob.START_TS_CONF, toUnsignedString(start)); |
| conf.set(PcapJob.END_TS_CONF, toUnsignedString(end)); |
| conf.set(PcapJob.WIDTH_CONF, "" + PcapJob.findWidth(start, end, 10)); |
| PcapJob.PcapPartitioner partitioner = new PcapJob.PcapPartitioner(); |
| partitioner.setConf(conf); |
| assertThat("Partition not in range", |
| partitioner.getPartition(new LongWritable(1473978789181189000L), new BytesWritable(), 10), |
| equalTo(8)); |
| } |
| |
| @Test |
| public void job_succeeds_synchronously() throws Exception { |
| pageableResult = new PcapPages( |
| Arrays.asList(new Path("1.txt"), new Path("2.txt"), new Path("3.txt"))); |
| when(finalizer.finalizeJob(any())).thenReturn(pageableResult); |
| when(mrJob.isComplete()).thenReturn(true); |
| when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.SUCCEEDED); |
| when(mrJob.getStatus()).thenReturn(mrStatus); |
| Statusable<Path> statusable = testJob.submit(finalizer, config); |
| timer.updateJobStatus(); |
| Pageable<Path> results = statusable.get(); |
| assertThat(results.getSize(), equalTo(3)); |
| JobStatus status = statusable.getStatus(); |
| assertThat(status.getState(), equalTo(State.SUCCEEDED)); |
| assertThat(status.getPercentComplete(), equalTo(100.0)); |
| assertThat(status.getJobId(), equalTo(jobIdVal)); |
| } |
| |
| @Test |
| public void job_fails_synchronously() throws Exception { |
| when(mrJob.isComplete()).thenReturn(true); |
| when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.FAILED); |
| when(mrJob.getStatus()).thenReturn(mrStatus); |
| Statusable<Path> statusable = testJob.submit(finalizer, config); |
| timer.updateJobStatus(); |
| Pageable<Path> results = statusable.get(); |
| JobStatus status = statusable.getStatus(); |
| assertThat(status.getState(), equalTo(State.FAILED)); |
| assertThat(status.getPercentComplete(), equalTo(100.0)); |
| assertThat(results.getSize(), equalTo(0)); |
| } |
| |
| @Test |
| public void job_fails_with_killed_status_synchronously() throws Exception { |
| when(mrJob.isComplete()).thenReturn(true); |
| when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.KILLED); |
| when(mrJob.getStatus()).thenReturn(mrStatus); |
| Statusable<Path> statusable = testJob.submit(finalizer, config); |
| timer.updateJobStatus(); |
| Pageable<Path> results = statusable.get(); |
| JobStatus status = statusable.getStatus(); |
| assertThat(status.getState(), equalTo(State.KILLED)); |
| assertThat(status.getPercentComplete(), equalTo(100.0)); |
| assertThat(results.getSize(), equalTo(0)); |
| } |
| |
| @Test |
| public void job_succeeds_asynchronously() throws Exception { |
| when(mrJob.isComplete()).thenReturn(true); |
| when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.SUCCEEDED); |
| when(mrJob.getStatus()).thenReturn(mrStatus); |
| Statusable<Path> statusable = testJob.submit(finalizer, config); |
| timer.updateJobStatus(); |
| JobStatus status = statusable.getStatus(); |
| assertThat(status.getState(), equalTo(State.SUCCEEDED)); |
| assertThat(status.getPercentComplete(), equalTo(100.0)); |
| } |
| |
| @Test |
| public void job_reports_percent_complete() throws Exception { |
| when(mrJob.isComplete()).thenReturn(false); |
| when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.RUNNING); |
| when(mrJob.getStatus()).thenReturn(mrStatus); |
| when(mrJob.mapProgress()).thenReturn(0.5f); |
| when(mrJob.reduceProgress()).thenReturn(0f); |
| Statusable<Path> statusable = testJob.submit(finalizer, config); |
| timer.updateJobStatus(); |
| JobStatus status = statusable.getStatus(); |
| assertThat(status.getState(), equalTo(State.RUNNING)); |
| assertThat(status.getDescription(), equalTo("map: 50.0%, reduce: 0.0%")); |
| assertThat(status.getPercentComplete(), equalTo(25.0 * 0.75)); |
| when(mrJob.mapProgress()).thenReturn(1.0f); |
| when(mrJob.reduceProgress()).thenReturn(0.5f); |
| timer.updateJobStatus(); |
| status = statusable.getStatus(); |
| assertThat(status.getDescription(), equalTo("map: 100.0%, reduce: 50.0%")); |
| assertThat(status.getPercentComplete(), equalTo(75.0 * 0.75)); |
| when(mrJob.mapProgress()).thenReturn(1.0f); |
| when(mrJob.reduceProgress()).thenReturn(1.0f); |
| timer.updateJobStatus(); |
| status = statusable.getStatus(); |
| assertThat(status.getDescription(), equalTo("map: 100.0%, reduce: 100.0%")); |
| assertThat(status.getPercentComplete(), equalTo(75.0)); |
| when(mrJob.isComplete()).thenReturn(true); |
| when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.SUCCEEDED); |
| when(mrJob.mapProgress()).thenReturn(1.0f); |
| when(mrJob.reduceProgress()).thenReturn(1.0f); |
| timer.updateJobStatus(); |
| status = statusable.getStatus(); |
| assertThat(status.getDescription(), equalTo("Job completed.")); |
| assertThat(status.getPercentComplete(), equalTo(100.0)); |
| } |
| |
| @Test |
| public void killing_job_causes_status_to_return_KILLED_state() throws Exception { |
| when(mrJob.isComplete()).thenReturn(false); |
| when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.RUNNING); |
| when(mrJob.getStatus()).thenReturn(mrStatus); |
| Statusable<Path> statusable = testJob.submit(finalizer, config); |
| statusable.kill(); |
| when(mrJob.isComplete()).thenReturn(true); |
| when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.KILLED); |
| timer.updateJobStatus(); |
| JobStatus status = statusable.getStatus(); |
| assertThat(status.getState(), equalTo(State.KILLED)); |
| } |
| |
| @Test |
| public void handles_null_values_with_defaults() throws Exception { |
| PcapOptions.START_TIME_NS.put(config, null); |
| PcapOptions.END_TIME_NS.put(config, null); |
| PcapOptions.NUM_REDUCERS.put(config, null); |
| PcapOptions.NUM_RECORDS_PER_FILE.put(config, null); |
| |
| pageableResult = new PcapPages( |
| Arrays.asList(new Path("1.txt"), new Path("2.txt"), new Path("3.txt"))); |
| when(finalizer.finalizeJob(any())).thenReturn(pageableResult); |
| when(mrJob.isComplete()).thenReturn(true); |
| when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.SUCCEEDED); |
| when(mrJob.getStatus()).thenReturn(mrStatus); |
| Statusable<Path> statusable = testJob.submit(finalizer, config); |
| timer.updateJobStatus(); |
| Pageable<Path> results = statusable.get(); |
| assertThat(results.getSize(), equalTo(3)); |
| JobStatus status = statusable.getStatus(); |
| assertThat(status.getState(), equalTo(State.SUCCEEDED)); |
| assertThat(status.getPercentComplete(), equalTo(100.0)); |
| assertThat(status.getJobId(), equalTo(jobIdVal)); |
| } |
| |
| @Test |
| public void get_should_print_status() throws Exception { |
| Map<String, Object> configuration = new HashMap<>(); |
| testJob.setConfiguration(configuration); |
| testJob.setMrJob(mrJob); |
| testJob.setJobStatus(new JobStatus().withState(State.SUCCEEDED)); |
| |
| testJob.get(); |
| verify(mrJob, times(0)).monitorAndPrintJob(); |
| |
| PcapOptions.PRINT_JOB_STATUS.put(configuration, true); |
| testJob.get(); |
| verify(mrJob, times(1)).monitorAndPrintJob(); |
| verifyNoMoreInteractions(mrJob); |
| } |
| |
| } |