blob: eb244fdca596777febc4f245103b57402dcfb050 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.cache.store.cassandra.persistence;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Row;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.store.cassandra.common.PropertyMappingHelper;
import org.apache.ignite.cache.store.cassandra.serializer.Serializer;
/**
* Intermediate layer between persistent store (Cassandra) and Ignite cache key/value classes.
* Handles all the mappings to/from Java classes into Cassandra and responsible for all the details
* of how Java objects should be written/loaded to/from Cassandra.
*/
public class PersistenceController {
/** Ignite cache key/value persistence settings. */
private final KeyValuePersistenceSettings persistenceSettings;
/** List of key unique POJO fields (skipping aliases pointing to the same Cassandra table column). */
private final List<? extends PojoField> keyUniquePojoFields;
/** List of value unique POJO fields (skipping aliases pointing to the same Cassandra table column). */
private final List<? extends PojoField> valUniquePojoFields;
/** CQL statement template to insert row into Cassandra table. */
private final String writeStatementTempl;
/** CQL statement template to delete row from Cassandra table. */
private final String delStatementTempl;
/** CQL statement template to select value fields from Cassandra table. */
private final String loadStatementTempl;
/** CQL statement template to select key/value fields from Cassandra table. */
private final String loadWithKeyFieldsStatementTempl;
/** CQL statements to insert row into Cassandra table. */
private volatile Map<String, String> writeStatements = new HashMap<>();
/** CQL statements to delete row from Cassandra table. */
private volatile Map<String, String> delStatements = new HashMap<>();
/** CQL statements to select value fields from Cassandra table. */
private volatile Map<String, String> loadStatements = new HashMap<>();
/** CQL statements to select key/value fields from Cassandra table. */
private volatile Map<String, String> loadWithKeyFieldsStatements = new HashMap<>();
/**
* Constructs persistence controller from Ignite cache persistence settings.
*
* @param settings persistence settings.
*/
public PersistenceController(KeyValuePersistenceSettings settings) {
if (settings == null)
throw new IllegalArgumentException("Persistent settings can't be null");
persistenceSettings = settings;
String[] loadStatements = prepareLoadStatements();
loadWithKeyFieldsStatementTempl = loadStatements[0];
loadStatementTempl = loadStatements[1];
writeStatementTempl = prepareWriteStatement();
delStatementTempl = prepareDeleteStatement();
keyUniquePojoFields = settings.getKeyPersistenceSettings().cassandraUniqueFields();
List<? extends PojoField> _valUniquePojoFields = settings.getValuePersistenceSettings().cassandraUniqueFields();
if (_valUniquePojoFields == null || _valUniquePojoFields.isEmpty()) {
valUniquePojoFields = _valUniquePojoFields;
return;
}
List<String> keyColumns = new LinkedList<>();
if (keyUniquePojoFields == null)
keyColumns.add(settings.getKeyPersistenceSettings().getColumn());
else {
for (PojoField field : keyUniquePojoFields)
keyColumns.add(field.getColumn());
}
List<PojoField> fields = new LinkedList<>(_valUniquePojoFields);
for (String column : keyColumns) {
for (int i = 0; i < fields.size(); i++) {
if (column.equals(fields.get(i).getColumn())) {
fields.remove(i);
break;
}
}
}
valUniquePojoFields = fields.isEmpty() ? null : Collections.unmodifiableList(fields);
}
/**
* Returns Ignite cache persistence settings.
*
* @return persistence settings.
*/
public KeyValuePersistenceSettings getPersistenceSettings() {
return persistenceSettings;
}
/**
* Returns CQL statement to insert row into Cassandra table.
*
* @param table Table name.
* @return CQL statement.
*/
public String getWriteStatement(String table) {
return getStatement(table, writeStatementTempl, writeStatements);
}
/**
* Returns CQL statement to delete row from Cassandra table.
*
* @param table Table name.
* @return CQL statement.
*/
public String getDeleteStatement(String table) {
return getStatement(table, delStatementTempl, delStatements);
}
/**
* Returns CQL statement to select key/value fields from Cassandra table.
*
* @param table Table name.
* @param includeKeyFields whether to include/exclude key fields from the returned row.
*
* @return CQL statement.
*/
public String getLoadStatement(String table, boolean includeKeyFields) {
return includeKeyFields ?
getStatement(table, loadWithKeyFieldsStatementTempl, loadWithKeyFieldsStatements) :
getStatement(table, loadStatementTempl, loadStatements);
}
/**
* Binds Ignite cache key object to {@link PreparedStatement}.
*
* @param statement statement to which key object should be bind.
* @param key key object.
*
* @return statement with bounded key.
*/
public BoundStatement bindKey(PreparedStatement statement, Object key) {
PersistenceSettings settings = persistenceSettings.getKeyPersistenceSettings();
Object[] values = PersistenceStrategy.POJO != settings.getStrategy() ?
new Object[1] : new Object[keyUniquePojoFields.size()];
bindValues(settings.getStrategy(), settings.getSerializer(), keyUniquePojoFields, key, values, 0);
return statement.bind(values);
}
/**
* Binds Ignite cache key and value object to {@link com.datastax.driver.core.PreparedStatement}.
*
* @param statement statement to which key and value object should be bind.
* @param key key object.
* @param val value object.
*
* @return statement with bounded key and value.
*/
public BoundStatement bindKeyValue(PreparedStatement statement, Object key, Object val) {
Object[] values = new Object[persistenceSettings.getTableColumns().size()];
PersistenceSettings keySettings = persistenceSettings.getKeyPersistenceSettings();
PersistenceSettings valSettings = persistenceSettings.getValuePersistenceSettings();
int offset = bindValues(keySettings.getStrategy(), keySettings.getSerializer(), keyUniquePojoFields, key, values, 0);
bindValues(valSettings.getStrategy(), valSettings.getSerializer(), valUniquePojoFields, val, values, offset);
return statement.bind(values);
}
/**
* Builds Ignite cache key object from returned Cassandra table row.
*
* @param row Cassandra table row.
*
* @return key object.
*/
public Object buildKeyObject(Row row) {
return buildObject(row, persistenceSettings.getKeyPersistenceSettings());
}
/**
* Builds Ignite cache value object from Cassandra table row .
*
* @param row Cassandra table row.
*
* @return value object.
*/
public Object buildValueObject(Row row) {
return buildObject(row, persistenceSettings.getValuePersistenceSettings());
}
/**
* Service method to prepare CQL write statement.
*
* @return CQL write statement.
*/
private String prepareWriteStatement() {
Collection<String> cols = persistenceSettings.getTableColumns();
StringBuilder colsList = new StringBuilder();
StringBuilder questionsList = new StringBuilder();
for (String column : cols) {
if (colsList.length() != 0) {
colsList.append(", ");
questionsList.append(",");
}
colsList.append("\"").append(column).append("\"");
questionsList.append("?");
}
String statement = "insert into \"" + persistenceSettings.getKeyspace() + "\".\"%1$s" +
"\" (" + colsList + ") values (" + questionsList + ")";
if (persistenceSettings.getTTL() != null)
statement += " using ttl " + persistenceSettings.getTTL();
return statement + ";";
}
/**
* Service method to prepare CQL delete statement.
*
* @return CQL write statement.
*/
private String prepareDeleteStatement() {
Collection<String> cols = persistenceSettings.getKeyPersistenceSettings().getTableColumns();
StringBuilder statement = new StringBuilder();
for (String column : cols) {
if (statement.length() != 0)
statement.append(" and ");
statement.append("\"").append(column).append("\"=?");
}
statement.append(";");
return "delete from \"" + persistenceSettings.getKeyspace() + "\".\"%1$s\" where " + statement;
}
/**
* Service method to prepare CQL load statements including and excluding key columns.
*
* @return array having two CQL statements (including and excluding key columns).
*/
private String[] prepareLoadStatements() {
PersistenceSettings settings = persistenceSettings.getKeyPersistenceSettings();
boolean pojoStrategy = PersistenceStrategy.POJO == settings.getStrategy();
Collection<String> keyCols = settings.getTableColumns();
StringBuilder hdrWithKeyFields = new StringBuilder();
for (String column : keyCols) {
// omit calculated fields in load statement
if (pojoStrategy && settings.getFieldByColumn(column).calculatedField())
continue;
if (hdrWithKeyFields.length() > 0)
hdrWithKeyFields.append(", ");
hdrWithKeyFields.append("\"").append(column).append("\"");
}
settings = persistenceSettings.getValuePersistenceSettings();
pojoStrategy = PersistenceStrategy.POJO == settings.getStrategy();
Collection<String> valCols = settings.getTableColumns();
StringBuilder hdr = new StringBuilder();
for (String column : valCols) {
// omit calculated fields in load statement
if (pojoStrategy && settings.getFieldByColumn(column).calculatedField())
continue;
if (hdr.length() > 0)
hdr.append(", ");
hdr.append("\"").append(column).append("\"");
if (!keyCols.contains(column))
hdrWithKeyFields.append(", \"").append(column).append("\"");
}
hdrWithKeyFields.insert(0, "select ");
hdr.insert(0, "select ");
StringBuilder statement = new StringBuilder();
statement.append(" from \"");
statement.append(persistenceSettings.getKeyspace());
statement.append("\".\"%1$s");
statement.append("\" where ");
int i = 0;
for (String column : keyCols) {
if (i > 0)
statement.append(" and ");
statement.append("\"").append(column).append("\"=?");
i++;
}
statement.append(";");
return new String[] {hdrWithKeyFields + statement.toString(), hdr + statement.toString()};
}
/**
* @param table Table.
* @param template Template.
* @param statements Statements.
* @return Statement.
*/
private String getStatement(final String table, final String template, final Map<String, String> statements) {
//noinspection SynchronizationOnLocalVariableOrMethodParameter
synchronized (statements) {
String st = statements.get(table);
if (st == null) {
st = String.format(template, table);
statements.put(table, st);
}
return st;
}
}
/**
* Builds object from Cassandra table row.
*
* @param row Cassandra table row.
* @param settings persistence settings to use.
*
* @return object.
*/
private Object buildObject(Row row, PersistenceSettings settings) {
if (row == null)
return null;
PersistenceStrategy stg = settings.getStrategy();
Class clazz = settings.getJavaClass();
String col = settings.getColumn();
if (PersistenceStrategy.PRIMITIVE == stg)
return PropertyMappingHelper.getCassandraColumnValue(row, col, clazz, null);
if (PersistenceStrategy.BLOB == stg)
return settings.getSerializer().deserialize(row.getBytes(col));
List<PojoField> fields = settings.getFields();
Object obj;
try {
obj = clazz.newInstance();
}
catch (Throwable e) {
throw new IgniteException("Failed to instantiate object of type '" + clazz.getName() + "' using reflection", e);
}
for (PojoField field : fields) {
if (!field.calculatedField())
field.setValueFromRow(row, obj, settings.getSerializer());
}
return obj;
}
/**
* Extracts field values from POJO object, converts into Java types
* which could be mapped to Cassandra types and stores them inside provided values
* array starting from specified offset.
*
* @param stgy Persistence strategy to use.
* @param serializer Serializer to use for BLOBs.
* @param fields Fields who's values should be extracted.
* @param obj Object instance who's field values should be extracted.
* @param values Array to store values.
* @param offset Offset starting from which to store fields values in the provided values array.
*
* @return next offset
*/
private int bindValues(PersistenceStrategy stgy, Serializer serializer, List<? extends PojoField> fields, Object obj,
Object[] values, int offset) {
if (PersistenceStrategy.PRIMITIVE == stgy) {
if (PropertyMappingHelper.getCassandraType(obj.getClass()) == null ||
obj.getClass().equals(ByteBuffer.class) || obj instanceof byte[]) {
throw new IllegalArgumentException("Couldn't deserialize instance of class '" +
obj.getClass().getName() + "' using PRIMITIVE strategy. Please use BLOB strategy for this case.");
}
values[offset] = obj;
return ++offset;
}
if (PersistenceStrategy.BLOB == stgy) {
values[offset] = serializer.serialize(obj);
return ++offset;
}
if (fields == null || fields.isEmpty())
return offset;
for (PojoField field : fields) {
Object val = field.getValueFromObject(obj, serializer);
if (val instanceof byte[])
val = ByteBuffer.wrap((byte[]) val);
values[offset] = val;
offset++;
}
return offset;
}
}