| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.iotdb.db.it.udf; |
| |
| import org.apache.iotdb.it.env.ConfigFactory; |
| import org.apache.iotdb.it.env.EnvFactory; |
| import org.apache.iotdb.it.framework.IoTDBTestRunner; |
| import org.apache.iotdb.itbase.category.ClusterIT; |
| import org.apache.iotdb.itbase.category.LocalStandaloneIT; |
| import org.apache.iotdb.itbase.constant.UDFTestConstant; |
| |
| import org.junit.AfterClass; |
| import org.junit.Assert; |
| import org.junit.BeforeClass; |
| import org.junit.Ignore; |
| import org.junit.Test; |
| import org.junit.experimental.categories.Category; |
| import org.junit.runner.RunWith; |
| |
| import java.sql.Connection; |
| import java.sql.ResultSet; |
| import java.sql.SQLException; |
| import java.sql.Statement; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.fail; |
| |
| @RunWith(IoTDBTestRunner.class) |
| @Category({LocalStandaloneIT.class, ClusterIT.class}) |
| public class IoTDBUDFWindowQueryIT { |
| |
| protected static final int ITERATION_TIMES = 10000; |
| |
| protected static boolean enableSeqSpaceCompaction; |
| protected static boolean enableUnseqSpaceCompaction; |
| protected static boolean enableCrossSpaceCompaction; |
| |
| @BeforeClass |
| public static void setUp() throws Exception { |
| enableSeqSpaceCompaction = ConfigFactory.getConfig().isEnableSeqSpaceCompaction(); |
| enableUnseqSpaceCompaction = ConfigFactory.getConfig().isEnableUnseqSpaceCompaction(); |
| enableCrossSpaceCompaction = ConfigFactory.getConfig().isEnableCrossSpaceCompaction(); |
| ConfigFactory.getConfig().setEnableSeqSpaceCompaction(false); |
| ConfigFactory.getConfig().setEnableUnseqSpaceCompaction(false); |
| ConfigFactory.getConfig().setEnableCrossSpaceCompaction(false); |
| ConfigFactory.getConfig() |
| .setUdfCollectorMemoryBudgetInMB(5) |
| .setUdfTransformerMemoryBudgetInMB(5) |
| .setUdfReaderMemoryBudgetInMB(5); |
| EnvFactory.getEnv().initBeforeClass(); |
| createTimeSeries(); |
| generateData(); |
| registerUDF(); |
| } |
| |
| @AfterClass |
| public static void tearDown() throws Exception { |
| EnvFactory.getEnv().cleanAfterClass(); |
| ConfigFactory.getConfig().setEnableSeqSpaceCompaction(enableSeqSpaceCompaction); |
| ConfigFactory.getConfig().setEnableUnseqSpaceCompaction(enableUnseqSpaceCompaction); |
| ConfigFactory.getConfig().setEnableCrossSpaceCompaction(enableCrossSpaceCompaction); |
| ConfigFactory.getConfig() |
| .setUdfCollectorMemoryBudgetInMB(100) |
| .setUdfTransformerMemoryBudgetInMB(100) |
| .setUdfReaderMemoryBudgetInMB(100); |
| } |
| |
| private static void createTimeSeries() { |
| try (Connection connection = EnvFactory.getEnv().getConnection(); |
| Statement statement = connection.createStatement()) { |
| statement.execute("SET STORAGE GROUP TO root.vehicle"); |
| statement.execute("CREATE TIMESERIES root.vehicle.d1.s1 with datatype=INT32,encoding=PLAIN"); |
| statement.execute("CREATE TIMESERIES root.vehicle.d1.s2 with datatype=INT32,encoding=PLAIN"); |
| } catch (SQLException throwable) { |
| fail(throwable.getMessage()); |
| } |
| } |
| |
| private static void generateData() { |
| try (Connection connection = EnvFactory.getEnv().getConnection(); |
| Statement statement = connection.createStatement()) { |
| for (int i = 0; i < ITERATION_TIMES; ++i) { |
| statement.execute( |
| (String.format( |
| "insert into root.vehicle.d1(timestamp,s1,s2) values(%d,%d,%d)", i, i, i))); |
| } |
| } catch (SQLException throwable) { |
| fail(throwable.getMessage()); |
| } |
| } |
| |
| private static void registerUDF() { |
| try (Connection connection = EnvFactory.getEnv().getConnection(); |
| Statement statement = connection.createStatement()) { |
| statement.execute( |
| "create function counter as 'org.apache.iotdb.db.query.udf.example.Counter'"); |
| statement.execute( |
| "create function accumulator as 'org.apache.iotdb.db.query.udf.example.Accumulator'"); |
| statement.execute( |
| "create function time_window_tester as 'org.apache.iotdb.db.query.udf.example.SlidingTimeWindowConstructionTester'"); |
| statement.execute( |
| "create function size_window_0 as 'org.apache.iotdb.db.query.udf.example.SlidingSizeWindowConstructorTester0'"); |
| statement.execute( |
| "create function size_window_1 as 'org.apache.iotdb.db.query.udf.example.SlidingSizeWindowConstructorTester1'"); |
| statement.execute( |
| "create function window_start_end as 'org.apache.iotdb.db.query.udf.example.WindowStartEnd'"); |
| } catch (SQLException throwable) { |
| fail(throwable.getMessage()); |
| } |
| } |
| |
| @Test |
| public void testRowByRow() { |
| String sql = |
| String.format( |
| "select counter(s1, '%s'='%s') from root.vehicle.d1", |
| UDFTestConstant.ACCESS_STRATEGY_KEY, UDFTestConstant.ACCESS_STRATEGY_ROW_BY_ROW); |
| |
| try (Connection conn = EnvFactory.getEnv().getConnection(); |
| Statement statement = conn.createStatement(); |
| ResultSet resultSet = statement.executeQuery(sql)) { |
| int count = 0; |
| assertEquals(2, resultSet.getMetaData().getColumnCount()); |
| while (resultSet.next()) { |
| assertEquals(count++, (int) (Double.parseDouble(resultSet.getString(1)))); |
| } |
| assertEquals(ITERATION_TIMES, count); |
| } catch (SQLException throwable) { |
| fail(throwable.getMessage()); |
| } |
| } |
| |
| @Test |
| public void testSlidingSizeWindow1() { |
| testSlidingSizeWindow((int) (0.1 * ITERATION_TIMES)); |
| } |
| |
| @Test |
| public void testSlidingSizeWindow2() { |
| testSlidingSizeWindow((int) (0.033 * ITERATION_TIMES)); |
| } |
| |
| @Test |
| public void testSlidingSizeWindow3() { |
| testSlidingSizeWindow((int) (0.333 * ITERATION_TIMES)); |
| } |
| |
| @Test |
| public void testSlidingSizeWindow4() { |
| testSlidingSizeWindow((int) (1.5 * ITERATION_TIMES)); |
| } |
| |
| @Test |
| public void testSlidingSizeWindow5() { |
| testSlidingSizeWindow(ITERATION_TIMES); |
| } |
| |
| @Test |
| public void testSlidingSizeWindow6() { |
| testSlidingSizeWindow(3 * ITERATION_TIMES); |
| } |
| |
| // todo: remove ignore when exception handler in IT finishes |
| @Test |
| @Ignore |
| public void testSlidingSizeWindow7() { |
| testSlidingSizeWindow(0); |
| } |
| |
| // todo: remove ignore when fixed |
| @Test |
| @Ignore |
| public void testSlidingSizeWindow8() { |
| testSlidingSizeWindow(-ITERATION_TIMES); |
| } |
| |
| private void testSlidingSizeWindow(int windowSize) { |
| String sql = |
| String.format( |
| "select accumulator(s1, '%s'='%s', '%s'='%s') from root.vehicle.d1", |
| UDFTestConstant.ACCESS_STRATEGY_KEY, |
| UDFTestConstant.ACCESS_STRATEGY_SLIDING_SIZE, |
| UDFTestConstant.WINDOW_SIZE_KEY, |
| windowSize); |
| |
| try (Connection conn = EnvFactory.getEnv().getConnection(); |
| Statement statement = conn.createStatement(); |
| ResultSet resultSet = statement.executeQuery(sql)) { |
| assertEquals(2, resultSet.getMetaData().getColumnCount()); |
| |
| int count = 0; |
| while (resultSet.next()) { |
| int expectedWindowSize = |
| (count < ITERATION_TIMES / windowSize) |
| ? windowSize |
| : ITERATION_TIMES - (ITERATION_TIMES / windowSize) * windowSize; |
| |
| int expectedAccumulation = 0; |
| for (int i = count * windowSize; i < count * windowSize + expectedWindowSize; ++i) { |
| expectedAccumulation += i; |
| } |
| |
| assertEquals(expectedAccumulation, (int) (Double.parseDouble(resultSet.getString(2)))); |
| ++count; |
| } |
| } catch (SQLException throwable) { |
| if (0 < windowSize || !throwable.getMessage().contains(String.valueOf(windowSize))) { |
| fail(throwable.getMessage()); |
| } |
| } |
| |
| sql = |
| String.format( |
| "select window_start_end(s1, '%s'='%s', '%s'='%s') from root.vehicle.d1", |
| UDFTestConstant.ACCESS_STRATEGY_KEY, |
| UDFTestConstant.ACCESS_STRATEGY_SLIDING_SIZE, |
| UDFTestConstant.WINDOW_SIZE_KEY, |
| windowSize); |
| |
| try (Connection conn = EnvFactory.getEnv().getConnection(); |
| Statement statement = conn.createStatement(); |
| ResultSet resultSet = statement.executeQuery(sql)) { |
| assertEquals(2, resultSet.getMetaData().getColumnCount()); |
| |
| int count = 0; |
| while (resultSet.next()) { |
| int expectedWindowSize = |
| (count < ITERATION_TIMES / windowSize) |
| ? windowSize |
| : ITERATION_TIMES - (ITERATION_TIMES / windowSize) * windowSize; |
| |
| assertEquals(count * windowSize, (int) (Long.parseLong(resultSet.getString(1)))); |
| assertEquals( |
| expectedWindowSize - 1, |
| Long.parseLong(resultSet.getString(2)) - Long.parseLong(resultSet.getString(1))); |
| |
| ++count; |
| } |
| } catch (SQLException throwable) { |
| if (0 < windowSize || !throwable.getMessage().contains(String.valueOf(windowSize))) { |
| fail(throwable.getMessage()); |
| } |
| } |
| } |
| |
| @Test |
| public void testSlidingTimeWindow1() { |
| testSlidingTimeWindow( |
| (int) (0.33 * ITERATION_TIMES), |
| (int) (0.33 * ITERATION_TIMES), |
| (int) (0.33 * ITERATION_TIMES), |
| (int) (0.33 * ITERATION_TIMES)); |
| } |
| |
| @Test |
| public void testSlidingTimeWindow2() { |
| testSlidingTimeWindow( |
| (int) (0.033 * ITERATION_TIMES), |
| (int) (2 * 0.033 * ITERATION_TIMES), |
| ITERATION_TIMES / 2, |
| ITERATION_TIMES); |
| } |
| |
| @Test |
| public void testSlidingTimeWindow3() { |
| testSlidingTimeWindow( |
| (int) (2 * 0.033 * ITERATION_TIMES), |
| (int) (0.033 * ITERATION_TIMES), |
| ITERATION_TIMES / 2, |
| ITERATION_TIMES); |
| } |
| |
| @Test |
| public void testSlidingTimeWindow4() { |
| testSlidingTimeWindow( |
| (int) (0.033 * ITERATION_TIMES), |
| (int) (0.033 * ITERATION_TIMES), |
| ITERATION_TIMES / 2, |
| ITERATION_TIMES); |
| } |
| |
| @Test |
| public void testSlidingTimeWindow5() { |
| testSlidingTimeWindow(ITERATION_TIMES, ITERATION_TIMES, 0, ITERATION_TIMES); |
| } |
| |
| @Test |
| public void testSlidingTimeWindow6() { |
| testSlidingTimeWindow( |
| (int) (1.01 * ITERATION_TIMES), (int) (0.01 * ITERATION_TIMES), 0, ITERATION_TIMES / 2); |
| } |
| |
| @Test |
| public void testSlidingTimeWindow7() { |
| testSlidingTimeWindow( |
| (int) (0.01 * ITERATION_TIMES), (int) (1.01 * ITERATION_TIMES), 0, ITERATION_TIMES / 2); |
| } |
| |
| @Test |
| public void testSlidingTimeWindow8() { |
| testSlidingTimeWindow( |
| (int) (1.01 * ITERATION_TIMES), (int) (1.01 * ITERATION_TIMES), 0, ITERATION_TIMES / 2); |
| } |
| |
| @Test |
| @Ignore |
| public void testSlidingTimeWindow9() { |
| testSlidingTimeWindow( |
| (int) (0.01 * ITERATION_TIMES), (int) (0.05 * ITERATION_TIMES), ITERATION_TIMES / 2, 0); |
| } |
| |
| @Test |
| @Ignore |
| public void testSlidingTimeWindow10() { |
| testSlidingTimeWindow( |
| (int) (-0.01 * ITERATION_TIMES), (int) (0.05 * ITERATION_TIMES), 0, ITERATION_TIMES / 2); |
| } |
| |
| @Test |
| @Ignore |
| public void testSlidingTimeWindow11() { |
| testSlidingTimeWindow( |
| (int) (0.01 * ITERATION_TIMES), (int) (-0.05 * ITERATION_TIMES), 0, ITERATION_TIMES / 2); |
| } |
| |
| @Test |
| @Ignore |
| public void testSlidingTimeWindow12() { |
| testSlidingTimeWindow((int) (0.01 * ITERATION_TIMES), 0, 0, ITERATION_TIMES / 2); |
| } |
| |
| @Test |
| @Ignore |
| public void testSlidingTimeWindow13() { |
| testSlidingTimeWindow(0, (int) (0.05 * ITERATION_TIMES), 0, ITERATION_TIMES / 2); |
| } |
| |
| private void testSlidingTimeWindow( |
| int timeInterval, int slidingStep, int displayWindowBegin, int displayWindowEnd) { |
| String sql = |
| String.format( |
| "select accumulator(s1, s1, s1, '%s'='%s', '%s'='%s', '%s'='%s', '%s'='%s', '%s'='%s') from root.vehicle.d1", |
| UDFTestConstant.ACCESS_STRATEGY_KEY, |
| UDFTestConstant.ACCESS_STRATEGY_SLIDING_TIME, |
| UDFTestConstant.TIME_INTERVAL_KEY, |
| timeInterval, |
| UDFTestConstant.SLIDING_STEP_KEY, |
| slidingStep, |
| UDFTestConstant.DISPLAY_WINDOW_BEGIN_KEY, |
| displayWindowBegin, |
| UDFTestConstant.DISPLAY_WINDOW_END_KEY, |
| displayWindowEnd); |
| |
| try (Connection conn = EnvFactory.getEnv().getConnection(); |
| Statement statement = conn.createStatement(); |
| ResultSet resultSet = statement.executeQuery(sql)) { |
| assertEquals(2, resultSet.getMetaData().getColumnCount()); |
| |
| int count = 0; |
| while (resultSet.next()) { |
| int begin = displayWindowBegin + count * slidingStep; |
| int expectedWindowSize = |
| begin + timeInterval < displayWindowEnd ? timeInterval : displayWindowEnd - begin; |
| |
| int expectedAccumulation = 0; |
| for (int i = displayWindowBegin + count * slidingStep; |
| i < displayWindowBegin + count * slidingStep + expectedWindowSize; |
| ++i) { |
| expectedAccumulation += i; |
| } |
| |
| assertEquals(expectedAccumulation, (int) (Double.parseDouble(resultSet.getString(2)))); |
| ++count; |
| } |
| } catch (SQLException throwable) { |
| if (slidingStep > 0 && timeInterval > 0 && displayWindowEnd >= displayWindowBegin) { |
| fail(throwable.getMessage()); |
| } |
| } |
| |
| sql = |
| String.format( |
| "select window_start_end(s1, s1, s1, '%s'='%s', '%s'='%s', '%s'='%s', '%s'='%s', '%s'='%s') from root.vehicle.d1", |
| UDFTestConstant.ACCESS_STRATEGY_KEY, |
| UDFTestConstant.ACCESS_STRATEGY_SLIDING_TIME, |
| UDFTestConstant.TIME_INTERVAL_KEY, |
| timeInterval, |
| UDFTestConstant.SLIDING_STEP_KEY, |
| slidingStep, |
| UDFTestConstant.DISPLAY_WINDOW_BEGIN_KEY, |
| displayWindowBegin, |
| UDFTestConstant.DISPLAY_WINDOW_END_KEY, |
| displayWindowEnd); |
| |
| try (Connection conn = EnvFactory.getEnv().getConnection(); |
| Statement statement = conn.createStatement(); |
| ResultSet resultSet = statement.executeQuery(sql)) { |
| assertEquals(2, resultSet.getMetaData().getColumnCount()); |
| |
| int count = 0; |
| while (resultSet.next()) { |
| int begin = displayWindowBegin + count * slidingStep; |
| |
| assertEquals(begin, (int) (Long.parseLong(resultSet.getString(1)))); |
| assertEquals( |
| timeInterval - 1, |
| Long.parseLong(resultSet.getString(2)) - Long.parseLong(resultSet.getString(1))); |
| |
| ++count; |
| } |
| } catch (SQLException throwable) { |
| if (slidingStep > 0 && timeInterval > 0 && displayWindowEnd >= displayWindowBegin) { |
| fail(throwable.getMessage()); |
| } |
| } |
| } |
| |
| @Test |
| public void testSlidingTimeWindowWithTimeIntervalOnly1() { |
| testSlidingTimeWindowWithTimeIntervalOnly(1); |
| } |
| |
| @Test |
| public void testSlidingTimeWindowWithTimeIntervalOnly2() { |
| testSlidingTimeWindowWithTimeIntervalOnly(ITERATION_TIMES / 10); |
| } |
| |
| @Test |
| public void testSlidingTimeWindowWithTimeIntervalOnly3() { |
| testSlidingTimeWindowWithTimeIntervalOnly(ITERATION_TIMES / 33); |
| } |
| |
| @Test |
| public void testSlidingTimeWindowWithTimeIntervalOnly4() { |
| testSlidingTimeWindowWithTimeIntervalOnly(ITERATION_TIMES); |
| } |
| |
| @Test |
| public void testSlidingTimeWindowWithTimeIntervalOnly5() { |
| testSlidingTimeWindowWithTimeIntervalOnly(2 * ITERATION_TIMES); |
| } |
| |
| @Test |
| @Ignore |
| public void testSlidingTimeWindowWithTimeIntervalOnly6() { |
| testSlidingTimeWindowWithTimeIntervalOnly(-ITERATION_TIMES); |
| } |
| |
| private void testSlidingTimeWindowWithTimeIntervalOnly(int timeInterval) { |
| String sql = |
| String.format( |
| "select time_window_tester(s1, '%s'='%s') from root.vehicle.d1", |
| UDFTestConstant.TIME_INTERVAL_KEY, timeInterval); |
| |
| int displayWindowBegin = 0; |
| int displayWindowEnd = ITERATION_TIMES; |
| try (Connection conn = EnvFactory.getEnv().getConnection(); |
| Statement statement = conn.createStatement(); |
| ResultSet resultSet = statement.executeQuery(sql)) { |
| assertEquals(2, resultSet.getMetaData().getColumnCount()); |
| |
| int count = 0; |
| while (resultSet.next()) { |
| int begin = displayWindowBegin + count * timeInterval; |
| int expectedWindowSize = |
| begin + timeInterval < displayWindowEnd ? timeInterval : displayWindowEnd - begin; |
| |
| int expectedAccumulation = 0; |
| for (int i = displayWindowBegin + count * timeInterval; |
| i < displayWindowBegin + count * timeInterval + expectedWindowSize; |
| ++i) { |
| expectedAccumulation += i; |
| } |
| |
| assertEquals(expectedAccumulation, (int) (Double.parseDouble(resultSet.getString(2)))); |
| ++count; |
| } |
| } catch (SQLException throwable) { |
| if (timeInterval > 0) { |
| fail(throwable.getMessage()); |
| } |
| } |
| |
| sql = |
| String.format( |
| "select window_start_end(s1, '%s'='%s') from root.vehicle.d1", |
| UDFTestConstant.TIME_INTERVAL_KEY, timeInterval); |
| |
| try (Connection conn = EnvFactory.getEnv().getConnection(); |
| Statement statement = conn.createStatement(); |
| ResultSet resultSet = statement.executeQuery(sql)) { |
| assertEquals(2, resultSet.getMetaData().getColumnCount()); |
| |
| int count = 0; |
| while (resultSet.next()) { |
| int begin = displayWindowBegin + count * timeInterval; |
| |
| assertEquals(begin, (int) (Long.parseLong(resultSet.getString(1)))); |
| assertEquals( |
| timeInterval - 1, |
| Long.parseLong(resultSet.getString(2)) - Long.parseLong(resultSet.getString(1))); |
| |
| ++count; |
| } |
| } catch (SQLException throwable) { |
| if (timeInterval > 0) { |
| fail(throwable.getMessage()); |
| } |
| } |
| } |
| |
| @Test |
| public void testSlidingSizeWindowWithSlidingStep1() { |
| testSlidingSizeWindowWithSlidingStep(1, 1, 0); |
| testSlidingSizeWindowWithSlidingStep(1, 1, 1); |
| testSlidingSizeWindowWithSlidingStep(1, 1, 10); |
| testSlidingSizeWindowWithSlidingStep(1, 1, (int) (0.434 * ITERATION_TIMES)); |
| testSlidingSizeWindowWithSlidingStep(1, 1, (int) (1.5 * ITERATION_TIMES)); |
| } |
| |
| @Test |
| public void testSlidingSizeWindowWithSlidingStep2() { |
| testSlidingSizeWindowWithSlidingStep(100, 100, 0); |
| testSlidingSizeWindowWithSlidingStep(100, 100, 100); |
| testSlidingSizeWindowWithSlidingStep(100, 100, 10000); |
| testSlidingSizeWindowWithSlidingStep(100, 100, (int) (0.434 * ITERATION_TIMES)); |
| testSlidingSizeWindowWithSlidingStep(100, 100, (int) (1.5 * ITERATION_TIMES)); |
| } |
| |
| @Test |
| public void testSlidingSizeWindowWithSlidingStep3() { |
| testSlidingSizeWindowWithSlidingStep(111, 123, 0); |
| testSlidingSizeWindowWithSlidingStep(111, 123, (int) (0.434 * ITERATION_TIMES)); |
| testSlidingSizeWindowWithSlidingStep(111, 123, (int) (1.5 * ITERATION_TIMES)); |
| } |
| |
| @Test |
| public void testSlidingSizeWindowWithSlidingStep4() { |
| testSlidingSizeWindowWithSlidingStep(123, 111, 0); |
| testSlidingSizeWindowWithSlidingStep(123, 111, (int) (0.434 * ITERATION_TIMES)); |
| testSlidingSizeWindowWithSlidingStep(123, 111, (int) (1.5 * ITERATION_TIMES)); |
| } |
| |
| @Test |
| public void testSlidingSizeWindowWithSlidingStep5() { |
| testSlidingSizeWindowWithSlidingStep(100, 10000, 0); |
| testSlidingSizeWindowWithSlidingStep(100, 10000, 100); |
| testSlidingSizeWindowWithSlidingStep(100, 10000, 10000); |
| testSlidingSizeWindowWithSlidingStep(100, 10000, (int) (0.434 * ITERATION_TIMES)); |
| testSlidingSizeWindowWithSlidingStep(100, 10000, (int) (1.5 * ITERATION_TIMES)); |
| } |
| |
| @Test |
| public void testSlidingSizeWindowWithSlidingStep6() { |
| testSlidingSizeWindowWithSlidingStep(10000, 1000, 0); |
| testSlidingSizeWindowWithSlidingStep(10000, 1000, 1000); |
| testSlidingSizeWindowWithSlidingStep(10000, 1000, 10000); |
| testSlidingSizeWindowWithSlidingStep(10000, 1000, (int) (0.434 * ITERATION_TIMES)); |
| testSlidingSizeWindowWithSlidingStep(10000, 1000, (int) (1.5 * ITERATION_TIMES)); |
| } |
| |
| @Test |
| public void testSlidingSizeWindowWithSlidingStep7() { |
| testSlidingSizeWindowWithSlidingStep((int) (1.5 * ITERATION_TIMES), 4333, 0); |
| testSlidingSizeWindowWithSlidingStep( |
| (int) (1.5 * ITERATION_TIMES), 4333, (int) (0.434 * ITERATION_TIMES)); |
| testSlidingSizeWindowWithSlidingStep( |
| (int) (1.5 * ITERATION_TIMES), 4333, (int) (1.5 * ITERATION_TIMES)); |
| } |
| |
| @Test |
| public void testSlidingSizeWindowWithSlidingStep8() { |
| testSlidingSizeWindowWithSlidingStep(10000, (int) (1.5 * ITERATION_TIMES), 0); |
| testSlidingSizeWindowWithSlidingStep( |
| 10000, (int) (1.5 * ITERATION_TIMES), (int) (0.434 * ITERATION_TIMES)); |
| testSlidingSizeWindowWithSlidingStep( |
| 10000, (int) (1.5 * ITERATION_TIMES), (int) (1.5 * ITERATION_TIMES)); |
| } |
| |
| @Test |
| public void testSlidingSizeWindowWithSlidingStep9() { |
| testSlidingSizeWindowWithSlidingStep( |
| (int) (1.5 * ITERATION_TIMES), (int) (1.5 * ITERATION_TIMES), 0); |
| testSlidingSizeWindowWithSlidingStep( |
| (int) (1.5 * ITERATION_TIMES), |
| (int) (1.5 * ITERATION_TIMES), |
| (int) (0.434 * ITERATION_TIMES)); |
| testSlidingSizeWindowWithSlidingStep( |
| (int) (1.5 * ITERATION_TIMES), |
| (int) (1.5 * ITERATION_TIMES), |
| (int) (1.5 * ITERATION_TIMES)); |
| } |
| |
| private void testSlidingSizeWindowWithSlidingStep( |
| int windowSize, int slidingStep, int consumptionPoint) { |
| String sql = |
| String.format( |
| "select size_window_0(s1, '%s'='%s', '%s'='%s'), size_window_1(s1, '%s'='%s') from root.vehicle.d1", |
| "windowSize", |
| windowSize, |
| "slidingStep", |
| slidingStep, |
| "consumptionPoint", |
| consumptionPoint); |
| |
| try (Connection conn = EnvFactory.getEnv().getConnection(); |
| Statement statement = conn.createStatement(); |
| ResultSet resultSet = statement.executeQuery(sql)) { |
| assertEquals(3, resultSet.getMetaData().getColumnCount()); |
| |
| int count = 0; |
| while (resultSet.next()) { |
| if (ITERATION_TIMES < windowSize) { |
| String actual = resultSet.getString(2); |
| if (actual != null) { |
| assertEquals(ITERATION_TIMES - (long) count * slidingStep, Integer.parseInt(actual)); |
| ++count; |
| } |
| } else if (count * slidingStep + windowSize < ITERATION_TIMES) { |
| String actual = resultSet.getString(2); |
| if (actual != null) { |
| assertEquals(windowSize, Integer.parseInt(resultSet.getString(2))); |
| ++count; |
| } |
| } else { |
| String actual = resultSet.getString(2); |
| if (actual != null) { |
| assertEquals( |
| ITERATION_TIMES - (long) count * slidingStep, |
| Integer.parseInt(resultSet.getString(2))); |
| ++count; |
| } |
| } |
| } |
| assertEquals((int) Math.ceil(ITERATION_TIMES / (double) slidingStep), count); |
| } catch (SQLException throwable) { |
| if (windowSize > 0) { |
| fail(throwable.getMessage()); |
| } |
| } |
| |
| sql = |
| String.format( |
| "select window_start_end(s1, '%s'='%s', '%s'='%s', '%s'='%s'), size_window_1(s1, '%s'='%s') from root.vehicle.d1", |
| UDFTestConstant.ACCESS_STRATEGY_KEY, |
| UDFTestConstant.ACCESS_STRATEGY_SLIDING_SIZE, |
| UDFTestConstant.WINDOW_SIZE_KEY, |
| windowSize, |
| UDFTestConstant.SLIDING_STEP_KEY, |
| slidingStep, |
| "consumptionPoint", |
| consumptionPoint); |
| |
| try (Connection conn = EnvFactory.getEnv().getConnection(); |
| Statement statement = conn.createStatement(); |
| ResultSet resultSet = statement.executeQuery(sql)) { |
| assertEquals(3, resultSet.getMetaData().getColumnCount()); |
| |
| int count = 0; |
| while (resultSet.next()) { |
| |
| if (ITERATION_TIMES < windowSize) { |
| String actual = resultSet.getString(2); |
| if (actual != null) { |
| assertEquals(count * slidingStep, Integer.parseInt(resultSet.getString(1))); |
| assertEquals( |
| ITERATION_TIMES - (long) count * slidingStep - 1, |
| Integer.parseInt(actual) - Integer.parseInt(resultSet.getString(1))); |
| ++count; |
| } |
| } else if (count * slidingStep + windowSize < ITERATION_TIMES) { |
| String actual = resultSet.getString(2); |
| if (actual != null) { |
| assertEquals(count * slidingStep, Integer.parseInt(resultSet.getString(1))); |
| assertEquals( |
| windowSize - 1, |
| Integer.parseInt(resultSet.getString(2)) |
| - Integer.parseInt(resultSet.getString(1))); |
| ++count; |
| } |
| } else { |
| String actual = resultSet.getString(2); |
| if (actual != null) { |
| assertEquals(count * slidingStep, Integer.parseInt(resultSet.getString(1))); |
| assertEquals( |
| ITERATION_TIMES - (long) count * slidingStep - 1, |
| Integer.parseInt(resultSet.getString(2)) |
| - Integer.parseInt(resultSet.getString(1))); |
| ++count; |
| } |
| } |
| } |
| assertEquals((int) Math.ceil(ITERATION_TIMES / (double) slidingStep), count); |
| } catch (SQLException throwable) { |
| if (windowSize > 0) { |
| fail(throwable.getMessage()); |
| } |
| } |
| } |
| |
| @Test |
| public void testSizeWindowUDFWithConstants() { |
| try (Connection connection = EnvFactory.getEnv().getConnection(); |
| Statement statement = connection.createStatement()) { |
| |
| String query = |
| "SELECT accumulator(s1 + 1, 'access'='size', 'windowSize'='1000') FROM root.vehicle.d1"; |
| try (ResultSet rs = statement.executeQuery(query)) { |
| int time = 0; |
| int value = 500500; |
| for (int i = 0; i < ITERATION_TIMES / 1000; i++) { |
| Assert.assertTrue(rs.next()); |
| Assert.assertEquals(time, rs.getLong(1)); |
| Assert.assertEquals(value, rs.getLong(2)); |
| time += 1000; |
| value += 1000000; |
| } |
| Assert.assertFalse(rs.next()); |
| } |
| |
| query = |
| "SELECT 1 + accumulator(s1 + 1, 'access'='size', 'windowSize'='1000') FROM root.vehicle.d1"; |
| try (ResultSet rs = statement.executeQuery(query)) { |
| int time = 0; |
| double value = 500501D; |
| for (int i = 0; i < ITERATION_TIMES / 1000; i++) { |
| Assert.assertTrue(rs.next()); |
| Assert.assertEquals(time, rs.getLong(1)); |
| Assert.assertEquals(value, rs.getDouble(2), 0.001); |
| time += 1000; |
| value += 1000000D; |
| } |
| Assert.assertFalse(rs.next()); |
| } |
| } catch (SQLException e) { |
| e.printStackTrace(); |
| fail(e.getMessage()); |
| } |
| } |
| |
| @Test |
| public void testTimeWindowUDFWithConstants() { |
| try (Connection connection = EnvFactory.getEnv().getConnection(); |
| Statement statement = connection.createStatement()) { |
| String query = |
| "SELECT accumulator(" |
| + "s1 + 1, " |
| + "'access'='time', " |
| + "'timeInterval'='1000', " |
| + "'slidingStep'='1000', " |
| + "'displayWindowBegin'='0', " |
| + "'displayWindowEnd'='10000') FROM root.vehicle.d1"; |
| try (ResultSet rs = statement.executeQuery(query)) { |
| int time = 0; |
| int value = 500500; |
| for (int i = 0; i < 10; i++) { |
| Assert.assertTrue(rs.next()); |
| Assert.assertEquals(time, rs.getLong(1)); |
| Assert.assertEquals(value, rs.getLong(2)); |
| time += 1000; |
| value += 1000000; |
| } |
| Assert.assertFalse(rs.next()); |
| } |
| |
| query = |
| "SELECT 1 + accumulator(" |
| + "s1 + 1, " |
| + "'access'='time', " |
| + "'timeInterval'='1000', " |
| + "'slidingStep'='1000', " |
| + "'displayWindowBegin'='0', " |
| + "'displayWindowEnd'='10000') FROM root.vehicle.d1"; |
| try (ResultSet rs = statement.executeQuery(query)) { |
| int time = 0; |
| double value = 500501; |
| for (int i = 0; i < 10; i++) { |
| Assert.assertTrue(rs.next()); |
| Assert.assertEquals(time, rs.getLong(1)); |
| Assert.assertEquals(value, rs.getDouble(2), 0.001D); |
| time += 1000; |
| value += 1000000D; |
| } |
| Assert.assertFalse(rs.next()); |
| } |
| } catch (SQLException e) { |
| e.printStackTrace(); |
| fail(e.getMessage()); |
| } |
| } |
| } |