blob: 559294a99068f56016ffa7ee6c43cbe67b501963 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.tests.utils;
import java.lang.reflect.Field;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URL;
import java.util.LinkedList;
import java.util.List;
import java.util.ResourceBundle;
import java.util.concurrent.atomic.AtomicInteger;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.Statement;
import org.apache.ignite.cache.store.cassandra.datasource.DataSource;
import org.apache.ignite.cache.store.cassandra.session.pool.SessionPool;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lifecycle.LifecycleEventType;
import org.apache.ignite.testframework.junits.logger.GridTestLog4jLogger;
import org.apache.logging.log4j.Logger;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
/**
* Helper class providing bunch of utility methods to work with Cassandra
*/
public class CassandraHelper {
/** */
private static final ResourceBundle CREDENTIALS = ResourceBundle.getBundle("org/apache/ignite/tests/cassandra/credentials");
/** */
private static final ResourceBundle CONNECTION = ResourceBundle.getBundle("org/apache/ignite/tests/cassandra/connection");
/** */
private static final ResourceBundle KEYSPACES = ResourceBundle.getBundle("org/apache/ignite/tests/cassandra/keyspaces");
/** */
private static final String EMBEDDED_CASSANDRA_YAML = "org/apache/ignite/tests/cassandra/embedded-cassandra.yaml";
/** */
private static final ApplicationContext connectionContext =
new ClassPathXmlApplicationContext("org/apache/ignite/tests/cassandra/connection-settings.xml");
/** */
private static DataSource adminDataSrc;
/** */
private static DataSource regularDataSrc;
/** */
private static Cluster adminCluster;
/** */
private static Cluster regularCluster;
/** */
private static Session adminSes;
/** */
private static Session regularSes;
/** */
private static CassandraLifeCycleBean embeddedCassandraBean;
/** */
public static String getAdminUser() {
return CREDENTIALS.getString("admin.user");
}
/** */
public static String getAdminPassword() {
return CREDENTIALS.getString("admin.password");
}
/** */
public static String getRegularUser() {
return CREDENTIALS.getString("regular.user");
}
/** */
public static String getRegularPassword() {
return CREDENTIALS.getString("regular.password");
}
/** */
public static String[] getTestKeyspaces() {
return KEYSPACES.getString("keyspaces").split(",");
}
/** */
private static AtomicInteger refCounter = new AtomicInteger(0);
/** */
public static String[] getContactPointsArray() {
String[] points = CONNECTION.getString("contact.points").split(",");
if (points.length == 0)
throw new RuntimeException("No Cassandra contact points specified");
for (int i = 0; i < points.length; i++)
points[i] = points[i].trim();
return points;
}
/** */
public static List<InetAddress> getContactPoints() {
String[] points = getContactPointsArray();
List<InetAddress> contactPoints = new LinkedList<>();
for (String point : points) {
if (point.contains(":"))
continue;
try {
contactPoints.add(InetAddress.getByName(point));
}
catch (Throwable e) {
throw new IllegalArgumentException("Incorrect contact point '" + point +
"' specified for Cassandra cache storage", e);
}
}
return contactPoints;
}
/** */
public static List<InetSocketAddress> getContactPointsWithPorts() {
String[] points = getContactPointsArray();
List<InetSocketAddress> contactPoints = new LinkedList<>();
for (String point : points) {
if (!point.contains(":"))
continue;
String[] chunks = point.split(":");
try {
contactPoints.add(InetSocketAddress.createUnresolved(chunks[0].trim(), Integer.parseInt(chunks[1].trim())));
}
catch (Throwable e) {
throw new IllegalArgumentException("Incorrect contact point '" + point +
"' specified for Cassandra cache storage", e);
}
}
return contactPoints;
}
/**
* Checks if embedded Cassandra should be used for unit tests
* @return true if embedded Cassandra should be used
*/
public static boolean useEmbeddedCassandra() {
String[] contactPoints = getContactPointsArray();
return contactPoints != null && contactPoints.length == 1 && contactPoints[0].trim().startsWith("127.0.0.1");
}
/** */
public static void dropTestKeyspaces() {
String[] keyspaces = getTestKeyspaces();
for (String keyspace : keyspaces) {
try {
executeWithAdminCredentials("DROP KEYSPACE IF EXISTS " + keyspace + ";");
}
catch (Throwable e) {
throw new RuntimeException("Failed to drop keyspace: " + keyspace, e);
}
}
}
/** */
public static ResultSet executeWithAdminCredentials(String statement, Object... args) {
if (args == null || args.length == 0)
return adminSession().execute(statement);
PreparedStatement ps = adminSession().prepare(statement);
return adminSession().execute(ps.bind(args));
}
/** */
public static ResultSet executeWithRegularCredentials(String statement, Object... args) {
if (args == null || args.length == 0)
return regularSession().execute(statement);
PreparedStatement ps = regularSession().prepare(statement);
return regularSession().execute(ps.bind(args));
}
/** */
public static ResultSet executeWithAdminCredentials(Statement statement) {
return adminSession().execute(statement);
}
/** */
public static ResultSet executeWithRegularCredentials(Statement statement) {
return regularSession().execute(statement);
}
/** */
public static synchronized DataSource getAdminDataSrc() {
if (adminDataSrc != null)
return adminDataSrc;
return adminDataSrc = (DataSource)connectionContext.getBean("cassandraAdminDataSource");
}
/** */
public static synchronized DataSource getRegularDataSrc() {
if (regularDataSrc != null)
return regularDataSrc;
return regularDataSrc = (DataSource)connectionContext.getBean("cassandraRegularDataSource");
}
/** */
public static void testAdminConnection() {
try {
adminSession();
}
catch (Throwable e) {
throw new RuntimeException("Failed to check admin connection to Cassandra", e);
}
}
/** */
public static void testRegularConnection() {
try {
regularSession();
}
catch (Throwable e) {
throw new RuntimeException("Failed to check regular connection to Cassandra", e);
}
}
/** */
public static synchronized void releaseCassandraResources() {
try {
if (adminSes != null && !adminSes.isClosed())
U.closeQuiet(adminSes);
}
finally {
adminSes = null;
}
try {
if (adminCluster != null && !adminCluster.isClosed())
U.closeQuiet(adminCluster);
}
finally {
adminCluster = null;
}
try {
if (regularSes != null && !regularSes.isClosed())
U.closeQuiet(regularSes);
}
finally {
regularSes = null;
}
try {
if (regularCluster != null && !regularCluster.isClosed())
U.closeQuiet(regularCluster);
}
finally {
regularCluster = null;
}
SessionPool.release();
}
/** */
private static synchronized Session adminSession() {
if (adminSes != null)
return adminSes;
try {
Cluster.Builder builder = Cluster.builder();
builder = builder.withCredentials(getAdminUser(), getAdminPassword());
builder.addContactPoints(getContactPoints());
builder.addContactPointsWithPorts(getContactPointsWithPorts());
adminCluster = builder.build();
return adminSes = adminCluster.connect();
}
catch (Throwable e) {
throw new RuntimeException("Failed to create admin session to Cassandra database", e);
}
}
/** */
private static synchronized Session regularSession() {
if (regularSes != null)
return regularSes;
try {
Cluster.Builder builder = Cluster.builder();
builder = builder.withCredentials(getRegularUser(), getRegularPassword());
builder.addContactPoints(getContactPoints());
builder.addContactPointsWithPorts(getContactPointsWithPorts());
regularCluster = builder.build();
return regularSes = regularCluster.connect();
}
catch (Throwable e) {
throw new RuntimeException("Failed to create regular session to Cassandra database", e);
}
}
/**
* Note that setting of cassandra.storagedir property is expected.
*/
public static void startEmbeddedCassandra(Logger log) {
if (refCounter.getAndIncrement() > 0)
return;
ClassLoader clsLdr = CassandraHelper.class.getClassLoader();
URL url = clsLdr.getResource(EMBEDDED_CASSANDRA_YAML);
embeddedCassandraBean = new CassandraLifeCycleBean();
embeddedCassandraBean.setCassandraConfigFile(url.getFile());
try {
Field logField = CassandraLifeCycleBean.class.getDeclaredField("log");
logField.setAccessible(true);
logField.set(embeddedCassandraBean, new GridTestLog4jLogger(log));
}
catch (Throwable e) {
throw new RuntimeException("Failed to initialize logger for CassandraLifeCycleBean", e);
}
embeddedCassandraBean.onLifecycleEvent(LifecycleEventType.BEFORE_NODE_START);
}
/** */
public static void stopEmbeddedCassandra() {
if (refCounter.decrementAndGet() > 0)
return;
if (embeddedCassandraBean != null)
embeddedCassandraBean.onLifecycleEvent(LifecycleEventType.BEFORE_NODE_STOP);
}
}