blob: 673750d5dd26bee7d238498a041e7b0a52847d27 [file] [log] [blame]
package org.apache.rya.accumulo.pig;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.eclipse.rdf4j.model.IRI;
import org.eclipse.rdf4j.model.Literal;
import org.eclipse.rdf4j.model.Value;
import org.eclipse.rdf4j.query.algebra.Join;
import org.eclipse.rdf4j.query.algebra.Projection;
import org.eclipse.rdf4j.query.algebra.ProjectionElemList;
import org.eclipse.rdf4j.query.algebra.Slice;
import org.eclipse.rdf4j.query.algebra.StatementPattern;
import org.eclipse.rdf4j.query.algebra.TupleExpr;
import org.eclipse.rdf4j.query.algebra.Union;
import org.eclipse.rdf4j.query.algebra.Var;
import org.eclipse.rdf4j.query.algebra.helpers.AbstractQueryModelVisitor;
/**
* Created by IntelliJ IDEA.
* Date: 4/12/12
* Time: 10:17 AM
* To change this template use File | Settings | File Templates.
*/
public class SparqlToPigTransformVisitor extends AbstractQueryModelVisitor<RuntimeException> {
private StringBuilder pigScriptBuilder = new StringBuilder();
private String tablePrefix;
private String instance, zk, user, password; //TODO: use a Configuration object to get these
private Map<String, String> varToSet = new HashMap<String, String>();
private Map<TupleExpr, List<String>> exprToNames = new HashMap<TupleExpr, List<String>>();
private Map<TupleExpr, String> exprToVar = new HashMap<TupleExpr, String>();
private char i = 'A'; //TODO: do better, hack
public SparqlToPigTransformVisitor() {
pigScriptBuilder.append("set pig.splitCombination false;\n")
.append("set default_parallel 32;\n") //TODO: set parallel properly
.append("set mapred.map.tasks.speculative.execution false;\n")
.append("set mapred.reduce.tasks.speculative.execution false;\n")
.append("set io.sort.mb 256;\n")
.append("set mapred.child.java.opts -Xmx2048m;\n")
.append("set mapred.compress.map.output true;\n")
.append("set mapred.map.output.compression.codec org.apache.hadoop.io.compress.GzipCodec;\n")
.append("set io.file.buffer.size 65536;\n")
.append("set io.sort.factor 25;\n");
}
@Override
public void meet(StatementPattern node) throws RuntimeException {
super.meet(node);
String subjValue = getVarValue(node.getSubjectVar());
String predValue = getVarValue(node.getPredicateVar());
String objValue = getVarValue(node.getObjectVar());
String subj = i + "_s";
String pred = i + "_p";
String obj = i + "_o";
String var = i + "";
if (node.getSubjectVar().getValue() == null) { //TODO: look nicer
subj = node.getSubjectVar().getName();
varToSet.put(subj, var);
addToExprToNames(node, subj);
}
if (node.getPredicateVar().getValue() == null) { //TODO: look nicer
pred = node.getPredicateVar().getName();
varToSet.put(pred, var);
addToExprToNames(node, pred);
}
if (node.getObjectVar().getValue() == null) { //TODO: look nicer
obj = node.getObjectVar().getName();
varToSet.put(obj, var);
addToExprToNames(node, obj);
}
if (node.getContextVar() != null && node.getContextVar().getValue() == null) {
String cntxtName = node.getContextVar().getName();
varToSet.put(cntxtName, var);
addToExprToNames(node, cntxtName);
}
//load 'l_' using org.apache.rya.cloudbase.pig.dep.StatementPatternStorage('<http://www.Department0.University0.edu>', '', '',
// 'stratus', 'stratus13:2181', 'root', 'password') AS (dept:chararray, p:chararray, univ:chararray);
// pigScriptBuilder.append(i).append(" = load '").append(tablePrefix).append("' using org.apache.rya.cloudbase.pig.dep.StatementPatternStorage('")
// .append(subjValue).append("','").append(predValue).append("','").append(objValue).append("','").append(instance).append("','")
// .append(zk).append("','").append(user).append("','").append(password).append("') AS (").append(subj).append(":chararray, ")
// .append(pred).append(":chararray, ").append(obj).append(":chararray);\n");
//load 'cloudbase://tablePrefix?instance=myinstance&user=root&password=secret&zookeepers=127.0.0.1:2181&auths=PRIVATE,PUBLIC&subject=a&predicate=b&object=c'
//using org.apache.rya.accumulo.pig.StatementPatternStorage() AS (dept:chararray, p:chararray, univ:chararray);
pigScriptBuilder.append(i).append(" = load 'accumulo://").append(tablePrefix).append("?instance=").append(instance).append("&user=").append(user)
.append("&password=").append(password).append("&zookeepers=").append(zk);
if (subjValue != null && subjValue.length() > 0) {
pigScriptBuilder.append("&subject=").append(subjValue);
}
if (predValue != null && predValue.length() > 0) {
pigScriptBuilder.append("&predicate=").append(predValue);
}
if (objValue != null && objValue.length() > 0) {
pigScriptBuilder.append("&object=").append(objValue);
}
if (node.getContextVar() != null && node.getContextVar().getValue() != null) {
pigScriptBuilder.append("&context=").append(getVarValue(node.getContextVar()));
}
pigScriptBuilder.append("' using ").append(StatementPatternStorage.class.getName()).append("() AS (").append(subj).append(":chararray, ")
.append(pred).append(":chararray, ").append(obj).append(":chararray");
if (node.getContextVar() != null) {
Value cntxtValue = node.getContextVar().getValue();
String cntxtName = null;
if (cntxtValue == null) {
//use name
cntxtName = node.getContextVar().getName();
} else {
cntxtName = i + "_c";
}
pigScriptBuilder.append(", ").append(cntxtName).append(":chararray");
}
pigScriptBuilder.append(");\n");
//TODO: add auths
exprToVar.put(node, var);
i++;
}
private void addToExprToNames(TupleExpr node, String name) {
List<String> names = exprToNames.get(node);
if (names == null) {
names = new ArrayList<String>();
exprToNames.put(node, names);
}
names.add(name);
}
@Override
public void meet(Union node) throws RuntimeException {
super.meet(node);
TupleExpr leftArg = node.getLeftArg();
TupleExpr rightArg = node.getRightArg();
String left_var = exprToVar.get(leftArg);
String right_var = exprToVar.get(rightArg);
//Q = UNION ONSCHEMA B, P;
pigScriptBuilder.append(i).append(" = UNION ONSCHEMA ").append(left_var).append(", ").append(right_var).append(";\n");
String unionVar = i + "";
List<String> left_names = exprToNames.get(leftArg);
List<String> right_names = exprToNames.get(rightArg);
for (String name : left_names) {
varToSet.put(name, unionVar);
addToExprToNames(node, name);
}
for (String name : right_names) {
varToSet.put(name, unionVar);
addToExprToNames(node, name);
}
exprToVar.put(node, unionVar);
i++;
}
@Override
public void meet(Join node) throws RuntimeException {
super.meet(node);
TupleExpr leftArg = node.getLeftArg();
TupleExpr rightArg = node.getRightArg();
List<String> left_names = exprToNames.get(leftArg);
List<String> right_names = exprToNames.get(rightArg);
Set<String> joinNames = new HashSet<String>(left_names);
joinNames.retainAll(right_names); //intersection, this is what I join on
//SEC = join FIR by (MEMB_OF::ugrad, SUBORG_J::univ), UGRADDEG by (ugrad, univ);
StringBuilder joinStr = new StringBuilder();
joinStr.append("(");
boolean first = true;
for (String name : joinNames) { //TODO: Make this a utility method
if (!first) {
joinStr.append(",");
}
first = false;
joinStr.append(name);
}
joinStr.append(")");
String left_var = exprToVar.get(leftArg);
String right_var = exprToVar.get(rightArg);
if (joinStr.length() <= 2) {
//no join params, need to cross
pigScriptBuilder.append(i).append(" = cross ").append(left_var).append(", ").append(right_var).append(";\n");
} else {
//join
pigScriptBuilder.append(i).append(" = join ").append(left_var);
pigScriptBuilder.append(" by ").append(joinStr);
pigScriptBuilder.append(", ").append(right_var);
pigScriptBuilder.append(" by ").append(joinStr);
pigScriptBuilder.append(";\n");
}
String joinVarStr = i + "";
i++;
// D = foreach C GENERATE A::subj AS subj:chararray, A::A_p AS p:chararray;
String forEachVarStr = i + "";
pigScriptBuilder.append(i).append(" = foreach ").append(joinVarStr).append(" GENERATE ");
Map<String, String> nameToJoinName = new HashMap<String, String>();
for (String name : left_names) {
varToSet.put(name, forEachVarStr);
addToExprToNames(node, name);
nameToJoinName.put(name, left_var + "::" + name);
}
for (String name : right_names) {
varToSet.put(name, forEachVarStr);
addToExprToNames(node, name);
nameToJoinName.put(name, right_var + "::" + name);
}
first = true;
for (Map.Entry entry : nameToJoinName.entrySet()) {
if (!first) {
pigScriptBuilder.append(",");
}
first = false;
pigScriptBuilder.append(entry.getValue()).append(" AS ").append(entry.getKey()).append(":chararray ");
}
pigScriptBuilder.append(";\n");
exprToVar.put(node, forEachVarStr);
i++;
}
@Override
public void meet(Projection node) throws RuntimeException {
super.meet(node);
ProjectionElemList list = node.getProjectionElemList();
String set = null;
StringBuilder projList = new StringBuilder();
boolean first = true;
//TODO: we do not support projections from multiple pig statements yet
for (String name : list.getTargetNames()) {
set = varToSet.get(name); //TODO: overwrite
if (set == null) {
throw new IllegalArgumentException("Have not found any pig logic for name[" + name + "]");
}
if (!first) {
projList.append(",");
}
first = false;
projList.append(name);
}
if (set == null)
throw new IllegalArgumentException(""); //TODO: Fill this
//SUBORG = FOREACH SUBORG_L GENERATE dept, univ;
pigScriptBuilder.append("PROJ = FOREACH ").append(set).append(" GENERATE ").append(projList.toString()).append(";\n");
}
@Override
public void meet(Slice node) throws RuntimeException {
super.meet(node);
long limit = node.getLimit();
//PROJ = LIMIT PROJ 10;
pigScriptBuilder.append("PROJ = LIMIT PROJ ").append(limit).append(";\n");
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public String getUser() {
return user;
}
public void setUser(String user) {
this.user = user;
}
public String getZk() {
return zk;
}
public void setZk(String zk) {
this.zk = zk;
}
public String getInstance() {
return instance;
}
public void setInstance(String instance) {
this.instance = instance;
}
public String getTablePrefix() {
return tablePrefix;
}
public void setTablePrefix(String tablePrefix) {
this.tablePrefix = tablePrefix;
}
public String getPigScript() {
return pigScriptBuilder.toString();
}
protected String getVarValue(Var var) {
if (var == null) {
return "";
} else {
Value value = var.getValue();
if (value == null) {
return "";
}
if (value instanceof IRI) {
return "<" + value.stringValue() + ">";
}
if (value instanceof Literal) {
Literal lit = (Literal) value;
if (lit.getDatatype() == null) {
//string
return "\\'" + value.stringValue() + "\\'";
}
}
return value.stringValue();
}
}
}