blob: 4cb910096938850a097f3e9fe8acd5855e516b33 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.tinkerpop.gremlin.hadoop.structure.io.script;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
import org.apache.tinkerpop.gremlin.groovy.CompilerCustomizerProvider;
import org.apache.tinkerpop.gremlin.groovy.DefaultImportCustomizerProvider;
import org.apache.tinkerpop.gremlin.groovy.jsr223.GremlinGroovyScriptEngine;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.T;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph;
import javax.script.Bindings;
import javax.script.Compilable;
import javax.script.CompiledScript;
import javax.script.ScriptEngine;
import javax.script.ScriptException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Iterator;
/**
* @author Daniel Kuppitz (http://gremlin.guru)
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
public final class ScriptRecordReader extends RecordReader<NullWritable, VertexWritable> {
protected final static String SCRIPT_FILE = "gremlin.hadoop.scriptInputFormat.script";
//protected final static String SCRIPT_ENGINE = "gremlin.hadoop.scriptInputFormat.scriptEngine";
private final static String GRAPH = "graph";
private final static String LINE = "line";
private final static String FACTORY = "factory";
private final static String READ_CALL = "parse(" + LINE + "," + FACTORY + ")";
private final VertexWritable vertexWritable = new VertexWritable();
private final LineRecordReader lineRecordReader;
private ScriptEngine engine;
private String parse;
private CompiledScript script;
public ScriptRecordReader() {
this.lineRecordReader = new LineRecordReader();
}
@Override
public void initialize(final InputSplit genericSplit, final TaskAttemptContext context) throws IOException {
this.lineRecordReader.initialize(genericSplit, context);
final Configuration configuration = context.getConfiguration();
this.engine = new GremlinGroovyScriptEngine((CompilerCustomizerProvider) new DefaultImportCustomizerProvider());
//this.engine = ScriptEngineCache.get(configuration.get(SCRIPT_ENGINE, ScriptEngineCache.DEFAULT_SCRIPT_ENGINE));
final FileSystem fs = FileSystem.get(configuration);
try (final InputStream stream = fs.open(new Path(configuration.get(SCRIPT_FILE)));
final InputStreamReader reader = new InputStreamReader(stream)) {
this.parse = String.join("\n", IOUtils.toString(reader), READ_CALL);
script = ((Compilable) engine).compile(this.parse);
} catch (ScriptException e) {
throw new IOException(e.getMessage(), e);
}
}
@Override
public boolean nextKeyValue() throws IOException {
while (true) {
if (!this.lineRecordReader.nextKeyValue()) return false;
try {
final Bindings bindings = this.engine.createBindings();
final StarGraph graph = StarGraph.open();
final ScriptElementFactory factory = new ScriptElementFactory(graph);
bindings.put(GRAPH, graph);
bindings.put(LINE, this.lineRecordReader.getCurrentValue().toString());
bindings.put(FACTORY, factory);
final Vertex vertex = (Vertex) script.eval(bindings);
if (vertex != null) {
this.vertexWritable.set(vertex);
return true;
}
} catch (final ScriptException e) {
throw new IOException(e.getMessage(), e);
}
}
}
@Override
public NullWritable getCurrentKey() {
return NullWritable.get();
}
@Override
public VertexWritable getCurrentValue() {
return this.vertexWritable;
}
@Override
public float getProgress() throws IOException {
return this.lineRecordReader.getProgress();
}
@Override
public synchronized void close() throws IOException {
this.lineRecordReader.close();
}
@Deprecated
protected class ScriptElementFactory {
private final StarGraph graph;
public ScriptElementFactory() {
this(StarGraph.open());
}
public ScriptElementFactory(final StarGraph graph) {
this.graph = graph;
}
public Vertex vertex(final Object id) {
return vertex(id, Vertex.DEFAULT_LABEL);
}
public Vertex vertex(final Object id, final String label) {
final Iterator<Vertex> vertices = graph.vertices(id);
return vertices.hasNext() ? vertices.next() : graph.addVertex(T.id, id, T.label, label);
}
public Edge edge(final Vertex out, final Vertex in) {
return edge(out, in, Edge.DEFAULT_LABEL);
}
public Edge edge(final Vertex out, final Vertex in, final String label) {
return out.addEdge(label, in);
}
}
}