blob: 4285c87bfd1855f5e7613c7f7abbb959a786e69c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.it.udf;
import org.apache.iotdb.it.env.ConfigFactory;
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.ClusterIT;
import org.apache.iotdb.itbase.category.LocalStandaloneIT;
import org.apache.iotdb.itbase.constant.UDFTestConstant;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
@RunWith(IoTDBTestRunner.class)
@Category({LocalStandaloneIT.class, ClusterIT.class})
public class IoTDBUDFWindowQueryIT {
protected static final int ITERATION_TIMES = 10000;
protected static boolean enableSeqSpaceCompaction;
protected static boolean enableUnseqSpaceCompaction;
protected static boolean enableCrossSpaceCompaction;
@BeforeClass
public static void setUp() throws Exception {
enableSeqSpaceCompaction = ConfigFactory.getConfig().isEnableSeqSpaceCompaction();
enableUnseqSpaceCompaction = ConfigFactory.getConfig().isEnableUnseqSpaceCompaction();
enableCrossSpaceCompaction = ConfigFactory.getConfig().isEnableCrossSpaceCompaction();
ConfigFactory.getConfig().setEnableSeqSpaceCompaction(false);
ConfigFactory.getConfig().setEnableUnseqSpaceCompaction(false);
ConfigFactory.getConfig().setEnableCrossSpaceCompaction(false);
ConfigFactory.getConfig()
.setUdfCollectorMemoryBudgetInMB(5)
.setUdfTransformerMemoryBudgetInMB(5)
.setUdfReaderMemoryBudgetInMB(5);
EnvFactory.getEnv().initBeforeClass();
createTimeSeries();
generateData();
registerUDF();
}
@AfterClass
public static void tearDown() throws Exception {
EnvFactory.getEnv().cleanAfterClass();
ConfigFactory.getConfig().setEnableSeqSpaceCompaction(enableSeqSpaceCompaction);
ConfigFactory.getConfig().setEnableUnseqSpaceCompaction(enableUnseqSpaceCompaction);
ConfigFactory.getConfig().setEnableCrossSpaceCompaction(enableCrossSpaceCompaction);
ConfigFactory.getConfig()
.setUdfCollectorMemoryBudgetInMB(100)
.setUdfTransformerMemoryBudgetInMB(100)
.setUdfReaderMemoryBudgetInMB(100);
}
private static void createTimeSeries() {
try (Connection connection = EnvFactory.getEnv().getConnection();
Statement statement = connection.createStatement()) {
statement.execute("SET STORAGE GROUP TO root.vehicle");
statement.execute("CREATE TIMESERIES root.vehicle.d1.s1 with datatype=INT32,encoding=PLAIN");
statement.execute("CREATE TIMESERIES root.vehicle.d1.s2 with datatype=INT32,encoding=PLAIN");
} catch (SQLException throwable) {
fail(throwable.getMessage());
}
}
private static void generateData() {
try (Connection connection = EnvFactory.getEnv().getConnection();
Statement statement = connection.createStatement()) {
for (int i = 0; i < ITERATION_TIMES; ++i) {
statement.execute(
(String.format(
"insert into root.vehicle.d1(timestamp,s1,s2) values(%d,%d,%d)", i, i, i)));
}
} catch (SQLException throwable) {
fail(throwable.getMessage());
}
}
private static void registerUDF() {
try (Connection connection = EnvFactory.getEnv().getConnection();
Statement statement = connection.createStatement()) {
statement.execute(
"create function counter as 'org.apache.iotdb.db.query.udf.example.Counter'");
statement.execute(
"create function accumulator as 'org.apache.iotdb.db.query.udf.example.Accumulator'");
statement.execute(
"create function time_window_tester as 'org.apache.iotdb.db.query.udf.example.SlidingTimeWindowConstructionTester'");
statement.execute(
"create function size_window_0 as 'org.apache.iotdb.db.query.udf.example.SlidingSizeWindowConstructorTester0'");
statement.execute(
"create function size_window_1 as 'org.apache.iotdb.db.query.udf.example.SlidingSizeWindowConstructorTester1'");
statement.execute(
"create function window_start_end as 'org.apache.iotdb.db.query.udf.example.WindowStartEnd'");
} catch (SQLException throwable) {
fail(throwable.getMessage());
}
}
@Test
public void testRowByRow() {
String sql =
String.format(
"select counter(s1, '%s'='%s') from root.vehicle.d1",
UDFTestConstant.ACCESS_STRATEGY_KEY, UDFTestConstant.ACCESS_STRATEGY_ROW_BY_ROW);
try (Connection conn = EnvFactory.getEnv().getConnection();
Statement statement = conn.createStatement();
ResultSet resultSet = statement.executeQuery(sql)) {
int count = 0;
assertEquals(2, resultSet.getMetaData().getColumnCount());
while (resultSet.next()) {
assertEquals(count++, (int) (Double.parseDouble(resultSet.getString(1))));
}
assertEquals(ITERATION_TIMES, count);
} catch (SQLException throwable) {
fail(throwable.getMessage());
}
}
@Test
public void testSlidingSizeWindow1() {
testSlidingSizeWindow((int) (0.1 * ITERATION_TIMES));
}
@Test
public void testSlidingSizeWindow2() {
testSlidingSizeWindow((int) (0.033 * ITERATION_TIMES));
}
@Test
public void testSlidingSizeWindow3() {
testSlidingSizeWindow((int) (0.333 * ITERATION_TIMES));
}
@Test
public void testSlidingSizeWindow4() {
testSlidingSizeWindow((int) (1.5 * ITERATION_TIMES));
}
@Test
public void testSlidingSizeWindow5() {
testSlidingSizeWindow(ITERATION_TIMES);
}
@Test
public void testSlidingSizeWindow6() {
testSlidingSizeWindow(3 * ITERATION_TIMES);
}
// todo: remove ignore when exception handler in IT finishes
@Test
@Ignore
public void testSlidingSizeWindow7() {
testSlidingSizeWindow(0);
}
// todo: remove ignore when fixed
@Test
@Ignore
public void testSlidingSizeWindow8() {
testSlidingSizeWindow(-ITERATION_TIMES);
}
private void testSlidingSizeWindow(int windowSize) {
String sql =
String.format(
"select accumulator(s1, '%s'='%s', '%s'='%s') from root.vehicle.d1",
UDFTestConstant.ACCESS_STRATEGY_KEY,
UDFTestConstant.ACCESS_STRATEGY_SLIDING_SIZE,
UDFTestConstant.WINDOW_SIZE_KEY,
windowSize);
try (Connection conn = EnvFactory.getEnv().getConnection();
Statement statement = conn.createStatement();
ResultSet resultSet = statement.executeQuery(sql)) {
assertEquals(2, resultSet.getMetaData().getColumnCount());
int count = 0;
while (resultSet.next()) {
int expectedWindowSize =
(count < ITERATION_TIMES / windowSize)
? windowSize
: ITERATION_TIMES - (ITERATION_TIMES / windowSize) * windowSize;
int expectedAccumulation = 0;
for (int i = count * windowSize; i < count * windowSize + expectedWindowSize; ++i) {
expectedAccumulation += i;
}
assertEquals(expectedAccumulation, (int) (Double.parseDouble(resultSet.getString(2))));
++count;
}
} catch (SQLException throwable) {
if (0 < windowSize || !throwable.getMessage().contains(String.valueOf(windowSize))) {
fail(throwable.getMessage());
}
}
sql =
String.format(
"select window_start_end(s1, '%s'='%s', '%s'='%s') from root.vehicle.d1",
UDFTestConstant.ACCESS_STRATEGY_KEY,
UDFTestConstant.ACCESS_STRATEGY_SLIDING_SIZE,
UDFTestConstant.WINDOW_SIZE_KEY,
windowSize);
try (Connection conn = EnvFactory.getEnv().getConnection();
Statement statement = conn.createStatement();
ResultSet resultSet = statement.executeQuery(sql)) {
assertEquals(2, resultSet.getMetaData().getColumnCount());
int count = 0;
while (resultSet.next()) {
int expectedWindowSize =
(count < ITERATION_TIMES / windowSize)
? windowSize
: ITERATION_TIMES - (ITERATION_TIMES / windowSize) * windowSize;
assertEquals(count * windowSize, (int) (Long.parseLong(resultSet.getString(1))));
assertEquals(
expectedWindowSize - 1,
Long.parseLong(resultSet.getString(2)) - Long.parseLong(resultSet.getString(1)));
++count;
}
} catch (SQLException throwable) {
if (0 < windowSize || !throwable.getMessage().contains(String.valueOf(windowSize))) {
fail(throwable.getMessage());
}
}
}
@Test
public void testSlidingTimeWindow1() {
testSlidingTimeWindow(
(int) (0.33 * ITERATION_TIMES),
(int) (0.33 * ITERATION_TIMES),
(int) (0.33 * ITERATION_TIMES),
(int) (0.33 * ITERATION_TIMES));
}
@Test
public void testSlidingTimeWindow2() {
testSlidingTimeWindow(
(int) (0.033 * ITERATION_TIMES),
(int) (2 * 0.033 * ITERATION_TIMES),
ITERATION_TIMES / 2,
ITERATION_TIMES);
}
@Test
public void testSlidingTimeWindow3() {
testSlidingTimeWindow(
(int) (2 * 0.033 * ITERATION_TIMES),
(int) (0.033 * ITERATION_TIMES),
ITERATION_TIMES / 2,
ITERATION_TIMES);
}
@Test
public void testSlidingTimeWindow4() {
testSlidingTimeWindow(
(int) (0.033 * ITERATION_TIMES),
(int) (0.033 * ITERATION_TIMES),
ITERATION_TIMES / 2,
ITERATION_TIMES);
}
@Test
public void testSlidingTimeWindow5() {
testSlidingTimeWindow(ITERATION_TIMES, ITERATION_TIMES, 0, ITERATION_TIMES);
}
@Test
public void testSlidingTimeWindow6() {
testSlidingTimeWindow(
(int) (1.01 * ITERATION_TIMES), (int) (0.01 * ITERATION_TIMES), 0, ITERATION_TIMES / 2);
}
@Test
public void testSlidingTimeWindow7() {
testSlidingTimeWindow(
(int) (0.01 * ITERATION_TIMES), (int) (1.01 * ITERATION_TIMES), 0, ITERATION_TIMES / 2);
}
@Test
public void testSlidingTimeWindow8() {
testSlidingTimeWindow(
(int) (1.01 * ITERATION_TIMES), (int) (1.01 * ITERATION_TIMES), 0, ITERATION_TIMES / 2);
}
@Test
@Ignore
public void testSlidingTimeWindow9() {
testSlidingTimeWindow(
(int) (0.01 * ITERATION_TIMES), (int) (0.05 * ITERATION_TIMES), ITERATION_TIMES / 2, 0);
}
@Test
@Ignore
public void testSlidingTimeWindow10() {
testSlidingTimeWindow(
(int) (-0.01 * ITERATION_TIMES), (int) (0.05 * ITERATION_TIMES), 0, ITERATION_TIMES / 2);
}
@Test
@Ignore
public void testSlidingTimeWindow11() {
testSlidingTimeWindow(
(int) (0.01 * ITERATION_TIMES), (int) (-0.05 * ITERATION_TIMES), 0, ITERATION_TIMES / 2);
}
@Test
@Ignore
public void testSlidingTimeWindow12() {
testSlidingTimeWindow((int) (0.01 * ITERATION_TIMES), 0, 0, ITERATION_TIMES / 2);
}
@Test
@Ignore
public void testSlidingTimeWindow13() {
testSlidingTimeWindow(0, (int) (0.05 * ITERATION_TIMES), 0, ITERATION_TIMES / 2);
}
private void testSlidingTimeWindow(
int timeInterval, int slidingStep, int displayWindowBegin, int displayWindowEnd) {
String sql =
String.format(
"select accumulator(s1, s1, s1, '%s'='%s', '%s'='%s', '%s'='%s', '%s'='%s', '%s'='%s') from root.vehicle.d1",
UDFTestConstant.ACCESS_STRATEGY_KEY,
UDFTestConstant.ACCESS_STRATEGY_SLIDING_TIME,
UDFTestConstant.TIME_INTERVAL_KEY,
timeInterval,
UDFTestConstant.SLIDING_STEP_KEY,
slidingStep,
UDFTestConstant.DISPLAY_WINDOW_BEGIN_KEY,
displayWindowBegin,
UDFTestConstant.DISPLAY_WINDOW_END_KEY,
displayWindowEnd);
try (Connection conn = EnvFactory.getEnv().getConnection();
Statement statement = conn.createStatement();
ResultSet resultSet = statement.executeQuery(sql)) {
assertEquals(2, resultSet.getMetaData().getColumnCount());
int count = 0;
while (resultSet.next()) {
int begin = displayWindowBegin + count * slidingStep;
int expectedWindowSize =
begin + timeInterval < displayWindowEnd ? timeInterval : displayWindowEnd - begin;
int expectedAccumulation = 0;
for (int i = displayWindowBegin + count * slidingStep;
i < displayWindowBegin + count * slidingStep + expectedWindowSize;
++i) {
expectedAccumulation += i;
}
assertEquals(expectedAccumulation, (int) (Double.parseDouble(resultSet.getString(2))));
++count;
}
} catch (SQLException throwable) {
if (slidingStep > 0 && timeInterval > 0 && displayWindowEnd >= displayWindowBegin) {
fail(throwable.getMessage());
}
}
sql =
String.format(
"select window_start_end(s1, s1, s1, '%s'='%s', '%s'='%s', '%s'='%s', '%s'='%s', '%s'='%s') from root.vehicle.d1",
UDFTestConstant.ACCESS_STRATEGY_KEY,
UDFTestConstant.ACCESS_STRATEGY_SLIDING_TIME,
UDFTestConstant.TIME_INTERVAL_KEY,
timeInterval,
UDFTestConstant.SLIDING_STEP_KEY,
slidingStep,
UDFTestConstant.DISPLAY_WINDOW_BEGIN_KEY,
displayWindowBegin,
UDFTestConstant.DISPLAY_WINDOW_END_KEY,
displayWindowEnd);
try (Connection conn = EnvFactory.getEnv().getConnection();
Statement statement = conn.createStatement();
ResultSet resultSet = statement.executeQuery(sql)) {
assertEquals(2, resultSet.getMetaData().getColumnCount());
int count = 0;
while (resultSet.next()) {
int begin = displayWindowBegin + count * slidingStep;
assertEquals(begin, (int) (Long.parseLong(resultSet.getString(1))));
assertEquals(
timeInterval - 1,
Long.parseLong(resultSet.getString(2)) - Long.parseLong(resultSet.getString(1)));
++count;
}
} catch (SQLException throwable) {
if (slidingStep > 0 && timeInterval > 0 && displayWindowEnd >= displayWindowBegin) {
fail(throwable.getMessage());
}
}
}
@Test
public void testSlidingTimeWindowWithTimeIntervalOnly1() {
testSlidingTimeWindowWithTimeIntervalOnly(1);
}
@Test
public void testSlidingTimeWindowWithTimeIntervalOnly2() {
testSlidingTimeWindowWithTimeIntervalOnly(ITERATION_TIMES / 10);
}
@Test
public void testSlidingTimeWindowWithTimeIntervalOnly3() {
testSlidingTimeWindowWithTimeIntervalOnly(ITERATION_TIMES / 33);
}
@Test
public void testSlidingTimeWindowWithTimeIntervalOnly4() {
testSlidingTimeWindowWithTimeIntervalOnly(ITERATION_TIMES);
}
@Test
public void testSlidingTimeWindowWithTimeIntervalOnly5() {
testSlidingTimeWindowWithTimeIntervalOnly(2 * ITERATION_TIMES);
}
@Test
@Ignore
public void testSlidingTimeWindowWithTimeIntervalOnly6() {
testSlidingTimeWindowWithTimeIntervalOnly(-ITERATION_TIMES);
}
private void testSlidingTimeWindowWithTimeIntervalOnly(int timeInterval) {
String sql =
String.format(
"select time_window_tester(s1, '%s'='%s') from root.vehicle.d1",
UDFTestConstant.TIME_INTERVAL_KEY, timeInterval);
int displayWindowBegin = 0;
int displayWindowEnd = ITERATION_TIMES;
try (Connection conn = EnvFactory.getEnv().getConnection();
Statement statement = conn.createStatement();
ResultSet resultSet = statement.executeQuery(sql)) {
assertEquals(2, resultSet.getMetaData().getColumnCount());
int count = 0;
while (resultSet.next()) {
int begin = displayWindowBegin + count * timeInterval;
int expectedWindowSize =
begin + timeInterval < displayWindowEnd ? timeInterval : displayWindowEnd - begin;
int expectedAccumulation = 0;
for (int i = displayWindowBegin + count * timeInterval;
i < displayWindowBegin + count * timeInterval + expectedWindowSize;
++i) {
expectedAccumulation += i;
}
assertEquals(expectedAccumulation, (int) (Double.parseDouble(resultSet.getString(2))));
++count;
}
} catch (SQLException throwable) {
if (timeInterval > 0) {
fail(throwable.getMessage());
}
}
sql =
String.format(
"select window_start_end(s1, '%s'='%s') from root.vehicle.d1",
UDFTestConstant.TIME_INTERVAL_KEY, timeInterval);
try (Connection conn = EnvFactory.getEnv().getConnection();
Statement statement = conn.createStatement();
ResultSet resultSet = statement.executeQuery(sql)) {
assertEquals(2, resultSet.getMetaData().getColumnCount());
int count = 0;
while (resultSet.next()) {
int begin = displayWindowBegin + count * timeInterval;
assertEquals(begin, (int) (Long.parseLong(resultSet.getString(1))));
assertEquals(
timeInterval - 1,
Long.parseLong(resultSet.getString(2)) - Long.parseLong(resultSet.getString(1)));
++count;
}
} catch (SQLException throwable) {
if (timeInterval > 0) {
fail(throwable.getMessage());
}
}
}
@Test
public void testSlidingSizeWindowWithSlidingStep1() {
testSlidingSizeWindowWithSlidingStep(1, 1, 0);
testSlidingSizeWindowWithSlidingStep(1, 1, 1);
testSlidingSizeWindowWithSlidingStep(1, 1, 10);
testSlidingSizeWindowWithSlidingStep(1, 1, (int) (0.434 * ITERATION_TIMES));
testSlidingSizeWindowWithSlidingStep(1, 1, (int) (1.5 * ITERATION_TIMES));
}
@Test
public void testSlidingSizeWindowWithSlidingStep2() {
testSlidingSizeWindowWithSlidingStep(100, 100, 0);
testSlidingSizeWindowWithSlidingStep(100, 100, 100);
testSlidingSizeWindowWithSlidingStep(100, 100, 10000);
testSlidingSizeWindowWithSlidingStep(100, 100, (int) (0.434 * ITERATION_TIMES));
testSlidingSizeWindowWithSlidingStep(100, 100, (int) (1.5 * ITERATION_TIMES));
}
@Test
public void testSlidingSizeWindowWithSlidingStep3() {
testSlidingSizeWindowWithSlidingStep(111, 123, 0);
testSlidingSizeWindowWithSlidingStep(111, 123, (int) (0.434 * ITERATION_TIMES));
testSlidingSizeWindowWithSlidingStep(111, 123, (int) (1.5 * ITERATION_TIMES));
}
@Test
public void testSlidingSizeWindowWithSlidingStep4() {
testSlidingSizeWindowWithSlidingStep(123, 111, 0);
testSlidingSizeWindowWithSlidingStep(123, 111, (int) (0.434 * ITERATION_TIMES));
testSlidingSizeWindowWithSlidingStep(123, 111, (int) (1.5 * ITERATION_TIMES));
}
@Test
public void testSlidingSizeWindowWithSlidingStep5() {
testSlidingSizeWindowWithSlidingStep(100, 10000, 0);
testSlidingSizeWindowWithSlidingStep(100, 10000, 100);
testSlidingSizeWindowWithSlidingStep(100, 10000, 10000);
testSlidingSizeWindowWithSlidingStep(100, 10000, (int) (0.434 * ITERATION_TIMES));
testSlidingSizeWindowWithSlidingStep(100, 10000, (int) (1.5 * ITERATION_TIMES));
}
@Test
public void testSlidingSizeWindowWithSlidingStep6() {
testSlidingSizeWindowWithSlidingStep(10000, 1000, 0);
testSlidingSizeWindowWithSlidingStep(10000, 1000, 1000);
testSlidingSizeWindowWithSlidingStep(10000, 1000, 10000);
testSlidingSizeWindowWithSlidingStep(10000, 1000, (int) (0.434 * ITERATION_TIMES));
testSlidingSizeWindowWithSlidingStep(10000, 1000, (int) (1.5 * ITERATION_TIMES));
}
@Test
public void testSlidingSizeWindowWithSlidingStep7() {
testSlidingSizeWindowWithSlidingStep((int) (1.5 * ITERATION_TIMES), 4333, 0);
testSlidingSizeWindowWithSlidingStep(
(int) (1.5 * ITERATION_TIMES), 4333, (int) (0.434 * ITERATION_TIMES));
testSlidingSizeWindowWithSlidingStep(
(int) (1.5 * ITERATION_TIMES), 4333, (int) (1.5 * ITERATION_TIMES));
}
@Test
public void testSlidingSizeWindowWithSlidingStep8() {
testSlidingSizeWindowWithSlidingStep(10000, (int) (1.5 * ITERATION_TIMES), 0);
testSlidingSizeWindowWithSlidingStep(
10000, (int) (1.5 * ITERATION_TIMES), (int) (0.434 * ITERATION_TIMES));
testSlidingSizeWindowWithSlidingStep(
10000, (int) (1.5 * ITERATION_TIMES), (int) (1.5 * ITERATION_TIMES));
}
@Test
public void testSlidingSizeWindowWithSlidingStep9() {
testSlidingSizeWindowWithSlidingStep(
(int) (1.5 * ITERATION_TIMES), (int) (1.5 * ITERATION_TIMES), 0);
testSlidingSizeWindowWithSlidingStep(
(int) (1.5 * ITERATION_TIMES),
(int) (1.5 * ITERATION_TIMES),
(int) (0.434 * ITERATION_TIMES));
testSlidingSizeWindowWithSlidingStep(
(int) (1.5 * ITERATION_TIMES),
(int) (1.5 * ITERATION_TIMES),
(int) (1.5 * ITERATION_TIMES));
}
private void testSlidingSizeWindowWithSlidingStep(
int windowSize, int slidingStep, int consumptionPoint) {
String sql =
String.format(
"select size_window_0(s1, '%s'='%s', '%s'='%s'), size_window_1(s1, '%s'='%s') from root.vehicle.d1",
"windowSize",
windowSize,
"slidingStep",
slidingStep,
"consumptionPoint",
consumptionPoint);
try (Connection conn = EnvFactory.getEnv().getConnection();
Statement statement = conn.createStatement();
ResultSet resultSet = statement.executeQuery(sql)) {
assertEquals(3, resultSet.getMetaData().getColumnCount());
int count = 0;
while (resultSet.next()) {
if (ITERATION_TIMES < windowSize) {
String actual = resultSet.getString(2);
if (actual != null) {
assertEquals(ITERATION_TIMES - (long) count * slidingStep, Integer.parseInt(actual));
++count;
}
} else if (count * slidingStep + windowSize < ITERATION_TIMES) {
String actual = resultSet.getString(2);
if (actual != null) {
assertEquals(windowSize, Integer.parseInt(resultSet.getString(2)));
++count;
}
} else {
String actual = resultSet.getString(2);
if (actual != null) {
assertEquals(
ITERATION_TIMES - (long) count * slidingStep,
Integer.parseInt(resultSet.getString(2)));
++count;
}
}
}
assertEquals((int) Math.ceil(ITERATION_TIMES / (double) slidingStep), count);
} catch (SQLException throwable) {
if (windowSize > 0) {
fail(throwable.getMessage());
}
}
sql =
String.format(
"select window_start_end(s1, '%s'='%s', '%s'='%s', '%s'='%s'), size_window_1(s1, '%s'='%s') from root.vehicle.d1",
UDFTestConstant.ACCESS_STRATEGY_KEY,
UDFTestConstant.ACCESS_STRATEGY_SLIDING_SIZE,
UDFTestConstant.WINDOW_SIZE_KEY,
windowSize,
UDFTestConstant.SLIDING_STEP_KEY,
slidingStep,
"consumptionPoint",
consumptionPoint);
try (Connection conn = EnvFactory.getEnv().getConnection();
Statement statement = conn.createStatement();
ResultSet resultSet = statement.executeQuery(sql)) {
assertEquals(3, resultSet.getMetaData().getColumnCount());
int count = 0;
while (resultSet.next()) {
if (ITERATION_TIMES < windowSize) {
String actual = resultSet.getString(2);
if (actual != null) {
assertEquals(count * slidingStep, Integer.parseInt(resultSet.getString(1)));
assertEquals(
ITERATION_TIMES - (long) count * slidingStep - 1,
Integer.parseInt(actual) - Integer.parseInt(resultSet.getString(1)));
++count;
}
} else if (count * slidingStep + windowSize < ITERATION_TIMES) {
String actual = resultSet.getString(2);
if (actual != null) {
assertEquals(count * slidingStep, Integer.parseInt(resultSet.getString(1)));
assertEquals(
windowSize - 1,
Integer.parseInt(resultSet.getString(2))
- Integer.parseInt(resultSet.getString(1)));
++count;
}
} else {
String actual = resultSet.getString(2);
if (actual != null) {
assertEquals(count * slidingStep, Integer.parseInt(resultSet.getString(1)));
assertEquals(
ITERATION_TIMES - (long) count * slidingStep - 1,
Integer.parseInt(resultSet.getString(2))
- Integer.parseInt(resultSet.getString(1)));
++count;
}
}
}
assertEquals((int) Math.ceil(ITERATION_TIMES / (double) slidingStep), count);
} catch (SQLException throwable) {
if (windowSize > 0) {
fail(throwable.getMessage());
}
}
}
@Test
public void testSizeWindowUDFWithConstants() {
try (Connection connection = EnvFactory.getEnv().getConnection();
Statement statement = connection.createStatement()) {
String query =
"SELECT accumulator(s1 + 1, 'access'='size', 'windowSize'='1000') FROM root.vehicle.d1";
try (ResultSet rs = statement.executeQuery(query)) {
int time = 0;
int value = 500500;
for (int i = 0; i < ITERATION_TIMES / 1000; i++) {
Assert.assertTrue(rs.next());
Assert.assertEquals(time, rs.getLong(1));
Assert.assertEquals(value, rs.getLong(2));
time += 1000;
value += 1000000;
}
Assert.assertFalse(rs.next());
}
query =
"SELECT 1 + accumulator(s1 + 1, 'access'='size', 'windowSize'='1000') FROM root.vehicle.d1";
try (ResultSet rs = statement.executeQuery(query)) {
int time = 0;
double value = 500501D;
for (int i = 0; i < ITERATION_TIMES / 1000; i++) {
Assert.assertTrue(rs.next());
Assert.assertEquals(time, rs.getLong(1));
Assert.assertEquals(value, rs.getDouble(2), 0.001);
time += 1000;
value += 1000000D;
}
Assert.assertFalse(rs.next());
}
} catch (SQLException e) {
e.printStackTrace();
fail(e.getMessage());
}
}
@Test
public void testTimeWindowUDFWithConstants() {
try (Connection connection = EnvFactory.getEnv().getConnection();
Statement statement = connection.createStatement()) {
String query =
"SELECT accumulator("
+ "s1 + 1, "
+ "'access'='time', "
+ "'timeInterval'='1000', "
+ "'slidingStep'='1000', "
+ "'displayWindowBegin'='0', "
+ "'displayWindowEnd'='10000') FROM root.vehicle.d1";
try (ResultSet rs = statement.executeQuery(query)) {
int time = 0;
int value = 500500;
for (int i = 0; i < 10; i++) {
Assert.assertTrue(rs.next());
Assert.assertEquals(time, rs.getLong(1));
Assert.assertEquals(value, rs.getLong(2));
time += 1000;
value += 1000000;
}
Assert.assertFalse(rs.next());
}
query =
"SELECT 1 + accumulator("
+ "s1 + 1, "
+ "'access'='time', "
+ "'timeInterval'='1000', "
+ "'slidingStep'='1000', "
+ "'displayWindowBegin'='0', "
+ "'displayWindowEnd'='10000') FROM root.vehicle.d1";
try (ResultSet rs = statement.executeQuery(query)) {
int time = 0;
double value = 500501;
for (int i = 0; i < 10; i++) {
Assert.assertTrue(rs.next());
Assert.assertEquals(time, rs.getLong(1));
Assert.assertEquals(value, rs.getDouble(2), 0.001D);
time += 1000;
value += 1000000D;
}
Assert.assertFalse(rs.next());
}
} catch (SQLException e) {
e.printStackTrace();
fail(e.getMessage());
}
}
}