| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.nemo.examples.beam; |
| |
| import org.apache.beam.sdk.Pipeline; |
| import org.apache.beam.sdk.options.PipelineOptions; |
| import org.apache.beam.sdk.transforms.Filter; |
| import org.apache.beam.sdk.transforms.GroupByKey; |
| import org.apache.beam.sdk.transforms.MapElements; |
| import org.apache.beam.sdk.transforms.SimpleFunction; |
| import org.apache.beam.sdk.transforms.join.CoGbkResult; |
| import org.apache.beam.sdk.transforms.join.CoGroupByKey; |
| import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple; |
| import org.apache.beam.sdk.values.KV; |
| import org.apache.beam.sdk.values.PCollection; |
| import org.apache.beam.sdk.values.TupleTag; |
| import org.apache.commons.math3.stat.descriptive.moment.StandardDeviation; |
| |
| import java.util.ArrayList; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.regex.Matcher; |
| import java.util.regex.Pattern; |
| |
| /** |
| * An app that analyzes data flow from network trace. |
| * Each line in the output file represents a host, containing the standard deviation of the lengths of packets |
| * that flows into the host (reads input0 file), and the standard deviation of the lengths of packets |
| * that flows out from the host (reads input1 file). |
| */ |
| public final class NetworkTraceAnalysis { |
| /** |
| * Private constructor. |
| */ |
| private NetworkTraceAnalysis() { |
| } |
| |
| /** |
| * Main function for the Beam program. |
| * |
| * @param args arguments. |
| */ |
| public static void main(final String[] args) { |
| final String input0FilePath = args[0]; |
| final String input1FilePath = args[1]; |
| final String outputFilePath = args[2]; |
| final PipelineOptions options = NemoPipelineOptionsFactory.create(); |
| options.setJobName("NetworkTraceAnalysis"); |
| |
| // Given "4 0.0 192.168.3.1 -> 192.168.0.2 Len=29", this finds "192.168.3.1", "192.168.0.2" and "29" |
| final Pattern pattern = Pattern.compile(" *\\d+ +[0-9.]+ +([0-9.]+) -> ([0-9.]+) +.*Len=(\\d+)"); |
| |
| final SimpleFunction<String, Boolean> filter = new SimpleFunction<String, Boolean>() { |
| @Override |
| public Boolean apply(final String line) { |
| return pattern.matcher(line).find(); |
| } |
| }; |
| final SimpleFunction<KV<String, Iterable<KV<String, Long>>>, KV<String, Long>> mapToStdev |
| = new SimpleFunction<KV<String, Iterable<KV<String, Long>>>, KV<String, Long>>() { |
| @Override |
| public KV<String, Long> apply(final KV<String, Iterable<KV<String, Long>>> kv) { |
| return KV.of(kv.getKey(), stdev(kv.getValue())); |
| } |
| }; |
| |
| final Pipeline p = Pipeline.create(options); |
| final PCollection<KV<String, Long>> in0 = GenericSourceSink.read(p, input0FilePath) |
| .apply(Filter.by(filter)) |
| .apply(MapElements.via(new SimpleFunction<String, KV<String, KV<String, Long>>>() { |
| @Override |
| public KV<String, KV<String, Long>> apply(final String line) { |
| final Matcher matcher = pattern.matcher(line); |
| matcher.find(); |
| return KV.of(matcher.group(2), KV.of(matcher.group(1), Long.valueOf(matcher.group(3)))); |
| } |
| })) |
| .apply(GroupByKey.create()) |
| .apply(MapElements.via(mapToStdev)); |
| final PCollection<KV<String, Long>> in1 = GenericSourceSink.read(p, input1FilePath) |
| .apply(Filter.by(filter)) |
| .apply(MapElements.via(new SimpleFunction<String, KV<String, KV<String, Long>>>() { |
| @Override |
| public KV<String, KV<String, Long>> apply(final String line) { |
| final Matcher matcher = pattern.matcher(line); |
| matcher.find(); |
| return KV.of(matcher.group(1), KV.of(matcher.group(2), Long.valueOf(matcher.group(3)))); |
| } |
| })) |
| .apply(GroupByKey.create()) |
| .apply(MapElements.via(mapToStdev)); |
| final TupleTag<Long> tag0 = new TupleTag<>(); |
| final TupleTag<Long> tag1 = new TupleTag<>(); |
| final PCollection<KV<String, CoGbkResult>> joined = |
| KeyedPCollectionTuple.of(tag0, in0).and(tag1, in1).apply(CoGroupByKey.create()); |
| final PCollection<String> result = joined |
| .apply(MapElements.via(new SimpleFunction<KV<String, CoGbkResult>, String>() { |
| @Override |
| public String apply(final KV<String, CoGbkResult> kv) { |
| final long source = getLong(kv.getValue().getAll(tag0)); |
| final long destination = getLong(kv.getValue().getAll(tag1)); |
| final String intermediate = kv.getKey(); |
| return new StringBuilder(intermediate).append(",").append(source).append(",") |
| .append(destination).toString(); |
| } |
| })); |
| GenericSourceSink.write(result, outputFilePath); |
| p.run().waitUntilFinish(); |
| } |
| |
| /** |
| * @param data data |
| * @return extracted long typed data |
| */ |
| private static long getLong(final Iterable<Long> data) { |
| final Iterator<Long> iterator = data.iterator(); |
| return iterator.hasNext() ? iterator.next() : 0; |
| } |
| |
| /** |
| * @param data list of data |
| * @return standard deviation of data. |
| */ |
| private static long stdev(final Iterable<KV<String, Long>> data) { |
| final StandardDeviation stdev = new StandardDeviation(); |
| final List<Long> elements = new ArrayList<>(); |
| for (final KV<String, Long> e : data) { |
| elements.add(e.getValue()); |
| } |
| return Math.round(stdev.evaluate(elements.stream().mapToDouble(e -> e).toArray())); |
| } |
| } |