blob: ddb3ea6901fcdcdd16d11d4fca890153624c770d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.utils;
import java.nio.ByteBuffer;
import java.net.InetSocketAddress;
import java.util.*;
import com.datastax.driver.core.*;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.schema.*;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.ColumnMetadata.ClusteringOrder;
import org.apache.cassandra.cql3.ColumnIdentifier;
import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.dht.*;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.dht.Token.TokenFactory;
import org.apache.cassandra.io.sstable.SSTableLoader;
import org.apache.cassandra.schema.TableMetadata;
public class NativeSSTableLoaderClient extends SSTableLoader.Client
{
protected final Map<String, TableMetadataRef> tables;
private final Collection<InetSocketAddress> hosts;
private final int storagePort;
private final AuthProvider authProvider;
private final SSLOptions sslOptions;
public NativeSSTableLoaderClient(Collection<InetSocketAddress> hosts, int storagePort, String username, String password, SSLOptions sslOptions)
{
this(hosts, storagePort, new PlainTextAuthProvider(username, password), sslOptions);
}
public NativeSSTableLoaderClient(Collection<InetSocketAddress> hosts, int storagePort, AuthProvider authProvider, SSLOptions sslOptions)
{
super();
this.tables = new HashMap<>();
this.hosts = hosts;
this.authProvider = authProvider;
this.sslOptions = sslOptions;
this.storagePort = storagePort;
}
public void init(String keyspace)
{
Cluster.Builder builder = Cluster.builder().addContactPointsWithPorts(hosts).allowBetaProtocolVersion();
if (sslOptions != null)
builder.withSSL(sslOptions);
if (authProvider != null)
builder = builder.withAuthProvider(authProvider);
try (Cluster cluster = builder.build(); Session session = cluster.connect())
{
Metadata metadata = cluster.getMetadata();
Set<TokenRange> tokenRanges = metadata.getTokenRanges();
IPartitioner partitioner = FBUtilities.newPartitioner(metadata.getPartitioner());
TokenFactory tokenFactory = partitioner.getTokenFactory();
for (TokenRange tokenRange : tokenRanges)
{
Set<Host> endpoints = metadata.getReplicas(Metadata.quote(keyspace), tokenRange);
Range<Token> range = new Range<>(tokenFactory.fromString(tokenRange.getStart().getValue().toString()),
tokenFactory.fromString(tokenRange.getEnd().getValue().toString()));
for (Host endpoint : endpoints)
{
int broadcastPort = endpoint.getBroadcastSocketAddress().getPort();
// use port from broadcast address if set.
int portToUse = broadcastPort != 0 ? broadcastPort : storagePort;
addRangeForEndpoint(range, InetAddressAndPort.getByNameOverrideDefaults(endpoint.getAddress().getHostAddress(), portToUse));
}
}
Types types = fetchTypes(keyspace, session);
tables.putAll(fetchTables(keyspace, session, partitioner, types));
// We only need the TableMetadata for the views, so we only load that.
tables.putAll(fetchViews(keyspace, session, partitioner, types));
}
catch (Exception e)
{
throw new RuntimeException("Unable to initialise " + NativeSSTableLoaderClient.class.getName(), e);
}
}
public TableMetadataRef getTableMetadata(String tableName)
{
return tables.get(tableName);
}
@Override
public void setTableMetadata(TableMetadataRef cfm)
{
tables.put(cfm.name, cfm);
}
private static Types fetchTypes(String keyspace, Session session)
{
String query = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.TYPES);
Types.RawBuilder types = Types.rawBuilder(keyspace);
for (Row row : session.execute(query, keyspace))
{
String name = row.getString("type_name");
List<String> fieldNames = row.getList("field_names", String.class);
List<String> fieldTypes = row.getList("field_types", String.class);
types.add(name, fieldNames, fieldTypes);
}
return types.build();
}
/*
* The following is a slightly simplified but otherwise duplicated version of
* SchemaKeyspace.createTableFromTableRowAndColumnRows().
* It might be safer to have a simple wrapper of the driver ResultSet/Row implementing
* UntypedResultSet/UntypedResultSet.Row and reuse the original method.
*
* Note: It is not safe for this class to use static methods from SchemaKeyspace (static final fields are ok)
* as that triggers initialization of the class, which fails in client mode.
*/
private static Map<String, TableMetadataRef> fetchTables(String keyspace, Session session, IPartitioner partitioner, Types types)
{
Map<String, TableMetadataRef> tables = new HashMap<>();
String query = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.TABLES);
for (Row row : session.execute(query, keyspace))
{
String name = row.getString("table_name");
tables.put(name, createTableMetadata(keyspace, session, partitioner, false, row, name, types));
}
return tables;
}
private static Map<String, TableMetadataRef> fetchViews(String keyspace, Session session, IPartitioner partitioner, Types types)
{
Map<String, TableMetadataRef> tables = new HashMap<>();
String query = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.VIEWS);
for (Row row : session.execute(query, keyspace))
{
String name = row.getString("view_name");
tables.put(name, createTableMetadata(keyspace, session, partitioner, true, row, name, types));
}
return tables;
}
private static TableMetadataRef createTableMetadata(String keyspace,
Session session,
IPartitioner partitioner,
boolean isView,
Row row,
String name,
Types types)
{
TableMetadata.Builder builder = TableMetadata.builder(keyspace, name, TableId.fromUUID(row.getUUID("id")))
.partitioner(partitioner);
if (!isView)
builder.flags(TableMetadata.Flag.fromStringSet(row.getSet("flags", String.class)));
String columnsQuery = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?",
SchemaConstants.SCHEMA_KEYSPACE_NAME,
SchemaKeyspaceTables.COLUMNS);
for (Row colRow : session.execute(columnsQuery, keyspace, name))
builder.addColumn(createDefinitionFromRow(colRow, keyspace, name, types));
String droppedColumnsQuery = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?",
SchemaConstants.SCHEMA_KEYSPACE_NAME,
SchemaKeyspaceTables.DROPPED_COLUMNS);
Map<ByteBuffer, DroppedColumn> droppedColumns = new HashMap<>();
for (Row colRow : session.execute(droppedColumnsQuery, keyspace, name))
{
DroppedColumn droppedColumn = createDroppedColumnFromRow(colRow, keyspace, name);
droppedColumns.put(droppedColumn.column.name.bytes, droppedColumn);
}
builder.droppedColumns(droppedColumns);
return TableMetadataRef.forOfflineTools(builder.build());
}
private static ColumnMetadata createDefinitionFromRow(Row row, String keyspace, String table, Types types)
{
ClusteringOrder order = ClusteringOrder.valueOf(row.getString("clustering_order").toUpperCase());
AbstractType<?> type = CQLTypeParser.parse(keyspace, row.getString("type"), types);
if (order == ClusteringOrder.DESC)
type = ReversedType.getInstance(type);
ColumnIdentifier name = new ColumnIdentifier(row.getBytes("column_name_bytes"), row.getString("column_name"));
int position = row.getInt("position");
org.apache.cassandra.schema.ColumnMetadata.Kind kind = ColumnMetadata.Kind.valueOf(row.getString("kind").toUpperCase());
return new ColumnMetadata(keyspace, table, name, type, position, kind);
}
private static DroppedColumn createDroppedColumnFromRow(Row row, String keyspace, String table)
{
String name = row.getString("column_name");
AbstractType<?> type = CQLTypeParser.parse(keyspace, row.getString("type"), Types.none());
ColumnMetadata.Kind kind = ColumnMetadata.Kind.valueOf(row.getString("kind").toUpperCase());
ColumnMetadata column = new ColumnMetadata(keyspace, table, ColumnIdentifier.getInterned(name, true), type, ColumnMetadata.NO_POSITION, kind);
long droppedTime = row.getTimestamp("dropped_time").getTime();
return new DroppedColumn(column, droppedTime);
}
}