| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.cache.store.cassandra.persistence; |
| |
| import java.io.BufferedReader; |
| import java.io.File; |
| import java.io.FileInputStream; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.InputStreamReader; |
| import java.io.Serializable; |
| import java.io.StringReader; |
| import java.util.Collections; |
| import java.util.HashSet; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Set; |
| import javax.xml.parsers.DocumentBuilder; |
| import javax.xml.parsers.DocumentBuilderFactory; |
| import org.apache.ignite.IgniteException; |
| import org.apache.ignite.cache.store.cassandra.common.CassandraHelper; |
| import org.apache.ignite.cache.store.cassandra.common.SystemHelper; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.springframework.core.io.Resource; |
| import org.w3c.dom.Document; |
| import org.w3c.dom.Element; |
| import org.w3c.dom.Node; |
| import org.w3c.dom.NodeList; |
| import org.xml.sax.InputSource; |
| |
| /** |
| * Stores persistence settings for Ignite cache key and value |
| */ |
| public class KeyValuePersistenceSettings implements Serializable { |
| /** |
| * Default Cassandra keyspace options which should be used to create new keyspace. |
| * <ul> |
| * <li> <b>SimpleStrategy</b> for replication work well for single data center Cassandra cluster.<br/> |
| * If your Cassandra cluster deployed across multiple data centers it's better to use <b>NetworkTopologyStrategy</b>. |
| * </li> |
| * <li> Three replicas will be created for each data block. </li> |
| * <li> Setting DURABLE_WRITES to true specifies that all data should be written to commit log. </li> |
| * </ul> |
| */ |
| private static final String DFLT_KEYSPACE_OPTIONS = "replication = {'class' : 'SimpleStrategy', " + |
| "'replication_factor' : 3} and durable_writes = true"; |
| |
| /** Xml attribute specifying Cassandra keyspace to use. */ |
| private static final String KEYSPACE_ATTR = "keyspace"; |
| |
| /** Xml attribute specifying Cassandra table to use. */ |
| private static final String TABLE_ATTR = "table"; |
| |
| /** Xml attribute specifying ttl (time to leave) for rows inserted in Cassandra. */ |
| private static final String TTL_ATTR = "ttl"; |
| |
| /** Root xml element containing persistence settings specification. */ |
| private static final String PERSISTENCE_NODE = "persistence"; |
| |
| /** Xml element specifying Cassandra keyspace options. */ |
| private static final String KEYSPACE_OPTIONS_NODE = "keyspaceOptions"; |
| |
| /** Xml element specifying Cassandra table options. */ |
| private static final String TABLE_OPTIONS_NODE = "tableOptions"; |
| |
| /** Xml element specifying Ignite cache key persistence settings. */ |
| private static final String KEY_PERSISTENCE_NODE = "keyPersistence"; |
| |
| /** Xml element specifying Ignite cache value persistence settings. */ |
| private static final String VALUE_PERSISTENCE_NODE = "valuePersistence"; |
| |
| /** TTL (time to leave) for rows inserted into Cassandra table {@link <a href="https://docs.datastax.com/en/cql/3.1/cql/cql_using/use_expire_c.html">Expiring data</a>}. */ |
| private Integer ttl; |
| |
| /** Cassandra keyspace (analog of tablespace in relational databases). */ |
| private String keyspace; |
| |
| /** Cassandra table. */ |
| private String tbl; |
| |
| /** Cassandra table creation options {@link <a href="https://docs.datastax.com/en/cql/3.0/cql/cql_reference/create_table_r.html">CREATE TABLE</a>}. */ |
| private String tblOptions; |
| |
| /** Cassandra keyspace creation options {@link <a href="https://docs.datastax.com/en/cql/3.0/cql/cql_reference/create_keyspace_r.html">CREATE KEYSPACE</a>}. */ |
| private String keyspaceOptions = DFLT_KEYSPACE_OPTIONS; |
| |
| /** Persistence settings for Ignite cache keys. */ |
| private KeyPersistenceSettings keyPersistenceSettings; |
| |
| /** Persistence settings for Ignite cache values. */ |
| private ValuePersistenceSettings valPersistenceSettings; |
| |
| /** List of Cassandra table columns */ |
| private List<String> tableColumns; |
| |
| /** |
| * Constructs Ignite cache key/value persistence settings. |
| * |
| * @param settings string containing xml with persistence settings for Ignite cache key/value |
| */ |
| public KeyValuePersistenceSettings(String settings) { |
| init(settings); |
| } |
| |
| /** |
| * Constructs Ignite cache key/value persistence settings. |
| * |
| * @param settingsFile xml file with persistence settings for Ignite cache key/value |
| */ |
| public KeyValuePersistenceSettings(File settingsFile) { |
| InputStream in; |
| |
| try { |
| in = new FileInputStream(settingsFile); |
| } |
| catch (IOException e) { |
| throw new IgniteException("Failed to get input stream for Cassandra persistence settings file: " + |
| settingsFile.getAbsolutePath(), e); |
| } |
| |
| init(loadSettings(in)); |
| } |
| |
| /** |
| * Constructs Ignite cache key/value persistence settings. |
| * |
| * @param settingsRsrc resource containing xml with persistence settings for Ignite cache key/value |
| */ |
| public KeyValuePersistenceSettings(Resource settingsRsrc) { |
| InputStream in; |
| |
| try { |
| in = settingsRsrc.getInputStream(); |
| } |
| catch (IOException e) { |
| throw new IgniteException("Failed to get input stream for Cassandra persistence settings resource: " + settingsRsrc, e); |
| } |
| |
| init(loadSettings(in)); |
| } |
| |
| /** |
| * Returns ttl to use for while inserting new rows into Cassandra table. |
| * |
| * @return ttl |
| */ |
| public Integer getTTL() { |
| return ttl; |
| } |
| |
| /** |
| * Returns Cassandra keyspace to use. |
| * |
| * @return keyspace. |
| */ |
| public String getKeyspace() { |
| return keyspace; |
| } |
| |
| /** |
| * Returns Cassandra table to use. |
| * |
| * @return table. |
| */ |
| public String getTable() { |
| return tbl; |
| } |
| |
| /** |
| * Returns persistence settings for Ignite cache keys. |
| * |
| * @return keys persistence settings. |
| */ |
| public KeyPersistenceSettings getKeyPersistenceSettings() { |
| return keyPersistenceSettings; |
| } |
| |
| /** |
| * Returns persistence settings for Ignite cache values. |
| * |
| * @return values persistence settings. |
| */ |
| public ValuePersistenceSettings getValuePersistenceSettings() { |
| return valPersistenceSettings; |
| } |
| |
| /** |
| * Returns list of POJO fields to be mapped to Cassandra table columns. |
| * |
| * @return POJO fields list. |
| */ |
| public List<PojoField> getFields() { |
| List<PojoField> fields = new LinkedList<>(); |
| |
| for (PojoField field : keyPersistenceSettings.getFields()) |
| fields.add(field); |
| |
| for (PojoField field : valPersistenceSettings.getFields()) |
| fields.add(field); |
| |
| return fields; |
| } |
| |
| /** |
| * Returns list of Ignite cache key POJO fields to be mapped to Cassandra table columns. |
| * |
| * @return POJO fields list. |
| */ |
| public List<PojoKeyField> getKeyFields() { |
| return keyPersistenceSettings.getFields(); |
| } |
| |
| /** |
| * Returns list of Ignite cache value POJO fields to be mapped to Cassandra table columns. |
| * |
| * @return POJO fields list. |
| */ |
| public List<PojoValueField> getValueFields() { |
| return valPersistenceSettings.getFields(); |
| } |
| |
| /** |
| * Returns DDL statement to create Cassandra keyspace. |
| * |
| * @return Keyspace DDL statement. |
| */ |
| public String getKeyspaceDDLStatement() { |
| StringBuilder builder = new StringBuilder(); |
| builder.append("create keyspace if not exists \"").append(keyspace).append("\""); |
| |
| if (keyspaceOptions != null) { |
| if (!keyspaceOptions.trim().toLowerCase().startsWith("with")) |
| builder.append("\nwith"); |
| |
| builder.append(" ").append(keyspaceOptions); |
| } |
| |
| String statement = builder.toString().trim().replaceAll(" +", " "); |
| |
| return statement.endsWith(";") ? statement : statement + ";"; |
| } |
| |
| /** |
| * Returns column names for Cassandra table. |
| * |
| * @return Column names. |
| */ |
| public List<String> getTableColumns() { |
| return tableColumns; |
| } |
| |
| /** |
| * Returns DDL statement to create Cassandra table. |
| * |
| * @param table Table name. |
| * @return Table DDL statement. |
| */ |
| public String getTableDDLStatement(String table) { |
| if (table == null || table.trim().isEmpty()) |
| throw new IllegalArgumentException("Table name should be specified"); |
| |
| String keyColumnsDDL = keyPersistenceSettings.getTableColumnsDDL(); |
| String valColumnsDDL = valPersistenceSettings.getTableColumnsDDL(new HashSet<>(keyPersistenceSettings.getTableColumns())); |
| |
| String colsDDL = keyColumnsDDL; |
| |
| if (valColumnsDDL != null && !valColumnsDDL.trim().isEmpty()) |
| colsDDL += ",\n" + valColumnsDDL; |
| |
| String primaryKeyDDL = keyPersistenceSettings.getPrimaryKeyDDL(); |
| |
| String clusteringDDL = keyPersistenceSettings.getClusteringDDL(); |
| |
| String optionsDDL = tblOptions != null && !tblOptions.trim().isEmpty() ? tblOptions.trim() : ""; |
| |
| if (clusteringDDL != null && !clusteringDDL.isEmpty()) |
| optionsDDL = optionsDDL.isEmpty() ? clusteringDDL : optionsDDL + " and " + clusteringDDL; |
| |
| if (!optionsDDL.trim().isEmpty()) |
| optionsDDL = optionsDDL.trim().toLowerCase().startsWith("with") ? optionsDDL.trim() : "with " + optionsDDL.trim(); |
| |
| StringBuilder builder = new StringBuilder(); |
| |
| builder.append("create table if not exists \"").append(keyspace).append("\".\"").append(table).append("\""); |
| builder.append("\n(\n").append(colsDDL).append(",\n").append(primaryKeyDDL).append("\n)"); |
| |
| if (!optionsDDL.isEmpty()) |
| builder.append(" \n").append(optionsDDL); |
| |
| String tblDDL = builder.toString().trim().replaceAll(" +", " "); |
| |
| return tblDDL.endsWith(";") ? tblDDL : tblDDL + ";"; |
| } |
| |
| /** |
| * Returns DDL statements to create Cassandra table secondary indexes. |
| * |
| * @param table Table name. |
| * @return DDL statements to create secondary indexes. |
| */ |
| public List<String> getIndexDDLStatements(String table) { |
| List<String> idxDDLs = new LinkedList<>(); |
| |
| Set<String> keyCols = new HashSet<>(keyPersistenceSettings.getTableColumns()); |
| |
| List<PojoValueField> fields = valPersistenceSettings.getFields(); |
| |
| for (PojoField field : fields) { |
| if (!keyCols.contains(field.getColumn()) && ((PojoValueField)field).isIndexed()) |
| idxDDLs.add(((PojoValueField)field).getIndexDDL(keyspace, table)); |
| } |
| |
| return idxDDLs; |
| } |
| |
| /** |
| * Loads Ignite cache persistence settings from resource. |
| * |
| * @param in Input stream. |
| * @return String containing xml with Ignite cache persistence settings. |
| */ |
| private String loadSettings(InputStream in) { |
| StringBuilder settings = new StringBuilder(); |
| BufferedReader reader = null; |
| |
| try { |
| reader = new BufferedReader(new InputStreamReader(in)); |
| |
| String line = reader.readLine(); |
| |
| while (line != null) { |
| if (settings.length() != 0) |
| settings.append(SystemHelper.LINE_SEPARATOR); |
| |
| settings.append(line); |
| |
| line = reader.readLine(); |
| } |
| } |
| catch (Throwable e) { |
| throw new IgniteException("Failed to read input stream for Cassandra persistence settings", e); |
| } |
| finally { |
| U.closeQuiet(reader); |
| U.closeQuiet(in); |
| } |
| |
| return settings.toString(); |
| } |
| |
| /** |
| * @param elem Element with data. |
| * @param attr Attribute name. |
| * @return Numeric value for specified attribute. |
| */ |
| private int extractIntAttribute(Element elem, String attr) { |
| String val = elem.getAttribute(attr).trim(); |
| |
| try { |
| return Integer.parseInt(val); |
| } |
| catch (NumberFormatException ignored) { |
| throw new IllegalArgumentException("Incorrect value '" + val + "' specified for '" + attr + "' attribute"); |
| } |
| } |
| |
| /** |
| * Initializes persistence settings from XML string. |
| * |
| * @param settings XML string containing Ignite cache persistence settings configuration. |
| */ |
| @SuppressWarnings("IfCanBeSwitch") |
| private void init(String settings) { |
| Document doc; |
| |
| try { |
| DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance(); |
| DocumentBuilder builder = factory.newDocumentBuilder(); |
| doc = builder.parse(new InputSource(new StringReader(settings))); |
| } |
| catch (Throwable e) { |
| throw new IllegalArgumentException("Failed to parse persistence settings:" + |
| SystemHelper.LINE_SEPARATOR + settings, e); |
| } |
| |
| Element root = doc.getDocumentElement(); |
| |
| if (!PERSISTENCE_NODE.equals(root.getNodeName())) { |
| throw new IllegalArgumentException("Incorrect persistence settings specified. " + |
| "Root XML element should be 'persistence'"); |
| } |
| |
| if (!root.hasAttribute(KEYSPACE_ATTR)) { |
| throw new IllegalArgumentException("Incorrect persistence settings '" + KEYSPACE_ATTR + |
| "' attribute should be specified"); |
| } |
| |
| keyspace = root.getAttribute(KEYSPACE_ATTR).trim(); |
| tbl = root.hasAttribute(TABLE_ATTR) ? root.getAttribute(TABLE_ATTR).trim() : null; |
| |
| if (root.hasAttribute(TTL_ATTR)) |
| ttl = extractIntAttribute(root, TTL_ATTR); |
| |
| if (!root.hasChildNodes()) { |
| throw new IllegalArgumentException("Incorrect Cassandra persistence settings specification, " + |
| "there are no key and value persistence settings specified"); |
| } |
| |
| NodeList children = root.getChildNodes(); |
| int cnt = children.getLength(); |
| |
| for (int i = 0; i < cnt; i++) { |
| Node node = children.item(i); |
| |
| if (node.getNodeType() != Node.ELEMENT_NODE) |
| continue; |
| |
| Element el = (Element)node; |
| String nodeName = el.getNodeName(); |
| |
| if (nodeName.equals(TABLE_OPTIONS_NODE)) { |
| tblOptions = el.getTextContent(); |
| tblOptions = tblOptions.replace("\n", " ").replace("\r", "").replace("\t", " "); |
| } |
| else if (nodeName.equals(KEYSPACE_OPTIONS_NODE)) { |
| keyspaceOptions = el.getTextContent(); |
| keyspaceOptions = keyspaceOptions.replace("\n", " ").replace("\r", "").replace("\t", " "); |
| } |
| else if (nodeName.equals(KEY_PERSISTENCE_NODE)) |
| keyPersistenceSettings = new KeyPersistenceSettings(el); |
| else if (nodeName.equals(VALUE_PERSISTENCE_NODE)) |
| valPersistenceSettings = new ValuePersistenceSettings(el); |
| } |
| |
| if (keyPersistenceSettings == null) { |
| throw new IllegalArgumentException("Incorrect Cassandra persistence settings specification, " + |
| "there are no key persistence settings specified"); |
| } |
| |
| if (valPersistenceSettings == null) { |
| throw new IllegalArgumentException("Incorrect Cassandra persistence settings specification, " + |
| "there are no value persistence settings specified"); |
| } |
| |
| List<PojoKeyField> keyFields = keyPersistenceSettings.getFields(); |
| List<PojoValueField> valFields = valPersistenceSettings.getFields(); |
| |
| if (PersistenceStrategy.POJO == keyPersistenceSettings.getStrategy() && |
| (keyFields == null || keyFields.isEmpty())) { |
| throw new IllegalArgumentException("Incorrect Cassandra persistence settings specification, " + |
| "there are no key fields found"); |
| } |
| |
| if (PersistenceStrategy.POJO == valPersistenceSettings.getStrategy() && |
| (valFields == null || valFields.isEmpty())) { |
| throw new IllegalArgumentException("Incorrect Cassandra persistence settings specification, " + |
| "there are no value fields found"); |
| } |
| |
| // Validating aliases compatibility - fields having different names, but mapped to the same Cassandra table column. |
| if (valFields != null && !valFields.isEmpty()) { |
| String keyColumn = keyPersistenceSettings.getColumn(); |
| Class keyClass = keyPersistenceSettings.getJavaClass(); |
| |
| if (keyColumn != null && !keyColumn.isEmpty()) { |
| for (PojoField valField : valFields) { |
| if (keyColumn.equals(valField.getColumn()) && |
| !CassandraHelper.isCassandraCompatibleTypes(keyClass, valField.getJavaClass())) { |
| throw new IllegalArgumentException("Value field '" + valField.getName() + "' shares the same " + |
| "Cassandra table column '" + keyColumn + "' with key, but their Java classes are " + |
| "different. Fields sharing the same column should have the same Java class as their " + |
| "type or should be mapped to the same Cassandra primitive type."); |
| } |
| } |
| } |
| |
| if (keyFields != null && !keyFields.isEmpty()) { |
| for (PojoField keyField : keyFields) { |
| for (PojoField valField : valFields) { |
| if (keyField.getColumn().equals(valField.getColumn()) && |
| !CassandraHelper.isCassandraCompatibleTypes(keyField.getJavaClass(), valField.getJavaClass())) { |
| throw new IllegalArgumentException("Value field '" + valField.getName() + "' shares the same " + |
| "Cassandra table column '" + keyColumn + "' with key field '" + keyField.getName() + "', " + |
| "but their Java classes are different. Fields sharing the same column should have " + |
| "the same Java class as their type or should be mapped to the same Cassandra " + |
| "primitive type."); |
| } |
| } |
| } |
| } |
| } |
| |
| tableColumns = new LinkedList<>(); |
| |
| for (String column : keyPersistenceSettings.getTableColumns()) { |
| if (!tableColumns.contains(column)) |
| tableColumns.add(column); |
| } |
| |
| for (String column : valPersistenceSettings.getTableColumns()) { |
| if (!tableColumns.contains(column)) |
| tableColumns.add(column); |
| } |
| |
| tableColumns = Collections.unmodifiableList(tableColumns); |
| } |
| } |