| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package io.mifos.core.cassandra.core; |
| |
| import com.datastax.driver.core.AuthProvider; |
| import com.datastax.driver.core.Cluster; |
| import com.datastax.driver.core.PlainTextAuthProvider; |
| import com.datastax.driver.core.Session; |
| import com.datastax.driver.core.exceptions.InvalidQueryException; |
| import com.datastax.driver.mapping.Mapper; |
| import com.datastax.driver.mapping.MappingManager; |
| import io.mifos.core.cassandra.domain.Tenant; |
| import io.mifos.core.cassandra.util.CassandraConnectorConstants; |
| import io.mifos.core.cassandra.util.CodecRegistry; |
| import io.mifos.core.cassandra.util.ContactPointUtils; |
| import io.mifos.core.cassandra.util.LocalDateTimeCodec; |
| import io.mifos.core.lang.ServiceException; |
| import io.mifos.core.lang.TenantContextHolder; |
| import org.slf4j.Logger; |
| import org.springframework.core.env.Environment; |
| import org.springframework.util.Assert; |
| |
| import javax.annotation.Nonnull; |
| import javax.annotation.PreDestroy; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.locks.StampedLock; |
| |
| @SuppressWarnings("WeakerAccess") |
| public class CassandraSessionProvider { |
| |
| private final Environment env; |
| private final Logger logger; |
| private final ConcurrentHashMap<String, Cluster> clusterCache; |
| private final ConcurrentHashMap<String, Session> sessionCache; |
| private final StampedLock mapperLock = new StampedLock(); |
| private String adminClusterName; |
| private String adminContactPoints; |
| private String adminKeyspace; |
| private MappingManager adminSessionMappingManager; |
| |
| public CassandraSessionProvider(@Nonnull final Environment env, @Nonnull final Logger logger) { |
| super(); |
| Assert.notNull(env, "An environment must be given."); |
| Assert.notNull(logger, "A logger must be given."); |
| this.env = env; |
| this.logger = logger; |
| this.clusterCache = new ConcurrentHashMap<>(); |
| this.sessionCache = new ConcurrentHashMap<>(); |
| } |
| |
| public void setAdminClusterName(@Nonnull final String adminClusterName) { |
| Assert.notNull(adminClusterName, "A cluster name must be given."); |
| Assert.hasText(adminClusterName, "A cluster name must be given."); |
| this.adminClusterName = adminClusterName; |
| } |
| |
| public void setAdminContactPoints(@Nonnull final String adminContactPoints) { |
| Assert.notNull(adminContactPoints, "At least one contact point must be given."); |
| Assert.hasText(adminContactPoints, "At least one contact point must be given."); |
| this.adminContactPoints = adminContactPoints; |
| } |
| |
| public void setAdminKeyspace(@Nonnull final String adminKeyspace) { |
| Assert.notNull(adminKeyspace, "An keyspace must be given."); |
| Assert.hasText(adminKeyspace, "An keyspace must be given."); |
| this.adminKeyspace = adminKeyspace; |
| } |
| |
| @Nonnull |
| public Session getAdminSession() { |
| if (this.adminClusterName == null |
| || this.adminContactPoints == null |
| || this.adminKeyspace == null) { |
| throw new IllegalStateException("Cluster name, contact points, and keyspace must be set to retrieve an admin session."); |
| } |
| |
| try { |
| return this.getSession(this.adminClusterName, this.adminContactPoints, this.adminKeyspace); |
| } catch (final KeyspaceDoesntExistYet ignored) { |
| final Cluster cluster = this.clusterCache.get(adminClusterName); |
| try (final Session session = cluster.newSession()) { |
| session.execute("CREATE KEYSPACE " + this.adminKeyspace + " WITH REPLICATION = " + |
| ReplicationStrategyResolver.replicationStrategy( |
| env.getProperty(CassandraConnectorConstants.DEFAULT_REPLICATION_TYPE, |
| CassandraConnectorConstants.DEFAULT_REPLICATION_TYPE_DEFAULT), |
| env.getProperty(CassandraConnectorConstants.DEFAULT_REPLICATION_REPLICAS, |
| CassandraConnectorConstants.DEFAULT_REPLICATION_REPLICAS_DEFAULT))); |
| |
| return this.getSession(this.adminClusterName, this.adminContactPoints, this.adminKeyspace); |
| } |
| } |
| } |
| |
| @Nonnull |
| public Session getTenantSession() { |
| return TenantContextHolder.identifier() |
| .map(this::getTenantSession) |
| .orElseThrow(() -> |
| new IllegalArgumentException("Could not find tenant identifier, make sure you set an identifier using TenantContextHolder.")); |
| } |
| |
| @Nonnull |
| public Session getTenantSession(@Nonnull final String identifier) { |
| Assert.notNull(identifier, "A tenant identifier must be given."); |
| Assert.hasText(identifier, "A tenant identifier must be given."); |
| |
| final Mapper<Tenant> tenantInfoMapper = this.getAdminSessionMappingManager().mapper(Tenant.class); |
| tenantInfoMapper.setDefaultDeleteOptions(OptionProvider.deleteConsistencyLevel(this.env)); |
| tenantInfoMapper.setDefaultGetOptions(OptionProvider.readConsistencyLevel(this.env)); |
| tenantInfoMapper.setDefaultSaveOptions(OptionProvider.writeConsistencyLevel(this.env)); |
| final Tenant tenantInfo = tenantInfoMapper.get(identifier); |
| if (tenantInfo == null) throw ServiceException.notFound("Tenant [" + identifier + "] unknown."); |
| return this.getSession(tenantInfo.getClusterName(), tenantInfo.getContactPoints(), tenantInfo.getKeyspace()); |
| } |
| |
| @Nonnull |
| public Session getSession(@Nonnull final String clusterName, |
| @Nonnull final String contactPoints, |
| @Nonnull final String keyspace) { |
| Assert.notNull(clusterName, "A cluster name must be given."); |
| Assert.hasText(clusterName, "A cluster name must be given."); |
| Assert.notNull(contactPoints, "At least one contact point must be given."); |
| Assert.hasText(contactPoints, "At least one contact point must be given."); |
| Assert.notNull(keyspace, "A keyspace must be given."); |
| Assert.hasText(keyspace, "A keyspace must be given."); |
| |
| this.sessionCache.computeIfAbsent(keyspace, (sessionKey) -> { |
| this.logger.info("Create new session for keyspace [" + keyspace + "]."); |
| |
| final Cluster cluster = this.clusterCache.computeIfAbsent(clusterName, |
| (clusterKey) -> getCluster(clusterKey, contactPoints)); |
| try { |
| CodecRegistry.apply(cluster); |
| return cluster.connect(keyspace); |
| } catch (final InvalidQueryException ex) { |
| throw new KeyspaceDoesntExistYet("Could not connect keyspace!", ex); |
| } |
| }); |
| |
| return this.sessionCache.get(keyspace); |
| } |
| |
| private Cluster getCluster(@Nonnull final String clusterName, @Nonnull final String contactPoints) { |
| CodecRegistry.register(new LocalDateTimeCodec()); |
| |
| final Cluster.Builder clusterBuilder = Cluster.builder().withClusterName(clusterName); |
| |
| if (this.env.containsProperty(CassandraConnectorConstants.CLUSTER_USER_PROP)) { |
| final String user = this.env.getProperty(CassandraConnectorConstants.CLUSTER_USER_PROP); |
| final String pwd = this.env.getProperty(CassandraConnectorConstants.CLUSTER_PASSWORD_PROP); |
| |
| final AuthProvider authProvider = new PlainTextAuthProvider(user, pwd); |
| clusterBuilder.withAuthProvider(authProvider); |
| } |
| |
| ContactPointUtils.process(clusterBuilder, contactPoints); |
| return clusterBuilder.build(); |
| } |
| |
| @Nonnull |
| public MappingManager getAdminSessionMappingManager() { |
| if (this.adminSessionMappingManager == null) { |
| final long lockStamp = this.mapperLock.writeLock(); |
| try { |
| if (this.adminSessionMappingManager == null) { |
| this.adminSessionMappingManager = new MappingManager(this.getAdminSession()); |
| } |
| } finally { |
| this.mapperLock.unlockWrite(lockStamp); |
| } |
| } |
| |
| return this.adminSessionMappingManager; |
| } |
| |
| public void touchAdminSession() { |
| this.getAdminSession(); |
| } |
| |
| @PreDestroy |
| private void cleanUp() { |
| this.logger.info("Clean up cluster connections."); |
| |
| this.sessionCache.values().forEach(Session::close); |
| this.sessionCache.clear(); |
| |
| this.clusterCache.values().forEach(Cluster::close); |
| this.clusterCache.clear(); |
| } |
| } |