blob: c7b736fe7b9d7d4d79bf7e5ea88b4bf22e67c96c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.usergrid.persistence.core.datastax.impl;
import com.datastax.driver.core.*;
import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
import com.datastax.driver.core.policies.LoadBalancingPolicy;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import org.apache.usergrid.persistence.core.CassandraFig;
import org.apache.usergrid.persistence.core.datastax.CQLUtils;
import org.apache.usergrid.persistence.core.datastax.DataStaxCluster;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Singleton
public class DataStaxClusterImpl implements DataStaxCluster {
private static final Logger logger = LoggerFactory.getLogger( DataStaxClusterImpl.class );
private final CassandraFig cassandraFig;
private Cluster cluster;
private Session applicationSession;
private Session clusterSession;
@Inject
public DataStaxClusterImpl(final CassandraFig cassandraFig ) throws Exception {
this.cassandraFig = cassandraFig;
this.cluster = buildCluster();
// always initialize the keyspaces
this.createOrUpdateKeyspace();
logger.info("Initialized datastax cluster client. Hosts={}, Idle Timeout={}s, Pool Timeout={}s",
cluster.getMetadata().getAllHosts().toString(),
cluster.getConfiguration().getPoolingOptions().getIdleTimeoutSeconds(),
cluster.getConfiguration().getPoolingOptions().getPoolTimeoutMillis() / 1000);
}
@Override
public Cluster getCluster(){
// ensure we can build the cluster if it was previously closed
if ( cluster.isClosed() ){
cluster = buildCluster();
}
return cluster;
}
@Override
public Session getClusterSession(){
// always grab cluster from getCluster() in case it was prematurely closed
if ( clusterSession == null || clusterSession.isClosed() ){
clusterSession = getCluster().connect();
}
return clusterSession;
}
@Override
public Session getApplicationSession(){
// always grab cluster from getCluster() in case it was prematurely closed
if ( applicationSession == null || applicationSession.isClosed() ){
applicationSession = getCluster().connect( CQLUtils.quote(cassandraFig.getApplicationKeyspace() ) );
}
return applicationSession;
}
/**
* Execute CQL that will create the keyspace if it doesn't exist and alter it if it does.
* @throws Exception
*/
@Override
public void createOrUpdateKeyspace() throws Exception {
clusterSession = getClusterSession();
final String createApplicationKeyspace = String.format(
"CREATE KEYSPACE IF NOT EXISTS %s WITH replication = %s",
CQLUtils.quote(cassandraFig.getApplicationKeyspace()),
CQLUtils.getFormattedReplication( cassandraFig.getStrategy(), cassandraFig.getStrategyOptions() )
);
final String updateApplicationKeyspace = String.format(
"ALTER KEYSPACE %s WITH replication = %s",
CQLUtils.quote(cassandraFig.getApplicationKeyspace()),
CQLUtils.getFormattedReplication( cassandraFig.getStrategy(), cassandraFig.getStrategyOptions() )
);
clusterSession.execute(createApplicationKeyspace);
clusterSession.executeAsync(updateApplicationKeyspace);
logger.info("Created/Updated keyspace: {}", cassandraFig.getApplicationKeyspace());
waitForSchemaAgreement();
}
/**
* Wait until all Cassandra nodes agree on the schema. Sleeps 100ms between checks.
*
*/
public void waitForSchemaAgreement() {
while ( true ) {
if( this.cluster.getMetadata().checkSchemaAgreement() ){
return;
}
//sleep and try it again
try {
Thread.sleep( 100 );
}
catch ( InterruptedException e ) {
//swallow
}
}
}
public Cluster buildCluster(){
ConsistencyLevel defaultConsistencyLevel;
try {
defaultConsistencyLevel = ConsistencyLevel.valueOf(cassandraFig.getReadCl());
} catch (IllegalArgumentException e){
logger.error("Unable to parse provided consistency level in property: {}, defaulting to: {}",
CassandraFig.READ_CL,
ConsistencyLevel.LOCAL_QUORUM);
defaultConsistencyLevel = ConsistencyLevel.LOCAL_QUORUM;
}
LoadBalancingPolicy loadBalancingPolicy;
if( !cassandraFig.getLocalDataCenter().isEmpty() ){
loadBalancingPolicy = new DCAwareRoundRobinPolicy.Builder()
.withLocalDc( cassandraFig.getLocalDataCenter() ).build();
}else{
loadBalancingPolicy = new DCAwareRoundRobinPolicy.Builder().build();
}
final PoolingOptions poolingOptions = new PoolingOptions()
.setCoreConnectionsPerHost(HostDistance.LOCAL, cassandraFig.getConnections() / 2)
.setMaxConnectionsPerHost(HostDistance.LOCAL, cassandraFig.getConnections())
.setIdleTimeoutSeconds(cassandraFig.getTimeout() / 1000)
.setPoolTimeoutMillis(cassandraFig.getPoolTimeout());
final QueryOptions queryOptions = new QueryOptions()
.setConsistencyLevel(defaultConsistencyLevel);
Cluster.Builder datastaxCluster = Cluster.builder()
.withClusterName(cassandraFig.getClusterName())
.addContactPoints(cassandraFig.getHosts().split(","))
.withCompression(ProtocolOptions.Compression.LZ4)
.withLoadBalancingPolicy(loadBalancingPolicy)
.withPoolingOptions(poolingOptions)
.withQueryOptions(queryOptions)
.withProtocolVersion(getProtocolVersion(cassandraFig.getVersion()));
// only add auth credentials if they were provided
if ( !cassandraFig.getUsername().isEmpty() && !cassandraFig.getPassword().isEmpty() ){
datastaxCluster.withCredentials(
cassandraFig.getUsername(),
cassandraFig.getPassword()
);
}
return datastaxCluster.build();
}
private ProtocolVersion getProtocolVersion(String versionNumber){
ProtocolVersion protocolVersion;
switch (versionNumber) {
case "2.1":
protocolVersion = ProtocolVersion.V3;
break;
case "2.0":
protocolVersion = ProtocolVersion.V2;
break;
case "1.2":
protocolVersion = ProtocolVersion.V1;
break;
default:
protocolVersion = ProtocolVersion.NEWEST_SUPPORTED;
break;
}
return protocolVersion;
}
}