blob: 57810eacc73a203ad89a322f4dec77f3d95b327d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.giraph.edge;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.utils.EdgeIterables;
import org.apache.giraph.utils.Trimmable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
/**
* Implementation of {@link org.apache.giraph.edge.OutEdges} with long ids
* and null edge values, backed by a dynamic primitive array.
* Parallel edges are allowed.
* Note: this implementation is optimized for space usage,
* but random access and edge removals are expensive.
* Users of this class should explicitly call {@link #trim()} function
* to compact in-memory representation after all updates are done.
* Compacting object is expensive so should only be done once after bulk update.
* Compaction can also be caused by serialization attempt or
* by calling {@link #iterator()}
*/
@NotThreadSafe
public class LongDiffNullArrayEdges
extends ConfigurableOutEdges<LongWritable, NullWritable>
implements ReuseObjectsOutEdges<LongWritable, NullWritable>,
MutableOutEdges<LongWritable, NullWritable>, Trimmable {
/**
* Compressed array of target vertex ids.
*/
private LongDiffArray edges = new LongDiffArray();
@Override
public void setConf(ImmutableClassesGiraphConfiguration
<LongWritable, Writable, NullWritable> conf) {
super.setConf(conf);
edges.setUseUnsafeSerialization(conf.getUseUnsafeSerialization());
}
@Override
public void initialize(
Iterable<Edge<LongWritable, NullWritable>> edgeIterator
) {
edges.initialize();
EdgeIterables.initialize(this, edgeIterator);
edges.trim();
}
@Override
public void initialize(int capacity) {
edges.initialize(capacity);
}
@Override
public void initialize() {
edges.initialize();
}
@Override
public void add(Edge<LongWritable, NullWritable> edge) {
edges.add(edge.getTargetVertexId().get());
}
@Override
public void remove(LongWritable targetVertexId) {
edges.remove(targetVertexId.get());
}
@Override
public int size() {
return edges.size();
}
@Override
public Iterator<Edge<LongWritable, NullWritable>> iterator() {
// Returns an iterator that reuses objects.
// The downcast is fine because all concrete Edge implementations are
// mutable, but we only expose the mutation functionality when appropriate.
return (Iterator) mutableIterator();
}
@Override
public Iterator<MutableEdge<LongWritable, NullWritable>> mutableIterator() {
trim();
return new Iterator<MutableEdge<LongWritable, NullWritable>>() {
private final Iterator<LongWritable> reader = edges.iterator();
/** Representative edge object. */
private final MutableEdge<LongWritable, NullWritable> representativeEdge =
EdgeFactory.createReusable(new LongWritable());
@Override
public boolean hasNext() {
return reader.hasNext();
}
@Override
public MutableEdge<LongWritable, NullWritable> next() {
representativeEdge.getTargetVertexId().set(reader.next().get());
return representativeEdge;
}
@Override
public void remove() {
reader.remove();
}
};
}
@Override
public void write(DataOutput out) throws IOException {
edges.write(out);
}
@Override
public void readFields(DataInput in) throws IOException {
edges.readFields(in);
}
/**
* This function takes all recent updates and stores them efficiently.
* It is safe to call this function multiple times.
*/
@Override
public void trim() {
edges.trim();
}
}