| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.test.iterative; |
| |
| import org.apache.flink.api.common.functions.RichCoGroupFunction; |
| import org.apache.flink.api.common.functions.RichFlatMapFunction; |
| import org.apache.flink.api.common.functions.RichMapFunction; |
| import org.apache.flink.api.java.DataSet; |
| import org.apache.flink.api.java.ExecutionEnvironment; |
| import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst; |
| import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond; |
| import org.apache.flink.api.java.io.LocalCollectionOutputFormat; |
| import org.apache.flink.api.java.operators.DeltaIteration; |
| import org.apache.flink.api.java.tuple.Tuple2; |
| import org.apache.flink.examples.java.graph.ConnectedComponents.DuplicateValue; |
| import org.apache.flink.examples.java.graph.ConnectedComponents.NeighborWithComponentIDJoin; |
| import org.apache.flink.test.testdata.ConnectedComponentsData; |
| import org.apache.flink.test.util.JavaProgramTestBase; |
| import org.apache.flink.util.Collector; |
| |
| import java.util.ArrayList; |
| import java.util.Iterator; |
| import java.util.List; |
| |
| /** Delta iteration test implementing the connected components algorithm with a cogroup. */ |
| @SuppressWarnings("serial") |
| public class CoGroupConnectedComponentsSecondITCase extends JavaProgramTestBase { |
| |
| private static final long SEED = 0xBADC0FFEEBEEFL; |
| |
| private static final int NUM_VERTICES = 1000; |
| |
| private static final int NUM_EDGES = 10000; |
| |
| @Override |
| protected void testProgram() throws Exception { |
| |
| // set up execution environment |
| ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); |
| |
| // read vertex and edge data |
| DataSet<Long> vertices = |
| env.fromElements( |
| ConnectedComponentsData.getEnumeratingVertices(NUM_VERTICES) |
| .split("\n")) |
| .map(new VertexParser()); |
| |
| DataSet<Tuple2<Long, Long>> edges = |
| env.fromElements( |
| ConnectedComponentsData.getRandomOddEvenEdges( |
| NUM_EDGES, NUM_VERTICES, SEED) |
| .split("\n")) |
| .flatMap(new EdgeParser()); |
| |
| // assign the initial components (equal to the vertex id) |
| DataSet<Tuple2<Long, Long>> verticesWithInitialId = |
| vertices.map(new DuplicateValue<Long>()); |
| |
| // open a delta iteration |
| DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration = |
| verticesWithInitialId.iterateDelta(verticesWithInitialId, 100, 0); |
| |
| // apply the step logic: join with the edges, select the minimum neighbor, update if the |
| // component of the candidate is smaller |
| DataSet<Tuple2<Long, Long>> changes = |
| iteration |
| .getWorkset() |
| .join(edges) |
| .where(0) |
| .equalTo(0) |
| .with(new NeighborWithComponentIDJoin()) |
| .coGroup(iteration.getSolutionSet()) |
| .where(0) |
| .equalTo(0) |
| .with(new MinIdAndUpdate()); |
| |
| // close the delta iteration (delta and new workset are identical) |
| DataSet<Tuple2<Long, Long>> result = iteration.closeWith(changes, changes); |
| |
| // emit result |
| List<Tuple2<Long, Long>> resutTuples = new ArrayList<>(); |
| result.output(new LocalCollectionOutputFormat<>(resutTuples)); |
| |
| env.execute(); |
| } |
| |
| // -------------------------------------------------------------------------------------------- |
| // The test program |
| // -------------------------------------------------------------------------------------------- |
| |
| private static final class VertexParser extends RichMapFunction<String, Long> { |
| |
| @Override |
| public Long map(String value) throws Exception { |
| return Long.parseLong(value); |
| } |
| } |
| |
| private static final class EdgeParser extends RichFlatMapFunction<String, Tuple2<Long, Long>> { |
| |
| @Override |
| public void flatMap(String value, Collector<Tuple2<Long, Long>> out) throws Exception { |
| String[] parts = value.split(" "); |
| long v1 = Long.parseLong(parts[0]); |
| long v2 = Long.parseLong(parts[1]); |
| |
| out.collect(new Tuple2<Long, Long>(v1, v2)); |
| out.collect(new Tuple2<Long, Long>(v2, v1)); |
| } |
| } |
| |
| @ForwardedFieldsFirst("0") |
| @ForwardedFieldsSecond("0") |
| private static final class MinIdAndUpdate |
| extends RichCoGroupFunction< |
| Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> { |
| |
| @Override |
| public void coGroup( |
| Iterable<Tuple2<Long, Long>> candidates, |
| Iterable<Tuple2<Long, Long>> current, |
| Collector<Tuple2<Long, Long>> out) { |
| Iterator<Tuple2<Long, Long>> iterator = current.iterator(); |
| if (!iterator.hasNext()) { |
| throw new RuntimeException("Error: Id not encountered before."); |
| } |
| |
| Tuple2<Long, Long> old = iterator.next(); |
| |
| long minimumComponentID = Long.MAX_VALUE; |
| |
| for (Tuple2<Long, Long> candidate : candidates) { |
| long candidateComponentID = candidate.f1; |
| if (candidateComponentID < minimumComponentID) { |
| minimumComponentID = candidateComponentID; |
| } |
| } |
| |
| if (minimumComponentID < old.f1) { |
| old.f1 = minimumComponentID; |
| out.collect(old); |
| } |
| } |
| } |
| } |