blob: 46009e92348b6cf7087eaf9245e149b97db74699 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.sql.compiler.backends.standalone;
import com.google.common.base.Joiner;
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.TableScan;
import org.apache.storm.sql.compiler.CompilerUtil;
import org.apache.storm.sql.javac.CompilingClassLoader;
import org.apache.storm.sql.runtime.AbstractValuesProcessor;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;
public class PlanCompiler {
private static final Joiner NEW_LINE_JOINER = Joiner.on("\n");
private static final String PACKAGE_NAME = "org.apache.storm.sql.generated";
private static final String PROLOGUE = NEW_LINE_JOINER.join(
"// GENERATED CODE", "package " + PACKAGE_NAME + ";", "",
"import java.util.Iterator;", "import java.util.Map;",
"import backtype.storm.tuple.Values;",
"import org.apache.storm.sql.runtime.AbstractChannelHandler;",
"import org.apache.storm.sql.runtime.Channels;",
"import org.apache.storm.sql.runtime.ChannelContext;",
"import org.apache.storm.sql.runtime.ChannelHandler;",
"import org.apache.storm.sql.runtime.DataSource;",
"import org.apache.storm.sql.runtime.AbstractValuesProcessor;",
"public final class Processor extends AbstractValuesProcessor {", "");
private static final String INITIALIZER_PROLOGUE = NEW_LINE_JOINER.join(
" @Override",
" public void initialize(Map<String, DataSource> data,",
" ChannelHandler result) {",
" ChannelContext r = Channels.chain(Channels.voidContext(), result);",
""
);
private final JavaTypeFactory typeFactory;
public PlanCompiler(JavaTypeFactory typeFactory) {
this.typeFactory = typeFactory;
}
private String generateJavaSource(RelNode root) throws Exception {
StringWriter sw = new StringWriter();
try (PrintWriter pw = new PrintWriter(sw)) {
RelNodeCompiler compiler = new RelNodeCompiler(pw, typeFactory);
printPrologue(pw);
compiler.traverse(root);
printMain(pw, root);
printEpilogue(pw);
}
return sw.toString();
}
private void printMain(PrintWriter pw, RelNode root) {
Set<TableScan> tables = new HashSet<>();
pw.print(INITIALIZER_PROLOGUE);
chainOperators(pw, root, tables);
for (TableScan n : tables) {
String escaped = CompilerUtil.escapeJavaString(
Joiner.on('.').join(n.getTable().getQualifiedName()), true);
String r = NEW_LINE_JOINER.join(
" if (!data.containsKey(%1$s))",
" throw new RuntimeException(\"Cannot find table \" + %1$s);",
" data.get(%1$s).open(CTX_%2$d);",
"");
pw.print(String.format(r, escaped, n.getId()));
}
pw.print(" }\n");
}
private void chainOperators(
PrintWriter pw, RelNode root, Set<TableScan> tables) {
String lastCtx = "r";
Queue<RelNode> q = new ArrayDeque<>();
q.add(root);
RelNode n;
while ((n = q.poll()) != null) {
pw.print(
String.format(" ChannelContext CTX_%d = Channels.chain(%2$s, %3$s);\n",
n.getId(), lastCtx, RelNodeCompiler.getStageName(n)));
lastCtx = String.format("CTX_%d", n.getId());
if (n instanceof TableScan) {
tables.add((TableScan)n);
}
for (RelNode i : n.getInputs()) {
q.add(i);
}
}
}
public AbstractValuesProcessor compile(RelNode plan) throws Exception {
String javaCode = generateJavaSource(plan);
ClassLoader cl = new CompilingClassLoader(getClass().getClassLoader(),
PACKAGE_NAME + ".Processor",
javaCode, null);
return (AbstractValuesProcessor) cl.loadClass(
PACKAGE_NAME + ".Processor").newInstance();
}
private static void printEpilogue(
PrintWriter pw) throws Exception {
pw.print("}\n");
}
private static void printPrologue(PrintWriter pw) {
pw.append(PROLOGUE);
}
}