blob: 40f992e3f756c5ec6bee05372414f82023ba14e4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.giraph.rexster.io.formats;
import static org.apache.giraph.rexster.conf.GiraphRexsterConstants.GIRAPH_REXSTER_GREMLIN_E_SCRIPT;
import static org.apache.giraph.rexster.conf.GiraphRexsterConstants.GIRAPH_REXSTER_GREMLIN_V_SCRIPT;
import static org.apache.giraph.rexster.conf.GiraphRexsterConstants.GIRAPH_REXSTER_HOSTNAME;
import static org.apache.giraph.rexster.conf.GiraphRexsterConstants.GIRAPH_REXSTER_INPUT_GRAPH;
import static org.apache.giraph.rexster.conf.GiraphRexsterConstants.GIRAPH_REXSTER_OUTPUT_GRAPH;
import static org.apache.giraph.rexster.conf.GiraphRexsterConstants.GIRAPH_REXSTER_PORT;
import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.configuration.HierarchicalConfiguration;
import org.apache.commons.configuration.XMLConfiguration;
import org.apache.commons.io.FileUtils;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.graph.BasicComputation;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.io.formats.JsonLongDoubleFloatDoubleVertexInputFormat;
import org.apache.giraph.io.formats.JsonLongDoubleFloatDoubleVertexOutputFormat;
import org.apache.giraph.utils.InternalVertexRunner;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import com.google.common.base.Charsets;
import com.google.common.io.Files;
import com.tinkerpop.rexster.Tokens;
import com.tinkerpop.rexster.protocol.EngineController;
import com.tinkerpop.rexster.server.HttpRexsterServer;
import com.tinkerpop.rexster.server.RexsterApplication;
import com.tinkerpop.rexster.server.RexsterServer;
import com.tinkerpop.rexster.server.XmlRexsterApplication;
/**
* This test suite is intended to extensively test the Rexster I/O formats
* together with the Kibble written for that purpose.
*
*
* Note: this is a very simple test case: load data into Rexster
* and then read it using Giraph. And the reverse: load data using Giraph
* and then read it using Rexster. The graph that is being loaded
* is always the same, but the output we receive is actually different
* for different underlying formats. Why? Probably because of some bugs.
*/
public class TestRexsterLongDoubleFloatIOFormat {
/** Temporary directory; deleteDbs() removes the per-database directories below it. */
protected static final String TMP_DIR = "/tmp/";
/** Name of the Rexster XML configuration resource loaded by startRexsterServer(). */
protected static final String REXSTER_CONF = "rexster.xml";
/** Names of the graph databases that are populated and read by the tests. */
protected static final String DATABASES[] = { "tgdb", "neodb", "orientdb" };
/** Name of the graph that the empty-input tests expect to yield no output. */
protected static final String EMPTYDB = "emptydb";
/** Rexster server instance shared by all tests; assigned in startRexsterServer(). */
protected static RexsterServer server;
/**
* Prepares the fixture shared by every test in this class: removes leftover
* database directories, starts the Rexster server and loads the test graph
* into each database listed in DATABASES.
*
* @throws Exception if cleanup, server startup or data loading fails
*/
@BeforeClass
public static void initialSetup() throws Exception {
//In case there were previous runs that failed
deleteDbs();
startRexsterServer();
insertDbData();
}
/**
 * Stops the Rexster server and removes the database directories created
 * during the run.
 *
 * @throws Exception if stopping the server or deleting the directories fails
 */
@AfterClass
public static void finalTearDown() throws Exception {
  try {
    stopRexsterServer();
  } finally {
    // Clean up the on-disk databases even when the server refuses to stop,
    // otherwise the next run starts from stale data.
    deleteDbs();
  }
}
/**
* Reads the graph named by EMPTYDB without Gremlin scripts and expects the
* job to produce no output.
*
* @throws Exception if the job or the check fails
*/
@Test
public void testEmptyDbInput() throws Exception {
testDbInput(EMPTYDB, true, false);
}
/**
* Same as testEmptyDbInput() but with the Gremlin vertex and edge scripts
* configured. Currently ignored (see the reason on the annotation).
*
* @throws Exception if the job or the check fails
*/
@Ignore("Fails due to maven dependecy conflicts.")
@Test
public void testEmptyDbInputGremlin() throws Exception {
testDbInput(EMPTYDB, true, true);
}
/**
* Reads the first database in DATABASES ("tgdb") without Gremlin scripts and
* compares the job output with the expected-output resource.
*
* @throws Exception if the job or the comparison fails
*/
// NOTE(review): ignored without a stated reason - TODO document why.
@Ignore
@Test
public void testTgDbInput() throws Exception {
testDbInput(DATABASES[0], false, false);
}
/**
* Reads the first database in DATABASES ("tgdb") with the Gremlin vertex and
* edge scripts configured and compares the job output with the
* expected-output resource. Currently ignored (see the annotation).
*
* @throws Exception if the job or the comparison fails
*/
@Ignore("Fails due to maven dependecy conflicts.")
@Test
public void testTgDbInputGremlin() throws Exception {
testDbInput(DATABASES[0], false, true);
}
/**
* Reads the second database in DATABASES ("neodb") without Gremlin scripts
* and compares the job output with the expected-output resource.
*
* @throws Exception if the job or the comparison fails
*/
@Test
public void testNeoDbInput() throws Exception {
testDbInput(DATABASES[1], false, false);
}
/**
* Reads the second database in DATABASES ("neodb") with the Gremlin vertex
* and edge scripts configured and compares the job output with the
* expected-output resource. Currently ignored (see the annotation).
*
* @throws Exception if the job or the comparison fails
*/
@Ignore("Fails due to maven dependecy conflicts.")
@Test
public void testNeoDbInputGremlin() throws Exception {
testDbInput(DATABASES[1], false, true);
}
/**
* Reads the third database in DATABASES ("orientdb") without Gremlin scripts
* and compares the job output with the expected-output resource.
*
* @throws Exception if the job or the comparison fails
*/
@Test
public void testOrientDbInput() throws Exception {
testDbInput(DATABASES[2], false, false);
}
/**
* Reads the third database in DATABASES ("orientdb") with the Gremlin vertex
* and edge scripts configured and compares the job output with the
* expected-output resource. Currently ignored (see the annotation).
*
* @throws Exception if the job or the comparison fails
*/
@Ignore("Fails due to maven dependecy conflicts.")
@Test
public void testOrientDbInputGremlin() throws Exception {
testDbInput(DATABASES[2], false, true);
}
/**
* Writes the test graph through the Rexster output formats into the graph
* named "empty" + DATABASES[0] and compares what Rexster then returns with
* the expected-output resource.
*
* @throws Exception if the job or the comparison fails
*/
// NOTE(review): ignored without a stated reason - TODO document why.
@Ignore
@Test
public void testTgDbOutput() throws Exception {
testDbOutput("empty" + DATABASES[0]);
}
/**
* Writes the test graph through the Rexster output formats into the graph
* named "empty" + DATABASES[1] and compares what Rexster then returns with
* the expected-output resource.
*
* @throws Exception if the job or the comparison fails
*/
// NOTE(review): ignored without a stated reason - TODO document why.
@Ignore
@Test
public void testNeoDbOutput() throws Exception {
testDbOutput("empty" + DATABASES[1]);
}
/**
* Writes the test graph through the Rexster output formats into the graph
* named "empty" + DATABASES[2] and compares what Rexster then returns with
* the expected-output resource.
*
* @throws Exception if the job or the comparison fails
*/
// NOTE(review): ignored without a stated reason - TODO document why.
@Ignore
@Test
public void testOrientDbOutput() throws Exception {
testDbOutput("empty" + DATABASES[2]);
}
/**
 * Runs {@link EmptyComputation} over a graph read from Rexster and checks the
 * JSON output of the job.
 *
 * @param name      name of the Rexster graph to read; also the prefix of the
 *                  expected-output resource ({@code name + "-output.json"})
 * @param isEmpty   when true the job is expected to produce no output and no
 *                  expected-output resource is consulted
 * @param isGramlin when true the Gremlin vertex and edge scripts
 *                  ({@code g.V} / {@code g.E}) are configured
 * @throws Exception if the job fails or the output cannot be parsed
 */
private void testDbInput(String name, boolean isEmpty, boolean isGramlin)
  throws Exception {
  GiraphConfiguration conf = new GiraphConfiguration();
  GIRAPH_REXSTER_HOSTNAME.set(conf, "127.0.0.1");
  GIRAPH_REXSTER_PORT.set(conf, 18182);
  GIRAPH_REXSTER_INPUT_GRAPH.set(conf, name);
  if (isGramlin) {
    GIRAPH_REXSTER_GREMLIN_V_SCRIPT.set(conf, "g.V");
    GIRAPH_REXSTER_GREMLIN_E_SCRIPT.set(conf, "g.E");
  }
  conf.setComputationClass(EmptyComputation.class);
  conf.setVertexInputFormatClass(
    RexsterLongDoubleFloatVertexInputFormat.class);
  conf.setEdgeInputFormatClass(RexsterLongFloatEdgeInputFormat.class);
  conf.setVertexOutputFormatClass(
    JsonLongDoubleFloatDoubleVertexOutputFormat.class);

  Iterable<String> results = InternalVertexRunner.run(conf, new String[0],
    new String[0]);

  if (isEmpty) {
    // A missing result and a result without lines both count as "empty".
    Assert.assertTrue("expected no output for graph " + name,
      results == null || !results.iterator().hasNext());
    return;
  }
  Assert.assertNotNull("no output produced for graph " + name, results);

  // Fail with a readable message instead of a bare NullPointerException
  // when the expected-output file is not on the classpath.
  String resource = name + "-output.json";
  URL url = this.getClass().getResource(resource);
  Assert.assertNotNull("missing expected-output resource " + resource, url);
  File file = new File(url.toURI());

  ArrayList<Element> expected =
    convertIterator(Files.readLines(file, Charsets.UTF_8).iterator());
  ArrayList<Element> result = convertIterator(results.iterator());
  checkResult(expected, result);
}
/**
 * Runs {@link EmptyComputation} over a fixed five-vertex graph, writes it to
 * Rexster through the Rexster output formats and compares what Rexster then
 * returns with the expected-output resource.
 *
 * @param name name of the Rexster graph to write to; also the prefix of the
 *             expected-output resource ({@code name + "-output.json"})
 * @throws Exception if the job fails or the stored graph cannot be read back
 */
private void testDbOutput(String name) throws Exception {
  GiraphConfiguration conf = new GiraphConfiguration();
  GIRAPH_REXSTER_HOSTNAME.set(conf, "127.0.0.1");
  GIRAPH_REXSTER_PORT.set(conf, 18182);
  GIRAPH_REXSTER_OUTPUT_GRAPH.set(conf, name);
  conf.setComputationClass(EmptyComputation.class);
  conf.setVertexInputFormatClass(
    JsonLongDoubleFloatDoubleVertexInputFormat.class);
  conf.setVertexOutputFormatClass(
    RexsterLongDoubleFloatVertexOutputFormat.class);
  conf.setEdgeOutputFormatClass(
    RexsterLongDoubleFloatEdgeOutputFormat.class);

  /* graph used for testing: [id, value, [[target, edgeValue], ...]] */
  String[] graph = new String[] {
    "[1,0,[[2,1],[4,3]]]",
    "[2,0,[[1,1],[3,2],[4,1]]]",
    "[3,0,[[2,2]]]",
    "[4,0,[[1,3],[5,4],[2,1]]]",
    "[5,0,[[3,4],[4,4]]]"
  };
  InternalVertexRunner.run(conf, graph);

  // Fail with a readable message instead of a bare NullPointerException
  // when the expected-output file is not on the classpath.
  String resource = name + "-output.json";
  URL url = this.getClass().getResource(resource);
  Assert.assertNotNull("missing expected-output resource " + resource, url);
  File file = new File(url.toURI());

  ArrayList<Element> expected =
    convertIterator(Files.readLines(file, Charsets.UTF_8).iterator());
  ArrayList<Element> result = getRexsterContent(name);
  checkResult(expected, result);
}
/**
* Computation that performs no work: whenever compute is invoked the vertex
* simply votes to halt. Incoming messages are ignored and nothing is sent.
*/
public static class EmptyComputation
extends BasicComputation<LongWritable, DoubleWritable,
FloatWritable, LongWritable> {
@Override
public void compute(
Vertex<LongWritable, DoubleWritable, FloatWritable> vertex,
Iterable<LongWritable> messages) throws IOException {
vertex.voteToHalt();
}
}
/**
 * Starts the Rexster server: loads the XML configuration from the
 * {@code REXSTER_CONF} class-path resource, builds the application from the
 * configured graphs, configures the script engines and finally starts the
 * HTTP server, which is stored in {@code server}.
 *
 * @throws IllegalStateException if the configuration resource is missing
 * @throws Exception if the configuration cannot be parsed or the server
 *                   fails to start
 */
@SuppressWarnings("unchecked")
private static void startRexsterServer() throws Exception {
  InputStream rexsterConf =
    TestRexsterLongDoubleFloatIOFormat.class.getResourceAsStream(
      REXSTER_CONF);
  if (rexsterConf == null) {
    throw new IllegalStateException(
      "Rexster configuration resource not found: " + REXSTER_CONF);
  }

  XMLConfiguration properties = new XMLConfiguration();
  try {
    properties.load(rexsterConf);
  } finally {
    // Release the stream even when the configuration is malformed.
    rexsterConf.close();
  }

  List<HierarchicalConfiguration> graphConfigs =
    properties.configurationsAt(Tokens.REXSTER_GRAPH_PATH);
  RexsterApplication application = new XmlRexsterApplication(graphConfigs);
  server = new HttpRexsterServer(properties);

  int scriptEngineThreshold =
    properties.getInt("script-engine-reset-threshold",
      EngineController.RESET_NEVER);
  String scriptEngineInitFile =
    properties.getString("script-engine-init", "");

  /* allow scriptengines to be configured so that folks can drop in
     different gremlin flavors. */
  List<String> scriptEngineNames = properties.getList("script-engines");
  if (scriptEngineNames == null) {
    /* configure to default with gremlin-groovy */
    EngineController.configure(scriptEngineThreshold, scriptEngineInitFile);
  } else {
    EngineController.configure(scriptEngineThreshold, scriptEngineInitFile,
      new HashSet<String>(scriptEngineNames));
  }

  server.start(application);
}
/**
 * Stops the Rexster server if one was created.
 *
 * @throws RuntimeException wrapping any failure reported by the server
 * @throws Exception declared for callers; not thrown directly
 */
private static void stopRexsterServer() throws Exception {
  if (server == null) {
    // Startup failed before the server was created: nothing to stop, and
    // throwing here would only hide the original startup failure.
    return;
  }
  try {
    server.stop();
  } catch (Exception e) {
    throw new RuntimeException("Unable to stop the Rexster server", e);
  }
}
/**
 * Deletes the on-disk directories of every test database: for each name in
 * {@code DATABASES} both the populated and the "empty"-prefixed directory,
 * and finally the directory of {@code EMPTYDB}.
 *
 * @throws Exception if a directory cannot be deleted
 */
private static void deleteDbs() throws Exception {
  for (String database : DATABASES) {
    File populated = new File(TMP_DIR + database);
    File emptyVariant = new File(TMP_DIR + "empty" + database);
    FileUtils.deleteDirectory(populated);
    FileUtils.deleteDirectory(emptyVariant);
  }
  File emptyDb = new File(TMP_DIR + EMPTYDB);
  FileUtils.deleteDirectory(emptyDb);
}
/**
 * Loads the test graph (five vertices, twelve edges) into every database
 * listed in {@code DATABASES} by POSTing JSON to the Rexster endpoints
 * {@code /tp/giraph/vertices} and {@code /tp/giraph/edges}.
 *
 * @throws RuntimeException if Rexster answers with a status other than 200
 * @throws Exception if the HTTP exchange itself fails
 */
private static void insertDbData() throws Exception {
  String vertices =
    "{ \"vlabel\":\"_vid\", \"tx\":[ " +
    "{ \"value\":0,\"_vid\":0 }," +
    "{ \"value\":0,\"_vid\":1 }," +
    "{ \"value\":0,\"_vid\":2 }," +
    "{ \"value\":0,\"_vid\":3 }," +
    "{ \"value\":0,\"_vid\":4 }" +
    " ] }";
  String edges =
    "{ \"vlabel\":\"_vid\", \"tx\":[ " +
    "{ \"value\": 1, \"_outV\": 0, \"_inV\": 1 }," +
    "{ \"value\": 3, \"_outV\": 0, \"_inV\": 3 }," +
    "{ \"value\": 1, \"_outV\": 1, \"_inV\": 0 }," +
    "{ \"value\": 2, \"_outV\": 1, \"_inV\": 2 }," +
    "{ \"value\": 1, \"_outV\": 1, \"_inV\": 3 }," +
    "{ \"value\": 5, \"_outV\": 2, \"_inV\": 1 }," +
    "{ \"value\": 4, \"_outV\": 2, \"_inV\": 4 }," +
    "{ \"value\": 3, \"_outV\": 3, \"_inV\": 0 }," +
    "{ \"value\": 1, \"_outV\": 3, \"_inV\": 1 }," +
    "{ \"value\": 4, \"_outV\": 3, \"_inV\": 4 }," +
    "{ \"value\": 4, \"_outV\": 4, \"_inV\": 3 }," +
    "{ \"value\": 4, \"_outV\": 4, \"_inV\": 2 }" +
    " ] }";

  for (String database : DATABASES) {
    // Vertices first: the edges refer to them through _outV / _inV.
    postJson(database, "vertices", vertices);
    postJson(database, "edges", edges);
  }
}

/**
 * POSTs a JSON document to one Giraph Kibble endpoint of a Rexster graph and
 * drains the response.
 *
 * @param database name of the Rexster graph
 * @param endpoint last path segment of the endpoint ("vertices" or "edges")
 * @param json     request body; written as single bytes, so ASCII only
 * @throws RuntimeException if the response status is not 200
 * @throws IOException if the HTTP exchange fails
 */
private static void postJson(String database, String endpoint, String json)
  throws IOException {
  URL url = new URL("http://127.0.0.1:18182/graphs/" + database +
    "/tp/giraph/" + endpoint);
  HttpURLConnection conn = (HttpURLConnection) url.openConnection();
  try {
    conn.setRequestMethod("POST");
    conn.setRequestProperty("Accept", "*/*");
    conn.setRequestProperty("Content-Type",
      "application/json; charset=UTF-8");
    conn.setDoOutput(true);

    DataOutputStream wr = new DataOutputStream(conn.getOutputStream());
    try {
      wr.writeBytes(json);
      wr.flush();
    } finally {
      wr.close();
    }

    int responseCode = conn.getResponseCode();
    if (responseCode != 200) {
      throw new RuntimeException("Unable to insert data in " + database +
        " code: " + responseCode + "\nresponse: " + conn.getResponseMessage());
    }

    // The body is not needed; it is consumed so the exchange completes.
    InputStream in = conn.getInputStream();
    try {
      byte[] buffer = new byte[4096];
      while (in.read(buffer) != -1) {
        // discard
      }
    } finally {
      in.close();
    }
  } finally {
    conn.disconnect();
  }
}
/**
 * Parses JSON vertex lines of the form {@code [id, value, [edge, ...]]} into
 * {@link Element}s. The id and the value are read as longs and every edge is
 * stored as the string form of its JSON array.
 *
 * @param elementit iterator over the JSON lines, one vertex per line
 * @return the parsed elements, in iteration order
 * @throws JSONException if a line does not have the expected shape
 */
private ArrayList<Element> convertIterator(Iterator<String> elementit)
  throws JSONException {
  ArrayList<Element> converted = new ArrayList<Element>();
  while (elementit.hasNext()) {
    String line = elementit.next();
    JSONArray parsed = new JSONArray(line);
    long vertexId = parsed.getLong(0);
    long vertexValue = parsed.getLong(1);
    Element current = new Element(vertexId, vertexValue);

    JSONArray adjacency = parsed.getJSONArray(2);
    int edgeCount = adjacency.length();
    for (int idx = 0; idx < edgeCount; idx++) {
      String edgeText = adjacency.getJSONArray(idx).toString();
      current.add(edgeText);
    }
    converted.add(current);
  }
  return converted;
}
/**
 * Reads a graph back from Rexster through the Giraph Kibble endpoints and
 * converts it into {@link Element}s: one element (with value 0) per vertex,
 * and every edge attached to the element whose id equals the edge's
 * {@code _outV}, formatted as {@code [inV,value]}.
 *
 * @param name name of the Rexster graph to read
 * @return the elements, in the order the vertices were returned
 * @throws Exception if a request fails or a response cannot be parsed
 */
private ArrayList<Element> getRexsterContent(String name) throws Exception {
  ArrayList<Element> result = new ArrayList<Element>();
  String base = "http://127.0.0.1:18182/graphs/" + name + "/tp/giraph/";

  /* get all the vertices */
  JSONArray vertices = fetchResults(base + "vertices");
  for (int i = 0; i < vertices.length(); ++i) {
    JSONObject vertex = vertices.getJSONObject(i);
    long id = getId(vertex, "_id");
    result.add(new Element(id, 0));
  }

  /* get all the edges */
  JSONArray edges = fetchResults(base + "edges");
  for (int i = 0; i < edges.length(); ++i) {
    JSONObject edge = edges.getJSONObject(i);
    long inV = getId(edge, "_inV");
    long outV = getId(edge, "_outV");
    long value = edge.getLong("value");
    for (Element element : result) {
      if (element.id == outV) {
        element.add("[" + inV + "," + value + "]");
      }
    }
  }
  return result;
}

/**
 * Issues a GET request and returns the {@code results} array of the JSON
 * object in the response body.
 *
 * @param address absolute URL to fetch
 * @return the {@code results} array of the response
 * @throws Exception if the request fails or the body is not the expected JSON
 */
private static JSONArray fetchResults(String address) throws Exception {
  HttpURLConnection conn =
    (HttpURLConnection) new URL(address).openConnection();
  conn.setRequestMethod("GET");
  conn.setRequestProperty("Content-Type", "application/json; charset=UTF-8");

  StringBuilder json = new StringBuilder();
  BufferedReader br = new BufferedReader(
    new InputStreamReader(conn.getInputStream(), Charsets.UTF_8));
  try {
    // Read until end of stream. ready() only reports whether a read would
    // block, so using it as the loop condition can truncate the response.
    String line;
    while ((line = br.readLine()) != null) {
      json.append(line);
    }
  } finally {
    br.close();
  }
  return new JSONObject(json.toString()).getJSONArray("results");
}
/**
 * Extracts a numeric id from a JSON object. The field is first read as a
 * long; if that fails it is read as a string, split on ':' and the second
 * part is parsed as the id.
 * NOTE(review): the string form presumably covers composite ids such as
 * OrientDB record ids - confirm against the databases in use.
 *
 * @param obj   JSON object holding the id
 * @param label name of the field to read
 * @return the numeric id
 * @throws JSONException if the field is missing or has no ':'-separated part
 * @throws NumberFormatException if the second part is not a valid long
 * @throws Exception declared for callers; see the specific types above
 */
private long getId(JSONObject obj, String label) throws Exception {
  try {
    return obj.getLong(label);
  } catch (JSONException e) {
    String idString = obj.getString(label);
    String[] splits = idString.split(":");
    if (splits.length < 2) {
      throw new JSONException("Cannot extract a numeric id from " + label +
        "=" + idString);
    }
    // Ids are longs everywhere else, so parse the full long range.
    return Long.parseLong(splits[1]);
  }
}
/**
 * Asserts that every expected element has an equal element in the result.
 * Elements that occur only in the result are not reported.
 *
 * @param expected elements that must be present
 * @param result   elements actually obtained
 * @throws Exception declared for subclasses; the check itself fails through
 *                   a JUnit assertion
 */
protected void checkResult(ArrayList<Element> expected,
  ArrayList<Element> result) throws Exception {
  for (Element wanted : expected) {
    boolean matched = false;
    for (Element candidate : result) {
      if (wanted.equals(candidate)) {
        matched = true;
      }
    }
    String failure = "expected: " + expected + " result: " + result;
    Assert.assertTrue(failure, matched);
  }
}
/**
 * Plain representation of one vertex used to compare graphs: its id, its
 * value and its outgoing edges, each edge kept as a JSON-array string such as
 * {@code [2,1]}.
 *
 * Two elements are equal when id and value match and they hold the same
 * edges, regardless of edge order.
 */
protected static class Element {
  /** Vertex id. */
  public long id;
  /** Vertex value. */
  public long value;
  /** Outgoing edges, in insertion order. */
  public ArrayList<String> edges;

  /**
   * Creates an element without edges.
   *
   * @param id    vertex id
   * @param value vertex value
   */
  public Element(long id, long value) {
    this.id = id;
    this.value = value;
    this.edges = new ArrayList<String>();
  }

  /**
   * Appends an edge.
   *
   * @param edge string form of the edge
   */
  public void add(String edge) {
    edges.add(edge);
  }

  /**
   * Compares id, value and edges; the edge order is ignored but every edge,
   * including repeated ones, must be matched on both sides.
   *
   * @param obj object to compare with, may be null
   * @return true if obj is an Element describing the same vertex
   */
  @Override
  public boolean equals(Object obj) {
    if (this == obj) {
      return true;
    }
    if (!(obj instanceof Element)) {
      return false;
    }
    Element other = (Element) obj;
    if (id != other.id || value != other.value) {
      return false;
    }
    // Tick off each of our edges against a copy of the other side, so extra
    // edges on either side are detected.
    List<String> unmatched = new ArrayList<String>(other.edges);
    for (String edge : edges) {
      if (!unmatched.remove(edge)) {
        return false;
      }
    }
    return unmatched.isEmpty();
  }

  /**
   * Hash code consistent with {@link #equals(Object)}: the edge hashes are
   * summed so that the edge order does not matter.
   *
   * @return the hash code
   */
  @Override
  public int hashCode() {
    int hash = (int) (id ^ (id >>> 32));
    hash = 31 * hash + (int) (value ^ (value >>> 32));
    int edgeHash = 0;
    for (String edge : edges) {
      edgeHash += edge == null ? 0 : edge.hashCode();
    }
    return 31 * hash + edgeHash;
  }

  /**
   * Renders the element for assertion messages.
   *
   * @return {@code id: <id> value: <value> edges: <edge> <edge> ...}
   */
  @Override
  public String toString() {
    StringBuilder sb = new StringBuilder();
    sb.append("id: ");
    sb.append(id);
    sb.append(" value: ");
    sb.append(value);
    sb.append(" edges: ");
    for (String element : edges) {
      sb.append(element).append(" ");
    }
    return sb.toString();
  }
}
}