| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.cassandra.spark.reader; |
| |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.function.Consumer; |
| import java.util.stream.Collectors; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Preconditions; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.antlr.runtime.RecognitionException; |
| import org.apache.cassandra.bridge.CassandraBridge; |
| import org.apache.cassandra.bridge.CassandraBridgeImplementation; |
| import org.apache.cassandra.bridge.CassandraSchema; |
| import org.apache.cassandra.cql3.CQL3Type; |
| import org.apache.cassandra.cql3.CQLFragmentParser; |
| import org.apache.cassandra.cql3.CqlParser; |
| import org.apache.cassandra.cql3.statements.schema.CreateTypeStatement; |
| import org.apache.cassandra.db.Keyspace; |
| import org.apache.cassandra.db.marshal.AbstractType; |
| import org.apache.cassandra.db.marshal.CollectionType; |
| import org.apache.cassandra.db.marshal.ListType; |
| import org.apache.cassandra.db.marshal.MapType; |
| import org.apache.cassandra.db.marshal.SetType; |
| import org.apache.cassandra.db.marshal.TupleType; |
| import org.apache.cassandra.db.marshal.UserType; |
| import org.apache.cassandra.dht.IPartitioner; |
| import org.apache.cassandra.schema.ColumnMetadata; |
| import org.apache.cassandra.schema.KeyspaceMetadata; |
| import org.apache.cassandra.schema.KeyspaceParams; |
| import org.apache.cassandra.schema.Schema; |
| import org.apache.cassandra.schema.TableId; |
| import org.apache.cassandra.schema.TableMetadata; |
| import org.apache.cassandra.schema.TableMetadataRef; |
| import org.apache.cassandra.schema.Types; |
| import org.apache.cassandra.spark.data.CqlField; |
| import org.apache.cassandra.spark.data.CqlTable; |
| import org.apache.cassandra.spark.data.ReplicationFactor; |
| import org.apache.cassandra.spark.data.complex.CqlFrozen; |
| import org.apache.cassandra.spark.data.complex.CqlUdt; |
| import org.apache.cassandra.spark.data.partitioner.Partitioner; |
| import org.apache.cassandra.utils.Pair; |
| import org.jetbrains.annotations.NotNull; |
| import org.jetbrains.annotations.Nullable; |
| |
| public class SchemaBuilder |
| { |
| private static final Logger LOGGER = LoggerFactory.getLogger(SchemaBuilder.class); |
| |
| private final TableMetadata metadata; |
| private final KeyspaceMetadata keyspaceMetadata; |
| private final String createStmt; |
| private final String keyspace; |
| private final ReplicationFactor replicationFactor; |
| private final CassandraBridge bridge; |
| private final int indexCount; |
| |
/**
 * Creates a schema builder for the given table without supplying an explicit table id.
 *
 * @param table       table providing the create statement, keyspace, replication factor and UDT statements
 * @param partitioner partitioner to use for the table
 */
public SchemaBuilder(CqlTable table, Partitioner partitioner)
{
    this(table, partitioner, null);
}
| |
/**
 * Creates a schema builder from the pieces of the given table, passing an index count of zero.
 *
 * @param table       table providing the create statement, keyspace, replication factor and UDT statements
 * @param partitioner partitioner to use for the table
 * @param tableId     table id to assign to the table metadata; may be {@code null}
 */
public SchemaBuilder(CqlTable table, Partitioner partitioner, UUID tableId)
{
    this(table.createStatement(), table.keyspace(), table.replicationFactor(),
         partitioner, table.udtCreateStmts(), tableId, 0);
}
| |
/**
 * Test-only constructor: uses the Murmur3 partitioner, no UDT statements, no table id and an index count of zero.
 *
 * @param createStmt        CQL create table statement
 * @param keyspace          keyspace name
 * @param replicationFactor replication factor of the keyspace
 */
@VisibleForTesting
public SchemaBuilder(String createStmt, String keyspace, ReplicationFactor replicationFactor)
{
    this(createStmt,
         keyspace,
         replicationFactor,
         Partitioner.Murmur3Partitioner,
         Collections.emptySet(),
         null,
         0);
}
| |
/**
 * Test-only constructor: uses the given partitioner with no UDT statements, no table id and an index count of zero.
 *
 * @param createStmt        CQL create table statement
 * @param keyspace          keyspace name
 * @param replicationFactor replication factor of the keyspace
 * @param partitioner       partitioner to use for the table
 */
@VisibleForTesting
public SchemaBuilder(String createStmt, String keyspace, ReplicationFactor replicationFactor, Partitioner partitioner)
{
    this(createStmt,
         keyspace,
         replicationFactor,
         partitioner,
         Collections.emptySet(),
         null,
         0);
}
| |
| public SchemaBuilder(String createStmt, |
| String keyspace, |
| ReplicationFactor replicationFactor, |
| Partitioner partitioner, |
| Set<String> udtStmts, |
| @Nullable UUID tableId, |
| int indexCount) |
| { |
| this.createStmt = createStmt; |
| this.keyspace = keyspace; |
| this.replicationFactor = replicationFactor; |
| this.bridge = new CassandraBridgeImplementation(); |
| this.indexCount = indexCount; |
| |
| Pair<KeyspaceMetadata, TableMetadata> updated = CassandraSchema.apply(schema -> |
| updateSchema(schema, |
| this.keyspace, |
| udtStmts, |
| this.createStmt, |
| partitioner, |
| this.replicationFactor, |
| tableId, |
| this::validateColumnMetaData)); |
| this.keyspaceMetadata = updated.left; |
| this.metadata = updated.right; |
| } |
| |
| // Update schema with the given keyspace, table and udt. |
| // It creates the cooresponding metadata and opens instances for keyspace and table, if needed. |
| // At the end, it validates that the input keyspace and table both should have metadata exist and instance opened. |
| private static Pair<KeyspaceMetadata, TableMetadata> updateSchema(Schema schema, |
| String keyspace, |
| Set<String> udtStatements, |
| String createStatement, |
| Partitioner partitioner, |
| ReplicationFactor replicationFactor, |
| UUID tableId, |
| Consumer<ColumnMetadata> columnValidator) |
| { |
| // Set up and open keyspace if needed |
| IPartitioner cassPartitioner = CassandraBridgeImplementation.getPartitioner(partitioner); |
| setupKeyspace(schema, keyspace, replicationFactor, cassPartitioner); |
| |
| // Set up and open table if needed, parse UDTs and include when parsing table schema |
| List<CreateTypeStatement.Raw> typeStatements = new ArrayList<>(udtStatements.size()); |
| for (String udt : udtStatements) |
| { |
| try |
| { |
| typeStatements.add((CreateTypeStatement.Raw) CQLFragmentParser |
| .parseAnyUnhandled(CqlParser::query, udt)); |
| } |
| catch (RecognitionException exception) |
| { |
| LOGGER.error("Failed to parse type expression '{}'", udt); |
| throw new IllegalStateException(exception); |
| } |
| } |
| Types.RawBuilder typesBuilder = Types.rawBuilder(keyspace); |
| for (CreateTypeStatement.Raw st : typeStatements) |
| { |
| st.addToRawBuilder(typesBuilder); |
| } |
| Types types = typesBuilder.build(); |
| TableMetadata.Builder builder = CQLFragmentParser |
| .parseAny(CqlParser::createTableStatement, createStatement, "CREATE TABLE") |
| .keyspace(keyspace) |
| .prepare(null) |
| .builder(types) |
| .partitioner(cassPartitioner); |
| |
| if (tableId != null) |
| { |
| builder.id(TableId.fromUUID(tableId)); |
| } |
| TableMetadata tableMetadata = builder.build(); |
| tableMetadata.columns().forEach(columnValidator); |
| setupTableAndUdt(schema, keyspace, tableMetadata, types); |
| |
| return validateKeyspaceTable(schema, keyspace, tableMetadata.name); |
| } |
| |
| private void validateColumnMetaData(@NotNull ColumnMetadata column) |
| { |
| validateType(column.type); |
| } |
| |
| private void validateType(AbstractType<?> type) |
| { |
| validateType(type.asCQL3Type()); |
| } |
| |
| private void validateType(CQL3Type cqlType) |
| { |
| if (!(cqlType instanceof CQL3Type.Native) |
| && !(cqlType instanceof CQL3Type.Collection) |
| && !(cqlType instanceof CQL3Type.UserDefined) |
| && !(cqlType instanceof CQL3Type.Tuple)) |
| { |
| throw new UnsupportedOperationException("Only native, collection, tuples or UDT data types are supported, " |
| + "unsupported data type: " + cqlType.toString()); |
| } |
| |
| if (cqlType instanceof CQL3Type.Native) |
| { |
| CqlField.CqlType type = bridge.parseType(cqlType.toString()); |
| if (!type.isSupported()) |
| { |
| throw new UnsupportedOperationException(type.name() + " data type is not supported"); |
| } |
| } |
| else if (cqlType instanceof CQL3Type.Collection) |
| { |
| // Validate collection inner types |
| CQL3Type.Collection collection = (CQL3Type.Collection) cqlType; |
| CollectionType<?> type = (CollectionType<?>) collection.getType(); |
| switch (type.kind) |
| { |
| case LIST: |
| validateType(((ListType<?>) type).getElementsType()); |
| return; |
| case SET: |
| validateType(((SetType<?>) type).getElementsType()); |
| return; |
| case MAP: |
| validateType(((MapType<?, ?>) type).getKeysType()); |
| validateType(((MapType<?, ?>) type).getValuesType()); |
| return; |
| default: |
| // Do nothing |
| } |
| } |
| else if (cqlType instanceof CQL3Type.Tuple) |
| { |
| CQL3Type.Tuple tuple = (CQL3Type.Tuple) cqlType; |
| TupleType tupleType = (TupleType) tuple.getType(); |
| for (AbstractType<?> subType : tupleType.allTypes()) |
| { |
| validateType(subType); |
| } |
| } |
| else |
| { |
| // Validate UDT inner types |
| UserType userType = (UserType) ((CQL3Type.UserDefined) cqlType).getType(); |
| for (AbstractType<?> innerType : userType.fieldTypes()) |
| { |
| validateType(innerType); |
| } |
| } |
| } |
| |
| private static boolean keyspaceMetadataExists(Schema schema, String keyspaceName) |
| { |
| return schema.getKeyspaceMetadata(keyspaceName) != null; |
| } |
| |
| private static boolean tableMetadataExists(Schema schema, String keyspaceName, String tableName) |
| { |
| KeyspaceMetadata ksMetadata = schema.getKeyspaceMetadata(keyspaceName); |
| if (ksMetadata == null) |
| { |
| return false; |
| } |
| |
| return ksMetadata.hasTable(tableName); |
| } |
| |
| private static boolean keyspaceInstanceExists(Schema schema, String keyspaceName) |
| { |
| return schema.getKeyspaceInstance(keyspaceName) != null; |
| } |
| |
| private static boolean tableInstanceExists(Schema schema, String keyspaceName, String tableName) |
| { |
| Keyspace keyspace = schema.getKeyspaceInstance(keyspaceName); |
| if (keyspace == null) |
| { |
| return false; |
| } |
| |
| try |
| { |
| keyspace.getColumnFamilyStore(tableName); |
| } |
| catch (IllegalArgumentException exception) |
| { |
| LOGGER.info("Table instance does not exist. keyspace={} table={} existingCFS={}", |
| keyspace, tableName, keyspace.getColumnFamilyStores()); |
| return false; |
| } |
| return true; |
| } |
| |
| // Check whether keyspace metadata exists. Create keyspace metadata, if not. |
| // Check whether keyspace instance is opened. Open the keyspace, if not. |
| // NOTE: It is possible that external code that just creates metadata, but does not open the keyspace |
| private static void setupKeyspace(Schema schema, |
| String keyspaceName, |
| ReplicationFactor replicationFactor, |
| IPartitioner partitioner) |
| { |
| if (!keyspaceMetadataExists(schema, keyspaceName)) |
| { |
| LOGGER.info("Setting up keyspace metadata in schema keyspace={} rfStrategy={} partitioner={}", |
| keyspaceName, replicationFactor.getReplicationStrategy().name(), partitioner); |
| KeyspaceMetadata keyspaceMetadata = |
| KeyspaceMetadata.create(keyspaceName, KeyspaceParams.create(true, rfToMap(replicationFactor))); |
| schema.load(keyspaceMetadata); |
| } |
| |
| if (!keyspaceInstanceExists(schema, keyspaceName)) |
| { |
| LOGGER.info("Setting up keyspace instance in schema keyspace={} rfStrategy={} partitioner={}", |
| keyspaceName, replicationFactor.getReplicationStrategy().name(), partitioner); |
| // Create keyspace instance and also initCf (cfs) for the table |
| Keyspace.openWithoutSSTables(keyspaceName); |
| } |
| } |
| |
| // Check whether table metadata exists. Create table metadata, if not. |
| // Check whether table instance is opened. Open/init the table, if not. |
| // NOTE: It is possible that external code that just creates metadata, but does not open the table |
| private static void setupTableAndUdt(Schema schema, |
| String keyspaceName, |
| TableMetadata tableMetadata, |
| Types userTypes) |
| { |
| String tableName = tableMetadata.name; |
| KeyspaceMetadata keyspaceMetadata = schema.getKeyspaceMetadata(keyspaceName); |
| if (keyspaceMetadata == null) |
| { |
| LOGGER.error("Keyspace metadata does not exist. keyspace={}", keyspaceName); |
| throw new IllegalStateException("Keyspace metadata null for '" + keyspaceName |
| + "' when it should have been initialized already"); |
| } |
| |
| if (!tableMetadataExists(schema, keyspaceName, tableName)) |
| { |
| LOGGER.info("Setting up table metadata in schema keyspace={} table={} partitioner={}", |
| keyspaceName, tableName, tableMetadata.partitioner.getClass().getName()); |
| keyspaceMetadata = keyspaceMetadata.withSwapped(keyspaceMetadata.tables.with(tableMetadata)); |
| schema.load(keyspaceMetadata); |
| } |
| |
| // The metadata of the table might not be the input tableMetadata. Fetch the current to be safe. |
| TableMetadata currentTable = schema.getTableMetadata(keyspaceName, tableName); |
| if (!tableInstanceExists(schema, keyspaceName, tableName)) |
| { |
| LOGGER.info("Setting up table instance in schema keyspace={} table={} partitioner={}", |
| keyspaceName, tableName, tableMetadata.partitioner.getClass().getName()); |
| if (keyspaceInstanceExists(schema, keyspaceName)) |
| { |
| // initCf (cfs) in the opened keyspace |
| schema.getKeyspaceInstance(keyspaceName) |
| .initCf(TableMetadataRef.forOfflineTools(currentTable), false); |
| } |
| else |
| { |
| // The keyspace has not yet opened, create/open keyspace instance and also initCf (cfs) for the table |
| Keyspace.openWithoutSSTables(keyspaceName); |
| } |
| } |
| |
| if (!userTypes.equals(Types.none())) |
| { |
| LOGGER.info("Setting up user types in schema keyspace={} types={}", |
| keyspaceName, userTypes); |
| // Update Schema instance with any user-defined types built |
| keyspaceMetadata = keyspaceMetadata.withSwapped(userTypes); |
| schema.load(keyspaceMetadata); |
| } |
| } |
| |
| private static Pair<KeyspaceMetadata, TableMetadata> validateKeyspaceTable(Schema schema, |
| String keyspaceName, |
| String tableName) |
| { |
| Preconditions.checkState(keyspaceMetadataExists(schema, keyspaceName), |
| "Keyspace metadata does not exist after building schema. keyspace=%s", |
| keyspaceName); |
| Preconditions.checkState(keyspaceInstanceExists(schema, keyspaceName), |
| "Keyspace instance is not opened after building schema. keyspace=%s", |
| keyspaceName); |
| Preconditions.checkState(tableMetadataExists(schema, keyspaceName, tableName), |
| "Table metadata does not exist after building schema. keyspace=%s table=%s", |
| keyspaceName, tableName); |
| Preconditions.checkState(tableInstanceExists(schema, keyspaceName, tableName), |
| "Table instance is not opened after building schema. keyspace=%s table=%s", |
| keyspaceName, tableName); |
| |
| // Validated above that keyspace and table, both exist and are opened |
| KeyspaceMetadata keyspaceMetadata = schema.getKeyspaceMetadata(keyspaceName); |
| TableMetadata tableMetadata = schema.getTableMetadata(keyspaceName, tableName); |
| return Pair.create(keyspaceMetadata, tableMetadata); |
| } |
| |
| public TableMetadata tableMetaData() |
| { |
| return metadata; |
| } |
| |
| public String createStatement() |
| { |
| return createStmt; |
| } |
| |
| public CqlTable build() |
| { |
| Map<String, CqlField.CqlUdt> udts = buildsUdts(keyspaceMetadata); |
| List<CqlField> fields = buildFields(metadata, udts).stream().sorted().collect(Collectors.toList()); |
| return new CqlTable(keyspace, |
| metadata.name, |
| createStmt, |
| replicationFactor, |
| fields, |
| new HashSet<>(udts.values()), |
| indexCount); |
| } |
| |
| private Map<String, CqlField.CqlUdt> buildsUdts(KeyspaceMetadata keyspaceMetadata) |
| { |
| List<UserType> userTypes = new ArrayList<>(); |
| keyspaceMetadata.types.forEach(userTypes::add); |
| Map<String, CqlField.CqlUdt> udts = new HashMap<>(userTypes.size()); |
| while (!userTypes.isEmpty()) |
| { |
| UserType userType = userTypes.remove(0); |
| if (!SchemaBuilder.nestedUdts(userType).stream().allMatch(udts::containsKey)) |
| { |
| // This UDT contains a nested user-defined type that has not been parsed yet |
| // so re-add to the queue and parse later |
| userTypes.add(userType); |
| continue; |
| } |
| String name = userType.getNameAsString(); |
| CqlUdt.Builder builder = CqlUdt.builder(keyspaceMetadata.name, name); |
| for (int field = 0; field < userType.size(); field++) |
| { |
| builder.withField(userType.fieldName(field).toString(), |
| bridge.parseType(userType.fieldType(field).asCQL3Type().toString(), udts)); |
| } |
| udts.put(name, builder.build()); |
| } |
| |
| return udts; |
| } |
| |
| /** |
| * @param type an abstract type |
| * @return a set of UDTs nested within the type parameter |
| */ |
| private static Set<String> nestedUdts(AbstractType<?> type) |
| { |
| Set<String> result = new HashSet<>(); |
| nestedUdts(type, result, false); |
| return result; |
| } |
| |
| private static void nestedUdts(AbstractType<?> type, Set<String> udts, boolean isNested) |
| { |
| if (type instanceof UserType) |
| { |
| if (isNested) |
| { |
| udts.add(((UserType) type).getNameAsString()); |
| } |
| for (AbstractType<?> nestedType : ((UserType) type).fieldTypes()) |
| { |
| nestedUdts(nestedType, udts, true); |
| } |
| } |
| else if (type instanceof TupleType) |
| { |
| for (AbstractType<?> nestedType : ((TupleType) type).allTypes()) |
| { |
| nestedUdts(nestedType, udts, true); |
| } |
| } |
| else if (type instanceof SetType) |
| { |
| nestedUdts(((SetType<?>) type).getElementsType(), udts, true); |
| } |
| else if (type instanceof ListType) |
| { |
| nestedUdts(((ListType<?>) type).getElementsType(), udts, true); |
| } |
| else if (type instanceof MapType) |
| { |
| nestedUdts(((MapType<?, ?>) type).getKeysType(), udts, true); |
| nestedUdts(((MapType<?, ?>) type).getValuesType(), udts, true); |
| } |
| } |
| |
| private List<CqlField> buildFields(TableMetadata metadata, Map<String, CqlField.CqlUdt> udts) |
| { |
| Iterator<ColumnMetadata> it = metadata.allColumnsInSelectOrder(); |
| List<CqlField> result = new ArrayList<>(); |
| int position = 0; |
| while (it.hasNext()) |
| { |
| ColumnMetadata col = it.next(); |
| boolean isPartitionKey = col.isPartitionKey(); |
| boolean isClusteringColumn = col.isClusteringColumn(); |
| boolean isStatic = col.isStatic(); |
| String name = col.name.toCQLString(); |
| CqlField.CqlType type = col.type.isUDT() ? udts.get(((UserType) col.type).getNameAsString()) |
| : bridge.parseType(col.type.asCQL3Type().toString(), udts); |
| boolean isFrozen = col.type.isFreezable() && !col.type.isMultiCell(); |
| result.add(new CqlField(isPartitionKey, |
| isClusteringColumn, |
| isStatic, |
| name, |
| !(type instanceof CqlFrozen) && isFrozen ? CqlFrozen.build(type) : type, |
| position)); |
| position++; |
| } |
| return result; |
| } |
| |
| static Map<String, String> rfToMap(ReplicationFactor replicationFactor) |
| { |
| Map<String, String> result = new HashMap<>(replicationFactor.getOptions().size() + 1); |
| result.put("class", "org.apache.cassandra.locator." + replicationFactor.getReplicationStrategy().name()); |
| for (Map.Entry<String, Integer> entry : replicationFactor.getOptions().entrySet()) |
| { |
| result.put(entry.getKey(), Integer.toString(entry.getValue())); |
| } |
| return result; |
| } |
| } |