blob: e29d945b6bf9b6479893344219e4a72911754f1d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.tests.integration.presto;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.tests.integration.docker.ContainerExecException;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterSuite;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.LinkedList;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
@Slf4j
public class TestBasicPresto extends PulsarTestSuite {
private static final int NUM_OF_STOCKS = 10;
@BeforeClass
public void setupPresto() throws Exception {
log.info("[setupPresto]");
pulsarCluster.startPrestoWorker();
}
@AfterClass
public void teardownPresto() {
log.info("tearing down...");
pulsarCluster.stopPrestoWorker();
}
@Test
public void testSimpleSQLQueryBatched() throws Exception {
testSimpleSQLQuery(true);
}
@Test
public void testSimpleSQLQueryNonBatched() throws Exception {
testSimpleSQLQuery(false);
}
public void testSimpleSQLQuery(boolean isBatched) throws Exception {
// wait until presto worker started
ContainerExecResult result;
do {
try {
result = execQuery("show catalogs;");
assertThat(result.getExitCode()).isEqualTo(0);
assertThat(result.getStdout()).contains("pulsar", "system");
break;
} catch (ContainerExecException cee) {
if (cee.getResult().getStderr().contains("Presto server is still initializing")) {
Thread.sleep(10000);
} else {
throw cee;
}
}
} while (true);
@Cleanup
PulsarClient pulsarClient = PulsarClient.builder()
.serviceUrl(pulsarCluster.getPlainTextServiceUrl())
.build();
String stocksTopic;
if (isBatched) {
stocksTopic = "stocks_batched";
} else {
stocksTopic = "stocks_nonbatched";
}
@Cleanup
Producer<Stock> producer = pulsarClient.newProducer(JSONSchema.of(Stock.class))
.topic(stocksTopic)
.enableBatching(isBatched)
.create();
for (int i = 0 ; i < NUM_OF_STOCKS; ++i) {
final Stock stock = new Stock(i,"STOCK_" + i , 100.0 + i * 10);
producer.send(stock);
}
producer.flush();
result = execQuery("show schemas in pulsar;");
assertThat(result.getExitCode()).isEqualTo(0);
assertThat(result.getStdout()).contains("public/default");
result = execQuery("show tables in pulsar.\"public/default\";");
assertThat(result.getExitCode()).isEqualTo(0);
assertThat(result.getStdout()).contains("stocks");
ContainerExecResult containerExecResult = execQuery(String.format("select * from pulsar.\"public/default\".%s order by entryid;", stocksTopic));
assertThat(containerExecResult.getExitCode()).isEqualTo(0);
log.info("select sql query output \n{}", containerExecResult.getStdout());
String[] split = containerExecResult.getStdout().split("\n");
assertThat(split.length).isGreaterThan(NUM_OF_STOCKS - 2);
String[] split2 = containerExecResult.getStdout().split("\n|,");
for (int i = 0; i < NUM_OF_STOCKS - 2; ++i) {
assertThat(split2).contains("\"" + i + "\"");
assertThat(split2).contains("\"" + "STOCK_" + i + "\"");
assertThat(split2).contains("\"" + (100.0 + i * 10) + "\"");
}
// test predicate pushdown
String url = String.format("jdbc:presto://%s", pulsarCluster.getPrestoWorkerContainer().getUrl());
Connection connection = DriverManager.getConnection(url, "test", null);
String query = String.format("select * from pulsar" +
".\"public/default\".%s order by __publish_time__", stocksTopic);
log.info("Executing query: {}", query);
ResultSet res = connection.createStatement().executeQuery(query);
List<Timestamp> timestamps = new LinkedList<>();
while (res.next()) {
printCurrent(res);
timestamps.add(res.getTimestamp("__publish_time__"));
}
assertThat(timestamps.size()).isGreaterThan(NUM_OF_STOCKS - 2);
query = String.format("select * from pulsar" +
".\"public/default\".%s where __publish_time__ > timestamp '%s' order by __publish_time__", stocksTopic, timestamps.get(timestamps.size() / 2));
log.info("Executing query: {}", query);
res = connection.createStatement().executeQuery(query);
List<Timestamp> returnedTimestamps = new LinkedList<>();
while (res.next()) {
printCurrent(res);
returnedTimestamps.add(res.getTimestamp("__publish_time__"));
}
assertThat(returnedTimestamps.size()).isEqualTo(timestamps.size() / 2);
// Try with a predicate that has a earlier time than any entry
// Should return all rows
query = String.format("select * from pulsar" +
".\"public/default\".%s where __publish_time__ > from_unixtime(%s) order by __publish_time__", stocksTopic, 0);
log.info("Executing query: {}", query);
res = connection.createStatement().executeQuery(query);
returnedTimestamps = new LinkedList<>();
while (res.next()) {
printCurrent(res);
returnedTimestamps.add(res.getTimestamp("__publish_time__"));
}
assertThat(returnedTimestamps.size()).isEqualTo(timestamps.size());
// Try with a predicate that has a latter time than any entry
// Should return no rows
query = String.format("select * from pulsar" +
".\"public/default\".%s where __publish_time__ > from_unixtime(%s) order by __publish_time__", stocksTopic, 99999999999L);
log.info("Executing query: {}", query);
res = connection.createStatement().executeQuery(query);
returnedTimestamps = new LinkedList<>();
while (res.next()) {
printCurrent(res);
returnedTimestamps.add(res.getTimestamp("__publish_time__"));
}
assertThat(returnedTimestamps.size()).isEqualTo(0);
}
@AfterSuite
@Override
public void tearDownCluster() {
super.tearDownCluster();
}
public static ContainerExecResult execQuery(final String query) throws Exception {
ContainerExecResult containerExecResult;
containerExecResult = pulsarCluster.getPrestoWorkerContainer()
.execCmd("/bin/bash", "-c", PulsarCluster.PULSAR_COMMAND_SCRIPT + " sql --execute " + "'" + query + "'");
return containerExecResult;
}
private static void printCurrent(ResultSet rs) throws SQLException {
ResultSetMetaData rsmd = rs.getMetaData();
int columnsNumber = rsmd.getColumnCount();
for (int i = 1; i <= columnsNumber; i++) {
if (i > 1) System.out.print(", ");
String columnValue = rs.getString(i);
System.out.print(columnValue + " " + rsmd.getColumnName(i));
}
System.out.println("");
}
}