| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.connectors.jdbc; |
| |
| import static org.assertj.core.api.Assertions.assertThat; |
| import static org.assertj.core.api.Assertions.catchThrowable; |
| |
| import java.sql.Connection; |
| import java.sql.JDBCType; |
| import java.sql.ResultSet; |
| import java.sql.SQLException; |
| import java.sql.Statement; |
| import java.util.Arrays; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| |
| import javax.sql.DataSource; |
| |
| import org.junit.After; |
| import org.junit.Before; |
| import org.junit.Test; |
| |
| import org.apache.geode.cache.CacheFactory; |
| import org.apache.geode.cache.Region; |
| import org.apache.geode.cache.RegionFactory; |
| import org.apache.geode.cache.RegionShortcut; |
| import org.apache.geode.connectors.jdbc.internal.RegionMappingExistsException; |
| import org.apache.geode.connectors.jdbc.internal.SqlHandler; |
| import org.apache.geode.connectors.jdbc.internal.TableMetaDataManager; |
| import org.apache.geode.connectors.jdbc.internal.TestConfigService; |
| import org.apache.geode.connectors.jdbc.internal.configuration.FieldMapping; |
| import org.apache.geode.internal.cache.InternalCache; |
| import org.apache.geode.pdx.FieldType; |
| import org.apache.geode.pdx.PdxInstance; |
| import org.apache.geode.pdx.WritablePdxInstance; |
| |
| public abstract class JdbcWriterIntegrationTest { |
| |
| static final String DB_NAME = "test"; |
| protected static final String SCHEMA_NAME = "mySchema"; |
| protected static final String REGION_TABLE_NAME = "employees"; |
| |
| protected InternalCache cache; |
| protected Region<Object, Object> employees; |
| protected Connection connection; |
| protected Statement statement; |
| protected JdbcWriter<Object, Object> jdbcWriter; |
| protected PdxInstance pdx1; |
| protected PdxInstance pdx2; |
| protected PdxInstance illegalPdx; |
| protected Employee employee1; |
| protected Employee employee2; |
| protected Employee illegalEmployee; |
| protected final TestDataSourceFactory testDataSourceFactory = |
| new TestDataSourceFactory(getConnectionUrl()); |
| protected String catalog; |
| protected String schema; |
| |
| @Before |
| public void setUp() throws Exception { |
| cache = (InternalCache) new CacheFactory().set("locators", "").set("mcast-port", "0") |
| .setPdxReadSerialized(false).create(); |
| |
| connection = getConnection(); |
| statement = connection.createStatement(); |
| pdx1 = cache.createPdxInstanceFactory(Employee.class.getName()).writeString("id", "1") |
| .writeString("name", "Emp1") |
| .writeInt("age", 55).create(); |
| pdx2 = cache.createPdxInstanceFactory(Employee.class.getName()).writeString("id", "2") |
| .writeString("name", "Emp2") |
| .writeInt("age", 21).create(); |
| illegalPdx = cache.createPdxInstanceFactory(Employee.class.getName()).writeString("id", "99") |
| .writeString("name", "IllegalEmployee") |
| .writeInt("age", 33).create(); |
| employee1 = (Employee) pdx1.getObject(); |
| employee2 = (Employee) pdx2.getObject(); |
| illegalEmployee = (Employee) illegalPdx.getObject(); |
| createTableInUnusedSchema(); |
| } |
| |
| protected void createTable() throws SQLException { |
| createTable(true); |
| } |
| |
| private void createTable(boolean hasPrimaryKey) throws SQLException { |
| if (hasPrimaryKey) { |
| statement.execute("Create Table " + REGION_TABLE_NAME |
| + " (id varchar(10) primary key not null, name varchar(10), age int)"); |
| } else { |
| statement.execute("Create Table " + REGION_TABLE_NAME |
| + " (id varchar(10) not null, name varchar(10), age int)"); |
| } |
| } |
| |
| protected void createTableWithSchema() throws SQLException { |
| statement.execute("Create Schema " + SCHEMA_NAME); |
| statement.execute("Create Table " + SCHEMA_NAME + '.' + REGION_TABLE_NAME |
| + " (id varchar(10) primary key not null, name varchar(10), age int)"); |
| } |
| |
| protected void createTableInUnusedSchema() throws SQLException { |
| Connection connection2 = getConnection(); |
| statement.execute("Create Schema unusedSchema"); |
| statement = connection2.createStatement(); |
| statement.execute("Create Table " + "unusedSchema." + REGION_TABLE_NAME |
| + " (id varchar(10) primary key not null, name varchar(10), age int)"); |
| } |
| |
| protected void setupRegion(String ids) throws RegionMappingExistsException { |
| sharedRegionSetup(ids, null, null); |
| } |
| |
| protected void sharedRegionSetup(String ids, String catalog, String schema) |
| throws RegionMappingExistsException { |
| List<FieldMapping> fieldMappings = Arrays.asList( |
| new FieldMapping("id", FieldType.STRING.name(), "id", JDBCType.VARCHAR.name(), false), |
| new FieldMapping("name", FieldType.STRING.name(), "name", JDBCType.VARCHAR.name(), true), |
| new FieldMapping("age", FieldType.OBJECT.name(), "age", JDBCType.INTEGER.name(), true)); |
| employees = createRegionWithJDBCSynchronousWriter(REGION_TABLE_NAME, ids, catalog, schema, |
| fieldMappings); |
| } |
| |
| protected void setupRegionWithSchema(String ids) throws RegionMappingExistsException { |
| if (vendorSupportsSchemas()) { |
| catalog = null; |
| schema = SCHEMA_NAME; |
| } else { |
| catalog = SCHEMA_NAME; |
| schema = null; |
| |
| } |
| sharedRegionSetup(ids, catalog, schema); |
| } |
| |
| @After |
| public void tearDown() throws Exception { |
| cache.close(); |
| closeDB(); |
| } |
| |
| public abstract Connection getConnection() throws SQLException; |
| |
| public abstract String getConnectionUrl(); |
| |
| private void closeDB() throws Exception { |
| if (statement == null) { |
| if (connection != null) { |
| statement = connection.createStatement(); |
| } |
| } |
| if (statement != null) { |
| statement.execute("Drop table IF EXISTS " + REGION_TABLE_NAME); |
| statement.execute("Drop table IF EXISTS unusedSchema." + REGION_TABLE_NAME); |
| statement.execute("Drop schema IF EXISTS unusedSchema"); |
| statement.execute("Drop table IF EXISTS " + SCHEMA_NAME + '.' + REGION_TABLE_NAME); |
| statement.execute("Drop schema IF EXISTS " + SCHEMA_NAME); |
| statement.close(); |
| } |
| if (connection != null) { |
| connection.close(); |
| } |
| testDataSourceFactory.close(); |
| } |
| |
| @Test |
| public void canInsertIntoTable() throws Exception { |
| createTable(); |
| DataSource dataSource = testDataSourceFactory.getDataSource("testConnectionConfig"); |
| setupRegion("id"); |
| |
| employees.put("1", pdx1); |
| employees.put("2", pdx2); |
| |
| ResultSet resultSet = |
| statement.executeQuery("select * from " + REGION_TABLE_NAME + " order by id asc"); |
| assertRecordMatchesEmployee(resultSet, "1", employee1); |
| assertRecordMatchesEmployee(resultSet, "2", employee2); |
| assertThat(resultSet.next()).isFalse(); |
| |
| dataSource.getConnection(); |
| } |
| |
| protected abstract boolean vendorSupportsSchemas(); |
| |
| @Test |
| public void canInsertIntoTableWithSchema() throws Exception { |
| createTableWithSchema(); |
| setupRegionWithSchema("id"); |
| employees.put("1", pdx1); |
| employees.put("2", pdx2); |
| |
| ResultSet resultSet = |
| statement.executeQuery( |
| "select * from " + SCHEMA_NAME + '.' + REGION_TABLE_NAME + " order by id asc"); |
| assertRecordMatchesEmployee(resultSet, "1", employee1); |
| assertRecordMatchesEmployee(resultSet, "2", employee2); |
| assertThat(resultSet.next()).isFalse(); |
| } |
| |
| @Test |
| public void canInsertIntoTableWithCompositeKey() throws Exception { |
| createTable(); |
| setupRegion("id,age"); |
| PdxInstance compositeKey1 = cache.createPdxInstanceFactory("IdAgeKeyType").neverDeserialize() |
| .writeField("id", (String) pdx1.getField("id"), String.class) |
| .writeField("age", (Integer) pdx1.getField("age"), int.class).create(); |
| PdxInstance compositeKey2 = cache.createPdxInstanceFactory("IdAgeKeyType").neverDeserialize() |
| .writeField("id", (String) pdx2.getField("id"), String.class) |
| .writeField("age", (Integer) pdx2.getField("age"), int.class).create(); |
| |
| employees.put(compositeKey1, pdx1); |
| employees.put(compositeKey2, pdx2); |
| |
| ResultSet resultSet = |
| statement.executeQuery("select * from " + REGION_TABLE_NAME + " order by id asc"); |
| assertRecordMatchesEmployee(resultSet, "1", employee1); |
| assertRecordMatchesEmployee(resultSet, "2", employee2); |
| assertThat(resultSet.next()).isFalse(); |
| } |
| |
| @Test |
| public void canPutAllInsertIntoTable() throws Exception { |
| createTable(); |
| setupRegion("id"); |
| Map<String, PdxInstance> putAllMap = new HashMap<>(); |
| putAllMap.put("1", pdx1); |
| putAllMap.put("2", pdx2); |
| employees.putAll(putAllMap); |
| |
| ResultSet resultSet = |
| statement.executeQuery("select * from " + REGION_TABLE_NAME + " order by id asc"); |
| assertRecordMatchesEmployee(resultSet, "1", employee1); |
| assertRecordMatchesEmployee(resultSet, "2", employee2); |
| assertThat(resultSet.next()).isFalse(); |
| } |
| |
| @Test |
| public void verifyThatPdxFieldNamedSameAsPrimaryKeyIsIgnored() throws Exception { |
| createTable(); |
| setupRegion("id"); |
| PdxInstance pdxInstanceWithId = cache.createPdxInstanceFactory(Employee.class.getName()) |
| .writeString("name", "Emp1").writeInt("age", 55).writeString("id", "3").create(); |
| employees.put("1", pdxInstanceWithId); |
| |
| ResultSet resultSet = |
| statement.executeQuery("select * from " + REGION_TABLE_NAME + " order by id asc"); |
| assertRecordMatchesEmployee(resultSet, "1", (Employee) pdxInstanceWithId.getObject()); |
| assertThat(resultSet.next()).isFalse(); |
| } |
| |
| @Test |
| public void putNonPdxInstanceFails() throws Exception { |
| createTable(); |
| setupRegion("id"); |
| Throwable thrown = catchThrowable(() -> employees.put("1", "non pdx instance")); |
| assertThat(thrown).isInstanceOf(IllegalArgumentException.class); |
| } |
| |
| @Test |
| public void putNonPdxInstanceThatIsPdxSerializable() |
| throws SQLException, RegionMappingExistsException { |
| createTable(); |
| setupRegion("id"); |
| Employee value = new Employee("2", "Emp2", 22); |
| employees.put("2", value); |
| |
| ResultSet resultSet = |
| statement.executeQuery("select * from " + REGION_TABLE_NAME + " order by id asc"); |
| assertRecordMatchesEmployee(resultSet, "2", value); |
| assertThat(resultSet.next()).isFalse(); |
| } |
| |
| @Test |
| public void putInstanceFieldLengthGreaterThanTableColumnLengthFails() |
| throws SQLException, RegionMappingExistsException { |
| createTable(); |
| setupRegion("id"); |
| |
| Throwable thrown = catchThrowable(() -> employees.put("99", illegalEmployee)); |
| |
| assertThat(thrown).isInstanceOf(JdbcConnectorException.class); |
| assertThat(thrown.getMessage()).startsWith(getDataTooLongSqlErrorMessage()); |
| } |
| |
| @Test |
| public void canDestroyFromTable() throws Exception { |
| createTable(); |
| setupRegion("id"); |
| employees.put("1", pdx1); |
| employees.put("2", pdx2); |
| |
| employees.destroy("1"); |
| |
| ResultSet resultSet = |
| statement.executeQuery("select * from " + REGION_TABLE_NAME + " order by id asc"); |
| assertRecordMatchesEmployee(resultSet, "2", employee2); |
| assertThat(resultSet.next()).isFalse(); |
| } |
| |
| @Test |
| public void canDestroyFromTableWithSchema() throws Exception { |
| createTableWithSchema(); |
| setupRegionWithSchema("id"); |
| employees.put("1", pdx1); |
| employees.put("2", pdx2); |
| |
| employees.destroy("1"); |
| |
| ResultSet resultSet = |
| statement.executeQuery( |
| "select * from " + SCHEMA_NAME + '.' + REGION_TABLE_NAME + " order by id asc"); |
| assertRecordMatchesEmployee(resultSet, "2", employee2); |
| assertThat(resultSet.next()).isFalse(); |
| } |
| |
| @Test |
| public void canDestroyFromTableWithCompositeKey() throws Exception { |
| createTable(); |
| setupRegion("id,age"); |
| PdxInstance compositeKey1 = cache.createPdxInstanceFactory("IdAgeKeyType").neverDeserialize() |
| .writeField("id", (String) pdx1.getField("id"), String.class) |
| .writeField("age", (Integer) pdx1.getField("age"), int.class).create(); |
| PdxInstance compositeKey2 = cache.createPdxInstanceFactory("IdAgeKeyType").neverDeserialize() |
| .writeField("id", (String) pdx2.getField("id"), String.class) |
| .writeField("age", (Integer) pdx2.getField("age"), int.class).create(); |
| employees.put(compositeKey1, pdx1); |
| employees.put(compositeKey2, pdx2); |
| |
| employees.destroy(compositeKey1); |
| |
| ResultSet resultSet = |
| statement.executeQuery("select * from " + REGION_TABLE_NAME + " order by id asc"); |
| assertRecordMatchesEmployee(resultSet, "2", employee2); |
| assertThat(resultSet.next()).isFalse(); |
| } |
| |
| @Test |
| public void canUpdateTable() throws Exception { |
| createTable(); |
| setupRegion("id"); |
| employees.put("1", pdx1); |
| employees.put("1", pdx2); |
| |
| ResultSet resultSet = |
| statement.executeQuery("select * from " + REGION_TABLE_NAME + " order by id asc"); |
| assertRecordMatchesEmployee(resultSet, "1", employee2); |
| assertThat(resultSet.next()).isFalse(); |
| } |
| |
| @Test |
| public void canUpdateTableWithSchema() throws Exception { |
| createTableWithSchema(); |
| setupRegionWithSchema("id"); |
| employees.put("1", pdx1); |
| employees.put("1", pdx2); |
| |
| ResultSet resultSet = |
| statement.executeQuery( |
| "select * from " + SCHEMA_NAME + '.' + REGION_TABLE_NAME + " order by id asc"); |
| assertRecordMatchesEmployee(resultSet, "1", employee2); |
| assertThat(resultSet.next()).isFalse(); |
| } |
| |
| @Test |
| public void canUpdateTableWithCompositeKey() throws Exception { |
| createTable(); |
| setupRegion("id,age"); |
| PdxInstance myPdx = cache.createPdxInstanceFactory(Employee.class.getName()) |
| .writeString("id", "1").writeString("name", "Emp1") |
| .writeInt("age", 55).create(); |
| PdxInstance compositeKey1 = cache.createPdxInstanceFactory("IdAgeKeyType").neverDeserialize() |
| .writeField("id", (String) myPdx.getField("id"), String.class) |
| .writeField("age", (Integer) myPdx.getField("age"), int.class).create(); |
| employees.put(compositeKey1, myPdx); |
| WritablePdxInstance updatedPdx = myPdx.createWriter(); |
| updatedPdx.setField("name", "updated"); |
| Employee updatedEmployee = (Employee) updatedPdx.getObject(); |
| |
| employees.put(compositeKey1, updatedPdx); |
| |
| ResultSet resultSet = |
| statement.executeQuery("select * from " + REGION_TABLE_NAME + " order by id asc"); |
| assertRecordMatchesEmployee(resultSet, "1", updatedEmployee); |
| assertThat(resultSet.next()).isFalse(); |
| } |
| |
| @Test |
| public void updateInstanceFieldLengthGreaterThanTableColumnLengthFails() |
| throws SQLException, RegionMappingExistsException { |
| createTable(); |
| setupRegion("id"); |
| employees.put("1", pdx1); |
| |
| Throwable thrown = catchThrowable(() -> employees.put("1", illegalPdx)); |
| |
| assertThat(thrown).isInstanceOf(JdbcConnectorException.class); |
| assertThat(thrown.getMessage()).startsWith(getDataTooLongSqlErrorMessage()); |
| |
| ResultSet resultSet = |
| statement.executeQuery("select * from " + REGION_TABLE_NAME + " order by id asc"); |
| assertRecordMatchesEmployee(resultSet, "1", employee1); |
| assertThat(resultSet.next()).isFalse(); |
| } |
| |
| @Test |
| public void canUpdateBecomeInsert() throws Exception { |
| createTable(); |
| setupRegion("id"); |
| employees.put("1", pdx1); |
| |
| statement.execute("delete from " + REGION_TABLE_NAME + " where id = '1'"); |
| validateTableRowCount(0); |
| |
| employees.put("1", pdx2); |
| |
| ResultSet resultSet = |
| statement.executeQuery("select * from " + REGION_TABLE_NAME + " order by id asc"); |
| assertRecordMatchesEmployee(resultSet, "1", employee2); |
| assertThat(resultSet.next()).isFalse(); |
| } |
| |
| @Test |
| public void canInsertBecomeUpdate() throws Exception { |
| createTable(); |
| setupRegion("id"); |
| statement.execute("Insert into " + REGION_TABLE_NAME + " values('1', 'bogus', 11)"); |
| validateTableRowCount(1); |
| |
| employees.put("1", pdx1); |
| |
| ResultSet resultSet = |
| statement.executeQuery("select * from " + REGION_TABLE_NAME + " order by id asc"); |
| assertRecordMatchesEmployee(resultSet, "1", employee1); |
| assertThat(resultSet.next()).isFalse(); |
| } |
| |
| @Test |
| public void updateBecomeInsertFieldLengthGreaterThanTableColumnLengthFails() throws Exception { |
| createTable(); |
| setupRegion("id"); |
| employees.put("1", pdx1); |
| |
| statement.execute("delete from " + REGION_TABLE_NAME + " where id = '1'"); |
| validateTableRowCount(0); |
| |
| Throwable thrown = catchThrowable(() -> employees.put("1", illegalPdx)); |
| |
| assertThat(thrown).isInstanceOf(JdbcConnectorException.class); |
| assertThat(thrown.getMessage()).startsWith(getDataTooLongSqlErrorMessage()); |
| } |
| |
| @Test |
| public void insertBecomeUpdateFieldLengthGreaterThanTableColumnLengthFails() throws Exception { |
| createTable(); |
| setupRegion("id"); |
| statement.execute("Insert into " + REGION_TABLE_NAME + " values('1', 'bogus', 11)"); |
| validateTableRowCount(1); |
| |
| Throwable thrown = catchThrowable(() -> employees.put("1", illegalPdx)); |
| |
| assertThat(thrown).isInstanceOf(JdbcConnectorException.class); |
| assertThat(thrown.getMessage()).startsWith(getDataTooLongSqlErrorMessage()); |
| |
| ResultSet resultSet = |
| statement.executeQuery("select * from " + REGION_TABLE_NAME + " order by id asc"); |
| assertRecordMatchesEmployee(resultSet, "1", new Employee("11", "bogus", 11)); |
| assertThat(resultSet.next()).isFalse(); |
| } |
| |
| @Test |
| public void nonPrimaryKeyTableMultipleDataIsUpdated() throws Exception { |
| createTable(false); |
| setupRegion("id"); |
| employees.put("2", pdx2); |
| statement.execute("Insert into " + REGION_TABLE_NAME + " values('2', 'bogus', 21)"); |
| validateTableRowCount(2); |
| |
| Throwable thrown = catchThrowable(() -> employees.put("2", pdx2)); |
| |
| assertThat(thrown).isInstanceOf(AssertionError.class); |
| assertThat(thrown.getMessage()).isEqualTo("expected 1 but updateCount was: 2"); |
| } |
| |
| protected Region<Object, Object> createRegionWithJDBCSynchronousWriter(String regionName, |
| String ids, String catalog, String schema, List<FieldMapping> fieldMappings) |
| throws RegionMappingExistsException { |
| jdbcWriter = |
| new JdbcWriter<>(createSqlHandler(regionName, ids, catalog, schema, fieldMappings), cache); |
| |
| RegionFactory<Object, Object> regionFactory = |
| cache.createRegionFactory(RegionShortcut.REPLICATE); |
| regionFactory.setCacheWriter(jdbcWriter); |
| return regionFactory.create(regionName); |
| } |
| |
| protected void validateTableRowCount(int expected) throws Exception { |
| ResultSet resultSet = statement.executeQuery("select count(*) from " + REGION_TABLE_NAME); |
| resultSet.next(); |
| int size = resultSet.getInt(1); |
| assertThat(size).isEqualTo(expected); |
| } |
| |
| protected SqlHandler createSqlHandler(String regionName, String ids, String catalog, |
| String schema, |
| List<FieldMapping> fieldMappings) |
| throws RegionMappingExistsException { |
| return new SqlHandler(cache, regionName, new TableMetaDataManager(), |
| TestConfigService.getTestConfigService(cache, null, ids, catalog, schema, fieldMappings), |
| testDataSourceFactory); |
| } |
| |
| protected void assertRecordMatchesEmployee(ResultSet resultSet, String id, Employee employee) |
| throws SQLException { |
| assertThat(resultSet.next()).isTrue(); |
| assertThat(resultSet.getString("id")).isEqualTo(id); |
| assertThat(resultSet.getString("name")).isEqualTo(employee.getName()); |
| assertThat(resultSet.getInt("age")).isEqualTo(employee.getAge()); |
| } |
| |
| protected abstract String getDataTooLongSqlErrorMessage(); |
| } |