blob: 713179c03541af451f758bac73aac2f61889331e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.contrib.cassandra;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datastax.driver.core.DataType;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;
/**
 * Used to generate CQL strings that can be used to generate prepared statements.
 *
 * <p>For a given table this class builds one {@code UPDATE} statement per supported combination of
 * {@link AbstractUpsertOutputOperator.OperationContext} values (TTL set / not set, collections
 * append / remove, list append / prepend, and the IF EXISTS check present / absent) and registers
 * the resulting {@link PreparedStatement} under a numeric slot index computed by
 * {@link #getSlotIndexForMutationContextPreparedStatement(EnumSet)}.
 *
 * @since 3.6.0
 */
public class CassandraPreparedStatementGenerator
{
  private static final Logger LOG = LoggerFactory.getLogger(CassandraPreparedStatementGenerator.class);

  /** Name of the bind marker used for the TTL value in the "USING ttl" variants of the statement. */
  public static final String TTL_PARAM_NAME = "ttl";

  private static final String IF_EXISTS_CLAUSE = " IF EXISTS";

  private final Set<String> pkColumnNames;
  private final Set<String> counterColumns;
  private final Set<String> listColumns;
  private final Set<String> mapColumns;
  private final Set<String> setColumns;
  private final Map<String, DataType> columnDefinitions;

  /**
   * Creates a generator for a single table.
   *
   * @param pkColumnNames names of the columns used in the WHERE clause of every generated statement
   * @param counterColumns names of the counter columns; these are always updated as {@code c = c + :c}
   * @param listColumns names of the list columns
   * @param mapColumns names of the map columns
   * @param setColumns names of the set columns
   * @param columnDefinitions all columns of the table (primary key columns included) and their types
   */
  public CassandraPreparedStatementGenerator(Set<String> pkColumnNames, Set<String> counterColumns,
      Set<String> listColumns, Set<String> mapColumns, Set<String> setColumns,
      Map<String, DataType> columnDefinitions)
  {
    this.pkColumnNames = pkColumnNames;
    this.counterColumns = counterColumns;
    this.listColumns = listColumns;
    this.mapColumns = mapColumns;
    this.setColumns = setColumns;
    this.columnDefinitions = columnDefinitions;
  }

  /**
   * Builds every supported UPDATE statement for the given table, prepares each of them on the
   * session and stores the prepared statement in {@code preparedStatementTypes}.
   *
   * <p>The key of each entry is the slot index of the TTL / collection / list contexts of the
   * statement plus the slot index of either {@code IF_EXISTS_CHECK_ABSENT} or
   * {@code IF_EXISTS_CHECK_PRESENT}. The {@code IF EXISTS} variants are only generated when there
   * are no counter columns.
   *
   * @param session session used to prepare the statements
   * @param preparedStatementTypes map that receives the prepared statements keyed by slot index
   * @param keyspaceName keyspace of the table being updated
   * @param tableName name of the table being updated
   */
  public void generatePreparedStatements(Session session, Map<Long, PreparedStatement> preparedStatementTypes,
      String keyspaceName, String tableName)
  {
    Map<Long, String> stringsWithoutPKAndExistsClauses =
        generatePreparedStatementsQueryStrings(keyspaceName, tableName);
    long ifExistsAbsentSlot = getSlotIndexForMutationContextPreparedStatement(EnumSet.of(
        AbstractUpsertOutputOperator.OperationContext.IF_EXISTS_CHECK_ABSENT));
    long ifExistsPresentSlot = getSlotIndexForMutationContextPreparedStatement(EnumSet.of(
        AbstractUpsertOutputOperator.OperationContext.IF_EXISTS_CHECK_PRESENT));

    Map<Long, String> finalSetOfQueryStrings = new HashMap<>();
    for (Map.Entry<Long, String> queryStub : stringsWithoutPKAndExistsClauses.entrySet()) {
      StringBuilder queryWithWhereClause = new StringBuilder(queryStub.getValue());
      buildWhereClauseForPrimaryKeys(queryWithWhereClause);
      String unconditionalQuery = queryWithWhereClause.toString();
      finalSetOfQueryStrings.put(queryStub.getKey() + ifExistsAbsentSlot, unconditionalQuery);
      if (counterColumns.isEmpty()) {
        // IF exists cannot be used for counter column tables
        finalSetOfQueryStrings.put(queryStub.getKey() + ifExistsPresentSlot,
            unconditionalQuery + IF_EXISTS_CLAUSE);
      }
    }

    for (Map.Entry<Long, String> query : finalSetOfQueryStrings.entrySet()) {
      String currentQueryStr = query.getValue();
      LOG.info("Registering query support for {}", currentQueryStr);
      preparedStatementTypes.put(query.getKey(), session.prepare(currentQueryStr));
    }
  }

  /**
   * Appends {@code " WHERE  pk1 = :pk1 AND  pk2 = :pk2 ..."} to the query, using one named bind
   * marker per primary key column.
   *
   * @param queryExpression the query being built; modified in place
   */
  private void buildWhereClauseForPrimaryKeys(final StringBuilder queryExpression)
  {
    queryExpression.append(" WHERE ");
    boolean firstColumn = true;
    for (String pkColName : pkColumnNames) {
      if (!firstColumn) {
        queryExpression.append(" AND ");
      }
      firstColumn = false;
      queryExpression.append(" ").append(pkColName).append(" = :").append(pkColName);
    }
  }

  /**
   * Builds one UPDATE statement (without WHERE clause) and stores it under the slot index of the
   * given contexts.
   *
   * @param queryStrings map that receives the generated query string
   * @param queryPrefix everything up to and including the {@code SET} keyword
   * @param listPlacementStyle how list columns are mutated when the collection style is "add"
   * @param collectionMutationStyle whether collection columns are added to or removed from
   * @param contexts contexts identifying the slot this query string is registered under
   */
  private void addQueryString(Map<Long, String> queryStrings, String queryPrefix,
      UpsertExecutionContext.ListPlacementStyle listPlacementStyle,
      UpsertExecutionContext.CollectionMutationStyle collectionMutationStyle,
      EnumSet<AbstractUpsertOutputOperator.OperationContext> contexts)
  {
    StringBuilder query = new StringBuilder(queryPrefix);
    buildNonPKColumnsExpression(query, listPlacementStyle, collectionMutationStyle);
    queryStrings.put(getSlotIndexForMutationContextPreparedStatement(contexts), query.toString());
  }

  /**
   * Generates the six UPDATE statement stubs (no WHERE clause, no IF EXISTS clause): TTL set or not
   * set, each combined with collections-append/list-prepend, collections-append/list-append and
   * collections-remove.
   *
   * @param keyspaceName keyspace of the table being updated
   * @param tableName name of the table being updated
   * @return query stubs keyed by the slot index of their TTL / collection / list contexts
   */
  private Map<Long, String> generatePreparedStatementsQueryStrings(String keyspaceName, String tableName)
  {
    Map<Long, String> queryStrings = new HashMap<>();
    //UPDATE keyspace_name.table_name USING option AND option SET assignment, assignment, ... WHERE row_specification
    String updateQueryRoot = " UPDATE " + keyspaceName + "." + tableName + " ";
    String withTtlPrefix = updateQueryRoot + " USING ttl :" + TTL_PARAM_NAME + " SET ";
    String withoutTtlPrefix = updateQueryRoot + " SET ";

    // TTL set , Collections Append , List prepend
    addQueryString(queryStrings, withTtlPrefix,
        UpsertExecutionContext.ListPlacementStyle.PREPEND_TO_EXISTING_LIST,
        UpsertExecutionContext.CollectionMutationStyle.ADD_TO_EXISTING_COLLECTION,
        EnumSet.of(
            AbstractUpsertOutputOperator.OperationContext.TTL_SET,
            AbstractUpsertOutputOperator.OperationContext.COLLECTIONS_APPEND,
            AbstractUpsertOutputOperator.OperationContext.LIST_PREPEND));
    // TTL set , Collections Append , List append
    addQueryString(queryStrings, withTtlPrefix,
        UpsertExecutionContext.ListPlacementStyle.APPEND_TO_EXISTING_LIST,
        UpsertExecutionContext.CollectionMutationStyle.ADD_TO_EXISTING_COLLECTION,
        EnumSet.of(
            AbstractUpsertOutputOperator.OperationContext.TTL_SET,
            AbstractUpsertOutputOperator.OperationContext.COLLECTIONS_APPEND,
            AbstractUpsertOutputOperator.OperationContext.LIST_APPEND));
    // TTL set , Collections Remove (registered under LIST_APPEND, just in case user sets it)
    addQueryString(queryStrings, withTtlPrefix,
        UpsertExecutionContext.ListPlacementStyle.APPEND_TO_EXISTING_LIST,
        UpsertExecutionContext.CollectionMutationStyle.REMOVE_FROM_EXISTING_COLLECTION,
        EnumSet.of(
            AbstractUpsertOutputOperator.OperationContext.TTL_SET,
            AbstractUpsertOutputOperator.OperationContext.COLLECTIONS_REMOVE,
            AbstractUpsertOutputOperator.OperationContext.LIST_APPEND));
    // TTL Not set , Collections Append , List prepend
    addQueryString(queryStrings, withoutTtlPrefix,
        UpsertExecutionContext.ListPlacementStyle.PREPEND_TO_EXISTING_LIST,
        UpsertExecutionContext.CollectionMutationStyle.ADD_TO_EXISTING_COLLECTION,
        EnumSet.of(
            AbstractUpsertOutputOperator.OperationContext.TTL_NOT_SET,
            AbstractUpsertOutputOperator.OperationContext.COLLECTIONS_APPEND,
            AbstractUpsertOutputOperator.OperationContext.LIST_PREPEND));
    // TTL Not set , Collections Append , List append
    addQueryString(queryStrings, withoutTtlPrefix,
        UpsertExecutionContext.ListPlacementStyle.APPEND_TO_EXISTING_LIST,
        UpsertExecutionContext.CollectionMutationStyle.ADD_TO_EXISTING_COLLECTION,
        EnumSet.of(
            AbstractUpsertOutputOperator.OperationContext.TTL_NOT_SET,
            AbstractUpsertOutputOperator.OperationContext.COLLECTIONS_APPEND,
            AbstractUpsertOutputOperator.OperationContext.LIST_APPEND));
    // TTL Not set , Collections Remove (registered under LIST_APPEND, just in case user sets it)
    addQueryString(queryStrings, withoutTtlPrefix,
        UpsertExecutionContext.ListPlacementStyle.APPEND_TO_EXISTING_LIST,
        UpsertExecutionContext.CollectionMutationStyle.REMOVE_FROM_EXISTING_COLLECTION,
        EnumSet.of(
            AbstractUpsertOutputOperator.OperationContext.TTL_NOT_SET,
            AbstractUpsertOutputOperator.OperationContext.COLLECTIONS_REMOVE,
            AbstractUpsertOutputOperator.OperationContext.LIST_APPEND));
    return queryStrings;
  }

  /**
   * Computes the slot index of a set of operation contexts as the sum of {@code 10^ordinal} over
   * every context in the set, so each context contributes its own decimal digit.
   *
   * @param context the contexts describing one flavour of the update statement
   * @return the slot index identifying that combination of contexts
   */
  public static long getSlotIndexForMutationContextPreparedStatement(
      final EnumSet<AbstractUpsertOutputOperator.OperationContext> context)
  {
    long indexValue = 0;
    for (AbstractUpsertOutputOperator.OperationContext aContext : context) {
      // Integer arithmetic keeps the index exact; no round trip through double.
      long digitWeight = 1;
      for (int i = 0; i < aContext.ordinal(); i++) {
        digitWeight *= 10;
      }
      indexValue += digitWeight;
    }
    return indexValue;
  }

  /**
   * Appends the comma separated {@code SET} assignments for every column that is not part of the
   * primary key.
   *
   * @param queryExpression the query being built; modified in place
   * @param listPlacementStyle whether list columns are appended or prepended to
   * @param collectionMutationStyle whether collection columns are added to or removed from
   * @throws IllegalStateException if no assignment can be generated for a non-frozen collection
   *         column (previously this silently produced malformed CQL with a dangling comma)
   */
  private void buildNonPKColumnsExpression(final StringBuilder queryExpression,
      UpsertExecutionContext.ListPlacementStyle listPlacementStyle,
      UpsertExecutionContext.CollectionMutationStyle collectionMutationStyle)
  {
    boolean firstAssignment = true;
    for (Map.Entry<String, DataType> column : columnDefinitions.entrySet()) {
      String colName = column.getKey();
      if (pkColumnNames.contains(colName)) {
        continue;
      }
      // Resolve the assignment before emitting the separator so a column can never leave a
      // dangling comma behind.
      String assignment = buildAssignment(colName, column.getValue(), listPlacementStyle,
          collectionMutationStyle);
      if (!firstAssignment) {
        queryExpression.append(",");
      }
      firstAssignment = false;
      queryExpression.append(assignment);
    }
  }

  /**
   * Builds the {@code SET} assignment for a single non primary key column.
   *
   * @param colName name of the column; also used as the name of the bind marker
   * @param dataType type of the column; not consulted for counter columns
   * @param listPlacementStyle whether list columns are appended or prepended to
   * @param collectionMutationStyle whether collection columns are added to or removed from
   * @return the assignment, starting with a single space
   * @throws IllegalStateException if the column is a non-frozen collection for which no assignment
   *         can be derived from the configured column sets and mutation styles
   */
  private String buildAssignment(String colName, DataType dataType,
      UpsertExecutionContext.ListPlacementStyle listPlacementStyle,
      UpsertExecutionContext.CollectionMutationStyle collectionMutationStyle)
  {
    String replaceValue = " " + colName + " = :" + colName;
    String addToExisting = " " + colName + " = " + colName + " + :" + colName;

    if (counterColumns.contains(colName)) {
      return addToExisting;
    }
    // Scalars and frozen collections are always overwritten as a whole.
    if (!dataType.isCollection() || dataType.isFrozen()) {
      return replaceValue;
    }
    if (collectionMutationStyle == UpsertExecutionContext.CollectionMutationStyle.REMOVE_FROM_EXISTING_COLLECTION) {
      return " " + colName + " = " + colName + " - :" + colName;
    }
    if (collectionMutationStyle == UpsertExecutionContext.CollectionMutationStyle.ADD_TO_EXISTING_COLLECTION) {
      if (setColumns.contains(colName) || mapColumns.contains(colName)) {
        return addToExisting;
      }
      if (listColumns.contains(colName)) {
        if (listPlacementStyle == UpsertExecutionContext.ListPlacementStyle.APPEND_TO_EXISTING_LIST) {
          return addToExisting;
        }
        if (listPlacementStyle == UpsertExecutionContext.ListPlacementStyle.PREPEND_TO_EXISTING_LIST) {
          return " " + colName + " = :" + colName + " + " + colName;
        }
      }
    }
    throw new IllegalStateException("Cannot build an update assignment for collection column " + colName +
        " (collection mutation style " + collectionMutationStyle + ", list placement style " +
        listPlacementStyle + "); the column must be registered as a list, set or map column");
  }
}