blob: f7b702a9ccf70da981e208ace71528bdca81feb9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.tinkerpop.gremlin.hadoop.groovy.plugin;
import org.apache.tinkerpop.gremlin.AbstractGremlinTest;
import org.apache.tinkerpop.gremlin.LoadGraphWith;
import org.apache.tinkerpop.gremlin.TestHelper;
import org.apache.tinkerpop.gremlin.groovy.loaders.GremlinLoader;
import org.apache.tinkerpop.gremlin.groovy.plugin.RemoteAcceptor;
import org.apache.tinkerpop.gremlin.groovy.util.SugarTestHelper;
import org.apache.tinkerpop.gremlin.groovy.util.TestableConsolePluginAcceptor;
import org.apache.tinkerpop.gremlin.hadoop.Constants;
import org.apache.tinkerpop.gremlin.hadoop.HadoopGremlinSuite;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.util.Gremlin;
import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
import org.junit.Before;
import org.junit.Test;
import javax.tools.JavaCompiler;
import javax.tools.SimpleJavaFileObject;
import javax.tools.ToolProvider;
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.jar.Attributes;
import java.util.jar.JarEntry;
import java.util.jar.JarOutputStream;
import java.util.jar.Manifest;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
* This is a test that is meant to be used in the context of the {@link HadoopGremlinSuite} and shouldn't be
* executed on its own.
*
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
public class HadoopGremlinPluginCheck extends AbstractGremlinTest {

    /** Remote acceptor obtained from the plugin in {@link #setupTest()}; the tests submit scripts through it. */
    private HadoopRemoteAcceptor remote;

    /** Console acceptor the plugin is attached to in {@link #setupTest()}; the tests add and read bindings on it. */
    private TestableConsolePluginAcceptor console;

    /**
     * Creates a fresh console acceptor, plugs a new {@link HadoopGremlinPlugin} into it and keeps the plugin's
     * remote acceptor for use by the tests.
     *
     * @throws IllegalStateException if any step of the setup fails; the original exception is kept as the cause
     */
    @Before
    public void setupTest() {
        try {
            this.console = new TestableConsolePluginAcceptor();
            final HadoopGremlinPlugin plugin = new HadoopGremlinPlugin();
            plugin.pluginTo(this.console);
            this.remote = (HadoopRemoteAcceptor) plugin.remoteAcceptor().get();
        } catch (final Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }

    ///////////////////

    /**
     * Submits a vertex count through the remote acceptor and expects 6 as the only result, plus a non-null
     * {@link RemoteAcceptor#RESULT} binding on the console afterwards.
     */
    @Test
    @LoadGraphWith(LoadGraphWith.GraphData.MODERN)
    public void shouldSupportRemoteTraversal() throws Exception {
        this.console.addBinding("graph", this.graph);
        this.console.addBinding("g", this.g);
        this.remote.connect(Arrays.asList("graph", "g"));

        final Traversal<?, ?> traversal = (Traversal<?, ?>) this.remote.submit(Arrays.asList("g.V().count()"));
        assertEquals(6L, traversal.next());
        assertFalse(traversal.hasNext());
        assertNotNull(this.console.getBindings().get(RemoteAcceptor.RESULT));
    }

    /**
     * Checks that a sugar-syntax script is rejected before {@code useSugar} is configured and yields 28 once
     * {@code useSugar} has been set to {@code true} and the acceptor has been reconnected.
     */
    @Test
    @LoadGraphWith(LoadGraphWith.GraphData.MODERN)
    public void shouldSupportRemoteSugarTraversal() throws Exception {
        SugarTestHelper.clearRegistry(this.graphProvider);
        this.console.addBinding("graph", this.graph);
        this.console.addBinding("g", this.g);

        this.remote.connect(Arrays.asList("graph", "g"));
        try {
            this.remote.submit(Arrays.asList("g.V.name.map{it.length()}.sum"));
            fail("Should not allow sugar usage");
        } catch (final Exception expected) {
            // This is the expected outcome. fail() throws an AssertionError, which is not an Exception,
            // so a missing rejection is not swallowed by this catch block.
        }

        this.remote.configure(Arrays.asList("useSugar", "true"));
        this.remote.connect(Arrays.asList("graph", "g"));
        final Traversal<?, ?> traversal = (Traversal<?, ?>) this.remote.submit(Arrays.asList("g.V.name.map{it.length()}.sum"));
        assertEquals(28L, traversal.next());
        assertFalse(traversal.hasNext());
        assertNotNull(this.console.getBindings().get(RemoteAcceptor.RESULT));
    }

    /**
     * Submits two {@code group()} traversals that use closures as key or value modulators and verifies the
     * resulting maps against the values expected for the MODERN graph.
     */
    @Test
    @LoadGraphWith(LoadGraphWith.GraphData.MODERN)
    public void shouldSupportRemoteGroupTraversal() throws Exception {
        SugarTestHelper.clearRegistry(this.graphProvider);
        GremlinLoader.load();
        this.console.addBinding("graph", this.graph);
        this.console.addBinding("g", this.g);
        this.remote.connect(Arrays.asList("graph"));

        this.remote.connect(Arrays.asList("graph", "g"));

        // group names by their second character
        Traversal<?, Map<String, List<String>>> traversal = (Traversal<?, Map<String, List<String>>>) this.remote.submit(Arrays.asList("g.V().out().group().by{it.value('name')[1]}.by('name')"));
        Map<String, List<String>> map = traversal.next();
        assertEquals(3, map.size());
        assertEquals(1, map.get("a").size());
        assertEquals("vadas", map.get("a").get(0));
        assertEquals(1, map.get("i").size());
        assertEquals("ripple", map.get("i").get(0));
        assertEquals(4, map.get("o").size());
        assertTrue(map.get("o").contains("josh"));
        assertTrue(map.get("o").contains("lop"));
        assertNotNull(this.console.getBindings().get(RemoteAcceptor.RESULT));

        // group the second character of each name by vertex label
        traversal = (Traversal<?, Map<String, List<String>>>) this.remote.submit(Arrays.asList("g.V().out().group().by(label).by{it.value('name')[1]}"));
        map = traversal.next();
        assertEquals(2, map.size());
        assertEquals(4, map.get("software").size());
        assertTrue(map.get("software").contains("o"));
        assertTrue(map.get("software").contains("i"));
        assertEquals(2, map.get("person").size());
        assertTrue(map.get("person").contains("o"));
        assertTrue(map.get("person").contains("a"));
        assertNotNull(this.console.getBindings().get(RemoteAcceptor.RESULT));
    }

    /**
     * Evaluates {@code hdfs.ls()} and {@code fs.ls()} on the console and checks the format of every returned line.
     */
    @Test
    @LoadGraphWith(LoadGraphWith.GraphData.MODERN)
    public void shouldSupportHDFSMethods() throws Exception {
        assertListingFormat((List<String>) this.console.eval("hdfs.ls()"));
        assertListingFormat((List<String>) this.console.eval("fs.ls()"));
    }

    /**
     * Points {@code HADOOP_GREMLIN_LIBS} at a test data directory in which this test places no jars, enables the
     * distributed-cache option on the graph configuration, and expects a plain {@code g.V()} to still return
     * all 6 vertices. The previous value of the system property is restored afterwards.
     */
    @Test
    @LoadGraphWith(LoadGraphWith.GraphData.MODERN)
    public void shouldGracefullyHandleBadGremlinHadoopLibs() throws Exception {
        final String previousLibs = System.getProperty(Constants.HADOOP_GREMLIN_LIBS);
        try {
            System.setProperty(Constants.HADOOP_GREMLIN_LIBS, TestHelper.makeTestDataDirectory(HadoopGremlinPluginCheck.class, "shouldGracefullyHandleBadGremlinHadoopLibs"));
            this.graph.configuration().setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, true);
            this.console.addBinding("graph", this.graph);
            this.console.addBinding("g", this.g);
            this.remote.connect(Arrays.asList("graph", "g"));
            final Traversal<?, ?> traversal = (Traversal<?, ?>) this.remote.submit(Arrays.asList("g.V()"));
            assertEquals(6, IteratorUtils.count(traversal));
            assertNotNull(this.console.getBindings().get(RemoteAcceptor.RESULT));
        } finally {
            // do not leak the modified property into tests that run later in the same JVM
            restoreSystemProperty(Constants.HADOOP_GREMLIN_LIBS, previousLibs);
        }
    }

    /**
     * Runs the same traversal twice, once with plain paths and once with {@code file://}-prefixed paths in
     * {@code HADOOP_GREMLIN_LIBS}, and afterwards expects both generated jars in the temporary libs directory.
     * The previous value of the system property is restored afterwards.
     */
    @Test
    @LoadGraphWith(LoadGraphWith.GraphData.MODERN)
    public void shouldSupportVariousFileSystemsInGremlinHadoopLibs() throws Exception {
        // The whole point of this test is to verify that HADOOP_GREMLIN_LIBS may contain paths with or without
        // a file system scheme prefix and that either path is properly handled. If all jar files, that were specified
        // in HADOOP_GREMLIN_LIBS, are found in the GraphComputers temporary directory after using the GraphComputer
        // (by submitting a traversal), the test is considered to be successful.
        //
        // The traversal will likely never fail, since both - Spark and Giraph - run in the same JVM during tests. This
        // is unfortunate as it doesn't allow us to verify that GraphComputers load jars properly in a distributed
        // environment. The test would fail in a distributed environment, IF loading the jars specified in
        // HADOOP_GREMLIN_LIBS wouldn't work. That is because we generate new jar files on the fly that are definitely
        // not part of any existing directory or the current classpath.
        final String testDataDirectory = TestHelper.makeTestDataDirectory(HadoopGremlinPluginCheck.class, "shouldHandleLocalGremlinHadoopLibs");
        final File jarFile1 = createJarFile(testDataDirectory + File.separator + "1", "Greeter1");
        final File jarFile2 = createJarFile(testDataDirectory + File.separator + "2", "Greeter2");
        final String graphComputerJarTargetBasePath = System.getProperty("java.io.tmpdir") + File.separator +
                "hadoop-gremlin-" + Gremlin.version() + "-libs" + File.separator;
        final File graphComputerJarTargetPath1 = new File(graphComputerJarTargetBasePath + "1" + File.separator + "Greeter1.jar");
        final File graphComputerJarTargetPath2 = new File(graphComputerJarTargetBasePath + "2" + File.separator + "Greeter2.jar");

        final String previousLibs = System.getProperty(Constants.HADOOP_GREMLIN_LIBS);
        try {
            for (final boolean withScheme : Arrays.asList(false, true)) {
                Stream<String> hadoopGremlinLibs = Stream.of(jarFile1, jarFile2).map(f -> f.getParentFile().getAbsolutePath());
                if (withScheme) {
                    hadoopGremlinLibs = hadoopGremlinLibs.map(path -> "file://" + path);
                }
                System.setProperty(Constants.HADOOP_GREMLIN_LIBS, String.join(File.pathSeparator, hadoopGremlinLibs.collect(Collectors.toList())));
                this.graph.configuration().setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, true);
                this.console.addBinding("graph", this.graph);
                this.console.addBinding("g", this.g);
                this.remote.connect(Arrays.asList("graph", "g"));
                final Traversal<?, ?> traversal = (Traversal<?, ?>) this.remote.submit(Arrays.asList(
                        "ClassLoader.getSystemClassLoader().addURL('" + jarFile1.toURI().toURL() + "'.toURL());",
                        "ClassLoader.getSystemClassLoader().addURL('" + jarFile2.toURI().toURL() + "'.toURL());",
                        "g.V().choose(hasLabel('person'), " +
                                "values('name').map {Class.forName('Greeter1').hello(it.get())}, " +
                                "values('name').map {Class.forName('Greeter2').hello(it.get())})"));

                // Collectors.toList() gives no mutability guarantee, and this list is mutated below,
                // so an ArrayList is requested explicitly.
                final List<String> expectedMessages = Stream.of("marko", "josh", "peter", "vadas")
                        .map(name -> "Greeter1 says: Hello " + name + "!")
                        .collect(Collectors.toCollection(ArrayList::new));
                Stream.of("lop", "ripple")
                        .map(name -> "Greeter2 says: Hello " + name + "!")
                        .forEach(expectedMessages::add);

                // every produced message must match exactly one expected message
                while (traversal.hasNext()) {
                    final String message = (String) traversal.next();
                    assertTrue("Unexpected message: " + message, expectedMessages.remove(message));
                }
                assertTrue("Missing messages: " + expectedMessages, expectedMessages.isEmpty());
            }
        } finally {
            // do not leak the modified property into tests that run later in the same JVM
            restoreSystemProperty(Constants.HADOOP_GREMLIN_LIBS, previousLibs);
        }

        assertTrue(graphComputerJarTargetPath1.exists());
        assertTrue(graphComputerJarTargetPath2.exists());
        // The deletions are real clean-up work and must run even when JVM assertions are disabled, so they
        // are not placed inside an `assert` statement.
        assertTrue("Could not delete " + graphComputerJarTargetPath1, graphComputerJarTargetPath1.delete());
        assertTrue("Could not delete " + graphComputerJarTargetPath2, graphComputerJarTargetPath2.delete());
    }

    /**
     * Asserts that every line starts with one of {@code -}, {@code r}, {@code w} or {@code x} and has a single
     * space at index 9.
     * NOTE(review): this presumably matches a 9-character permission string followed by a space - confirm
     * against the output format of {@code ls()}.
     *
     * @param listing the lines returned by an {@code ls()} call
     */
    private static void assertListingFormat(final List<String> listing) {
        for (final String line : listing) {
            assertTrue(line.startsWith("-") || line.startsWith("r") || line.startsWith("w") || line.startsWith("x"));
            assertEquals(" ", line.substring(9, 10));
        }
    }

    /**
     * Sets a system property back to a previously captured value.
     *
     * @param key           the name of the system property
     * @param previousValue the value to restore, or {@code null} if the property was not set before
     */
    private static void restoreSystemProperty(final String key, final String previousValue) {
        if (previousValue == null) {
            System.clearProperty(key);
        } else {
            System.setProperty(key, previousValue);
        }
    }

    /**
     * Compiles a small class with a static {@code hello(String)} method and packs the resulting class file into
     * {@code <directory>/<className>.jar}. The intermediate class file is deleted again and the jar is registered
     * for deletion on JVM exit.
     *
     * @param directory the directory to create the jar in; it is created if it does not exist
     * @param className the simple name of the class to generate, also used as the jar's base name
     * @return the generated jar file
     * @throws IOException if reading the class file or writing the jar fails
     */
    private File createJarFile(final String directory, final String className) throws IOException {
        final File targetDirectory = new File(directory);
        assertTrue("Could not create directory " + directory, targetDirectory.mkdirs() || targetDirectory.isDirectory());
        final File classFile = new File(directory + File.separator + className + ".class");
        final File jarFile = new File(directory + File.separator + className + ".jar");
        jarFile.deleteOnExit();
        final JavaStringObject source = new JavaStringObject(className,
                "public class " + className + " {\n" +
                        " public static String hello(final String name) {\n" +
                        " return \"" + className + " says: Hello \" + name + \"!\";\n" +
                        " }\n" +
                        "}");
        final JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
        // getSystemJavaCompiler() returns null when no compiler is provided (e.g. on a plain JRE)
        assertNotNull("No system Java compiler available; this test must run on a JDK", compiler);
        final List<String> options = Arrays.asList(
                "-d", classFile.getParentFile().getAbsolutePath()
        );
        // The compilation must happen regardless of whether JVM assertions are enabled, so it is not placed
        // inside an `assert` statement.
        final Boolean compiled = compiler.getTask(null, null, null, options, null, Collections.singletonList(source)).call();
        assertTrue("Compilation of " + className + " failed", compiled);

        final Manifest manifest = new Manifest();
        manifest.getMainAttributes().put(Attributes.Name.MANIFEST_VERSION, "1.0");
        try (final JarOutputStream target = new JarOutputStream(new FileOutputStream(jarFile), manifest)) {
            final JarEntry entry = new JarEntry(classFile.getName());
            entry.setTime(classFile.lastModified());
            target.putNextEntry(entry);
            try (final FileInputStream fis = new FileInputStream(classFile);
                 final BufferedInputStream in = new BufferedInputStream(fis)) {
                final byte[] buffer = new byte[1024];
                int count;
                while ((count = in.read(buffer)) >= 0) {
                    target.write(buffer, 0, count);
                }
            }
            target.closeEntry();
        }
        assertTrue("Could not delete " + classFile, classFile.delete());
        return jarFile;
    }

    /**
     * An in-memory Java source file whose content is supplied as a string.
     */
    private static class JavaStringObject extends SimpleJavaFileObject {
        private final String code;

        /**
         * @param className the name of the class declared in {@code code}; dots are mapped to slashes to build
         *                  the {@code string:///} URI of the file object
         * @param code      the Java source text
         */
        JavaStringObject(final String className, final String code) {
            super(URI.create("string:///" + className.replace(".", "/") + Kind.SOURCE.extension), Kind.SOURCE);
            this.code = code;
        }

        /** Returns the source text passed to the constructor. */
        @Override
        public CharSequence getCharContent(boolean ignoreEncodingErrors) throws IOException {
            return code;
        }
    }
}