blob: a91dcae22fa8af36347a3de6f19fd21dcf990e31 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package org.apache.storm.sql;
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.jar.Attributes;
import java.util.jar.JarOutputStream;
import java.util.jar.Manifest;
import java.util.zip.ZipEntry;
import org.apache.calcite.sql.SqlNode;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.generated.SubmitOptions;
import org.apache.storm.sql.javac.CompilingClassLoader;
import org.apache.storm.sql.parser.SqlCreateFunction;
import org.apache.storm.sql.parser.SqlCreateTable;
import org.apache.storm.sql.parser.StormParser;
/**
 * Default {@link StormSql} implementation. Each SQL statement is parsed with
 * {@link StormParser}; DDL statements (CREATE TABLE / CREATE FUNCTION) are
 * interpreted against a shared {@link StormSqlContext}, while query statements
 * are compiled into Storm topologies and either submitted or explained.
 */
class StormSqlImpl extends StormSql {
    private final StormSqlContext sqlContext;

    StormSqlImpl() {
        sqlContext = new StormSqlContext();
    }

    /**
     * Executes the given SQL statements in order. DDL statements only mutate
     * the local {@code sqlContext}; every other statement is compiled into a
     * topology, packaged with its generated classes into a temporary jar, and
     * submitted to the cluster.
     *
     * @param name             topology name
     * @param statements       SQL statements to execute, in order
     * @param topoConf         topology configuration
     * @param opts             submit options passed through to {@link StormSubmitter}
     * @param progressListener jar-upload progress callback (may be {@code null})
     * @param asUser           user to submit the topology as
     * @throws Exception on parse, compilation, packaging or submission failure
     */
    @Override
    public void submit(
        String name, Iterable<String> statements, Map<String, Object> topoConf, SubmitOptions opts,
        StormSubmitter.ProgressListener progressListener, String asUser)
        throws Exception {
        for (String sql : statements) {
            StormParser parser = new StormParser(sql);
            SqlNode node = parser.impl().parseSqlStmtEof();
            if (node instanceof SqlCreateTable) {
                sqlContext.interpretCreateTable((SqlCreateTable) node);
            } else if (node instanceof SqlCreateFunction) {
                sqlContext.interpretCreateFunction((SqlCreateFunction) node);
            } else {
                AbstractStreamsProcessor processor = sqlContext.compileSql(sql);
                StormTopology topo = processor.build();
                Path jarPath = null;
                try {
                    // QueryPlanner on Streams mode configures the topology with compiled classes,
                    // so we need to add the generated classes into the topology jar.
                    // The topology is serialized and sent to Nimbus, then deserialized and executed
                    // in workers, which must be able to load those classes.
                    jarPath = Files.createTempFile("storm-sql", ".jar");
                    // StormSubmitter reads the jar location from this (global) system property.
                    System.setProperty("storm.jar", jarPath.toString());
                    packageTopology(jarPath, processor);
                    StormSubmitter.submitTopologyAs(name, topoConf, topo, opts, progressListener, asUser);
                } finally {
                    if (jarPath != null) {
                        // deleteIfExists: a plain delete() throwing NoSuchFileException here
                        // would suppress the real exception from packaging/submission above.
                        Files.deleteIfExists(jarPath);
                    }
                }
            }
        }
    }

    /**
     * Prints the execution plan of each statement to stdout. DDL statements are
     * still interpreted (so later statements may reference them) but have no
     * plan to present.
     *
     * @param statements SQL statements to explain, in order
     * @throws Exception on parse or planning failure
     */
    @Override
    public void explain(Iterable<String> statements) throws Exception {
        for (String sql : statements) {
            System.out.println("===========================================================");
            System.out.println("query>");
            System.out.println(sql);
            System.out.println("-----------------------------------------------------------");
            StormParser parser = new StormParser(sql);
            SqlNode node = parser.impl().parseSqlStmtEof();
            if (node instanceof SqlCreateTable) {
                sqlContext.interpretCreateTable((SqlCreateTable) node);
                System.out.println("No plan presented on DDL");
            } else if (node instanceof SqlCreateFunction) {
                sqlContext.interpretCreateFunction((SqlCreateFunction) node);
                System.out.println("No plan presented on DDL");
            } else {
                String plan = sqlContext.explain(sql);
                System.out.println("plan>");
                System.out.println(plan);
            }
            System.out.println("===========================================================");
        }
    }

    /**
     * Writes a jar at {@code jar} containing all classes generated while
     * compiling the query, with a manifest whose Main-Class is the processor's
     * class name.
     *
     * @param jar       destination jar path (existing content is overwritten)
     * @param processor compiled query whose generated classes are packaged
     * @throws IOException if writing the jar fails
     */
    private void packageTopology(Path jar, AbstractStreamsProcessor processor) throws IOException {
        Manifest manifest = new Manifest();
        Attributes attr = manifest.getMainAttributes();
        attr.put(Attributes.Name.MANIFEST_VERSION, "1.0");
        attr.put(Attributes.Name.MAIN_CLASS, processor.getClass().getCanonicalName());
        // Files.newOutputStream keeps this consistent with the NIO Path API used
        // throughout this class (instead of FileOutputStream on jar.toFile()).
        try (JarOutputStream out = new JarOutputStream(
            new BufferedOutputStream(Files.newOutputStream(jar)), manifest)) {
            List<CompilingClassLoader> classLoaders = processor.getClassLoaders();
            if (classLoaders != null && !classLoaders.isEmpty()) {
                for (CompilingClassLoader classLoader : classLoaders) {
                    for (Map.Entry<String, ByteArrayOutputStream> e : classLoader.getClasses().entrySet()) {
                        // Class files live under their package path: a.b.C -> a/b/C.class
                        out.putNextEntry(new ZipEntry(e.getKey().replace('.', '/') + ".class"));
                        out.write(e.getValue().toByteArray());
                        out.closeEntry();
                    }
                }
            }
        }
    }
}