// blob: 88c4b67f79106ea73c1c7d6c2f402cdeb28e4e23 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.metamodel.cassandra;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import org.apache.metamodel.DataContext;
import org.apache.metamodel.MetaModelException;
import org.apache.metamodel.QueryPostprocessDataContext;
import org.apache.metamodel.data.DataSet;
import org.apache.metamodel.data.SimpleDataSetHeader;
import org.apache.metamodel.query.FilterItem;
import org.apache.metamodel.query.SelectItem;
import org.apache.metamodel.schema.Column;
import org.apache.metamodel.schema.ColumnType;
import org.apache.metamodel.schema.MutableColumn;
import org.apache.metamodel.schema.MutableSchema;
import org.apache.metamodel.schema.MutableTable;
import org.apache.metamodel.schema.Schema;
import org.apache.metamodel.schema.Table;
import org.apache.metamodel.util.SimpleTableDef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ColumnMetadata;
import com.datastax.driver.core.DataType;
import com.datastax.driver.core.KeyspaceMetadata;
import com.datastax.driver.core.Metadata;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.TableMetadata;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.datastax.driver.core.querybuilder.Select;
import com.datastax.driver.core.querybuilder.Select.Selection;
/**
* DataContext implementation for Apache Cassandra database.
*
* When instantiating this DataContext, a keyspace name is provided. In
* Cassandra, the keyspace is the container for your application data, similar
* to a schema in a relational database. Keyspaces are used to group column
* families together.
*
* This implementation supports either automatic discovery of a schema or manual
* specification of a schema, through the {@link SimpleTableDef} class.
*
*/
public class CassandraDataContext extends QueryPostprocessDataContext implements DataContext {

    private static final Logger logger = LoggerFactory.getLogger(CassandraDataContext.class);

    private final Cluster cassandraCluster;
    private final SimpleTableDef[] tableDefs;
    private final String keySpaceName;

    /**
     * Session shared by every query issued through this data context. Created
     * lazily by {@link #getSession()}; access is guarded by {@code this}.
     */
    private Session session;

    /**
     * Constructs a {@link CassandraDataContext}. This constructor accepts a
     * custom array of {@link SimpleTableDef}s which allows the user to define
     * their own view on the tables of the keyspace.
     *
     * @param cluster
     *            the Cassandra cluster
     * @param keySpace
     *            the name of the Cassandra keyspace
     * @param tableDefs
     *            an array of {@link SimpleTableDef}s, which define the table
     *            and column model of the Cassandra keyspace.
     */
    public CassandraDataContext(Cluster cluster, String keySpace, SimpleTableDef... tableDefs) {
        super(false);
        this.cassandraCluster = cluster;
        this.keySpaceName = keySpace;
        this.tableDefs = tableDefs;
    }

    /**
     * Constructs a {@link CassandraDataContext} and automatically detects the
     * schema structure/view on the keyspace (see
     * {@link #detectSchema(Cluster, String)}).
     *
     * @param cluster
     *            the Cassandra cluster
     * @param keySpace
     *            the name of the Cassandra keyspace to represent
     */
    public CassandraDataContext(Cluster cluster, String keySpace) {
        this(cluster, keySpace, detectSchema(cluster, keySpace));
    }

    /**
     * Performs an analysis of the given keyspace in a Cassandra
     * {@link Cluster} instance and detects the table structure based on the
     * metadata provided by the datastax cassandra java client.
     *
     * @see #detectTable(TableMetadata)
     *
     * @param cluster
     *            the cluster to inspect
     * @param keyspaceName
     *            the name of the keyspace whose tables should be detected
     * @return one {@link SimpleTableDef} per table found in the keyspace,
     *         useful for further fine tuning by the user.
     * @throws IllegalArgumentException
     *             if the cluster metadata contains no keyspace with the given
     *             name
     */
    public static SimpleTableDef[] detectSchema(Cluster cluster, String keyspaceName) {
        final Metadata metadata = cluster.getMetadata();
        final KeyspaceMetadata keyspace = metadata.getKeyspace(keyspaceName);
        if (keyspace == null) {
            throw new IllegalArgumentException("Keyspace '" + keyspaceName + "' does not exist in the database");
        }
        final Collection<TableMetadata> tables = keyspace.getTables();
        final SimpleTableDef[] result = new SimpleTableDef[tables.size()];
        int i = 0;
        for (final TableMetadata tableMetaData : tables) {
            result[i] = detectTable(tableMetaData);
            i++;
        }
        return result;
    }

    /**
     * Performs an analysis of an available table in Cassandra.
     *
     * @param tableMetaData
     *            the table meta data
     * @return a table definition carrying the table name plus the name and
     *         mapped {@link ColumnType} of every column.
     */
    public static SimpleTableDef detectTable(TableMetadata tableMetaData) {
        final List<ColumnMetadata> columns = tableMetaData.getColumns();
        final String[] columnNames = new String[columns.size()];
        final ColumnType[] columnTypes = new ColumnType[columns.size()];
        int i = 0;
        for (final ColumnMetadata column : columns) {
            columnNames[i] = column.getName();
            columnTypes[i] = getColumnTypeFromMetaDataField(column.getType().getName());
            i++;
        }
        return new SimpleTableDef(tableMetaData.getName(), columnNames, columnTypes);
    }

    /**
     * Builds the schema from the configured table definitions and, where the
     * cluster metadata knows the table, flags its primary key columns and
     * records their native Cassandra type name.
     *
     * @return the schema holding one table per configured definition
     */
    @Override
    protected Schema getMainSchema() throws MetaModelException {
        final MutableSchema theSchema = new MutableSchema(getMainSchemaName());

        // Looked up once; may be null when the keyspace is unknown to the
        // cluster (detectSchema() guards against the same situation).
        final KeyspaceMetadata keyspace = cassandraCluster.getMetadata().getKeyspace(keySpaceName);
        if (keyspace == null) {
            logger.debug("Keyspace '{}' not found in cluster metadata - primary keys will not be detected",
                    keySpaceName);
        }

        for (final SimpleTableDef tableDef : tableDefs) {
            final MutableTable table = tableDef.toTable().setSchema(theSchema);
            final TableMetadata cassandraTable = (keyspace == null) ? null : keyspace.getTable(table.getName());
            if (cassandraTable != null) {
                for (final ColumnMetadata primaryKey : cassandraTable.getPrimaryKey()) {
                    final MutableColumn column = (MutableColumn) table.getColumnByName(primaryKey.getName());
                    // A user-supplied table definition may omit a primary key
                    // column, so both updates must stay inside the null check.
                    if (column != null) {
                        column.setPrimaryKey(true);
                        column.setNativeType(primaryKey.getType().getName().name());
                    }
                }
            }
            theSchema.addTable(table);
        }
        return theSchema;
    }

    /**
     * @return the keyspace name, which doubles as the schema name
     */
    @Override
    protected String getMainSchemaName() throws MetaModelException {
        return keySpaceName;
    }

    /**
     * Runs a {@code SELECT *} against the given table, optionally limited to
     * {@code maxRows}, and exposes the result as a {@link DataSet}.
     *
     * @param table
     *            the table to read
     * @param columns
     *            the columns handed to the resulting data set
     * @param maxRows
     *            the maximum number of rows, or -1 for no limit
     * @return a data set backed by the result iterator
     */
    @Override
    protected DataSet materializeMainSchemaTable(Table table, List<Column> columns, int maxRows) {
        final Select query = QueryBuilder.select().all().from(keySpaceName, table.getName());
        if (limitMaxRowsIsSet(maxRows)) {
            query.limit(maxRows);
        }
        // The session is deliberately not closed here: the result set iterator
        // is handed to the data set and may still need it after this returns.
        final ResultSet resultSet = getSession().execute(query);
        final Iterator<Row> response = resultSet.iterator();
        return new CassandraDataSet(response, columns);
    }

    /**
     * @return true unless {@code maxRows} is the "no limit" marker -1
     */
    private boolean limitMaxRowsIsSet(int maxRows) {
        return (maxRows != -1);
    }

    /**
     * Looks up a single row by primary key using a native query.
     *
     * @param table
     *            the table to query
     * @param selectItems
     *            the items to select; each is expected to reference a column
     * @param primaryKeyColumn
     *            the primary key column to filter on
     * @param keyValue
     *            the key value; a String is converted to a {@link UUID} when
     *            the key column is of type UUID
     * @return the matching row, or null when the query returns no row
     */
    @Override
    protected org.apache.metamodel.data.Row executePrimaryKeyLookupQuery(Table table, List<SelectItem> selectItems,
            Column primaryKeyColumn, Object keyValue) {
        if (primaryKeyColumn.getType() == ColumnType.UUID && keyValue instanceof String) {
            keyValue = UUID.fromString(keyValue.toString());
        }
        Selection select = QueryBuilder.select();
        for (SelectItem selectItem : selectItems) {
            final Column column = selectItem.getColumn();
            assert column != null;
            select = select.column(column.getName());
        }
        final Statement statement = select.from(keySpaceName, table.getName())
                .where(QueryBuilder.eq(primaryKeyColumn.getName(), keyValue));
        final Row row = getSession().execute(statement).one();
        if (row == null) {
            // No row for this key: report "no result" instead of converting a
            // null driver row.
            return null;
        }
        return CassandraUtils.toRow(row, new SimpleDataSetHeader(selectItems));
    }

    /**
     * Executes a native {@code COUNT(*)} when no filters are involved.
     *
     * @param table
     *            the table to count rows in
     * @param whereItems
     *            the filters of the query; any filter disables native counting
     * @param functionApproximationAllowed
     *            not used by this implementation
     * @return the row count, or null when the count cannot be done natively
     */
    @Override
    protected Number executeCountQuery(Table table, List<FilterItem> whereItems, boolean functionApproximationAllowed) {
        if (!whereItems.isEmpty()) {
            // not supported - will have to be done by counting client-side
            logger.debug(
                    "Not able to execute count query natively - resorting to query post-processing, which may be expensive");
            return null;
        }
        final Statement statement = QueryBuilder.select().countAll().from(keySpaceName, table.getName());
        final Row response = getSession().execute(statement).one();
        return response.getLong(0);
    }

    /**
     * Returns the session used for all queries, connecting on first use.
     * Reusing one session avoids opening (and never closing) a new session for
     * every query. NOTE(review): this class has no close hook of its own, so
     * the session is presumably released when the caller closes the
     * {@link Cluster} - confirm against the driver documentation.
     */
    private synchronized Session getSession() {
        if (session == null) {
            session = cassandraCluster.connect();
        }
        return session;
    }

    /**
     * Maps a Cassandra data type name to the corresponding MetaModel
     * {@link ColumnType}. Type names without an explicit case are mapped to
     * {@link ColumnType#STRING}.
     */
    private static ColumnType getColumnTypeFromMetaDataField(DataType.Name metaDataName) {
        switch (metaDataName) {
        case BIGINT:
        case COUNTER:
            return ColumnType.BIGINT;
        case BLOB:
            return ColumnType.BLOB;
        case BOOLEAN:
            return ColumnType.BOOLEAN;
        case DECIMAL:
            return ColumnType.DECIMAL;
        case DOUBLE:
            return ColumnType.DOUBLE;
        case FLOAT:
            return ColumnType.FLOAT;
        case INT:
            return ColumnType.INTEGER;
        case TEXT:
            return ColumnType.STRING;
        case TIMESTAMP:
            return ColumnType.TIMESTAMP;
        case UUID:
            return ColumnType.UUID;
        case VARCHAR:
            return ColumnType.VARCHAR;
        case VARINT:
            return ColumnType.BIGINT;
        case LIST:
            return ColumnType.LIST;
        case MAP:
            return ColumnType.MAP;
        case CUSTOM:
            return ColumnType.OTHER;
        case INET:
            return ColumnType.INET;
        case SET:
            return ColumnType.SET;
        default:
            return ColumnType.STRING;
        }
    }
}