| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.phoenix.end2end; |
| |
| import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.ADD_DATA; |
| import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.ADD_DELETE; |
| import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.ADD_VIEW_INDEX; |
| import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.CREATE_ADD; |
| import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.CREATE_DIVERGED_VIEW; |
| import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.CREATE_OFFSET; |
| import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.CREATE_ORDERED_GROUP_BY; |
| import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.CREATE_ORDER_BY_NON_PK; |
| import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.CREATE_UNORDERED_GROUP_BY; |
| import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.INDEX_REBUILD_ASYNC; |
| import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.QUERY_ADD_DATA; |
| import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.QUERY_ADD_DELETE; |
| import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.QUERY_CREATE_ADD; |
| import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.QUERY_CREATE_DIVERGED_VIEW; |
| import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.QUERY_INDEX_REBUILD_ASYNC; |
| import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.QUERY_OFFSET; |
| import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.QUERY_ORDERED_GROUP_BY; |
| import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.QUERY_ORDER_BY_NON_PK; |
| import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.QUERY_UNORDERED_GROUP_BY; |
| import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.QUERY_VIEW_INDEX; |
| import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.assertExpectedOutput; |
| import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.checkForPreConditions; |
| import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.computeClientVersions; |
| import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.executeQueriesWithCurrentVersion; |
| import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.executeQueryWithClientVersion; |
| import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.UpgradeProps.NONE; |
| import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.UpgradeProps.SET_MAX_LOOK_BACK_AGE; |
| import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertNull; |
| import static org.junit.Assert.assertTrue; |
| |
| import java.sql.Connection; |
| import java.sql.DriverManager; |
| import java.util.Collection; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.hbase.HBaseConfiguration; |
| import org.apache.hadoop.hbase.HBaseTestingUtility; |
| import org.apache.hadoop.hbase.TableName; |
| import org.apache.hadoop.hbase.client.Admin; |
| import org.apache.hadoop.hbase.client.TableDescriptor; |
| import org.apache.phoenix.coprocessor.SystemCatalogRegionObserver; |
| import org.apache.phoenix.coprocessor.TaskMetaDataEndpoint; |
| import org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.MavenCoordinates; |
| import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; |
| import org.apache.phoenix.jdbc.PhoenixDriver; |
| import org.apache.phoenix.query.BaseTest; |
| import org.apache.phoenix.query.QueryServices; |
| import org.apache.phoenix.query.QueryServicesOptions; |
| import org.apache.phoenix.schema.SystemTaskSplitPolicy; |
| import org.apache.phoenix.util.PhoenixRuntime; |
| import org.apache.phoenix.util.ServerUtil.ConnectionFactory; |
| import org.junit.After; |
| import org.junit.Assume; |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.junit.experimental.categories.Category; |
| import org.junit.runner.RunWith; |
| import org.junit.runners.Parameterized; |
| import org.junit.runners.Parameterized.Parameters; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * This class is meant for testing all compatible client versions |
| * against the current server version. It runs SQL queries with given |
| * client versions and compares the output against gold files |
| */ |
| |
| @RunWith(Parameterized.class) |
| @Category(NeedsOwnMiniClusterTest.class) |
| public class BackwardCompatibilityIT { |
| |
| private static final Logger LOGGER = LoggerFactory.getLogger( |
| BackwardCompatibilityIT.class); |
| |
| private final MavenCoordinates compatibleClientVersion; |
| private static Configuration conf; |
| private static HBaseTestingUtility hbaseTestUtil; |
| private static String zkQuorum; |
| private static String url; |
| private String tmpDir; |
| |
| public BackwardCompatibilityIT(MavenCoordinates compatibleClientVersion) { |
| this.compatibleClientVersion = compatibleClientVersion; |
| } |
| |
| @Parameters(name = "BackwardCompatibilityIT_compatibleClientVersion={0}") |
| public static synchronized Collection<MavenCoordinates> data() throws Exception { |
| return computeClientVersions(); |
| } |
| |
| @Before |
| public synchronized void doSetup() throws Exception { |
| tmpDir = System.getProperty("java.io.tmpdir"); |
| conf = HBaseConfiguration.create(); |
| conf.set(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB, |
| Long.toString(Long.MAX_VALUE)); |
| conf.set(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB, |
| Long.toString(Long.MAX_VALUE)); |
| hbaseTestUtil = new HBaseTestingUtility(conf); |
| setUpConfigForMiniCluster(conf); |
| conf.set(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS); |
| hbaseTestUtil.startMiniCluster(); |
| zkQuorum = "localhost:" + hbaseTestUtil.getZkCluster().getClientPort(); |
| url = PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum; |
| DriverManager.registerDriver(PhoenixDriver.INSTANCE); |
| checkForPreConditions(compatibleClientVersion, conf); |
| } |
| |
| @After |
| public synchronized void cleanUpAfterTest() throws Exception { |
| boolean refCountLeaked = BaseTest.isAnyStoreRefCountLeaked(hbaseTestUtil.getAdmin()); |
| ConnectionFactory.shutdown(); |
| try { |
| DriverManager.deregisterDriver(PhoenixDriver.INSTANCE); |
| } finally { |
| hbaseTestUtil.shutdownMiniCluster(); |
| ServerMetadataCacheTestImpl.resetCache(); |
| } |
| System.setProperty("java.io.tmpdir", tmpDir); |
| assertFalse("refCount leaked", refCountLeaked); |
| } |
| |
| /** |
| * Scenario: |
| * 1. Old Client connects to the updated server |
| * 2. Old Client creates tables and inserts data |
| * 3. New Client reads the data inserted by the old client |
| * |
| * @throws Exception thrown if any errors encountered during query execution or file IO |
| */ |
| @Test |
| public void testUpsertWithOldClient() throws Exception { |
| // Insert data with old client and read with new client |
| executeQueryWithClientVersion(compatibleClientVersion, CREATE_ADD, zkQuorum); |
| executeQueriesWithCurrentVersion(QUERY_CREATE_ADD, url, NONE); |
| assertExpectedOutput(QUERY_CREATE_ADD); |
| } |
| |
| @Test |
| public void testCreateDivergedViewWithOldClientReadFromNewClient() throws Exception { |
| // Create a base table, view and make it diverge from an old client |
| executeQueryWithClientVersion(compatibleClientVersion, CREATE_DIVERGED_VIEW, zkQuorum); |
| executeQueriesWithCurrentVersion(QUERY_CREATE_DIVERGED_VIEW, url, NONE); |
| assertExpectedOutput(QUERY_CREATE_DIVERGED_VIEW); |
| } |
| |
| @Test |
| public void testCreateDivergedViewWithOldClientReadWithMaxLookBackAge() |
| throws Exception { |
| // Create a base table, view and make it diverge from an old client |
| executeQueryWithClientVersion(compatibleClientVersion, CREATE_DIVERGED_VIEW, zkQuorum); |
| executeQueriesWithCurrentVersion(QUERY_CREATE_DIVERGED_VIEW, url, SET_MAX_LOOK_BACK_AGE); |
| assertExpectedOutput(QUERY_CREATE_DIVERGED_VIEW); |
| } |
| |
| @Test |
| public void testCreateDivergedViewWithOldClientReadFromOldClient() throws Exception { |
| // Create a base table, view and make it diverge from an old client |
| executeQueryWithClientVersion(compatibleClientVersion, CREATE_DIVERGED_VIEW, zkQuorum); |
| executeQueryWithClientVersion(compatibleClientVersion, QUERY_CREATE_DIVERGED_VIEW, zkQuorum); |
| assertExpectedOutput(QUERY_CREATE_DIVERGED_VIEW); |
| } |
| |
| @Test |
| public void testCreateDivergedViewWithOldClientReadFromOldClientAfterUpgrade() |
| throws Exception { |
| // Create a base table, view and make it diverge from an old client |
| executeQueryWithClientVersion(compatibleClientVersion, CREATE_DIVERGED_VIEW, zkQuorum); |
| try (Connection conn = DriverManager.getConnection(url)) { |
| // Just connect with a new client to cause a metadata upgrade |
| } |
| // Query with an old client again |
| executeQueryWithClientVersion(compatibleClientVersion, QUERY_CREATE_DIVERGED_VIEW, zkQuorum); |
| assertExpectedOutput(QUERY_CREATE_DIVERGED_VIEW); |
| } |
| |
| @Test |
| public void testCreateDivergedViewWithNewClientReadFromOldClient() throws Exception { |
| executeQueriesWithCurrentVersion(CREATE_DIVERGED_VIEW, url, NONE); |
| executeQueryWithClientVersion(compatibleClientVersion, QUERY_CREATE_DIVERGED_VIEW, zkQuorum); |
| assertExpectedOutput(QUERY_CREATE_DIVERGED_VIEW); |
| } |
| |
| @Test |
| public void testCreateDivergedViewWithNewClientReadFromNewClient() throws Exception { |
| executeQueriesWithCurrentVersion(CREATE_DIVERGED_VIEW, url, NONE); |
| executeQueriesWithCurrentVersion(QUERY_CREATE_DIVERGED_VIEW, url, NONE); |
| assertExpectedOutput(QUERY_CREATE_DIVERGED_VIEW); |
| } |
| |
| /** |
| * Scenario: |
| * 1. New Client connects to the updated server |
| * 2. New Client creates tables and inserts data |
| * 3. Old Client reads the data inserted by the new client |
| * |
| * @throws Exception thrown if any errors encountered during query execution or file IO |
| */ |
| @Test |
| public void testSelectWithOldClient() throws Exception { |
| // Insert data with new client and read with old client |
| executeQueriesWithCurrentVersion(CREATE_ADD, url, NONE); |
| executeQueryWithClientVersion(compatibleClientVersion, QUERY_CREATE_ADD, zkQuorum); |
| assertExpectedOutput(QUERY_CREATE_ADD); |
| } |
| |
| /** |
| * Scenario: |
| * 1. Old Client connects to the updated server |
| * 2. Old Client creates tables and inserts data |
| * 3. New Client reads the data inserted by the old client |
| * 4. New Client inserts more data into the tables created by old client |
| * 5. Old Client reads the data inserted by new client |
| * Use phoenix.max.lookback.age.seconds config and ensure that upgrade |
| * is not impacted by the config. |
| * |
| * @throws Exception thrown if any errors encountered during query execution or file IO |
| */ |
| @Test |
| public void testSelectUpsertWithNewClientWithMaxLookBackAge() throws Exception { |
| // Insert data with old client and read with new client |
| executeQueryWithClientVersion(compatibleClientVersion, CREATE_ADD, zkQuorum); |
| executeQueriesWithCurrentVersion(QUERY_CREATE_ADD, url, SET_MAX_LOOK_BACK_AGE); |
| assertExpectedOutput(QUERY_CREATE_ADD); |
| |
| // Insert more data with new client and read with old client |
| executeQueriesWithCurrentVersion(ADD_DATA, url, SET_MAX_LOOK_BACK_AGE); |
| executeQueryWithClientVersion(compatibleClientVersion, QUERY_ADD_DATA, zkQuorum); |
| assertExpectedOutput(QUERY_ADD_DATA); |
| } |
| |
| /** |
| * Scenario: |
| * 1. Old Client connects to the updated server |
| * 2. Old Client creates tables and inserts data |
| * 3. New Client reads the data inserted by the old client |
| * 4. New Client inserts more data into the tables created by old client |
| * 5. Old Client reads the data inserted by new client |
| * |
| * @throws Exception thrown if any errors encountered during query execution or file IO |
| */ |
| @Test |
| public void testSelectUpsertWithNewClient() throws Exception { |
| // Insert data with old client and read with new client |
| executeQueryWithClientVersion(compatibleClientVersion, CREATE_ADD, zkQuorum); |
| executeQueriesWithCurrentVersion(QUERY_CREATE_ADD, url, NONE); |
| assertExpectedOutput(QUERY_CREATE_ADD); |
| |
| // Insert more data with new client and read with old client |
| executeQueriesWithCurrentVersion(ADD_DATA, url, NONE); |
| executeQueryWithClientVersion(compatibleClientVersion, QUERY_ADD_DATA, zkQuorum); |
| assertExpectedOutput(QUERY_ADD_DATA); |
| } |
| |
| /** |
| * Scenario: |
| * 1. New Client connects to the updated server |
| * 2. New Client creates tables and inserts data |
| * 3. Old Client reads the data inserted by the old client |
| * 4. Old Client inserts more data into the tables created by old client |
| * 5. New Client reads the data inserted by new client |
| * |
| * @throws Exception thrown if any errors encountered during query execution or file IO |
| */ |
| @Test |
| public void testSelectUpsertWithOldClient() throws Exception { |
| // Insert data with new client and read with old client |
| executeQueriesWithCurrentVersion(CREATE_ADD, url, NONE); |
| executeQueryWithClientVersion(compatibleClientVersion, QUERY_CREATE_ADD, zkQuorum); |
| assertExpectedOutput(QUERY_CREATE_ADD); |
| |
| // Insert more data with old client and read with new client |
| executeQueryWithClientVersion(compatibleClientVersion, ADD_DATA, zkQuorum); |
| executeQueriesWithCurrentVersion(QUERY_ADD_DATA, url, NONE); |
| assertExpectedOutput(QUERY_ADD_DATA); |
| } |
| |
| /** |
| * Scenario: |
| * 1. Old Client connects to the updated server |
| * 2. Old Client creates tables and inserts data |
| * 3. New Client reads the data inserted by the old client |
| * 4. Old Client creates and deletes the data |
| * |
| * @throws Exception thrown if any errors encountered during query execution or file IO |
| */ |
| @Test |
| public void testUpsertDeleteWithOldClient() throws Exception { |
| // Insert data with old client and read with new client |
| executeQueryWithClientVersion(compatibleClientVersion, CREATE_ADD, zkQuorum); |
| executeQueriesWithCurrentVersion(QUERY_CREATE_ADD, url, NONE); |
| assertExpectedOutput(QUERY_CREATE_ADD); |
| |
| // Deletes with the old client |
| executeQueryWithClientVersion(compatibleClientVersion, ADD_DELETE, zkQuorum); |
| executeQueryWithClientVersion(compatibleClientVersion, QUERY_ADD_DELETE, zkQuorum); |
| assertExpectedOutput(QUERY_ADD_DELETE); |
| } |
| |
| /** |
| * Scenario: |
| * 1. New Client connects to the updated server |
| * 2. New Client creates tables and inserts data |
| * 3. Old Client reads the data inserted by the old client |
| * 4. New Client creates and deletes the data |
| * |
| * @throws Exception thrown if any errors encountered during query execution or file IO |
| */ |
| @Test |
| public void testUpsertDeleteWithNewClient() throws Exception { |
| // Insert data with old client and read with new client |
| executeQueriesWithCurrentVersion(CREATE_ADD, url, NONE); |
| executeQueryWithClientVersion(compatibleClientVersion, QUERY_CREATE_ADD, zkQuorum); |
| assertExpectedOutput(QUERY_CREATE_ADD); |
| |
| // Deletes with the new client |
| executeQueriesWithCurrentVersion(ADD_DELETE, url, NONE); |
| executeQueriesWithCurrentVersion(QUERY_ADD_DELETE, url, NONE); |
| assertExpectedOutput(QUERY_ADD_DELETE); |
| } |
| |
| @Test |
| public void testSplitPolicyAndCoprocessorForSysTask() throws Exception { |
| executeQueryWithClientVersion(compatibleClientVersion, |
| CREATE_DIVERGED_VIEW, zkQuorum); |
| |
| String[] versionArr = compatibleClientVersion.getVersion().split("\\."); |
| int majorVersion = Integer.parseInt(versionArr[0]); |
| int minorVersion = Integer.parseInt(versionArr[1]); |
| org.apache.hadoop.hbase.client.Connection conn = null; |
| Admin admin = null; |
| // if connected with client < 4.15, SYSTEM.TASK does not exist |
| // if connected with client 4.15, SYSTEM.TASK exists without any |
| // split policy and also TaskMetaDataEndpoint coprocessor would not |
| // exist |
| if (majorVersion == 4 && minorVersion == 15) { |
| conn = hbaseTestUtil.getConnection(); |
| admin = conn.getAdmin(); |
| TableDescriptor tableDescriptor = admin.getDescriptor( |
| TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_TASK_NAME)); |
| assertNull("split policy should be null with compatible client version: " |
| + compatibleClientVersion, tableDescriptor.getRegionSplitPolicyClassName()); |
| assertFalse("Coprocessor " + TaskMetaDataEndpoint.class.getName() |
| + " should not have been added with compatible client version: " |
| + compatibleClientVersion, |
| tableDescriptor.hasCoprocessor(TaskMetaDataEndpoint.class.getName())); |
| } |
| |
| executeQueriesWithCurrentVersion(QUERY_CREATE_DIVERGED_VIEW, url, NONE); |
| |
| if (conn == null) { |
| conn = hbaseTestUtil.getConnection(); |
| admin = conn.getAdmin(); |
| } |
| // connect with client > 4.15, and we have new split policy and new |
| // coprocessor loaded |
| TableDescriptor tableDescriptor = admin.getDescriptor( |
| TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_TASK_NAME)); |
| assertEquals("split policy not updated with compatible client version: " |
| + compatibleClientVersion, |
| tableDescriptor.getRegionSplitPolicyClassName(), |
| SystemTaskSplitPolicy.class.getName()); |
| assertTrue("Coprocessor " + TaskMetaDataEndpoint.class.getName() |
| + " has not been added with compatible client version: " |
| + compatibleClientVersion, tableDescriptor.hasCoprocessor( |
| TaskMetaDataEndpoint.class.getName())); |
| assertExpectedOutput(QUERY_CREATE_DIVERGED_VIEW); |
| admin.close(); |
| } |
| |
| @Test |
| public void testSystemTaskCreationWithIndexAsyncRebuild() throws Exception { |
| String[] versionArr = compatibleClientVersion.getVersion().split("\\."); |
| int majorVersion = Integer.parseInt(versionArr[0]); |
| int minorVersion = Integer.parseInt(versionArr[1]); |
| // index async rebuild support min version check |
| if (majorVersion > 4 || (majorVersion == 4 && minorVersion >= 15)) { |
| executeQueryWithClientVersion(compatibleClientVersion, |
| INDEX_REBUILD_ASYNC, zkQuorum); |
| //Check if the task is added. |
| //It won't start because we set the task intervals to long.MAX_VALUE |
| int retryCount=0; |
| while (true) { |
| try { |
| executeQueriesWithCurrentVersion(QUERY_INDEX_REBUILD_ASYNC, url, NONE); |
| assertExpectedOutput(QUERY_INDEX_REBUILD_ASYNC); |
| break; |
| } catch (AssertionError e) { |
| if (retryCount++ > 10) { |
| throw e; |
| } |
| Thread.sleep(5000); |
| } |
| } |
| } |
| } |
| |
| @Test |
| public void testViewIndexIdCreatedWithOldClient() throws Exception { |
| executeQueryWithClientVersion(compatibleClientVersion, ADD_VIEW_INDEX, zkQuorum); |
| org.apache.hadoop.hbase.client.Connection conn = hbaseTestUtil.getConnection(); |
| try (Admin admin = conn.getAdmin()) { |
| TableDescriptor tableDescriptor = admin.getDescriptor( |
| TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME)); |
| //The oldest client we test is 5.1.0, which already adds SystemCatalogRegionObserver |
| assertTrue("Coprocessor " + SystemCatalogRegionObserver.class.getName() |
| + " has been added with compatible client version: " |
| + compatibleClientVersion, tableDescriptor.hasCoprocessor( |
| SystemCatalogRegionObserver.class.getName())); |
| |
| executeQueriesWithCurrentVersion(QUERY_VIEW_INDEX, url, NONE); |
| assertExpectedOutput(QUERY_VIEW_INDEX); |
| |
| tableDescriptor = admin.getDescriptor( |
| TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME)); |
| assertTrue("Coprocessor " + SystemCatalogRegionObserver.class.getName() |
| + " has been added with compatible client version: " |
| + compatibleClientVersion, tableDescriptor.hasCoprocessor( |
| SystemCatalogRegionObserver.class.getName())); |
| } |
| } |
| |
| @Test |
| public void testViewIndexIdCreatedWithNewClient() throws Exception { |
| executeQueriesWithCurrentVersion(ADD_VIEW_INDEX, url, NONE); |
| executeQueryWithClientVersion(compatibleClientVersion, QUERY_VIEW_INDEX, zkQuorum); |
| assertExpectedOutput(QUERY_VIEW_INDEX); |
| } |
| |
| @Test |
| public void testUnorderedGroupByAddDataByOldClientReadByNewClient() throws Exception { |
| executeQueryWithClientVersion(compatibleClientVersion, CREATE_UNORDERED_GROUP_BY, zkQuorum); |
| executeQueriesWithCurrentVersion(QUERY_UNORDERED_GROUP_BY, url, NONE); |
| assertExpectedOutput(QUERY_UNORDERED_GROUP_BY); |
| } |
| |
| @Test |
| public void testUnorderedGroupByAddDataByNewClientReadByOldClient() throws Exception { |
| executeQueriesWithCurrentVersion(CREATE_UNORDERED_GROUP_BY, url, NONE); |
| executeQueryWithClientVersion(compatibleClientVersion, QUERY_UNORDERED_GROUP_BY, zkQuorum); |
| assertExpectedOutput(QUERY_UNORDERED_GROUP_BY); |
| } |
| |
| @Test |
| public void testOrderedGroupByAddDataByOldClientReadByNewClient() throws Exception { |
| executeQueryWithClientVersion(compatibleClientVersion, CREATE_ORDERED_GROUP_BY, zkQuorum); |
| executeQueriesWithCurrentVersion(QUERY_ORDERED_GROUP_BY, url, NONE); |
| assertExpectedOutput(QUERY_ORDERED_GROUP_BY); |
| } |
| |
| @Test |
| public void testOrderedGroupByAddDataByNewClientReadByOldClient() throws Exception { |
| Assume.assumeTrue("compatible client version should be >= 5.1.3", |
| isClientCompatibleForOrderedGroupByQuery()); |
| executeQueriesWithCurrentVersion(CREATE_ORDERED_GROUP_BY, url, NONE); |
| executeQueryWithClientVersion(compatibleClientVersion, QUERY_ORDERED_GROUP_BY, zkQuorum); |
| assertExpectedOutput(QUERY_ORDERED_GROUP_BY); |
| } |
| |
| @Test |
| public void testOffsetAddDataByOldClientReadByNewClient() throws Exception { |
| executeQueryWithClientVersion(compatibleClientVersion, CREATE_OFFSET, zkQuorum); |
| executeQueriesWithCurrentVersion(QUERY_OFFSET, url, NONE); |
| assertExpectedOutput(QUERY_OFFSET); |
| } |
| |
| @Test |
| public void testOffsetAddDataByNewClientReadByOldClient() throws Exception { |
| executeQueriesWithCurrentVersion(CREATE_OFFSET, url, NONE); |
| executeQueryWithClientVersion(compatibleClientVersion, QUERY_OFFSET, zkQuorum); |
| assertExpectedOutput(QUERY_OFFSET); |
| } |
| |
| @Test |
| public void testOrderByNonPkAddDataByOldClientReadByNewClient() throws Exception { |
| executeQueryWithClientVersion(compatibleClientVersion, CREATE_ORDER_BY_NON_PK, |
| zkQuorum); |
| executeQueriesWithCurrentVersion(QUERY_ORDER_BY_NON_PK, url, NONE); |
| assertExpectedOutput(QUERY_ORDER_BY_NON_PK); |
| } |
| |
| @Test |
| public void testOrderByNonPkAddDataByNewClientReadByOldClient() throws Exception { |
| executeQueriesWithCurrentVersion(CREATE_ORDER_BY_NON_PK, url, NONE); |
| executeQueryWithClientVersion(compatibleClientVersion, QUERY_ORDER_BY_NON_PK, |
| zkQuorum); |
| assertExpectedOutput(QUERY_ORDER_BY_NON_PK); |
| } |
| |
| private boolean isClientCompatibleForOrderedGroupByQuery() { |
| String[] clientVersion = compatibleClientVersion.getVersion().split("\\."); |
| int majorVersion = Integer.parseInt(clientVersion[0]); |
| int minorVersion = Integer.parseInt(clientVersion[1]); |
| int patchVersion = Integer.parseInt(clientVersion[2]); |
| if (majorVersion > 5) { |
| return true; |
| } |
| if (majorVersion < 5) { |
| return false; |
| } |
| if (minorVersion > 1) { |
| return true; |
| } |
| if (minorVersion < 1) { |
| return false; |
| } |
| return patchVersion >= 3; |
| } |
| |
| } |