blob: 87583edaeeae8f7df7100a5e9b6f0ac29470433d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.giraph.worker;
import java.io.IOException;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.graph.VertexEdgeCount;
import org.apache.giraph.io.EdgeInputFormat;
import org.apache.giraph.io.EdgeReader;
import org.apache.giraph.io.filters.EdgeInputFilter;
import org.apache.giraph.io.InputType;
import org.apache.giraph.ooc.OutOfCoreEngine;
import org.apache.giraph.utils.LoggerUtils;
import org.apache.giraph.utils.MemoryUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import com.yammer.metrics.core.Counter;
import com.yammer.metrics.core.Meter;
/**
* Load as many edge input splits as possible.
* Every thread will has its own instance of WorkerClientRequestProcessor
* to send requests.
*
* @param <I> Vertex id
* @param <V> Vertex value
* @param <E> Edge value
*/
@SuppressWarnings("unchecked")
public class EdgeInputSplitsCallable<I extends WritableComparable,
V extends Writable, E extends Writable>
extends InputSplitsCallable<I, V, E> {
/** How often to update metrics and print info */
public static final int EDGES_UPDATE_PERIOD = 1000000;
/** How often to update filtered metrics */
public static final int EDGES_FILTERED_UPDATE_PERIOD = 10000;
/** Class logger */
private static final Logger LOG = Logger.getLogger(
EdgeInputSplitsCallable.class);
/** Aggregator handler */
private final WorkerThreadGlobalCommUsage globalCommUsage;
/** Bsp service worker (only use thread-safe methods) */
private final BspServiceWorker<I, V, E> bspServiceWorker;
/** Edge input format */
private final EdgeInputFormat<I, E> edgeInputFormat;
/** Input split max edges (-1 denotes all) */
private final long inputSplitMaxEdges;
/** Can embedInfo in vertexIds */
private final boolean canEmbedInIds;
/** Filter to use */
private final EdgeInputFilter<I, E> edgeInputFilter;
// Metrics
/** edges loaded meter across all readers */
private final Meter totalEdgesMeter;
/** edges filtered out by user */
private final Counter totalEdgesFiltered;
/**
* Constructor.
*
* @param edgeInputFormat Edge input format
* @param context Context
* @param configuration Configuration
* @param bspServiceWorker service worker
* @param splitsHandler Handler for input splits
*/
public EdgeInputSplitsCallable(
EdgeInputFormat<I, E> edgeInputFormat,
Mapper<?, ?, ?, ?>.Context context,
ImmutableClassesGiraphConfiguration<I, V, E> configuration,
BspServiceWorker<I, V, E> bspServiceWorker,
WorkerInputSplitsHandler splitsHandler) {
super(context, configuration, bspServiceWorker, splitsHandler);
this.edgeInputFormat = edgeInputFormat;
this.bspServiceWorker = bspServiceWorker;
inputSplitMaxEdges = configuration.getInputSplitMaxEdges();
// Initialize aggregator usage.
this.globalCommUsage = bspServiceWorker.getAggregatorHandler()
.newThreadAggregatorUsage();
edgeInputFilter = configuration.getEdgeInputFilter();
canEmbedInIds = bspServiceWorker
.getLocalData()
.getMappingStoreOps() != null &&
bspServiceWorker
.getLocalData()
.getMappingStoreOps()
.hasEmbedding();
// Initialize Metrics
totalEdgesMeter = getTotalEdgesLoadedMeter();
totalEdgesFiltered = getTotalEdgesFilteredCounter();
}
@Override
public EdgeInputFormat<I, E> getInputFormat() {
return edgeInputFormat;
}
@Override
public InputType getInputType() {
return InputType.EDGE;
}
/**
* Read edges from input split. If testing, the user may request a
* maximum number of edges to be read from an input split.
*
* @param inputSplit Input split to process with edge reader
* @return Edges loaded from this input split
* @throws IOException
* @throws InterruptedException
*/
@Override
protected VertexEdgeCount readInputSplit(
InputSplit inputSplit) throws IOException,
InterruptedException {
EdgeReader<I, E> edgeReader =
edgeInputFormat.createEdgeReader(inputSplit, context);
edgeReader.setConf(
(ImmutableClassesGiraphConfiguration<I, Writable, E>)
configuration);
edgeReader.initialize(inputSplit, context);
// Set aggregator usage to edge reader
edgeReader.setWorkerGlobalCommUsage(globalCommUsage);
long inputSplitEdgesLoaded = 0;
long inputSplitEdgesFiltered = 0;
int count = 0;
OutOfCoreEngine oocEngine = bspServiceWorker.getServerData().getOocEngine();
while (edgeReader.nextEdge()) {
// If out-of-core mechanism is used, check whether this thread
// can stay active or it should temporarily suspend and stop
// processing and generating more data for the moment.
if (oocEngine != null &&
(++count & OutOfCoreEngine.CHECK_IN_INTERVAL) == 0) {
oocEngine.activeThreadCheckIn();
}
I sourceId = edgeReader.getCurrentSourceId();
Edge<I, E> readerEdge = edgeReader.getCurrentEdge();
if (sourceId == null) {
throw new IllegalArgumentException(
"readInputSplit: Edge reader returned an edge " +
"without a source vertex id! - " + readerEdge);
}
if (readerEdge.getTargetVertexId() == null) {
throw new IllegalArgumentException(
"readInputSplit: Edge reader returned an edge " +
"without a target vertex id! - " + readerEdge);
}
if (readerEdge.getValue() == null) {
throw new IllegalArgumentException(
"readInputSplit: Edge reader returned an edge " +
"without a value! - " + readerEdge);
}
if (canEmbedInIds) {
bspServiceWorker
.getLocalData()
.getMappingStoreOps()
.embedTargetInfo(sourceId);
bspServiceWorker
.getLocalData()
.getMappingStoreOps()
.embedTargetInfo(readerEdge.getTargetVertexId());
}
++inputSplitEdgesLoaded;
if (edgeInputFilter.dropEdge(sourceId, readerEdge)) {
++inputSplitEdgesFiltered;
if (inputSplitEdgesFiltered % EDGES_FILTERED_UPDATE_PERIOD == 0) {
totalEdgesFiltered.inc(inputSplitEdgesFiltered);
inputSplitEdgesFiltered = 0;
}
continue;
}
workerClientRequestProcessor.sendEdgeRequest(sourceId, readerEdge);
// Update status every EDGES_UPDATE_PERIOD edges
if (inputSplitEdgesLoaded % EDGES_UPDATE_PERIOD == 0) {
totalEdgesMeter.mark(EDGES_UPDATE_PERIOD);
WorkerProgress.get().addEdgesLoaded(EDGES_UPDATE_PERIOD);
LoggerUtils.setStatusAndLog(context, LOG, Level.INFO,
"readEdgeInputSplit: Loaded " +
totalEdgesMeter.count() + " edges at " +
totalEdgesMeter.meanRate() + " edges/sec " +
MemoryUtils.getRuntimeMemoryStats());
}
// For sampling, or to limit outlier input splits, the number of
// records per input split can be limited
if (inputSplitMaxEdges > 0 &&
inputSplitEdgesLoaded >= inputSplitMaxEdges) {
if (LOG.isInfoEnabled()) {
LOG.info("readInputSplit: Leaving the input " +
"split early, reached maximum edges " +
inputSplitEdgesLoaded);
}
break;
}
}
edgeReader.close();
totalEdgesFiltered.inc(inputSplitEdgesFiltered);
totalEdgesMeter.mark(inputSplitEdgesLoaded % EDGES_UPDATE_PERIOD);
WorkerProgress.get().addEdgesLoaded(
inputSplitEdgesLoaded % EDGES_UPDATE_PERIOD);
WorkerProgress.get().incrementEdgeInputSplitsLoaded();
return new VertexEdgeCount(0, inputSplitEdgesLoaded, 0);
}
}