/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package com.baidu.hugegraph.computer.core.compute.input;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import javax.annotation.Nonnull;
import com.baidu.hugegraph.computer.core.common.ComputerContext;
import com.baidu.hugegraph.computer.core.common.Constants;
import com.baidu.hugegraph.computer.core.common.exception.ComputerException;
import com.baidu.hugegraph.computer.core.config.ComputerOptions;
import com.baidu.hugegraph.computer.core.config.EdgeFrequency;
import com.baidu.hugegraph.computer.core.graph.GraphFactory;
import com.baidu.hugegraph.computer.core.graph.edge.Edge;
import com.baidu.hugegraph.computer.core.graph.edge.Edges;
import com.baidu.hugegraph.computer.core.graph.properties.Properties;
import com.baidu.hugegraph.computer.core.io.BufferedFileInput;
import com.baidu.hugegraph.computer.core.io.RandomAccessInput;
import com.baidu.hugegraph.computer.core.io.StreamGraphInput;
/**
 * Reads the edges of vertices from an edge file during computing.
 *
 * <p>As read by this class, the file is a sequence of batches. Each batch is
 * a vertex id (read through a {@link ReusablePointer}) followed by a value
 * that is prefixed with its length as a fixed int. The value itself starts
 * with the number of edges as a fixed int, followed by the edges encoded
 * according to the configured {@link EdgeFrequency}. A vertex may own several
 * consecutive batches.
 *
 * <p>NOTE(review): {@link #edges(ReusablePointer)} only scans forward, so it
 * presumably expects batches to be stored, and vertices to be requested, in
 * ascending vertex id order -- confirm against the writer and the caller.
 *
 * <p>{@link #init()} must be called before {@link #edges(ReusablePointer)}.
 */
public class EdgesInput {

    private RandomAccessInput input;
    private final ReusablePointer idPointer;
    private final ReusablePointer valuePointer;
    private final File edgeFile;
    private final GraphFactory graphFactory;
    private final int flushThreshold;
    private final EdgeFrequency frequency;

    /**
     * @param context  supplies the graph factory and the config options
     *                 INPUT_MAX_EDGES_IN_ONE_VERTEX and INPUT_EDGE_FREQ
     * @param edgeFile the file to read edges from, opened by {@link #init()}
     */
    public EdgesInput(ComputerContext context, File edgeFile) {
        this.graphFactory = context.graphFactory();
        this.idPointer = new ReusablePointer();
        this.valuePointer = new ReusablePointer();
        this.edgeFile = edgeFile;
        this.flushThreshold = context.config().get(
                              ComputerOptions.INPUT_MAX_EDGES_IN_ONE_VERTEX);
        this.frequency = context.config().get(ComputerOptions.INPUT_EDGE_FREQ);
    }

    /**
     * Opens the edge file for reading.
     *
     * @throws IOException if the file can't be opened
     */
    public void init() throws IOException {
        this.input = new BufferedFileInput(this.edgeFile);
    }

    /**
     * Closes the underlying input. Does nothing if {@link #init()} has not
     * been called.
     *
     * @throws IOException if closing the input fails
     */
    public void close() throws IOException {
        if (this.input != null) {
            this.input.close();
        }
    }

    /**
     * Returns the edges of the vertex identified by {@code vidPointer}.
     *
     * <p>Batches whose vertex id compares smaller than the requested id are
     * skipped. If the next batch belongs to a bigger vertex id, the input is
     * rewound to the start of that batch and an empty result is returned.
     *
     * @param vidPointer pointer to the id of the requested vertex
     * @return the edges of the first matching batch if it holds fewer edges
     *         than the flush threshold; otherwise an {@link Edges} view that
     *         reads the following batches of the same vertex on demand;
     *         {@link EmptyEdges} if the vertex has no edges
     * @throws ComputerException if reading from the edge file fails
     */
    public Edges edges(ReusablePointer vidPointer) {
        try {
            while (this.input.available() > 0) {
                long startPosition = this.input.position();
                this.idPointer.read(this.input);
                int status = vidPointer.compareTo(this.idPointer);
                if (status < 0) {
                    /*
                     * No edges: the current batch belongs to a vertex whose
                     * id is bigger than the specified id. Rewind so that the
                     * batch can be read again by a later call.
                     */
                    this.input.seek(startPosition);
                    return EmptyEdges.instance();
                }
                if (status == 0) {
                    // Has edges
                    this.valuePointer.read(this.input);
                    Edges edges = this.readEdges(this.valuePointer.input());
                    if (edges.size() < this.flushThreshold) {
                        return edges;
                    }
                    // The vertex may own more batches, read them lazily
                    return new SuperEdges(vidPointer, edges, startPosition);
                }
                /*
                 * The current batch belongs to a vertex whose id is smaller
                 * than the specified id, skip its value.
                 */
                int valueLength = this.input.readFixedInt();
                this.input.skip(valueLength);
            }
            return EmptyEdges.instance();
        } catch (IOException e) {
            throw new ComputerException("Can't read from edges input '%s'",
                                        e, this.edgeFile.getAbsoluteFile());
        }
    }

    /**
     * Edges of a vertex whose first batch reached the flush threshold. The
     * first batch is already loaded; the following batches of the same vertex
     * are read from the shared input of the enclosing {@link EdgesInput}
     * while iterating.
     */
    private class SuperEdges implements Edges {

        private final ReusablePointer vid;
        // Position in the input where the first batch of the vertex starts
        private final long startPosition;
        // Iterator over the batch that is currently being consumed
        private Iterator<Edge> currentEdgesIter;
        // Total number of edges over all batches, 0 means "not computed yet"
        private int size;
        // Number of times iterator() has been called
        private int iterationTime;

        SuperEdges(ReusablePointer vid, Edges edges, long startPosition) {
            this.vid = vid;
            this.startPosition = startPosition;
            this.currentEdgesIter = edges.iterator();
            this.size = 0;
            this.iterationTime = 0;
        }

        /**
         * @return the number of edges over all batches of the vertex, which
         *         is computed by scanning the input on first use
         */
        @Override
        public int size() {
            if (this.size == 0) {
                this.calculateSize();
            }
            return this.size;
        }

        /**
         * Sums up the edge count of every consecutive batch that belongs to
         * the vertex, starting at {@code startPosition}, then restores the
         * read position of the input. Only the count header of each batch is
         * read, the edges themselves are skipped.
         */
        private void calculateSize() {
            RandomAccessInput in = EdgesInput.this.input;
            ReusablePointer id = EdgesInput.this.idPointer;
            long currentPosition = in.position();
            /*
             * Accumulate into a local variable so that a failure in the
             * middle of the scan can't leave a partial, non-zero size behind.
             */
            int total = 0;
            try {
                in.seek(this.startPosition);
                id.read(in);
                while (id.compareTo(this.vid) == 0) {
                    long valueLength = in.readFixedInt();
                    // The count is read as a fixed int, same as readEdges()
                    total += in.readFixedInt();
                    // The count has been consumed, skip the rest of the value
                    long bytesToSkip = valueLength - Constants.INT_LEN;
                    in.skip(bytesToSkip);
                    if (in.available() <= 0) {
                        break;
                    }
                    id.read(in);
                }
                in.seek(currentPosition);
            } catch (IOException e) {
                throw new ComputerException("Compute size of edges failed", e);
            }
            this.size = total;
        }

        @Override
        public void add(Edge edge) {
            throw new ComputerException(
                      "Not support adding edges during computing");
        }

        /**
         * Returns an iterator over all edges of the vertex. The first call
         * continues after the batch loaded at construction; every later call
         * rewinds the input to the first batch of the vertex.
         *
         * <p>The iterators share the state of this object and of the input,
         * so only the most recently returned iterator should be used.
         */
        @Override
        @Nonnull
        public Iterator<Edge> iterator() {
            try {
                if (this.iterationTime != 0) {
                    EdgesInput.this.input.seek(this.startPosition);
                    /*
                     * Drop what is left of a previous iteration, otherwise
                     * the leftover edges of a partially consumed batch would
                     * be returned before restarting from the first batch.
                     */
                    this.currentEdgesIter = Collections.emptyIterator();
                }
                this.iterationTime++;
            } catch (IOException e) {
                throw new ComputerException("Can't seek to %s",
                                            e, this.startPosition);
            }
            return new EdgesIterator();
        }

        /**
         * Iterates over the current batch and loads the next batch from the
         * input when the current one is exhausted and the next batch belongs
         * to the same vertex.
         */
        private class EdgesIterator implements Iterator<Edge> {

            @Override
            public boolean hasNext() {
                if (currentEdgesIter.hasNext()) {
                    return true;
                }
                long currentPosition = input.position();
                try {
                    if (input.available() > 0) {
                        idPointer.read(input);
                        if (idPointer.compareTo(vid) == 0) {
                            valuePointer.read(input);
                            currentEdgesIter = readEdges(valuePointer.input())
                                               .iterator();
                        } else {
                            // Batch of another vertex, leave it unread
                            input.seek(currentPosition);
                        }
                    }
                } catch (IOException e) {
                    throw new ComputerException(
                              "Error occurred when read edges from edges " +
                              "input '%s' at position %s", e,
                              edgeFile.getAbsoluteFile(), currentPosition);
                }
                return currentEdgesIter.hasNext();
            }

            @Override
            public Edge next() {
                /*
                 * Load the next batch if the current one is exhausted, so
                 * that next() also works without a preceding hasNext() call.
                 * If there is no edge left, the exception comes from the
                 * exhausted batch iterator.
                 */
                this.hasNext();
                return currentEdgesIter.next();
            }
        }
    }

    /**
     * Reads one batch of edges: the edge count as a fixed int, followed by
     * that many edges. Which fields are stored per edge depends on the edge
     * frequency:
     * SINGLE: targetId;
     * SINGLE_PER_LABEL: label, targetId;
     * MULTIPLE: label, name, targetId.
     * The properties of the edge follow in every case.
     *
     * @throws ComputerException if reading from {@code in} fails
     */
    // TODO: use one reused Edges instance to read batches for each vertex.
    private Edges readEdges(RandomAccessInput in) {
        try {
            int count = in.readFixedInt();
            Edges edges = this.graphFactory.createEdges(count);

            // The frequency is fixed per input, decide the layout once
            boolean hasLabel = this.frequency != EdgeFrequency.SINGLE;
            boolean hasName = hasLabel &&
                              this.frequency != EdgeFrequency.SINGLE_PER_LABEL;
            assert !hasName || this.frequency == EdgeFrequency.MULTIPLE;

            for (int i = 0; i < count; i++) {
                Edge edge = this.graphFactory.createEdge();
                // Read subKey: [label] + [name] + targetId
                if (hasLabel) {
                    edge.label(StreamGraphInput.readLabel(in));
                }
                if (hasName) {
                    edge.name(StreamGraphInput.readLabel(in));
                }
                edge.targetId(StreamGraphInput.readId(in));
                // Read subValue
                Properties props = this.graphFactory.createProperties();
                props.read(in);
                edge.properties(props);
                edges.add(edge);
            }
            return edges;
        } catch (IOException e) {
            throw new ComputerException("Failed to read edges from input '%s'",
                                        e, this.edgeFile.getAbsoluteFile());
        }
    }

    /**
     * An {@link Edges} singleton that holds no edges and rejects additions.
     */
    public static class EmptyEdges implements Edges {

        private static final EmptyEdges INSTANCE = new EmptyEdges();

        private EmptyEdges() {
            // pass
        }

        public static EmptyEdges instance() {
            return INSTANCE;
        }

        @Override
        public int size() {
            return 0;
        }

        @Override
        public void add(Edge edge) {
            throw new ComputerException(
                      "Not support adding edges during computing");
        }

        @Override
        @Nonnull
        public Iterator<Edge> iterator() {
            return Collections.emptyIterator();
        }
    }
}