blob: 6dcf766cce397931d095a8018e221cbb3028511e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.graph.asm.translate;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
import org.apache.flink.api.java.operators.translation.WrappingFunction;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Vertex;
import org.apache.flink.util.Preconditions;
import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
/**
 * Methods for translation of the type or modification of the data of graph
 * vertex IDs, vertex values, and edge values.
 *
 * <p>Each translation is a {@code map} over the input {@link DataSet}. Because a
 * {@link TranslateFunction} may change the Java type of the translated field, the
 * output {@link TupleTypeInfo} is assembled explicitly from the input type
 * information plus the type extracted from the translator, and is attached to the
 * operator with {@code returns(...)}.
 */
public class Translate {

	// --------------------------------------------------------------------------------------------
	//  Shared helpers
	// --------------------------------------------------------------------------------------------

	/**
	 * Extract the output type of the given {@link TranslateFunction} when it is
	 * applied to values described by {@code oldType}.
	 *
	 * <p>Type argument 0 of {@link TranslateFunction} is used as the input type and
	 * type argument 1 as the output type (the same indices are supplied for lambda
	 * analysis). No function name is passed and {@code allowMissing} is {@code false}.
	 *
	 * @param translator the translation function whose return type is extracted
	 * @param oldType type information of the values being translated
	 * @param <OLD> type of the values being translated
	 * @param <NEW> type of the translated values
	 * @return type information for {@code NEW}
	 */
	private static <OLD, NEW> TypeInformation<NEW> getTranslatedType(TranslateFunction<OLD, NEW> translator, TypeInformation<OLD> oldType) {
		return TypeExtractor.getUnaryOperatorReturnType(
			translator,
			TranslateFunction.class,
			0,
			1,
			new int[]{0},
			new int[]{1},
			oldType,
			null,
			false);
	}

	// --------------------------------------------------------------------------------------------
	//  Translate vertex IDs
	// --------------------------------------------------------------------------------------------

	/**
	 * Translate {@link Vertex} IDs using the given {@link TranslateFunction}.
	 *
	 * @param vertices input vertices
	 * @param translator implements conversion from {@code OLD} to {@code NEW}
	 * @param <OLD> old vertex ID type
	 * @param <NEW> new vertex ID type
	 * @param <VV> vertex value type
	 * @return translated vertices
	 */
	public static <OLD, NEW, VV> DataSet<Vertex<NEW, VV>> translateVertexIds(DataSet<Vertex<OLD, VV>> vertices, TranslateFunction<OLD, NEW> translator) {
		return translateVertexIds(vertices, translator, PARALLELISM_DEFAULT);
	}

	/**
	 * Translate {@link Vertex} IDs using the given {@link TranslateFunction}.
	 *
	 * @param vertices input vertices
	 * @param translator implements conversion from {@code OLD} to {@code NEW}
	 * @param parallelism operator parallelism
	 * @param <OLD> old vertex ID type
	 * @param <NEW> new vertex ID type
	 * @param <VV> vertex value type
	 * @return translated vertices
	 * @throws NullPointerException if {@code vertices} or {@code translator} is null
	 */
	@SuppressWarnings("unchecked")
	public static <OLD, NEW, VV> DataSet<Vertex<NEW, VV>> translateVertexIds(DataSet<Vertex<OLD, VV>> vertices, TranslateFunction<OLD, NEW> translator, int parallelism) {
		Preconditions.checkNotNull(vertices);
		Preconditions.checkNotNull(translator);

		// Vertex.class is a raw class token; the double cast yields the parameterized token
		Class<Vertex<NEW, VV>> vertexClass = (Class<Vertex<NEW, VV>>) (Class<? extends Vertex>) Vertex.class;

		// field 0 holds the vertex ID, field 1 the vertex value
		TupleTypeInfo<Vertex<OLD, VV>> inputType = (TupleTypeInfo<Vertex<OLD, VV>>) vertices.getType();
		TypeInformation<OLD> oldType = inputType.getTypeAt(0);
		TypeInformation<NEW> newType = getTranslatedType(translator, oldType);
		TypeInformation<VV> vertexValueType = inputType.getTypeAt(1);

		TupleTypeInfo<Vertex<NEW, VV>> returnType = new TupleTypeInfo<>(vertexClass, newType, vertexValueType);

		return vertices
			.map(new TranslateVertexId<>(translator))
			.returns(returnType)
			.setParallelism(parallelism)
			.name("Translate vertex IDs");
	}

	/**
	 * Translate {@link Vertex} IDs using the given {@link TranslateFunction}.
	 * The vertex value (field 1) is copied through unchanged.
	 *
	 * @param <OLD> old vertex ID type
	 * @param <NEW> new vertex ID type
	 * @param <VV> vertex value type
	 */
	@ForwardedFields("1")
	private static class TranslateVertexId<OLD, NEW, VV>
	extends WrappingFunction<TranslateFunction<OLD, NEW>>
	implements MapFunction<Vertex<OLD, VV>, Vertex<NEW, VV>> {
		// a single output instance is mutated and returned on every call
		private final Vertex<NEW, VV> vertex = new Vertex<>();

		public TranslateVertexId(TranslateFunction<OLD, NEW> translator) {
			super(translator);
		}

		@Override
		public Vertex<NEW, VV> map(Vertex<OLD, VV> value)
				throws Exception {
			// the previously emitted ID is handed to the translator as its second argument
			// (presumably a reuse hint -- see TranslateFunction)
			vertex.f0 = wrappedFunction.translate(value.f0, vertex.f0);
			vertex.f1 = value.f1;

			return vertex;
		}
	}

	// --------------------------------------------------------------------------------------------
	//  Translate edge IDs
	// --------------------------------------------------------------------------------------------

	/**
	 * Translate {@link Edge} IDs using the given {@link TranslateFunction}.
	 * Both vertex IDs of each edge (fields 0 and 1) are translated.
	 *
	 * @param edges input edges
	 * @param translator implements conversion from {@code OLD} to {@code NEW}
	 * @param <OLD> old vertex ID type
	 * @param <NEW> new vertex ID type
	 * @param <EV> edge value type
	 * @return translated edges
	 */
	public static <OLD, NEW, EV> DataSet<Edge<NEW, EV>> translateEdgeIds(DataSet<Edge<OLD, EV>> edges, TranslateFunction<OLD, NEW> translator) {
		return translateEdgeIds(edges, translator, PARALLELISM_DEFAULT);
	}

	/**
	 * Translate {@link Edge} IDs using the given {@link TranslateFunction}.
	 * Both vertex IDs of each edge (fields 0 and 1) are translated.
	 *
	 * @param edges input edges
	 * @param translator implements conversion from {@code OLD} to {@code NEW}
	 * @param parallelism operator parallelism
	 * @param <OLD> old vertex ID type
	 * @param <NEW> new vertex ID type
	 * @param <EV> edge value type
	 * @return translated edges
	 * @throws NullPointerException if {@code edges} or {@code translator} is null
	 */
	@SuppressWarnings("unchecked")
	public static <OLD, NEW, EV> DataSet<Edge<NEW, EV>> translateEdgeIds(DataSet<Edge<OLD, EV>> edges, TranslateFunction<OLD, NEW> translator, int parallelism) {
		Preconditions.checkNotNull(edges);
		Preconditions.checkNotNull(translator);

		// Edge.class is a raw class token; the double cast yields the parameterized token
		Class<Edge<NEW, EV>> edgeClass = (Class<Edge<NEW, EV>>) (Class<? extends Edge>) Edge.class;

		// fields 0 and 1 hold vertex IDs (both of type OLD), field 2 the edge value
		TupleTypeInfo<Edge<OLD, EV>> inputType = (TupleTypeInfo<Edge<OLD, EV>>) edges.getType();
		TypeInformation<OLD> oldType = inputType.getTypeAt(0);
		TypeInformation<NEW> newType = getTranslatedType(translator, oldType);
		TypeInformation<EV> edgeValueType = inputType.getTypeAt(2);

		TupleTypeInfo<Edge<NEW, EV>> returnType = new TupleTypeInfo<>(edgeClass, newType, newType, edgeValueType);

		return edges
			.map(new TranslateEdgeId<>(translator))
			.returns(returnType)
			.setParallelism(parallelism)
			.name("Translate edge IDs");
	}

	/**
	 * Translate both vertex IDs (fields 0 and 1) of each {@link Edge} using the
	 * given {@link TranslateFunction}. The edge value (field 2) is copied through
	 * unchanged.
	 *
	 * @param <OLD> old vertex ID type
	 * @param <NEW> new vertex ID type
	 * @param <EV> edge value type
	 */
	@ForwardedFields("2")
	private static class TranslateEdgeId<OLD, NEW, EV>
	extends WrappingFunction<TranslateFunction<OLD, NEW>>
	implements MapFunction<Edge<OLD, EV>, Edge<NEW, EV>> {
		// a single output instance is mutated and returned on every call
		private final Edge<NEW, EV> edge = new Edge<>();

		public TranslateEdgeId(TranslateFunction<OLD, NEW> translator) {
			super(translator);
		}

		@Override
		public Edge<NEW, EV> map(Edge<OLD, EV> value)
				throws Exception {
			// the previously emitted IDs are handed to the translator as its second argument
			// (presumably a reuse hint -- see TranslateFunction)
			edge.f0 = wrappedFunction.translate(value.f0, edge.f0);
			edge.f1 = wrappedFunction.translate(value.f1, edge.f1);
			edge.f2 = value.f2;

			return edge;
		}
	}

	// --------------------------------------------------------------------------------------------
	//  Translate vertex values
	// --------------------------------------------------------------------------------------------

	/**
	 * Translate {@link Vertex} values using the given {@link TranslateFunction}.
	 *
	 * @param vertices input vertices
	 * @param translator implements conversion from {@code OLD} to {@code NEW}
	 * @param <K> vertex ID type
	 * @param <OLD> old vertex value type
	 * @param <NEW> new vertex value type
	 * @return translated vertices
	 */
	public static <K, OLD, NEW> DataSet<Vertex<K, NEW>> translateVertexValues(DataSet<Vertex<K, OLD>> vertices, TranslateFunction<OLD, NEW> translator) {
		return translateVertexValues(vertices, translator, PARALLELISM_DEFAULT);
	}

	/**
	 * Translate {@link Vertex} values using the given {@link TranslateFunction}.
	 *
	 * @param vertices input vertices
	 * @param translator implements conversion from {@code OLD} to {@code NEW}
	 * @param parallelism operator parallelism
	 * @param <K> vertex ID type
	 * @param <OLD> old vertex value type
	 * @param <NEW> new vertex value type
	 * @return translated vertices
	 * @throws NullPointerException if {@code vertices} or {@code translator} is null
	 */
	@SuppressWarnings("unchecked")
	public static <K, OLD, NEW> DataSet<Vertex<K, NEW>> translateVertexValues(DataSet<Vertex<K, OLD>> vertices, TranslateFunction<OLD, NEW> translator, int parallelism) {
		Preconditions.checkNotNull(vertices);
		Preconditions.checkNotNull(translator);

		// Vertex.class is a raw class token; the double cast yields the parameterized token
		Class<Vertex<K, NEW>> vertexClass = (Class<Vertex<K, NEW>>) (Class<? extends Vertex>) Vertex.class;

		// field 0 holds the vertex ID, field 1 the vertex value
		TupleTypeInfo<Vertex<K, OLD>> inputType = (TupleTypeInfo<Vertex<K, OLD>>) vertices.getType();
		TypeInformation<K> idType = inputType.getTypeAt(0);
		TypeInformation<OLD> oldType = inputType.getTypeAt(1);
		TypeInformation<NEW> newType = getTranslatedType(translator, oldType);

		TupleTypeInfo<Vertex<K, NEW>> returnType = new TupleTypeInfo<>(vertexClass, idType, newType);

		return vertices
			.map(new TranslateVertexValue<>(translator))
			.returns(returnType)
			.setParallelism(parallelism)
			.name("Translate vertex values");
	}

	/**
	 * Translate {@link Vertex} values using the given {@link TranslateFunction}.
	 * The vertex ID (field 0) is copied through unchanged.
	 *
	 * @param <K> vertex ID type
	 * @param <OLD> old vertex value type
	 * @param <NEW> new vertex value type
	 */
	@ForwardedFields("0")
	private static class TranslateVertexValue<K, OLD, NEW>
	extends WrappingFunction<TranslateFunction<OLD, NEW>>
	implements MapFunction<Vertex<K, OLD>, Vertex<K, NEW>> {
		// a single output instance is mutated and returned on every call
		private final Vertex<K, NEW> vertex = new Vertex<>();

		public TranslateVertexValue(TranslateFunction<OLD, NEW> translator) {
			super(translator);
		}

		@Override
		public Vertex<K, NEW> map(Vertex<K, OLD> value)
				throws Exception {
			vertex.f0 = value.f0;
			// the previously emitted value is handed to the translator as its second argument
			// (presumably a reuse hint -- see TranslateFunction)
			vertex.f1 = wrappedFunction.translate(value.f1, vertex.f1);

			return vertex;
		}
	}

	// --------------------------------------------------------------------------------------------
	//  Translate edge values
	// --------------------------------------------------------------------------------------------

	/**
	 * Translate {@link Edge} values using the given {@link TranslateFunction}.
	 *
	 * @param edges input edges
	 * @param translator implements conversion from {@code OLD} to {@code NEW}
	 * @param <K> vertex ID type
	 * @param <OLD> old edge value type
	 * @param <NEW> new edge value type
	 * @return translated edges
	 */
	public static <K, OLD, NEW> DataSet<Edge<K, NEW>> translateEdgeValues(DataSet<Edge<K, OLD>> edges, TranslateFunction<OLD, NEW> translator) {
		return translateEdgeValues(edges, translator, PARALLELISM_DEFAULT);
	}

	/**
	 * Translate {@link Edge} values using the given {@link TranslateFunction}.
	 *
	 * @param edges input edges
	 * @param translator implements conversion from {@code OLD} to {@code NEW}
	 * @param parallelism operator parallelism
	 * @param <K> vertex ID type
	 * @param <OLD> old edge value type
	 * @param <NEW> new edge value type
	 * @return translated edges
	 * @throws NullPointerException if {@code edges} or {@code translator} is null
	 */
	@SuppressWarnings("unchecked")
	public static <K, OLD, NEW> DataSet<Edge<K, NEW>> translateEdgeValues(DataSet<Edge<K, OLD>> edges, TranslateFunction<OLD, NEW> translator, int parallelism) {
		Preconditions.checkNotNull(edges);
		Preconditions.checkNotNull(translator);

		// Edge.class is a raw class token; the double cast yields the parameterized token
		Class<Edge<K, NEW>> edgeClass = (Class<Edge<K, NEW>>) (Class<? extends Edge>) Edge.class;

		// fields 0 and 1 hold vertex IDs (both of type K), field 2 the edge value
		TupleTypeInfo<Edge<K, OLD>> inputType = (TupleTypeInfo<Edge<K, OLD>>) edges.getType();
		TypeInformation<K> idType = inputType.getTypeAt(0);
		TypeInformation<OLD> oldType = inputType.getTypeAt(2);
		TypeInformation<NEW> newType = getTranslatedType(translator, oldType);

		TupleTypeInfo<Edge<K, NEW>> returnType = new TupleTypeInfo<>(edgeClass, idType, idType, newType);

		return edges
			.map(new TranslateEdgeValue<>(translator))
			.returns(returnType)
			.setParallelism(parallelism)
			.name("Translate edge values");
	}

	/**
	 * Translate {@link Edge} values using the given {@link TranslateFunction}.
	 * Both vertex IDs (fields 0 and 1) are copied through unchanged.
	 *
	 * @param <K> vertex ID type
	 * @param <OLD> old edge value type
	 * @param <NEW> new edge value type
	 */
	@ForwardedFields("0; 1")
	private static class TranslateEdgeValue<K, OLD, NEW>
	extends WrappingFunction<TranslateFunction<OLD, NEW>>
	implements MapFunction<Edge<K, OLD>, Edge<K, NEW>> {
		// a single output instance is mutated and returned on every call
		private final Edge<K, NEW> edge = new Edge<>();

		public TranslateEdgeValue(TranslateFunction<OLD, NEW> translator) {
			super(translator);
		}

		@Override
		public Edge<K, NEW> map(Edge<K, OLD> value)
				throws Exception {
			edge.f0 = value.f0;
			edge.f1 = value.f1;
			// the previously emitted value is handed to the translator as its second argument
			// (presumably a reuse hint -- see TranslateFunction)
			edge.f2 = wrappedFunction.translate(value.f2, edge.f2);

			return edge;
		}
	}
}