blob: 2cea9cb2c7a30a29f759a8a510f09d43a19a9762 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.giraph.block_app.framework.block;
import java.util.Iterator;
import org.apache.giraph.block_app.framework.piece.AbstractPiece;
import org.apache.giraph.block_app.framework.piece.delegate.FilteringPiece;
import org.apache.giraph.function.Consumer;
import org.apache.giraph.function.vertex.SupplierFromVertex;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import com.google.common.base.Function;
import com.google.common.collect.Iterators;
/**
* Block which filters out calls to vertexSend/vertexReceive functions
* of all pieces in a given block.
* Filtering happens based on toCallSend and toCallReceive suppliers
* that are passed in, as every piece is just wrapped with FilteringPiece.
*
* @param <I> Vertex id type
* @param <V> Vertex value type
* @param <E> Edge value type
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
public final class FilteringBlock<I extends WritableComparable,
V extends Writable, E extends Writable>
implements Block {
private final SupplierFromVertex<I, V, E, Boolean> toCallSend;
private final SupplierFromVertex<I, V, E, Boolean> toCallReceive;
private final Block block;
/**
* Creates filtering block which uses passed {@code toCallSend} to filter
* calls to {@code vertexSend}, and passed {@code toCallReceive} to filter
* calls to {@code vertexReceive}, on all pieces within passed {@code block}.
*/
public FilteringBlock(
SupplierFromVertex<I, V, E, Boolean> toCallSend,
SupplierFromVertex<I, V, E, Boolean> toCallReceive,
Block block) {
this.toCallSend = toCallSend;
this.toCallReceive = toCallReceive;
this.block = block;
}
/**
* Creates filtering block, where both vertexSend and vertexReceive is
* filtered based on same supplier.
*/
public FilteringBlock(
SupplierFromVertex<I, V, E, Boolean> toCallSendAndReceive, Block block) {
this(toCallSendAndReceive, toCallSendAndReceive, block);
}
/**
* Creates filtering block, that filters only vertexReceive function,
* and always calls vertexSend function.
*/
public static
<I extends WritableComparable, V extends Writable, E extends Writable>
Block createReceiveFiltering(
SupplierFromVertex<I, V, E, Boolean> toCallReceive,
Block innerBlock) {
return new FilteringBlock<>(null, toCallReceive, innerBlock);
}
/**
* Creates filtering block, that filters only vertexSend function,
* and always calls vertexReceive function.
*/
public static
<I extends WritableComparable, V extends Writable, E extends Writable>
Block createSendFiltering(
SupplierFromVertex<I, V, E, Boolean> toCallSend,
Block innerBlock) {
return new FilteringBlock<>(toCallSend, null, innerBlock);
}
@Override
public Iterator<AbstractPiece> iterator() {
return Iterators.transform(
block.iterator(),
new Function<AbstractPiece, AbstractPiece>() {
@Override
public AbstractPiece apply(AbstractPiece input) {
return new FilteringPiece<>(toCallSend, toCallReceive, input);
}
});
}
@Override
public void forAllPossiblePieces(Consumer<AbstractPiece> consumer) {
block.forAllPossiblePieces(consumer);
}
@Override
public PieceCount getPieceCount() {
return block.getPieceCount();
}
}