| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.gora.cassandra.store; |
| |
| import java.nio.ByteBuffer; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| |
| import me.prettyprint.cassandra.model.ConfigurableConsistencyLevel; |
| import me.prettyprint.cassandra.serializers.ByteBufferSerializer; |
| import me.prettyprint.cassandra.serializers.IntegerSerializer; |
| import me.prettyprint.cassandra.serializers.StringSerializer; |
| import me.prettyprint.cassandra.service.CassandraHostConfigurator; |
| import me.prettyprint.hector.api.Cluster; |
| import me.prettyprint.hector.api.Keyspace; |
| import me.prettyprint.hector.api.beans.OrderedRows; |
| import me.prettyprint.hector.api.beans.OrderedSuperRows; |
| import me.prettyprint.hector.api.beans.Row; |
| import me.prettyprint.hector.api.beans.SuperRow; |
| import me.prettyprint.hector.api.ddl.ColumnFamilyDefinition; |
| import me.prettyprint.hector.api.ddl.KeyspaceDefinition; |
| import me.prettyprint.hector.api.factory.HFactory; |
| import me.prettyprint.hector.api.mutation.Mutator; |
| import me.prettyprint.hector.api.query.QueryResult; |
| import me.prettyprint.hector.api.query.RangeSlicesQuery; |
| import me.prettyprint.hector.api.query.RangeSuperSlicesQuery; |
| import me.prettyprint.hector.api.HConsistencyLevel; |
| import me.prettyprint.hector.api.Serializer; |
| |
| import org.apache.avro.Schema; |
| import org.apache.avro.Schema.Type; |
| import org.apache.avro.generic.GenericArray; |
| import org.apache.avro.util.Utf8; |
| import org.apache.gora.cassandra.query.CassandraQuery; |
| import org.apache.gora.cassandra.serializers.GenericArraySerializer; |
| import org.apache.gora.cassandra.serializers.GoraSerializerTypeInferer; |
| import org.apache.gora.cassandra.serializers.TypeUtils; |
| import org.apache.gora.mapreduce.GoraRecordReader; |
| import org.apache.gora.persistency.Persistent; |
| import org.apache.gora.persistency.State; |
| import org.apache.gora.persistency.StatefulHashMap; |
| import org.apache.gora.query.Query; |
| import org.apache.gora.util.ByteUtils; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| public class CassandraClient<K, T extends Persistent> { |
| public static final Logger LOG = LoggerFactory.getLogger(CassandraClient.class); |
| |
| private Cluster cluster; |
| private Keyspace keyspace; |
| private Mutator<K> mutator; |
| private Class<K> keyClass; |
| private Class<T> persistentClass; |
| |
| private CassandraMapping cassandraMapping = null; |
| |
| private Serializer<K> keySerializer; |
| |
| public void initialize(Class<K> keyClass, Class<T> persistentClass) throws Exception { |
| this.keyClass = keyClass; |
| |
| // get cassandra mapping with persistent class |
| this.persistentClass = persistentClass; |
| this.cassandraMapping = CassandraMappingManager.getManager().get(persistentClass); |
| |
| this.cluster = HFactory.getOrCreateCluster(this.cassandraMapping.getClusterName(), new CassandraHostConfigurator(this.cassandraMapping.getHostName())); |
| |
| // add keyspace to cluster |
| checkKeyspace(); |
| |
| // Just create a Keyspace object on the client side, corresponding to an already existing keyspace with already created column families. |
| this.keyspace = HFactory.createKeyspace(this.cassandraMapping.getKeyspaceName(), this.cluster); |
| |
| this.keySerializer = GoraSerializerTypeInferer.getSerializer(keyClass); |
| this.mutator = HFactory.createMutator(this.keyspace, this.keySerializer); |
| } |
| |
| /** |
| * Check if keyspace already exists. |
| */ |
| public boolean keyspaceExists() { |
| KeyspaceDefinition keyspaceDefinition = this.cluster.describeKeyspace(this.cassandraMapping.getKeyspaceName()); |
| return (keyspaceDefinition != null); |
| } |
| |
| /** |
| * Check if keyspace already exists. If not, create it. |
| * In this method, we also utilise Hector's {@ConfigurableConsistencyLevel} |
| * logic. It is set by passing a ConfigurableConsistencyLevel object right |
| * when the Keyspace is created. Currently consistency level is .ONE which |
| * permits consistency to wait until one replica has responded. |
| */ |
| public void checkKeyspace() { |
| // "describe keyspace <keyspaceName>;" query |
| KeyspaceDefinition keyspaceDefinition = this.cluster.describeKeyspace(this.cassandraMapping.getKeyspaceName()); |
| if (keyspaceDefinition == null) { |
| List<ColumnFamilyDefinition> columnFamilyDefinitions = this.cassandraMapping.getColumnFamilyDefinitions(); |
| keyspaceDefinition = HFactory.createKeyspaceDefinition(this.cassandraMapping.getKeyspaceName(), "org.apache.cassandra.locator.SimpleStrategy", 1, columnFamilyDefinitions); |
| this.cluster.addKeyspace(keyspaceDefinition, true); |
| // LOG.info("Keyspace '" + this.cassandraMapping.getKeyspaceName() + "' in cluster '" + this.cassandraMapping.getClusterName() + "' was created on host '" + this.cassandraMapping.getHostName() + "'"); |
| |
| // Create a customized Consistency Level |
| ConfigurableConsistencyLevel configurableConsistencyLevel = new ConfigurableConsistencyLevel(); |
| Map<String, HConsistencyLevel> clmap = new HashMap<String, HConsistencyLevel>(); |
| |
| // Define CL.ONE for ColumnFamily "ColumnFamily" |
| clmap.put("ColumnFamily", HConsistencyLevel.ONE); |
| |
| // In this we use CL.ONE for read and writes. But you can use different CLs if needed. |
| configurableConsistencyLevel.setReadCfConsistencyLevels(clmap); |
| configurableConsistencyLevel.setWriteCfConsistencyLevels(clmap); |
| |
| // Then let the keyspace know |
| HFactory.createKeyspace("Keyspace", this.cluster, configurableConsistencyLevel); |
| |
| keyspaceDefinition = null; |
| } |
| |
| } |
| |
| /** |
| * Drop keyspace. |
| */ |
| public void dropKeyspace() { |
| // "drop keyspace <keyspaceName>;" query |
| this.cluster.dropKeyspace(this.cassandraMapping.getKeyspaceName()); |
| } |
| |
| /** |
| * Insert a field in a column. |
| * @param key the row key |
| * @param fieldName the field name |
| * @param value the field value. |
| */ |
| public void addColumn(K key, String fieldName, Object value) { |
| if (value == null) { |
| return; |
| } |
| |
| ByteBuffer byteBuffer = toByteBuffer(value); |
| |
| String columnFamily = this.cassandraMapping.getFamily(fieldName); |
| String columnName = this.cassandraMapping.getColumn(fieldName); |
| if (columnName == null) { |
| LOG.warn("Column name is null for field=" + fieldName + " with value=" + value.toString()); |
| return; |
| } |
| |
| HectorUtils.insertColumn(mutator, key, columnFamily, columnName, byteBuffer); |
| } |
| |
| /** |
| * Insert a member in a super column. This is used for map and record Avro types. |
| * @param key the row key |
| * @param fieldName the field name |
| * @param columnName the column name (the member name, or the index of array) |
| * @param value the member value |
| */ |
| @SuppressWarnings("unchecked") |
| public void addSubColumn(K key, String fieldName, ByteBuffer columnName, Object value) { |
| if (value == null) { |
| return; |
| } |
| |
| ByteBuffer byteBuffer = toByteBuffer(value); |
| |
| String columnFamily = this.cassandraMapping.getFamily(fieldName); |
| String superColumnName = this.cassandraMapping.getColumn(fieldName); |
| |
| HectorUtils.insertSubColumn(mutator, key, columnFamily, superColumnName, columnName, byteBuffer); |
| } |
| |
| public void addSubColumn(K key, String fieldName, String columnName, Object value) { |
| addSubColumn(key, fieldName, StringSerializer.get().toByteBuffer(columnName), value); |
| } |
| |
| public void addSubColumn(K key, String fieldName, Integer columnName, Object value) { |
| addSubColumn(key, fieldName, IntegerSerializer.get().toByteBuffer(columnName), value); |
| } |
| |
| |
| /** |
| * Delete a member in a super column. This is used for map and record Avro types. |
| * @param key the row key |
| * @param fieldName the field name |
| * @param columnName the column name (the member name, or the index of array) |
| */ |
| @SuppressWarnings("unchecked") |
| public void deleteSubColumn(K key, String fieldName, ByteBuffer columnName) { |
| |
| String columnFamily = this.cassandraMapping.getFamily(fieldName); |
| String superColumnName = this.cassandraMapping.getColumn(fieldName); |
| |
| HectorUtils.deleteSubColumn(mutator, key, columnFamily, superColumnName, columnName); |
| } |
| |
| public void deleteSubColumn(K key, String fieldName, String columnName) { |
| deleteSubColumn(key, fieldName, StringSerializer.get().toByteBuffer(columnName)); |
| } |
| |
| |
| @SuppressWarnings("unchecked") |
| public void addGenericArray(K key, String fieldName, GenericArray array) { |
| if (isSuper( cassandraMapping.getFamily(fieldName) )) { |
| int i= 0; |
| for (Object itemValue: array) { |
| |
| // TODO: hack, do not store empty arrays |
| if (itemValue instanceof GenericArray<?>) { |
| if (((GenericArray)itemValue).size() == 0) { |
| continue; |
| } |
| } else if (itemValue instanceof StatefulHashMap<?,?>) { |
| if (((StatefulHashMap)itemValue).size() == 0) { |
| continue; |
| } |
| } |
| |
| addSubColumn(key, fieldName, i++, itemValue); |
| } |
| } |
| else { |
| addColumn(key, fieldName, array); |
| } |
| } |
| |
| @SuppressWarnings("unchecked") |
| public void addStatefulHashMap(K key, String fieldName, StatefulHashMap<Utf8,Object> map) { |
| if (isSuper( cassandraMapping.getFamily(fieldName) )) { |
| int i= 0; |
| for (Utf8 mapKey: map.keySet()) { |
| if (map.getState(mapKey) == State.DELETED) { |
| deleteSubColumn(key, fieldName, mapKey.toString()); |
| continue; |
| } |
| |
| // TODO: hack, do not store empty arrays |
| Object mapValue = map.get(mapKey); |
| if (mapValue instanceof GenericArray<?>) { |
| if (((GenericArray)mapValue).size() == 0) { |
| continue; |
| } |
| } else if (mapValue instanceof StatefulHashMap<?,?>) { |
| if (((StatefulHashMap)mapValue).size() == 0) { |
| continue; |
| } |
| } |
| |
| addSubColumn(key, fieldName, mapKey.toString(), mapValue); |
| } |
| } |
| else { |
| addColumn(key, fieldName, map); |
| } |
| } |
| |
| /** |
| * Serialize value to ByteBuffer. |
| * @param value the member value |
| * @return ByteBuffer object |
| */ |
| @SuppressWarnings("unchecked") |
| public ByteBuffer toByteBuffer(Object value) { |
| ByteBuffer byteBuffer = null; |
| Serializer serializer = GoraSerializerTypeInferer.getSerializer(value); |
| if (serializer == null) { |
| LOG.info("Serializer not found for: " + value.toString()); |
| } |
| else { |
| byteBuffer = serializer.toByteBuffer(value); |
| } |
| |
| if (byteBuffer == null) { |
| LOG.info("value class=" + value.getClass().getName() + " value=" + value + " -> null"); |
| } |
| |
| return byteBuffer; |
| } |
| |
| /** |
| * Select a family column in the keyspace. |
| * @param cassandraQuery a wrapper of the query |
| * @param family the family name to be queried |
| * @return a list of family rows |
| */ |
| public List<Row<K, ByteBuffer, ByteBuffer>> execute(CassandraQuery<K, T> cassandraQuery, String family) { |
| |
| String[] columnNames = cassandraQuery.getColumns(family); |
| ByteBuffer[] columnNameByteBuffers = new ByteBuffer[columnNames.length]; |
| for (int i = 0; i < columnNames.length; i++) { |
| columnNameByteBuffers[i] = StringSerializer.get().toByteBuffer(columnNames[i]); |
| } |
| Query<K, T> query = cassandraQuery.getQuery(); |
| int limit = (int) query.getLimit(); |
| if (limit < 1) { |
| limit = Integer.MAX_VALUE; |
| } |
| K startKey = query.getStartKey(); |
| K endKey = query.getEndKey(); |
| |
| RangeSlicesQuery<K, ByteBuffer, ByteBuffer> rangeSlicesQuery = HFactory.createRangeSlicesQuery(this.keyspace, this.keySerializer, ByteBufferSerializer.get(), ByteBufferSerializer.get()); |
| rangeSlicesQuery.setColumnFamily(family); |
| rangeSlicesQuery.setKeys(startKey, endKey); |
| rangeSlicesQuery.setRange(ByteBuffer.wrap(new byte[0]), ByteBuffer.wrap(new byte[0]), false, GoraRecordReader.BUFFER_LIMIT_READ_VALUE); |
| rangeSlicesQuery.setRowCount(limit); |
| rangeSlicesQuery.setColumnNames(columnNameByteBuffers); |
| |
| QueryResult<OrderedRows<K, ByteBuffer, ByteBuffer>> queryResult = rangeSlicesQuery.execute(); |
| OrderedRows<K, ByteBuffer, ByteBuffer> orderedRows = queryResult.get(); |
| |
| |
| return orderedRows.getList(); |
| } |
| |
| /** |
| * Select the families that contain at least one column mapped to a query field. |
| * @param query indicates the columns to select |
| * @return a map which keys are the family names and values the corresponding column names required to get all the query fields. |
| */ |
| public Map<String, List<String>> getFamilyMap(Query<K, T> query) { |
| Map<String, List<String>> map = new HashMap<String, List<String>>(); |
| for (String field: query.getFields()) { |
| String family = this.cassandraMapping.getFamily(field); |
| String column = this.cassandraMapping.getColumn(field); |
| |
| // check if the family value was already initialized |
| List<String> list = map.get(family); |
| if (list == null) { |
| list = new ArrayList<String>(); |
| map.put(family, list); |
| } |
| |
| if (column != null) { |
| list.add(column); |
| } |
| |
| } |
| |
| return map; |
| } |
| |
| /** |
| * Select the field names according to the column names, which format if fully qualified: "family:column" |
| * @param query |
| * @return a map which keys are the fully qualified column names and values the query fields |
| */ |
| public Map<String, String> getReverseMap(Query<K, T> query) { |
| Map<String, String> map = new HashMap<String, String>(); |
| for (String field: query.getFields()) { |
| String family = this.cassandraMapping.getFamily(field); |
| String column = this.cassandraMapping.getColumn(field); |
| |
| map.put(family + ":" + column, field); |
| } |
| |
| return map; |
| |
| } |
| |
| public boolean isSuper(String family) { |
| return this.cassandraMapping.isSuper(family); |
| } |
| |
| public List<SuperRow<K, String, ByteBuffer, ByteBuffer>> executeSuper(CassandraQuery<K, T> cassandraQuery, String family) { |
| String[] columnNames = cassandraQuery.getColumns(family); |
| Query<K, T> query = cassandraQuery.getQuery(); |
| int limit = (int) query.getLimit(); |
| if (limit < 1) { |
| limit = Integer.MAX_VALUE; |
| } |
| K startKey = query.getStartKey(); |
| K endKey = query.getEndKey(); |
| |
| RangeSuperSlicesQuery<K, String, ByteBuffer, ByteBuffer> rangeSuperSlicesQuery = HFactory.createRangeSuperSlicesQuery(this.keyspace, this.keySerializer, StringSerializer.get(), ByteBufferSerializer.get(), ByteBufferSerializer.get()); |
| rangeSuperSlicesQuery.setColumnFamily(family); |
| rangeSuperSlicesQuery.setKeys(startKey, endKey); |
| rangeSuperSlicesQuery.setRange("", "", false, GoraRecordReader.BUFFER_LIMIT_READ_VALUE); |
| rangeSuperSlicesQuery.setRowCount(limit); |
| rangeSuperSlicesQuery.setColumnNames(columnNames); |
| |
| |
| QueryResult<OrderedSuperRows<K, String, ByteBuffer, ByteBuffer>> queryResult = rangeSuperSlicesQuery.execute(); |
| OrderedSuperRows<K, String, ByteBuffer, ByteBuffer> orderedRows = queryResult.get(); |
| return orderedRows.getList(); |
| |
| |
| } |
| |
| /** |
| * Obtain Schema/Keyspace name |
| * @return Keyspace |
| */ |
| public String getKeyspaceName() { |
| return this.cassandraMapping.getKeyspaceName(); |
| } |
| } |