/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.metron.pcap.mr;
import static org.apache.metron.pcap.PcapHelper.greaterThanOrEqualTo;
import static org.apache.metron.pcap.PcapHelper.lessThanOrEqualTo;
import static org.apache.metron.pcap.config.PcapGlobalDefaults.NUM_REDUCERS_DEFAULT;
import com.google.common.base.Joiner;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Timer;
import java.util.TimerTask;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.metron.common.utils.LazyLogger;
import org.apache.metron.common.utils.LazyLoggerFactory;
import org.apache.metron.common.utils.timestamp.TimestampConverters;
import org.apache.metron.job.Finalizer;
import org.apache.metron.job.JobException;
import org.apache.metron.job.JobStatus;
import org.apache.metron.job.JobStatus.State;
import org.apache.metron.job.Pageable;
import org.apache.metron.job.Statusable;
import org.apache.metron.pcap.PacketInfo;
import org.apache.metron.pcap.PcapHelper;
import org.apache.metron.pcap.PcapPages;
import org.apache.metron.pcap.config.PcapGlobalDefaults;
import org.apache.metron.pcap.config.PcapOptions;
import org.apache.metron.pcap.filter.PcapFilter;
import org.apache.metron.pcap.filter.PcapFilterConfigurator;
import org.apache.metron.pcap.filter.PcapFilters;
import org.apache.metron.pcap.utils.FileFilterUtil;
/**
* Encompasses MapReduce job and final writing of Pageable results to specified location.
* Cleans up MapReduce results from HDFS on completion.
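*
* <p>A minimal usage sketch. The option values, {@code startMs}, {@code endMs},
* {@code filterConfigurator}, {@code fields}, and {@code finalizer} below are hypothetical
* placeholders, not defaults:
* <pre>{@code
* Map<String, Object> config = new HashMap<>();
* PcapOptions.HADOOP_CONF.put(config, new Configuration());
* PcapOptions.FILESYSTEM.put(config, FileSystem.get(new Configuration()));
* PcapOptions.BASE_PATH.put(config, "/apps/metron/pcap/input");
* PcapOptions.START_TIME_MS.put(config, startMs);
* PcapOptions.END_TIME_MS.put(config, endMs);
* PcapOptions.NUM_REDUCERS.put(config, 10);
* PcapOptions.FILTER_IMPL.put(config, filterConfigurator);
* PcapOptions.FIELDS.put(config, fields);
*
* PcapJob<Map<String, String>> job = new PcapJob<>();
* job.submit(finalizer, config);      // asynchronous submission
* Pageable<Path> pages = job.get();   // blocks until the job completes
* }</pre>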
*/
public class PcapJob<T> implements Statusable<Path> {
private static final LazyLogger LOG = LazyLoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static final String START_TS_CONF = "start_ts";
public static final String END_TS_CONF = "end_ts";
public static final String WIDTH_CONF = "width";
private static final long THREE_SECONDS = 3000;
private static final long ONE_SECOND = 1000;
private final OutputDirFormatter outputDirFormatter;
private Job mrJob; // store a running MR job reference for async status check
private JobStatus jobStatus; // overall job status, including finalization step
private Finalizer<Path> finalizer;
private Map<String, Object> configuration;
private Pageable<Path> finalResults;
private Timer timer;
private long statusInterval; // how often timer thread checks job status.
private long completeCheckInterval; // how long we sleep between isDone checks in get()
public static enum PCAP_COUNTER {
MALFORMED_PACKET_COUNT
}
public static class PcapPartitioner extends Partitioner<LongWritable, BytesWritable> implements Configurable {
private Configuration configuration;
Long start = null;
Long end = null;
Long width = null;
@Override
public int getPartition(LongWritable longWritable, BytesWritable bytesWritable, int numPartitions) {
if (start == null) {
initialize();
}
long x = longWritable.get();
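// Each partition (reducer) owns a contiguous window of 'width' nanoseconds; a key's partition is
// its offset from 'start' divided by that window size (see PcapJob#findWidth).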
int ret = (int)Long.divideUnsigned(x - start, width);
if (ret > numPartitions) {
throw new IllegalArgumentException(String.format("Bad partition: key=%s, width=%d, partition=%d, numPartitions=%d"
, Long.toUnsignedString(x), width, ret, numPartitions)
);
}
return ret;
}
private void initialize() {
start = Long.parseUnsignedLong(configuration.get(START_TS_CONF));
end = Long.parseUnsignedLong(configuration.get(END_TS_CONF));
width = Long.parseLong(configuration.get(WIDTH_CONF));
}
@Override
public void setConf(Configuration conf) {
this.configuration = conf;
}
@Override
public Configuration getConf() {
return configuration;
}
}
public static class PcapMapper extends Mapper<LongWritable, BytesWritable, LongWritable, BytesWritable> {
PcapFilter filter;
long start;
long end;
@Override
protected void setup(Context context) throws IOException, InterruptedException {
super.setup(context);
filter = PcapFilters.valueOf(context.getConfiguration().get(PcapFilterConfigurator.PCAP_FILTER_NAME_CONF)).create();
filter.configure(context.getConfiguration());
start = Long.parseUnsignedLong(context.getConfiguration().get(START_TS_CONF));
end = Long.parseUnsignedLong(context.getConfiguration().get(END_TS_CONF));
}
@Override
protected void map(LongWritable key, BytesWritable value, Context context) throws IOException, InterruptedException {
if (greaterThanOrEqualTo(key.get(), start) && lessThanOrEqualTo(key.get(), end)) {
// It is assumed that the passed BytesWritable value is always a *single* PacketInfo object. Passing more than 1
// object will result in the whole set being passed through if any pass the filter. We cannot serialize PacketInfo
// objects back to byte arrays, otherwise we could support more than one packet.
// Note: short-circuit findAny() func on stream
List<PacketInfo> packetInfos;
try {
packetInfos = PcapHelper.toPacketInfo(value.copyBytes());
} catch (Exception e) {
// toPacketInfo may throw RuntimeExceptions for malformed packets; catch them and count the failures.
context.getCounter(PCAP_COUNTER.MALFORMED_PACKET_COUNT).increment(1);
return;
}
boolean send = filteredPacketInfo(packetInfos).findAny().isPresent();
if (send) {
context.write(key, value);
}
}
}
private Stream<PacketInfo> filteredPacketInfo(List<PacketInfo> packetInfos) throws IOException {
return packetInfos.stream().filter(filter);
}
}
public static class PcapReducer extends Reducer<LongWritable, BytesWritable, LongWritable, BytesWritable> {
@Override
protected void reduce(LongWritable key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {
for (BytesWritable value : values) {
context.write(key, value);
}
}
}
public PcapJob() {
jobStatus = new JobStatus();
finalResults = new PcapPages();
outputDirFormatter = new OutputDirFormatter();
timer = new Timer();
statusInterval = THREE_SECONDS;
completeCheckInterval = ONE_SECOND;
}
/**
* Primarily for testing.
*
* @param interval time in millis
*/
public void setStatusInterval(long interval) {
statusInterval = interval;
}
/**
* Primarily for testing.
*
* @param interval time in millis
*/
public void setCompleteCheckInterval(long interval) {
completeCheckInterval = interval;
}
@Override
public Statusable<Path> submit(Finalizer<Path> finalizer, Map<String, Object> config)
throws JobException {
this.finalizer = finalizer;
this.configuration = config;
Optional<String> jobName = Optional.ofNullable(PcapOptions.JOB_NAME.get(configuration, String.class));
Configuration hadoopConf = PcapOptions.HADOOP_CONF.get(configuration, Configuration.class);
FileSystem fileSystem = PcapOptions.FILESYSTEM.get(configuration, FileSystem.class);
Path basePath = PcapOptions.BASE_PATH.getTransformed(configuration, Path.class);
Path baseInterimResultPath = PcapOptions.BASE_INTERIM_RESULT_PATH
.getTransformedOrDefault(configuration, Path.class,
new Path(PcapGlobalDefaults.BASE_INTERIM_RESULT_PATH_DEFAULT));
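// Prefer explicit nanosecond start/end times when provided; otherwise convert the millisecond options.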
long startTimeNs;
if (configuration.containsKey(PcapOptions.START_TIME_NS.getKey())) {
startTimeNs = PcapOptions.START_TIME_NS.getOrDefault(configuration, Long.class, 0L);
} else {
startTimeNs = TimestampConverters.MILLISECONDS.toNanoseconds(PcapOptions.START_TIME_MS.getOrDefault(configuration, Long.class, 0L));
}
long endTimeNs;
if (configuration.containsKey(PcapOptions.END_TIME_NS.getKey())) {
endTimeNs = PcapOptions.END_TIME_NS.getOrDefault(configuration, Long.class, TimestampConverters.MILLISECONDS.toNanoseconds(System.currentTimeMillis()));
} else {
endTimeNs = TimestampConverters.MILLISECONDS.toNanoseconds(PcapOptions.END_TIME_MS.getOrDefault(configuration, Long.class, System.currentTimeMillis()));
}
int numReducers = PcapOptions.NUM_REDUCERS.getOrDefault(configuration, Integer.class, NUM_REDUCERS_DEFAULT);
T fields = (T) PcapOptions.FIELDS.get(configuration, Object.class);
PcapFilterConfigurator<T> filterImpl = PcapOptions.FILTER_IMPL.get(configuration, PcapFilterConfigurator.class);
try {
Statusable<Path> statusable = query(jobName,
basePath,
baseInterimResultPath,
startTimeNs,
endTimeNs,
numReducers,
fields,
// create a new copy for each job; reusing a Hadoop Configuration across jobs causes subtle failures
new Configuration(hadoopConf),
fileSystem,
filterImpl);
PcapOptions.JOB_ID.put(configuration, statusable.getStatus().getJobId());
return statusable;
} catch (IOException | InterruptedException | ClassNotFoundException e) {
throw new JobException("Failed to run pcap query.", e);
}
}
/**
* Run query asynchronously.
*/
public Statusable<Path> query(Optional<String> jobName,
Path basePath,
Path baseInterimResultPath,
long beginNS,
long endNS,
int numReducers,
T fields,
Configuration conf,
FileSystem fs,
PcapFilterConfigurator<T> filterImpl)
throws IOException, ClassNotFoundException, InterruptedException {
String outputDirName = outputDirFormatter.format(beginNS, endNS, filterImpl.queryToString(fields));
if(LOG.isDebugEnabled()) {
DateFormat format = SimpleDateFormat.getDateTimeInstance(SimpleDateFormat.LONG
, SimpleDateFormat.LONG
);
String from = format.format(new Date(Long.divideUnsigned(beginNS, 1000000)));
String to = format.format(new Date(Long.divideUnsigned(endNS, 1000000)));
LOG.debug("Executing query {} on timerange from {} to {}", () -> filterImpl.queryToString(fields), ()-> from, () -> to);
}
Path interimResultPath = new Path(baseInterimResultPath, outputDirName);
PcapOptions.INTERIM_RESULT_PATH.put(configuration, interimResultPath);
mrJob = createJob(jobName
, basePath
, interimResultPath
, beginNS
, endNS
, numReducers
, fields
, conf
, fs
, filterImpl
);
if (mrJob == null) {
LOG.info("No files to process with specified date range.");
try {
setFinalResults(input -> new PcapPages(), configuration);
jobStatus.withState(State.SUCCEEDED).withDescription("No results in specified date range.")
.withPercentComplete(100.0);
} catch (JobException e) {
// Finalizing an empty result set should not fail; record the failure if it somehow does.
jobStatus.withState(State.FAILED).withDescription("Unable to finalize empty job.")
.withFailureException(e);
}
return this;
}
synchronized (this) {
// This block is synchronized for proper variable visibility across threads once the status timer
// is started. mrJob and jobStatus need to be synchronized so that their references and internal
// state are visible to the timer thread. Accesses to these variables before this point need no
// synchronization because only the submitting thread has used them so far.
mrJob.submit();
jobStatus.withState(State.SUBMITTED).withDescription("Job submitted")
.withJobId(mrJob.getJobID().toString());
}
startJobStatusTimerThread(statusInterval);
return this;
}
private void startJobStatusTimerThread(long interval) {
getTimer().scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
if (!updateStatus()) {
cancel(); // the job reached a terminal state; stop this TimerTask
}
}
}, interval, interval);
}
public void setTimer(Timer timer) {
this.timer = timer;
}
private Timer getTimer() {
return timer;
}
/**
* Update job status info. Will finalize job when underlying MR job completes.
*
* @return true if should continue updating status, false otherwise.
*/
private boolean updateStatus() {
JobStatus tempStatus = null;
final float mrJobFraction = 0.75f; // fraction of total job progress calculation we're allocating to the MR job vs finalization
synchronized (this) {
tempStatus = new JobStatus(jobStatus);
}
boolean keepUpdating = true;
try {
boolean mrJobComplete = false;
org.apache.hadoop.mapreduce.JobStatus.State mrJobState = null;
String mrJobFailureInfo = null;
float mapProg = 0.0f;
float reduceProg = 0.0f;
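// Snapshot the MR job's state under the lock; the bookkeeping below then runs without holding it.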
synchronized (this) {
mrJobComplete = mrJob.isComplete();
org.apache.hadoop.mapreduce.JobStatus mrJobStatus = mrJob.getStatus();
mrJobState = mrJobStatus.getState();
mrJobFailureInfo = mrJobStatus.getFailureInfo();
mapProg = mrJob.mapProgress();
reduceProg = mrJob.reduceProgress();
}
if (mrJobComplete) {
switch (mrJobState) {
case SUCCEEDED:
tempStatus.withPercentComplete(100.0 * mrJobFraction).withState(State.FINALIZING).withDescription("Finalizing job.");
try {
synchronized (this) {
// want to update the description while the job is finalizing
jobStatus = new JobStatus(tempStatus);
}
setFinalResults(finalizer, configuration);
tempStatus.withPercentComplete(100.0).withState(State.SUCCEEDED).withDescription("Job completed.");
} catch (JobException je) {
tempStatus.withPercentComplete(100.0).withState(State.FAILED).withDescription("Job finalize failed.")
.withFailureException(je);
}
break;
case FAILED:
tempStatus.withPercentComplete(100.0).withState(State.FAILED).withDescription(mrJobFailureInfo);
break;
case KILLED:
tempStatus.withPercentComplete(100.0).withState(State.KILLED).withDescription(mrJobFailureInfo);
break;
}
keepUpdating = false;
} else {
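// Map and reduce progress are weighted equally and scaled to a 0-100 percentage, then reduced
// by mrJobFraction to leave room for the finalization step in the overall progress.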
float mrJobProgress = ((mapProg / 2) + (reduceProg / 2)) * 100;
float totalProgress = mrJobProgress * mrJobFraction;
String description = String
.format("map: %s%%, reduce: %s%%", mapProg * 100, reduceProg * 100);
tempStatus.withPercentComplete(totalProgress).withState(State.RUNNING)
.withDescription(description);
}
} catch (InterruptedException | IOException e) {
tempStatus.withPercentComplete(100.0).withState(State.FAILED).withFailureException(e);
keepUpdating = false;
}
synchronized (this) {
jobStatus = new JobStatus(tempStatus);
}
return keepUpdating;
}
/**
* Writes results using the finalizer. If the finalizer produces no results, an empty Pageable
* is stored instead.
*
* @param finalizer Writes results.
* @param configuration Configures the finalizer.
* @throws JobException if finalizing the results fails.
*/
private void setFinalResults(Finalizer<Path> finalizer, Map<String, Object> configuration)
throws JobException {
Pageable<Path> results = finalizer.finalizeJob(configuration);
if (results == null) {
results = new PcapPages();
}
synchronized (this) {
finalResults = results;
}
}
/**
* Creates, but does not submit, the core MapReduce job. Returns null instead of a Job when no
* input files fall within the requested time range.
*/
public Job createJob(Optional<String> jobName
,Path basePath
, Path jobOutputPath
, long beginNS
, long endNS
, int numReducers
, T fields
, Configuration conf
, FileSystem fs
, PcapFilterConfigurator<T> filterImpl
) throws IOException
{
Iterable<String> filteredPaths = FileFilterUtil.getPathsInTimeRange(beginNS, endNS, listFiles(fs, basePath));
String inputPaths = Joiner.on(',').join(filteredPaths);
if (StringUtils.isEmpty(inputPaths)) {
return null;
}
conf.set(START_TS_CONF, Long.toUnsignedString(beginNS));
conf.set(END_TS_CONF, Long.toUnsignedString(endNS));
conf.set(WIDTH_CONF, "" + findWidth(beginNS, endNS, numReducers));
filterImpl.addToConfig(fields, conf);
Job job = Job.getInstance(conf);
jobName.ifPresent(job::setJobName);
job.setJarByClass(PcapJob.class);
job.setMapperClass(PcapJob.PcapMapper.class);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(BytesWritable.class);
job.setNumReduceTasks(numReducers);
job.setReducerClass(PcapReducer.class);
job.setPartitionerClass(PcapPartitioner.class);
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(BytesWritable.class);
SequenceFileInputFormat.addInputPaths(job, inputPaths);
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
SequenceFileOutputFormat.setOutputPath(job, jobOutputPath);
return job;
}
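/**
* Width, in nanoseconds, of the key range assigned to each reducer. The +1 guarantees that every
* key in [start, end] maps to a partition in [0, numReducers - 1] despite integer truncation,
* e.g. start=0, end=10, numReducers=3 gives width 4, so keys 0..10 map to partitions 0..2.
*/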
public static long findWidth(long start, long end, int numReducers) {
return Long.divideUnsigned(end - start, numReducers) + 1;
}
protected Iterable<Path> listFiles(FileSystem fs, Path basePath) throws IOException {
List<Path> ret = new ArrayList<>();
RemoteIterator<LocatedFileStatus> filesIt = fs.listFiles(basePath, true);
while (filesIt.hasNext()) {
ret.add(filesIt.next().getPath());
}
return ret;
}
@Override
public JobType getJobType() {
return JobType.MAP_REDUCE;
}
@Override
public synchronized JobStatus getStatus() throws JobException {
return new JobStatus(jobStatus);
}
protected void setJobStatus(JobStatus jobStatus) {
this.jobStatus = jobStatus;
}
protected void setMrJob(Job mrJob) {
this.mrJob = mrJob;
}
/**
* Synchronous call blocks until completion.
*/
@Override
public Pageable<Path> get() throws JobException, InterruptedException {
if (PcapOptions.PRINT_JOB_STATUS.getOrDefault(configuration, Boolean.class, false) && mrJob != null) {
try {
mrJob.monitorAndPrintJob();
} catch (IOException e) {
throw new JobException("Could not monitor job status", e);
}
}
for (; ; ) {
JobStatus status = getStatus();
if (status.getState() == State.SUCCEEDED
|| status.getState() == State.KILLED
|| status.getState() == State.FAILED) {
return getFinalResults();
} else {
LOG.info("Percent complete: {}, description: {}", status.getPercentComplete(), status.getDescription());
}
Thread.sleep(completeCheckInterval);
}
}
private synchronized Pageable<Path> getFinalResults() {
return new PcapPages(finalResults);
}
@Override
public boolean isDone() {
State jobState = null;
synchronized (this) {
jobState = jobStatus.getState();
}
return (jobState == State.SUCCEEDED
|| jobState == State.KILLED
|| jobState == State.FAILED);
}
@Override
public void kill() throws JobException {
try {
synchronized (this) {
mrJob.killJob();
}
} catch (IOException e) {
throw new JobException("Unable to kill pcap job.", e);
}
}
@Override
public boolean validate(Map<String, Object> configuration) {
// default implementation placeholder
return true;
}
@Override
public Map<String, Object> getConfiguration() {
return new HashMap<>(this.configuration);
}
protected void setConfiguration(Map<String, Object> configuration) {
this.configuration = configuration;
}
}