blob: a68ba0cc06105c6c3004d6197b72af08ca909f13 [file] [log] [blame]
/*
* *
* * Licensed to the Apache Software Foundation (ASF) under one
* * or more contributor license agreements. See the NOTICE file
* * distributed with this work for additional information
* * regarding copyright ownership. The ASF licenses this file
* * to you under the Apache License, Version 2.0 (the
* * "License"); you may not use this file except in compliance
* * with the License. You may obtain a copy of the License at
* * <p>
* * http://www.apache.org/licenses/LICENSE-2.0
* * <p>
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
*
*/
package org.apache.storm.sql.compiler.backends.trident;
import backtype.storm.Config;
import backtype.storm.ILocalCluster;
import backtype.storm.LocalCluster;
import backtype.storm.generated.StormTopology;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.storm.sql.TestUtils;
import org.apache.storm.sql.TestUtils.MockSqlTridentDataSource.CollectDataFunction;
import org.apache.storm.sql.compiler.TestCompilerUtils;
import org.apache.storm.sql.runtime.ISqlTridentDataSource;
import org.apache.storm.sql.runtime.trident.AbstractTridentProcessor;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import storm.trident.TridentTopology;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import static org.apache.storm.sql.TestUtils.MockSqlTridentDataSource.CollectDataFunction.*;
/**
 * End-to-end tests for {@link PlanCompiler}: compiles a SQL statement over the
 * mock data sources, runs the resulting Trident topology on an in-process
 * {@link LocalCluster}, and asserts on the tuples captured by
 * {@link CollectDataFunction}.
 */
public class TestPlanCompiler {
  private final JavaTypeFactory typeFactory = new JavaTypeFactoryImpl(
      RelDataTypeSystem.DEFAULT);

  @Before
  public void setUp() {
    // CollectDataFunction accumulates into a static list shared across tests;
    // clear it so results from one test cannot leak into the next.
    getCollectedValues().clear();
  }

  @Test
  public void testCompile() throws Exception {
    final int EXPECTED_VALUE_SIZE = 2;
    String sql = "SELECT ID FROM FOO WHERE ID > 2";
    TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql);
    PlanCompiler compiler = new PlanCompiler(typeFactory);
    final AbstractTridentProcessor proc = compiler.compile(state.tree());
    final Map<String, ISqlTridentDataSource> data = new HashMap<>();
    data.put("FOO", new TestUtils.MockSqlTridentDataSource());
    final TridentTopology topo = proc.build(data);
    // A plain SELECT has no sink, so tap the output stream with the collecting
    // function to make the emitted tuples observable to the assertion below.
    Fields f = proc.outputStream().getOutputFields();
    proc.outputStream().each(f, new CollectDataFunction(), new Fields()).toStream();
    runTridentTopology(EXPECTED_VALUE_SIZE, proc, topo);
    Assert.assertArrayEquals(new Values[] { new Values(3), new Values(4) },
        getCollectedValues().toArray());
  }

  @Test
  public void testInsert() throws Exception {
    final int EXPECTED_VALUE_SIZE = 1;
    String sql = "INSERT INTO BAR SELECT ID FROM FOO WHERE ID > 3";
    TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql);
    PlanCompiler compiler = new PlanCompiler(typeFactory);
    final AbstractTridentProcessor proc = compiler.compile(state.tree());
    final Map<String, ISqlTridentDataSource> data = new HashMap<>();
    data.put("FOO", new TestUtils.MockSqlTridentDataSource());
    data.put("BAR", new TestUtils.MockSqlTridentDataSource());
    final TridentTopology topo = proc.build(data);
    // INSERT writes through the BAR mock sink, which records via
    // CollectDataFunction itself, so no extra tap is needed here.
    runTridentTopology(EXPECTED_VALUE_SIZE, proc, topo);
    Assert.assertArrayEquals(new Values[] { new Values(4) },
        getCollectedValues().toArray());
  }

  /**
   * Submits the topology to an in-process cluster and blocks until at least
   * {@code expectedValueSize} tuples have been collected (or the wait budget
   * is exhausted). Always restores the deserialization classloader and shuts
   * the cluster down, even on failure.
   */
  private void runTridentTopology(final int expectedValueSize, AbstractTridentProcessor proc,
                                  TridentTopology topo) throws Exception {
    final Config conf = new Config();
    // Throttle the spout so the mock source is not drained faster than the
    // collecting function can observe it.
    conf.setMaxSpoutPending(20);
    ILocalCluster cluster = new LocalCluster();
    StormTopology stormTopo = topo.build();
    try {
      // The compiled processor's classes live in a generated classloader;
      // point Java deserialization at it so the workers can resolve them.
      Utils.setClassLoaderForJavaDeSerialize(proc.getClass().getClassLoader());
      cluster.submitTopology("storm-sql", conf, stormTopo);
      waitForCompletion(1000 * 1000, new Callable<Boolean>() {
        @Override
        public Boolean call() throws Exception {
          // Keep waiting while fewer tuples than expected have arrived.
          return getCollectedValues().size() < expectedValueSize;
        }
      });
    } finally {
      Utils.resetClassLoaderForJavaDeSerialize();
      cluster.shutdown();
    }
  }

  /**
   * Polls {@code cond} every 100 ms until it returns {@code false} or
   * {@code timeout} milliseconds elapse. Fails the test explicitly on timeout.
   *
   * @param timeout maximum wait in milliseconds
   * @param cond    returns {@code true} while waiting should continue
   */
  private void waitForCompletion(long timeout, Callable<Boolean> cond) throws Exception {
    long start = TestUtils.monotonicNow();
    while (TestUtils.monotonicNow() - start < timeout && cond.call()) {
      Thread.sleep(100);
    }
    // BUGFIX: a timeout was previously swallowed silently, letting the test
    // continue and fail later with a confusing array-mismatch assertion.
    // Fail fast with a message that names the real cause instead.
    if (cond.call()) {
      Assert.fail("Timed out after " + timeout + " ms waiting for topology results");
    }
  }
}