blob: 44b43ec5d5a6a31daa74ef0b0e7ded82f826ebb6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.giraph.worker;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.concurrent.Callable;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.comm.WorkerClientRequestProcessor;
import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.VertexEdgeCount;
import org.apache.giraph.io.GiraphInputFormat;
import org.apache.giraph.io.InputType;
import org.apache.giraph.metrics.GiraphMetrics;
import org.apache.giraph.metrics.GiraphMetricsRegistry;
import org.apache.giraph.metrics.MeterDesc;
import org.apache.giraph.metrics.MetricNames;
import org.apache.giraph.ooc.OutOfCoreEngine;
import org.apache.giraph.time.SystemTime;
import org.apache.giraph.time.Time;
import org.apache.giraph.time.Times;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;
import com.yammer.metrics.core.Counter;
import com.yammer.metrics.core.Meter;
import com.yammer.metrics.util.PercentGauge;
/**
* Abstract base class for loading vertex/edge input splits.
* Every thread has its own instance of WorkerClientRequestProcessor
* to send requests.
*
* @param <I> Vertex index value
* @param <V> Vertex value
* @param <E> Edge value
*/
public abstract class InputSplitsCallable<I extends WritableComparable,
    V extends Writable, E extends Writable>
    implements Callable<VertexEdgeCount> {
  /** Class logger */
  private static final Logger LOG = Logger.getLogger(InputSplitsCallable.class);
  /** Class time object */
  private static final Time TIME = SystemTime.get();
  /** Configuration */
  protected final ImmutableClassesGiraphConfiguration<I, V, E> configuration;
  /** Context */
  protected final Mapper<?, ?, ?, ?>.Context context;
  /** Handles IPC communication */
  protected final WorkerClientRequestProcessor<I, V, E>
  workerClientRequestProcessor;
  /**
   * Stores and processes the list of InputSplits advertised
   * in a tree of child znodes by the master.
   */
  private final WorkerInputSplitsHandler splitsHandler;
  /**
   * Start time in nanos. Captured when this instance is constructed, so the
   * rates logged by {@link #call()} are measured from construction time.
   */
  private final long startNanos = TIME.getNanoseconds();
  /**
   * Whether to prioritize local input splits.
   * Set from the configuration but not read anywhere in this class.
   */
  private final boolean useLocality;
  /** Service worker */
  private final CentralizedServiceWorker<I, V, E> serviceWorker;

  /**
   * Constructor. Creates a dedicated
   * {@link NettyWorkerClientRequestProcessor} for this callable.
   *
   * @param context Context
   * @param configuration Configuration
   * @param bspServiceWorker service worker
   * @param splitsHandler Handler for input splits
   */
  public InputSplitsCallable(
      Mapper<?, ?, ?, ?>.Context context,
      ImmutableClassesGiraphConfiguration<I, V, E> configuration,
      BspServiceWorker<I, V, E> bspServiceWorker,
      WorkerInputSplitsHandler splitsHandler) {
    this.context = context;
    this.workerClientRequestProcessor =
        new NettyWorkerClientRequestProcessor<I, V, E>(
            context, configuration, bspServiceWorker,
            false /* useOneMessageToManyIdsEncoding, not useful for input */);
    this.useLocality = configuration.useInputSplitLocality();
    this.splitsHandler = splitsHandler;
    this.configuration = configuration;
    this.serviceWorker = bspServiceWorker;
  }

  /**
   * Get input format
   *
   * @return Input format
   */
  public abstract GiraphInputFormat getInputFormat();

  /**
   * Get input type
   *
   * @return Input type
   */
  public abstract InputType getInputType();

  /**
   * Get Meter tracking edges loaded
   *
   * @return Meter tracking edges loaded
   */
  public static Meter getTotalEdgesLoadedMeter() {
    return GiraphMetrics.get().perJobRequired()
        .getMeter(MeterDesc.EDGES_LOADED);
  }

  /**
   * Get Counter tracking edges filtered
   *
   * @return Counter tracking edges filtered
   */
  public static Counter getTotalEdgesFilteredCounter() {
    return GiraphMetrics.get().perJobRequired()
        .getCounter(MetricNames.EDGES_FILTERED);
  }

  /**
   * Get Meter tracking number of vertices loaded.
   *
   * @return Meter for vertices loaded
   */
  public static Meter getTotalVerticesLoadedMeter() {
    return GiraphMetrics.get().perJobRequired()
        .getMeter(MeterDesc.VERTICES_LOADED);
  }

  /**
   * Get Counter tracking vertices filtered
   *
   * @return Counter tracking vertices filtered
   */
  public static Counter getTotalVerticesFilteredCounter() {
    return GiraphMetrics.get().perJobRequired()
        .getCounter(MetricNames.VERTICES_FILTERED);
  }

  /**
   * Initialize metrics used by this class and its subclasses.
   * Registers two percentage gauges: filtered edges over loaded edges and
   * filtered vertices over loaded vertices.
   */
  public static void initMetrics() {
    GiraphMetricsRegistry metrics = GiraphMetrics.get().perJobRequired();

    final Counter edgesFiltered = getTotalEdgesFilteredCounter();
    final Meter edgesLoaded = getTotalEdgesLoadedMeter();
    metrics.getGauge(MetricNames.EDGES_FILTERED_PCT, new PercentGauge() {
      @Override protected double getNumerator() {
        return edgesFiltered.count();
      }
      @Override protected double getDenominator() {
        return edgesLoaded.count();
      }
    });

    final Counter verticesFiltered = getTotalVerticesFilteredCounter();
    final Meter verticesLoaded = getTotalVerticesLoadedMeter();
    metrics.getGauge(MetricNames.VERTICES_FILTERED_PCT, new PercentGauge() {
      @Override protected double getNumerator() {
        return verticesFiltered.count();
      }
      @Override protected double getDenominator() {
        return verticesLoaded.count();
      }
    });
  }

  /**
   * Load vertices/edges from the given input split.
   *
   * @param inputSplit Input split to load
   * @return Count of vertices and edges loaded
   * @throws IOException
   * @throws InterruptedException
   */
  protected abstract VertexEdgeCount readInputSplit(InputSplit inputSplit)
    throws IOException, InterruptedException;

  /**
   * Reserve input splits from the splits handler one at a time and load each
   * of them until the handler returns no more splits, then flush the request
   * processor.
   *
   * @return Accumulated count of vertices and edges over all loaded splits
   * @throws IllegalStateException if loading is interrupted, fails with an
   *         I/O or class-loading error, or the final flush fails
   */
  @Override
  public VertexEdgeCount call() {
    VertexEdgeCount vertexEdgeCount = new VertexEdgeCount();
    int inputSplitsProcessed = 0;
    try {
      OutOfCoreEngine oocEngine = serviceWorker.getServerData().getOocEngine();
      if (oocEngine != null) {
        oocEngine.processingThreadStart();
      }
      while (true) {
        // The flag is true only while no split has been processed yet,
        // i.e. for the first reservation made by this callable.
        byte[] serializedInputSplit = splitsHandler.reserveInputSplit(
            getInputType(), inputSplitsProcessed == 0);
        if (serializedInputSplit == null) {
          // No splits left
          break;
        }
        // If out-of-core mechanism is used, check whether this thread
        // can stay active or it should temporarily suspend and stop
        // processing and generating more data for the moment.
        if (oocEngine != null) {
          oocEngine.activeThreadCheckIn();
        }
        vertexEdgeCount = vertexEdgeCount.incrVertexEdgeCount(
            loadInputSplit(serializedInputSplit));
        context.progress();
        ++inputSplitsProcessed;
      }
      // NOTE(review): this is skipped when loading throws; confirm whether
      // processingThreadFinish() must be paired with processingThreadStart()
      // on the failure path as well.
      if (oocEngine != null) {
        oocEngine.processingThreadFinish();
      }
    } catch (InterruptedException e) {
      // Restore the interrupt status so code further up the stack can still
      // observe that this thread was interrupted.
      Thread.currentThread().interrupt();
      throw new IllegalStateException("call: InterruptedException", e);
    } catch (IOException e) {
      throw new IllegalStateException("call: IOException", e);
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException("call: ClassNotFoundException", e);
    }

    if (LOG.isInfoEnabled()) {
      float seconds = Times.getNanosSince(TIME, startNanos) /
          Time.NS_PER_SECOND_AS_FLOAT;
      float verticesPerSecond = vertexEdgeCount.getVertexCount() / seconds;
      float edgesPerSecond = vertexEdgeCount.getEdgeCount() / seconds;
      LOG.info("call: Loaded " + inputSplitsProcessed + " " +
          "input splits in " + seconds + " secs, " + vertexEdgeCount +
          " " + verticesPerSecond + " vertices/sec, " +
          edgesPerSecond + " edges/sec");
    }
    try {
      workerClientRequestProcessor.flush();
    } catch (IOException e) {
      throw new IllegalStateException("call: Flushing failed.", e);
    }
    return vertexEdgeCount;
  }

  /**
   * Deserialize the given input split and load it through
   * {@link #readInputSplit(InputSplit)}, which is implemented by the
   * subclass.
   *
   * @param serializedInputSplit Serialized input split
   * @return Count of vertices and edges as returned by readInputSplit
   * @throws IOException
   * @throws ClassNotFoundException
   * @throws InterruptedException
   */
  private VertexEdgeCount loadInputSplit(byte[] serializedInputSplit)
    throws IOException, ClassNotFoundException, InterruptedException {
    InputSplit inputSplit = getInputSplit(serializedInputSplit);
    VertexEdgeCount vertexEdgeCount = readInputSplit(inputSplit);
    if (LOG.isInfoEnabled()) {
      LOG.info("loadInputSplit: Finished loading " + vertexEdgeCount);
    }
    return vertexEdgeCount;
  }

  /**
   * Deserialize an InputSplit from its serialized bytes, using the input
   * format returned by {@link #getInputFormat()}.
   *
   * @param serializedInputSplit Serialized input split
   * @return instance of InputSplit
   * @throws IOException
   * @throws ClassNotFoundException
   */
  protected InputSplit getInputSplit(byte[] serializedInputSplit)
    throws IOException, ClassNotFoundException {
    DataInputStream inputStream =
        new DataInputStream(new ByteArrayInputStream(serializedInputSplit));
    InputSplit inputSplit = getInputFormat().readInputSplit(inputStream);
    if (LOG.isInfoEnabled()) {
      LOG.info("getInputSplit: Reserved input split '" +
          inputSplit.toString() + "'");
    }
    return inputSplit;
  }
}