| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.connectors.jdbc; |
| |
| import static org.apache.geode.test.awaitility.GeodeAwaitility.await; |
| import static org.assertj.core.api.Assertions.assertThat; |
| import static org.assertj.core.api.Assertions.assertThatThrownBy; |
| |
| import java.io.IOException; |
| import java.io.Serializable; |
| import java.sql.Connection; |
| import java.sql.DatabaseMetaData; |
| import java.sql.DriverManager; |
| import java.sql.JDBCType; |
| import java.sql.PreparedStatement; |
| import java.sql.ResultSet; |
| import java.sql.SQLException; |
| import java.sql.Statement; |
| import java.util.Arrays; |
| import java.util.Date; |
| import java.util.List; |
| |
| import org.junit.After; |
| import org.junit.Before; |
| import org.junit.Rule; |
| import org.junit.Test; |
| |
| import org.apache.geode.cache.Region; |
| import org.apache.geode.cache.RegionShortcut; |
| import org.apache.geode.cache.asyncqueue.AsyncEventQueue; |
| import org.apache.geode.cache.client.ClientCacheFactory; |
| import org.apache.geode.cache.client.ClientRegionShortcut; |
| import org.apache.geode.connectors.jdbc.internal.JdbcConnectorService; |
| import org.apache.geode.connectors.jdbc.internal.cli.MappingCommandUtils; |
| import org.apache.geode.connectors.jdbc.internal.configuration.FieldMapping; |
| import org.apache.geode.connectors.jdbc.internal.configuration.RegionMapping; |
| import org.apache.geode.pdx.FieldType; |
| import org.apache.geode.pdx.PdxInstance; |
| import org.apache.geode.pdx.ReflectionBasedAutoSerializer; |
| import org.apache.geode.pdx.internal.AutoSerializableManager; |
| import org.apache.geode.test.dunit.IgnoredException; |
| import org.apache.geode.test.dunit.SerializableConsumerIF; |
| import org.apache.geode.test.dunit.rules.ClientVM; |
| import org.apache.geode.test.dunit.rules.ClusterStartupRule; |
| import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties; |
| import org.apache.geode.test.dunit.rules.MemberVM; |
| import org.apache.geode.test.junit.rules.GfshCommandRule; |
| import org.apache.geode.test.junit.rules.serializable.SerializableTestName; |
| |
| /** |
| * End-to-end distributed tests (dunits) for the JDBC connector. Subclasses supply the |
| * database-specific connection through {@link #getConnection()} and {@link #getConnectionUrl()}. |
| */ |
| public abstract class JdbcDistributedTest implements Serializable { |
| |
| static final String DB_NAME = "test"; |
| private static final String TABLE_NAME = "employees"; |
| private static final String REGION_NAME = "employees"; |
| private static final String DATA_SOURCE_NAME = "TestDataSource"; |
| |
| @Rule |
| public transient GfshCommandRule gfsh = new GfshCommandRule(); |
| |
| @Rule |
| public transient ClusterStartupRule startupRule = new ClusterStartupRule(); |
| |
| @Rule |
| public SerializableTestName testName = new SerializableTestName(); |
| |
| @Rule |
| public DistributedRestoreSystemProperties restoreSystemProperties = |
| new DistributedRestoreSystemProperties(); |
| |
| private MemberVM dataserver; |
| private MemberVM locator; |
| private String connectionUrl; |
| |
| @Before |
| public void setup() throws Exception { |
| locator = startupRule.startLocatorVM(0); |
| gfsh.connectAndVerify(locator); |
| connectionUrl = getConnectionUrl(); |
| } |
| |
| public abstract Connection getConnection() throws SQLException; |
| |
| public abstract String getConnectionUrl() throws IOException, InterruptedException; |
| |
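| /** |
| * Starts the data server VM connected to the locator and creates the "employees" table with |
| * id (primary key), name and age columns. |
| */ |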
| private void createTable() throws SQLException { |
| dataserver = startupRule.startServerVM(1, x -> x.withConnectionToLocator(locator.getPort())); |
| Connection connection = getConnection(); |
| Statement statement = connection.createStatement(); |
| statement.execute("Create Table " + TABLE_NAME |
| + " (id varchar(10) primary key not null, name varchar(10), age int not null)"); |
| } |
| |
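| /** |
| * Starts a server VM in the given group, creates the "employees" table and returns the new |
| * member. |
| */ |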
| private MemberVM createTableForGroup(int idx, String groupName) throws SQLException { |
| MemberVM server = startupRule.startServerVM(idx, groupName, locator.getPort()); |
| Connection connection = getConnection(); |
| Statement statement = connection.createStatement(); |
| statement.execute("Create Table " + TABLE_NAME |
| + " (id varchar(10) primary key not null, name varchar(10), age int not null)"); |
| return server; |
| } |
| |
| private MemberVM addServerForGroup(int idx, String groupName) { |
| return startupRule.startServerVM(idx, groupName, locator.getPort()); |
| } |
| |
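| /** Adds a "new_column" column to the employees table, making any previously created jdbc-mapping stale. */ |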
| private void alterTable() throws SQLException { |
| Connection connection = getConnection(); |
| Statement statement = connection.createStatement(); |
| statement.execute("Alter Table " + TABLE_NAME |
| + " add column new_column varchar(10)"); |
| } |
| |
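| /** |
| * Starts the data server VM with PDX read-serialized enabled and delegates to the subclass to |
| * create a table containing a column for each supported PDX field type. |
| */ |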
| private void createTableForAllSupportedFields() throws SQLException { |
| dataserver = startupRule.startServerVM(1, |
| x -> x.withConnectionToLocator(locator.getPort()).withPDXReadSerialized()); |
| Connection connection = getConnection(); |
| DatabaseMetaData metaData = connection.getMetaData(); |
| String quote = metaData.getIdentifierQuoteString(); |
| Statement statement = connection.createStatement(); |
| createSupportedFieldsTable(statement, TABLE_NAME, quote); |
| } |
| |
| protected abstract void createSupportedFieldsTable(Statement statement, String tableName, |
| String quote) throws SQLException; |
| |
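| /** |
| * Inserts a row for the given key whose column values are bound by the subclass via |
| * {@link #createNullStatement} (typically all nulls). |
| */ |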
| private void insertNullDataForAllSupportedFieldsTable(String key) throws SQLException { |
| Connection connection = DriverManager.getConnection(connectionUrl); |
| DatabaseMetaData metaData = connection.getMetaData(); |
| String quote = metaData.getIdentifierQuoteString(); |
| |
| String insertQuery = |
| "Insert into " + quote + TABLE_NAME + quote + " values (?,?,?,?,?,?,?,?,?,?,?,?,?)"; |
| System.out.println("### Query is: " + insertQuery); |
| PreparedStatement statement = connection.prepareStatement(insertQuery); |
| createNullStatement(key, statement); |
| |
| statement.execute(); |
| } |
| |
| protected abstract void createNullStatement(String key, PreparedStatement statement) |
| throws SQLException; |
| |
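| /** Inserts a row for the given key directly into the database using the values from {@code data}. */ |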
| private void insertDataForAllSupportedFieldsTable(String key, ClassWithSupportedPdxFields data) |
| throws SQLException { |
| Connection connection = DriverManager.getConnection(connectionUrl); |
| DatabaseMetaData metaData = connection.getMetaData(); |
| String quote = metaData.getIdentifierQuoteString(); |
| |
| String insertQuery = |
| "Insert into " + quote + TABLE_NAME + quote + " values (?,?,?,?,?,?,?,?,?,?,?,?,?)"; |
| System.out.println("### Query is: " + insertQuery); |
| PreparedStatement statement = connection.prepareStatement(insertQuery); |
| statement.setObject(1, key); |
| statement.setObject(2, data.isAboolean()); |
| statement.setObject(3, data.getAbyte()); |
| statement.setObject(4, data.getAshort()); |
| statement.setObject(5, data.getAnint()); |
| statement.setObject(6, data.getAlong()); |
| statement.setObject(7, data.getAfloat()); |
| statement.setObject(8, data.getAdouble()); |
| statement.setObject(9, data.getAstring()); |
| statement.setObject(10, new java.sql.Timestamp(data.getAdate().getTime())); |
| statement.setObject(11, data.getAnobject()); |
| statement.setObject(12, data.getAbytearray()); |
| statement.setObject(13, Character.toString(data.getAchar())); |
| |
| statement.execute(); |
| } |
| |
| @After |
| public void tearDown() throws Exception { |
| closeDB(); |
| } |
| |
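| /** |
| * Drops the test table (trying both the unquoted and quoted names), ignoring failures, so each |
| * test starts with a clean database. |
| */ |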
| private void closeDB() throws SQLException { |
| try (Connection connection = DriverManager.getConnection(connectionUrl)) { |
| DatabaseMetaData metaData = connection.getMetaData(); |
| String quote = metaData.getIdentifierQuoteString(); |
| try (Statement statement = connection.createStatement()) { |
| try { |
| statement.execute("Drop table " + TABLE_NAME); |
| } catch (SQLException ignore) { |
| } |
| |
| try { |
| statement.execute("Drop table " + quote + TABLE_NAME + quote); |
| } catch (SQLException ignore) { |
| } |
| } |
| } |
| } |
| |
| @Test |
| public void throwsExceptionWhenNoMappingExistsUsingWriter() throws Exception { |
| createTable(); |
| String createRegionCmd = "create region --name=" + REGION_NAME + " --type=REPLICATE" |
| + " --cache-writer=" + JdbcWriter.class.getName(); |
| gfsh.executeAndAssertThat(createRegionCmd).statusIsSuccess(); |
| createJdbcDataSource(); |
| |
| dataserver.invoke(() -> { |
| PdxInstance pdxEmployee1 = |
| ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName()) |
| .writeString("name", "Emp1").writeInt("age", 55).create(); |
| Region<Object, Object> region = ClusterStartupRule.getCache().getRegion(REGION_NAME); |
| assertThatThrownBy(() -> region.put("key1", pdxEmployee1)) |
| .isExactlyInstanceOf(JdbcConnectorException.class).hasMessage( |
| "JDBC mapping for region employees not found. Create the mapping with the gfsh command 'create jdbc-mapping'."); |
| }); |
| } |
| |
| @Test |
| public void throwsExceptionWhenNoMappingExistsUsingAsyncWriter() throws Exception { |
| createTable(); |
| IgnoredException.addIgnoredException("JdbcConnectorException"); |
| createAsyncListener("JAW"); |
| String createRegionCmd = "create region --name=" + REGION_NAME + " --type=REPLICATE" |
| + " --async-event-queue-id=JAW"; |
| gfsh.executeAndAssertThat(createRegionCmd).statusIsSuccess(); |
| createJdbcDataSource(); |
| |
| dataserver.invoke(() -> { |
| PdxInstance pdxEmployee1 = |
| ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName()) |
| .writeString("name", "Emp1").writeInt("age", 55).create(); |
| Region<Object, Object> region = ClusterStartupRule.getCache().getRegion(REGION_NAME); |
| region.put("key1", pdxEmployee1); |
| |
| JdbcAsyncWriter asyncWriter = (JdbcAsyncWriter) ClusterStartupRule.getCache() |
| .getAsyncEventQueue("JAW").getAsyncEventListener(); |
| await().untilAsserted(() -> { |
| assertThat(asyncWriter.getFailedEvents()).isEqualTo(1); |
| }); |
| }); |
| } |
| |
| @Test |
| public void throwsExceptionWhenMappingDoesNotMatchTableDefinitionOnInitialOperation() |
| throws Exception { |
| IgnoredException.addIgnoredException( |
| "Error detected when comparing mapping for region \"employees\" with table definition:"); |
| createTable(); |
| createReplicatedRegionUsingGfsh(); |
| createJdbcDataSource(); |
| createMapping(REGION_NAME, DATA_SOURCE_NAME, true); |
| alterTable(); |
| dataserver.invoke(() -> { |
| PdxInstance pdxEmployee1 = |
| ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName()) |
| .writeString("id", "id1").writeString("name", "Emp1").writeInt("age", 55).create(); |
| |
| String key = "id1"; |
| Region<Object, Object> region = ClusterStartupRule.getCache().getRegion(REGION_NAME); |
| assertThatThrownBy(() -> region.put(key, pdxEmployee1)) |
| .isExactlyInstanceOf(JdbcConnectorException.class).hasMessage( |
| "Jdbc mapping for \"" + REGION_NAME |
| + "\" does not match table definition, check logs for more details."); |
| }); |
| } |
| |
| @Test |
| public void throwsExceptionWhenMappingDoesNotMatchTableDefinitionOnLoaderAlreadyInitialized() |
| throws Exception { |
| IgnoredException.addIgnoredException( |
| "Error detected when comparing mapping for region \"employees\" with table definition:"); |
| createTable(); |
| createReplicatedRegionUsingGfsh(); |
| createJdbcDataSource(); |
| createMapping(REGION_NAME, DATA_SOURCE_NAME, true); |
| dataserver.invoke(() -> { |
| PdxInstance pdxEmployee1 = |
| ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName()) |
| .writeString("id", "id1").writeString("name", "Emp1").writeInt("age", 55).create(); |
| |
| String key = "id1"; |
| Region<Object, Object> region = ClusterStartupRule.getCache().getRegion(REGION_NAME); |
| region.put(key, pdxEmployee1); // this initializes the writer |
| region.invalidate(key); |
| region.get(key); // this initializes the loader |
| region.invalidate(key); |
| }); |
| alterTable(); |
| dataserver.invoke(() -> { |
| String key = "id1"; |
| Region<Object, Object> region = ClusterStartupRule.getCache().getRegion(REGION_NAME); |
| assertThatThrownBy(() -> region.get(key)) |
| .isExactlyInstanceOf(JdbcConnectorException.class).hasMessage( |
| "The jdbc-mapping does not contain the column name \"new_column\"." |
| + " This is probably caused by a column being added to the table after the jdbc-mapping was created."); |
| }); |
| } |
| |
| @Test |
| public void throwsExceptionWhenMappingDoesNotMatchTableDefinitionOnServerStartup() |
| throws Exception { |
| IgnoredException.addIgnoredException( |
| "Error detected when comparing mapping for region \"employees\" with table definition:"); |
| IgnoredException.addIgnoredException( |
| "Jdbc mapping for \"employees\" does not match table definition, check logs for more details."); |
| createTable(); |
| createReplicatedRegionUsingGfsh(); |
| createJdbcDataSource(); |
| createMapping(REGION_NAME, DATA_SOURCE_NAME, true); |
| alterTable(); |
| assertThatThrownBy( |
| () -> startupRule.startServerVM(3, x -> x.withConnectionToLocator(locator.getPort()))) |
| .hasCauseExactlyInstanceOf(JdbcConnectorException.class).hasStackTraceContaining( |
| "Jdbc mapping for \"employees\" does not match table definition, check logs for more details."); |
| } |
| |
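| /** |
| * Performs put/invalidate/get round trips from every member (data servers and accessors) and |
| * verifies the values are reloaded from the database, then waits for the async event queues on |
| * the data-group servers to drain. |
| */ |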
| private void validateBothServersAndAccessors(MemberVM server1, MemberVM server2, |
| MemberVM accessor1, MemberVM accessor2) { |
| for (MemberVM server : Arrays.asList(server1, server2, accessor1, accessor2)) { |
| server.invoke(() -> { |
| PdxInstance pdxEmployee1 = |
| ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName()) |
| .writeString("id", "id1").writeString("name", "Emp1").writeInt("age", 55).create(); |
| |
| String pdxkey1 = "pdxkey1"; |
| Region<Object, Object> region = ClusterStartupRule.getCache().getRegion(REGION_NAME); |
| region.put(pdxkey1, pdxEmployee1); |
| Employee employee1 = new Employee("key1", "name1", 30); |
| region.put("key1", employee1); |
| region.invalidate(pdxkey1); |
| region.invalidate("key1"); |
| await().untilAsserted(() -> { |
| assertThat(region.get(pdxkey1)).isNotNull(); |
| assertThat(region.get("key1")).isNotNull(); |
| }); |
| Employee pdxEmployee2 = (Employee) region.get(pdxkey1); |
| Employee employee2 = (Employee) region.get("key1"); |
| assertThat(pdxEmployee2.getName()).isEqualTo("Emp1"); |
| assertThat(employee2.getName()).isEqualTo("name1"); |
| }); |
| } |
| |
| for (MemberVM server : Arrays.asList(server1, server2)) { |
| server.invoke(() -> { |
| String queueName = MappingCommandUtils.createAsyncEventQueueName(REGION_NAME); |
| AsyncEventQueue queue = ClusterStartupRule.getCache().getAsyncEventQueue(queueName); |
| assertThat(queue).isNotNull(); |
| await().untilAsserted(() -> { |
| assertThat(queue.size()).isEqualTo(0); |
| }); |
| }); |
| } |
| } |
| |
| @Test |
| public void startAccessorForPRThenPutAndGet() throws Exception { |
| MemberVM server1 = createTableForGroup(4, "datagroup"); |
| MemberVM server2 = addServerForGroup(5, "datagroup"); |
| MemberVM accessor1 = addServerForGroup(6, "accessorgroup"); |
| MemberVM accessor2 = addServerForGroup(7, "accessorgroup"); |
| |
| createJdbcDataSource(); |
| createPartitionedRegionUsingGfshForGroup(false, "datagroup"); |
| createAsyncMappingForGroup(REGION_NAME, DATA_SOURCE_NAME, Employee.class.getName(), |
| "datagroup"); |
| createPartitionedRegionUsingGfshForGroup(true, "accessorgroup"); |
| createAsyncMappingForGroup(REGION_NAME, DATA_SOURCE_NAME, Employee.class.getName(), |
| "accessorgroup"); |
| |
| validateBothServersAndAccessors(server1, server2, accessor1, accessor2); |
| |
| for (int i = 4; i <= 7; i++) { |
| startupRule.stop(i); |
| } |
| } |
| |
| @Test |
| public void startAccessorForRRThenPutAndGet() throws Exception { |
| MemberVM server1 = createTableForGroup(4, "datagroup"); |
| MemberVM server2 = addServerForGroup(5, "datagroup"); |
| MemberVM accessor1 = addServerForGroup(6, "accessorgroup"); |
| MemberVM accessor2 = addServerForGroup(7, "accessorgroup"); |
| |
| createJdbcDataSource(); |
| createReplicatedRegionUsingGfshForGroup(false, "datagroup"); |
| createAsyncMappingForGroup(REGION_NAME, DATA_SOURCE_NAME, Employee.class.getName(), |
| "datagroup"); |
| createReplicatedRegionUsingGfshForGroup(true, "accessorgroup"); |
| createAsyncMappingForGroup(REGION_NAME, DATA_SOURCE_NAME, Employee.class.getName(), |
| "accessorgroup"); |
| |
| validateBothServersAndAccessors(server1, server2, accessor1, accessor2); |
| |
| for (int i = 4; i <= 7; i++) { |
| startupRule.stop(i); |
| } |
| } |
| |
| @Test |
| public void throwsExceptionWhenNoDataSourceExists() throws Exception { |
| createTable(); |
| createReplicatedRegionUsingGfsh(); |
| IgnoredException.addIgnoredException(JdbcConnectorException.class); |
| final String commandStr = "create jdbc-mapping --region=" + REGION_NAME |
| + " --data-source=" + DATA_SOURCE_NAME |
| + " --pdx-name=" + Employee.class.getName(); |
| gfsh.executeAndAssertThat(commandStr).statusIsError() |
| .containsOutput("JDBC data-source named \"" + DATA_SOURCE_NAME |
| + "\" not found. Create it with gfsh 'create data-source --pooled --name=" |
| + DATA_SOURCE_NAME + "'."); |
| } |
| |
| @Test |
| public void serverStartupSucceedsForPartitionedRegionAfterMappingIsCreated() |
| throws Exception { |
| createTable(); |
| createPartitionedRegionUsingGfsh(); |
| createJdbcDataSource(); |
| createMapping(REGION_NAME, DATA_SOURCE_NAME, false); |
| MemberVM server3 = |
| startupRule.startServerVM(3, x -> x.withConnectionToLocator(locator.getPort())); |
| server3.invoke(() -> { |
| RegionMapping mapping = |
| ClusterStartupRule.getCache().getService(JdbcConnectorService.class) |
| .getMappingForRegion(REGION_NAME); |
| assertThat(mapping.getDataSourceName()).isEqualTo(DATA_SOURCE_NAME); |
| assertThat(mapping.getPdxName()).isEqualTo(Employee.class.getName()); |
| assertThat(mapping.getTableName()).isEqualTo(TABLE_NAME); |
| List<FieldMapping> fieldMappings = mapping.getFieldMappings(); |
| assertThat(fieldMappings.size()).isEqualTo(3); |
| assertThat(fieldMappings.get(0)).isEqualTo( |
| new FieldMapping("name", FieldType.STRING.name(), "name", JDBCType.VARCHAR.name(), |
| true)); |
| assertThat(fieldMappings.get(1)).isEqualTo( |
| new FieldMapping("id", FieldType.STRING.name(), "id", JDBCType.VARCHAR.name(), false)); |
| assertThat(fieldMappings.get(2)).isEqualTo( |
| new FieldMapping("age", FieldType.INT.name(), "age", JDBCType.INTEGER.name(), false)); |
| |
| String queueName = MappingCommandUtils.createAsyncEventQueueName(REGION_NAME); |
| AsyncEventQueue queue = ClusterStartupRule.getCache().getAsyncEventQueue(queueName); |
| assertThat(queue).isNotNull(); |
| assertThat(queue.getAsyncEventListener()).isInstanceOf(JdbcAsyncWriter.class); |
| }); |
| } |
| |
| @Test |
| public void verifyDateToDate() throws Exception { |
| dataserver = startupRule.startServerVM(1, x -> x.withConnectionToLocator(locator.getPort())); |
| dataserver.invoke(() -> { |
| Connection connection = DriverManager.getConnection(connectionUrl); |
| Statement statement = connection.createStatement(); |
| statement.execute( |
| "Create Table " + TABLE_NAME + " (id varchar(10) primary key not null, " |
| + TestDate.DATE_FIELD_NAME + " date not null)"); |
| }); |
| createReplicatedRegionUsingGfsh(); |
| createJdbcDataSource(); |
| createMapping(REGION_NAME, DATA_SOURCE_NAME, TestDate.class.getName(), true); |
| final String key = "emp1"; |
| final java.sql.Date sqlDate = java.sql.Date.valueOf("1982-09-11"); |
| final Date jdkDate = new Date(sqlDate.getTime()); |
| dataserver.invoke(() -> { |
| PdxInstance testDateInput = |
| ClusterStartupRule.getCache().createPdxInstanceFactory(TestDate.class.getName()) |
| .writeString("id", "key1").writeDate(TestDate.DATE_FIELD_NAME, jdkDate).create(); |
| |
| Region<Object, Object> region = ClusterStartupRule.getCache().getRegion(REGION_NAME); |
| region.put(key, testDateInput); |
| region.invalidate(key); |
| }); |
| ClientVM client = getClientVM(); |
| createClientRegion(client); |
| client.invoke(() -> { |
| Region<String, TestDate> region = |
| ClusterStartupRule.getClientCache().getRegion(REGION_NAME); |
| TestDate getResult = region.get(key); |
| assertThat(getResult.getMyDate()).isEqualTo(jdkDate); |
| }); |
| } |
| |
| @Test |
| public void verifyDateToTime() throws Exception { |
| dataserver = startupRule.startServerVM(1, x -> x.withConnectionToLocator(locator.getPort())); |
| dataserver.invoke(() -> { |
| Connection connection = DriverManager.getConnection(connectionUrl); |
| Statement statement = connection.createStatement(); |
| statement.execute( |
| "Create Table " + TABLE_NAME + " (id varchar(10) primary key not null, " |
| + TestDate.DATE_FIELD_NAME + " time not null)"); |
| }); |
| createReplicatedRegionUsingGfsh(); |
| createJdbcDataSource(); |
| createMapping(REGION_NAME, DATA_SOURCE_NAME, TestDate.class.getName(), true); |
| final String key = "emp1"; |
| final java.sql.Time sqlTime = java.sql.Time.valueOf("23:59:59"); |
| final Date jdkDate = new Date(sqlTime.getTime()); |
| dataserver.invoke(() -> { |
| PdxInstance testDateInput = |
| ClusterStartupRule.getCache().createPdxInstanceFactory(TestDate.class.getName()) |
| .writeString("id", "key1").writeDate(TestDate.DATE_FIELD_NAME, jdkDate).create(); |
| |
| Region<Object, Object> region = ClusterStartupRule.getCache().getRegion(REGION_NAME); |
| region.put(key, testDateInput); |
| region.invalidate(key); |
| }); |
| ClientVM client = getClientVM(); |
| createClientRegion(client); |
| client.invoke(() -> { |
| Region<String, TestDate> region = |
| ClusterStartupRule.getClientCache().getRegion(REGION_NAME); |
| TestDate getResult = region.get(key); |
| assertThat(getResult.getMyDate()).isEqualTo(jdkDate); |
| }); |
| } |
| |
| @Test |
| public void verifyDateToTimestamp() throws Exception { |
| dataserver = startupRule.startServerVM(1, x -> x.withConnectionToLocator(locator.getPort())); |
| createTableWithTimeStamp(dataserver, connectionUrl, TABLE_NAME, TestDate.DATE_FIELD_NAME); |
| |
| createReplicatedRegionUsingGfsh(); |
| createJdbcDataSource(); |
| createMapping(REGION_NAME, DATA_SOURCE_NAME, TestDate.class.getName(), true); |
| final String key = "emp1"; |
| final java.sql.Timestamp sqlTimestamp = java.sql.Timestamp.valueOf("1982-09-11 23:59:59.123"); |
| final Date jdkDate = new Date(sqlTimestamp.getTime()); |
| dataserver.invoke(() -> { |
| PdxInstance testDateInput = |
| ClusterStartupRule.getCache().createPdxInstanceFactory(TestDate.class.getName()) |
| .writeString("id", "key1").writeDate(TestDate.DATE_FIELD_NAME, jdkDate).create(); |
| |
| Region<Object, Object> region = ClusterStartupRule.getCache().getRegion(REGION_NAME); |
| region.put(key, testDateInput); |
| region.invalidate(key); |
| }); |
| ClientVM client = getClientVM(); |
| createClientRegion(client); |
| client.invoke(() -> { |
| Region<String, TestDate> region = |
| ClusterStartupRule.getClientCache().getRegion(REGION_NAME); |
| TestDate getResult = region.get(key); |
| assertThat(getResult.getMyDate()).isEqualTo(jdkDate); |
| }); |
| } |
| |
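| /** |
| * Creates a table with an id primary key and a timestamp column in the given VM; protected so |
| * database-specific subclasses can override the DDL. |
| */ |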
| protected void createTableWithTimeStamp(MemberVM vm, String connectionUrl, String tableName, |
| String columnName) { |
| vm.invoke(() -> { |
| Connection connection = DriverManager.getConnection(connectionUrl); |
| Statement statement = connection.createStatement(); |
| statement.execute("Create Table " + tableName |
| + " (id varchar(10) primary key not null, " + columnName + " timestamp not null)"); |
| }); |
| } |
| |
| @Test |
| public void putWritesToDB() throws Exception { |
| createTable(); |
| createReplicatedRegionUsingGfsh(); |
| createJdbcDataSource(); |
| createMapping(REGION_NAME, DATA_SOURCE_NAME, true); |
| dataserver.invoke(() -> { |
| PdxInstance pdxEmployee1 = |
| ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName()) |
| .writeString("id", "key1").writeString("name", "Emp1").writeInt("age", 55).create(); |
| |
| String key = "emp1"; |
| ClusterStartupRule.getCache().getRegion(REGION_NAME).put(key, pdxEmployee1); |
| assertTableHasEmployeeData(1, pdxEmployee1, key); |
| }); |
| } |
| |
| @Test |
| public void putAsyncWritesToDB() throws Exception { |
| createTable(); |
| createReplicatedRegionUsingGfsh(); |
| createJdbcDataSource(); |
| createMapping(REGION_NAME, DATA_SOURCE_NAME, false); |
| dataserver.invoke(() -> { |
| PdxInstance pdxEmployee1 = |
| ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName()) |
| .writeString("id", "key1").writeString("name", "Emp1").writeInt("age", 55).create(); |
| |
| String key = "emp1"; |
| ClusterStartupRule.getCache().getRegion(REGION_NAME).put(key, pdxEmployee1); |
| await().untilAsserted(() -> { |
| assertTableHasEmployeeData(1, pdxEmployee1, key); |
| }); |
| }); |
| } |
| |
| @Test |
| public void putAsyncWithPartitionWritesToDB() throws Exception { |
| createTable(); |
| createPartitionedRegionUsingGfsh(); |
| createJdbcDataSource(); |
| createMapping(REGION_NAME, DATA_SOURCE_NAME, false); |
| dataserver.invoke(() -> { |
| PdxInstance pdxEmployee1 = |
| ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName()) |
| .writeString("id", "key1").writeString("name", "Emp1").writeInt("age", 55).create(); |
| |
| String key = "emp1"; |
| ClusterStartupRule.getCache().getRegion(REGION_NAME).put(key, pdxEmployee1); |
| await().untilAsserted(() -> { |
| assertTableHasEmployeeData(1, pdxEmployee1, key); |
| }); |
| }); |
| } |
| |
| @Test |
| public void getReadsFromEmptyDB() throws Exception { |
| createTable(); |
| createReplicatedRegionUsingGfsh(); |
| createJdbcDataSource(); |
| createMapping(REGION_NAME, DATA_SOURCE_NAME, true); |
| dataserver.invoke(() -> { |
| String key = "emp1"; |
| Region<Object, Object> region = ClusterStartupRule.getCache().getRegion(REGION_NAME); |
| Object value = region.get(key); |
| assertThat(value).isNull(); |
| assertThat(region.size()).isEqualTo(0); |
| }); |
| } |
| |
| @Test |
| public void getReadsFromDB() throws Exception { |
| createTable(); |
| createReplicatedRegionUsingGfsh(); |
| createJdbcDataSource(); |
| createMapping(REGION_NAME, DATA_SOURCE_NAME, true); |
| dataserver.invoke(() -> { |
| PdxInstance pdxEmployee1 = |
| ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName()) |
| .writeString("id", "id1").writeString("name", "Emp1").writeInt("age", 55).create(); |
| |
| String key = "id1"; |
| Region<Object, Object> region = ClusterStartupRule.getCache().getRegion(REGION_NAME); |
| region.put(key, pdxEmployee1); |
| region.invalidate(key); |
| |
| JdbcWriter<Object, Object> writer = |
| (JdbcWriter<Object, Object>) region.getAttributes().getCacheWriter(); |
| long writeCallsCompletedBeforeGet = writer.getTotalEvents(); |
| |
| Employee result = (Employee) region.get(key); |
| assertThat(result.getId()).isEqualTo(key); |
| assertThat(result.getName()).isEqualTo("Emp1"); |
| assertThat(result.getAge()).isEqualTo(55); |
| assertThat(writer.getTotalEvents()).isEqualTo(writeCallsCompletedBeforeGet); |
| }); |
| } |
| |
| @Test |
| public void getReadsFromDBWithCompositeKey() throws Exception { |
| createTable(); |
| createReplicatedRegionUsingGfsh(); |
| createJdbcDataSource(); |
| createMapping(REGION_NAME, DATA_SOURCE_NAME, Employee.class.getName(), true, "id,age"); |
| dataserver.invoke(() -> { |
| PdxInstance pdxEmployee1 = |
| ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName()) |
| .writeString("id", "id1").writeString("name", "Emp1").writeInt("age", 55).create(); |
| PdxInstance key = |
| ClusterStartupRule.getCache().createPdxInstanceFactory("IdAgeKeyType").neverDeserialize() |
| .writeField("id", (String) pdxEmployee1.getField("id"), String.class) |
| .writeField("age", (Integer) pdxEmployee1.getField("age"), int.class).create(); |
| Region<Object, Object> region = ClusterStartupRule.getCache().getRegion(REGION_NAME); |
| region.put(key, pdxEmployee1); |
| region.invalidate(key); |
| JdbcWriter<Object, Object> writer = |
| (JdbcWriter<Object, Object>) region.getAttributes().getCacheWriter(); |
| long writeCallsCompletedBeforeGet = writer.getTotalEvents(); |
| |
| Employee result = (Employee) region.get(key); |
| |
| assertThat(result.getId()).isEqualTo("id1"); |
| assertThat(result.getName()).isEqualTo("Emp1"); |
| assertThat(result.getAge()).isEqualTo(55); |
| assertThat(writer.getTotalEvents()).isEqualTo(writeCallsCompletedBeforeGet); |
| }); |
| } |
| |
| @Test |
| public void getReadsFromDBWithAsyncWriter() throws Exception { |
| createTable(); |
| createReplicatedRegionUsingGfsh(); |
| createJdbcDataSource(); |
| createMapping(REGION_NAME, DATA_SOURCE_NAME, false); |
| dataserver.invoke(() -> { |
| PdxInstance pdxEmployee1 = |
| ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName()) |
| .writeString("id", "id1").writeString("name", "Emp1").writeInt("age", 55).create(); |
| String key = "id1"; |
| Region<Object, Object> region = ClusterStartupRule.getCache().getRegion(REGION_NAME); |
| JdbcAsyncWriter asyncWriter = (JdbcAsyncWriter) ClusterStartupRule.getCache() |
| .getAsyncEventQueue(MappingCommandUtils.createAsyncEventQueueName(REGION_NAME)) |
| .getAsyncEventListener(); |
| |
| region.put(key, pdxEmployee1); |
| await().untilAsserted(() -> { |
| assertThat(asyncWriter.getSuccessfulEvents()).isEqualTo(1); |
| }); |
| region.invalidate(key); |
| Employee result = (Employee) region.get(key); |
| |
| assertThat(result.getId()).isEqualTo(pdxEmployee1.getField("id")); |
| assertThat(result.getName()).isEqualTo(pdxEmployee1.getField("name")); |
| assertThat(result.getAge()).isEqualTo(pdxEmployee1.getField("age")); |
| await().untilAsserted(() -> { |
| assertThat(asyncWriter.getIgnoredEvents()).isEqualTo(1); |
| }); |
| }); |
| } |
| |
| @Test |
| public void getReadsFromDBWithPdxClassName() throws Exception { |
| createTable(); |
| createReplicatedRegionUsingGfsh(); |
| createJdbcDataSource(); |
| createMapping(REGION_NAME, DATA_SOURCE_NAME, Employee.class.getName(), true); |
| dataserver.invoke(() -> { |
| String key = "id1"; |
| Employee value = new Employee(key, "Emp1", 55); |
| Region<Object, Object> region = ClusterStartupRule.getCache().getRegion(REGION_NAME); |
| region.put(key, value); |
| region.invalidate(key); |
| |
| Employee result = (Employee) region.get(key); |
| assertThat(result.getName()).isEqualTo("Emp1"); |
| assertThat(result.getAge()).isEqualTo(55); |
| }); |
| } |
| |
| @Test |
| public void clientGetReadsFromDBWithPdxClassName() throws Exception { |
| createTableForAllSupportedFields(); |
| ClientVM client = getClientVM(); |
| createClientRegion(client); |
| |
| createReplicatedRegionUsingGfsh(); |
| createJdbcDataSource(); |
| createMapping(REGION_NAME, DATA_SOURCE_NAME, ClassWithSupportedPdxFields.class.getName(), true); |
| client.invoke(() -> { |
| String key = "id1"; |
| ClassWithSupportedPdxFields value = |
| new ClassWithSupportedPdxFields(key, true, (byte) 1, (short) 2, |
| 3, 4, 5.5f, 6.0, "BigEmp", new Date(0), "BigEmpObject", new byte[] {1, 2}, 'c'); |
| Region<String, ClassWithSupportedPdxFields> region = |
| ClusterStartupRule.getClientCache().getRegion(REGION_NAME); |
| region.put(key, value); |
| region.invalidate(key); |
| |
| ClassWithSupportedPdxFields result = region.get(key); |
| assertThat(result).isEqualTo(value); |
| }); |
| } |
| |
| @Test |
| public void clientPutsAndGetsWithNullFieldsWithPdxClassName() throws Exception { |
| createTableForAllSupportedFields(); |
| ClientVM client = getClientVM(); |
| createClientRegion(client); |
| |
| createReplicatedRegionUsingGfsh(); |
| createJdbcDataSource(); |
| createMapping(REGION_NAME, DATA_SOURCE_NAME, ClassWithSupportedPdxFields.class.getName(), true); |
| client.invoke(() -> { |
| String key = "id1"; |
| ClassWithSupportedPdxFields value = new ClassWithSupportedPdxFields(key); |
| Region<String, ClassWithSupportedPdxFields> region = |
| ClusterStartupRule.getClientCache().getRegion(REGION_NAME); |
| region.put(key, value); |
| region.invalidate(key); |
| |
| ClassWithSupportedPdxFields result = region.get(key); |
| assertThat(result).isEqualTo(value); |
| }); |
| } |
| |
| @Test |
| public void clientRegistersPdxAndReadsFromDBWithPdxClassName() throws Exception { |
| createTableForAllSupportedFields(); |
| ClientVM client = getClientVM(); |
| createClientRegion(client); |
| createReplicatedRegionUsingGfsh(); |
| createJdbcDataSource(); |
| createMapping(REGION_NAME, DATA_SOURCE_NAME, ClassWithSupportedPdxFields.class.getName(), true); |
| String key = "id1"; |
| ClassWithSupportedPdxFields value = |
| new ClassWithSupportedPdxFields(key, true, (byte) 1, (short) 2, |
| 3, 4, 5.5f, 6.0, "BigEmp", new Date(0), "BigEmpObject", new byte[] {1, 2}, 'c'); |
| |
| dataserver.invoke(() -> { |
| insertDataForAllSupportedFieldsTable(key, value); |
| }); |
| |
| client.invoke(() -> { |
| ClusterStartupRule.getClientCache().registerPdxMetaData(new ClassWithSupportedPdxFields()); |
| |
| Region<String, ClassWithSupportedPdxFields> region = |
| ClusterStartupRule.getClientCache().getRegion(REGION_NAME); |
| |
| ClassWithSupportedPdxFields result = region.get(key); |
| assertThat(result).isEqualTo(value); |
| }); |
| } |
| |
| @Test |
| public void clientRegistersPdxAndReadsFromDBContainingNullColumnsWithPdxClassName() |
| throws Exception { |
| createTableForAllSupportedFields(); |
| ClientVM client = getClientVM(); |
| createClientRegion(client); |
| createReplicatedRegionUsingGfsh(); |
| createJdbcDataSource(); |
| createMapping(REGION_NAME, DATA_SOURCE_NAME, ClassWithSupportedPdxFields.class.getName(), true); |
| String key = "id1"; |
| ClassWithSupportedPdxFields value = new ClassWithSupportedPdxFields(key); |
| |
| dataserver.invoke(() -> { |
| insertNullDataForAllSupportedFieldsTable(key); |
| }); |
| |
| client.invoke(() -> { |
| ClusterStartupRule.getClientCache().registerPdxMetaData(new ClassWithSupportedPdxFields()); |
| |
| Region<String, ClassWithSupportedPdxFields> region = |
| ClusterStartupRule.getClientCache().getRegion(REGION_NAME); |
| |
| ClassWithSupportedPdxFields result = region.get(key); |
| assertThat(result).isEqualTo(value); |
| }); |
| } |
| |
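| /** |
| * Starts a client VM that connects to the locator and registers a ReflectionBasedAutoSerializer |
| * for ClassWithSupportedPdxFields. |
| */ |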
| private ClientVM getClientVM() throws Exception { |
| SerializableConsumerIF<ClientCacheFactory> cacheSetup = cf -> { |
| System.setProperty(AutoSerializableManager.NO_HARDCODED_EXCLUDES_PARAM, "true"); |
| cf.addPoolLocator("localhost", locator.getPort()); |
| cf.setPdxSerializer( |
| new ReflectionBasedAutoSerializer(ClassWithSupportedPdxFields.class.getName())); |
| }; |
| return startupRule.startClientVM(2, c -> c.withCacheSetup(cacheSetup)); |
| } |
| |
| private void createClientRegion(ClientVM client) { |
| client.invoke(() -> { |
| ClusterStartupRule.getClientCache().createClientRegionFactory(ClientRegionShortcut.PROXY) |
| .create(REGION_NAME); |
| }); |
| } |
| |
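| /** Creates the pooled JDBC data source named {@code TestDataSource} pointing at the test database URL. */ |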
| private void createJdbcDataSource() { |
| final String commandStr = |
| "create data-source --pooled --name=" + DATA_SOURCE_NAME + " --url=" + connectionUrl; |
| gfsh.executeAndAssertThat(commandStr).statusIsSuccess(); |
| } |
| |
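| /** Creates an async-event-queue whose listener is a JdbcAsyncWriter, with batch size 1 so events are delivered immediately. */ |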
| private void createAsyncListener(String id) { |
| final String commandStr = |
| "create async-event-queue --id=" + id + " --listener=" + JdbcAsyncWriter.class.getName() |
| + " --batch-size=1 --batch-time-interval=0 --parallel=false"; |
| gfsh.executeAndAssertThat(commandStr).statusIsSuccess(); |
| } |
| |
| private void createReplicatedRegionUsingGfsh() { |
| String createRegionCmd = "create region --name=" + REGION_NAME + " --type=REPLICATE"; |
| gfsh.executeAndAssertThat(createRegionCmd).statusIsSuccess(); |
| } |
| |
| private void createReplicatedRegionUsingGfshForGroup(boolean isAccessor, String groupName) { |
| String createRegionCmd = "create region --name=" + REGION_NAME + " --groups=" + groupName |
| + " --if-not-exists=true" |
| + (isAccessor |
| ? " --type=" + RegionShortcut.REPLICATE_PROXY.name() |
| : " --type=" + RegionShortcut.REPLICATE.name()); |
| gfsh.executeAndAssertThat(createRegionCmd).statusIsSuccess(); |
| } |
| |
| private void createPartitionedRegionUsingGfshForGroup(boolean isAccessor, String groupName) { |
| String createRegionCmd = "create region --name=" + REGION_NAME + " --groups=" + groupName |
| + " --if-not-exists=true" |
| + (isAccessor |
| ? " --type=" + RegionShortcut.PARTITION_PROXY.name() |
| : " --type=" + RegionShortcut.PARTITION.name()) |
| + " --redundant-copies=1"; |
| gfsh.executeAndAssertThat(createRegionCmd).statusIsSuccess(); |
| } |
| |
| private void createAsyncMappingForGroup(String regionName, String connectionName, |
| String pdxClassName, |
| String groupName) { |
| final String commandStr = "create jdbc-mapping --region=" + regionName |
| + " --data-source=" + connectionName |
| + " --table=" + TABLE_NAME |
| + " --synchronous=false" |
| + " --if-not-exists=true" |
| + " --pdx-name=" + pdxClassName |
| + " --groups=" + groupName; |
| gfsh.executeAndAssertThat(commandStr).statusIsSuccess(); |
| } |
| |
| private void createPartitionedRegionUsingGfsh() { |
| String createRegionCmd = |
| "create region --name=" + REGION_NAME + " --type=PARTITION --redundant-copies=1"; |
| gfsh.executeAndAssertThat(createRegionCmd).statusIsSuccess(); |
| } |
| |
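| /** |
| * Creates a jdbc-mapping for the region via gfsh. For asynchronous mappings the generated async |
| * event queue is altered to batch size 1 with no batch interval so writes reach the database |
| * promptly. |
| */ |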
| private void createMapping(String regionName, String connectionName, boolean synchronous) { |
| createMapping(regionName, connectionName, Employee.class.getName(), synchronous, null); |
| } |
| |
| private void createMapping(String regionName, String connectionName, String pdxClassName, |
| boolean synchronous) { |
| createMapping(regionName, connectionName, pdxClassName, synchronous, null); |
| } |
| |
| private void createMapping(String regionName, String connectionName, String pdxClassName, |
| boolean synchronous, String ids) { |
| final String commandStr = "create jdbc-mapping --region=" + regionName |
| + " --data-source=" + connectionName |
| + " --table=" + TABLE_NAME |
| + " --synchronous=" + synchronous |
| + " --pdx-name=" + pdxClassName |
| + ((ids != null) ? (" --id=" + ids) : ""); |
| gfsh.executeAndAssertThat(commandStr).statusIsSuccess(); |
| if (!synchronous) { |
| final String alterAsyncQueue = |
| "alter async-event-queue --id=" |
| + MappingCommandUtils.createAsyncEventQueueName(regionName) |
| + " --batch-size=1 --batch-time-interval=0"; |
| gfsh.executeAndAssertThat(alterAsyncQueue).statusIsSuccess(); |
| } |
| } |
| |
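| /** |
| * Waits for the table to reach the expected row count and then verifies that the first row |
| * (ordered by id) matches the given key and employee fields. |
| */ |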
| private void assertTableHasEmployeeData(int size, PdxInstance employee, String key) |
| throws SQLException { |
| Connection connection = DriverManager.getConnection(connectionUrl); |
| Statement statement = connection.createStatement(); |
| await().untilAsserted(() -> { |
| assertThat(getRowCount(statement, TABLE_NAME)).isEqualTo(size); |
| }); |
| |
| ResultSet resultSet = |
| statement.executeQuery("select * from " + TABLE_NAME + " order by id asc"); |
| assertThat(resultSet.next()).isTrue(); |
| assertThat(resultSet.getString("id")).isEqualTo(key); |
| assertThat(resultSet.getString("name")).isEqualTo(employee.getField("name")); |
| assertThat(resultSet.getObject("age")).isEqualTo(employee.getField("age")); |
| } |
| |
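| /** Returns the row count of the table, or -1 if the query fails (for example, before the table exists). */ |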
| private int getRowCount(Statement stmt, String tableName) { |
| try { |
| ResultSet resultSet = stmt.executeQuery("select count(*) from " + tableName); |
| resultSet.next(); |
| return resultSet.getInt(1); |
| } catch (SQLException e) { |
| return -1; |
| } |
| } |
| |
| } |