blob: ff8bcfeb96586e0e3fb6d8d558eee66f455ff8f4 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.connectors.jdbc.internal;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.sql.DataSource;
import org.apache.geode.InternalGemFireException;
import org.apache.geode.annotations.Experimental;
import org.apache.geode.cache.Operation;
import org.apache.geode.cache.Region;
import org.apache.geode.connectors.jdbc.JdbcConnectorException;
import org.apache.geode.connectors.jdbc.internal.configuration.FieldMapping;
import org.apache.geode.connectors.jdbc.internal.configuration.RegionMapping;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.jndi.JNDIInvoker;
import org.apache.geode.pdx.PdxInstance;
/**
 * Reads table rows as {@link PdxInstance}s and writes {@link PdxInstance}s as table rows for the
 * JDBC mapping of a single region. Connections come from the mapping's {@link DataSource}; the SQL
 * text is produced per operation (insert, update, delete, select) from the table meta-data.
 */
public class SqlHandler {
// Cache passed to the SqlToPdxInstanceCreator and used to look up the JdbcConnectorService.
private final InternalCache cache;
// Mapping obtained from the JdbcConnectorService for the region name given to the constructor.
private final RegionMapping regionMapping;
// Data source named by the region mapping; every connection is obtained from it.
private final DataSource dataSource;
// Table meta-data view obtained once, in the constructor, from the TableMetaDataManager.
private final TableMetaDataView tableMetaData;
// Field mappings keyed by PDX field name; used to find the JDBC column name of a PDX field.
private final Map<String, FieldMapping> pdxToFieldMappings = new HashMap<>();
// Row-to-PDX converter; created on first use and assigned in initializeSqlToPdxInstance().
private volatile SqlToPdxInstance sqlToPdxInstance;
/**
 * Creates a handler for the JDBC mapping of the named region.
 *
 * @param cache the cache; also used to look up the {@link JdbcConnectorService} that validates
 *        the mapping
 * @param regionName name of the region whose mapping is used
 * @param tableMetaDataManager source of the table meta-data view
 * @param configService service that is asked for the region's mapping
 * @param dataSourceFactory resolves the mapping's data source name to a {@link DataSource}
 * @throws JdbcConnectorException if the region has no mapping, the data source cannot be found,
 *         or no connection can be obtained to read the table meta-data
 */
public SqlHandler(InternalCache cache, String regionName,
    TableMetaDataManager tableMetaDataManager, JdbcConnectorService configService,
    DataSourceFactory dataSourceFactory) {
  this.cache = cache;
  this.regionMapping = getMappingForRegion(configService, regionName);
  this.dataSource = getDataSource(dataSourceFactory, this.regionMapping.getDataSourceName());
  this.tableMetaData = getTableMetaDataView(tableMetaDataManager);
  cache.getService(JdbcConnectorService.class).validateMapping(regionMapping, dataSource);
  // Fill the PDX-field -> field-mapping lookup. Without this call the map stays empty and
  // getColumnNameForField() returns null for every field, so no value column is ever written
  // and every composite key is rejected.
  initializeFieldMappingMaps();
}
/**
 * Creates a handler whose data source is looked up by name through {@link JNDIInvoker}.
 *
 * @param cache the cache
 * @param regionName name of the region whose mapping is used
 * @param tableMetaDataManager source of the table meta-data view
 * @param configService service that is asked for the region's mapping
 */
public SqlHandler(InternalCache cache, String regionName,
    TableMetaDataManager tableMetaDataManager, JdbcConnectorService configService) {
  this(cache, regionName, tableMetaDataManager, configService, JNDIInvoker::getDataSource);
}
private TableMetaDataView getTableMetaDataView(TableMetaDataManager tableMetaDataManager) {
try (Connection connection = getConnection()) {
return tableMetaDataManager.getTableMetaDataView(connection, regionMapping);
} catch (SQLException ex) {
throw new JdbcConnectorException("Could not connect to datasource \""
+ regionMapping.getDataSourceName() + "\" because: " + ex);
private static RegionMapping getMappingForRegion(JdbcConnectorService configService,
String regionName) {
RegionMapping regionMapping = configService.getMappingForRegion(regionName);
if (regionMapping == null) {
throw new JdbcConnectorException("JDBC mapping for region " + regionName
+ " not found. Create the mapping with the gfsh command 'create jdbc-mapping'.");
return regionMapping;
private static DataSource getDataSource(DataSourceFactory dataSourceFactory,
String dataSourceName) {
DataSource dataSource = dataSourceFactory.getDataSource(dataSourceName);
if (dataSource == null) {
throw new JdbcConnectorException("JDBC data-source named \"" + dataSourceName
+ "\" not found. Create it with gfsh 'create data-source --pooled --name="
+ dataSourceName + "'.");
return dataSource;
private void initializeFieldMappingMaps() {
for (FieldMapping fieldMapping : regionMapping.getFieldMappings()) {
this.pdxToFieldMappings.put(fieldMapping.getPdxName(), fieldMapping);
private String getColumnNameForField(String fieldName) {
FieldMapping match = this.pdxToFieldMappings.get(fieldName);
if (match != null) {
return match.getJdbcName();
return null;
Connection getConnection() throws SQLException {
return this.dataSource.getConnection();
public <K, V> PdxInstance read(Region<K, V> region, K key) throws SQLException {
if (key == null) {
throw new IllegalArgumentException("Key for query cannot be null");
PdxInstance result;
try (Connection connection = getConnection()) {
EntryColumnData entryColumnData =
getEntryColumnData(tableMetaData, key, null, Operation.GET);
try (PreparedStatement statement =
getPreparedStatement(connection, tableMetaData, entryColumnData, Operation.GET)) {
try (ResultSet resultSet = executeReadQuery(statement, entryColumnData)) {
result = getSqlToPdxInstance().create(resultSet);
return result;
private SqlToPdxInstance getSqlToPdxInstance() {
SqlToPdxInstance result = this.sqlToPdxInstance;
if (result == null) {
result = initializeSqlToPdxInstance();
return result;
private synchronized SqlToPdxInstance initializeSqlToPdxInstance() {
SqlToPdxInstanceCreator sqlToPdxInstanceCreator =
new SqlToPdxInstanceCreator(cache, regionMapping);
SqlToPdxInstance result = sqlToPdxInstanceCreator.create();
this.sqlToPdxInstance = result;
return result;
private ResultSet executeReadQuery(PreparedStatement statement, EntryColumnData entryColumnData)
throws SQLException {
setValuesInStatement(statement, entryColumnData, Operation.GET);
return statement.executeQuery();
private void setValuesInStatement(PreparedStatement statement, EntryColumnData entryColumnData,
Operation operation)
throws SQLException {
int index = 0;
if (operation.isCreate() || operation.isUpdate()) {
index = setValuesFromColumnData(statement, entryColumnData.getEntryValueColumnData(), index);
setValuesFromColumnData(statement, entryColumnData.getEntryKeyColumnData(), index);
private int setValuesFromColumnData(PreparedStatement statement, List<ColumnData> columnDataList,
int index) throws SQLException {
for (ColumnData columnData : columnDataList) {
setValueOnStatement(statement, index, columnData);
return index;
private void setValueOnStatement(PreparedStatement statement, int index, ColumnData columnData)
throws SQLException {
Object value = columnData.getValue();
if (value instanceof Character) {
Character character = ((Character) value);
// if null character, set to null string instead of a string with the null character
value = character.equals(Character.valueOf((char) 0)) ? null : character.toString();
} else if (value instanceof Date) {
Date jdkDate = (Date) value;
switch (columnData.getDataType()) {
case DATE:
value = new java.sql.Date(jdkDate.getTime());
case TIME:
value = new java.sql.Time(jdkDate.getTime());
value = new java.sql.Timestamp(jdkDate.getTime());
// no conversion needed
if (value == null) {
statement.setNull(index, columnData.getDataType().getVendorTypeNumber());
} else {
statement.setObject(index, value);
public <K, V> void write(Region<K, V> region, Operation operation, K key, PdxInstance value)
throws SQLException {
if (value == null && !operation.isDestroy()) {
throw new IllegalArgumentException("PdxInstance cannot be null for non-destroy operations");
try (Connection connection = getConnection()) {
EntryColumnData entryColumnData =
getEntryColumnData(tableMetaData, key, value, operation);
int updateCount = 0;
try (PreparedStatement statement =
getPreparedStatement(connection, tableMetaData, entryColumnData, operation)) {
updateCount = executeWriteStatement(statement, entryColumnData, operation);
} catch (SQLException e) {
if (operation.isDestroy()) {
throw e;
// Destroy action not guaranteed to modify any database rows
if (operation.isDestroy()) {
if (updateCount <= 0) {
Operation upsertOp = getOppositeOperation(operation);
try (PreparedStatement upsertStatement =
getPreparedStatement(connection, tableMetaData, entryColumnData, upsertOp)) {
updateCount = executeWriteStatement(upsertStatement, entryColumnData, operation);
assert updateCount == 1 : "expected 1 but updateCount was: " + updateCount;
private Operation getOppositeOperation(Operation operation) {
return operation.isUpdate() ? Operation.CREATE : Operation.UPDATE;
private int executeWriteStatement(PreparedStatement statement, EntryColumnData entryColumnData,
Operation operation)
throws SQLException {
setValuesInStatement(statement, entryColumnData, operation);
return statement.executeUpdate();
private PreparedStatement getPreparedStatement(Connection connection,
TableMetaDataView tableMetaData, EntryColumnData entryColumnData, Operation operation)
throws SQLException {
String sqlStr = getSqlString(tableMetaData, entryColumnData, operation);
return connection.prepareStatement(sqlStr);
private String getSqlString(TableMetaDataView tableMetaData, EntryColumnData entryColumnData,
Operation operation) {
SqlStatementFactory statementFactory =
new SqlStatementFactory(tableMetaData.getIdentifierQuoteString());
String tableName = tableMetaData.getQuotedTablePath();
if (operation.isCreate()) {
return statementFactory.createInsertSqlString(tableName, entryColumnData);
} else if (operation.isUpdate()) {
return statementFactory.createUpdateSqlString(tableName, entryColumnData);
} else if (operation.isDestroy()) {
return statementFactory.createDestroySqlString(tableName, entryColumnData);
} else if (operation.isGet()) {
return statementFactory.createSelectQueryString(tableName, entryColumnData);
} else {
throw new InternalGemFireException("unsupported operation " + operation);
<K> EntryColumnData getEntryColumnData(TableMetaDataView tableMetaData,
K key, PdxInstance value, Operation operation) {
List<ColumnData> keyColumnData = createKeyColumnDataList(tableMetaData, key);
List<ColumnData> valueColumnData = null;
if (operation.isCreate() || operation.isUpdate()) {
valueColumnData = createValueColumnDataList(tableMetaData, value);
return new EntryColumnData(keyColumnData, valueColumnData);
private <K> List<ColumnData> createKeyColumnDataList(TableMetaDataView tableMetaData, K key) {
List<String> keyColumnNames = tableMetaData.getKeyColumnNames();
List<ColumnData> result = new ArrayList<>();
if (keyColumnNames.size() == 1) {
String keyColumnName = keyColumnNames.get(0);
ColumnData columnData =
new ColumnData(keyColumnName, key, tableMetaData.getColumnDataType(keyColumnName));
} else {
if (!(key instanceof PdxInstance)) {
throw new JdbcConnectorException(
"The key \"" + key + "\" of class \"" + key.getClass().getName()
+ "\" must be a PdxInstance because multiple columns are configured as ids.");
PdxInstance compositeKey = (PdxInstance) key;
if (compositeKey.isDeserializable()) {
throw new JdbcConnectorException(
"The key \"" + key
+ "\" must be a PdxInstance created with PdxInstanceFactory.neverDeserialize");
List<String> fieldNames = compositeKey.getFieldNames();
if (fieldNames.size() != keyColumnNames.size()) {
throw new JdbcConnectorException("The key \"" + key + "\" should have "
+ keyColumnNames.size() + " fields but has " + fieldNames.size() + " fields.");
for (String fieldName : fieldNames) {
String columnName = getColumnNameForField(fieldName);
if (columnName == null || !keyColumnNames.contains(columnName)) {
throw new JdbcConnectorException("The key \"" + key + "\" has the field \"" + fieldName
+ "\" which does not match any of the key columns: " + keyColumnNames);
ColumnData columnData = new ColumnData(columnName, compositeKey.getField(fieldName),
return result;
private List<ColumnData> createValueColumnDataList(TableMetaDataView tableMetaData,
PdxInstance value) {
List<ColumnData> result = new ArrayList<>();
for (String fieldName : value.getFieldNames()) {
String columnName = getColumnNameForField(fieldName);
if (columnName == null) {
// The user must have added a new field to their pdx domain class.
// To support PDX class versioning we will ignore this field.
if (tableMetaData.getKeyColumnNames().contains(columnName)) {
ColumnData columnData = new ColumnData(columnName, value.getField(fieldName),
return result;
/**
 * Resolves a data source name to a {@link DataSource}. A null result is reported by
 * {@code SqlHandler} as "data source not found".
 */
public interface DataSourceFactory {
  /**
   * @param dataSourceName name of the data source to look up
   * @return the data source, or null if none exists under that name
   */
  DataSource getDataSource(String dataSourceName);
}