blob: e512d275a7445050c97a12a0105f3215de947590 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cassandra.spark.bulkwriter;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import org.apache.cassandra.spark.common.schema.ColumnType;
import org.apache.cassandra.spark.common.schema.ColumnTypes;
import org.apache.cassandra.spark.data.CqlField;
import org.apache.cassandra.spark.data.CqlTable;
import org.jetbrains.annotations.NotNull;

import static org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.ASCII;
import static org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.BIGINT;
import static org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.BLOB;
import static org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.BOOLEAN;
import static org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.DOUBLE;
import static org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.INT;
import static org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.TEXT;
import static org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.TIMESTAMP;
import static org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.TIMEUUID;
import static org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.UUID;
import static org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.VARCHAR;
/**
* An implementation of the {@link TableInfoProvider} interface that leverages the {@link CqlTable} to
* provide table information
*/
public class CqlTableInfoProvider implements TableInfoProvider
{
// CHECKSTYLE IGNORE: Long line
private static final Pattern DEPRECATED_OPTIONS = Pattern.compile("(?:\\s+AND)?\\s+(?:dclocal_)?read_repair_chance\\s*=\\s*[+\\-.\\dE]+", Pattern.CASE_INSENSITIVE);
private static final Pattern LEADING_CONJUNCTION = Pattern.compile("(?<=\\bWITH)\\s+AND\\b", Pattern.CASE_INSENSITIVE);
private final String createTableStatement;
private final CqlTable cqlTable;
/**
* Mapping from CQL Type Names to Java types
*/
private static final Map<String, ColumnType<?>> DATA_TYPES = ImmutableMap.<String, ColumnType<?>>builder()
.put(BIGINT, ColumnTypes.LONG)
.put(BLOB, ColumnTypes.BYTES)
.put(DOUBLE, ColumnTypes.DOUBLE)
.put(INT, ColumnTypes.INT)
.put(BOOLEAN, ColumnTypes.BOOLEAN)
.put(TEXT, ColumnTypes.STRING)
.put(TIMESTAMP, ColumnTypes.LONG)
.put(UUID, ColumnTypes.UUID)
.put(VARCHAR, ColumnTypes.STRING)
.put(ASCII, ColumnTypes.STRING)
.put(TIMEUUID, ColumnTypes.UUID)
.build();
public CqlTableInfoProvider(String createTableStatement, CqlTable cqlTable)
{
this.createTableStatement = createTableStatement;
this.cqlTable = cqlTable;
}
@Override
public CqlField.CqlType getColumnType(String columnName)
{
CqlField cqlField = cqlTable.column(columnName);
if (cqlField == null)
{
throw new IllegalArgumentException("Unknown fields in data frame => " + columnName);
}
else
{
return cqlField.type();
}
}
@Override
public List<ColumnType<?>> getPartitionKeyTypes()
{
return cqlTable.partitionKeys().stream()
.map(cqlField -> DATA_TYPES.get(cqlField.type().cqlName().toLowerCase()))
.collect(Collectors.toList());
}
@Override
public boolean columnExists(String columnName)
{
return cqlTable.column(columnName) != null;
}
@Override
public List<String> getPartitionKeyColumnNames()
{
return cqlTable.partitionKeys().stream()
.map(CqlField::name)
.collect(Collectors.toList());
}
@Override
public String getCreateStatement()
{
return removeDeprecatedOptions(createTableStatement);
}
@Override
public List<String> getPrimaryKeyColumnNames()
{
return cqlTable.primaryKey().stream()
.map(CqlField::name)
.collect(Collectors.toList());
}
@Override
public String getName()
{
return cqlTable.table();
}
@Override
public String getKeyspaceName()
{
return cqlTable.keyspace();
}
@Override
public boolean hasSecondaryIndex()
{
return cqlTable.indexCount() > 0;
}
@Override
public List<String> getColumnNames()
{
return cqlTable.columns().stream()
.map(CqlField::name)
.collect(Collectors.toList());
}
/**
* Removes table options {@code read_repair_chance} and {@code dclocal_read_repair_chance}
* that were deprecated in Cassandra 4.0 from the table creation CQL statement
*
* @param cql table creation CQL statement
* @return table creation CQL statement with deprecated table options removed
*/
@VisibleForTesting
@NotNull
static String removeDeprecatedOptions(@NotNull String cql)
{
cql = DEPRECATED_OPTIONS.matcher(cql).replaceAll("");
cql = LEADING_CONJUNCTION.matcher(cql).replaceAll("");
return cql;
}
}